Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch Buffers to memoryviews #656

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 131 additions & 126 deletions numcodecs/blosc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import multiprocessing
import os


from cpython.buffer cimport PyBUF_ANY_CONTIGUOUS, PyBUF_WRITEABLE
from cpython.buffer cimport PyBuffer_IsContiguous
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING
from cpython.memoryview cimport PyMemoryView_GET_BUFFER


from .compat_ext cimport Buffer
from .compat_ext import Buffer
from .compat import ensure_contiguous_ndarray
from .abc import Codec

Expand Down Expand Up @@ -146,34 +145,36 @@ def cbuffer_sizes(source):

"""
cdef:
Buffer buffer
memoryview source_mv
const Py_buffer* source_pb
size_t nbytes, cbytes, blocksize

# obtain buffer
buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# determine buffer size
blosc_cbuffer_sizes(buffer.ptr, &nbytes, &cbytes, &blocksize)

# release buffers
buffer.release()
blosc_cbuffer_sizes(source_pb.buf, &nbytes, &cbytes, &blocksize)

return nbytes, cbytes, blocksize


def cbuffer_complib(source):
"""Return the name of the compression library used to compress `source`."""
cdef:
Buffer buffer
memoryview source_mv
const Py_buffer* source_pb

# obtain buffer
buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# determine buffer size
complib = blosc_cbuffer_complib(buffer.ptr)

# release buffers
buffer.release()
complib = blosc_cbuffer_complib(source_pb.buf)

complib = complib.decode('ascii')

Expand All @@ -193,18 +194,19 @@ def cbuffer_metainfo(source):

"""
cdef:
Buffer buffer
memoryview source_mv
const Py_buffer* source_pb
size_t typesize
int flags

# obtain buffer
buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# determine buffer size
blosc_cbuffer_metainfo(buffer.ptr, &typesize, &flags)

# release buffers
buffer.release()
blosc_cbuffer_metainfo(source_pb.buf, &typesize, &flags)

# decompose flags
if flags & BLOSC_DOSHUFFLE:
Expand Down Expand Up @@ -252,23 +254,29 @@ def compress(source, char* cname, int clevel, int shuffle=SHUFFLE,
"""

cdef:
char *source_ptr
char *dest_ptr
Buffer source_buffer
memoryview source_mv
const Py_buffer* source_pb
const char* source_ptr
size_t nbytes, itemsize
int cbytes
bytes dest
char* dest_ptr

# check valid cname early
cname_str = cname.decode('ascii')
if cname_str not in list_compressors():
err_bad_cname(cname_str)

# setup source buffer
source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
source_ptr = source_buffer.ptr
nbytes = source_buffer.nbytes
itemsize = source_buffer.itemsize
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# extract metadata
source_ptr = <const char*>source_pb.buf
nbytes = source_pb.len
itemsize = source_pb.itemsize

# determine shuffle
if shuffle == AUTOSHUFFLE:
Expand All @@ -280,46 +288,40 @@ def compress(source, char* cname, int clevel, int shuffle=SHUFFLE,
raise ValueError('invalid shuffle argument; expected -1, 0, 1 or 2, found %r' %
shuffle)

try:

# setup destination
dest = PyBytes_FromStringAndSize(NULL, nbytes + BLOSC_MAX_OVERHEAD)
dest_ptr = PyBytes_AS_STRING(dest)

# perform compression
if _get_use_threads():
# allow blosc to use threads internally
# setup destination
dest = PyBytes_FromStringAndSize(NULL, nbytes + BLOSC_MAX_OVERHEAD)
dest_ptr = PyBytes_AS_STRING(dest)

# N.B., we are using blosc's global context, and so we need to use a lock
# to ensure no-one else can modify the global context while we're setting it
# up and using it.
with get_mutex():
# perform compression
if _get_use_threads():
# allow blosc to use threads internally

# set compressor
compressor_set = blosc_set_compressor(cname)
if compressor_set < 0:
# shouldn't happen if we checked against list of compressors
# already, but just in case
err_bad_cname(cname_str)
# N.B., we are using blosc's global context, and so we need to use a lock
# to ensure no-one else can modify the global context while we're setting it
# up and using it.
with get_mutex():

# set blocksize
blosc_set_blocksize(blocksize)
# set compressor
compressor_set = blosc_set_compressor(cname)
if compressor_set < 0:
# shouldn't happen if we checked against list of compressors
# already, but just in case
err_bad_cname(cname_str)

# perform compression
with nogil:
cbytes = blosc_compress(clevel, shuffle, itemsize, nbytes, source_ptr,
dest_ptr, nbytes + BLOSC_MAX_OVERHEAD)
# set blocksize
blosc_set_blocksize(blocksize)

else:
# perform compression
with nogil:
cbytes = blosc_compress_ctx(clevel, shuffle, itemsize, nbytes, source_ptr,
dest_ptr, nbytes + BLOSC_MAX_OVERHEAD,
cname, blocksize, 1)
cbytes = blosc_compress(clevel, shuffle, itemsize, nbytes, source_ptr,
dest_ptr, nbytes + BLOSC_MAX_OVERHEAD)

finally:
else:
with nogil:
cbytes = blosc_compress_ctx(clevel, shuffle, itemsize, nbytes, source_ptr,
dest_ptr, nbytes + BLOSC_MAX_OVERHEAD,
cname, blocksize, 1)

# release buffers
source_buffer.release()

# check compression was successful
if cbytes <= 0:
Expand Down Expand Up @@ -350,53 +352,52 @@ def decompress(source, dest=None):
"""
cdef:
int ret
char *source_ptr
char *dest_ptr
Buffer source_buffer
Buffer dest_buffer = None
memoryview source_mv
const Py_buffer* source_pb
const char* source_ptr
memoryview dest_mv
Py_buffer* dest_pb
char* dest_ptr
size_t nbytes, cbytes, blocksize

# setup source buffer
source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
source_ptr = source_buffer.ptr
# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b'A'):
raise BufferError("`source` must contain contiguous memory")

# get source pointer
source_ptr = <const char*>source_pb.buf

# determine buffer size
blosc_cbuffer_sizes(source_ptr, &nbytes, &cbytes, &blocksize)

# setup destination buffer
if dest is None:
# allocate memory
dest = PyBytes_FromStringAndSize(NULL, nbytes)
dest_ptr = PyBytes_AS_STRING(dest)
dest_nbytes = nbytes
dest_1d = dest = PyBytes_FromStringAndSize(NULL, nbytes)
else:
arr = ensure_contiguous_ndarray(dest)
dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE)
dest_ptr = dest_buffer.ptr
dest_nbytes = dest_buffer.nbytes

