Skip to content

Commit 83d3a4b

Browse files
committed
use chunktransform
1 parent a716480 commit 83d3a4b

File tree

3 files changed

+102
-63
lines changed

3 files changed

+102
-63
lines changed

src/zarr/abc/codec.py

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,11 @@ def _encode_sync(
8383
class SupportsChunkCodec(Protocol):
8484
"""Protocol for objects that can decode/encode whole chunks synchronously.
8585
86-
[`CodecChain`][zarr.core.codec_pipeline.CodecChain] satisfies this protocol.
86+
[`ChunkTransform`][zarr.core.codec_pipeline.ChunkTransform] satisfies this protocol.
8787
"""
8888

89+
array_spec: ArraySpec
90+
8991
def decode_chunk(self, chunk_bytes: Buffer) -> NDBuffer: ...
9092

9193
def encode_chunk(self, chunk_array: NDBuffer) -> Buffer | None: ...
@@ -316,7 +318,6 @@ def serialize(
316318
def prepare_read_sync(
317319
self,
318320
byte_getter: Any,
319-
chunk_spec: ArraySpec,
320321
chunk_selection: SelectorTuple,
321322
codec_chain: SupportsChunkCodec,
322323
) -> NDBuffer | None:
@@ -328,21 +329,19 @@ def prepare_read_sync(
328329
byte_getter : Any
329330
An object supporting ``get_sync`` (e.g.
330331
[`StorePath`][zarr.storage._common.StorePath]).
331-
chunk_spec : ArraySpec
332-
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
333332
chunk_selection : SelectorTuple
334333
Selection within the decoded chunk array.
335334
codec_chain : SupportsChunkCodec
336335
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to
337-
decode the chunk.
336+
decode the chunk. Must carry an ``array_spec`` attribute.
338337
339338
Returns
340339
-------
341340
NDBuffer or None
342341
The decoded chunk data at *chunk_selection*, or ``None`` if the
343342
chunk does not exist in the store.
344343
"""
345-
raw = byte_getter.get_sync(prototype=chunk_spec.prototype)
344+
raw = byte_getter.get_sync(prototype=codec_chain.array_spec.prototype)
346345
if raw is None:
347346
return None
348347
chunk_array = codec_chain.decode_chunk(raw)
@@ -351,7 +350,7 @@ def prepare_read_sync(
351350
def prepare_write_sync(
352351
self,
353352
byte_setter: Any,
354-
chunk_spec: ArraySpec,
353+
codec_chain: SupportsChunkCodec,
355354
chunk_selection: SelectorTuple,
356355
out_selection: SelectorTuple,
357356
replace: bool,
@@ -367,8 +366,9 @@ def prepare_write_sync(
367366
byte_setter : Any
368367
An object supporting ``get_sync`` and ``set_sync`` (e.g.
369368
[`StorePath`][zarr.storage._common.StorePath]).
370-
chunk_spec : ArraySpec
371-
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
369+
codec_chain : SupportsChunkCodec
370+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
371+
carrying the ``array_spec`` for the chunk.
372372
chunk_selection : SelectorTuple
373373
Selection within the chunk being written.
374374
out_selection : SelectorTuple
@@ -384,6 +384,7 @@ def prepare_write_sync(
384384
A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the
385385
deserialized chunk data and selection metadata.
386386
"""
387+
chunk_spec = codec_chain.array_spec
387388
existing: Buffer | None = None
388389
if not replace:
389390
existing = byte_setter.get_sync(prototype=chunk_spec.prototype)
@@ -403,7 +404,7 @@ def prepare_write_sync(
403404
def finalize_write_sync(
404405
self,
405406
prepared: PreparedWrite,
406-
chunk_spec: ArraySpec,
407+
codec_chain: SupportsChunkCodec,
407408
byte_setter: Any,
408409
) -> None:
409410
"""Serialize the prepared chunk data and write it to the store.
@@ -416,13 +417,14 @@ def finalize_write_sync(
416417
prepared : PreparedWrite
417418
The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by
418419
[`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync].
419-
chunk_spec : ArraySpec
420-
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
420+
codec_chain : SupportsChunkCodec
421+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
422+
carrying the ``array_spec`` for the chunk.
421423
byte_setter : Any
422424
An object supporting ``set_sync`` and ``delete_sync`` (e.g.
423425
[`StorePath`][zarr.storage._common.StorePath]).
424426
"""
425-
blob = self.serialize(prepared.chunk_dict, chunk_spec)
427+
blob = self.serialize(prepared.chunk_dict, codec_chain.array_spec)
426428
if blob is None:
427429
byte_setter.delete_sync()
428430
else:
@@ -435,7 +437,6 @@ def finalize_write_sync(
435437
async def prepare_read(
436438
self,
437439
byte_getter: Any,
438-
chunk_spec: ArraySpec,
439440
chunk_selection: SelectorTuple,
440441
codec_chain: SupportsChunkCodec,
441442
) -> NDBuffer | None:
@@ -447,21 +448,19 @@ async def prepare_read(
447448
byte_getter : Any
448449
An object supporting ``get`` (e.g.
449450
[`StorePath`][zarr.storage._common.StorePath]).
450-
chunk_spec : ArraySpec
451-
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
452451
chunk_selection : SelectorTuple
453452
Selection within the decoded chunk array.
454453
codec_chain : SupportsChunkCodec
455454
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to
456-
decode the chunk.
455+
decode the chunk. Must carry an ``array_spec`` attribute.
457456
458457
Returns
459458
-------
460459
NDBuffer or None
461460
The decoded chunk data at *chunk_selection*, or ``None`` if the
462461
chunk does not exist in the store.
463462
"""
464-
raw = await byte_getter.get(prototype=chunk_spec.prototype)
463+
raw = await byte_getter.get(prototype=codec_chain.array_spec.prototype)
465464
if raw is None:
466465
return None
467466
chunk_array = codec_chain.decode_chunk(raw)
@@ -470,7 +469,7 @@ async def prepare_read(
470469
async def prepare_write(
471470
self,
472471
byte_setter: Any,
473-
chunk_spec: ArraySpec,
472+
codec_chain: SupportsChunkCodec,
474473
chunk_selection: SelectorTuple,
475474
out_selection: SelectorTuple,
476475
replace: bool,
@@ -483,8 +482,9 @@ async def prepare_write(
483482
byte_setter : Any
484483
An object supporting ``get`` and ``set`` (e.g.
485484
[`StorePath`][zarr.storage._common.StorePath]).
486-
chunk_spec : ArraySpec
487-
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
485+
codec_chain : SupportsChunkCodec
486+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
487+
carrying the ``array_spec`` for the chunk.
488488
chunk_selection : SelectorTuple
489489
Selection within the chunk being written.
490490
out_selection : SelectorTuple
@@ -500,6 +500,7 @@ async def prepare_write(
500500
A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the
501501
deserialized chunk data and selection metadata.
502502
"""
503+
chunk_spec = codec_chain.array_spec
503504
existing: Buffer | None = None
504505
if not replace:
505506
existing = await byte_setter.get(prototype=chunk_spec.prototype)
@@ -519,7 +520,7 @@ async def prepare_write(
519520
async def finalize_write(
520521
self,
521522
prepared: PreparedWrite,
522-
chunk_spec: ArraySpec,
523+
codec_chain: SupportsChunkCodec,
523524
byte_setter: Any,
524525
) -> None:
525526
"""Async variant of
@@ -530,13 +531,14 @@ async def finalize_write(
530531
prepared : PreparedWrite
531532
The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by
532533
[`prepare_write`][zarr.abc.codec.ArrayBytesCodec.prepare_write].
533-
chunk_spec : ArraySpec
534-
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
534+
codec_chain : SupportsChunkCodec
535+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
536+
carrying the ``array_spec`` for the chunk.
535537
byte_setter : Any
536538
An object supporting ``set`` and ``delete`` (e.g.
537539
[`StorePath`][zarr.storage._common.StorePath]).
538540
"""
539-
blob = self.serialize(prepared.chunk_dict, chunk_spec)
541+
blob = self.serialize(prepared.chunk_dict, codec_chain.array_spec)
540542
if blob is None:
541543
await byte_setter.delete()
542544
else:

src/zarr/core/codec_pipeline.py

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -69,49 +69,55 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any:
6969
return fill_value
7070

7171

72-
@dataclass(frozen=True, slots=True)
73-
class CodecChain:
74-
"""Codec chain with pre-resolved metadata specs.
72+
@dataclass(slots=True, kw_only=True)
73+
class ChunkTransform:
74+
"""A stored chunk, modeled as a layered array.
7575
76-
Constructed from an iterable of codecs and a chunk ArraySpec.
77-
Resolves each codec against the spec so that encode/decode can
78-
run without re-resolving.
76+
Each layer corresponds to one ArrayArrayCodec and the ArraySpec
77+
at its input boundary. ``layers[0]`` is the outermost (user-visible)
78+
transform; after the last layer comes the ArrayBytesCodec.
79+
80+
The chunk's ``shape`` and ``dtype`` reflect the representation
81+
**after** all ArrayArrayCodec layers have been applied — i.e. the
82+
spec that feeds the ArrayBytesCodec.
7983
"""
8084

8185
codecs: tuple[Codec, ...]
82-
chunk_spec: ArraySpec
86+
array_spec: ArraySpec
8387

84-
_aa_codecs: tuple[ArrayArrayCodec, ...] = field(init=False, repr=False, compare=False)
85-
_aa_specs: tuple[ArraySpec, ...] = field(init=False, repr=False, compare=False)
88+
# Each element is (ArrayArrayCodec, input_spec_for_that_codec).
89+
layers: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = field(
90+
init=False, repr=False, compare=False
91+
)
8692
_ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False)
8793
_ab_spec: ArraySpec = field(init=False, repr=False, compare=False)
8894
_bb_codecs: tuple[BytesBytesCodec, ...] = field(init=False, repr=False, compare=False)
89-
_bb_spec: ArraySpec = field(init=False, repr=False, compare=False)
9095
_all_sync: bool = field(init=False, repr=False, compare=False)
9196

9297
def __post_init__(self) -> None:
9398
aa, ab, bb = codecs_from_list(list(self.codecs))
9499

95-
aa_specs: list[ArraySpec] = []
96-
spec = self.chunk_spec
100+
layers: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = ()
101+
spec = self.array_spec
97102
for aa_codec in aa:
98-
aa_specs.append(spec)
103+
layers = (*layers, (aa_codec, spec))
99104
spec = aa_codec.resolve_metadata(spec)
100105

101-
object.__setattr__(self, "_aa_codecs", aa)
102-
object.__setattr__(self, "_aa_specs", tuple(aa_specs))
103-
object.__setattr__(self, "_ab_codec", ab)
104-
object.__setattr__(self, "_ab_spec", spec)
106+
self.layers = layers
107+
self._ab_codec = ab
108+
self._ab_spec = spec
109+
self._bb_codecs = bb
110+
self._all_sync = all(isinstance(c, SupportsSyncCodec) for c in self.codecs)
105111

106-
spec = ab.resolve_metadata(spec)
107-
object.__setattr__(self, "_bb_codecs", bb)
108-
object.__setattr__(self, "_bb_spec", spec)
112+
@property
113+
def shape(self) -> tuple[int, ...]:
114+
"""Shape after all ArrayArrayCodec layers (input to the ArrayBytesCodec)."""
115+
return self._ab_spec.shape
109116

110-
object.__setattr__(
111-
self,
112-
"_all_sync",
113-
all(isinstance(c, SupportsSyncCodec) for c in self.codecs),
114-
)
117+
@property
118+
def dtype(self) -> ZDType[TBaseDType, TBaseScalar]:
119+
"""Dtype after all ArrayArrayCodec layers (input to the ArrayBytesCodec)."""
120+
return self._ab_spec.dtype
115121

116122
@property
117123
def all_sync(self) -> bool:
@@ -127,11 +133,11 @@ def decode_chunk(
127133
"""
128134
bb_out: Any = chunk_bytes
129135
for bb_codec in reversed(self._bb_codecs):
130-
bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self._bb_spec)
136+
bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self._ab_spec)
131137

132138
ab_out: Any = cast("SupportsSyncCodec", self._ab_codec)._decode_sync(bb_out, self._ab_spec)
133139

134-
for aa_codec, spec in zip(reversed(self._aa_codecs), reversed(self._aa_specs), strict=True):
140+
for aa_codec, spec in reversed(self.layers):
135141
ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec)
136142

137143
return ab_out # type: ignore[no-any-return]
@@ -146,7 +152,7 @@ def encode_chunk(
146152
"""
147153
aa_out: Any = chunk_array
148154

149-
for aa_codec, spec in zip(self._aa_codecs, self._aa_specs, strict=True):
155+
for aa_codec, spec in self.layers:
150156
if aa_out is None:
151157
return None
152158
aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec)
@@ -158,7 +164,7 @@ def encode_chunk(
158164
for bb_codec in self._bb_codecs:
159165
if bb_out is None:
160166
return None
161-
bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self._bb_spec)
167+
bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self._ab_spec)
162168

163169
return bb_out # type: ignore[no-any-return]
164170

@@ -369,7 +375,7 @@ async def read_batch(
369375
out[out_selection] = fill_value_or_default(chunk_spec)
370376
else:
371377
chunk_bytes_batch = await concurrent_map(
372-
[(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
378+
[(byte_getter, chunk_spec.prototype) for byte_getter, chunk_spec, *_ in batch_info],
373379
lambda byte_getter, prototype: byte_getter.get(prototype),
374380
config.get("async.concurrency"),
375381
)

0 commit comments

Comments
 (0)