Skip to content

Commit 4305dc0

Browse files
committed
add prepared write semantics
1 parent c2131e0 commit 4305dc0

File tree

1 file changed

+228
-39
lines changed

1 file changed

+228
-39
lines changed

src/zarr/abc/codec.py

Lines changed: 228 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"CodecOutput",
3535
"CodecPipeline",
3636
"PreparedWrite",
37+
"SupportsChunkCodec",
3738
"SupportsSyncCodec",
3839
]
3940

@@ -79,6 +80,17 @@ def _encode_sync(
7980
) -> NDBuffer | Buffer | None: ...
8081

8182

83+
class SupportsChunkCodec(Protocol):
84+
"""Protocol for objects that can decode/encode whole chunks synchronously.
85+
86+
[`CodecChain`][zarr.core.codec_pipeline.CodecChain] satisfies this protocol.
87+
"""
88+
89+
def decode_chunk(self, chunk_bytes: Buffer) -> NDBuffer: ...
90+
91+
def encode_chunk(self, chunk_array: NDBuffer) -> Buffer | None: ...
92+
93+
8294
class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
8395
"""Generic base class for codecs.
8496
@@ -208,10 +220,37 @@ class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]):
208220

209221
@dataclass
210222
class PreparedWrite:
211-
"""Result of ``prepare_write``: existing encoded chunk bytes + selection info."""
223+
"""Result of the prepare phase of a write operation.
224+
225+
Carries deserialized chunk data and selection metadata between
226+
[`prepare_write`][zarr.abc.codec.ArrayBytesCodec.prepare_write] (or
227+
[`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync])
228+
and [`finalize_write`][zarr.abc.codec.ArrayBytesCodec.finalize_write] (or
229+
[`finalize_write_sync`][zarr.abc.codec.ArrayBytesCodec.finalize_write_sync]).
230+
231+
Attributes
232+
----------
233+
chunk_dict : dict[tuple[int, ...], Buffer | None]
234+
Per-inner-chunk buffers keyed by chunk coordinates.
235+
inner_codec_chain : SupportsChunkCodec
236+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] for
237+
decoding/encoding inner chunks.
238+
inner_chunk_spec : ArraySpec
239+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for inner chunks.
240+
indexer : list[ChunkProjection]
241+
Mapping from inner-chunk coordinates to value/output selections.
242+
value_selection : SelectorTuple | None
243+
Outer ``out_selection`` for sharding. Unused by the base implementation.
244+
write_full_shard : bool
245+
Whether the full shard blob will be written. Unused by the base implementation.
246+
is_complete_shard : bool
247+
Fast-path flag for complete shard writes. Unused by the base implementation.
248+
shard_data : NDBuffer | None
249+
Full shard value for complete writes. Unused by the base implementation.
250+
"""
212251

213252
chunk_dict: dict[tuple[int, ...], Buffer | None]
214-
inner_codec_chain: Any # CodecChain — typed as Any to avoid circular import
253+
inner_codec_chain: SupportsChunkCodec
215254
inner_chunk_spec: ArraySpec
216255
indexer: list[ChunkProjection]
217256
value_selection: SelectorTuple | None = None
@@ -224,11 +263,18 @@ class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]):
224263
"""Base class for array-to-bytes codecs."""
225264

226265
@property
227-
def inner_codec_chain(self) -> Any:
266+
def inner_codec_chain(self) -> SupportsChunkCodec | None:
228267
"""The codec chain for decoding inner chunks after deserialization.
229268
230-
Returns ``None`` by default — the pipeline should use its own codec chain.
231-
``ShardingCodec`` overrides to return its inner codec chain.
269+
Returns ``None`` by default, meaning the pipeline should use its own
270+
codec chain. ``ShardingCodec`` overrides this to return its inner
271+
codec chain.
272+
273+
Returns
274+
-------
275+
SupportsChunkCodec or None
276+
A [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] instance,
277+
or ``None``.
232278
"""
233279
return None
234280

@@ -237,9 +283,22 @@ def deserialize(
237283
) -> dict[tuple[int, ...], Buffer | None]:
238284
"""Unpack stored bytes into per-inner-chunk buffers.
239285
240-
Default: single chunk keyed at ``(0,)``.
241-
``ShardingCodec`` overrides to decode the shard index and slice the
242-
blob into per-chunk buffers.
286+
The default implementation returns a single-entry dict keyed at
287+
``(0,)``. ``ShardingCodec`` overrides this to decode the shard index
288+
and split the blob into per-chunk buffers.
289+
290+
Parameters
291+
----------
292+
raw : Buffer or None
293+
The raw bytes read from the store, or ``None`` if the key was
294+
absent.
295+
chunk_spec : ArraySpec
296+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
297+
298+
Returns
299+
-------
300+
dict[tuple[int, ...], Buffer | None]
301+
Mapping from inner-chunk coordinates to their encoded bytes.
243302
"""
244303
return {(0,): raw}
245304

@@ -250,9 +309,22 @@ def serialize(
250309
) -> Buffer | None:
251310
"""Pack per-inner-chunk buffers into a storage blob.
252311
253-
Default: return the single chunk's bytes (or ``None`` if absent).
254-
``ShardingCodec`` overrides to concatenate chunks and build an index.
255-
Returns ``None`` when all chunks are empty (caller should delete the key).
312+
The default implementation returns the entry at ``(0,)``, or ``None`` if that key is absent.
313+
``ShardingCodec`` overrides this to concatenate chunks and build a
314+
shard index.
315+
316+
Parameters
317+
----------
318+
chunk_dict : dict[tuple[int, ...], Buffer | None]
319+
Mapping from inner-chunk coordinates to their encoded bytes.
320+
chunk_spec : ArraySpec
321+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
322+
323+
Returns
324+
-------
325+
Buffer or None
326+
The serialized blob, or ``None`` when all chunks are empty
327+
(the caller should delete the key).
256328
"""
257329
return chunk_dict.get((0,))
258330

@@ -265,19 +337,35 @@ def prepare_read_sync(
265337
byte_getter: Any,
266338
chunk_spec: ArraySpec,
267339
chunk_selection: SelectorTuple,
268-
codec_chain: Any,
269-
aa_chain: Any,
270-
ab_pair: Any,
271-
bb_chain: Any,
340+
codec_chain: SupportsChunkCodec,
272341
) -> NDBuffer | None:
273-
"""Sync IO + full decode for the selected region."""
342+
"""Read a chunk from the store synchronously, decode it, and
343+
return the selected region.
344+
345+
Parameters
346+
----------
347+
byte_getter : Any
348+
An object supporting ``get_sync`` (e.g.
349+
[`StorePath`][zarr.storage._common.StorePath]).
350+
chunk_spec : ArraySpec
351+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
352+
chunk_selection : SelectorTuple
353+
Selection within the decoded chunk array.
354+
codec_chain : SupportsChunkCodec
355+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to
356+
decode the chunk.
357+
358+
Returns
359+
-------
360+
NDBuffer or None
361+
The decoded chunk data at *chunk_selection*, or ``None`` if the
362+
chunk does not exist in the store.
363+
"""
274364
raw = byte_getter.get_sync(prototype=chunk_spec.prototype)
275-
chunk_array: NDBuffer | None = codec_chain.decode_chunk(
276-
raw, chunk_spec, aa_chain, ab_pair, bb_chain
277-
)
278-
if chunk_array is not None:
279-
return chunk_array[chunk_selection]
280-
return None
365+
if raw is None:
366+
return None
367+
chunk_array = codec_chain.decode_chunk(raw)
368+
return chunk_array[chunk_selection]
281369

282370
def prepare_write_sync(
283371
self,
@@ -286,9 +374,39 @@ def prepare_write_sync(
286374
chunk_selection: SelectorTuple,
287375
out_selection: SelectorTuple,
288376
replace: bool,
289-
codec_chain: Any,
377+
codec_chain: SupportsChunkCodec,
290378
) -> PreparedWrite:
291-
"""Sync IO + deserialize. Returns a :class:`PreparedWrite`."""
379+
"""Prepare a synchronous write by optionally reading existing data.
380+
381+
When *replace* is ``False``, the existing chunk bytes are fetched
382+
from the store so they can be merged with the new data. When
383+
*replace* is ``True``, the fetch is skipped.
384+
385+
Parameters
386+
----------
387+
byte_setter : Any
388+
An object supporting ``get_sync`` and ``set_sync`` (e.g.
389+
[`StorePath`][zarr.storage._common.StorePath]).
390+
chunk_spec : ArraySpec
391+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
392+
chunk_selection : SelectorTuple
393+
Selection within the chunk being written.
394+
out_selection : SelectorTuple
395+
Corresponding selection within the source value array.
396+
replace : bool
397+
If ``True``, the write replaces all data in the chunk and no
398+
read-modify-write is needed. If ``False``, existing data is
399+
fetched first.
400+
codec_chain : SupportsChunkCodec
401+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to
402+
decode/encode the chunk.
403+
404+
Returns
405+
-------
406+
PreparedWrite
407+
A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the
408+
deserialized chunk data and selection metadata.
409+
"""
292410
existing: Buffer | None = None
293411
if not replace:
294412
existing = byte_setter.get_sync(prototype=chunk_spec.prototype)
@@ -314,7 +432,22 @@ def finalize_write_sync(
314432
chunk_spec: ArraySpec,
315433
byte_setter: Any,
316434
) -> None:
317-
"""Serialize the prepared *chunk_dict* and write to store."""
435+
"""Serialize the prepared chunk data and write it to the store.
436+
437+
If serialization produces ``None`` (all chunks empty), the key is
438+
deleted instead.
439+
440+
Parameters
441+
----------
442+
prepared : PreparedWrite
443+
The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by
444+
[`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync].
445+
chunk_spec : ArraySpec
446+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
447+
byte_setter : Any
448+
An object supporting ``set_sync`` and ``delete_sync`` (e.g.
449+
[`StorePath`][zarr.storage._common.StorePath]).
450+
"""
318451
blob = self.serialize(prepared.chunk_dict, chunk_spec)
319452
if blob is None:
320453
byte_setter.delete_sync()
@@ -330,19 +463,35 @@ async def prepare_read(
330463
byte_getter: Any,
331464
chunk_spec: ArraySpec,
332465
chunk_selection: SelectorTuple,
333-
codec_chain: Any,
334-
aa_chain: Any,
335-
ab_pair: Any,
336-
bb_chain: Any,
466+
codec_chain: SupportsChunkCodec,
337467
) -> NDBuffer | None:
338-
"""Async IO + full decode for the selected region."""
468+
"""Async variant of
469+
[`prepare_read_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_read_sync].
470+
471+
Parameters
472+
----------
473+
byte_getter : Any
474+
An object supporting ``get`` (e.g.
475+
[`StorePath`][zarr.storage._common.StorePath]).
476+
chunk_spec : ArraySpec
477+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
478+
chunk_selection : SelectorTuple
479+
Selection within the decoded chunk array.
480+
codec_chain : SupportsChunkCodec
481+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to
482+
decode the chunk.
483+
484+
Returns
485+
-------
486+
NDBuffer or None
487+
The decoded chunk data at *chunk_selection*, or ``None`` if the
488+
chunk does not exist in the store.
489+
"""
339490
raw = await byte_getter.get(prototype=chunk_spec.prototype)
340-
chunk_array: NDBuffer | None = codec_chain.decode_chunk(
341-
raw, chunk_spec, aa_chain, ab_pair, bb_chain
342-
)
343-
if chunk_array is not None:
344-
return chunk_array[chunk_selection]
345-
return None
491+
if raw is None:
492+
return None
493+
chunk_array = codec_chain.decode_chunk(raw)
494+
return chunk_array[chunk_selection]
346495

347496
async def prepare_write(
348497
self,
@@ -351,9 +500,36 @@ async def prepare_write(
351500
chunk_selection: SelectorTuple,
352501
out_selection: SelectorTuple,
353502
replace: bool,
354-
codec_chain: Any,
503+
codec_chain: SupportsChunkCodec,
355504
) -> PreparedWrite:
356-
"""Async IO + deserialize. Returns a :class:`PreparedWrite`."""
505+
"""Async variant of
506+
[`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync].
507+
508+
Parameters
509+
----------
510+
byte_setter : Any
511+
An object supporting ``get`` and ``set`` (e.g.
512+
[`StorePath`][zarr.storage._common.StorePath]).
513+
chunk_spec : ArraySpec
514+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
515+
chunk_selection : SelectorTuple
516+
Selection within the chunk being written.
517+
out_selection : SelectorTuple
518+
Corresponding selection within the source value array.
519+
replace : bool
520+
If ``True``, the write replaces all data in the chunk and no
521+
read-modify-write is needed. If ``False``, existing data is
522+
fetched first.
523+
codec_chain : SupportsChunkCodec
524+
The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to
525+
decode/encode the chunk.
526+
527+
Returns
528+
-------
529+
PreparedWrite
530+
A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the
531+
deserialized chunk data and selection metadata.
532+
"""
357533
existing: Buffer | None = None
358534
if not replace:
359535
existing = await byte_setter.get(prototype=chunk_spec.prototype)
@@ -379,7 +555,20 @@ async def finalize_write(
379555
chunk_spec: ArraySpec,
380556
byte_setter: Any,
381557
) -> None:
382-
"""Async version of :meth:`finalize_write_sync`."""
558+
"""Async variant of
559+
[`finalize_write_sync`][zarr.abc.codec.ArrayBytesCodec.finalize_write_sync].
560+
561+
Parameters
562+
----------
563+
prepared : PreparedWrite
564+
The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by
565+
[`prepare_write`][zarr.abc.codec.ArrayBytesCodec.prepare_write].
566+
chunk_spec : ArraySpec
567+
The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.
568+
byte_setter : Any
569+
An object supporting ``set`` and ``delete`` (e.g.
570+
[`StorePath`][zarr.storage._common.StorePath]).
571+
"""
383572
blob = self.serialize(prepared.chunk_dict, chunk_spec)
384573
if blob is None:
385574
await byte_setter.delete()

0 commit comments

Comments
 (0)