
Commit b53ac3e (parent a934899): add comments and documentation

9 files changed: 483 additions & 25 deletions

docs/design/sync-bypass.md

Lines changed: 228 additions & 0 deletions
# Design: Fully Synchronous Read/Write Bypass

## Problem

Zarr-python's read/write path is inherently async: every `Array.__getitem__`
or `Array.__setitem__` call passes through several layers of async machinery
before any actual work happens. For workloads where both the codec chain and
the store are fundamentally synchronous (e.g. gzip + MemoryStore, or
zstd + LocalStore), this async overhead dominates latency.

The call chain looks like this:

```
Array.__getitem__
└─ sync()                                  # (1) thread hop: submits coroutine to background event loop
   └─ AsyncArray._get_selection            # runs on the event loop thread
      └─ CodecPipeline.read                # async pipeline
         ├─ concurrent_map                 # (2) launches tasks on event loop
         │  └─ ByteGetter.get(prototype)   # (3) async store IO
         │     └─ MemoryStore.get()        # just a dict lookup!
         └─ codec.decode()
            └─ asyncio.to_thread(...)      # (4) thread hop for CPU work
               └─ gzip.decompress(...)     # actual compute
```

There are four sources of overhead, marked (1)-(4):

1. **`sync()` bridge**: Every synchronous `Array` method calls `sync()`, which
   uses `asyncio.run_coroutine_threadsafe()` to submit work to a background
   event loop thread. Even when the coroutine does zero awaiting, this costs
   ~30-50us for the round-trip through the event loop.

2. **`concurrent_map` batching**: The pipeline groups chunks into batches and
   dispatches them via `concurrent_map`, which creates asyncio tasks. For
   single-chunk reads (the common case), this is pure overhead.

3. **Async store IO**: `StorePath.get()` / `StorePath.set()` are `async def`.
   For `MemoryStore` (a dict lookup) and `LocalStore` (a file read), the
   underlying operation is synchronous — wrapping it in `async def` forces an
   unnecessary context switch through the event loop.

4. **`asyncio.to_thread` for codec compute**: `BatchedCodecPipeline` runs each
   codec's encode/decode in `asyncio.to_thread()`, adding another thread hop.
   `SyncCodecPipeline` (the foundation this work builds on) already eliminates
   this by calling `_decode_sync` / `_encode_sync` inline.

The net effect: a MemoryStore read of a single small chunk spends more time
in async machinery than in actual decompression.
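The cost of hop (1) is easy to reproduce with nothing but the stdlib. The sketch below is an illustration, not zarr code: it mimics the `sync()` bridge by submitting a no-op coroutine to a background event loop via `asyncio.run_coroutine_threadsafe()`. Absolute numbers vary by machine and Python version.

```python
import asyncio
import threading
import time

# Background event loop thread, mimicking zarr's sync() bridge.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def noop() -> int:
    # Coroutine that does zero awaiting -- the best case for the bridge.
    return 42

def via_bridge() -> int:
    # Equivalent of sync(): submit to the loop thread, wait for the result.
    return asyncio.run_coroutine_threadsafe(noop(), loop).result()

n = 1_000
start = time.perf_counter()
for _ in range(n):
    via_bridge()
bridge_us = (time.perf_counter() - start) / n * 1e6

start = time.perf_counter()
for _ in range(n):
    noop().close()  # create/discard the coroutine with no bridge, as a baseline
base_us = (time.perf_counter() - start) / n * 1e6

print(f"bridge: ~{bridge_us:.1f}us/call, baseline: ~{base_us:.1f}us/call")
loop.call_soon_threadsafe(loop.stop)
```

Even though `noop` never awaits, every call pays the round-trip through the loop thread; the sync bypass removes exactly this cost.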

## Solution

When the codec pipeline and store both support synchronous operation, bypass
the event loop entirely: run IO, codec compute, and buffer scatter all on the
calling thread, with zero async overhead.

The solution has three layers:

### Layer 1: Sync Store IO

Add `supports_sync`, `get_sync()`, `set_sync()`, and `delete_sync()` to the
store abstraction. These are opt-in: the `Store` ABC provides default
implementations that raise `NotImplementedError`, and only stores with native
sync capabilities override them.

```
Store ABC (defaults: supports_sync=False, methods raise NotImplementedError)
├── MemoryStore  (supports_sync=True, direct dict access)
├── LocalStore   (supports_sync=True, direct file IO via _get/_put)
└── FsspecStore  (unchanged, remains async-only)

StorePath delegates to its underlying Store:
    get_sync() → self.store.get_sync(self.path, ...)
    set_sync() → self.store.set_sync(self.path, ...)
```

**Key decision**: `StorePath` is what gets passed to the codec pipeline as a
`ByteGetter` / `ByteSetter`. By adding sync methods to `StorePath`, the
pipeline can call them directly without knowing the concrete store type.

**Protocol gap**: The `ByteGetter` / `ByteSetter` protocols only define async
methods (`get`, `set`, `delete`). Rather than modifying these widely-used
protocols, the sync pipeline methods use `Any` type annotations for the
byte_getter/byte_setter parameters and call `.get_sync()` etc. at runtime.
This is a pragmatic tradeoff: the sync path is an optimization that only
activates when `supports_sync` is True, so the runtime type is always a
`StorePath` that has these methods.
### Layer 2: Sync Codec Pipeline IO

Add `supports_sync_io`, `read_sync()`, and `write_sync()` to the
`CodecPipeline` ABC (non-abstract, default raises `NotImplementedError`).

`SyncCodecPipeline` implements these with a simple sequential loop:

```python
# read_sync: for each chunk
for byte_getter, chunk_spec, chunk_sel, out_sel, _ in batch_info:
    chunk_bytes = byte_getter.get_sync(prototype=chunk_spec.prototype)  # sync IO
    chunk_array = self._decode_one(chunk_bytes, ...)                    # sync compute
    out[out_sel] = chunk_array[chunk_sel]                               # scatter
```

No batching, no `concurrent_map`, no event loop — just a Python for-loop.

**Sharding fallback**: When `supports_partial_decode` is True (i.e. the codec
pipeline uses sharding), `supports_sync_io` returns False and the Array falls
back to the standard `sync()` path. This is because `ShardingCodec`'s
`decode_partial` is async (it reads sub-ranges from the store) and does not
have a sync equivalent.
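The sequential loop can be exercised end to end with stdlib pieces only. In this toy sketch (all names invented for the demo), `zlib` stands in for the codec chain, a dict for the store, and a flat list for the output buffer:

```python
import zlib

# Toy store: compressed chunk bytes keyed by chunk path.
store = {
    "c/0": zlib.compress(bytes([0, 1, 2, 3])),
    "c/1": zlib.compress(bytes([4, 5, 6, 7])),
}

# batch_info analogue: (key, chunk_selection, out_selection) per chunk.
batch_info = [
    ("c/0", slice(0, 4), slice(0, 4)),
    ("c/1", slice(0, 4), slice(4, 8)),
]

out = [0] * 8
for key, chunk_sel, out_sel in batch_info:
    chunk_bytes = store[key]                    # sync IO: a dict lookup
    chunk = list(zlib.decompress(chunk_bytes))  # sync compute: inline decode
    out[out_sel] = chunk[chunk_sel]             # scatter into the output

print(out)  # → [0, 1, 2, 3, 4, 5, 6, 7]
```

Everything runs on the calling thread; there is no task creation, no awaiting, and no thread hop anywhere in the loop.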
### Layer 3: Array Bypass

Each of the 10 sync `Array` selection methods (5 getters, 5 setters) gains a
fast path:

```python
def get_basic_selection(self, selection, *, out=None, prototype=None, fields=None):
    indexer = BasicIndexer(selection, self.shape, self.metadata.chunk_grid)
    if self._can_use_sync_path():
        return _get_selection_sync(
            self.async_array.store_path, self.async_array.metadata,
            self.async_array.codec_pipeline, self.async_array.config,
            indexer, out=out, fields=fields, prototype=prototype,
        )
    return sync(self.async_array._get_selection(indexer, ...))
```

`_can_use_sync_path()` checks three conditions:

1. The codec pipeline supports sync IO (`supports_sync_io`)
2. No partial decode is active (rules out sharding)
3. The store supports sync (`supports_sync`)

When all three hold, `_get_selection_sync` / `_set_selection_sync` run the
entire operation on the calling thread. These functions mirror the async
`_get_selection` / `_set_selection` exactly, but call `codec_pipeline.read_sync()`
/ `write_sync()` instead of `await codec_pipeline.read()` / `write()`.
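The three-condition gate reduces to a plain predicate over the capability flags. `ToyPipeline` and `ToyStore` below are hypothetical stand-ins for illustration, not zarr's classes:

```python
from dataclasses import dataclass

@dataclass
class ToyPipeline:
    supports_sync_io: bool
    supports_partial_decode: bool

@dataclass
class ToyStore:
    supports_sync: bool

def can_use_sync_path(pipeline: ToyPipeline, store: ToyStore) -> bool:
    return (
        pipeline.supports_sync_io                  # 1. pipeline implements read_sync/write_sync
        and not pipeline.supports_partial_decode   # 2. no sharding in play
        and store.supports_sync                    # 3. store has native sync IO
    )

# MemoryStore-like store + sync pipeline: bypass active.
print(can_use_sync_path(ToyPipeline(True, False), ToyStore(True)))   # → True
# Sharded array: falls back to the sync() path.
print(can_use_sync_path(ToyPipeline(True, True), ToyStore(True)))    # → False
```

Because the check is per-array and the flags are cheap properties, the gate itself adds negligible cost to the fast path.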
## Resulting Call Chain

With the sync bypass active, the call chain becomes:

```
Array.__getitem__
└─ _get_selection_sync           # runs on calling thread
   └─ SyncCodecPipeline.read_sync
      ├─ StorePath.get_sync      # direct dict/file access, no event loop
      ├─ _decode_one             # inline codec chain, no to_thread
      └─ out[sel] = array        # scatter into output
```

No `sync()`, no event loop, no `asyncio.to_thread`, no `concurrent_map`.

## Additional Optimization: Codec Instance Caching

`GzipCodec` was creating a new `GZip(level)` instance on every encode/decode
call. `ZstdCodec` and `BloscCodec` already cache their codec instances via
`@cached_property`. We apply the same pattern to `GzipCodec`:

```python
@cached_property
def _gzip_codec(self) -> GZip:
    return GZip(self.level)
```

This is safe because `GzipCodec` is a frozen dataclass — `level` never
changes after construction, so the cached instance is always valid.
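The safety argument can be demonstrated directly: `functools.cached_property` writes into the instance `__dict__`, which a frozen dataclass does not block (frozen only intercepts `__setattr__`). The `Codec` counter class below is invented for the demo:

```python
from dataclasses import dataclass
from functools import cached_property

calls = {"n": 0}

class Codec:
    """Stand-in for an expensive-to-construct codec object."""
    def __init__(self, level: int) -> None:
        calls["n"] += 1
        self.level = level

@dataclass(frozen=True)
class ToyGzipCodec:
    level: int = 5

    @cached_property
    def _codec(self) -> Codec:
        # cached_property stores the result in instance __dict__ directly,
        # bypassing the frozen dataclass's __setattr__ guard. Since `level`
        # can never change, the cached instance can never go stale.
        return Codec(self.level)

c = ToyGzipCodec()
for _ in range(100):
    _ = c._codec  # constructed once, then served from the cache
print(calls["n"])  # → 1
```

Without the cache, the counter would read 100: one construction per access, which is exactly the per-call overhead the change removes.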

## What Stays Unchanged

- **`BatchedCodecPipeline`**: Unmodified. It inherits the default
  `supports_sync_io=False` from the ABC.
- **Remote stores** (`FsspecStore`): `supports_sync` stays `False`. All
  remote IO remains async.
- **Sharded arrays**: Fall back to the `sync()` path because
  `supports_partial_decode` is True.
- **All async APIs**: `AsyncArray`, `async def read/write`, etc. are
  completely untouched. The sync bypass is an optimization of the
  synchronous `Array` class only.
## Files Modified

| File | Layer | Change |
|------|-------|--------|
| `src/zarr/abc/store.py` | 1 | `supports_sync`, `get_sync`, `set_sync`, `delete_sync` on `Store` ABC |
| `src/zarr/storage/_memory.py` | 1 | Sync store methods (direct dict access) |
| `src/zarr/storage/_local.py` | 1 | Sync store methods (direct `_get`/`_put` calls) |
| `src/zarr/storage/_common.py` | 1 | Sync methods on `StorePath` (delegates to store) |
| `src/zarr/abc/codec.py` | 2 | `supports_sync_io`, `read_sync`, `write_sync` on `CodecPipeline` ABC |
| `src/zarr/experimental/sync_codecs.py` | 2 | `read_sync`, `write_sync` implementation |
| `src/zarr/core/array.py` | 3 | `_can_use_sync_path`, `_get_selection_sync`, `_set_selection_sync`, 10 method modifications |
| `src/zarr/codecs/gzip.py` | - | `@cached_property` for GZip instance |

## Design Tradeoffs

**Duplication of `_get_selection` / `_set_selection`**: The sync versions
(`_get_selection_sync`, `_set_selection_sync`) duplicate the setup logic
(dtype resolution, buffer creation, value coercion) from the async originals.
This is intentional: extracting shared helpers would add complexity and
indirection to the hot path for no functional benefit. The two versions
should be kept in sync manually.

**Sequential chunk processing**: `read_sync` and `write_sync` process chunks
sequentially in a for-loop, with no parallelism. For the target use case
(MemoryStore, LocalStore), this is optimal: MemoryStore is a dict lookup
(~1us), LocalStore is a file read that benefits from the OS page cache, and
Python's GIL prevents true parallelism for CPU-bound codec work anyway. The
async path with `concurrent_map` is better for remote stores, where IO latency
can be overlapped.

**`Any` type annotations**: The `read_sync` and `write_sync` methods on
`SyncCodecPipeline` use `Any` for the byte_getter/byte_setter type in the
`batch_info` tuples. This avoids modifying the `ByteGetter`/`ByteSetter`
protocols, which are public API. The runtime type is always `StorePath`, which
has the sync methods; the type system just can't express this constraint
through the existing protocol hierarchy.

**No sync partial decode/encode**: Sharding's `decode_partial` /
`encode_partial` methods are inherently async (they issue byte-range reads to
the store). Rather than adding sync variants to the sharding codec (which
would require significant refactoring), we simply fall back to the `sync()`
path for sharded arrays. This is the right tradeoff because sharded arrays
typically involve remote stores, where async IO is beneficial anyway.

src/zarr/abc/codec.py

Lines changed: 35 additions & 3 deletions
```diff
@@ -474,9 +474,31 @@ async def write(
         """
         ...
 
+    # -------------------------------------------------------------------
+    # Fully synchronous read/write (opt-in)
+    #
+    # When a CodecPipeline subclass can run the entire read/write path
+    # (store IO + codec compute + buffer scatter) without touching the
+    # event loop, it overrides these methods and sets supports_sync_io
+    # to True. This lets Array selection methods bypass sync() entirely.
+    #
+    # The default implementations raise NotImplementedError, so
+    # BatchedCodecPipeline (the standard pipeline) is unaffected.
+    #
+    # See docs/design/sync-bypass.md for the full design rationale.
+    # -------------------------------------------------------------------
+
     @property
     def supports_sync_io(self) -> bool:
-        """Whether this pipeline supports fully synchronous read/write."""
+        """Whether this pipeline can run read/write entirely on the calling thread.
+
+        True when:
+        - All codecs support synchronous encode/decode (_decode_sync/_encode_sync)
+        - The pipeline's read_sync/write_sync methods are implemented
+
+        Checked by ``Array._can_use_sync_path()`` to decide whether to bypass
+        the ``sync()`` event-loop bridge.
+        """
         return False
 
     def read_sync(
@@ -485,7 +507,12 @@ def read_sync(
         out: NDBuffer,
         drop_axes: tuple[int, ...] = (),
     ) -> None:
-        """Synchronous read path. Only available on pipelines that support it."""
+        """Synchronous read: fetch bytes from store, decode, scatter into out.
+
+        Runs entirely on the calling thread. Only available when
+        ``supports_sync_io`` is True. Called by ``_get_selection_sync`` in
+        ``array.py`` when the sync bypass is active.
+        """
         raise NotImplementedError
 
     def write_sync(
@@ -494,7 +521,12 @@ def write_sync(
         value: NDBuffer,
         drop_axes: tuple[int, ...] = (),
     ) -> None:
-        """Synchronous write path. Only available on pipelines that support it."""
+        """Synchronous write: gather from value, encode, persist to store.
+
+        Runs entirely on the calling thread. Only available when
+        ``supports_sync_io`` is True. Called by ``_set_selection_sync`` in
+        ``array.py`` when the sync bypass is active.
+        """
         raise NotImplementedError
```

src/zarr/abc/store.py

Lines changed: 36 additions & 6 deletions
```diff
@@ -524,12 +524,29 @@ def supports_partial_writes(self) -> Literal[False]:
         """
         return False
 
+    # -----------------------------------------------------------------------
+    # Synchronous IO interface (opt-in)
+    #
+    # These methods enable the SyncCodecPipeline to bypass the event loop
+    # entirely for store IO. The default implementations raise
+    # NotImplementedError; stores that wrap fundamentally synchronous
+    # operations (MemoryStore, LocalStore) override them with direct
+    # implementations. Remote/cloud stores (FsspecStore) leave them as-is
+    # and remain async-only.
+    #
+    # See docs/design/sync-bypass.md for the full design rationale.
+    # -----------------------------------------------------------------------
+
     @property
     def supports_sync(self) -> bool:
-        """Does the store support synchronous get/set/delete?
+        """Whether this store has native synchronous get/set/delete methods.
+
+        When True, ``SyncCodecPipeline.read_sync`` / ``write_sync`` will call
+        ``get_sync`` / ``set_sync`` / ``delete_sync`` directly on the calling
+        thread, avoiding the event loop overhead of the async equivalents.
 
-        When True, the sync codec pipeline can bypass the event loop for IO.
-        Override in subclasses that have native sync implementations.
+        Subclasses that override the sync methods below should also override
+        this property to return True.
         """
         return False
 
@@ -539,15 +556,28 @@ def get_sync(
         prototype: BufferPrototype,
         byte_range: ByteRequest | None = None,
     ) -> Buffer | None:
-        """Synchronous version of get(). Only available when supports_sync is True."""
+        """Synchronous version of ``get()``.
+
+        Called by ``SyncCodecPipeline.read_sync`` to fetch chunk bytes without
+        going through the event loop. Only called when ``supports_sync`` is
+        True, so the default ``NotImplementedError`` is never hit in practice.
+        """
         raise NotImplementedError
 
     def set_sync(self, key: str, value: Buffer) -> None:
-        """Synchronous version of set(). Only available when supports_sync is True."""
+        """Synchronous version of ``set()``.
+
+        Called by ``SyncCodecPipeline.write_sync`` to persist encoded chunk
+        bytes without going through the event loop.
+        """
         raise NotImplementedError
 
     def delete_sync(self, key: str) -> None:
-        """Synchronous version of delete(). Only available when supports_sync is True."""
+        """Synchronous version of ``delete()``.
+
+        Called by ``SyncCodecPipeline.write_sync`` when a chunk should be
+        removed (e.g. an empty chunk with ``write_empty_chunks=False``).
+        """
         raise NotImplementedError
 
     @property
```
src/zarr/codecs/gzip.py

Lines changed: 8 additions & 0 deletions
```diff
@@ -49,11 +49,19 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
     def to_dict(self) -> dict[str, JSON]:
         return {"name": "gzip", "configuration": {"level": self.level}}
 
+    # Cache the numcodecs GZip instance. GzipCodec is a frozen dataclass,
+    # so `level` never changes after construction, making this safe.
+    # This matches the pattern used by ZstdCodec._zstd_codec and
+    # BloscCodec._blosc_codec. Without caching, a new GZip(level) was
+    # created on every encode/decode call.
     @cached_property
     def _gzip_codec(self) -> GZip:
         return GZip(self.level)
 
     def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer:
+        # Use the cached codec instance instead of creating GZip(self.level)
+        # each time. The async _decode_single delegates to this method via
+        # asyncio.to_thread, so both paths benefit from the cache.
         return as_numpy_array_wrapper(self._gzip_codec.decode, chunk_bytes, chunk_spec.prototype)
 
     def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None:
```
