Commit 28d0def: update design doc
1 parent 69172fb

1 file changed: docs/design/sync-bypass.md (116 additions & 22 deletions)
@@ -94,7 +94,7 @@ Add `supports_sync_io`, `read_sync()`, and `write_sync()` to the
 `SyncCodecPipeline` implements these with a simple sequential loop:
 
 ```python
-# read_sync: for each chunk
+# read_sync: for each chunk (non-sharded path)
 for byte_getter, chunk_spec, chunk_sel, out_sel, _ in batch_info:
     chunk_bytes = byte_getter.get_sync(prototype=chunk_spec.prototype)  # sync IO
     chunk_array = self._decode_one(chunk_bytes, ...)  # sync compute
@@ -103,11 +103,13 @@ for byte_getter, chunk_spec, chunk_sel, out_sel, _ in batch_info:
 
 No batching, no `concurrent_map`, no event loop — just a Python for-loop.
 
-**Sharding fallback**: When `supports_partial_decode` is True (i.e. the codec
-pipeline uses sharding), `supports_sync_io` returns False and the Array falls
-back to the standard `sync()` path. This is because `ShardingCodec`'s
-`decode_partial` is async (it reads sub-ranges from the store) and does not
-have a sync equivalent.
+**Sharding support**: When the pipeline uses `ShardingCodec` (i.e.
+`supports_partial_decode` is True), `read_sync` delegates to
+`ShardingCodec._decode_partial_sync()` instead. This method fetches
+the shard index and requested chunk bytes via sync byte-range reads
+(`byte_getter.get_sync()` with `RangeByteRequest`/`SuffixByteRequest`),
+then decodes through the inner pipeline's `read_sync` — all on the
+calling thread. See [Sync Sharding](#sync-sharding) below for details.
 
 ### Layer 3: Array Bypass
 
@@ -126,20 +128,19 @@ def get_basic_selection(self, selection, *, out=None, prototype=None, fields=Non
     return sync(self.async_array._get_selection(indexer, ...))
 ```
 
-`_can_use_sync_path()` checks three conditions:
+`_can_use_sync_path()` checks two conditions:
 1. The codec pipeline supports sync IO (`supports_sync_io`)
-2. No partial decode is active (rules out sharding)
-3. The store supports sync (`supports_sync`)
+2. The store supports sync (`supports_sync`)
 
-When all three hold, `_get_selection_sync` / `_set_selection_sync` run the
+When both hold, `_get_selection_sync` / `_set_selection_sync` run the
 entire operation on the calling thread. These functions mirror the async
 `_get_selection` / `_set_selection` exactly, but call `codec_pipeline.read_sync()`
 / `write_sync()` instead of `await codec_pipeline.read()` / `write()`.
 
 
 ## Resulting Call Chain
 
-With the sync bypass active, the call chain becomes:
+With the sync bypass active, the call chain for non-sharded arrays becomes:
 
 ```
 Array.__getitem__
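The two-condition check reduces to a one-line conjunction; a minimal sketch with hypothetical stand-in classes (not the real zarr `Array`/store types):

```python
from dataclasses import dataclass

# Hypothetical stand-ins for illustrating the check only.
@dataclass
class Pipeline:
    supports_sync_io: bool

@dataclass
class Store:
    supports_sync: bool

def can_use_sync_path(pipeline: Pipeline, store: Store) -> bool:
    # Both the codec pipeline and the store must offer sync variants.
    return pipeline.supports_sync_io and store.supports_sync

print(can_use_sync_path(Pipeline(True), Store(True)))   # True
print(can_use_sync_path(Pipeline(True), Store(False)))  # False: remote store stays async
```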
@@ -150,9 +151,54 @@ Array.__getitem__
       └─ out[sel] = array                 # scatter into output
 ```
 
+For sharded arrays:
+
+```
+Array.__getitem__
+└─ _get_selection_sync                      # runs on calling thread
+   └─ SyncCodecPipeline.read_sync
+      └─ ShardingCodec._decode_partial_sync
+         ├─ StorePath.get_sync(byte_range)  # sync byte-range read for shard index
+         ├─ _decode_shard_index_sync        # inline index codec chain
+         ├─ StorePath.get_sync(byte_range)  # sync byte-range read per chunk
+         └─ inner_pipeline.read_sync        # inner codec chain (sync)
+            ├─ _ShardingByteGetter.get_sync # dict lookup
+            ├─ _decode_one                  # inline codec chain
+            └─ out[sel] = array             # scatter
+```
+
 No `sync()`, no event loop, no `asyncio.to_thread`, no `concurrent_map`.
 
 
+## Sync Sharding
+
+`ShardingCodec` participates in the fully-synchronous path through sync
+variants of all its methods:
+
+**Shard index codec chain**: The index codecs (typically `BytesCodec` +
+`Crc32cCodec`) are run inline via `_decode_shard_index_sync` /
+`_encode_shard_index_sync`. These classify the index codecs using
+`codecs_from_list`, resolve metadata forward through the chain, then
+run the decode/encode in the correct order — all without constructing a
+pipeline object.
+
+**Full shard decode/encode** (`_decode_sync` / `_encode_sync`): Receives
+complete shard bytes, decodes the index, then delegates to the inner
+codec pipeline's `read_sync` / `write_sync` with `_ShardingByteGetter` /
+`_ShardingByteSetter` (dict-backed, so "IO" is a dict lookup).
+
+**Partial shard decode/encode** (`_decode_partial_sync` /
+`_encode_partial_sync`): The partial path is where most of the IO happens —
+it issues sync byte-range reads to fetch the shard index and individual
+chunk data from the store. Once bytes are in memory, the inner pipeline
+decodes them synchronously.
+
+**Inner pipeline**: `ShardingCodec.codec_pipeline` is obtained via
+`get_pipeline_class()`. When `SyncCodecPipeline` is configured globally,
+the inner pipeline is also a `SyncCodecPipeline`, enabling recursive sync
+dispatch for nested sharding.
+
+
 ## Additional Optimization: Codec Instance Caching
 
 `GzipCodec` was creating a new `GZip(level)` instance on every encode/decode
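The sync sharding flow above -- decode an end-located index, then treat inner-chunk "IO" as a dict lookup -- can be shown with a runnable toy (illustrative layout and names; zarr's real index also carries a checksum):

```python
import struct

# Toy shard layout: chunk bytes followed by an index of little-endian
# (offset, length) uint64 pairs at the end of the shard.
chunks = [b"1111", b"222222"]
shard = b"".join(chunks) + struct.pack("<4Q", 0, 4, 4, 6)

def decode_shard_index_sync(shard_bytes, n_chunks):
    # Inline index decode: slice the tail and unpack the pairs directly,
    # with no pipeline object constructed.
    raw = shard_bytes[-16 * n_chunks:]
    vals = struct.unpack(f"<{2 * n_chunks}Q", raw)
    return list(zip(vals[::2], vals[1::2]))

# Dict-backed "byte getter": after one pass over the index, fetching an
# inner chunk is just a lookup (the _ShardingByteGetter idea).
table = {i: shard[off:off + ln]
         for i, (off, ln) in enumerate(decode_shard_index_sync(shard, 2))}
print(table[1])  # b'222222'
```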
@@ -169,14 +215,25 @@ This is safe because `GzipCodec` is a frozen dataclass — `level` never
 changes after construction, so the cached instance is always valid.
 
 
+## Bugfix: _decode_async Metadata Resolution
+
+The async fallback path in `SyncCodecPipeline._decode_async()` (used when
+a codec in the chain doesn't support sync) had a metadata resolution bug:
+it passed the same unresolved `chunk_specs` to every codec during decode.
+
+Size-changing codecs like `FixedScaleOffset` and `PackBits` alter the data
+shape/dtype, so each codec needs specs resolved through the forward chain.
+The fix resolves metadata forward (aa -> ab -> bb), records specs at each
+step, then uses the correct resolved specs during reverse decode traversal.
+This matches `BatchedCodecPipeline._codecs_with_resolved_metadata_batched`.
+
+
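The shape of the fix can be shown with a toy size-changing codec (the `Doubler` class and spec dicts here are illustrative, not real zarr codecs): resolve specs forward, record the spec seen before each codec, then decode in reverse using the recorded specs.

```python
class Doubler:
    # Toy size-changing "codec": encoded output is twice the input length.
    def resolve_metadata(self, spec: dict) -> dict:
        return {"length": spec["length"] * 2}

    def decode(self, data: bytes, spec: dict) -> bytes:
        assert len(data) == spec["length"] * 2, "wrong spec for this codec"
        return data[: spec["length"]]

codecs = [Doubler(), Doubler()]
spec = {"length": 2}

# Forward pass (aa -> ab -> bb): record the spec seen *before* each codec.
specs_per_codec = []
for codec in codecs:
    specs_per_codec.append(spec)
    spec = codec.resolve_metadata(spec)

# Reverse pass: each codec decodes with its matching resolved spec.
# Passing the same unresolved spec to every codec (the bug) would trip
# the length assertion in the outermost decode.
data = b"x" * 8  # fully encoded: 2 -> 4 -> 8 bytes
for codec, s in zip(reversed(codecs), reversed(specs_per_codec)):
    data = codec.decode(data, s)
print(data)  # b'xx'
```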
 ## What Stays Unchanged
 
 - **`BatchedCodecPipeline`**: Unmodified. It inherits the default
   `supports_sync_io=False` from the ABC.
 - **Remote stores** (`FsspecStore`): `supports_sync` stays `False`. All
   remote IO remains async.
-- **Sharded arrays**: Fall back to the `sync()` path because
-  `supports_partial_decode` is True.
 - **All async APIs**: `AsyncArray`, `async def read/write`, etc. are
   completely untouched. The sync bypass is an optimization of the
   synchronous `Array` class only.
@@ -190,10 +247,39 @@ changes after construction, so the cached instance is always valid.
 | `src/zarr/storage/_memory.py` | 1 | Sync store methods (direct dict access) |
 | `src/zarr/storage/_local.py` | 1 | Sync store methods (direct `_get`/`_put` calls) |
 | `src/zarr/storage/_common.py` | 1 | Sync methods on `StorePath` (delegates to store) |
-| `src/zarr/abc/codec.py` | 2 | `supports_sync_io`, `read_sync`, `write_sync` on `CodecPipeline` ABC |
-| `src/zarr/experimental/sync_codecs.py` | 2 | `read_sync`, `write_sync` implementation |
+| `src/zarr/abc/codec.py` | 2 | `_decode_sync`, `_encode_sync`, `supports_sync` on `BaseCodec`; `supports_sync_io`, `read_sync`, `write_sync` on `CodecPipeline` |
+| `src/zarr/experimental/sync_codecs.py` | 2 | `read_sync`, `write_sync`, `_decode_async` metadata fix |
+| `src/zarr/codecs/sharding.py` | 2 | `_decode_sync`, `_encode_sync`, `_decode_partial_sync`, `_encode_partial_sync`, shard index sync codec chain |
 | `src/zarr/core/array.py` | 3 | `_can_use_sync_path`, `_get_selection_sync`, `_set_selection_sync`, 10 method modifications |
 | `src/zarr/codecs/gzip.py` | | `@cached_property` for GZip instance |
+| `src/zarr/codecs/blosc.py` | | `_decode_sync`/`_encode_sync`; `_decode_single`/`_encode_single` delegate to sync |
+| `src/zarr/codecs/zstd.py` | | `_decode_sync`/`_encode_sync`; `_decode_single`/`_encode_single` delegate to sync |
+| `src/zarr/codecs/bytes.py` | | `_decode_sync`/`_encode_sync` (was `_decode_single`/`_encode_single`) |
+| `src/zarr/codecs/crc32c_.py` | | `_decode_sync`/`_encode_sync` (was `_decode_single`/`_encode_single`) |
+| `src/zarr/codecs/transpose.py` | | `_decode_sync`/`_encode_sync`; `_decode_single`/`_encode_single` delegate to sync |
+| `src/zarr/codecs/vlen_utf8.py` | | `_decode_sync`/`_encode_sync` for `VLenUTF8Codec` and `VLenBytesCodec` |
+
+
+## Performance
+
+Benchmarks on MemoryStore with `SyncCodecPipeline` vs `BatchedCodecPipeline`:
+
+**Non-sharded arrays** (zstd compression, 100x100 float64, 32x32 chunks):
+- Single-chunk read: ~2-4x faster
+- Full-array read: ~2-11x faster (varies with chunk count)
+- Single-chunk write: ~2-3x faster
+
+**Sharded arrays** (4x4 shard of 8x8 inner chunks, zstd, MemoryStore):
+- Single-chunk read: ~1.5-2.5x faster
+- Full-array read: ~1.5-2x faster
+- Single-chunk write: ~1.3-1.6x faster
+- Full-array write: ~1.3-1.5x faster
+
+The sharded speedup is smaller because the shard index decode and
+per-chunk byte-range reads add overhead that wasn't present in the
+non-sharded path. Still, eliminating the event loop round-trip and
+`asyncio.to_thread` for each inner chunk decode provides a meaningful
+improvement.
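A sketch of how such a micro-benchmark can be taken (toy dict store and `zlib`; the figures above were gathered against the real pipelines, not this harness):

```python
import timeit
import zlib

# Toy single-chunk read on a dict-backed "MemoryStore".
store = {"c/0": zlib.compress(b"\x00" * 8192)}

def read_chunk_sync():
    # The whole sync path in miniature: dict lookup plus sync
    # decompression, with no event loop round-trip.
    return zlib.decompress(store["c/0"])

elapsed = timeit.timeit(read_chunk_sync, number=1000)
print(f"1000 sync single-chunk reads: {elapsed * 1e3:.1f} ms")
```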
 
 
 ## Design Tradeoffs
@@ -216,13 +302,21 @@ can be overlapped.
 **`Any` type annotations**: The `read_sync` and `write_sync` methods on
 `SyncCodecPipeline` use `Any` for the byte_getter/byte_setter type in the
 `batch_info` tuples. This avoids modifying the `ByteGetter`/`ByteSetter`
-protocols, which are public API. The runtime type is always `StorePath`, which
+protocols, which are public API. The runtime type is always `StorePath` (or
+`_ShardingByteGetter`/`_ShardingByteSetter` for inner-shard access), which
 has the sync methods; the type system just can't express this constraint
 through the existing protocol hierarchy.
 
-**No sync partial decode/encode**: Sharding's `decode_partial` /
-`encode_partial` methods are inherently async (they issue byte-range reads to
-the store). Rather than adding sync variants to the sharding codec (which
-would require significant refactoring), we simply fall back to the `sync()`
-path for sharded arrays. This is the right tradeoff because sharded arrays
-typically involve remote stores where async IO is beneficial anyway.
+**Sync sharding — sequential chunk reads**: The sync partial decode path
+fetches each chunk's bytes sequentially via `byte_getter.get_sync()` with
+byte-range requests. The async path can overlap these reads via
+`concurrent_map`. For MemoryStore this doesn't matter (dict lookup is ~1us).
+For LocalStore, OS page cache means sequential reads are fast for warm data.
+For remote stores where overlapping IO would help, `supports_sync` is False
+and the async path is used automatically.
+
+**Inline shard index codec chain**: `_decode_shard_index_sync` and
+`_encode_shard_index_sync` run the index codecs (BytesCodec + Crc32cCodec)
+directly rather than constructing a temporary `CodecPipeline`. This avoids
+the overhead of pipeline construction for a simple two-codec chain and keeps
+the sync path self-contained.
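The sequential-vs-overlapped tradeoff can be seen in miniature with toy `get_range` helpers (illustrative names, not the zarr API): the sync path issues one blocking range read at a time, while the async path can overlap the same reads on the event loop.

```python
import asyncio

blob = bytes(range(256))  # stand-in for a shard's bytes in a store

def get_range_sync(start: int, length: int) -> bytes:
    # Sync path: one blocking byte-range read at a time.
    return blob[start:start + length]

async def get_range_async(start: int, length: int) -> bytes:
    await asyncio.sleep(0)  # stand-in for awaitable store IO
    return blob[start:start + length]

# Sequential reads on the calling thread (the sync sharding path).
sync_chunks = [get_range_sync(off, 4) for off in (0, 8, 16)]

# The same reads, overlapped on the event loop (the async path).
async def main() -> list:
    return await asyncio.gather(*(get_range_async(off, 4) for off in (0, 8, 16)))

async_chunks = asyncio.run(main())
assert sync_chunks == list(async_chunks)
print(sync_chunks[1])  # b'\x08\t\n\x0b'
```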
