Skip to content

Commit 3d5cdf8

Browse files
committed
Merge branch 'perf/prepared-write-v2' of github.com:d-v-b/zarr-python into perf/prepared-write-v2-bench
2 parents 863cf8f + 053f2ee commit 3d5cdf8

File tree

6 files changed

+204
-91
lines changed

6 files changed

+204
-91
lines changed

src/zarr/codecs/_v2.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,22 @@ class V2Codec(ArrayBytesCodec):
2323

2424
is_fixed_size = False
2525

26-
async def _decode_single(
26+
def _decode_sync(
2727
self,
2828
chunk_bytes: Buffer,
2929
chunk_spec: ArraySpec,
3030
) -> NDBuffer:
3131
cdata = chunk_bytes.as_array_like()
3232
# decompress
3333
if self.compressor:
34-
chunk = await asyncio.to_thread(self.compressor.decode, cdata)
34+
chunk = self.compressor.decode(cdata)
3535
else:
3636
chunk = cdata
3737

3838
# apply filters
3939
if self.filters:
4040
for f in reversed(self.filters):
41-
chunk = await asyncio.to_thread(f.decode, chunk)
41+
chunk = f.decode(chunk)
4242

4343
# view as numpy array with correct dtype
4444
chunk = ensure_ndarray_like(chunk)
@@ -48,20 +48,9 @@ async def _decode_single(
4848
try:
4949
chunk = chunk.view(chunk_spec.dtype.to_native_dtype())
5050
except TypeError:
51-
# this will happen if the dtype of the chunk
52-
# does not match the dtype of the array spec i.g. if
53-
# the dtype of the chunk_spec is a string dtype, but the chunk
54-
# is an object array. In this case, we need to convert the object
55-
# array to the correct dtype.
56-
5751
chunk = np.array(chunk).astype(chunk_spec.dtype.to_native_dtype())
5852

5953
elif chunk.dtype != object:
60-
# If we end up here, someone must have hacked around with the filters.
61-
# We cannot deal with object arrays unless there is an object
62-
# codec in the filter chain, i.e., a filter that converts from object
63-
# array to something else during encoding, and converts back to object
64-
# array during decoding.
6554
raise RuntimeError("cannot read object array without object codec")
6655

6756
# ensure correct chunk shape
@@ -70,7 +59,7 @@ async def _decode_single(
7059

7160
return get_ndbuffer_class().from_ndarray_like(chunk)
7261

73-
async def _encode_single(
62+
def _encode_sync(
7463
self,
7564
chunk_array: NDBuffer,
7665
chunk_spec: ArraySpec,
@@ -83,18 +72,32 @@ async def _encode_single(
8372
# apply filters
8473
if self.filters:
8574
for f in self.filters:
86-
chunk = await asyncio.to_thread(f.encode, chunk)
75+
chunk = f.encode(chunk)
8776
# check object encoding
8877
if ensure_ndarray_like(chunk).dtype == object:
8978
raise RuntimeError("cannot write object array without object codec")
9079

9180
# compress
9281
if self.compressor:
93-
cdata = await asyncio.to_thread(self.compressor.encode, chunk)
82+
cdata = self.compressor.encode(chunk)
9483
else:
9584
cdata = chunk
9685
cdata = ensure_bytes(cdata)
9786
return chunk_spec.prototype.buffer.from_bytes(cdata)
9887

88+
async def _decode_single(
89+
self,
90+
chunk_bytes: Buffer,
91+
chunk_spec: ArraySpec,
92+
) -> NDBuffer:
93+
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)
94+
95+
async def _encode_single(
96+
self,
97+
chunk_array: NDBuffer,
98+
chunk_spec: ArraySpec,
99+
) -> Buffer | None:
100+
return await asyncio.to_thread(self._encode_sync, chunk_array, chunk_spec)
101+
99102
def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
100103
raise NotImplementedError

src/zarr/codecs/numcodecs/_codecs.py

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
if TYPE_CHECKING:
4646
from zarr.abc.numcodec import Numcodec
4747
from zarr.core.array_spec import ArraySpec
48-
from zarr.core.buffer import Buffer, BufferPrototype, NDBuffer
48+
from zarr.core.buffer import Buffer, NDBuffer
4949

5050
CODEC_PREFIX = "numcodecs."
5151

@@ -132,53 +132,63 @@ class _NumcodecsBytesBytesCodec(_NumcodecsCodec, BytesBytesCodec):
132132
def __init__(self, **codec_config: JSON) -> None:
133133
super().__init__(**codec_config)
134134

135-
async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
136-
return await asyncio.to_thread(
137-
as_numpy_array_wrapper,
138-
self._codec.decode,
139-
chunk_data,
140-
chunk_spec.prototype,
141-
)
135+
def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
136+
return as_numpy_array_wrapper(self._codec.decode, chunk_data, chunk_spec.prototype)
142137

143-
def _encode(self, chunk_data: Buffer, prototype: BufferPrototype) -> Buffer:
138+
def _encode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
144139
encoded = self._codec.encode(chunk_data.as_array_like())
145140
if isinstance(encoded, np.ndarray): # Required for checksum codecs
146-
return prototype.buffer.from_bytes(encoded.tobytes())
147-
return prototype.buffer.from_bytes(encoded)
141+
return chunk_spec.prototype.buffer.from_bytes(encoded.tobytes())
142+
return chunk_spec.prototype.buffer.from_bytes(encoded)
143+
144+
async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
145+
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
148146

149147
async def _encode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
150-
return await asyncio.to_thread(self._encode, chunk_data, chunk_spec.prototype)
148+
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)
151149

152150

153151
class _NumcodecsArrayArrayCodec(_NumcodecsCodec, ArrayArrayCodec):
154152
def __init__(self, **codec_config: JSON) -> None:
155153
super().__init__(**codec_config)
156154

157-
async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
155+
def _decode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
158156
chunk_ndarray = chunk_data.as_ndarray_like()
159-
out = await asyncio.to_thread(self._codec.decode, chunk_ndarray)
157+
out = self._codec.decode(chunk_ndarray)
160158
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))
161159

162-
async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
160+
def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
163161
chunk_ndarray = chunk_data.as_ndarray_like()
164-
out = await asyncio.to_thread(self._codec.encode, chunk_ndarray)
162+
out = self._codec.encode(chunk_ndarray)
165163
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out)
166164

165+
async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
166+
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
167+
168+
async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
169+
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)
170+
167171

