forked from zarr-developers/zarr-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_v2.py
More file actions
155 lines (121 loc) · 5.45 KB
/
_v2.py
File metadata and controls
155 lines (121 loc) · 5.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING, ClassVar, Self, TypeGuard
import numpy as np
from numcodecs.compat import ensure_bytes, ensure_ndarray_like
from typing_extensions import Protocol
from zarr.abc.codec import ArrayBytesCodec, CodecJSON_V2
from zarr.registry import get_ndbuffer_class
if TYPE_CHECKING:
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, NDBuffer
class Numcodec(Protocol):
    """
    A protocol that models the ``numcodecs.abc.Codec`` interface.

    This is a structural (duck-typed) stand-in so that numcodecs-style
    codec objects can be accepted without depending on the concrete
    ``numcodecs.abc.Codec`` base class.
    """

    # String identifier for the codec class (mirrors numcodecs' ``codec_id``).
    codec_id: ClassVar[str]

    # Encode the given buffer and return the encoded result.
    def encode(self, buf: Buffer | NDBuffer) -> Buffer | NDBuffer: ...

    # Decode ``buf``, optionally writing into ``out``, and return the result.
    def decode(
        self, buf: Buffer | NDBuffer, out: Buffer | NDBuffer | None = None
    ) -> Buffer | NDBuffer: ...

    # Return this codec's configuration as a ``CodecJSON_V2`` mapping.
    def get_config(self) -> CodecJSON_V2[str]: ...

    # Alternate constructor: build a codec instance from a config mapping.
    @classmethod
    def from_config(cls, config: CodecJSON_V2[str]) -> Self: ...
def _is_numcodec(obj: object) -> TypeGuard[Numcodec]:
    """
    Return ``True`` if ``obj`` is an instance whose class satisfies the
    ``Numcodec`` protocol.

    The ``@runtime_checkable`` decorator cannot perform ``issubclass``
    checks against protocols with non-method members (attributes), so the
    check is delegated to :func:`_is_numcodec_cls` on ``type(obj)``, which
    inspects the required attributes and methods manually.
    """
    cls = type(obj)
    return _is_numcodec_cls(cls)
def _is_numcodec_cls(obj: object) -> TypeGuard[type[Numcodec]]:
"""
Check if the given object is a class implements the Numcodec protocol.
The @runtime_checkable decorator does not allow issubclass checks for protocols with non-method
members (i.e., attributes), so we use this function to manually check for the presence of the
required attributes and methods on a given object.
"""
return (
isinstance(obj, type)
and hasattr(obj, "codec_id")
and isinstance(obj.codec_id, str)
and hasattr(obj, "encode")
and callable(obj.encode)
and hasattr(obj, "decode")
and callable(obj.decode)
and hasattr(obj, "get_config")
and callable(obj.get_config)
and hasattr(obj, "from_config")
and callable(obj.from_config)
)
@dataclass(frozen=True)
class V2Codec(ArrayBytesCodec):
    """
    Array-bytes codec implementing the Zarr v2 chunk pipeline.

    On encode, filters are applied in declaration order and then the
    compressor; on decode, the compressor is undone first and the filters
    are applied in reverse order.
    """

    # Optional chain of numcodecs-style filters; applied in order on encode
    # and in reverse order on decode.
    filters: tuple[Numcodec, ...] | None
    # Optional numcodecs-style compressor; last encode stage, first decode stage.
    compressor: Numcodec | None

    # Encoded size depends on the data (compression), so it is not fixed.
    is_fixed_size = False

    async def _decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """
        Decode one stored chunk into an ``NDBuffer`` shaped per ``chunk_spec``.

        Codec calls are dispatched to worker threads via ``asyncio.to_thread``
        so the event loop is not blocked by CPU-bound codec work.
        """
        cdata = chunk_bytes.as_array_like()
        # decompress (compressor is undone before filters on the decode path)
        if self.compressor:
            chunk = await asyncio.to_thread(self.compressor.decode, cdata)  # type: ignore[arg-type]
        else:
            chunk = cdata  # type: ignore[assignment]

        # apply filters in reverse of the encoding order
        if self.filters:
            for f in reversed(self.filters):
                chunk = await asyncio.to_thread(f.decode, chunk)

        # view as numpy array with correct dtype
        chunk = ensure_ndarray_like(chunk)
        # special case object dtype, because incorrect handling can lead to
        # segfaults and other bad things happening
        if chunk_spec.dtype.dtype_cls is not np.dtypes.ObjectDType:
            try:
                chunk = chunk.view(chunk_spec.dtype.to_native_dtype())
            except TypeError:
                # this will happen if the dtype of the chunk
                # does not match the dtype of the array spec, e.g. if
                # the dtype of the chunk_spec is a string dtype, but the chunk
                # is an object array. In this case, we need to convert the object
                # array to the correct dtype.
                chunk = np.array(chunk).astype(chunk_spec.dtype.to_native_dtype())  # type: ignore[assignment]
        elif chunk.dtype != object:
            # If we end up here, someone must have hacked around with the filters.
            # We cannot deal with object arrays unless there is an object
            # codec in the filter chain, i.e., a filter that converts from object
            # array to something else during encoding, and converts back to object
            # array during decoding.
            raise RuntimeError("cannot read object array without object codec")

        # ensure correct chunk shape: flatten first (any memory order), then
        # reshape using the order requested by the spec
        chunk = chunk.reshape(-1, order="A")
        chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order)

        return get_ndbuffer_class().from_ndarray_like(chunk)

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """
        Encode one in-memory chunk into a ``Buffer`` ready for storage.

        Applies filters in order, rejects object arrays that no filter has
        converted to a concrete dtype, then compresses the result.
        """
        chunk = chunk_array.as_ndarray_like()

        # ensure contiguous and correct order
        chunk = chunk.astype(chunk_spec.dtype.to_native_dtype(), order=chunk_spec.order, copy=False)

        # apply filters (declaration order; decode reverses this)
        if self.filters:
            for f in self.filters:
                chunk = await asyncio.to_thread(f.encode, chunk)  # type: ignore[arg-type]

        # check object encoding: an object array at this point means no filter
        # converted it, which is not storable
        if ensure_ndarray_like(chunk).dtype == object:
            raise RuntimeError("cannot write object array without object codec")

        # compress
        if self.compressor:
            cdata = await asyncio.to_thread(self.compressor.encode, chunk)  # type: ignore[arg-type]
        else:
            cdata = chunk  # type: ignore[assignment]

        cdata = ensure_bytes(cdata)
        return chunk_spec.prototype.buffer.from_bytes(cdata)

    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        # Encoded size cannot be predicted without running the codecs.
        raise NotImplementedError