Skip to content

Commit f979eaa

Browse files
committed
add partial encode / decode
1 parent e24fe7e commit f979eaa

1 file changed

Lines changed: 38 additions & 2 deletions

File tree

src/zarr/experimental/sync_codecs.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,24 @@ async def _read_batch(
341341
drop_axes: tuple[int, ...] = (),
342342
) -> None:
343343
batch_info = list(batch_info)
344+
345+
if self.supports_partial_decode:
346+
assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin)
347+
chunk_array_batch = await self.array_bytes_codec.decode_partial(
348+
[
349+
(byte_getter, chunk_selection, chunk_spec)
350+
for byte_getter, chunk_spec, chunk_selection, *_ in batch_info
351+
]
352+
)
353+
for chunk_array, (_, chunk_spec, _, out_selection, _) in zip(
354+
chunk_array_batch, batch_info, strict=False
355+
):
356+
if chunk_array is not None:
357+
out[out_selection] = chunk_array
358+
else:
359+
out[out_selection] = _fill_value_or_default(chunk_spec)
360+
return
361+
344362
# Phase 1: IO -- fetch bytes from store (always async)
345363
chunk_bytes_batch = await concurrent_map(
346364
[(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
@@ -354,8 +372,8 @@ async def _read_batch(
354372
for chunk_bytes, (_, chunk_spec, *_) in zip(chunk_bytes_batch, batch_info, strict=False)
355373
]
356374

357-
chunk_array_batch: Iterable[NDBuffer | None] = await self.decode(decode_items)
358-
self._scatter(chunk_array_batch, batch_info, out, drop_axes)
375+
chunk_array_batch_decoded: Iterable[NDBuffer | None] = await self.decode(decode_items)
376+
self._scatter(chunk_array_batch_decoded, batch_info, out, drop_axes)
359377

360378
@staticmethod
361379
def _scatter(
@@ -437,6 +455,24 @@ async def _write_batch(
437455
) -> None:
438456
batch_info = list(batch_info)
439457

458+
if self.supports_partial_encode:
459+
assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialEncodeMixin)
460+
if len(value.shape) == 0:
461+
await self.array_bytes_codec.encode_partial(
462+
[
463+
(byte_setter, value, chunk_selection, chunk_spec)
464+
for byte_setter, chunk_spec, chunk_selection, _, _ in batch_info
465+
],
466+
)
467+
else:
468+
await self.array_bytes_codec.encode_partial(
469+
[
470+
(byte_setter, value[out_selection], chunk_selection, chunk_spec)
471+
for byte_setter, chunk_spec, chunk_selection, out_selection, _ in batch_info
472+
],
473+
)
474+
return
475+
440476
# Phase 1: IO -- read existing bytes for non-complete chunks
441477
async def _read_key(
442478
byte_setter: ByteSetter | None, prototype: BufferPrototype

0 commit comments

Comments
 (0)