Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix most combine tests #5

Open
wants to merge 1 commit into
base: v3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions kerchunk/combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ def append(
target_options=target_options,
asynchronous=True
)
store = fs_as_store(fs)
ds = xr.open_dataset(
fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False}
store, engine="zarr", backend_kwargs={"consolidated": False}
)
z = zarr.open(fs.get_mapper(), zarr_format=2)
z = zarr.open_group(store, zarr_format=2, use_consolidated=False)
mzz = MultiZarrToZarr(
path,
out=fs.references, # dict or parquet/lazy
Expand Down Expand Up @@ -373,8 +374,8 @@ def first_pass(self):
fs._dircache_from_items()

logger.debug("First pass: %s", i)
z_store = fs_as_store(fs, read_only=False)
z = zarr.open_group(z_store, zarr_format=2)
z_store = fs_as_store(fs, read_only=True)
z = zarr.open_group(z_store, mode="r", zarr_format=2, use_consolidated=False)
for var in self.concat_dims:
value = self._get_value(i, z, var, fn=self._paths[i])
if isinstance(value, np.ndarray):
Expand All @@ -399,11 +400,13 @@ def store_coords(self):
"""
Write coordinate arrays into the output
"""
# group to write new refs to; backed by kv dict
kv = {}
store = zarr.storage.MemoryStore(kv)
group = zarr.open_group(store, zarr_format=2)
m = fs_as_store(self.fss[0], read_only=False)
z = zarr.open(m, zarr_format=2)
store = zarr.storage.MemoryStore(store_dict=kv)
group = zarr.open_group(store, mode="w", zarr_format=2, use_consolidated=False)
# group to read coords from
m = fs_as_store(self.fss[0], read_only=True)
z = zarr.open_group(m, zarr_format=2, mode="r", use_consolidated=False)
for k, v in self.coos.items():
if k == "var":
# The names of the variables to write in the second pass, not a coordinate
Expand Down Expand Up @@ -454,6 +457,7 @@ def store_coords(self):
else:
arr.attrs.update(self.cf_units[k])
# TODO: rewrite .zarray/.zattrs with ujson to save space. Maybe make them by hand anyway.
translate_refs_serializable(kv)
self.out.update(kv)
logger.debug("Written coordinates")

Expand All @@ -474,8 +478,8 @@ def second_pass(self):

for i, fs in enumerate(self.fss):
to_download = {}
m = fs_as_store(fs, read_only=False)
z = zarr.open(m, zarr_format=2)
m = fs_as_store(fs, read_only=True)
z = zarr.open_group(m, zarr_format=2, mode="r", use_consolidated=False)

if no_deps is None:
# done first time only
Expand Down
68 changes: 36 additions & 32 deletions kerchunk/tests/test_combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import kerchunk.combine
from kerchunk.zarr import single_zarr
from kerchunk.combine import MultiZarrToZarr
from kerchunk.utils import refs_as_store

fs = fsspec.filesystem("memory")
arr = np.random.rand(1, 10, 10)
Expand All @@ -19,28 +20,28 @@
dims=["x", "y"],
name="data",
)
xr.Dataset({"data": data}, attrs={"attr0": 3}).to_zarr("memory://simple1.zarr")
xr.Dataset({"data": data}, attrs={"attr0": 3}).to_zarr("memory://simple1.zarr", zarr_format=2)

data = xr.DataArray(
data=arr.squeeze() + 1,
dims=["x", "y"],
name="data",
)
xr.Dataset({"data": data}, attrs={"attr0": 4}).to_zarr("memory://simple2.zarr")
xr.Dataset({"data": data}, attrs={"attr0": 4}).to_zarr("memory://simple2.zarr", zarr_format=2)

data = xr.DataArray(
data=arr.squeeze(),
dims=["x", "y"],
name="datum",
)
xr.Dataset({"datum": data}, attrs={"attr0": 3}).to_zarr("memory://simple_var1.zarr")
xr.Dataset({"datum": data}, attrs={"attr0": 3}).to_zarr("memory://simple_var1.zarr", zarr_format=2)

data = xr.DataArray(
data=arr.squeeze() + 1,
dims=["x", "y"],
name="datum",
)
xr.Dataset({"datum": data}, attrs={"attr0": 4}).to_zarr("memory://simple_var2.zarr")
xr.Dataset({"datum": data}, attrs={"attr0": 4}).to_zarr("memory://simple_var2.zarr", zarr_format=2)

