Skip to content

Commit 4e262b1

Browse files
committed
add synchronous methods to stores
1 parent 5a2a884 commit 4e262b1

File tree

6 files changed

+354
-3
lines changed

6 files changed

+354
-3
lines changed

src/zarr/abc/store.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,17 @@
1616

1717
from zarr.core.buffer import Buffer, BufferPrototype
1818

19-
__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
19+
__all__ = [
20+
"ByteGetter",
21+
"ByteSetter",
22+
"Store",
23+
"SupportsDeleteSync",
24+
"SupportsGetSync",
25+
"SupportsSetRangeSync",
26+
"SupportsSetSync",
27+
"SupportsSyncStore",
28+
"set_or_delete",
29+
]
2030

2131

2232
@dataclass
@@ -700,6 +710,38 @@ async def delete(self) -> None: ...
700710
async def set_if_not_exists(self, default: Buffer) -> None: ...
701711

702712

713+
@runtime_checkable
714+
class SupportsGetSync(Protocol):
715+
def get_sync(
716+
self,
717+
key: str,
718+
*,
719+
prototype: BufferPrototype | None = None,
720+
byte_range: ByteRequest | None = None,
721+
) -> Buffer | None: ...
722+
723+
724+
@runtime_checkable
725+
class SupportsSetSync(Protocol):
726+
def set_sync(self, key: str, value: Buffer) -> None: ...
727+
728+
729+
@runtime_checkable
730+
class SupportsSetRangeSync(Protocol):
731+
def set_range_sync(self, key: str, value: Buffer, start: int) -> None: ...
732+
733+
734+
@runtime_checkable
735+
class SupportsDeleteSync(Protocol):
736+
def delete_sync(self, key: str) -> None: ...
737+
738+
739+
@runtime_checkable
740+
class SupportsSyncStore(
741+
SupportsGetSync, SupportsSetSync, SupportsSetRangeSync, SupportsDeleteSync, Protocol
742+
): ...
743+
744+
703745
async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None:
704746
"""Set or delete a value in a byte setter
705747

src/zarr/storage/_common.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,33 @@ async def is_empty(self) -> bool:
228228
"""
229229
return await self.store.is_empty(self.path)
230230

231+
# -------------------------------------------------------------------
232+
# Synchronous IO delegation
233+
# -------------------------------------------------------------------
234+
235+
def get_sync(
236+
self,
237+
*,
238+
prototype: BufferPrototype | None = None,
239+
byte_range: ByteRequest | None = None,
240+
) -> Buffer | None:
241+
"""Synchronous read — delegates to ``self.store.get_sync(self.path, ...)``."""
242+
if prototype is None:
243+
prototype = default_buffer_prototype()
244+
return self.store.get_sync(self.path, prototype=prototype, byte_range=byte_range) # type: ignore[attr-defined, no-any-return]
245+
246+
def set_sync(self, value: Buffer) -> None:
247+
"""Synchronous write — delegates to ``self.store.set_sync(self.path, value)``."""
248+
self.store.set_sync(self.path, value) # type: ignore[attr-defined]
249+
250+
def set_range_sync(self, value: Buffer, start: int) -> None:
251+
"""Synchronous byte-range write."""
252+
self.store.set_range_sync(self.path, value, start) # type: ignore[attr-defined]
253+
254+
def delete_sync(self) -> None:
255+
"""Synchronous delete — delegates to ``self.store.delete_sync(self.path)``."""
256+
self.store.delete_sync(self.path) # type: ignore[attr-defined]
257+
231258
def __truediv__(self, other: str) -> StorePath:
232259
"""Combine this store path with another path"""
233260
return self.__class__(self.store, _dereference_path(self.path, other))

src/zarr/storage/_local.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,19 @@ def _put(path: Path, value: Buffer, exclusive: bool = False) -> int:
8585
return f.write(view)
8686

8787

