Skip to content

Commit

Permalink
Updates to use the new delayedarray and hdf5array packages.
Browse files Browse the repository at this point in the history
- Use Grids to specify the chunk layout.
- Directly pass the buffer size to the apply functions.
- extract_*_array() functions no longer support None subsets.
  • Loading branch information
LTLA committed Feb 1, 2024
1 parent 0df706a commit 24585c3
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 115 deletions.
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ python_requires = >=3.8
install_requires =
importlib-metadata; python_version<"3.8"
dolomite-base>=0.2.0
delayedarray>=0.3.4
delayedarray>=0.5.0
h5py
numpy
filebackedarray
hdf5array

[options.packages.find]
where = src
Expand Down
14 changes: 7 additions & 7 deletions src/dolomite_matrix/DelayedMask.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _create_mask(x: numpy.ndarray, placeholder):


@delayedarray.extract_dense_array.register
def extract_dense_array_DelayedMask(x: DelayedMask, subset: Optional[Tuple[Sequence[int], ...]] = None):
def extract_dense_array_DelayedMask(x: DelayedMask, subset: Tuple[Sequence[int], ...]):
"""See :py:meth:`~delayedarray.extract_dense_array.extract_dense_array`."""
out = delayedarray.extract_dense_array(x._seed, subset)
mask = _create_mask(out, x._placeholder) # do this before type coercion, as the placeholder is assumed to be of the same underlying seed type.
Expand All @@ -100,13 +100,13 @@ def _mask_SparseNdarray(contents, placeholder, dtype):


@delayedarray.extract_sparse_array.register
def extract_sparse_array_DelayedMask(x: DelayedMask, subset: Optional[Tuple[Sequence[int], ...]] = None):
def extract_sparse_array_DelayedMask(x: DelayedMask, subset: Tuple[Sequence[int], ...]):
"""See :py:meth:`~delayedarray.extract_sparse_array.extract_sparse_array`."""
out = delayedarray.extract_sparse_array(x._seed, subset)
contents = out.contents
if contents is not None:
contents = _mask_SparseNdarray(contents, x._placeholder, x._dtype)
return delayedarray.SparseNdarray(x.shape, contents, dtype=x._dtype, index_dtype=out.index_dtype, check=False)
return delayedarray.SparseNdarray(out.shape, contents, dtype=x._dtype, index_dtype=out.index_dtype, is_masked=True, check=False)


@delayedarray.create_dask_array.register
Expand All @@ -119,10 +119,10 @@ def create_dask_array_DelayedMask(x: DelayedMask):
return dask.array.ma.masked_array(target, mask=mask)


@delayedarray.chunk_shape.register
def chunk_shape_DelayedMask(x: DelayedMask):
"""See :py:meth:`~delayedarray.chunk_shape.chunk_shape`."""
return delayedarray.chunk_shape(x._seed)
@delayedarray.chunk_grid.register
def chunk_grid_DelayedMask(x: DelayedMask):
"""See :py:meth:`~delayedarray.chunk_grid.chunk_grid`."""
return delayedarray.chunk_grid(x._seed)


@delayedarray.is_sparse.register
Expand Down
12 changes: 6 additions & 6 deletions src/dolomite_matrix/WrapperArraySeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,20 @@ def is_masked_WrapperArraySeed(x: WrapperArraySeed) -> bool:
return delayedarray.is_masked(x._seed)


@delayedarray.chunk_shape.register
def chunk_shape_WrapperArraySeed(x: WrapperArraySeed) -> Tuple[int, ...]:
"""See :py:func:`~delayedarray.chunk_shape.chunk_shape` for details."""
return delayedarray.chunk_shape(x._seed)
@delayedarray.chunk_grid.register
def chunk_grid_WrapperArraySeed(x: WrapperArraySeed) -> Tuple[int, ...]:
"""See :py:func:`~delayedarray.chunk_grid.chunk_grid` for details."""
return delayedarray.chunk_grid(x._seed)