168172
class _NumcodecsArrayBytesCodec(_NumcodecsCodec, ArrayBytesCodec):
169173
def __init__(self, **codec_config: JSON) -> None:
170174
super().__init__(**codec_config)
171175

172-
async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
176+
def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
173177
chunk_bytes = chunk_data.to_bytes()
174-
out = await asyncio.to_thread(self._codec.decode, chunk_bytes)
178+
out = self._codec.decode(chunk_bytes)
175179
return chunk_spec.prototype.nd_buffer.from_ndarray_like(out.reshape(chunk_spec.shape))
176180

177-
async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
181+
def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
178182
chunk_ndarray = chunk_data.as_ndarray_like()
179-
out = await asyncio.to_thread(self._codec.encode, chunk_ndarray)
183+
out = self._codec.encode(chunk_ndarray)
180184
return chunk_spec.prototype.buffer.from_bytes(out)
181185

186+
async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
187+
return await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
188+
189+
async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
190+
return await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)
191+
182192

183193
# bytes-to-bytes codecs
184194
class Blosc(_NumcodecsBytesBytesCodec, codec_name="blosc"):

src/zarr/codecs/sharding.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,9 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
374374

375375
@property
376376
def codec_pipeline(self) -> CodecPipeline:
377-
return get_pipeline_class().from_codecs(self.codecs)
377+
from zarr.core.codec_pipeline import BatchedCodecPipeline
378+
379+
return BatchedCodecPipeline.from_codecs(self.codecs)
378380

379381
def to_dict(self) -> dict[str, JSON]:
380382
return {

src/zarr/core/array.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,15 @@ def create_codec_pipeline(metadata: ArrayMetadata, *, store: Store | None = None
248248
return pipeline.evolve_from_array_spec(chunk_spec)
249249
elif isinstance(metadata, ArrayV2Metadata):
250250
v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
251-
return get_pipeline_class().from_codecs([v2_codec])
251+
pipeline = get_pipeline_class().from_codecs([v2_codec])
252+
chunk_spec = ArraySpec(
253+
shape=metadata.chunks,
254+
dtype=metadata.dtype,
255+
fill_value=metadata.fill_value,
256+
config=ArrayConfig.from_dict({"order": metadata.order}),
257+
prototype=default_buffer_prototype(),
258+
)
259+
return pipeline.evolve_from_array_spec(chunk_spec)
252260
raise TypeError # pragma: no cover
253261

254262

0 commit comments

Comments (0)