88+
def _put_range(path: Path, value: Buffer, start: int) -> None:
89+
view = value.as_buffer_like()
90+
file_size = path.stat().st_size
91+
if start + len(view) > file_size:
92+
raise ValueError(
93+
f"set_range would write beyond the end of the stored value: "
94+
f"start={start}, len(value)={len(view)}, stored size={file_size}"
95+
)
96+
with path.open("r+b") as f:
97+
f.seek(start)
98+
f.write(view)
99+
100+
88101
class LocalStore(Store):
89102
"""
90103
Store for the local file system.
@@ -187,6 +200,62 @@ def __repr__(self) -> str:
187200
def __eq__(self, other: object) -> bool:
188201
return isinstance(other, type(self)) and self.root == other.root
189202

203+
# -------------------------------------------------------------------
204+
# Synchronous store methods
205+
# -------------------------------------------------------------------
206+
207+
def _ensure_open_sync(self) -> None:
208+
if not self._is_open:
209+
if not self.read_only:
210+
self.root.mkdir(parents=True, exist_ok=True)
211+
if not self.root.exists():
212+
raise FileNotFoundError(f"{self.root} does not exist")
213+
self._is_open = True
214+
215+
def get_sync(
216+
self,
217+
key: str,
218+
*,
219+
prototype: BufferPrototype | None = None,
220+
byte_range: ByteRequest | None = None,
221+
) -> Buffer | None:
222+
if prototype is None:
223+
prototype = default_buffer_prototype()
224+
self._ensure_open_sync()
225+
assert isinstance(key, str)
226+
path = self.root / key
227+
try:
228+
return _get(path, prototype, byte_range)
229+
except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
230+
return None
231+
232+
def set_sync(self, key: str, value: Buffer) -> None:
233+
self._ensure_open_sync()
234+
self._check_writable()
235+
assert isinstance(key, str)
236+
if not isinstance(value, Buffer):
237+
raise TypeError(
238+
f"LocalStore.set(): `value` must be a Buffer instance. "
239+
f"Got an instance of {type(value)} instead."
240+
)
241+
path = self.root / key
242+
_put(path, value)
243+
244+
def set_range_sync(self, key: str, value: Buffer, start: int) -> None:
245+
self._ensure_open_sync()
246+
self._check_writable()
247+
path = self.root / key
248+
_put_range(path, value, start)
249+
250+
def delete_sync(self, key: str) -> None:
251+
self._ensure_open_sync()
252+
self._check_writable()
253+
path = self.root / key
254+
if path.is_dir():
255+
shutil.rmtree(path)
256+
else:
257+
path.unlink(missing_ok=True)
258+
190259
async def get(
191260
self,
192261
key: str,

src/zarr/storage/_memory.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,49 @@ def __eq__(self, other: object) -> bool:
7777
and self.read_only == other.read_only
7878
)
7979

80+
# -------------------------------------------------------------------
81+
# Synchronous store methods
82+
# -------------------------------------------------------------------
83+
84+
def get_sync(
85+
self,
86+
key: str,
87+
*,
88+
prototype: BufferPrototype | None = None,
89+
byte_range: ByteRequest | None = None,
90+
) -> Buffer | None:
91+
if prototype is None:
92+
prototype = default_buffer_prototype()
93+
if not self._is_open:
94+
self._is_open = True
95+
assert isinstance(key, str)
96+
try:
97+
value = self._store_dict[key]
98+
start, stop = _normalize_byte_range_index(value, byte_range)
99+
return prototype.buffer.from_buffer(value[start:stop])
100+
except KeyError:
101+
return None
102+
103+
def set_sync(self, key: str, value: Buffer) -> None:
104+
self._check_writable()
105+
if not self._is_open:
106+
self._is_open = True
107+
assert isinstance(key, str)
108+
if not isinstance(value, Buffer):
109+
raise TypeError(
110+
f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
111+
)
112+
self._store_dict[key] = value
113+
114+
def delete_sync(self, key: str) -> None:
115+
self._check_writable()
116+
if not self._is_open:
117+
self._is_open = True
118+
try:
119+
del self._store_dict[key]
120+
except KeyError:
121+
logger.debug("Key %s does not exist.", key)
122+
80123
async def get(
81124
self,
82125
key: str,
@@ -122,14 +165,33 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None
122165
raise TypeError(
123166
f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
124167
)
125-
126168
if byte_range is not None:
127169
buf = self._store_dict[key]
128170
buf[byte_range[0] : byte_range[1]] = value
129171
self._store_dict[key] = buf
130172
else:
131173
self._store_dict[key] = value
132174

175+
def _set_range_impl(self, key: str, value: Buffer, start: int) -> None:
176+
buf = self._store_dict[key]
177+
target = buf.as_numpy_array()
178+
if start + len(value) > len(target):
179+
raise ValueError(
180+
f"set_range would write beyond the end of the stored value: "
181+
f"start={start}, len(value)={len(value)}, stored size={len(target)}"
182+
)
183+
if not target.flags.writeable:
184+
target = target.copy()
185+
self._store_dict[key] = buf.__class__(target)
186+
target[start : start + len(value)] = value.as_numpy_array()
187+
188+
def set_range_sync(self, key: str, value: Buffer, start: int) -> None:
189+
"""Synchronous byte-range write."""
190+
self._check_writable()
191+
if not self._is_open:
192+
self._is_open = True
193+
self._set_range_impl(key, value, start)
194+
133195
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
134196
# docstring inherited
135197
self._check_writable()

0 commit comments

Comments
 (0)