Skip to content

Commit 0bc2bbe

Browse files
authored
Merge branch 'main' into oindex-optimization
2 parents 5013068 + 5d92e85 commit 0bc2bbe

6 files changed

Lines changed: 134 additions & 33 deletions

File tree

changes/3828.misc.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
`CodecPipeline.read` and `CodecPipeline.read_batch` now return a tuple of `TypedDict` objects
2+
that each carry information about the request for a chunk from storage.

changes/3836.doc.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Corrects the type annotation reported for the `batch_info` parameter in the `CodecPipeline.write`
2+
method docstring.

docs/user-guide/v3_migration.md

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -198,32 +198,29 @@ after the 3.0.0 release. If features listed below are important to your use case
198198
of Zarr-Python, please open (or comment on) a
199199
[GitHub issue](https://github.com/zarr-developers/zarr-python/issues/new).
200200

201-
- The following functions / methods have not been ported to Zarr-Python 3 yet:
201+
The following functions / methods have not been ported to Zarr-Python 3 yet:
202202

203-
* `zarr.copy` ([issue #2407](https://github.com/zarr-developers/zarr-python/issues/2407))
204-
* `zarr.copy_all` ([issue #2407](https://github.com/zarr-developers/zarr-python/issues/2407))
205-
* `zarr.copy_store` ([issue #2407](https://github.com/zarr-developers/zarr-python/issues/2407))
206-
* `zarr.Group.move` ([issue #2108](https://github.com/zarr-developers/zarr-python/issues/2108))
203+
- `zarr.copy` ([issue #2407](https://github.com/zarr-developers/zarr-python/issues/2407))
204+
- `zarr.copy_all` ([issue #2407](https://github.com/zarr-developers/zarr-python/issues/2407))
205+
- `zarr.copy_store` ([issue #2407](https://github.com/zarr-developers/zarr-python/issues/2407))
206+
- `zarr.Group.move` ([issue #2108](https://github.com/zarr-developers/zarr-python/issues/2108))
207207

208-
- The following features (corresponding to function arguments to functions in
208+
The following features (corresponding to function arguments to functions in
209209
`zarr`) have not been ported to Zarr-Python 3 yet. Using these features
210210
will raise a warning or a `NotImplementedError`:
211211

212-
* `cache_attrs`
213-
* `cache_metadata`
214-
* `chunk_store` ([issue #2495](https://github.com/zarr-developers/zarr-python/issues/2495))
215-
* `meta_array`
216-
* `object_codec` ([issue #2617](https://github.com/zarr-developers/zarr-python/issues/2617))
217-
* `synchronizer` ([issue #1596](https://github.com/zarr-developers/zarr-python/issues/1596))
218-
* `dimension_separator`
212+
- `cache_attrs`
213+
- `cache_metadata`
214+
- `chunk_store` ([issue #2495](https://github.com/zarr-developers/zarr-python/issues/2495))
215+
- `meta_array`
216+
- `object_codec` ([issue #2617](https://github.com/zarr-developers/zarr-python/issues/2617))
217+
- `synchronizer` ([issue #1596](https://github.com/zarr-developers/zarr-python/issues/1596))
218+
- `dimension_separator`
219219

220-
- The following features that were supported by Zarr-Python 2 have not been ported
220+
The following features that were supported by Zarr-Python 2 have not been ported
221221
to Zarr-Python 3 yet:
222222

223-
* Structured arrays / dtypes ([issue #2134](https://github.com/zarr-developers/zarr-python/issues/2134))
224-
* Fixed-length string dtypes ([issue #2347](https://github.com/zarr-developers/zarr-python/issues/2347))
225-
* Datetime and timedelta dtypes ([issue #2616](https://github.com/zarr-developers/zarr-python/issues/2616))
226-
* Object dtypes ([issue #2616](https://github.com/zarr-developers/zarr-python/issues/2616))
227-
* Ragged arrays ([issue #2618](https://github.com/zarr-developers/zarr-python/issues/2618))
228-
* Groups and Arrays do not implement `__enter__` and `__exit__` protocols ([issue #2619](https://github.com/zarr-developers/zarr-python/issues/2619))
229-
* Default filters for object dtypes for Zarr format 2 arrays ([issue #2627](https://github.com/zarr-developers/zarr-python/issues/2627))
223+
- Object dtypes ([issue #2616](https://github.com/zarr-developers/zarr-python/issues/2616))
224+
- Ragged arrays ([issue #2618](https://github.com/zarr-developers/zarr-python/issues/2618))
225+
- Groups and Arrays do not implement `__enter__` and `__exit__` protocols ([issue #2619](https://github.com/zarr-developers/zarr-python/issues/2619))
226+
- Default filters for object dtypes for Zarr format 2 arrays ([issue #2627](https://github.com/zarr-developers/zarr-python/issues/2627))

src/zarr/abc/codec.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from abc import abstractmethod
44
from collections.abc import Mapping
5-
from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable
5+
from typing import TYPE_CHECKING, Generic, Literal, Protocol, TypeGuard, TypeVar, runtime_checkable
66

77
from typing_extensions import ReadOnly, TypedDict
88

@@ -32,9 +32,17 @@
3232
"CodecInput",
3333
"CodecOutput",
3434
"CodecPipeline",
35+
"GetResult",
3536
"SupportsSyncCodec",
3637
]
3738

39+
40+
class GetResult(TypedDict):
    """Metadata about a store get operation."""

    # "present" when the chunk's bytes were found in the store and decoded into
    # the output; "missing" when the store returned nothing and the output
    # region was filled with the array's fill value instead.
    status: Literal["present", "missing"]
44+
45+
3846
CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
3947
CodecOutput = TypeVar("CodecOutput", bound=NDBuffer | Buffer)
4048

@@ -433,13 +441,13 @@ async def read(
433441
batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
434442
out: NDBuffer,
435443
drop_axes: tuple[int, ...] = (),
436-
) -> None:
444+
) -> tuple[GetResult, ...]:
437445
"""Reads chunk data from the store, decodes it and writes it into an output array.
438446
Partial decoding may be utilized if the codecs and stores support it.
439447
440448
Parameters
441449
----------
442-
batch_info : Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]]
450+
batch_info : Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]]
443451
Ordered set of information about the chunks.
444452
The first slice selection determines which parts of the chunk will be fetched.
445453
The second slice selection determines where in the output array the chunk data will be written.
@@ -451,6 +459,11 @@ async def read(
451459
``out``) to the fill value for the array.
452460
453461
out : NDBuffer
462+
463+
Returns
464+
-------
465+
tuple[GetResult, ...]
466+
One result per chunk in ``batch_info``.
454467
"""
455468
...
456469

@@ -467,7 +480,7 @@ async def write(
467480
468481
Parameters
469482
----------
470-
batch_info : Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]]
483+
batch_info : Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]]
471484
Ordered set of information about the chunks.
472485
The first slice selection determines which parts of the chunk will be encoded.
473486
The second slice selection determines where in the value array the chunk data is located.

src/zarr/core/codec_pipeline.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
BytesBytesCodec,
1414
Codec,
1515
CodecPipeline,
16+
GetResult,
1617
)
1718
from zarr.core.common import concurrent_map
1819
from zarr.core.config import config
@@ -251,47 +252,58 @@ async def read_batch(
251252
batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
252253
out: NDBuffer,
253254
drop_axes: tuple[int, ...] = (),
254-
) -> None:
255+
) -> tuple[GetResult, ...]:
256+
results: list[GetResult] = []
255257
if self.supports_partial_decode:
258+
batch_info_list = list(batch_info)
256259
chunk_array_batch = await self.decode_partial_batch(
257260
[
258261
(byte_getter, chunk_selection, chunk_spec)
259-
for byte_getter, chunk_spec, chunk_selection, *_ in batch_info
262+
for byte_getter, chunk_spec, chunk_selection, *_ in batch_info_list
260263
]
261264
)
262265
for chunk_array, (_, chunk_spec, _, out_selection, _) in zip(
263-
chunk_array_batch, batch_info, strict=False
266+
chunk_array_batch, batch_info_list, strict=False
264267
):
265268
if chunk_array is not None:
266269
if drop_axes:
267270
chunk_array = chunk_array.squeeze(axis=drop_axes)
268271
out[out_selection] = chunk_array
272+
results.append(GetResult(status="present"))
269273
else:
270274
out[out_selection] = fill_value_or_default(chunk_spec)
275+
results.append(GetResult(status="missing"))
271276
else:
277+
batch_info_list = list(batch_info)
272278
chunk_bytes_batch = await concurrent_map(
273-
[(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
279+
[
280+
(byte_getter, array_spec.prototype)
281+
for byte_getter, array_spec, *_ in batch_info_list
282+
],
274283
lambda byte_getter, prototype: byte_getter.get(prototype),
275284
config.get("async.concurrency"),
276285
)
277286
chunk_array_batch = await self.decode_batch(
278287
[
279288
(chunk_bytes, chunk_spec)
280289
for chunk_bytes, (_, chunk_spec, *_) in zip(
281-
chunk_bytes_batch, batch_info, strict=False
290+
chunk_bytes_batch, batch_info_list, strict=False
282291
)
283292
],
284293
)
285294
for chunk_array, (_, chunk_spec, chunk_selection, out_selection, _) in zip(
286-
chunk_array_batch, batch_info, strict=False
295+
chunk_array_batch, batch_info_list, strict=False
287296
):
288297
if chunk_array is not None:
289298
tmp = chunk_array[chunk_selection]
290299
if drop_axes:
291300
tmp = tmp.squeeze(axis=drop_axes)
292301
out[out_selection] = tmp
302+
results.append(GetResult(status="present"))
293303
else:
294304
out[out_selection] = fill_value_or_default(chunk_spec)
305+
results.append(GetResult(status="missing"))
306+
return tuple(results)
295307

296308
def _merge_chunk_array(
297309
self,
@@ -471,15 +483,19 @@ async def read(
471483
batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
472484
out: NDBuffer,
473485
drop_axes: tuple[int, ...] = (),
474-
) -> None:
475-
await concurrent_map(
486+
) -> tuple[GetResult, ...]:
487+
batch_results = await concurrent_map(
476488
[
477489
(single_batch_info, out, drop_axes)
478490
for single_batch_info in batched(batch_info, self.batch_size)
479491
],
480492
self.read_batch,
481493
config.get("async.concurrency"),
482494
)
495+
results: list[GetResult] = []
496+
for batch in batch_results:
497+
results.extend(batch)
498+
return tuple(results)
483499

484500
async def write(
485501
self,

tests/test_codec_pipeline.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from __future__ import annotations
2+
3+
import pytest
4+
5+
import zarr
6+
from zarr.core.buffer.core import default_buffer_prototype
7+
from zarr.core.indexing import BasicIndexer
8+
from zarr.storage import MemoryStore
9+
10+
11+
@pytest.mark.parametrize(
12+
("write_slice", "read_slice", "expected_statuses"),
13+
[
14+
# Write all chunks, read all — all present
15+
(slice(None), slice(None), ("present", "present", "present")),
16+
# Write first chunk only, read all — first present, rest missing
17+
(slice(0, 2), slice(None), ("present", "missing", "missing")),
18+
# Write nothing, read all — all missing
19+
(None, slice(None), ("missing", "missing", "missing")),
20+
],
21+
)
22+
async def test_read_returns_get_results(
23+
write_slice: slice | None,
24+
read_slice: slice,
25+
expected_statuses: tuple[str, ...],
26+
) -> None:
27+
"""
28+
Test that CodecPipeline.read returns a tuple of GetResult with correct statuses.
29+
"""
30+
store = MemoryStore()
31+
arr = zarr.open_array(store, mode="w", shape=(6,), chunks=(2,), dtype="int64", fill_value=-1)
32+
33+
if write_slice is not None:
34+
arr[write_slice] = 0
35+
36+
async_arr = arr._async_array
37+
pipeline = async_arr.codec_pipeline
38+
metadata = async_arr.metadata
39+
40+
prototype = default_buffer_prototype()
41+
config = async_arr.config
42+
indexer = BasicIndexer(
43+
read_slice,
44+
shape=metadata.shape,
45+
chunk_grid=metadata.chunk_grid,
46+
)
47+
48+
out_buffer = prototype.nd_buffer.empty(
49+
shape=indexer.shape,
50+
dtype=metadata.dtype.to_native_dtype(),
51+
order=config.order,
52+
)
53+
54+
results = await pipeline.read(
55+
[
56+
(
57+
async_arr.store_path / metadata.encode_chunk_key(chunk_coords),
58+
metadata.get_chunk_spec(chunk_coords, config, prototype=prototype),
59+
chunk_selection,
60+
out_selection,
61+
is_complete_chunk,
62+
)
63+
for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer
64+
],
65+
out_buffer,
66+
drop_axes=indexer.drop_axes,
67+
)
68+
69+
assert len(results) == len(expected_statuses)
70+
for result, expected_status in zip(results, expected_statuses, strict=True):
71+
assert result["status"] == expected_status

0 commit comments

Comments
 (0)