split out manifests for smaller "coordinate" arrays #539

Open · dcherian opened this issue Jan 3, 2025 · 6 comments

dcherian (Contributor) commented Jan 3, 2025

One of our ideas to reduce the time to open a dataset with Xarray is to create a separate manifest file for "coordinate" arrays that are commonly loaded into memory eagerly.

Here is a survey of some datasets with coordinate arrays on the large end of the spectrum (the values for MPAS are an estimate):

| Name | Uncomp Size | Numel | Dtype | N Chunks | Shape | Chunk Shape |
| --- | --- | --- | --- | --- | --- | --- |
| ARCO ERA5 time | 10MB | 1.3M | int64 | 20 | (1336320,) | (67706,) |
| HRRR 2D lat,lon | 10MB | 1.9M | float64 | 1 | (1059, 1799) | (1059, 1799) |
| landsat datacube x | 10MB | 1.3M | float64 | 1 | (1336320,) | (1336320,) |
| landsat datacube y | 5MB | 0.6M | float64 | 1 | (668160,) | |
| Sentinel datacube | 1.3MB | 0.2M | float64 | 1 | | |
| LLC4320 XC | 970MB | 18.6M | float64 | 13 | (13, 4320, 4320) | (1, 4320, 4320) |
| LLC4320 all grid vars | 16GB | | | 254 | | |
| 3.75km MPAS edge_face_connectivity | 1.9GB | 0.2B | int64 | 150 [12MiB] | (125829120, 2) | |

If we want a manifest file of size 1MiB, and our default `inline_chunk_threshold_bytes` is 512 bytes, then we can store at most 2048 chunks (assuming they are all inlined).
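
For concreteness, the back-of-the-envelope arithmetic behind that 2048 figure looks like this (a sketch; the 1MiB target and the worst case of every chunk being inlined at the full threshold are assumptions):

```python
# Rough budget: how many chunk entries fit in the target manifest size if
# every entry is inlined at the full inline threshold (worst case).
MANIFEST_TARGET_BYTES = 1 * 1024 * 1024  # assumed 1 MiB manifest target
INLINE_THRESHOLD_BYTES = 512             # default inline_chunk_threshold_bytes

max_inlined_chunks = MANIFEST_TARGET_BYTES // INLINE_THRESHOLD_BYTES
print(max_inlined_chunks)  # 2048
```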

So one no-config approach would be to:

  1. take all the array metadata (combining the snapshot & changeset),
  2. sort arrays by number of chunks, and
  3. split it so that the first <=~2048 chunks get written to one manifest, and then put the rest in another manifest (or split this using other heuristics). We can be smart about splitting so that no array's chunks are split between the first and second manifest.

This ignores a bunch of complexity:

  1. Sparsity of chunks (minor IMO).
  2. It underestimates the number of chunks we could fit in there (IMO probably OK for a first pass).
  3. If one builds a dataset by incrementally appending and committing after every append, we may end up rewriting the manifest for the first few appends (this is OK IMO).

Thoughts?

dcherian (Contributor, Author) commented Jan 3, 2025

And here is some Python code to test out this idea on example datasets:

```python
import math
import operator
from itertools import dropwhile, takewhile

import xarray as xr
from toolz import accumulate


def partition_manifest(ds: xr.Dataset) -> None:
    THRESHOLD_CHUNKS = 2048

    # number of chunks per array; we know this from the Zarr metadata
    allvars: dict[str, int] = {}
    for name, var in ds.variables.items():
        nchunks = math.prod(
            math.ceil(s / c)
            for s, c in zip(var.shape, var.encoding["chunks"], strict=True)
        )
        allvars[name] = nchunks

    # sort arrays by number of chunks, then take a running total
    allvars = dict(sorted(allvars.items(), key=lambda x: x[1]))
    accumulated = tuple(accumulate(operator.add, allvars.values()))

    # arrays before the running total crosses the threshold go in the "small"
    # manifest; everything after goes in the "big" manifest
    threshold_filter = lambda tup: tup[1] < THRESHOLD_CHUNKS
    first = operator.itemgetter(0)
    small = dict(map(first, takewhile(threshold_filter, zip(allvars.items(), accumulated, strict=True))))
    big = dict(map(first, dropwhile(threshold_filter, zip(allvars.items(), accumulated, strict=True))))
    assert not set(small) & set(big)
    print(
        f"\nsmall: {sum(small.values())} chunks", tuple(small),
        f"\nbig: {sum(big.values())} chunks over {len(big)} arrays",
    )


grid = xr.open_zarr("gs://pangeo-ecco-llc4320/grid", chunks={})
partition_manifest(grid)
# small: 253 chunks ('PHrefC', 'PHrefF', 'Z', 'Zl', 'Zp1', 'Zu', 'drC', 'drF', 'face', 'i', 'i_g', 'iter', 'j', 'j_g', 'k', 'k_l', 'k_p1', 'k_u',
# 'time', 'CS', 'Depth', 'SN', 'XC', 'XG', 'YC', 'YG', 'dxC', 'dxG', 'dyC', 'dyG', 'hFacC', 'hFacS', 'hFacW', 'rA', 'rAs', 'rAw', 'rAz')

ds = xr.open_zarr("gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3", chunks=None, decode_cf=False)
partition_manifest(ds)
# small: 23 chunks ('latitude', 'level', 'longitude', 'time')
# big: 361355904 chunks over 273 arrays

import arraylake as al

client = al.Client()
hrrr = client.get_repo("earthmover-demos/hrrr").to_xarray("solar")
gfs = client.get_repo("earthmover-demos/gfs").to_xarray("solar")

partition_manifest(hrrr)
partition_manifest(gfs)
# small: 54 chunks ('latitude', 'longitude', 'spatial_ref', 'step', 'x', 'y', 'time')
# big: 4589460 chunks over 6 arrays
# small: 11 chunks ('latitude', 'longitude', 'step', 'time')
# big: 1189440 chunks over 5 arrays
```

paraseba (Contributor) commented Jan 7, 2025

I really like this @dcherian. It's simple, and it will probably make a big difference for interactive use cases. I wonder which things we can generalize. Examples:

  • Maybe we can generate multiple small-ish manifests if there are many small arrays.
  • Maybe for the large coordinate arrays we still want to have a separate manifest? That would help with interactive use cases without having to download massive manifests. Can we identify which arrays are coordinate arrays?

rabernat (Contributor) commented Jan 7, 2025

> If we want a manifest file of size 1MiB

Where does this come from? It seems like a very important number. I guess it is the size of a file we expect to be able to fetch from S3 quickly using a single thread?

According to our model, the fetching time should be T(n) = T0 + n / B0 ≈ 10 ms + 1 MB / (100 MB/s) ≈ 20 ms. By this math, we could get away with 4 MB and still stay at around 50 ms.
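
As a quick sanity check, here is that model as code (the 10 ms latency and 100 MB/s throughput are the assumed constants quoted above):

```python
# T(n) = T0 + n / B0: assumed per-request latency plus transfer time.
T0 = 0.010  # seconds, assumed first-byte latency
B0 = 100e6  # bytes per second, assumed single-stream throughput

def fetch_time_ms(nbytes: float) -> float:
    return (T0 + nbytes / B0) * 1000

print(fetch_time_ms(1e6))  # ~20 ms for a 1 MB manifest
print(fetch_time_ms(4e6))  # ~50 ms for a 4 MB manifest
```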

> 3. If one builds a dataset by incrementally appending and committing after every append, we may end up rewriting the manifest for the first few appends (this is OK IMO)

Agree it's okay to rewrite this file with every commit.


Could you try running the sample code on some big EO data cubes? Like something from Sylvera? Or https://app.earthmover.io/earthmover-demos/sentinel-datacube-South-America-3

rabernat (Contributor) commented Jan 7, 2025

> then we can store at most 2048 chunks (assuming they are all inlined).

It seems like we should also check this assumption for common datasets. It's unclear to me how much the logic you're proposing here depends on the assumption about inlining.

rabernat (Contributor) commented Jan 7, 2025

Last comment about inlining:

Are there cases where it would make sense to actually rechunk the coordinate data before storing it? Like for this dataset:

> `# small: 23 chunks ('latitude', 'level', 'longitude', 'time')`

Why should we store 23 chunks? Why not just 4? As long as we are inlining, there is not really any benefit to chunking. In the scenario where we rechunk, we might exceed the 512 byte inline threshold, but we would still be storing less data total, since we would only have one "big" chunk.

Put differently, is there ever a good reason to chunk an array if it is being inlined into a single manifest?
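
To make that concrete, here is a toy comparison of the manifest footprint only, for a small 1-D coordinate stored as many inlined chunks vs. one consolidated chunk that exceeds the inline threshold (the 64-byte cost of a non-inlined chunk reference is a hypothetical placeholder, not the actual manifest encoding):

```python
# Toy comparison: bytes an array contributes to the manifest when its chunks
# are small enough to inline vs. consolidated into one non-inlined chunk.
INLINE_THRESHOLD_BYTES = 512
REF_BYTES = 64  # hypothetical size of one non-inlined chunk reference

def manifest_footprint(total_bytes: int, nchunks: int) -> int:
    chunk_bytes = -(-total_bytes // nchunks)  # ceiling division
    if chunk_bytes <= INLINE_THRESHOLD_BYTES:
        return nchunks * chunk_bytes  # each chunk's data is inlined in the manifest
    return nchunks * REF_BYTES        # only references land in the manifest

# e.g. a ~10 KB coordinate split into 23 chunks vs. a single chunk
print(manifest_footprint(10_000, 23))  # ~10 KB of inlined data in the manifest
print(manifest_footprint(10_000, 1))   # 64 B in the manifest; the chunk data lives in its own object
```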

dcherian (Contributor, Author) commented Jan 7, 2025

> Maybe we can generate multiple small-ish manifests if there are many small arrays.
> Maybe for the large coordinate arrays we still want to have a separate manifest?

I think this really is a question of how to split the remaining arrays; that is, we should make sure manifests for big arrays (big in terms of number of chunks) are stored separately. I think that can be a follow-up.

> Can we identify which arrays are coordinate arrays?

We should ask: what are the characteristics of "coordinate arrays"? From my survey they seem to be arrays that are "generally" small in terms of number of chunks, and empirically we can make good decisions without needing the user to annotate their data.

> I guess it is the size of a file we expect to be able to fetch from S3 quickly using a single thread? By this math, we could get away with 4 MB and still stay at around 50 ms.

Yes, we want to stay in the "latency range", so <8 MB.

> Could you try running the sample code on some big EO data cubes? Like something from Sylvera? Or app.earthmover.io/earthmover-demos/sentinel-datacube-South-America-3

These are the landsat examples in the table; they all have single chunks and will be easily detected.

> It's unclear to me how much the logic you're proposing here depends on the assumption about inlining.

It does not depend at all on inlining. Using the inline threshold gives us a worst-case (maximum) estimate of the number of chunks in a single manifest, given the chosen size threshold.

> Why should we store 23 chunks?

This is how that dataset was created. My only goal here was to see if there is a no-config way to detect these "coordinate" variables in real world scenarios. I think there is.

> In the scenario where we rechunk, we might exceed the 512 byte inline threshold, but we would still be storing less data total, since we would only have one "big" chunk.

I think this is all orthogonal to the idea here, which is to simply split out a "small" manifest and a "big" manifest. Other optimizations can be explored later.
