|
3 | 3 | import json |
4 | 4 | import warnings |
5 | 5 | from contextlib import suppress |
| 6 | +from functools import partial |
6 | 7 | from typing import TYPE_CHECKING, Any |
7 | 8 |
|
8 | 9 | from packaging.version import parse as parse_version |
|
14 | 15 | Store, |
15 | 16 | SuffixByteRequest, |
16 | 17 | ) |
| 18 | +from zarr.core._coalesce import ( |
| 19 | + DEFAULT_COALESCE_OPTIONS, |
| 20 | + CoalesceOptions, |
| 21 | + coalesced_get, |
| 22 | +) |
17 | 23 | from zarr.core.buffer import Buffer |
18 | 24 | from zarr.errors import ZarrUserWarning |
19 | 25 | from zarr.storage._utils import _join_paths, normalize_path |
20 | 26 |
|
21 | 27 | if TYPE_CHECKING: |
22 | | - from collections.abc import AsyncIterator, Iterable |
| 28 | + from collections.abc import AsyncIterator, Iterable, Sequence |
23 | 29 |
|
24 | 30 | from fsspec import AbstractFileSystem |
25 | 31 | from fsspec.asyn import AsyncFileSystem |
26 | 32 | from fsspec.mapping import FSMap |
27 | 33 |
|
28 | 34 | from zarr.core.buffer import BufferPrototype |
| 35 | + from zarr.storage._protocols import SupportsGetRanges |
29 | 36 |
|
30 | 37 |
|
31 | 38 | ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = ( |
@@ -124,11 +131,14 @@ def __init__( |
124 | 131 | read_only: bool = False, |
125 | 132 | path: str = "/", |
126 | 133 | allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, |
| 134 | + *, |
| 135 | + coalesce_options: CoalesceOptions = DEFAULT_COALESCE_OPTIONS, |
127 | 136 | ) -> None: |
128 | 137 | super().__init__(read_only=read_only) |
129 | 138 | self.fs = fs |
130 | 139 | self.path = normalize_path(path) |
131 | 140 | self.allowed_exceptions = allowed_exceptions |
| 141 | + self.coalesce_options = coalesce_options |
132 | 142 |
|
133 | 143 | if not self.fs.async_impl: |
134 | 144 | raise TypeError("Filesystem needs to support async operations.") |
@@ -315,6 +325,22 @@ async def get( |
315 | 325 | else: |
316 | 326 | return value |
317 | 327 |
|
| 328 | + async def get_ranges( |
| 329 | + self, |
| 330 | + key: str, |
| 331 | + byte_ranges: Iterable[ByteRequest | None], |
| 332 | + *, |
| 333 | + prototype: BufferPrototype, |
| 334 | + ) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]: |
| 335 | + """Read many byte ranges from ``key``, coalescing nearby ranges and fetching concurrently. |
| 336 | +
|
| 337 | + See :class:`zarr.storage._protocols.SupportsGetRanges` for the contract and |
| 338 | + :func:`zarr.core._coalesce.coalesced_get` for the full semantics. |
| 339 | + """ |
| 340 | + fetch = partial(self.get, key, prototype) |
| 341 | + async for group in coalesced_get(fetch, byte_ranges, options=self.coalesce_options): |
| 342 | + yield group |
| 343 | + |
318 | 344 | async def set( |
319 | 345 | self, |
320 | 346 | key: str, |
@@ -440,3 +466,8 @@ async def getsize(self, key: str) -> int: |
440 | 466 | else: |
441 | 467 | # fsspec doesn't have typing. We'll need to assume or verify this is true |
442 | 468 | return int(size) |
| 469 | + |
| 470 | + |
| 471 | +# Module-level type assertion: FsspecStore structurally satisfies SupportsGetRanges. |
| 472 | +# This line is a no-op at runtime but causes mypy/pyright to complain if the shape drifts. |
| 473 | +_: type[SupportsGetRanges] = FsspecStore |
0 commit comments