Commit e82f40e

test: pipeline parity matrix — sync vs batched on every cell
Add tests/test_pipeline_parity.py — for every cell of {codec config x layout x write sequence x write_empty_chunks}, assert that SyncCodecPipeline and BatchedCodecPipeline produce semantically identical results. Three checks per cell, in order of decreasing diagnostic value:

1. Both pipelines return the same array contents after the same write sequence (catches semantic correctness bugs).
2. Both produce the same set of store keys (catches divergent empty-shard handling: one deletes, the other writes empty).
3. Each pipeline reads the OTHER pipeline's store and gets the right answer (catches layout-divergence bugs that would prevent interop, e.g. dense vs compact shard layouts).

Byte-for-byte store equality is intentionally NOT checked: codecs like gzip embed wall-clock timestamps that vary between runs.

Matrix axes:

* codec chain — bytes-only, gzip
* layout — 1d unsharded, 1d (1 chunk/shard, multi chunks/shard), 2d unsharded, 2d sharded
* write sequence — full overwrite, partial middle, scalar one cell, multiple overlapping writes, ends in fill, ends in partial fill
* write_empty_chunks — True, False

120 cells total; the whole matrix runs in ~1.5s.

This is the test that would have caught the divergence bugs we hit during SyncCodecPipeline development, instead of waiting for an end-to-end test in one particular config to surface the symptom.
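
Individual cells can be filtered with pytest's -k against the generated ids (the ids come from the f-string in _matrix(); the -k expression below is just an illustration):

    pytest tests/test_pipeline_parity.py                  # whole matrix
    pytest tests/test_pipeline_parity.py -k "2d-sharded and gzip and wecFalse"
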
1 parent 68a7cdc commit e82f40e

1 file changed: 281 additions & 0 deletions

tests/test_pipeline_parity.py

@@ -0,0 +1,281 @@
"""Pipeline parity test — exhaustive matrix of read/write scenarios.

For every cell of the matrix (codec config x layout x operation
sequence x runtime config), assert that ``SyncCodecPipeline`` and
``BatchedCodecPipeline`` produce semantically identical results:

* Same returned array contents on read.
* Same set of store keys after writes (catches divergent empty-shard
  handling: one pipeline deletes, the other writes an empty blob).
* Reading each pipeline's store contents through the *other* pipeline
  yields the same array (catches "wrote a layout that only one
  pipeline can read" bugs).

Pipeline-divergence bugs (e.g. one pipeline writes a dense shard
layout while the other writes a compact layout) fail this test
loudly with a clear diff, instead of waiting for a downstream
test to trip over the symptom.

Byte-for-byte equality of store contents is intentionally NOT
checked: codecs like gzip embed the wall-clock timestamp in their
output, so two compressions of the same data done at different
seconds produce different bytes despite being semantically
identical.

The matrix axes are:

* codec chain — bytes-only, gzip, with/without sharding
* layout — chunk_shape, shard_shape (None for no sharding)
* write sequence — full overwrite, partial in middle, scalar to one
  cell, multiple overlapping writes, sequence ending in fill values
* runtime config — write_empty_chunks True/False
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

import numpy as np
import pytest

import zarr
from zarr.codecs.gzip import GzipCodec
from zarr.core.config import config as zarr_config
from zarr.storage import MemoryStore

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator


# ---------------------------------------------------------------------------
# Reference helpers
# ---------------------------------------------------------------------------


def _store_snapshot(store: MemoryStore) -> dict[str, bytes]:
    """Return {key: bytes} for every entry in the store."""
    return {k: bytes(v.to_bytes()) for k, v in store._store_dict.items()}
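
# NOTE: the assertions below compare store key sets directly; _store_snapshot
# is not called by the test itself, but is handy when debugging a parity
# failure (it materializes every stored value for inspection).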


# ---------------------------------------------------------------------------
# Matrix definitions
# ---------------------------------------------------------------------------


# Each codec config is (filters, serializer, compressors). We only vary the
# pieces that actually affect the pipeline. compressors=None means a
# fixed-size chain (the byte-range fast path is eligible when sharded).
CodecConfig = dict[str, Any]

CODEC_CONFIGS: list[tuple[str, CodecConfig]] = [
    ("bytes-only", {"compressors": None}),
    ("gzip", {"compressors": GzipCodec(level=1)}),
]


# (id, kwargs) — chunks/shards layout. kwargs are passed to create_array.
LayoutConfig = dict[str, Any]

LAYOUT_CONFIGS: list[tuple[str, LayoutConfig]] = [
    ("1d-unsharded", {"shape": (100,), "chunks": (10,), "shards": None}),
    ("1d-1chunk-per-shard", {"shape": (100,), "chunks": (10,), "shards": (10,)}),
    ("1d-multi-chunk-per-shard", {"shape": (100,), "chunks": (10,), "shards": (50,)}),
    ("2d-unsharded", {"shape": (20, 20), "chunks": (5, 5), "shards": None}),
    ("2d-sharded", {"shape": (20, 20), "chunks": (5, 5), "shards": (10, 10)}),
]


WriteOp = tuple[Any, Any]  # (selection, value)
WriteSequence = tuple[str, list[WriteOp]]


def _full_overwrite(shape: tuple[int, ...]) -> list[WriteOp]:
    return [((slice(None),) * len(shape), np.arange(int(np.prod(shape))).reshape(shape) + 1)]


def _partial_middle(shape: tuple[int, ...]) -> list[WriteOp]:
    if len(shape) == 1:
        n = shape[0]
        return [((slice(n // 4, 3 * n // 4),), 7)]
    # 2D: write a centered block
    rs = slice(shape[0] // 4, 3 * shape[0] // 4)
    cs = slice(shape[1] // 4, 3 * shape[1] // 4)
    return [((rs, cs), 7)]


def _scalar_one_cell(shape: tuple[int, ...]) -> list[WriteOp]:
    if len(shape) == 1:
        return [((shape[0] // 2,), 99)]
    return [((shape[0] // 2, shape[1] // 2), 99)]


def _overlapping(shape: tuple[int, ...]) -> list[WriteOp]:
    if len(shape) == 1:
        n = shape[0]
        return [
            ((slice(0, n // 2),), 1),
            ((slice(n // 4, 3 * n // 4),), 2),
            ((slice(n // 2, n),), 3),
        ]
    rs1, cs1 = slice(0, shape[0] // 2), slice(0, shape[1] // 2)
    rs2, cs2 = slice(shape[0] // 4, 3 * shape[0] // 4), slice(shape[1] // 4, 3 * shape[1] // 4)
    return [((rs1, cs1), 1), ((rs2, cs2), 2)]


def _ends_in_fill(shape: tuple[int, ...]) -> list[WriteOp]:
    """Write something then overwrite it with fill — exercises empty-chunk handling."""
    full = (slice(None),) * len(shape)
    return [(full, 5), (full, 0)]


def _ends_in_partial_fill(shape: tuple[int, ...]) -> list[WriteOp]:
    """Write data, then overwrite half with fill — some chunks become empty."""
    full: tuple[slice, ...]
    half: tuple[slice, ...]
    if len(shape) == 1:
        full = (slice(None),)
        half = (slice(0, shape[0] // 2),)
    else:
        full = (slice(None), slice(None))
        half = (slice(0, shape[0] // 2), slice(None))
    return [(full, 5), (half, 0)]


SEQUENCES: list[tuple[str, Callable[[tuple[int, ...]], list[WriteOp]]]] = [
    ("full-overwrite", _full_overwrite),
    ("partial-middle", _partial_middle),
    ("scalar-one-cell", _scalar_one_cell),
    ("overlapping", _overlapping),
    ("ends-in-fill", _ends_in_fill),
    ("ends-in-partial-fill", _ends_in_partial_fill),
]
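
# Note: the two fill-ending sequences above drive chunks back to the fill
# value, so they are the ones that exercise the delete-vs-write-empty paths
# compared by check 2 of the parity test below.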


WRITE_EMPTY_CHUNKS = [False, True]


# ---------------------------------------------------------------------------
# Matrix iteration
# ---------------------------------------------------------------------------


def _matrix() -> Iterator[Any]:
    for codec_id, codec_kwargs in CODEC_CONFIGS:
        for layout_id, layout in LAYOUT_CONFIGS:
            for seq_id, seq_fn in SEQUENCES:
                for wec in WRITE_EMPTY_CHUNKS:
                    yield pytest.param(
                        codec_kwargs,
                        layout,
                        seq_fn,
                        wec,
                        id=f"{layout_id}-{codec_id}-{seq_id}-wec{wec}",
                    )
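
# Full Cartesian product, no pruning: 2 codec configs x 5 layouts x
# 6 write sequences x 2 write_empty_chunks values = 120 cells; the whole
# matrix runs in ~1.5s.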


# ---------------------------------------------------------------------------
# The parity test
# ---------------------------------------------------------------------------


def _write_under_pipeline(
    pipeline_path: str,
    codec_kwargs: CodecConfig,
    layout: LayoutConfig,
    sequence: list[WriteOp],
    write_empty_chunks: bool,
) -> tuple[MemoryStore, np.ndarray[Any, np.dtype[Any]]]:
    """Apply a sequence of writes via the chosen pipeline.

    Returns (store with the written data, final array contents read back).
    """
    store = MemoryStore()
    with zarr_config.set({"codec_pipeline.path": pipeline_path}):
        arr = zarr.create_array(
            store=store,
            dtype="float64",
            fill_value=0.0,
            config={"write_empty_chunks": write_empty_chunks},
            **layout,
            **codec_kwargs,
        )
        for sel, val in sequence:
            arr[sel] = val
        contents = arr[...]
    return store, contents


def _read_under_pipeline(pipeline_path: str, store: MemoryStore) -> np.ndarray[Any, np.dtype[Any]]:
    """Re-open an existing store under the chosen pipeline and read it whole."""
    with zarr_config.set({"codec_pipeline.path": pipeline_path}):
        arr = zarr.open_array(store=store, mode="r")
        return arr[...]  # type: ignore[no-any-return]


_BATCHED = "zarr.core.codec_pipeline.BatchedCodecPipeline"
_SYNC = "zarr.core.codec_pipeline.SyncCodecPipeline"


@pytest.mark.parametrize(
    ("codec_kwargs", "layout", "sequence_fn", "write_empty_chunks"),
    list(_matrix()),
)
def test_pipeline_parity(
    codec_kwargs: CodecConfig,
    layout: LayoutConfig,
    sequence_fn: Callable[[tuple[int, ...]], list[WriteOp]],
    write_empty_chunks: bool,
) -> None:
    """SyncCodecPipeline must be semantically identical to BatchedCodecPipeline.

    Three checks, in order of decreasing diagnostic value:

    1. Both pipelines return the same array contents after the same
       write sequence (catches semantic correctness bugs).
    2. Both pipelines produce the same set of store keys (catches
       empty-shard divergence: one deletes, the other doesn't).
    3. Each pipeline can correctly read the *other* pipeline's
       output (catches layout-divergence bugs that would prevent
       interop, e.g. dense vs compact shard layouts).

    Byte-for-byte store equality is intentionally not checked: codecs
    like gzip embed wall-clock timestamps that vary between runs.
    """
    sequence = sequence_fn(layout["shape"])

    batched_store, batched_arr = _write_under_pipeline(
        _BATCHED, codec_kwargs, layout, sequence, write_empty_chunks
    )
    sync_store, sync_arr = _write_under_pipeline(
        _SYNC, codec_kwargs, layout, sequence, write_empty_chunks
    )

    # 1. Array contents must agree.
    np.testing.assert_array_equal(
        sync_arr,
        batched_arr,
        err_msg="SyncCodecPipeline returned different array contents than BatchedCodecPipeline",
    )

    # 2. Store key sets must agree (the metadata document is excluded:
    # parity here is about which chunk/shard keys exist).
    batched_keys = set(batched_store._store_dict) - {"zarr.json"}
    sync_keys = set(sync_store._store_dict) - {"zarr.json"}
    assert sync_keys == batched_keys, (
        f"Pipelines disagree on which store keys exist.\n"
        f"  only in batched: {sorted(batched_keys - sync_keys)}\n"
        f"  only in sync: {sorted(sync_keys - batched_keys)}"
    )

    # 3. Cross-read: each pipeline must correctly read the other's output.
    sync_reads_batched = _read_under_pipeline(_SYNC, batched_store)
    batched_reads_sync = _read_under_pipeline(_BATCHED, sync_store)
    np.testing.assert_array_equal(
        sync_reads_batched,
        batched_arr,
        err_msg="SyncCodecPipeline could not correctly read BatchedCodecPipeline's output",
    )
    np.testing.assert_array_equal(
        batched_reads_sync,
        sync_arr,
        err_msg="BatchedCodecPipeline could not correctly read SyncCodecPipeline's output",
    )