data = xr.DataArray(
data=arr,
Expand All @@ -50,7 +51,7 @@
attrs={"attr0": 3},
)
xr.Dataset({"data": data, "static": static}, attrs={"attr1": 5}).to_zarr(
"memory://single1.zarr"
"memory://single1.zarr", zarr_format=2
)

data = xr.DataArray(
Expand All @@ -61,7 +62,7 @@
attrs={"attr0": 4},
)
xr.Dataset({"data": data, "static": static}, attrs={"attr1": 6}).to_zarr(
"memory://single2.zarr"
"memory://single2.zarr", zarr_format=2
)

data = xr.DataArray(
Expand All @@ -72,7 +73,7 @@
attrs={"attr0": 4},
)
xr.Dataset({"data": data, "static": static}, attrs={"attr1": 6}).to_zarr(
"memory://single3.zarr"
"memory://single3.zarr", zarr_format=2
)

data = xr.DataArray(
Expand All @@ -82,8 +83,8 @@
name="data",
attrs={"attr0": 0},
)
xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk1.zarr")
xr.Dataset({"data": data}).to_zarr("memory://group1.zarr", group="group")
xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk1.zarr", zarr_format=2)
xr.Dataset({"data": data}).to_zarr("memory://group1.zarr", group="group", zarr_format=2)

data = xr.DataArray(
data=np.vstack([arr] * 4),
Expand All @@ -92,8 +93,8 @@
name="data",
attrs={"attr0": 0},
)
xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk2.zarr")
xr.Dataset({"data": data}).to_zarr("memory://group2.zarr", group="group")
xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk2.zarr", zarr_format=2)
xr.Dataset({"data": data}).to_zarr("memory://group2.zarr", group="group", zarr_format=2)

data = xr.DataArray(
data=da.from_array(np.vstack([arr] * 4), chunks=(1, 10, 10)),
Expand All @@ -102,7 +103,7 @@
name="data",
attrs={"attr0": 0},
)
xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk1.zarr")
xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk1.zarr", zarr_format=2)

data = xr.DataArray(
data=da.from_array(np.vstack([arr] * 4), chunks=(1, 10, 10)),
Expand All @@ -111,7 +112,7 @@
name="data",
attrs={"attr0": 0},
)
xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk2.zarr")
xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk2.zarr", zarr_format=2)

data = xr.DataArray(
data=da.from_array(np.vstack([arr] * 4), chunks=(2, 10, 10)),
Expand All @@ -120,7 +121,7 @@
name="data",
attrs={"attr0": 0},
)
xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk1.zarr")
xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk1.zarr", zarr_format=2)

data = xr.DataArray(
data=da.from_array(np.vstack([arr] * 4), chunks=(2, 10, 10)),
Expand All @@ -129,19 +130,19 @@
name="data",
attrs={"attr0": 0},
)
xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk2.zarr")
xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk2.zarr", zarr_format=2)

# simple time arrays - xarray can't make these!
m = fs.get_mapper("time1.zarr")
z = zarr.open(m, mode="w", zarr_format=2)
z = zarr.open_group("memory://time1.zarr", mode="w", zarr_format=2)
time1_array = np.array([1], dtype="M8[s]")
ar = z.create_array("time", data=time1_array, shape=time1_array.shape)
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]})
ar = z.create_array("data", data=arr, shape=arr.shape)
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]})

