Skip to content

Commit b388911

Browse files
committed
default to 1 itemsize for data types that don't declare it
1 parent f8e39e6 commit b388911

1 file changed

Lines changed: 31 additions & 19 deletions

File tree

src/zarr/experimental/sync_codecs.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -402,22 +402,16 @@ async def _decode_async(
402402
specs = [bb_codec.resolve_metadata(s) for s in specs]
403403

404404
# Decode in reverse, using the forward-resolved specs.
405-
for bb_codec, bb_spec in zip(
406-
self.bytes_bytes_codecs[::-1], bb_specs[::-1], strict=False
407-
):
405+
for bb_codec, bb_spec in zip(self.bytes_bytes_codecs[::-1], bb_specs[::-1], strict=False):
408406
chunk_bytes_batch = list(
409407
await bb_codec.decode(zip(chunk_bytes_batch, bb_spec, strict=False))
410408
)
411409

412410
chunk_array_batch: list[NDBuffer | None] = list(
413-
await self.array_bytes_codec.decode(
414-
zip(chunk_bytes_batch, ab_specs, strict=False)
415-
)
411+
await self.array_bytes_codec.decode(zip(chunk_bytes_batch, ab_specs, strict=False))
416412
)
417413

418-
for aa_codec, aa_spec in zip(
419-
self.array_array_codecs[::-1], aa_specs[::-1], strict=False
420-
):
414+
for aa_codec, aa_spec in zip(self.array_array_codecs[::-1], aa_specs[::-1], strict=False):
421415
chunk_array_batch = list(
422416
await aa_codec.decode(zip(chunk_array_batch, aa_spec, strict=False))
423417
)
@@ -835,7 +829,10 @@ def read_sync(
835829

836830
# Phase 2: Decode — run the codec chain for each chunk.
837831
# Estimate per-chunk codec work and decide whether to parallelize.
838-
chunk_nbytes = product(first_spec.shape) * first_spec.dtype.item_size
832+
# Not all dtypes have item_size (e.g. custom dtypes), so default
833+
# to an item size of 1 byte when the dtype doesn't declare one.
834+
dtype_item_size = getattr(first_spec.dtype, "item_size", 1)
835+
chunk_nbytes = product(first_spec.shape) * dtype_item_size
839836
n_workers = _choose_workers(len(batch_info_list), chunk_nbytes, self)
840837
if n_workers > 0:
841838
pool = _get_pool(n_workers)
@@ -886,8 +883,13 @@ def _write_chunk_compute(
886883

887884
# Merge new data into the chunk
888885
chunk_array: NDBuffer | None = self._merge_chunk_array(
889-
existing_array, value, out_selection, chunk_spec,
890-
chunk_selection, is_complete_chunk, drop_axes,
886+
existing_array,
887+
value,
888+
out_selection,
889+
chunk_spec,
890+
chunk_selection,
891+
is_complete_chunk,
892+
drop_axes,
891893
)
892894

893895
# Filter empty chunks
@@ -934,17 +936,18 @@ def write_sync(
934936

935937
# Phase 1: IO — read existing chunk bytes for partial writes.
936938
existing_bytes_list: list[Buffer | None] = [
937-
byte_setter.get_sync(prototype=chunk_spec.prototype)
938-
if not is_complete_chunk
939-
else None
939+
byte_setter.get_sync(prototype=chunk_spec.prototype) if not is_complete_chunk else None
940940
for byte_setter, chunk_spec, _, _, is_complete_chunk in batch_info_list
941941
]
942942

943943
# Phase 2: Compute — decode existing, merge new data, encode.
944944
# Estimate per-chunk work to decide whether to parallelize.
945945
# Use encode cost model since writes are dominated by compression.
946+
# Not all dtypes have item_size (e.g. custom dtypes), so default
947+
# to an item size of 1 byte when the dtype doesn't declare one.
946948
_, first_spec, *_ = batch_info_list[0]
947-
chunk_nbytes = product(first_spec.shape) * first_spec.dtype.item_size
949+
dtype_item_size = getattr(first_spec.dtype, "item_size", 1)
950+
chunk_nbytes = product(first_spec.shape) * dtype_item_size
948951
n_workers = _choose_workers(len(batch_info_list), chunk_nbytes, self, is_encode=True)
949952
if n_workers > 0:
950953
pool = _get_pool(n_workers)
@@ -963,11 +966,20 @@ def write_sync(
963966
else:
964967
encoded_list = [
965968
self._write_chunk_compute(
966-
existing_bytes, chunk_spec, chunk_selection,
967-
out_selection, is_complete_chunk, value, drop_axes,
969+
existing_bytes,
970+
chunk_spec,
971+
chunk_selection,
972+
out_selection,
973+
is_complete_chunk,
974+
value,
975+
drop_axes,
968976
)
969977
for existing_bytes, (
970-
_, chunk_spec, chunk_selection, out_selection, is_complete_chunk,
978+
_,
979+
chunk_spec,
980+
chunk_selection,
981+
out_selection,
982+
is_complete_chunk,
971983
) in zip(existing_bytes_list, batch_info_list, strict=False)
972984
]
973985

0 commit comments

Comments
 (0)