@delayedarray.extract_dense_array.register
def extract_dense_array_WrapperArraySeed(x: WrapperArraySeed, subset: Optional[Tuple[Sequence[int], ...]] = None) -> numpy.ndarray:
def extract_dense_array_WrapperArraySeed(x: WrapperArraySeed, subset: Tuple[Sequence[int], ...]) -> numpy.ndarray:
"""See :py:func:`~delayedarray.extract_dense_array.extract_dense_array` for details."""
return delayedarray.extract_dense_array(x._seed, subset)


@delayedarray.extract_sparse_array.register
def extract_sparse_array_WrapperArraySeed(x: WrapperArraySeed, subset: Optional[Tuple[Sequence[int], ...]] = None) -> delayedarray.SparseNdarray:
def extract_sparse_array_WrapperArraySeed(x: WrapperArraySeed, subset: Tuple[Sequence[int], ...]) -> delayedarray.SparseNdarray:
"""See :py:func:`~delayedarray.extract_sparse_array.extract_sparse_array` for details."""
return delayedarray.extract_sparse_array(x._seed, subset)

Expand Down
20 changes: 8 additions & 12 deletions src/dolomite_matrix/_optimize_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import dolomite_base as dl
import h5py
import numpy
from delayedarray import SparseNdarray, apply_over_blocks, choose_block_shape_for_iteration, is_sparse, is_masked
from delayedarray import SparseNdarray, apply_over_blocks, is_sparse, is_masked

has_scipy = False
try:
Expand Down Expand Up @@ -126,19 +126,18 @@ class _IntegerAttributes:

@singledispatch
def collect_integer_attributes(x: Any, buffer_size: int) -> _IntegerAttributes:
block_shape = choose_block_shape_for_iteration(x, memory = buffer_size)
if is_sparse(x):
collated = apply_over_blocks(
x,
lambda pos, block : _collect_integer_attributes_from_Sparse2darray(block, buffer_size),
block_shape = block_shape,
buffer_size = buffer_size,
allow_sparse=True
)
else:
collated = apply_over_blocks(
x,
lambda pos, block : _collect_integer_attributes_from_ndarray(block, buffer_size),
block_shape = block_shape
buffer_size = buffer_size,
)
return _combine_integer_attributes(collated, check_missing = is_masked(x))

Expand Down Expand Up @@ -295,19 +294,18 @@ class _FloatAttributes:

@singledispatch
def collect_float_attributes(x: Any, buffer_size: int) -> _FloatAttributes:
block_shape = choose_block_shape_for_iteration(x, memory = buffer_size)
if is_sparse(x):
collated = apply_over_blocks(
x,
lambda pos, block : _collect_float_attributes_from_Sparse2darray(block, buffer_size),
block_shape = block_shape,
buffer_size = buffer_size,
allow_sparse=True
)
else:
collated = apply_over_blocks(
x,
lambda pos, block : _collect_float_attributes_from_ndarray(block, buffer_size),
block_shape = block_shape
buffer_size = buffer_size,
)
return _combine_float_attributes(collated, check_missing = is_masked(x))

Expand Down Expand Up @@ -614,11 +612,10 @@ def _simple_string_collector(x: numpy.ndarray, check_missing: None) -> _StringAt

@singledispatch
def collect_string_attributes(x: Any, buffer_size: int) -> _StringAttributes:
block_shape = choose_block_shape_for_iteration(x, memory = buffer_size)
collected = apply_over_blocks(
x,
lambda pos, block : _collect_string_attributes_from_ndarray(block, buffer_size),
block_shape = block_shape
buffer_size = buffer_size,
)
return _combine_string_attributes(collected, check_missing = is_masked(x))

Expand Down Expand Up @@ -677,19 +674,18 @@ class _BooleanAttributes:

