Skip to content

Commit b1b876a

Browse files
committed
use protocols for new sync behavior
1 parent 284e5e2 commit b1b876a

9 files changed

Lines changed: 242 additions & 188 deletions

File tree

src/zarr/abc/codec.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from abc import abstractmethod
44
from collections.abc import Mapping
5-
from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
5+
from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable
66

77
from typing_extensions import ReadOnly, TypedDict
88

@@ -32,6 +32,7 @@
3232
"CodecInput",
3333
"CodecOutput",
3434
"CodecPipeline",
35+
"SupportsSyncCodec",
3536
]
3637

3738
CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
@@ -59,6 +60,19 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]:
5960
"""The widest type of JSON-like input that could specify a codec."""
6061

6162

63+
@runtime_checkable
64+
class SupportsSyncCodec(Protocol):
65+
"""Protocol for codecs that support synchronous encode/decode."""
66+
67+
def _decode_sync(
68+
self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
69+
) -> NDBuffer | Buffer: ...
70+
71+
def _encode_sync(
72+
self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
73+
) -> NDBuffer | Buffer | None: ...
74+
75+
6276
class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
6377
"""Generic base class for codecs.
6478
@@ -137,21 +151,6 @@ def validate(
137151
The array chunk grid
138152
"""
139153

140-
def _decode_sync(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput:
141-
"""Synchronously decode a single chunk. Override in subclasses to enable
142-
sync codec pipeline support."""
143-
raise NotImplementedError # pragma: no cover
144-
145-
def _encode_sync(self, chunk_data: CodecInput, chunk_spec: ArraySpec) -> CodecOutput | None:
146-
"""Synchronously encode a single chunk. Override in subclasses to enable
147-
sync codec pipeline support."""
148-
raise NotImplementedError # pragma: no cover
149-
150-
@property
151-
def supports_sync(self) -> bool:
152-
"""Whether this codec has synchronous encode/decode implementations."""
153-
return type(self)._decode_sync is not BaseCodec._decode_sync
154-
155154
async def _decode_single(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput:
156155
raise NotImplementedError # pragma: no cover
157156

@@ -491,7 +490,7 @@ def supports_sync_io(self) -> bool:
491490
"""Whether this pipeline can run read/write entirely on the calling thread.
492491
493492
True when:
494-
- All codecs support synchronous encode/decode (_decode_sync/_encode_sync)
493+
- All codecs implement ``SupportsSyncCodec``
495494
- The pipeline's read_sync/write_sync methods are implemented
496495
497496
Checked by ``Array._can_use_sync_path()`` to decide whether to bypass

src/zarr/abc/store.py

Lines changed: 26 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,14 @@
1616

1717
from zarr.core.buffer import Buffer, BufferPrototype
1818

19-
__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
19+
__all__ = [
20+
"ByteGetter",
21+
"ByteSetter",
22+
"Store",
23+
"SyncByteGetter",
24+
"SyncByteSetter",
25+
"set_or_delete",
26+
]
2027

2128

2229
@dataclass
@@ -524,61 +531,6 @@ def supports_partial_writes(self) -> Literal[False]:
524531
"""
525532
return False
526533

527-
# -----------------------------------------------------------------------
528-
# Synchronous IO interface (opt-in)
529-
#
530-
# These methods enable the codec pipeline to bypass the event loop
531-
# entirely for store IO. The default implementations raise
532-
# NotImplementedError; stores that wrap fundamentally synchronous
533-
# operations (MemoryStore, LocalStore) override them with direct
534-
# implementations. Remote/cloud stores (FsspecStore) leave them as-is
535-
# and remain async-only.
536-
# -----------------------------------------------------------------------
537-
538-
@property
539-
def supports_sync(self) -> bool:
540-
"""Whether this store has native synchronous get/set/delete methods.
541-
542-
When True, the codec pipeline's ``read_sync`` / ``write_sync`` will
543-
call ``get_sync`` / ``set_sync`` / ``delete_sync`` directly on the
544-
calling thread, avoiding the event loop overhead of the async
545-
equivalents.
546-
547-
Subclasses that override the sync methods below should also override
548-
this property to return True.
549-
"""
550-
return False
551-
552-
def get_sync(
553-
self,
554-
key: str,
555-
prototype: BufferPrototype,
556-
byte_range: ByteRequest | None = None,
557-
) -> Buffer | None:
558-
"""Synchronous version of ``get()``.
559-
560-
Called by the codec pipeline's ``read_sync`` to fetch chunk bytes without
561-
going through the event loop. Only called when ``supports_sync`` is
562-
True, so the default ``NotImplementedError`` is never hit in practice.
563-
"""
564-
raise NotImplementedError
565-
566-
def set_sync(self, key: str, value: Buffer) -> None:
567-
"""Synchronous version of ``set()``.
568-
569-
Called by the codec pipeline's ``write_sync`` to persist encoded chunk
570-
bytes without going through the event loop.
571-
"""
572-
raise NotImplementedError
573-
574-
def delete_sync(self, key: str) -> None:
575-
"""Synchronous version of ``delete()``.
576-
577-
Called by the codec pipeline's ``write_sync`` when a chunk should be
578-
removed (e.g. an empty chunk with ``write_empty_chunks=False``).
579-
"""
580-
raise NotImplementedError
581-
582534
@property
583535
@abstractmethod
584536
def supports_listing(self) -> bool:
@@ -755,6 +707,24 @@ async def delete(self) -> None: ...
755707
async def set_if_not_exists(self, default: Buffer) -> None: ...
756708

757709

710+
@runtime_checkable
711+
class SyncByteGetter(Protocol):
712+
"""Protocol for stores that support synchronous byte reads."""
713+
714+
def get_sync(
715+
self, prototype: BufferPrototype, byte_range: ByteRequest | None = None
716+
) -> Buffer | None: ...
717+
718+
719+
@runtime_checkable
720+
class SyncByteSetter(SyncByteGetter, Protocol):
721+
"""Protocol for stores that support synchronous byte reads, writes, and deletes."""
722+
723+
def set_sync(self, value: Buffer) -> None: ...
724+
725+
def delete_sync(self) -> None: ...
726+
727+
758728
async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None:
759729
"""Set or delete a value in a byte setter
760730

src/zarr/codecs/sharding.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
ArrayBytesCodecPartialEncodeMixin,
1717
Codec,
1818
CodecPipeline,
19+
SupportsSyncCodec,
1920
)
2021
from zarr.abc.store import (
2122
ByteGetter,
@@ -968,15 +969,15 @@ def _decode_shard_index_sync(
968969
spec = bb.resolve_metadata(spec)
969970

970971
# Decode: reverse bb, then ab, then reverse aa
971-
chunk_bytes: Buffer = index_bytes
972+
bb_out: Any = index_bytes
972973
for bb_codec, s in reversed(bb_with_spec):
973-
chunk_bytes = bb_codec._decode_sync(chunk_bytes, s)
974-
chunk_array: NDBuffer = ab_codec._decode_sync(chunk_bytes, ab_spec)
974+
bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, s)
975+
ab_out: Any = cast("SupportsSyncCodec", ab_codec)._decode_sync(bb_out, ab_spec)
975976
for aa_codec, s in reversed(aa_with_spec):
976-
chunk_array = aa_codec._decode_sync(chunk_array, s)
977+
ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, s)
977978

978-
assert chunk_array is not None
979-
return _ShardIndex(chunk_array.as_numpy_array())
979+
assert ab_out is not None
980+
return _ShardIndex(ab_out.as_numpy_array())
980981

981982
def _encode_shard_index_sync(self, index: _ShardIndex) -> Buffer:
982983
"""Encode shard index synchronously by running index codecs inline."""
@@ -986,25 +987,25 @@ def _encode_shard_index_sync(self, index: _ShardIndex) -> Buffer:
986987

987988
aa_codecs, ab_codec, bb_codecs = codecs_from_list(list(self.index_codecs))
988989

989-
aa_out: NDBuffer | None = get_ndbuffer_class().from_numpy_array(index.offsets_and_lengths)
990+
aa_out: Any = get_ndbuffer_class().from_numpy_array(index.offsets_and_lengths)
990991

991992
# Encode: aa forward, then ab, then bb forward
992993
spec = index_chunk_spec
993994
for aa_codec in aa_codecs:
994995
assert aa_out is not None
995-
aa_out = aa_codec._encode_sync(aa_out, spec)
996+
aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec)
996997
spec = aa_codec.resolve_metadata(spec)
997998
assert aa_out is not None
998-
chunk_bytes = ab_codec._encode_sync(aa_out, spec)
999+
bb_out: Any = cast("SupportsSyncCodec", ab_codec)._encode_sync(aa_out, spec)
9991000
spec = ab_codec.resolve_metadata(spec)
10001001
for bb_codec in bb_codecs:
1001-
assert chunk_bytes is not None
1002-
chunk_bytes = bb_codec._encode_sync(chunk_bytes, spec)
1002+
assert bb_out is not None
1003+
bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, spec)
10031004
spec = bb_codec.resolve_metadata(spec)
10041005

1005-
assert chunk_bytes is not None
1006-
assert isinstance(chunk_bytes, Buffer)
1007-
return chunk_bytes
1006+
assert bb_out is not None
1007+
assert isinstance(bb_out, Buffer)
1008+
return bb_out
10081009

10091010
async def _decode_shard_index(
10101011
self, index_bytes: Buffer, chunks_per_shard: tuple[int, ...]

src/zarr/core/array.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1978,12 +1978,12 @@ def _can_use_sync_path(self) -> bool:
19781978
19791979
Two conditions must hold:
19801980
1981-
1. The codec pipeline supports fully synchronous IO (all codecs in
1982-
the chain have _decode_sync/_encode_sync). This is True for
1981+
1. The codec pipeline supports fully synchronous IO (all codecs
1982+
implement ``SupportsSyncCodec``). This is True for
19831983
BatchedCodecPipeline when all codecs support sync.
19841984
1985-
2. The store supports synchronous operations (MemoryStore, LocalStore).
1986-
Remote stores like FsspecStore remain async-only.
1985+
2. The store supports synchronous operations (has ``get_sync``).
1986+
MemoryStore and LocalStore provide this; remote stores do not.
19871987
19881988
When both hold, the selection methods below call
19891989
_get_selection_sync / _set_selection_sync directly, running the
@@ -1992,10 +1992,8 @@ def _can_use_sync_path(self) -> bool:
19921992
is used automatically.
19931993
"""
19941994
pipeline = self.async_array.codec_pipeline
1995-
store_path = self.async_array.store_path
1996-
return getattr(pipeline, "supports_sync_io", False) and getattr(
1997-
store_path, "supports_sync", False
1998-
)
1995+
store = self.async_array.store_path.store
1996+
return getattr(pipeline, "supports_sync_io", False) and hasattr(store, "get_sync")
19991997

20001998
@classmethod
20011999
@deprecated("Use zarr.create_array instead.", category=ZarrDeprecationWarning)

0 commit comments

Comments (0)