Skip to content

Commit a84a15a

Browse files
committed
fix: byte-range path skips empty chunks and deletes empty shards
The byte-range write path in `ShardingCodec._encode_partial_sync` encoded every affected inner chunk regardless of whether it was empty, and never deleted fully-empty shards. This made `test_delete_empty_shards` fail under the SyncCodecPipeline (the bench branch's default).

The byte-range path now matches the full-rewrite path's behavior:

* When skip_empty (write_empty_chunks=False) is set and a merged chunk equals the fill value, mark the chunk absent in the index instead of writing data.
* After updating all affected entries, if the index is fully empty, delete the shard rather than writing an empty index.

Also copy the shard index before mutating it; the decoded view may be backed by a read-only buffer (mmap-style reads from LocalStore).

Plus: convert tests/test_sync_pipeline.py::test_memory_store_set_range from `asyncio.run(...)` inside a sync test to a plain `async def` test. The `asyncio.run` call leaked event-loop self-pipe sockets that pytest's unraisableexception plugin would later attribute to unrelated tests, producing spurious "flaky" failures across the suite.
1 parent ff0d5a2 commit a84a15a

File tree

2 files changed

+24
-16
lines changed

2 files changed

+24
-16
lines changed

src/zarr/codecs/sharding.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,10 @@ def _encode_partial_sync(
622622
if existing is not None and len(existing) == total_shard_size:
623623
key = byte_setter.path if hasattr(byte_setter, "path") else str(byte_setter)
624624
shard_reader = self._shard_reader_from_bytes_sync(existing, chunks_per_shard)
625-
index = shard_reader.index
625+
# The decoded index may be a view of a read-only buffer (e.g.
626+
# mmap-backed reads from LocalStore). Copy so set_chunk_slice
627+
# below can mutate it.
628+
index = _ShardIndex(shard_reader.index.offsets_and_lengths.copy())
626629

627630
rank_map = {c: r for r, c in enumerate(morton_order_iter(chunks_per_shard))}
628631

@@ -648,6 +651,11 @@ def _byte_offset(coords: tuple[int, ...]) -> int:
648651
).copy()
649652
chunk_array[chunk_sel] = chunk_value
650653

654+
if skip_empty and chunk_array.all_equal(fill_value):
655+
# Mark absent in the index; leave the data region as-is.
656+
index.set_chunk_slice(chunk_coords, None)
657+
continue
658+
651659
encoded = inner_transform.encode_chunk(chunk_array, chunk_spec)
652660
if encoded is not None:
653661
store.set_range_sync(key, encoded, byte_offset)
@@ -656,6 +664,11 @@ def _byte_offset(coords: tuple[int, ...]) -> int:
656664
slice(byte_offset, byte_offset + chunk_byte_length),
657665
)
658666

667+
# If every inner chunk is now absent, delete the entire shard.
668+
if index.is_all_empty():
669+
byte_setter.delete_sync()
670+
return
671+
659672
index_bytes = self._encode_shard_index_sync(index)
660673
if self.index_location == ShardingCodecIndexLocation.start:
661674
store.set_range_sync(key, index_bytes, 0)

tests/test_sync_pipeline.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from __future__ import annotations
44

5-
import asyncio
65
from typing import Any
76

87
import numpy as np
@@ -440,23 +439,19 @@ def test_memory_store_supports_byte_range_setter() -> None:
440439
assert isinstance(store, SupportsSetRange)
441440

442441

443-
def test_memory_store_set_range() -> None:
442+
async def test_memory_store_set_range() -> None:
444443
"""MemoryStore.set_range should overwrite bytes at the given offset."""
444+
store = zarr.storage.MemoryStore()
445+
await store._ensure_open()
446+
buf = cpu.Buffer.from_bytes(b"AAAAAAAAAA") # 10 bytes
447+
await store.set("test/key", buf)
445448

446-
async def _test() -> None:
447-
store = zarr.storage.MemoryStore()
448-
await store._ensure_open()
449-
buf = cpu.Buffer.from_bytes(b"AAAAAAAAAA") # 10 bytes
450-
await store.set("test/key", buf)
451-
452-
patch = cpu.Buffer.from_bytes(b"XX")
453-
await store.set_range("test/key", patch, start=3)
454-
455-
result = await store.get("test/key", prototype=cpu.buffer_prototype)
456-
assert result is not None
457-
assert result.to_bytes() == b"AAAXXAAAAA"
449+
patch = cpu.Buffer.from_bytes(b"XX")
450+
await store.set_range("test/key", patch, start=3)
458451

459-
asyncio.run(_test())
452+
result = await store.get("test/key", prototype=cpu.buffer_prototype)
453+
assert result is not None
454+
assert result.to_bytes() == b"AAAXXAAAAA"
460455

461456

462457
def test_sharding_codec_inner_codecs_fixed_size_no_compression() -> None:

0 commit comments

Comments
 (0)