Skip to content

Commit 3c27e49

Browse files
committed
feat: complete second codec pipeline
1 parent 47a407f commit 3c27e49

7 files changed

Lines changed: 682 additions & 161 deletions

File tree

src/zarr/abc/codec.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"GetResult",
3737
"PreparedWrite",
3838
"SupportsChunkCodec",
39-
"SupportsChunkPacking",
39+
"SupportsChunkMapping",
4040
"SupportsSyncCodec",
4141
]
4242

@@ -100,21 +100,26 @@ def encode_chunk(self, chunk_array: NDBuffer) -> Buffer | None: ...
100100

101101

102102
@runtime_checkable
103-
class SupportsChunkPacking(Protocol):
104-
"""Protocol for codecs that can pack/unpack inner chunks into a storage blob
105-
and manage the prepare/finalize IO lifecycle.
106-
107-
`BytesCodec` and `ShardingCodec` implement this protocol. The pipeline
108-
uses it to separate IO (prepare/finalize) from compute (encode/decode),
109-
enabling the compute phase to run in a thread pool.
110-
111-
The lifecycle is:
112-
113-
1. **Prepare**: fetch existing bytes from the store (if partial write),
114-
unpack into per-inner-chunk buffers → `PreparedWrite`
115-
2. **Compute**: iterate `PreparedWrite.indexer`, decode each inner chunk,
116-
merge new data, re-encode, update `PreparedWrite.chunk_dict`
117-
3. **Finalize**: pack `chunk_dict` back into a blob and write to store
103+
class SupportsChunkMapping(Protocol):
104+
"""Protocol for codecs that expose their stored data as a mapping
105+
from chunk coordinates to encoded buffers.
106+
107+
A single store key holds a blob. This protocol defines how to
108+
interpret that blob as a ``dict[tuple[int, ...], Buffer | None]`` —
109+
a mapping from inner-chunk coordinates to their encoded bytes.
110+
111+
For a non-sharded codec (``BytesCodec``), the mapping is trivial:
112+
one entry at ``(0,)`` containing the entire blob. For a sharded
113+
codec, the mapping has one entry per inner chunk, derived from the
114+
shard index embedded in the blob. The pipeline doesn't need to know
115+
which case it's dealing with — it operates on the mapping uniformly.
116+
117+
This abstraction enables the three-phase IO/compute/IO pattern:
118+
119+
1. **IO**: fetch the blob from the store.
120+
2. **Compute**: unpack the blob into the chunk mapping, decode/merge/
121+
re-encode entries, pack back into a blob. All pure compute.
122+
3. **IO**: write the blob to the store.
118123
"""
119124

120125
@property

src/zarr/codecs/bytes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ async def _encode_single(
127127
def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
128128
return input_byte_length
129129

130-
# -- SupportsChunkPacking --
130+
# -- SupportsChunkMapping --
131131

132132
@property
133133
def inner_codec_chain(self) -> SupportsChunkCodec | None:

src/zarr/codecs/sharding.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
numpy_buffer_prototype,
3636
)
3737
from zarr.core.chunk_grids import ChunkGrid, RegularChunkGrid
38+
from zarr.core.codec_pipeline import ChunkTransform
3839
from zarr.core.common import (
3940
ShapeLike,
4041
parse_enum,
@@ -423,10 +424,8 @@ def _get_inner_chunk_transform(self, shard_spec: ArraySpec) -> Any:
423424
evolved = tuple(c.evolve_from_array_spec(array_spec=chunk_spec) for c in self.codecs)
424425
return ChunkTransform(codecs=evolved, array_spec=chunk_spec)
425426

426-
def _get_index_chunk_transform(self, chunks_per_shard: tuple[int, ...]) -> Any:
427+
def _get_index_chunk_transform(self, chunks_per_shard: tuple[int, ...]) -> ChunkTransform:
427428
"""Build a ChunkTransform for index codecs."""
428-
from zarr.core.codec_pipeline import ChunkTransform
429-
430429
index_spec = self._get_index_chunk_spec(chunks_per_shard)
431430
evolved = tuple(c.evolve_from_array_spec(array_spec=index_spec) for c in self.index_codecs)
432431
return ChunkTransform(codecs=evolved, array_spec=index_spec)
@@ -523,7 +522,7 @@ def _encode_sync(
523522
morton_order_iter(chunks_per_shard)
524523
)
525524

526-
for chunk_coords, chunk_selection, out_selection, _ in indexer:
525+
for chunk_coords, _chunk_selection, out_selection, _ in indexer:
527526
chunk_array = shard_array[out_selection]
528527
encoded = inner_transform.encode_chunk(chunk_array)
529528
shard_builder[chunk_coords] = encoded

src/zarr/core/array.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,17 @@ def create_codec_pipeline(metadata: ArrayMetadata, *, store: Store | None = None
205205
pass
206206

207207
if isinstance(metadata, ArrayV3Metadata):
208-
return get_pipeline_class().from_codecs(metadata.codecs)
208+
pipeline = get_pipeline_class().from_codecs(metadata.codecs)
209+
# PhasedCodecPipeline needs evolve_from_array_spec to build its
210+
# ChunkTransform and ShardLayout. BatchedCodecPipeline does not.
211+
if hasattr(pipeline, "chunk_transform") and pipeline.chunk_transform is None:
212+
chunk_spec = metadata.get_chunk_spec(
213+
(0,) * len(metadata.shape),
214+
ArrayConfig.from_dict({}),
215+
default_buffer_prototype(),
216+
)
217+
pipeline = pipeline.evolve_from_array_spec(chunk_spec)
218+
return pipeline
209219
elif isinstance(metadata, ArrayV2Metadata):
210220
v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
211221
return get_pipeline_class().from_codecs([v2_codec])

0 commit comments

Comments (0)