try:

# guard condition
if dest_nbytes < nbytes:
raise ValueError('destination buffer too small; expected at least %s, '
'got %s' % (nbytes, dest_nbytes))

# perform decompression
if _get_use_threads():
# allow blosc to use threads internally
with nogil:
ret = blosc_decompress(source_ptr, dest_ptr, nbytes)
else:
with nogil:
ret = blosc_decompress_ctx(source_ptr, dest_ptr, nbytes, 1)

finally:

# release buffers
source_buffer.release()
if dest_buffer is not None:
dest_buffer.release()
dest_1d = ensure_contiguous_ndarray(dest)

# obtain dest memoryview
dest_mv = memoryview(dest_1d)
dest_pb = PyMemoryView_GET_BUFFER(dest_mv)
dest_ptr = <char*>dest_pb.buf
dest_nbytes = dest_pb.len

# guard condition
if dest_nbytes < nbytes:
raise ValueError('destination buffer too small; expected at least %s, '
'got %s' % (nbytes, dest_nbytes))

# perform decompression
if _get_use_threads():
# allow blosc to use threads internally
with nogil:
ret = blosc_decompress(source_ptr, dest_ptr, nbytes)
else:
with nogil:
ret = blosc_decompress_ctx(source_ptr, dest_ptr, nbytes, 1)

# handle errors
if ret <= 0:
Expand Down Expand Up @@ -433,14 +434,22 @@ def decompress_partial(source, start, nitems, dest=None):
int encoding_size
int nitems_bytes
int start_bytes
char *source_ptr
char *dest_ptr
Buffer source_buffer
Buffer dest_buffer = None

# setup source buffer
source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
source_ptr = source_buffer.ptr
const char* source_ptr
memoryview source_mv
const Py_buffer* source_pb
memoryview dest_mv
Py_buffer* dest_pb
char* dest_ptr
size_t dest_nbytes

# obtain source memoryview
source_mv = memoryview(source)
source_pb = PyMemoryView_GET_BUFFER(source_mv)
if not PyBuffer_IsContiguous(source_pb, b"A"):
raise BufferError("`source` must contain contiguous memory")

# setup source pointer
source_ptr = <const char*>source_pb.buf

# get encoding size from source buffer header
encoding_size = source[3]
Expand All @@ -451,26 +460,22 @@ def decompress_partial(source, start, nitems, dest=None):

# setup destination buffer
if dest is None:
dest = PyBytes_FromStringAndSize(NULL, nitems_bytes)
dest_ptr = PyBytes_AS_STRING(dest)
dest_nbytes = nitems_bytes
# allocate memory
dest_1d = dest = PyBytes_FromStringAndSize(NULL, nitems_bytes)
else:
arr = ensure_contiguous_ndarray(dest)
dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE)
dest_ptr = dest_buffer.ptr
dest_nbytes = dest_buffer.nbytes
dest_1d = ensure_contiguous_ndarray(dest)

# obtain dest memoryview
dest_mv = memoryview(dest_1d)
dest_pb = PyMemoryView_GET_BUFFER(dest_mv)
dest_ptr = <char*>dest_pb.buf
dest_nbytes = dest_pb.len

# try decompression
try:
if dest_nbytes < nitems_bytes:
raise ValueError('destination buffer too small; expected at least %s, '
'got %s' % (nitems_bytes, dest_nbytes))
ret = blosc_getitem(source_ptr, start, nitems, dest_ptr)

finally:
source_buffer.release()
if dest_buffer is not None:
dest_buffer.release()
if dest_nbytes < nitems_bytes:
raise ValueError('destination buffer too small; expected at least %s, '
'got %s' % (nitems_bytes, dest_nbytes))
ret = blosc_getitem(source_ptr, start, nitems, dest_ptr)

# ret refers to the number of bytes returned from blosc_getitem.
if ret <= 0:
Expand Down
12 changes: 0 additions & 12 deletions numcodecs/compat_ext.pxd

This file was deleted.

Loading
Loading