forked from zarr-developers/zarr-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathv2.py
More file actions
353 lines (293 loc) · 12.8 KB
/
v2.py
File metadata and controls
353 lines (293 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
from __future__ import annotations
import base64
from collections.abc import Iterable
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, TypedDict, cast
from zarr.abc.metadata import Metadata
if TYPE_CHECKING:
from typing import Any, Literal, Self
import numpy.typing as npt
from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.common import JSON, ChunkCoords
import json
from dataclasses import dataclass, field, fields, replace
import numcodecs
import numpy as np
from zarr.core.array_spec import ArraySpec
from zarr.core.chunk_grids import RegularChunkGrid
from zarr.core.chunk_key_encodings import parse_separator
from zarr.core.common import ZARRAY_JSON, ZATTRS_JSON, MemoryOrder, parse_shapelike
from zarr.core.config import config, parse_indexing_order
from zarr.core.metadata.common import parse_attributes
class ArrayV2MetadataDict(TypedDict):
    """
    Typed dictionary describing keys of a Zarr v2 array metadata document.
    """

    # Format marker; the only value permitted by this model is the literal 2.
    zarr_format: Literal[2]
    # Mapping from string keys to JSON values.
    attributes: dict[str, JSON]
@dataclass(frozen=True, kw_only=True)
class ArrayV2Metadata(Metadata):
    """
    Metadata for a Zarr version 2 array.

    The dataclass is frozen: all fields are normalised and assigned once in
    ``__init__``; ``update_shape`` and ``update_attributes`` return new
    instances built with :func:`dataclasses.replace`.
    """

    shape: ChunkCoords
    chunks: tuple[int, ...]
    dtype: np.dtype[Any]
    fill_value: None | int | float | str | bytes = 0
    order: MemoryOrder = "C"
    filters: tuple[numcodecs.abc.Codec, ...] | None = None
    dimension_separator: Literal[".", "/"] = "."
    compressor: numcodecs.abc.Codec | None = None
    attributes: dict[str, JSON] = field(default_factory=dict)
    zarr_format: Literal[2] = field(init=False, default=2)

    def __init__(
        self,
        *,
        shape: ChunkCoords,
        dtype: npt.DTypeLike,
        chunks: ChunkCoords,
        fill_value: Any,
        order: MemoryOrder,
        dimension_separator: Literal[".", "/"] = ".",
        compressor: numcodecs.abc.Codec | dict[str, JSON] | None = None,
        filters: Iterable[numcodecs.abc.Codec | dict[str, JSON]] | None = None,
        attributes: dict[str, JSON] | None = None,
    ) -> None:
        """
        Metadata for a Zarr version 2 array.

        Every argument is passed through its ``parse_*`` helper before being
        stored. When neither ``filters`` nor ``compressor`` is given, the
        defaults for the dtype are looked up with
        ``_default_filters_and_compressor``.

        Raises
        ------
        ValueError
            If ``shape`` and ``chunks`` have different lengths (raised by
            ``parse_metadata``); the individual ``parse_*`` helpers may raise
            their own errors for invalid values.
        """
        shape_parsed = parse_shapelike(shape)
        dtype_parsed = parse_dtype(dtype)
        chunks_parsed = parse_shapelike(chunks)

        if not filters and not compressor:
            filters, compressor = _default_filters_and_compressor(dtype_parsed)

        if dtype is str or dtype == "str":
            vlen_codec: dict[str, JSON] = {"id": "vlen-utf8"}
            # Materialise once so a one-shot iterable is not exhausted by the
            # membership check below.
            str_filters: list[Any] = list(filters) if filters else []
            # Filters may be dict configs or Codec instances; read the codec
            # id from whichever form is present.
            has_vlen = any(
                (f.get("id") if isinstance(f, dict) else getattr(f, "codec_id", None))
                == "vlen-utf8"
                for f in str_filters
            )
            # Keep the filters the caller supplied and only append the
            # vlen-utf8 codec when it is missing.
            if not has_vlen:
                str_filters.append(vlen_codec)
            filters = str_filters

        compressor_parsed = parse_compressor(compressor)
        order_parsed = parse_indexing_order(order)
        dimension_separator_parsed = parse_separator(dimension_separator)
        filters_parsed = parse_filters(filters)
        fill_value_parsed = parse_fill_value(fill_value, dtype=dtype_parsed)
        attributes_parsed = parse_attributes(attributes)

        # The dataclass is frozen, so attributes have to be set through
        # object.__setattr__.
        object.__setattr__(self, "shape", shape_parsed)
        object.__setattr__(self, "dtype", dtype_parsed)
        object.__setattr__(self, "chunks", chunks_parsed)
        object.__setattr__(self, "compressor", compressor_parsed)
        object.__setattr__(self, "order", order_parsed)
        object.__setattr__(self, "dimension_separator", dimension_separator_parsed)
        object.__setattr__(self, "filters", filters_parsed)
        object.__setattr__(self, "fill_value", fill_value_parsed)
        object.__setattr__(self, "attributes", attributes_parsed)

        # ensure that the metadata document is consistent
        _ = parse_metadata(self)

    @property
    def ndim(self) -> int:
        """Number of dimensions, i.e. the length of ``shape``."""
        return len(self.shape)

    @cached_property
    def chunk_grid(self) -> RegularChunkGrid:
        """A ``RegularChunkGrid`` whose chunk shape is ``chunks``."""
        return RegularChunkGrid(chunk_shape=self.chunks)

    def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
        """
        Serialise the metadata into JSON buffers.

        Returns a dict with two entries: ``ZARRAY_JSON`` holds everything from
        ``to_dict()`` except the attributes, and ``ZATTRS_JSON`` holds the
        attributes. Both are JSON-encoded using the ``json_indent`` config value.
        """

        def _json_convert(
            o: Any,
        ) -> Any:
            # Fallback encoder for objects the json module cannot serialise.
            if isinstance(o, np.dtype):
                if o.fields is None:
                    return o.str
                else:
                    return o.descr
            if isinstance(o, numcodecs.abc.Codec):
                return o.get_config()
            if np.isscalar(o):
                out: Any
                if hasattr(o, "dtype") and o.dtype.kind == "M" and hasattr(o, "view"):
                    # https://github.com/zarr-developers/zarr-python/issues/2119
                    # `.item()` on a datetime type might or might not return an
                    # integer, depending on the value.
                    # Explicitly cast to an int first, and then grab .item()
                    out = o.view("i8").item()
                else:
                    # convert numpy scalar to python type, and pass
                    # python types through
                    out = getattr(o, "item", lambda: o)()
                    if isinstance(out, complex):
                        # python complex types are not JSON serializable, so we use the
                        # serialization defined in the zarr v3 spec
                        return [out.real, out.imag]
                return out
            if isinstance(o, Enum):
                return o.name
            raise TypeError(f"Object of type {type(o).__name__} is not JSON serializable")

        zarray_dict = self.to_dict()
        zattrs_dict = zarray_dict.pop("attributes", {})
        json_indent = config.get("json_indent")
        return {
            ZARRAY_JSON: prototype.buffer.from_bytes(
                json.dumps(zarray_dict, default=_json_convert, indent=json_indent).encode()
            ),
            ZATTRS_JSON: prototype.buffer.from_bytes(
                json.dumps(zattrs_dict, indent=json_indent).encode()
            ),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ArrayV2Metadata:
        """
        Build an instance from a metadata dict.

        ``data`` is not modified. ``zarr_format`` must be present and equal to 2.
        For dtypes of kind ``S`` or ``V`` a non-null ``fill_value`` is
        base64-decoded. Keys that are not fields of this class are dropped.
        """
        # make a copy to protect the original from modification
        _data = data.copy()

        # check that the zarr_format attribute is correct
        _ = parse_zarr_format(_data.pop("zarr_format"))

        dtype = parse_dtype(_data["dtype"])
        if dtype.kind in "SV":
            fill_value_encoded = _data.get("fill_value")
            if fill_value_encoded is not None:
                fill_value = base64.standard_b64decode(fill_value_encoded)
                _data["fill_value"] = fill_value

        # zarr v2 allowed arbitrary keys here.
        # We don't want the ArrayV2Metadata constructor to fail just because someone put an
        # extra key in the metadata.
        expected = {x.name for x in fields(cls)}
        # https://github.com/zarr-developers/zarr-python/issues/2269
        # handle the renames
        expected |= {"dtype", "chunks"}

        _data = {k: v for k, v in _data.items() if k in expected}

        return cls(**_data)

    def to_dict(self) -> dict[str, JSON]:
        """
        Return the metadata as a dict.

        Starts from ``super().to_dict()``, base64-encodes a non-null
        ``fill_value`` for dtypes of kind ``S`` or ``V``, and stores the dtype
        as its ``.str`` representation.
        """
        zarray_dict = super().to_dict()

        if self.dtype.kind in "SV" and self.fill_value is not None:
            # There's a relationship between self.dtype and self.fill_value
            # that mypy isn't aware of. The fact that we have S or V dtype here
            # means we should have a bytes-type fill_value.
            fill_value = base64.standard_b64encode(cast(bytes, self.fill_value)).decode("ascii")
            zarray_dict["fill_value"] = fill_value

        # NOTE(review): for structured dtypes `.str` is a plain void type
        # string without the field layout -- confirm whether `.descr` should
        # be stored instead.
        _ = zarray_dict.pop("dtype")
        zarray_dict["dtype"] = self.dtype.str

        return zarray_dict

    def get_chunk_spec(
        self, _chunk_coords: ChunkCoords, order: MemoryOrder, prototype: BufferPrototype
    ) -> ArraySpec:
        """
        Return the ``ArraySpec`` of a chunk.

        The spec does not depend on ``_chunk_coords``: it always uses
        ``chunks`` as the shape together with this array's dtype and fill value.
        """
        return ArraySpec(
            shape=self.chunks,
            dtype=self.dtype,
            fill_value=self.fill_value,
            order=order,
            prototype=prototype,
        )

    def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
        """
        Join the chunk coordinates with ``dimension_separator``.

        Empty coordinates produce the key ``"0"``.
        """
        chunk_identifier = self.dimension_separator.join(map(str, chunk_coords))
        return "0" if chunk_identifier == "" else chunk_identifier

    def update_shape(self, shape: ChunkCoords) -> Self:
        """Return a copy of this metadata with ``shape`` replaced."""
        return replace(self, shape=shape)

    def update_attributes(self, attributes: dict[str, JSON]) -> Self:
        """Return a copy of this metadata with ``attributes`` replaced."""
        return replace(self, attributes=attributes)
def _descr_to_tuples(descr: list[Any]) -> list[Any]:
    """Convert list-form structured dtype fields (e.g. decoded from JSON) to tuples."""
    converted: list[Any] = []
    for entry in descr:
        if isinstance(entry, (list, tuple)):
            parts = list(entry)
            # A (title, name) pair has to be a tuple as well.
            if parts and isinstance(parts[0], list):
                parts[0] = tuple(parts[0])
            # A list in the type position is a nested structured description.
            if len(parts) >= 2 and isinstance(parts[1], list):
                parts[1] = _descr_to_tuples(parts[1])
            # An optional third element is the sub-array shape.
            if len(parts) >= 3 and isinstance(parts[2], list):
                parts[2] = tuple(parts[2])
            entry = tuple(parts)
        converted.append(entry)
    return converted


def parse_dtype(data: npt.DTypeLike) -> np.dtype[Any]:
    """
    Convert ``data`` into a numpy dtype.

    Structured dtype descriptions are accepted both as lists of tuples and as
    lists of lists; the latter is what a JSON round-trip produces and is
    rejected by ``np.dtype`` unless the fields are turned back into tuples.

    Parameters
    ----------
    data : npt.DTypeLike
        Anything ``np.dtype`` accepts, or a list-of-lists structured description.

    Returns
    -------
    np.dtype[Any]
    """
    if isinstance(data, list):
        data = _descr_to_tuples(data)
    return np.dtype(data)
def parse_zarr_format(data: object) -> Literal[2]:
    """
    Validate a zarr format marker.

    Returns the literal ``2`` when ``data`` compares equal to 2.

    Raises
    ------
    ValueError
        If ``data`` does not compare equal to 2.
    """
    is_v2 = data == 2
    if not is_v2:
        raise ValueError(f"Invalid value. Expected 2. Got {data}.")
    return 2
def parse_filters(data: object) -> tuple[numcodecs.abc.Codec, ...] | None:
    """
    Normalise a collection of filters.

    ``None`` is passed through. Otherwise ``data`` must be an iterable whose
    items are codec instances (kept as-is) or dicts (resolved with
    ``numcodecs.get_codec``); the result is a tuple of codecs.

    Raises
    ------
    TypeError
        If ``data`` is neither ``None`` nor an iterable, or if an item is
        neither a codec nor a dict.
    """
    if data is None:
        return None

    if not isinstance(data, Iterable):
        raise TypeError(
            "Invalid filters. Expected None, an iterable of numcodecs.abc.Codec or dict "
            f"representations of numcodecs.abc.Codec. Got {type(data)} instead."
        )

    codecs: list[numcodecs.abc.Codec] = []
    for position, candidate in enumerate(data):
        if isinstance(candidate, numcodecs.abc.Codec):
            codecs.append(candidate)
            continue
        if isinstance(candidate, dict):
            codecs.append(numcodecs.get_codec(candidate))
            continue
        raise TypeError(
            f"Invalid filter at index {position}. Expected a numcodecs.abc.Codec or a dict "
            f"representation of numcodecs.abc.Codec. Got {type(candidate)} instead."
        )
    return tuple(codecs)
def parse_compressor(data: object) -> numcodecs.abc.Codec | None:
    """
    Normalise a compressor specification.

    ``None`` and codec instances are returned unchanged; a dict is resolved
    with ``numcodecs.get_codec``.

    Raises
    ------
    ValueError
        If ``data`` is none of the accepted forms.
    """
    if data is None:
        return None
    if isinstance(data, numcodecs.abc.Codec):
        return data
    if isinstance(data, dict):
        return numcodecs.get_codec(data)
    raise ValueError(
        "Invalid compressor. Expected None, a numcodecs.abc.Codec, or a dict "
        f"representation of a numcodecs.abc.Codec. Got {type(data)} instead."
    )
def parse_metadata(data: ArrayV2Metadata) -> ArrayV2Metadata:
    """
    Check that a metadata object is internally consistent.

    Returns ``data`` unchanged when ``chunks`` and ``shape`` have the same length.

    Raises
    ------
    ValueError
        If the lengths of ``chunks`` and ``shape`` differ.
    """
    n_chunks = len(data.chunks)
    n_shape = len(data.shape)
    if n_chunks == n_shape:
        return data
    raise ValueError(
        "The `shape` and `chunks` attributes must have the same length. "
        f"`chunks` has length {n_chunks}, but `shape` has length {n_shape}."
    )
def parse_fill_value(fill_value: object, dtype: np.dtype[Any]) -> Any:
    """
    Coerce a candidate fill value so that it is compatible with ``dtype``.

    Parameters
    ----------
    fill_value : Any
        The candidate fill value.
    dtype : np.dtype[Any]
        The numpy dtype the fill value has to match.

    Returns
    -------
    A scalar of ``dtype``, or ``None``, or the untouched input when ``dtype``
    contains python objects.

    Raises
    ------
    ValueError
        If the value cannot be represented with ``dtype``.
    """
    # Nothing to coerce: no fill value at all, or an object dtype that may
    # hold anything.
    if fill_value is None or dtype.hasobject:
        return fill_value

    # A zero is turned into the dtype's own zero scalar; this should be
    # compatible across numpy versions for any array type, including
    # structured arrays.
    if not isinstance(fill_value, np.void) and fill_value == 0:
        return np.zeros((), dtype=dtype)[()]

    if dtype.kind == "U":
        # special case unicode because of encoding issues on Windows if passed through numpy
        # https://github.com/alimanfoo/zarr/pull/172#issuecomment-343782713
        if isinstance(fill_value, str):
            return fill_value
        raise ValueError(
            f"fill_value {fill_value!r} is not valid for dtype {dtype}; must be a unicode string"
        )

    raw_bytes_for_void = isinstance(fill_value, bytes) and dtype.kind == "V"
    try:
        if raw_bytes_for_void:
            # special case for numpy 1.14 compatibility
            coerced = np.array(fill_value, dtype=dtype.str).view(dtype)[()]
        else:
            coerced = np.array(fill_value, dtype=dtype)[()]
    except Exception as e:
        msg = f"Fill_value {fill_value} is not valid for dtype {dtype}."
        raise ValueError(msg) from e
    return coerced
def _default_fill_value(dtype: np.dtype[Any]) -> Any:
"""
Get the default fill value for a type.
Notes
-----
This differs from :func:`parse_fill_value`, which parses a fill value
stored in the Array metadata into an in-memory value. This only gives
the default fill value for some type.
This is useful for reading Zarr V2 arrays, which allow the fill
value to be unspecified.
"""
if dtype.kind == "S":
return b""
elif dtype.kind in "UO":
return ""
else:
return dtype.type(0)
def _default_filters_and_compressor(
    dtype: np.dtype[Any],
) -> tuple[list[dict[str, str]], dict[str, str] | None]:
    """Look up the default filters and compressor for a dtype.

    The ``v2_dtype_kind_to_default_filters_and_compressor`` config entry maps
    strings of numpy dtype kind characters to codec ids. The first entry whose
    key contains ``dtype.kind`` wins; each of its ids becomes an ``{"id": ...}``
    filter config. The returned compressor is always ``None``, and an
    unmatched kind yields ``([], None)``.

    https://numpy.org/doc/2.1/reference/generated/numpy.dtype.kind.html
    """
    kind_to_defaults = config.get("v2_dtype_kind_to_default_filters_and_compressor")
    for kinds, codec_ids in kind_to_defaults.items():
        if dtype.kind not in kinds:
            continue
        return [{"id": codec_id} for codec_id in codec_ids], None
    return [], None