-
-
Notifications
You must be signed in to change notification settings - Fork 396
Expand file tree
/
Copy path_codecs.py
More file actions
330 lines (238 loc) · 12 KB
/
_codecs.py
File metadata and controls
330 lines (238 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""
This module provides compatibility for [numcodecs][] in Zarr version 3.
These codecs were previously defined in [numcodecs][], and have now been moved to `zarr`.
```python
import numpy as np
import zarr
import zarr.codecs.numcodecs as numcodecs
array = zarr.create_array(
    store="data_numcodecs.zarr",
    shape=(1024, 1024),
    chunks=(64, 64),
    dtype="uint32",
    filters=[numcodecs.Delta(dtype="uint32")],
    compressors=[numcodecs.BZ2(level=5)],
    overwrite=True)
array[:] = np.arange(np.prod(array.shape), dtype=array.dtype).reshape(*array.shape)
```
!!! note
    Please note that the codecs in [zarr.codecs.numcodecs][] are not part of the Zarr version
    3 specification. Using these codecs might cause interoperability issues with other Zarr
    implementations.
"""
from __future__ import annotations
import asyncio
import math
from dataclasses import dataclass, replace
from functools import cached_property
from typing import TYPE_CHECKING, Any, Self
import numpy as np
from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec
from zarr.abc.metadata import Metadata
from zarr.core.buffer.cpu import as_numpy_array_wrapper
from zarr.core.common import JSON, parse_named_configuration, product
from zarr.dtype import UInt8, ZDType, parse_dtype
from zarr.registry import get_numcodec
if TYPE_CHECKING:
from zarr.abc.numcodec import Numcodec
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, NDBuffer
# Prefix that every codec name handled by this module must start with; it is
# prepended when building each class's `codec_name` and stripped again to obtain
# the codec "id".
CODEC_PREFIX = "numcodecs."
def _expect_name_prefix(codec_name: str) -> str:
    """Return ``codec_name`` with the leading ``CODEC_PREFIX`` removed.

    Raises
    ------
    ValueError
        If ``codec_name`` does not start with ``CODEC_PREFIX``.
    """
    if codec_name.startswith(CODEC_PREFIX):
        return codec_name.removeprefix(CODEC_PREFIX)
    raise ValueError(
        f"Expected name to start with '{CODEC_PREFIX}'. Got {codec_name} instead."
    )  # pragma: no cover
def _parse_codec_configuration(data: dict[str, JSON]) -> dict[str, JSON]:
    """Convert a named-configuration dict into a flat codec config with an ``"id"`` key.

    ``data`` is split into a name and a configuration by
    ``parse_named_configuration``. The name must start with ``CODEC_PREFIX``;
    the remainder of the name becomes the ``"id"`` entry of the result, followed
    by the configuration entries (a configuration entry named ``"id"`` would
    therefore take precedence).

    Raises
    ------
    ValueError
        If the parsed name does not start with ``CODEC_PREFIX``.
    """
    parsed_name, parsed_configuration = parse_named_configuration(data)
    # _expect_name_prefix performs the prefix validation and raises the
    # ValueError itself, so no separate startswith() check is needed here.
    codec_id = _expect_name_prefix(parsed_name)
    return {"id": codec_id, **parsed_configuration}
@dataclass(frozen=True)
class _NumcodecsCodec(Metadata):
    """Shared base for the codec wrappers defined in this module.

    An instance stores a prefixed codec name and a configuration dict (always
    containing an ``"id"`` entry), and lazily builds the underlying codec object
    from that configuration via ``get_numcodec``.
    """

    codec_name: str
    codec_config: dict[str, JSON]

    def __init_subclass__(cls, *, codec_name: str | None = None, **kwargs: Any) -> None:
        """To be used only when creating the actual public-facing codec class."""
        super().__init_subclass__(**kwargs)
        if codec_name is not None:
            namespace = codec_name
            cls_name = f"{CODEC_PREFIX}{namespace}.{cls.__name__}"
            # Stored on the class so every instance shares the prefixed name.
            cls.codec_name = f"{CODEC_PREFIX}{namespace}"
            cls.__doc__ = f"\nSee [{cls_name}][] for more details and parameters.\n"

    def __init__(self, **codec_config: JSON) -> None:
        """Store ``codec_config``, inserting or validating its ``"id"`` entry.

        Raises
        ------
        ValueError
            If the class has no codec name, if the codec name lacks
            ``CODEC_PREFIX``, or if a supplied ``"id"`` differs from the
            unprefixed codec name.
        """
        # The `codec_name` field has no default, so a class that never received
        # `codec_name=` may not have the attribute at all. getattr() keeps that
        # case on the intended ValueError path instead of an AttributeError.
        codec_name = getattr(self, "codec_name", None)
        if not codec_name:
            raise ValueError(
                "The codec name needs to be supplied through the `codec_name` attribute."
            )  # pragma: no cover
        unprefixed_codec_name = _expect_name_prefix(codec_name)

        if "id" not in codec_config:
            codec_config = {"id": unprefixed_codec_name, **codec_config}
        elif codec_config["id"] != unprefixed_codec_name:
            raise ValueError(
                f"Codec id does not match {unprefixed_codec_name}. Got: {codec_config['id']}."
            )  # pragma: no cover

        # The dataclass is frozen, so normal attribute assignment is blocked.
        object.__setattr__(self, "codec_config", codec_config)

    @cached_property
    def _codec(self) -> Numcodec:
        """The codec object built from ``codec_config``; created on first access."""
        return get_numcodec(self.codec_config)  # type: ignore[arg-type]

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """Build an instance from a named-configuration dict."""
        codec_config = _parse_codec_configuration(data)
        return cls(**codec_config)

    def to_dict(self) -> dict[str, JSON]:
        """Return ``{"name": ..., "configuration": ...}`` with ``"id"`` left out of the configuration."""
        configuration = {key: value for key, value in self.codec_config.items() if key != "id"}
        return {
            "name": self.codec_name,
            "configuration": configuration,
        }

    def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int:
        """Not implemented in this base class."""
        raise NotImplementedError  # pragma: no cover

    # Override __repr__ because dynamically constructed classes don't seem to work otherwise
    def __repr__(self) -> str:
        codec_config = {key: value for key, value in self.codec_config.items() if key != "id"}
        return f"{self.__class__.__name__}(codec_name={self.codec_name!r}, codec_config={codec_config!r})"
class _NumcodecsBytesBytesCodec(_NumcodecsCodec, BytesBytesCodec):
    """Wrapper base for codecs that take a ``Buffer`` and return a ``Buffer``."""

    def __init__(self, **codec_config: JSON) -> None:
        super().__init__(**codec_config)

    def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Decode ``chunk_data`` with the wrapped codec's ``decode``."""
        decode = self._codec.decode
        return as_numpy_array_wrapper(decode, chunk_data, chunk_spec.prototype)

    def _encode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Encode ``chunk_data`` with the wrapped codec and wrap the result in a buffer."""
        encoded = self._codec.encode(chunk_data.as_array_like())
        # Required for checksum codecs: an ndarray result is converted to bytes first.
        payload = encoded.tobytes() if isinstance(encoded, np.ndarray) else encoded
        return chunk_spec.prototype.buffer.from_bytes(payload)

    async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Run ``_decode_sync`` through ``asyncio.to_thread``."""
        decoded = await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
        return decoded

    async def _encode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Run ``_encode_sync`` through ``asyncio.to_thread``."""
        encoded = await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)
        return encoded
class _NumcodecsArrayArrayCodec(_NumcodecsCodec, ArrayArrayCodec):
    """Wrapper base for codecs that take an ``NDBuffer`` and return an ``NDBuffer``."""

    def __init__(self, **codec_config: JSON) -> None:
        super().__init__(**codec_config)

    def _decode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Decode the chunk and reshape the result to ``chunk_spec.shape``."""
        decoded = self._codec.decode(chunk_data.as_ndarray_like())
        reshaped = decoded.reshape(chunk_spec.shape)
        return chunk_spec.prototype.nd_buffer.from_ndarray_like(reshaped)

    def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Encode the chunk; the codec output is wrapped without reshaping."""
        encoded = self._codec.encode(chunk_data.as_ndarray_like())
        return chunk_spec.prototype.nd_buffer.from_ndarray_like(encoded)

    async def _decode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Run ``_decode_sync`` through ``asyncio.to_thread``."""
        decoded = await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
        return decoded

    async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Run ``_encode_sync`` through ``asyncio.to_thread``."""
        encoded = await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)
        return encoded
class _NumcodecsArrayBytesCodec(_NumcodecsCodec, ArrayBytesCodec):
    """Wrapper base for codecs that encode an ``NDBuffer`` to a ``Buffer`` and decode back."""

    def __init__(self, **codec_config: JSON) -> None:
        super().__init__(**codec_config)

    def _decode_sync(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Decode the chunk's bytes and reshape the result to ``chunk_spec.shape``."""
        decoded = self._codec.decode(chunk_data.to_bytes())
        reshaped = decoded.reshape(chunk_spec.shape)
        return chunk_spec.prototype.nd_buffer.from_ndarray_like(reshaped)

    def _encode_sync(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
        """Encode the chunk array and wrap the codec output in a buffer."""
        encoded = self._codec.encode(chunk_data.as_ndarray_like())
        return chunk_spec.prototype.buffer.from_bytes(encoded)

    async def _decode_single(self, chunk_data: Buffer, chunk_spec: ArraySpec) -> NDBuffer:
        """Run ``_decode_sync`` through ``asyncio.to_thread``."""
        decoded = await asyncio.to_thread(self._decode_sync, chunk_data, chunk_spec)
        return decoded

    async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer:
        """Run ``_encode_sync`` through ``asyncio.to_thread``."""
        encoded = await asyncio.to_thread(self._encode_sync, chunk_data, chunk_spec)
        return encoded
# bytes-to-bytes codecs
class Blosc(_NumcodecsBytesBytesCodec, codec_name="blosc"):
    # No overrides: behaviour is inherited from _NumcodecsBytesBytesCodec.
    # Passing codec_name="blosc" sets the class's codec_name to "numcodecs.blosc".
    ...
class LZ4(_NumcodecsBytesBytesCodec, codec_name="lz4"):
    # No overrides: behaviour is inherited from _NumcodecsBytesBytesCodec.
    # Passing codec_name="lz4" sets the class's codec_name to "numcodecs.lz4".
    ...
class Zstd(_NumcodecsBytesBytesCodec, codec_name="zstd"):
    # No overrides: behaviour is inherited from _NumcodecsBytesBytesCodec.
    # Passing codec_name="zstd" sets the class's codec_name to "numcodecs.zstd".
    ...
class Zlib(_NumcodecsBytesBytesCodec, codec_name="zlib"):
    # No overrides: behaviour is inherited from _NumcodecsBytesBytesCodec.
    # Passing codec_name="zlib" sets the class's codec_name to "numcodecs.zlib".
    ...
class GZip(_NumcodecsBytesBytesCodec, codec_name="gzip"):
    # No overrides: behaviour is inherited from _NumcodecsBytesBytesCodec.
    # Passing codec_name="gzip" sets the class's codec_name to "numcodecs.gzip".
    ...
class BZ2(_NumcodecsBytesBytesCodec, codec_name="bz2"):
    # No overrides: behaviour is inherited from _NumcodecsBytesBytesCodec.
    # Passing codec_name="bz2" sets the class's codec_name to "numcodecs.bz2".
    ...
class LZMA(_NumcodecsBytesBytesCodec, codec_name="lzma"):
    # No overrides: behaviour is inherited from _NumcodecsBytesBytesCodec.
    # Passing codec_name="lzma" sets the class's codec_name to "numcodecs.lzma".
    ...
class Shuffle(_NumcodecsBytesBytesCodec, codec_name="shuffle"):
    # Bytes-to-bytes codec whose "elementsize" option, when absent, is derived
    # from the array's dtype.

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Shuffle:
        """Return a codec whose ``"elementsize"`` is set.

        If the configuration already has a non-``None`` ``"elementsize"`` the
        instance itself is returned; otherwise a new ``Shuffle`` is built with
        ``"elementsize"`` set to the item size of the array's native dtype.
        """
        if self.codec_config.get("elementsize") is not None:
            return self  # pragma: no cover
        native_dtype = array_spec.dtype.to_native_dtype()
        return Shuffle(**{**self.codec_config, "elementsize": native_dtype.itemsize})
# array-to-array codecs ("filters")
class Delta(_NumcodecsArrayArrayCodec, codec_name="delta"):
    # Array-to-array codec; an "astype" option changes the dtype reported for
    # the encoded chunk.

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return the chunk spec as seen after encoding.

        When the configuration has a truthy ``"astype"`` entry, the returned
        spec carries that dtype; otherwise ``chunk_spec`` is returned unchanged.
        """
        astype = self.codec_config.get("astype")
        if not astype:
            return chunk_spec
        dtype = parse_dtype(np.dtype(astype), zarr_format=3)  # type: ignore[call-overload]
        return replace(chunk_spec, dtype=dtype)
class BitRound(_NumcodecsArrayArrayCodec, codec_name="bitround"):
    # No overrides: behaviour is inherited from _NumcodecsArrayArrayCodec.
    # Passing codec_name="bitround" sets the class's codec_name to "numcodecs.bitround".
    ...
class FixedScaleOffset(_NumcodecsArrayArrayCodec, codec_name="fixedscaleoffset"):
    # Array-to-array codec; "astype" changes the dtype reported for the encoded
    # chunk, and a missing "dtype" option is filled in from the array's dtype.

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return the chunk spec as seen after encoding.

        When the configuration has a truthy ``"astype"`` entry, the returned
        spec carries that dtype; otherwise ``chunk_spec`` is returned unchanged.
        """
        astype = self.codec_config.get("astype")
        if not astype:
            return chunk_spec
        dtype = parse_dtype(np.dtype(astype), zarr_format=3)  # type: ignore[call-overload]
        return replace(chunk_spec, dtype=dtype)

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> FixedScaleOffset:
        """Return a codec whose ``"dtype"`` is set.

        If ``"dtype"`` is missing or ``None``, a new ``FixedScaleOffset`` is built
        with ``"dtype"`` set to the string form of the array's native dtype;
        otherwise the instance itself is returned.
        """
        if self.codec_config.get("dtype") is not None:
            return self
        native_dtype = array_spec.dtype.to_native_dtype()
        return FixedScaleOffset(**{**self.codec_config, "dtype": str(native_dtype)})
class Quantize(_NumcodecsArrayArrayCodec, codec_name="quantize"):
    # Array-to-array codec; a missing "dtype" option is filled in from the
    # array's dtype.

    def __init__(self, **codec_config: JSON) -> None:
        super().__init__(**codec_config)

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Quantize:
        """Return a codec whose ``"dtype"`` is set.

        If ``"dtype"`` is missing or ``None``, a new ``Quantize`` is built with
        ``"dtype"`` set to the string form of the array's native dtype;
        otherwise the instance itself is returned.
        """
        if self.codec_config.get("dtype") is not None:
            return self
        native_dtype = array_spec.dtype.to_native_dtype()
        return Quantize(**{**self.codec_config, "dtype": str(native_dtype)})
class PackBits(_NumcodecsArrayArrayCodec, codec_name="packbits"):
    # Array-to-array codec whose encoded chunk is reported as a 1-D uint8 array.

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return the spec of the encoded chunk.

        The encoded shape is one-dimensional with ``1 + ceil(n / 8)`` elements,
        where ``n`` is the number of elements in the input chunk, and the dtype
        is ``UInt8``.
        """
        element_count = product(chunk_spec.shape)
        packed_length = 1 + math.ceil(element_count / 8)
        return replace(
            chunk_spec,
            shape=(packed_length,),
            dtype=UInt8(),
        )

    # todo: remove this type: ignore when this class can be defined w.r.t.
    # a single zarr dtype API
    def validate(self, *, dtype: ZDType[Any, Any], **_kwargs: Any) -> None:
        """Raise ``ValueError`` unless the native form of ``dtype`` is ``bool``."""
        # this is bugged and will fail
        native_dtype = dtype.to_native_dtype()
        if native_dtype != np.dtype("bool"):
            raise ValueError(f"Packbits filter requires bool dtype. Got {dtype}.")
class AsType(_NumcodecsArrayArrayCodec, codec_name="astype"):
    # Array-to-array codec; the encoded chunk's dtype comes from "encode_dtype",
    # and a missing "decode_dtype" is filled in from the array's dtype.

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return ``chunk_spec`` with its dtype replaced by the configured ``"encode_dtype"``."""
        encode_dtype = np.dtype(self.codec_config["encode_dtype"])  # type: ignore[arg-type]
        dtype = parse_dtype(encode_dtype, zarr_format=3)
        return replace(chunk_spec, dtype=dtype)

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> AsType:
        """Return a codec whose ``"decode_dtype"`` is set.

        If ``"decode_dtype"`` is missing or ``None``, a new ``AsType`` is built
        with it set to the string form of the array's native dtype; otherwise the
        instance itself is returned.
        """
        if self.codec_config.get("decode_dtype") is not None:
            return self
        # TODO: remove these coverage exemptions the correct way, i.e. with tests
        native_dtype = array_spec.dtype.to_native_dtype()  # pragma: no cover
        return AsType(**{**self.codec_config, "decode_dtype": str(native_dtype)})  # pragma: no cover
# bytes-to-bytes checksum codecs
class _NumcodecsChecksumCodec(_NumcodecsBytesBytesCodec):
    """Bytes-to-bytes wrapper base whose encoded output is 4 bytes longer than its input."""

    def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int:
        """Return ``input_byte_length`` plus 4; ``chunk_spec`` is not used."""
        added_bytes = 4
        return input_byte_length + added_bytes  # pragma: no cover
class CRC32(_NumcodecsChecksumCodec, codec_name="crc32"):
    # No overrides: behaviour is inherited from _NumcodecsChecksumCodec.
    # Passing codec_name="crc32" sets the class's codec_name to "numcodecs.crc32".
    ...
class CRC32C(_NumcodecsChecksumCodec, codec_name="crc32c"):
    # No overrides: behaviour is inherited from _NumcodecsChecksumCodec.
    # Passing codec_name="crc32c" sets the class's codec_name to "numcodecs.crc32c".
    ...
class Adler32(_NumcodecsChecksumCodec, codec_name="adler32"):
    # No overrides: behaviour is inherited from _NumcodecsChecksumCodec.
    # Passing codec_name="adler32" sets the class's codec_name to "numcodecs.adler32".
    ...
class Fletcher32(_NumcodecsChecksumCodec, codec_name="fletcher32"):
    # No overrides: behaviour is inherited from _NumcodecsChecksumCodec.
    # Passing codec_name="fletcher32" sets the class's codec_name to "numcodecs.fletcher32".
    ...
class JenkinsLookup3(_NumcodecsChecksumCodec, codec_name="jenkins_lookup3"):
    # No overrides: behaviour is inherited from _NumcodecsChecksumCodec.
    # Passing codec_name="jenkins_lookup3" sets the class's codec_name to
    # "numcodecs.jenkins_lookup3".
    ...
# array-to-bytes codecs
class PCodec(_NumcodecsArrayBytesCodec, codec_name="pcodec"):
    # No overrides: behaviour is inherited from _NumcodecsArrayBytesCodec.
    # Passing codec_name="pcodec" sets the class's codec_name to "numcodecs.pcodec".
    ...
class ZFPY(_NumcodecsArrayBytesCodec, codec_name="zfpy"):
    # No overrides: behaviour is inherited from _NumcodecsArrayBytesCodec.
    # Passing codec_name="zfpy" sets the class's codec_name to "numcodecs.zfpy".
    ...