m = fs.get_mapper("time2.zarr")
z = zarr.open(m, mode="w", zarr_format=2)
z = zarr.open_group("memory://time2.zarr", mode="w", zarr_format=2)
time2_array = np.array([2], dtype="M8[s]")
ar = z.create_array("time", data=time2_array, shape=time2_array.shape)
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]})
Expand All @@ -156,7 +157,7 @@
dims=["time", "x", "y"],
name="data",
)
xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime1.zarr")
xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime1.zarr", zarr_format=2)
fs.pipe(
"cfstdtime1.zarr/time/.zattrs",
b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since '
Expand All @@ -169,7 +170,7 @@
dims=["time", "x", "y"],
name="data",
)
xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime2.zarr")
xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime2.zarr", zarr_format=2)
fs.pipe(
"cfstdtime2.zarr/time/.zattrs",
b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since '
Expand All @@ -182,7 +183,7 @@
dims=["time", "x", "y"],
name="data",
)
xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime3.zarr")
xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime3.zarr", zarr_format=2)
fs.pipe(
"cfstdtime3.zarr/time/.zattrs",
b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since '
Expand All @@ -197,7 +198,7 @@
name="data",
attrs={"units": "months since 1970-01-01", "calendar": "360_day"},
)
xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime1.zarr")
xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime1.zarr", zarr_format=2)
fs.pipe(
"cfnontime1.zarr/time/.zattrs",
b'{"_ARRAY_DIMENSIONS": ["time"], "units": "months since 1970-01-01", "calendar": "360_day"}',
Expand All @@ -210,7 +211,7 @@
name="data",
attrs={"units": "months since 1970-01-01", "calendar": "360_day"},
)
xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime2.zarr")
xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime2.zarr", zarr_format=2)
fs.pipe(
"cfnontime2.zarr/time/.zattrs",
b'{"_ARRAY_DIMENSIONS": ["time"], "units": "months since 1970-01-01", "calendar": "360_day"}',
Expand All @@ -228,8 +229,9 @@ def refs():
def test_fixture(refs):
# effectively checks that single_zarr works
assert "single1" in refs
m = fsspec.get_mapper("reference://", fo=refs["single1"], remote_protocol="memory")
g = xr.open_dataset(m, engine="zarr", backend_kwargs={"consolidated": False})
#m = fsspec.get_mapper("reference://", fo=refs["single1"], remote_protocol="memory")
store = refs_as_store(refs["single1"])
g = xr.open_dataset(store, engine="zarr", backend_kwargs={"consolidated": False})
assert g.time.values.tolist() == [1]
assert (g.data.values == arr).all()
assert g.attrs["attr1"] == 5
Expand Down Expand Up @@ -274,7 +276,8 @@ def test_get_coos(refs, selector, expected):
mzz.first_pass()
assert mzz.coos["time"].tolist() == expected
mzz.store_coords()
g = zarr.open(mzz.out, zarr_format=2)
store = refs_as_store(mzz.out)
g = zarr.open_group(store, mode="r", zarr_format=2)
assert g["time"][:].tolist() == expected
assert dict(g.attrs)

Expand Down Expand Up @@ -605,11 +608,11 @@ def test_chunked(refs, inputs, chunks):
group="group" if "group" in inputs[0] else None,
)
# TODO: make some assert_eq style function
assert z.time.values.tolist() == [1, 2, 3, 4, 5, 6, 7, 8]
assert z.data.shape == (8, 10, 10)
assert z.data.chunks == chunks
assert z["time"].values.tolist() == [1, 2, 3, 4, 5, 6, 7, 8]
assert z["data"].shape == (8, 10, 10)
assert z["data"].chunks == chunks
for i in range(z.data.shape[0]):
assert (z.data[i].values == arr).all()
assert (z["data"][i].values == arr).all()


def test_var(refs):
Expand Down Expand Up @@ -774,9 +777,10 @@ def test_no_inline(refs):
"""Ensure that inline_threshold=0 disables MultiZarrToZarr checking file size."""
ds = xr.Dataset(dict(x=[1, 2, 3]))
ds["y"] = 3 + ds["x"]
store = fsspec.get_mapper("memory://zarr_store")
ds.to_zarr(store, mode="w", consolidated=False)
ref = kerchunk.utils.consolidate(store)
kv = {}
store = zarr.storage.MemoryStore(store_dict=kv)
ds.to_zarr(store, mode="w", consolidated=False, zarr_format=2)
ref = kerchunk.utils.consolidate(kv)
# This type of reference with no offset or total size is produced by
# kerchunk.zarr.single_zarr or equivalently ZarrToZarr.translate.
ref["refs"]["y/0"] = ["file:///tmp/some/data-that-shouldnt-be-accessed"]
Expand Down
8 changes: 7 additions & 1 deletion kerchunk/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ def refs_as_store(

def is_zarr3():
    """Return True if the installed ``zarr`` package is version 3 or newer.

    The comparison baseline is the first v3 pre-release (``3.0.0b2``).
    PEP 440 ordering already places ``3.0.0b2 <= 3.0.0bN < 3.0.0 <= 3.x``,
    so pre-releases of v3 compare as v3 without a special case — and,
    unlike a blanket ``is_prerelease`` check, a 2.x pre-release is not
    misreported as zarr 3.
    """
    return Version(zarr.__version__) >= Version("3.0.0b2")


def dict_to_store(store_dict: dict):
Expand Down Expand Up @@ -128,6 +133,7 @@ def consolidate(refs):
out[k] = (b"base64:" + base64.b64encode(v)).decode()
else:
out[k] = v
out = translate_refs_serializable(out)
return {"version": 1, "refs": out}


Expand Down
1 change: 1 addition & 0 deletions kerchunk/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def single_zarr(
if isinstance(refs, LazyReferenceMapper):
refs.flush()
refs = kerchunk.utils.consolidate(refs)

return refs


Expand Down
Loading