Skip to content

Commit 6518fd1

Browse files
committed
add sync paths for CodecPipeline
1 parent 2a3b404 commit 6518fd1

File tree

9 files changed

+831
-60
lines changed

9 files changed

+831
-60
lines changed

src/zarr/abc/codec.py

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from zarr.abc.store import ByteGetter, ByteSetter, Store
2020
from zarr.core.array_spec import ArraySpec
2121
from zarr.core.chunk_grids import ChunkGrid
22-
from zarr.core.codec_pipeline import ChunkRequest
22+
from zarr.core.codec_pipeline import ChunkTransform, ReadChunkRequest, WriteChunkRequest
2323
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType
2424
from zarr.core.indexing import ChunkProjection, SelectorTuple
2525
from zarr.core.metadata import ArrayMetadata
@@ -711,6 +711,20 @@ def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int:
711711
"""
712712
...
713713

714+
@abstractmethod
715+
def get_chunk_transform(self, array_spec: ArraySpec) -> ChunkTransform:
716+
"""Creates a ChunkTransform for the given array spec.
717+
718+
Parameters
719+
----------
720+
array_spec : ArraySpec
721+
722+
Returns
723+
-------
724+
ChunkTransform
725+
"""
726+
...
727+
714728
@abstractmethod
715729
async def decode(
716730
self,
@@ -752,7 +766,7 @@ async def encode(
752766
@abstractmethod
753767
async def read(
754768
self,
755-
batch_info: Iterable[ChunkRequest],
769+
batch_info: Iterable[ReadChunkRequest],
756770
out: NDBuffer,
757771
drop_axes: tuple[int, ...] = (),
758772
) -> None:
@@ -761,10 +775,10 @@ async def read(
761775
762776
Parameters
763777
----------
764-
batch_info : Iterable[ChunkRequest]
765-
Ordered set of chunk requests. Each ``ChunkRequest`` carries the
766-
store path (``byte_setter``), the ``ArraySpec`` for that chunk,
767-
chunk and output selections, and whether the chunk is complete.
778+
batch_info : Iterable[ReadChunkRequest]
779+
Ordered set of read requests. Each carries a ``byte_getter``,
780+
a ``ChunkTransform`` (codec chain + spec), and chunk/output
781+
selections.
768782
769783
If the Store returns ``None`` for a chunk, then the chunk was not
770784
written and the implementation must set the values of that chunk (or
@@ -777,7 +791,7 @@ async def read(
777791
@abstractmethod
778792
async def write(
779793
self,
780-
batch_info: Iterable[ChunkRequest],
794+
batch_info: Iterable[WriteChunkRequest],
781795
value: NDBuffer,
782796
drop_axes: tuple[int, ...] = (),
783797
) -> None:
@@ -787,14 +801,37 @@ async def write(
787801
788802
Parameters
789803
----------
790-
batch_info : Iterable[ChunkRequest]
791-
Ordered set of chunk requests. Each ``ChunkRequest`` carries the
792-
store path (``byte_setter``), the ``ArraySpec`` for that chunk,
793-
chunk and output selections, and whether the chunk is complete.
804+
batch_info : Iterable[WriteChunkRequest]
805+
Ordered set of write requests. Each carries a ``byte_setter``,
806+
a ``ChunkTransform`` (codec chain + spec), chunk/output
807+
selections, and whether the chunk is complete.
794808
value : NDBuffer
795809
"""
796810
...
797811

812+
@property
813+
def supports_sync_io(self) -> bool:
814+
"""Whether this pipeline can run read/write entirely on the calling thread."""
815+
return False
816+
817+
def read_sync(
818+
self,
819+
batch_info: Iterable[ReadChunkRequest],
820+
out: NDBuffer,
821+
drop_axes: tuple[int, ...] = (),
822+
) -> None:
823+
"""Synchronous read: fetch bytes from store, decode, scatter into *out*."""
824+
raise NotImplementedError
825+
826+
def write_sync(
827+
self,
828+
batch_info: Iterable[WriteChunkRequest],
829+
value: NDBuffer,
830+
drop_axes: tuple[int, ...] = (),
831+
) -> None:
832+
"""Synchronous write: gather from *value*, encode, persist to store."""
833+
raise NotImplementedError
834+
798835

799836
async def _batching_helper(
800837
func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],

src/zarr/core/array.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
V2ChunkKeyEncoding,
4949
parse_chunk_key_encoding,
5050
)
51-
from zarr.core.codec_pipeline import ChunkRequest
51+
from zarr.core.codec_pipeline import ReadChunkRequest, WriteChunkRequest
5252
from zarr.core.common import (
5353
JSON,
5454
ZARR_JSON,
@@ -5603,14 +5603,15 @@ async def _get_selection(
56035603
# reading chunks and decoding them
56045604
await codec_pipeline.read(
56055605
[
5606-
ChunkRequest(
5607-
byte_setter=store_path / metadata.encode_chunk_key(chunk_coords),
5608-
chunk_spec=metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype),
5606+
ReadChunkRequest(
5607+
byte_getter=store_path / metadata.encode_chunk_key(chunk_coords),
5608+
transform=codec_pipeline.get_chunk_transform(
5609+
metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype)
5610+
),
56095611
chunk_selection=chunk_selection,
56105612
out_selection=out_selection,
5611-
is_complete_chunk=is_complete_chunk,
56125613
)
5613-
for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer
5614+
for chunk_coords, chunk_selection, out_selection, _is_complete in indexer
56145615
],
56155616
out_buffer,
56165617
drop_axes=indexer.drop_axes,
@@ -5913,9 +5914,11 @@ async def _set_selection(
59135914
# merging with existing data and encoding chunks
59145915
await codec_pipeline.write(
59155916
[
5916-
ChunkRequest(
5917+
WriteChunkRequest(
59175918
byte_setter=store_path / metadata.encode_chunk_key(chunk_coords),
5918-
chunk_spec=metadata.get_chunk_spec(chunk_coords, _config, prototype),
5919+
transform=codec_pipeline.get_chunk_transform(
5920+
metadata.get_chunk_spec(chunk_coords, _config, prototype)
5921+
),
59195922
chunk_selection=chunk_selection,
59205923
out_selection=out_selection,
59215924
is_complete_chunk=is_complete_chunk,

0 commit comments

Comments (0)