@singledispatch
def collect_boolean_attributes(x: Any, buffer_size: int) -> _BooleanAttributes:
block_shape = choose_block_shape_for_iteration(x, memory = buffer_size)
if is_sparse(x):
collated = apply_over_blocks(
x,
lambda pos, block : _collect_boolean_attributes_from_Sparse2darray(block, buffer_size),
block_shape = block_shape,
buffer_size = buffer_size,
allow_sparse=True
)
else:
collated = apply_over_blocks(
x,
lambda pos, block : _collect_boolean_attributes_from_ndarray(block, buffer_size),
block_shape = block_shape
buffer_size = buffer_size,
)
return _combine_boolean_attributes(collated, check_missing = is_masked(x))

Expand Down
12 changes: 6 additions & 6 deletions src/dolomite_matrix/choose_chunk_dimensions.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from typing import Tuple


def choose_chunk_dimensions(shape: Tuple[int, ...], size: int, min_extent: int = 100, memory: int = 1e7) -> Tuple[int, ...]:
def choose_chunk_dimensions(shape: Tuple[int, ...], size: int, min_extent: int = 100, buffer_size: int = 1e7) -> Tuple[int, ...]:
"""
Choose chunk dimensions to use for a dense HDF5 dataset. For each
dimension, we consider a slice of the array that consists of the full
extent of all other dimensions. We want this slice to occupy less than
``memory`` in memory, and we resize the slice along the current dimension
to achieve this. The chosen chunk size is then defined as the size of the
``buffer_size`` in memory, and we resize the slice along the current
dimension to achieve this. The chunk size is then chosen as the size of the
slice along the current dimension. This ensures that efficient iteration
along each dimension will not use any more than ``memory`` bytes.
along each dimension will not use any more than ``buffer_size`` bytes.
Args:
shape: Shape of the array.
Expand All @@ -20,15 +20,15 @@ def choose_chunk_dimensions(shape: Tuple[int, ...], size: int, min_extent: int =
Minimum extent of each chunk dimension, to avoid problems
with excessively small chunk sizes when the data is large.
memory:
buffer_size:
Size of the (conceptual) memory buffer to use for storing blocks of
data during iteration through the array, in bytes.
Returns:
Tuple containing the chunk dimensions.
"""

num_elements = int(memory / size)
num_elements = int(buffer_size / size)
chunks = []

for d, s in enumerate(shape):
Expand Down
2 changes: 1 addition & 1 deletion src/dolomite_matrix/read_compressed_sparse_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import numpy
import os
import h5py
from filebackedarray import Hdf5CompressedSparseMatrixSeed
from hdf5array import Hdf5CompressedSparseMatrixSeed

from .DelayedMask import DelayedMask
from .ReloadedArray import ReloadedArray
Expand Down
2 changes: 1 addition & 1 deletion src/dolomite_matrix/read_dense_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import numpy
import os
import h5py
from filebackedarray import Hdf5DenseArraySeed
from hdf5array import Hdf5DenseArraySeed

from .DelayedMask import DelayedMask
from .ReloadedArray import ReloadedArray
Expand Down
17 changes: 9 additions & 8 deletions src/dolomite_matrix/save_compressed_sparse_matrix.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any
from functools import singledispatch
from dolomite_base import save_object, validate_saves
from delayedarray import SparseNdarray, chunk_shape
from delayedarray import SparseNdarray, chunk_grid
import os
import h5py
import numpy
Expand Down Expand Up @@ -36,9 +36,9 @@ def _choose_index_type(n: int) -> str:

@singledispatch
def _h5_write_sparse_matrix(x: Any, handle, details, compressed_sparse_matrix_buffer_size, compressed_sparse_matrix_chunk_size):
chunks = chunk_shape(x)
chunks_per_col = x.shape[0] / chunks[0]
chunks_per_row = x.shape[1] / chunks[1]
chunks = chunk_grid(x)
chunks_per_col = len(chunks.boundaries[0])
chunks_per_row = len(chunks.boundaries[1])

