Skip to content

Commit 12e304c

Browse files
committed
fix: byte-range fast path requires write_empty_chunks=True
The byte-range path in ShardingCodec._encode_partial_sync writes every affected inner chunk into its fixed-size data slot. This is incompatible with write_empty_chunks=False (the default), which expects empty chunks to be compacted out of the shard layout entirely. Add `not skip_empty` to the gate. With the default config, partial shard writes go through the full-rewrite path (which honors the compact layout). With write_empty_chunks=True, the byte-range optimization is used. Also copy the shard index before mutating; the decoded view may be backed by a read-only buffer (mmap-style reads from LocalStore). Plus: convert tests/test_sync_pipeline.py::test_memory_store_set_range from `asyncio.run(...)` inside a sync test to a plain `async def`. The asyncio.run call leaked event-loop self-pipe sockets that pytest's unraisableexception plugin would later attribute to unrelated tests. Update test_partial_shard_write_uses_set_range to opt into write_empty_chunks=True so the byte-range path is actually exercised.
1 parent ff0d5a2 commit 12e304c

2 files changed

Lines changed: 23 additions & 16 deletions

File tree

src/zarr/codecs/sharding.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,9 +606,14 @@ def _encode_partial_sync(
606606
is_scalar = len(value.shape) == 0
607607

608608
# --- Byte-range fast path ---
609+
# Only safe when we don't need to skip empty chunks: byte-range
610+
# writes leave chunk presence unchanged (writes a fixed-size
611+
# data slot for every affected chunk). Compacting empty chunks
612+
# away requires rewriting the whole shard.
609613
store = byte_setter.store if hasattr(byte_setter, "store") else None
610614
if (
611615
not is_complete
616+
and not skip_empty
612617
and self._inner_codecs_fixed_size
613618
and isinstance(store, SupportsSetRange)
614619
):
@@ -622,7 +627,10 @@ def _encode_partial_sync(
622627
if existing is not None and len(existing) == total_shard_size:
623628
key = byte_setter.path if hasattr(byte_setter, "path") else str(byte_setter)
624629
shard_reader = self._shard_reader_from_bytes_sync(existing, chunks_per_shard)
625-
index = shard_reader.index
630+
# The decoded index may be a view of a read-only buffer (e.g.
631+
# mmap-backed reads from LocalStore). Copy so set_chunk_slice
632+
# below can mutate it.
633+
index = _ShardIndex(shard_reader.index.offsets_and_lengths.copy())
626634

627635
rank_map = {c: r for r, c in enumerate(morton_order_iter(chunks_per_shard))}
628636

tests/test_sync_pipeline.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from __future__ import annotations
44

5-
import asyncio
65
from typing import Any
76

87
import numpy as np
@@ -440,23 +439,19 @@ def test_memory_store_supports_byte_range_setter() -> None:
440439
assert isinstance(store, SupportsSetRange)
441440

442441

443-
def test_memory_store_set_range() -> None:
442+
async def test_memory_store_set_range() -> None:
444443
"""MemoryStore.set_range should overwrite bytes at the given offset."""
444+
store = zarr.storage.MemoryStore()
445+
await store._ensure_open()
446+
buf = cpu.Buffer.from_bytes(b"AAAAAAAAAA") # 10 bytes
447+
await store.set("test/key", buf)
445448

446-
async def _test() -> None:
447-
store = zarr.storage.MemoryStore()
448-
await store._ensure_open()
449-
buf = cpu.Buffer.from_bytes(b"AAAAAAAAAA") # 10 bytes
450-
await store.set("test/key", buf)
451-
452-
patch = cpu.Buffer.from_bytes(b"XX")
453-
await store.set_range("test/key", patch, start=3)
454-
455-
result = await store.get("test/key", prototype=cpu.buffer_prototype)
456-
assert result is not None
457-
assert result.to_bytes() == b"AAAXXAAAAA"
449+
patch = cpu.Buffer.from_bytes(b"XX")
450+
await store.set_range("test/key", patch, start=3)
458451

459-
asyncio.run(_test())
452+
result = await store.get("test/key", prototype=cpu.buffer_prototype)
453+
assert result is not None
454+
assert result.to_bytes() == b"AAAXXAAAAA"
460455

461456

462457
def test_sharding_codec_inner_codecs_fixed_size_no_compression() -> None:
@@ -528,6 +523,9 @@ def test_partial_shard_write_uses_set_range() -> None:
528523
from unittest.mock import patch
529524

530525
store = zarr.storage.MemoryStore()
526+
# write_empty_chunks=True keeps a fixed-size dense layout, which is
527+
# required for the byte-range fast path (chunks never transition
528+
# present <-> absent).
531529
arr = zarr.create_array(
532530
store=store,
533531
shape=(100,),
@@ -536,6 +534,7 @@ def test_partial_shard_write_uses_set_range() -> None:
536534
shards=(100,),
537535
compressors=None,
538536
fill_value=0.0,
537+
config={"write_empty_chunks": True},
539538
)
540539
if not isinstance(arr._async_array.codec_pipeline, SyncCodecPipeline):
541540
pytest.skip("byte-range write optimization is specific to SyncCodecPipeline")

0 commit comments

Comments
 (0)