diff --git a/kerchunk/combine.py b/kerchunk/combine.py index 376a8003..01dbca26 100644 --- a/kerchunk/combine.py +++ b/kerchunk/combine.py @@ -203,10 +203,11 @@ def append( target_options=target_options, asynchronous=True ) + store = fs_as_store(fs) ds = xr.open_dataset( - fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False} + store, engine="zarr", backend_kwargs={"consolidated": False} ) - z = zarr.open(fs.get_mapper(), zarr_format=2) + z = zarr.open_group(store, zarr_format=2, use_consolidated=False) mzz = MultiZarrToZarr( path, out=fs.references, # dict or parquet/lazy @@ -373,8 +374,8 @@ def first_pass(self): fs._dircache_from_items() logger.debug("First pass: %s", i) - z_store = fs_as_store(fs, read_only=False) - z = zarr.open_group(z_store, zarr_format=2) + z_store = fs_as_store(fs, read_only=True) + z = zarr.open_group(z_store, mode="r", zarr_format=2, use_consolidated=False) for var in self.concat_dims: value = self._get_value(i, z, var, fn=self._paths[i]) if isinstance(value, np.ndarray): @@ -399,11 +400,13 @@ def store_coords(self): """ Write coordinate arrays into the output """ + # group to write new refs to; backed by kv dict kv = {} - store = zarr.storage.MemoryStore(kv) - group = zarr.open_group(store, zarr_format=2) - m = fs_as_store(self.fss[0], read_only=False) - z = zarr.open(m, zarr_format=2) + store = zarr.storage.MemoryStore(store_dict=kv) + group = zarr.open_group(store, mode="w", zarr_format=2, use_consolidated=False) + # group to read coords from + m = fs_as_store(self.fss[0], read_only=True) + z = zarr.open_group(m, zarr_format=2, mode="r", use_consolidated=False) for k, v in self.coos.items(): if k == "var": # The names of the variables to write in the second pass, not a coordinate @@ -454,6 +457,7 @@ def store_coords(self): else: arr.attrs.update(self.cf_units[k]) # TODO: rewrite .zarray/.zattrs with ujson to save space. Maybe make them by hand anyway. 
+ translate_refs_serializable(kv) self.out.update(kv) logger.debug("Written coordinates") @@ -474,8 +478,8 @@ def second_pass(self): for i, fs in enumerate(self.fss): to_download = {} - m = fs_as_store(fs, read_only=False) - z = zarr.open(m, zarr_format=2) + m = fs_as_store(fs, read_only=True) + z = zarr.open_group(m, zarr_format=2, mode="r", use_consolidated=False) if no_deps is None: # done first time only diff --git a/kerchunk/tests/test_combine.py b/kerchunk/tests/test_combine.py index 0cfb9505..37e841f8 100644 --- a/kerchunk/tests/test_combine.py +++ b/kerchunk/tests/test_combine.py @@ -9,6 +9,7 @@ import kerchunk.combine from kerchunk.zarr import single_zarr from kerchunk.combine import MultiZarrToZarr +from kerchunk.utils import refs_as_store fs = fsspec.filesystem("memory") arr = np.random.rand(1, 10, 10) @@ -19,28 +20,28 @@ dims=["x", "y"], name="data", ) -xr.Dataset({"data": data}, attrs={"attr0": 3}).to_zarr("memory://simple1.zarr") +xr.Dataset({"data": data}, attrs={"attr0": 3}).to_zarr("memory://simple1.zarr", zarr_format=2) data = xr.DataArray( data=arr.squeeze() + 1, dims=["x", "y"], name="data", ) -xr.Dataset({"data": data}, attrs={"attr0": 4}).to_zarr("memory://simple2.zarr") +xr.Dataset({"data": data}, attrs={"attr0": 4}).to_zarr("memory://simple2.zarr", zarr_format=2) data = xr.DataArray( data=arr.squeeze(), dims=["x", "y"], name="datum", ) -xr.Dataset({"datum": data}, attrs={"attr0": 3}).to_zarr("memory://simple_var1.zarr") +xr.Dataset({"datum": data}, attrs={"attr0": 3}).to_zarr("memory://simple_var1.zarr", zarr_format=2) data = xr.DataArray( data=arr.squeeze() + 1, dims=["x", "y"], name="datum", ) -xr.Dataset({"datum": data}, attrs={"attr0": 4}).to_zarr("memory://simple_var2.zarr") +xr.Dataset({"datum": data}, attrs={"attr0": 4}).to_zarr("memory://simple_var2.zarr", zarr_format=2) data = xr.DataArray( data=arr, @@ -50,7 +51,7 @@ attrs={"attr0": 3}, ) xr.Dataset({"data": data, "static": static}, attrs={"attr1": 5}).to_zarr( - 
"memory://single1.zarr" + "memory://single1.zarr", zarr_format=2 ) data = xr.DataArray( @@ -61,7 +62,7 @@ attrs={"attr0": 4}, ) xr.Dataset({"data": data, "static": static}, attrs={"attr1": 6}).to_zarr( - "memory://single2.zarr" + "memory://single2.zarr", zarr_format=2 ) data = xr.DataArray( @@ -72,7 +73,7 @@ attrs={"attr0": 4}, ) xr.Dataset({"data": data, "static": static}, attrs={"attr1": 6}).to_zarr( - "memory://single3.zarr" + "memory://single3.zarr", zarr_format=2 ) data = xr.DataArray( @@ -82,8 +83,8 @@ name="data", attrs={"attr0": 0}, ) -xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk1.zarr") -xr.Dataset({"data": data}).to_zarr("memory://group1.zarr", group="group") +xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk1.zarr", zarr_format=2) +xr.Dataset({"data": data}).to_zarr("memory://group1.zarr", group="group", zarr_format=2) data = xr.DataArray( data=np.vstack([arr] * 4), @@ -92,8 +93,8 @@ name="data", attrs={"attr0": 0}, ) -xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk2.zarr") -xr.Dataset({"data": data}).to_zarr("memory://group2.zarr", group="group") +xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk2.zarr", zarr_format=2) +xr.Dataset({"data": data}).to_zarr("memory://group2.zarr", group="group", zarr_format=2) data = xr.DataArray( data=da.from_array(np.vstack([arr] * 4), chunks=(1, 10, 10)), @@ -102,7 +103,7 @@ name="data", attrs={"attr0": 0}, ) -xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk1.zarr") +xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk1.zarr", zarr_format=2) data = xr.DataArray( data=da.from_array(np.vstack([arr] * 4), chunks=(1, 10, 10)), @@ -111,7 +112,7 @@ name="data", attrs={"attr0": 0}, ) -xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk2.zarr") +xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk2.zarr", zarr_format=2) data = xr.DataArray( data=da.from_array(np.vstack([arr] * 4), chunks=(2, 10, 10)), @@ -120,7 +121,7 @@ name="data", attrs={"attr0": 0}, ) 
-xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk1.zarr") +xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk1.zarr", zarr_format=2) data = xr.DataArray( data=da.from_array(np.vstack([arr] * 4), chunks=(2, 10, 10)), @@ -129,11 +130,11 @@ name="data", attrs={"attr0": 0}, ) -xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk2.zarr") +xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk2.zarr", zarr_format=2) # simple time arrays - xarray can't make these! m = fs.get_mapper("time1.zarr") -z = zarr.open(m, mode="w", zarr_format=2) +z = zarr.open_group("memory://time1.zarr", mode="w", zarr_format=2) time1_array = np.array([1], dtype="M8[s]") ar = z.create_array("time", data=time1_array, shape=time1_array.shape) ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) @@ -141,7 +142,7 @@ ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]}) m = fs.get_mapper("time2.zarr") -z = zarr.open(m, mode="w", zarr_format=2) +z = zarr.open_group("memory://time2.zarr", mode="w", zarr_format=2) time2_array = np.array([2], dtype="M8[s]") ar = z.create_array("time", data=time2_array, shape=time2_array.shape) ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) @@ -156,7 +157,7 @@ dims=["time", "x", "y"], name="data", ) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime1.zarr") +xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime1.zarr", zarr_format=2) fs.pipe( "cfstdtime1.zarr/time/.zattrs", b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' @@ -169,7 +170,7 @@ dims=["time", "x", "y"], name="data", ) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime2.zarr") +xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime2.zarr", zarr_format=2) fs.pipe( "cfstdtime2.zarr/time/.zattrs", b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' @@ -182,7 +183,7 @@ dims=["time", "x", "y"], name="data", ) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime3.zarr") +xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime3.zarr", zarr_format=2) 
fs.pipe( "cfstdtime3.zarr/time/.zattrs", b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' @@ -197,7 +198,7 @@ name="data", attrs={"units": "months since 1970-01-01", "calendar": "360_day"}, ) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime1.zarr") +xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime1.zarr", zarr_format=2) fs.pipe( "cfnontime1.zarr/time/.zattrs", b'{"_ARRAY_DIMENSIONS": ["time"], "units": "months since 1970-01-01", "calendar": "360_day"}', @@ -210,7 +211,7 @@ name="data", attrs={"units": "months since 1970-01-01", "calendar": "360_day"}, ) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime2.zarr") +xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime2.zarr", zarr_format=2) fs.pipe( "cfnontime2.zarr/time/.zattrs", b'{"_ARRAY_DIMENSIONS": ["time"], "units": "months since 1970-01-01", "calendar": "360_day"}', @@ -228,8 +229,9 @@ def refs(): def test_fixture(refs): # effectively checks that single_zarr works assert "single1" in refs - m = fsspec.get_mapper("reference://", fo=refs["single1"], remote_protocol="memory") - g = xr.open_dataset(m, engine="zarr", backend_kwargs={"consolidated": False}) + #m = fsspec.get_mapper("reference://", fo=refs["single1"], remote_protocol="memory") + store = refs_as_store(refs["single1"]) + g = xr.open_dataset(store, engine="zarr", backend_kwargs={"consolidated": False}) assert g.time.values.tolist() == [1] assert (g.data.values == arr).all() assert g.attrs["attr1"] == 5 @@ -274,7 +276,8 @@ def test_get_coos(refs, selector, expected): mzz.first_pass() assert mzz.coos["time"].tolist() == expected mzz.store_coords() - g = zarr.open(mzz.out, zarr_format=2) + store = refs_as_store(mzz.out) + g = zarr.open_group(store, mode="r", zarr_format=2) assert g["time"][:].tolist() == expected assert dict(g.attrs) @@ -605,11 +608,11 @@ def test_chunked(refs, inputs, chunks): group="group" if "group" in inputs[0] else None, ) # TODO: make some assert_eq style function - assert z.time.values.tolist() 
== [1, 2, 3, 4, 5, 6, 7, 8] - assert z.data.shape == (8, 10, 10) - assert z.data.chunks == chunks + assert z["time"].values.tolist() == [1, 2, 3, 4, 5, 6, 7, 8] + assert z["data"].shape == (8, 10, 10) + assert z["data"].chunks == chunks for i in range(z.data.shape[0]): - assert (z.data[i].values == arr).all() + assert (z["data"][i].values == arr).all() def test_var(refs): @@ -774,9 +777,10 @@ def test_no_inline(refs): """Ensure that inline_threshold=0 disables MultiZarrToZarr checking file size.""" ds = xr.Dataset(dict(x=[1, 2, 3])) ds["y"] = 3 + ds["x"] - store = fsspec.get_mapper("memory://zarr_store") - ds.to_zarr(store, mode="w", consolidated=False) - ref = kerchunk.utils.consolidate(store) + kv = {} + store = zarr.storage.MemoryStore(store_dict=kv) + ds.to_zarr(store, mode="w", consolidated=False, zarr_format=2) + ref = kerchunk.utils.consolidate(kv) # This type of reference with no offset or total size is produced by # kerchunk.zarr.single_zarr or equivalently ZarrToZarr.translate. 
ref["refs"]["y/0"] = ["file:///tmp/some/data-that-shouldnt-be-accessed"] diff --git a/kerchunk/utils.py b/kerchunk/utils.py index b8a53e3c..1f7e0258 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -48,7 +48,8 @@ def refs_as_store( def is_zarr3(): """Check if the installed zarr version is version 3""" - return Version(zarr.__version__) >= Version("3.0.0.b2") + # Compare the major component: zarr 3 pre-releases (e.g. 3.0.0a1) count as v3, while 2.x pre-releases do not. + return Version(zarr.__version__).major >= 3 def dict_to_store(store_dict: dict): @@ -128,6 +129,7 @@ def consolidate(refs): out[k] = (b"base64:" + base64.b64encode(v)).decode() else: out[k] = v + out = translate_refs_serializable(out) return {"version": 1, "refs": out} diff --git a/kerchunk/zarr.py b/kerchunk/zarr.py index ea0612de..0417d965 100644 --- a/kerchunk/zarr.py +++ b/kerchunk/zarr.py @@ -54,6 +54,7 @@ def single_zarr( if isinstance(refs, LazyReferenceMapper): refs.flush() refs = kerchunk.utils.consolidate(refs) + return refs