# If we have to extract fewer chunks per column, we iterate by column to
# create a CSC matrix. Otherwise we make a CSR matrix.
Expand All @@ -56,14 +56,15 @@ def _h5_write_sparse_matrix(x: Any, handle, details, compressed_sparse_matrix_bu
if details.placeholder is not None:
dhandle.attrs.create("missing-value-placeholder", data = details.placeholder, dtype = details.type)

masked = delayedarray.is_masked(x)
block_size = max(int(compressed_sparse_matrix_buffer_size) // (x.dtype.itemsize * x.shape[secondary]), 1)
limit = x.shape[primary]

itype = _choose_index_type(x.shape[secondary])
ihandle = handle.create_dataset("indices", shape = details.non_zero, dtype = itype, compression = "gzip", chunks = compressed_sparse_matrix_chunk_size)
indptrs = numpy.zeros(x.shape[primary] + 1, dtype = numpy.uint64)
counter = 0

masked = delayedarray.is_masked(x)
block_size = delayedarray.choose_block_size_for_1d_iteration(x, dimension=primary, memory=compressed_sparse_matrix_buffer_size)
limit = x.shape[primary]
counter = 0
subset = [None] * 2
subset[secondary] = range(x.shape[secondary])

Expand Down
27 changes: 5 additions & 22 deletions src/dolomite_matrix/save_dense_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,8 @@
###################################################


# We use a mock class with the properties of the HDF5 dataset. This allows us
# to use choose_block_shape_for_iteration to pick dimensions that align with
# the HDF5 chunks; these may or may not be suitable for the input array, but
# we'll take the chance that the input array is already in memory.
class _DenseArrayOutputMock:
def __init__(self, shape: Tuple, dtype: numpy.dtype, chunks: Tuple):
self.shape = shape
self.dtype = dtype
self.chunks = chunks


@delayedarray.chunk_shape.register
def _chunk_shape_DenseArrayOutputMock(x: _DenseArrayOutputMock):
return x.chunks


def _blockwise_write_to_hdf5(dhandle: h5py.Dataset, chunk_shape: Tuple, x: Any, placeholder: Any, memory: int):
mock = _DenseArrayOutputMock(x.shape, x.dtype, chunk_shape)
block_shape = delayedarray.choose_block_shape_for_iteration(mock, memory=memory)
def _blockwise_write_to_hdf5(dhandle: h5py.Dataset, chunk_shape: Tuple, x: Any, placeholder: Any, buffer_size: int):
masked = delayedarray.is_masked(x)

is_string = numpy.issubdtype(dhandle.dtype, numpy.bytes_)
if placeholder is not None:
if is_string:
Expand All @@ -59,7 +40,9 @@ def _blockwise_dense_writer(pos: Tuple, block):
coords = [slice(start, end) for start, end in reversed(pos)]
dhandle[(*coords,)] = block.T

delayedarray.apply_over_blocks(x, _blockwise_dense_writer, block_shape = block_shape)
# Cost factor doesn't really matter here as we're not choosing between grids.
grid = delayedarray.chunk_shape_to_grid(chunk_shape, x.shape, cost_factor=10)
delayedarray.apply_over_blocks(x, _blockwise_dense_writer, grid = grid, buffer_size = buffer_size)
return


Expand Down Expand Up @@ -130,7 +113,7 @@ def _save_dense_array(
# So, we save the blocks in transposed form for efficiency.
ghandle.create_dataset("transposed", data=1, dtype="i1")
dhandle = ghandle.create_dataset("data", shape=(*reversed(x.shape),), chunks=(*reversed(dense_array_chunk_dimensions),), dtype=opts.type, compression="gzip")
_blockwise_write_to_hdf5(dhandle, chunk_shape=dense_array_chunk_dimensions, x=x, placeholder=opts.placeholder, memory=dense_array_buffer_size)
_blockwise_write_to_hdf5(dhandle, chunk_shape=dense_array_chunk_dimensions, x=x, placeholder=opts.placeholder, buffer_size=dense_array_buffer_size)
if opts.placeholder is not None:
dhandle.attrs.create("missing-value-placeholder", data=opts.placeholder, dtype=opts.type)

Expand Down
Loading

0 comments on commit 24585c3

Please sign in to comment.