forked from zarr-developers/zarr-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommon.py
More file actions
330 lines (261 loc) · 10.7 KB
/
common.py
File metadata and controls
330 lines (261 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
from __future__ import annotations
import asyncio
import functools
import math
import operator
import warnings
from collections.abc import Iterable, Mapping, Sequence
from enum import Enum
from itertools import starmap
from typing import (
TYPE_CHECKING,
Any,
Final,
Literal,
NotRequired,
TypedDict,
cast,
overload,
)
import numpy as np
from typing_extensions import ReadOnly
from zarr.core.config import config as zarr_config
from zarr.errors import ZarrRuntimeWarning
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterator
# String constants for well-known document names.
# NOTE(review): presumably "zarr.json" is the Zarr format 3 metadata document and
# the dot-prefixed names are the format 2 ones — confirm against the format specs.
ZARR_JSON = "zarr.json"
ZARRAY_JSON = ".zarray"
ZGROUP_JSON = ".zgroup"
ZATTRS_JSON = ".zattrs"
ZMETADATA_V2_JSON = ".zmetadata"

# Any of the built-in binary buffer types.
BytesLike = bytes | bytearray | memoryview
# A shape given either as one integer or as an iterable of integers
# (plain Python ints or NumPy integer scalars).
ShapeLike = Iterable[int | np.integer[Any]] | int | np.integer[Any]
# Anything shape-like, a sequence of integer sequences, or None.
ChunksLike = ShapeLike | Sequence[Sequence[int]] | None
# For backwards compatibility
ChunkCoords = tuple[int, ...]
# The two accepted format version numbers.
ZarrFormat = Literal[2, 3]
NodeType = Literal["array", "group"]
# Recursive alias: scalars, None, and string-keyed mappings / sequences of further JSON values.
JSON = str | int | float | bool | Mapping[str, "JSON"] | Sequence["JSON"] | None
MemoryOrder = Literal["C", "F"]
AccessModeLiteral = Literal["r", "r+", "a", "w", "w-"]
# Runtime tuple holding the same five strings as ``AccessModeLiteral``.
ANY_ACCESS_MODE: Final = "r", "r+", "a", "w", "w-"
# An iterable of optional dimension names, or None.
DimensionNamesLike = Iterable[str | None] | None
DimensionNames = DimensionNamesLike # for backwards compatibility
class NamedConfig[TName: str, TConfig: Mapping[str, object]](TypedDict):
    """
    A typed dictionary representing an object with a name and configuration, where the configuration
    is an optional mapping of string keys to values, e.g. another typed dictionary or a JSON object.

    This class is generic with two type parameters: the type of the name (``TName``) and the type of
    the configuration (``TConfig``).

    Both keys are declared read-only; the ``configuration`` key may be omitted entirely.
    """

    name: ReadOnly[TName]
    """The name of the object."""

    configuration: NotRequired[ReadOnly[TConfig]]
    """The configuration of the object. Not required."""
class NamedRequiredConfig[TName: str, TConfig: Mapping[str, object]](TypedDict):
    """
    A typed dictionary representing an object with a name and configuration, where the configuration
    is a mapping of string keys to values, e.g. another typed dictionary or a JSON object.

    This class is generic with two type parameters: the type of the name (``TName``) and the type of
    the configuration (``TConfig``).

    Both keys are declared read-only, and both are required.
    """

    name: ReadOnly[TName]
    """The name of the object."""

    configuration: ReadOnly[TConfig]
    """The configuration of the object."""
def product(tup: tuple[int, ...]) -> int:
    """Return the product of all elements of ``tup``; an empty tuple gives ``1``."""
    return math.prod(tup)
def ceildiv(a: float, b: float) -> int:
    """
    Return the ceiling of ``a / b``.

    Parameters
    ----------
    a : float
        The dividend. A zero dividend short-circuits to ``0`` without dividing,
        so ``ceildiv(0, 0)`` is ``0`` rather than an error.
    b : float
        The divisor.

    Returns
    -------
    int
        The smallest integer greater than or equal to ``a / b``.

    Raises
    ------
    ZeroDivisionError
        If ``b`` is zero and ``a`` is not.
    """
    if a == 0:
        return 0
    if isinstance(a, int) and isinstance(b, int):
        # Stay in exact integer arithmetic: ``a / b`` goes through a float and
        # silently rounds once the operands exceed 2**53.
        return -(-a // b)
    return math.ceil(a / b)
async def concurrent_map[T: tuple[Any, ...], V](
items: Iterable[T],
func: Callable[..., Awaitable[V]],
limit: int | None = None,
) -> list[V]:
if limit is None:
return await asyncio.gather(*list(starmap(func, items)))
else:
sem = asyncio.Semaphore(limit)
async def run(item: tuple[Any]) -> V:
async with sem:
return await func(*item)
return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
def enum_names[E: Enum](enum: type[E]) -> Iterator[str]:
for item in enum:
yield item.name
def parse_enum[E: Enum](data: object, cls: type[E]) -> E:
if isinstance(data, cls):
return data
if not isinstance(data, str):
raise TypeError(f"Expected str, got {type(data)}")
if data in enum_names(cls):
return cls(data)
raise ValueError(f"Value must be one of {list(enum_names(cls))!r}. Got {data} instead.")
def parse_name(data: JSON, expected: str | None = None) -> str:
    """
    Check that ``data`` is a string and, optionally, that it equals ``expected``.

    Raises
    ------
    TypeError
        If ``data`` is not a string.
    ValueError
        If ``expected`` is given and ``data`` differs from it.
    """
    if not isinstance(data, str):
        raise TypeError(f"Expected a string, got an instance of {type(data)}.")
    if expected is not None and data != expected:
        raise ValueError(f"Expected '{expected}'. Got {data} instead.")
    return data
def parse_configuration(data: JSON) -> JSON:
    """
    Return ``data`` unchanged if it is a ``dict``.

    Raises
    ------
    TypeError
        If ``data`` is not a ``dict``.
    """
    if isinstance(data, dict):
        return data
    raise TypeError(f"Expected dict, got {type(data)}")
@overload
def parse_named_configuration(
    data: JSON | NamedConfig[str, Any], expected_name: str | None = None
) -> tuple[str, dict[str, JSON]]: ...


@overload
def parse_named_configuration(
    data: JSON | NamedConfig[str, Any],
    expected_name: str | None = None,
    *,
    require_configuration: bool = True,
) -> tuple[str, dict[str, JSON] | None]: ...


def parse_named_configuration(
    data: JSON | NamedConfig[str, Any],
    expected_name: str | None = None,
    *,
    require_configuration: bool = True,
) -> tuple[str, JSON | None]:
    """
    Split a ``{"name": ..., "configuration": ...}`` dict into a ``(name, configuration)`` pair.

    Parameters
    ----------
    data
        The dict to parse.
    expected_name
        Passed to ``parse_name`` together with ``data["name"]``.
    require_configuration
        Whether a missing ``"configuration"`` key is an error. When ``False``,
        a missing key yields ``None`` as the configuration.

    Raises
    ------
    TypeError
        If ``data`` is not a ``dict``.
    ValueError
        If the ``"name"`` key is missing, or the ``"configuration"`` key is
        missing while ``require_configuration`` is true.
    """
    if not isinstance(data, dict):
        raise TypeError(f"Expected dict, got {type(data)}")
    if "name" not in data:
        raise ValueError(f"Named configuration does not have a 'name' key. Got {data}.")

    # The name is validated before the configuration key is looked at.
    name = parse_name(data["name"], expected_name)

    if "configuration" not in data:
        if require_configuration:
            raise ValueError(
                f"Named configuration does not have a 'configuration' key. Got {data}."
            )
        return name, None
    return name, parse_configuration(data["configuration"])
def parse_shapelike(data: ShapeLike) -> tuple[int, ...]:
    """
    Parse a shape-like input into an explicit shape.

    A single integer becomes a one-element tuple; any other input is treated
    as an iterable of integers. NumPy integer scalars are converted to plain
    Python ints.

    Raises
    ------
    TypeError
        If ``data`` is neither an integer nor iterable, or if any element is
        not an integer.
    ValueError
        If any value is negative.
    """
    integer_types = (int, np.integer)

    if isinstance(data, integer_types):
        if data < 0:
            raise ValueError(f"Expected a non-negative integer. Got {data} instead")
        return (int(data),)

    try:
        values = tuple(data)
    except TypeError as e:
        msg = f"Expected an integer or an iterable of integers. Got {data} instead."
        raise TypeError(msg) from e

    # All element types are checked before any sign is checked, so a type
    # problem anywhere in the input takes precedence over a negative value.
    if any(not isinstance(v, integer_types) for v in values):
        raise TypeError(f"Expected an iterable of integers. Got {data} instead.")
    if any(v <= -1 for v in values):
        raise ValueError(f"Expected all values to be non-negative. Got {data} instead.")

    # cast NumPy scalars to plain python ints
    return tuple(map(int, values))
def parse_fill_value(data: Any) -> Any:
    """
    Return ``data`` unchanged.

    No validation is performed yet; see the todo below.
    """
    # todo: real validation
    return data
def parse_order(data: Any) -> Literal["C", "F"]:
    """
    Return ``data`` if it is ``"C"`` or ``"F"``.

    Raises
    ------
    ValueError
        If ``data`` is anything else.
    """
    if data not in ("C", "F"):
        raise ValueError(f"Expected one of ('C', 'F'), got {data} instead.")
    return cast("Literal['C', 'F']", data)
def parse_bool(data: Any) -> bool:
    """
    Return ``data`` if it is a ``bool``.

    Raises
    ------
    ValueError
        If ``data`` is not a ``bool`` (integers such as ``1`` are rejected).
    """
    if not isinstance(data, bool):
        raise ValueError(f"Expected bool, got {data} instead.")
    return data
def _warn_write_empty_chunks_kwarg() -> None:
    """
    Emit a ``ZarrRuntimeWarning`` about the deprecated ``write_empty_chunks`` keyword argument.

    ``stacklevel=2`` attributes the warning to the caller of this helper.
    """
    # TODO: link to docs page on array configuration in this message
    msg = (
        "The `write_empty_chunks` keyword argument is deprecated and will be removed in future versions. "
        "To control whether empty chunks are written to storage, either use the `config` keyword "
        # The trailing space matters: adjacent literals are joined with no separator.
        "argument, as in `config={'write_empty_chunks': True}`, "
        "or change the global 'array.write_empty_chunks' configuration variable."
    )
    warnings.warn(msg, ZarrRuntimeWarning, stacklevel=2)
def _warn_order_kwarg() -> None:
    """
    Emit a ``ZarrRuntimeWarning`` saying the ``order`` keyword argument has no effect.

    ``stacklevel=2`` attributes the warning to the caller of this helper.
    """
    # TODO: link to docs page on array configuration in this message
    msg = (
        "The `order` keyword argument has no effect for Zarr format 3 arrays. "
        "To control the memory layout of the array, either use the `config` keyword "
        # The trailing space matters: adjacent literals are joined with no separator.
        "argument, as in `config={'order': 'C'}`, "
        "or change the global 'array.order' configuration variable."
    )
    warnings.warn(msg, ZarrRuntimeWarning, stacklevel=2)
def _default_zarr_format() -> ZarrFormat:
    """Return the configured ``default_zarr_format`` as an int, falling back to ``3``."""
    configured = zarr_config.get("default_zarr_format", 3)
    return cast("ZarrFormat", int(configured))
def expand_rle(data: Sequence[int | list[int]]) -> list[int]:
    """Expand a mixed array of bare integers and RLE pairs.

    Per the rectilinear chunk grid spec, each element can be:

    - a bare integer (an explicit edge length)
    - a two-element array ``[value, count]`` (run-length encoded)

    Floats are accepted only when they hold an integral value (e.g. ``10.0``);
    a non-integral or non-finite float is rejected instead of being truncated.

    Parameters
    ----------
    data
        The mixed sequence to expand.

    Returns
    -------
    list[int]
        The explicit edge lengths, with every ``[value, count]`` pair repeated
        ``count`` times.

    Raises
    ------
    ValueError
        If an entry is neither a number nor a two-element list, if a float is
        not integral, or if an edge length or repeat count is less than 1.
    """

    def _as_int(value: Any) -> int:
        # ``int(2.7)`` would silently become 2, so refuse to truncate.
        # ``is_integer()`` is also False for nan and inf.
        if isinstance(value, float) and not value.is_integer():
            raise ValueError(f"RLE values must be integers, got {value}")
        return int(value)

    result: list[int] = []
    for item in data:
        # bool is a subclass of int but is never a meaningful edge length
        if isinstance(item, (int, float)) and not isinstance(item, bool):
            val = _as_int(item)
            if val < 1:
                raise ValueError(f"Chunk edge length must be >= 1, got {val}")
            result.append(val)
        elif isinstance(item, list) and len(item) == 2:
            size, count = _as_int(item[0]), _as_int(item[1])
            if size < 1:
                raise ValueError(f"Chunk edge length must be >= 1, got {size}")
            if count < 1:
                raise ValueError(f"RLE repeat count must be >= 1, got {count}")
            result.extend([size] * count)
        else:
            raise ValueError(f"RLE entries must be an integer or [size, count], got {item}")
    return result
def compress_rle(sizes: Sequence[int]) -> list[int | list[int]]:
    """Compress chunk sizes to mixed RLE format per the rectilinear spec.

    Consecutive equal sizes are collapsed. Runs of length > 1 are emitted as
    ``[value, count]`` pairs; runs of length 1 are emitted as bare integers::

        [10, 10, 10, 5] -> [[10, 3], 5]

    An empty input gives an empty list.
    """
    if not sizes:
        return []

    # Collect every run as a mutable [value, count] pair first.
    runs: list[list[int]] = [[sizes[0], 1]]
    for size in sizes[1:]:
        last = runs[-1]
        if size == last[0]:
            last[1] += 1
        else:
            runs.append([size, 1])

    # Singleton runs collapse back to the bare value.
    return [run if run[1] > 1 else run[0] for run in runs]
def validate_rectilinear_kind(kind: str | None) -> None:
    """Validate the ``kind`` field of a rectilinear chunk grid configuration.

    The only accepted value is ``"inline"``.

    Raises
    ------
    ValueError
        If ``kind`` is ``None`` or any value other than ``"inline"``.
    """
    if kind == "inline":
        return
    if kind is None:
        raise ValueError(
            "Rectilinear chunk grid configuration requires a 'kind' field. "
            "Only 'inline' is currently supported."
        )
    raise ValueError(
        f"Unsupported rectilinear chunk grid kind: {kind!r}. "
        "Only 'inline' is currently supported."
    )
def validate_rectilinear_edges(
chunk_shapes: Sequence[int | Sequence[int]], array_shape: Sequence[int]
) -> None:
"""Validate that rectilinear chunk edges cover the array extent per dimension.
Bare-int dimensions (regular step) always cover any extent, so they are
skipped. Explicit edge lists must sum to at least the array extent.
"""
for i, (dim_spec, extent) in enumerate(zip(chunk_shapes, array_shape, strict=True)):
if isinstance(dim_spec, int):
continue
edge_sum = sum(dim_spec)
if edge_sum < extent:
raise ValueError(
f"Rectilinear chunk edges for dimension {i} sum to {edge_sum} "
f"but array shape extent is {extent} (edge sum must be >= extent)"
)