Skip to content

Commit 03355b8

Browse files
d-v-b, claude, dcherian
authored
perf/store sync (#3725)
* add sync methods to codecs
* add CodecChain dataclass and sync codec tests

  Introduces CodecChain, a frozen dataclass that chains array-array, array-bytes, and bytes-bytes codecs with synchronous encode/decode methods. Pure compute only -- no IO, no threading, no batching. Also adds sync roundtrip tests for individual codecs (blosc, gzip, zstd, crc32c, bytes, transpose, vlen) and CodecChain integration tests.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* refactor codecchain
* separate codecs and specs
* add synchronous methods to stores
* fix merge error
* guard storepath methods that rely on underlying sync impl
* scrub out set-range logic
* remove is-zstd-fixed-size test
* remove codecchain
* revert delete comment

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
1 parent 46654ad commit 03355b8

File tree

13 files changed

+436
-7
lines changed

13 files changed

+436
-7
lines changed

src/zarr/abc/store.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@
1616

1717
from zarr.core.buffer import Buffer, BufferPrototype
1818

19-
__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
19+
__all__ = [
20+
"ByteGetter",
21+
"ByteSetter",
22+
"Store",
23+
"SupportsDeleteSync",
24+
"SupportsGetSync",
25+
"SupportsSetSync",
26+
"SupportsSyncStore",
27+
"set_or_delete",
28+
]
2029

2130

2231
@dataclass(frozen=True, slots=True)
@@ -700,6 +709,31 @@ async def delete(self) -> None: ...
700709
async def set_if_not_exists(self, default: Buffer) -> None: ...
701710

702711

712+
@runtime_checkable
class SupportsGetSync(Protocol):
    """Structural type for objects that expose a synchronous ``get_sync``.

    Decorated with ``runtime_checkable`` so it can be used with
    ``isinstance`` to test whether an object provides the method.
    """

    def get_sync(
        self,
        key: str,
        *,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        """Return a ``Buffer`` for ``key`` or ``None``, without awaiting."""
        ...
721+
722+
723+
@runtime_checkable
class SupportsSetSync(Protocol):
    """Structural type for objects that expose a synchronous ``set_sync``.

    Usable with ``isinstance`` because it is ``runtime_checkable``.
    """

    def set_sync(self, key: str, value: Buffer) -> None:
        """Store ``value`` under ``key`` without awaiting."""
        ...
726+
727+
728+
@runtime_checkable
class SupportsDeleteSync(Protocol):
    """Structural type for objects that expose a synchronous ``delete_sync``.

    Usable with ``isinstance`` because it is ``runtime_checkable``.
    """

    def delete_sync(self, key: str) -> None:
        """Delete ``key`` without awaiting."""
        ...
731+
732+
733+
@runtime_checkable
class SupportsSyncStore(SupportsGetSync, SupportsSetSync, SupportsDeleteSync, Protocol):
    """Combined protocol: ``get_sync``, ``set_sync`` and ``delete_sync`` together.

    Adds no members of its own; it only merges the three single-method
    protocols it inherits from.
    """
735+
736+
703737
async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None:
704738
"""Set or delete a value in a byte setter
705739

src/zarr/storage/_common.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@
55
from pathlib import Path
66
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias
77

8-
from zarr.abc.store import ByteRequest, Store
8+
from zarr.abc.store import (
9+
ByteRequest,
10+
Store,
11+
SupportsDeleteSync,
12+
SupportsGetSync,
13+
SupportsSetSync,
14+
)
915
from zarr.core.buffer import Buffer, default_buffer_prototype
1016
from zarr.core.common import (
1117
ANY_ACCESS_MODE,
@@ -228,6 +234,37 @@ async def is_empty(self) -> bool:
228234
"""
229235
return await self.store.is_empty(self.path)
230236

237+
# -------------------------------------------------------------------
238+
# Synchronous IO delegation
239+
# -------------------------------------------------------------------
240+
241+
def get_sync(
    self,
    *,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Read this path from the underlying store without awaiting.

    Forwards to ``self.store.get_sync`` with ``self.path`` as the key.

    Parameters
    ----------
    prototype : BufferPrototype | None
        Buffer prototype to pass on; ``default_buffer_prototype()`` is
        used when ``None``.
    byte_range : ByteRequest | None
        Forwarded to the store unchanged.

    Returns
    -------
    Buffer | None
        Whatever the store's ``get_sync`` returns.

    Raises
    ------
    TypeError
        If the store is not an instance of ``SupportsGetSync``.
    """
    backend = self.store
    if isinstance(backend, SupportsGetSync):
        # Resolve the default only once we know the store can serve the read.
        proto = default_buffer_prototype() if prototype is None else prototype
        return backend.get_sync(self.path, prototype=proto, byte_range=byte_range)
    raise TypeError(f"Store {type(self.store).__name__} does not support synchronous get.")
253+
254+
def set_sync(self, value: Buffer) -> None:
    """Write ``value`` at this path via the store's ``set_sync``.

    Raises
    ------
    TypeError
        If the store is not an instance of ``SupportsSetSync``.
    """
    backend = self.store
    if isinstance(backend, SupportsSetSync):
        backend.set_sync(self.path, value)
        return
    raise TypeError(f"Store {type(self.store).__name__} does not support synchronous set.")
259+
260+
def delete_sync(self) -> None:
    """Delete this path via the store's ``delete_sync``.

    Raises
    ------
    TypeError
        If the store is not an instance of ``SupportsDeleteSync``.
    """
    backend = self.store
    if isinstance(backend, SupportsDeleteSync):
        backend.delete_sync(self.path)
        return
    raise TypeError(
        f"Store {type(self.store).__name__} does not support synchronous delete."
    )
267+
231268
def __truediv__(self, other: str) -> StorePath:
    """Support ``store_path / other``.

    Returns a new instance of this object's class on the same store, whose
    path is ``_dereference_path(self.path, other)``.
    """
    combined = _dereference_path(self.path, other)
    cls = self.__class__
    return cls(self.store, combined)

src/zarr/storage/_local.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,56 @@ def __repr__(self) -> str:
187187
def __eq__(self, other: object) -> bool:
    """Equal when ``other`` is an instance of this object's type with the same ``root``."""
    if not isinstance(other, type(self)):
        return False
    return self.root == other.root
189189

190+
# -------------------------------------------------------------------
191+
# Synchronous store methods
192+
# -------------------------------------------------------------------
193+
194+
def _ensure_open_sync(self) -> None:
    """Mark the store open, preparing ``self.root`` first if needed.

    Does nothing when ``self._is_open`` is already true. Otherwise the root
    directory is created (with parents) unless the store is read-only, and
    the root must exist afterwards.

    Raises
    ------
    FileNotFoundError
        If ``self.root`` does not exist after the optional ``mkdir``.
    """
    if self._is_open:
        return
    if not self.read_only:
        self.root.mkdir(parents=True, exist_ok=True)
    if not self.root.exists():
        raise FileNotFoundError(f"{self.root} does not exist")
    self._is_open = True
201+
202+
def get_sync(
    self,
    key: str,
    *,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Read ``key`` from beneath ``self.root`` without awaiting.

    Parameters
    ----------
    key : str
        Path relative to ``self.root``.
    prototype : BufferPrototype | None
        Buffer prototype handed to ``_get``; ``default_buffer_prototype()``
        is used when ``None``.
    byte_range : ByteRequest | None
        Forwarded to ``_get`` unchanged.

    Returns
    -------
    Buffer | None
        The result of ``_get``, or ``None`` when it raises
        ``FileNotFoundError``, ``IsADirectoryError`` or ``NotADirectoryError``.
    """
    proto = prototype if prototype is not None else default_buffer_prototype()
    self._ensure_open_sync()
    assert isinstance(key, str)
    target = self.root / key
    try:
        data = _get(target, proto, byte_range)
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
        return None
    return data
218+
219+
def set_sync(self, key: str, value: Buffer) -> None:
    """Write ``value`` to ``self.root / key`` without awaiting.

    The store is opened and checked for writability before the write is
    handed to ``_put``.

    Raises
    ------
    TypeError
        If ``value`` is not a ``Buffer`` instance.
    """
    self._ensure_open_sync()
    self._check_writable()
    assert isinstance(key, str)
    if isinstance(value, Buffer):
        _put(self.root / key, value)
        return
    raise TypeError(
        f"LocalStore.set(): `value` must be a Buffer instance. "
        f"Got an instance of {type(value)} instead."
    )
230+
231+
def delete_sync(self, key: str) -> None:
    """Remove ``self.root / key`` without awaiting.

    A directory is removed recursively with ``shutil.rmtree``; anything else
    is unlinked, and a missing path is not an error.
    """
    self._ensure_open_sync()
    self._check_writable()
    target = self.root / key
    if not target.is_dir():
        target.unlink(missing_ok=True)
        return
    shutil.rmtree(target)
239+
190240
async def get(
191241
self,
192242
key: str,

src/zarr/storage/_memory.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,49 @@ def __eq__(self, other: object) -> bool:
7777
and self.read_only == other.read_only
7878
)
7979

80+
# -------------------------------------------------------------------
81+
# Synchronous store methods
82+
# -------------------------------------------------------------------
83+
84+
def get_sync(
    self,
    key: str,
    *,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Read ``key`` from ``self._store_dict`` without awaiting.

    Parameters
    ----------
    key : str
        Key to look up.
    prototype : BufferPrototype | None
        Prototype whose ``buffer.from_buffer`` wraps the result;
        ``default_buffer_prototype()`` is used when ``None``.
    byte_range : ByteRequest | None
        Converted to a ``(start, stop)`` slice by
        ``_normalize_byte_range_index``.

    Returns
    -------
    Buffer | None
        The sliced value, or ``None`` when a ``KeyError`` is raised.
    """
    proto = default_buffer_prototype() if prototype is None else prototype
    if not self._is_open:
        self._is_open = True
    assert isinstance(key, str)
    try:
        stored = self._store_dict[key]
        lo, hi = _normalize_byte_range_index(stored, byte_range)
        return proto.buffer.from_buffer(stored[lo:hi])
    except KeyError:
        return None
102+
103+
def set_sync(self, key: str, value: Buffer) -> None:
    """Store ``value`` under ``key`` in ``self._store_dict`` without awaiting.

    Writability is checked first, then the store is flagged open.

    Raises
    ------
    TypeError
        If ``value`` is not a ``Buffer`` instance.
    """
    self._check_writable()
    if not self._is_open:
        self._is_open = True
    assert isinstance(key, str)
    if isinstance(value, Buffer):
        self._store_dict[key] = value
        return
    raise TypeError(
        f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
    )
113+
114+
def delete_sync(self, key: str) -> None:
    """Remove ``key`` from ``self._store_dict`` without awaiting.

    Writability is checked first, then the store is flagged open. A missing
    key is logged at debug level rather than raised.
    """
    self._check_writable()
    if not self._is_open:
        self._is_open = True
    try:
        del self._store_dict[key]
    except KeyError:
        logger.debug("Key %s does not exist.", key)
122+
80123
async def get(
81124
self,
82125
key: str,
@@ -122,7 +165,6 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None
122165
raise TypeError(
123166
f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
124167
)
125-
126168
if byte_range is not None:
127169
buf = self._store_dict[key]
128170
buf[byte_range[0] : byte_range[1]] = value

src/zarr/testing/store.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
if TYPE_CHECKING:
1212
from typing import Any
1313

14-
from zarr.abc.store import ByteRequest
1514
from zarr.core.buffer.core import BufferPrototype
1615

1716
import pytest
@@ -22,6 +21,9 @@
2221
RangeByteRequest,
2322
Store,
2423
SuffixByteRequest,
24+
SupportsDeleteSync,
25+
SupportsGetSync,
26+
SupportsSetSync,
2527
)
2628
from zarr.core.buffer import Buffer, default_buffer_prototype
2729
from zarr.core.sync import _collect_aiterator, sync
@@ -39,6 +41,27 @@ class StoreTests(Generic[S, B]):
3941
store_cls: type[S]
4042
buffer_cls: type[B]
4143

44+
@staticmethod
def _require_get_sync(store: S) -> SupportsGetSync:
    """Return *store* typed as :class:`SupportsGetSync`.

    Calls ``pytest.skip`` when *store* is not an instance of the protocol,
    so tests using this helper are skipped for stores without ``get_sync``.
    """
    if not isinstance(store, SupportsGetSync):
        pytest.skip("store does not implement SupportsGetSync")
    # The ignore pragma is carried over from the original return line.
    return store  # type: ignore[unreachable]
50+
51+
@staticmethod
def _require_set_sync(store: S) -> SupportsSetSync:
    """Return *store* typed as :class:`SupportsSetSync`.

    Calls ``pytest.skip`` when *store* is not an instance of the protocol,
    so tests using this helper are skipped for stores without ``set_sync``.
    """
    if not isinstance(store, SupportsSetSync):
        pytest.skip("store does not implement SupportsSetSync")
    # The ignore pragma is carried over from the original return line.
    return store  # type: ignore[unreachable]
57+
58+
@staticmethod
def _require_delete_sync(store: S) -> SupportsDeleteSync:
    """Return *store* typed as :class:`SupportsDeleteSync`.

    Calls ``pytest.skip`` when *store* is not an instance of the protocol,
    so tests using this helper are skipped for stores without ``delete_sync``.
    """
    if not isinstance(store, SupportsDeleteSync):
        pytest.skip("store does not implement SupportsDeleteSync")
    # The ignore pragma is carried over from the original return line.
    return store  # type: ignore[unreachable]
64+
4265
@abstractmethod
4366
async def set(self, store: S, key: str, value: Buffer) -> None:
4467
"""
@@ -579,6 +602,52 @@ def test_get_json_sync(self, store: S) -> None:
579602
sync(self.set(store, key, self.buffer_cls.from_bytes(data_bytes)))
580603
assert store._get_json_sync(key, prototype=default_buffer_prototype()) == data
581604

605+
# -------------------------------------------------------------------
606+
# Synchronous store methods (SupportsSyncStore protocol)
607+
# -------------------------------------------------------------------
608+
609+
def test_get_sync(self, store: S) -> None:
    """A value written through ``self.set`` is readable through ``get_sync``."""
    reader = self._require_get_sync(store)
    expected = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
    key = "sync_get"
    sync(self.set(store, key, expected))
    observed = reader.get_sync(key)
    assert observed is not None
    assert_bytes_equal(observed, expected)
617+
618+
def test_get_sync_missing(self, store: S) -> None:
    """``get_sync`` returns ``None`` for a key that was never written."""
    reader = self._require_get_sync(store)
    assert reader.get_sync("nonexistent") is None
622+
623+
def test_set_sync(self, store: S) -> None:
    """A value written through ``set_sync`` is readable through ``self.get``."""
    writer = self._require_set_sync(store)
    expected = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
    key = "sync_set"
    writer.set_sync(key, expected)
    observed = sync(self.get(store, key))
    assert_bytes_equal(observed, expected)
630+
631+
def test_delete_sync(self, store: S) -> None:
    """A key written with ``set_sync`` is gone after ``delete_sync``."""
    # Protocol checks run in this order so the skip reason is unchanged.
    writer = self._require_set_sync(store)
    remover = self._require_delete_sync(store)
    reader = self._require_get_sync(store)
    if not store.supports_deletes:
        pytest.skip("store does not support deletes")
    key = "sync_delete"
    payload = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
    writer.set_sync(key, payload)
    remover.delete_sync(key)
    assert reader.get_sync(key) is None
643+
644+
def test_delete_sync_missing(self, store: S) -> None:
    """``delete_sync`` on a key that was never written must not raise."""
    remover = self._require_delete_sync(store)
    if not store.supports_deletes:
        pytest.skip("store does not support deletes")
    # Passing means the call completes; there is nothing further to assert.
    remover.delete_sync("nonexistent_sync")
650+
582651

583652
class LatencyStore(WrapperStore[Store]):
584653
"""

tests/test_codecs/test_blosc.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
from packaging.version import Version
77

88
import zarr
9+
from zarr.abc.codec import SupportsSyncCodec
910
from zarr.codecs import BloscCodec
1011
from zarr.codecs.blosc import BloscShuffle, Shuffle
11-
from zarr.core.array_spec import ArraySpec
12+
from zarr.core.array_spec import ArrayConfig, ArraySpec
1213
from zarr.core.buffer import default_buffer_prototype
13-
from zarr.core.dtype import UInt16
14+
from zarr.core.dtype import UInt16, get_data_type_from_native_dtype
1415
from zarr.storage import MemoryStore, StorePath
1516

1617

@@ -110,3 +111,27 @@ async def test_typesize() -> None:
110111
else:
111112
expected_size = 10216
112113
assert size == expected_size, msg
114+
115+
116+
def test_blosc_codec_supports_sync() -> None:
    """A default ``BloscCodec`` is an instance of ``SupportsSyncCodec``."""
    codec = BloscCodec()
    assert isinstance(codec, SupportsSyncCodec)
118+
119+
120+
def test_blosc_codec_sync_roundtrip() -> None:
    """``_encode_sync`` followed by ``_decode_sync`` reproduces the input array."""
    codec = BloscCodec(typesize=8)
    source = np.arange(100, dtype="float64")
    zdtype = get_data_type_from_native_dtype(source.dtype)
    array_config = ArrayConfig(order="C", write_empty_chunks=True)
    spec = ArraySpec(
        shape=source.shape,
        dtype=zdtype,
        fill_value=zdtype.cast_scalar(0),
        config=array_config,
        prototype=default_buffer_prototype(),
    )
    # The codec is fed the array's raw bytes, viewed as unsigned 8-bit values.
    raw = default_buffer_prototype().buffer.from_array_like(source.view("B"))

    packed = codec._encode_sync(raw, spec)
    assert packed is not None
    unpacked = codec._decode_sync(packed, spec)
    restored = np.frombuffer(unpacked.as_numpy_array(), dtype="float64")
    np.testing.assert_array_equal(source, restored)

tests/test_codecs/test_crc32c.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from __future__ import annotations
2+
3+
import numpy as np
4+
5+
from zarr.abc.codec import SupportsSyncCodec
6+
from zarr.codecs.crc32c_ import Crc32cCodec
7+
from zarr.core.array_spec import ArrayConfig, ArraySpec
8+
from zarr.core.buffer import default_buffer_prototype
9+
from zarr.core.dtype import get_data_type_from_native_dtype
10+
11+
12+
def test_crc32c_codec_supports_sync() -> None:
    """A default ``Crc32cCodec`` is an instance of ``SupportsSyncCodec``."""
    codec = Crc32cCodec()
    assert isinstance(codec, SupportsSyncCodec)
14+
15+
16+
def test_crc32c_codec_sync_roundtrip() -> None:
    """``_encode_sync`` followed by ``_decode_sync`` reproduces the input array."""
    codec = Crc32cCodec()
    source = np.arange(100, dtype="float64")
    zdtype = get_data_type_from_native_dtype(source.dtype)
    array_config = ArrayConfig(order="C", write_empty_chunks=True)
    spec = ArraySpec(
        shape=source.shape,
        dtype=zdtype,
        fill_value=zdtype.cast_scalar(0),
        config=array_config,
        prototype=default_buffer_prototype(),
    )
    # The codec is fed the array's raw bytes, viewed as unsigned 8-bit values.
    raw = default_buffer_prototype().buffer.from_array_like(source.view("B"))

    packed = codec._encode_sync(raw, spec)
    assert packed is not None
    unpacked = codec._decode_sync(packed, spec)
    restored = np.frombuffer(unpacked.as_numpy_array(), dtype="float64")
    np.testing.assert_array_equal(source, restored)

0 commit comments

Comments
 (0)