-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathpipeline.py
More file actions
244 lines (213 loc) · 8.55 KB
/
pipeline.py
File metadata and controls
244 lines (213 loc) · 8.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
from __future__ import annotations
import asyncio
import json
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, TypedDict
import numpy as np
from zarr.abc.codec import Codec, CodecPipeline
from zarr.core import BatchedCodecPipeline
from zarr.core.config import config
if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator
from typing import Any, Self
from zarr.abc.store import ByteGetter, ByteSetter
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
from zarr.core.chunk_grids import ChunkGrid
from zarr.core.common import ChunkCoords
from zarr.core.indexing import SelectorTuple
from ._internal import CodecPipelineImpl, codec_metadata_v2_to_v3
from .utils import (
CollapsedDimensionError,
DiscontiguousArrayError,
FillValueNoneError,
make_chunk_info_for_rust_with_indices,
)
class UnsupportedDataTypeError(Exception):
    """Raised when a batch contains a dtype the Rust pipeline cannot handle."""
class UnsupportedMetadataError(Exception):
    """Raised when the codec metadata is not supported by the Rust pipeline."""
def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
    """Build the Rust ``CodecPipelineImpl`` for the given v3 codec metadata.

    Concurrency/checksum settings are pulled from the zarr config. Returns
    ``None`` when the Rust side reports an unsupported codec (currently
    delta/zlib), signalling callers to fall back to the Python pipeline.

    Raises:
        TypeError: re-raised for any construction failure other than the
            known "unsupported codec" message.
    """
    try:
        return CodecPipelineImpl(
            codec_metadata_json,
            validate_checksums=config.get("codec_pipeline.validate_checksums", None),
            chunk_concurrent_minimum=config.get(
                "codec_pipeline.chunk_concurrent_minimum", None
            ),
            chunk_concurrent_maximum=config.get(
                "codec_pipeline.chunk_concurrent_maximum", None
            ),
            num_threads=config.get("threading.max_workers", None),
        )
    except TypeError as e:
        # The Rust binding raises TypeError for codecs it cannot handle;
        # only the known "unsupported codec" message is a soft failure.
        if re.match(r"codec (delta|zlib) is not supported", str(e)):
            return None
        # Bare `raise` preserves the original traceback (unlike `raise e`,
        # which restarts the traceback at this frame).
        raise
def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
    """Yield a v3 metadata dict for each codec in *codecs*.

    Codecs whose class is named ``V2Codec`` wrap numcodecs filter/compressor
    configs; those configs are JSON-serialized and converted to v3 metadata
    via ``codec_metadata_v2_to_v3`` (which yields JSON strings). All other
    codecs are emitted directly via their own ``to_dict``.
    """
    for codec in codecs:
        if codec.__class__.__name__ != "V2Codec":
            yield codec.to_dict()
            continue
        codec_dict = codec.to_dict()
        # Look each key up once; don't shadow the builtin `filter`.
        v2_filters = codec_dict.get("filters")
        filters_json = (
            [json.dumps(f.get_config()) for f in v2_filters]
            if v2_filters is not None
            else None
        )
        v2_compressor = codec_dict.get("compressor")
        compressor_json = (
            json.dumps(v2_compressor.get_config()) if v2_compressor is not None else None
        )
        # Use a distinct loop variable so the outer `codec` is not shadowed.
        for codec_v3_json in codec_metadata_v2_to_v3(filters_json, compressor_json):
            yield json.loads(codec_v3_json)
class ZarrsCodecPipelineState(TypedDict):
    """Picklable state produced by ``ZarrsCodecPipeline.__getstate__``."""

    # JSON-serialized v3 codec metadata; used to rebuild the Rust impl on unpickle.
    codec_metadata_json: str
    # Original codec objects; used to rebuild the Python fallback pipeline.
    codecs: tuple[Codec, ...]
@dataclass
class ZarrsCodecPipeline(CodecPipeline):
    """Codec pipeline backed by the Rust ``zarrs`` implementation.

    Each read/write first attempts the Rust path; when the metadata, dtype,
    or chunk layout is unsupported, it transparently falls back to
    zarr-python's ``BatchedCodecPipeline``.
    """

    # Original zarr-python codec objects for this array.
    codecs: tuple[Codec, ...]
    # Rust pipeline, or None when the codec metadata is unsupported by zarrs.
    impl: CodecPipelineImpl | None
    # JSON-serialized v3 codec metadata (used to rebuild `impl` on unpickle).
    codec_metadata_json: str
    # Pure-Python fallback pipeline.
    python_impl: BatchedCodecPipeline
    def __getstate__(self) -> ZarrsCodecPipelineState:
        # Only the JSON metadata and codecs are pickled; the Rust `impl`
        # and the Python pipeline are rebuilt in __setstate__.
        return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
    def __setstate__(self, state: ZarrsCodecPipelineState):
        """Rebuild both the Rust and Python pipelines from pickled state."""
        self.codecs = state["codecs"]
        self.codec_metadata_json = state["codec_metadata_json"]
        self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
        self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)
    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
        # Not supported by this pipeline implementation.
        raise NotImplementedError("evolve_from_array_spec")
    @classmethod
    def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
        """Construct the pipeline, eagerly building the Rust and Python impls."""
        codec_metadata = list(codecs_to_dict(codecs))
        codec_metadata_json = json.dumps(codec_metadata)
        # TODO: upstream zarr-python has not settled on how to deal with configs yet
        # Should they be checked when an array is created, or when an operation is performed?
        # https://github.com/zarr-developers/zarr-python/issues/2409
        # https://github.com/zarr-developers/zarr-python/pull/2429#issuecomment-2566976567
        return cls(
            codec_metadata_json=codec_metadata_json,
            codecs=tuple(codecs),
            impl=get_codec_pipeline_impl(codec_metadata_json),
            python_impl=BatchedCodecPipeline.from_codecs(codecs),
        )
    @property
    def supports_partial_decode(self) -> bool:
        # Partial decode/encode are handled whole-chunk by this pipeline.
        return False
    @property
    def supports_partial_encode(self) -> bool:
        return False
    def __iter__(self) -> Iterator[Codec]:
        yield from self.codecs
    def validate(
        self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid
    ) -> None:
        # Not supported by this pipeline implementation.
        raise NotImplementedError("validate")
    def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int:
        # Not supported by this pipeline implementation.
        raise NotImplementedError("compute_encoded_size")
    async def decode(
        self,
        chunk_bytes_and_specs: Iterable[tuple[Buffer | None, ArraySpec]],
    ) -> Iterable[NDBuffer | None]:
        # Chunk-level decode is not exposed; only read()/write() are implemented.
        raise NotImplementedError("decode")
    async def encode(
        self,
        chunk_arrays_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]],
    ) -> Iterable[Buffer | None]:
        # Chunk-level encode is not exposed; only read()/write() are implemented.
        raise NotImplementedError("encode")
    async def read(
        self,
        batch_info: Iterable[
            tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]
        ],
        out: NDBuffer, # type: ignore
        drop_axes: tuple[int, ...] = (), # FIXME: unused
    ) -> None:
        """Read the chunks described by *batch_info* into *out*.

        Tries the Rust pipeline first; any of the known unsupported-case
        errors routes the whole batch to the Python fallback instead.
        """
        # FIXME: Error if array is not in host memory
        if not out.dtype.isnative:
            raise RuntimeError("Non-native byte order not supported")
        try:
            if self.impl is None:
                raise UnsupportedMetadataError()
            self._raise_error_on_unsupported_batch_dtype(batch_info)
            chunks_desc = make_chunk_info_for_rust_with_indices(
                batch_info, drop_axes, out.shape
            )
        except (
            UnsupportedMetadataError,
            DiscontiguousArrayError,
            CollapsedDimensionError,
            UnsupportedDataTypeError,
            FillValueNoneError,
        ):
            # The Rust path cannot service this batch; delegate to Python.
            await self.python_impl.read(batch_info, out, drop_axes)
            return None
        else:
            out: NDArrayLike = out.as_ndarray_like()
            # The Rust call is blocking; run it off the event loop.
            await asyncio.to_thread(
                self.impl.retrieve_chunks_and_apply_index,
                chunks_desc.chunk_info_with_indices,
                out,
            )
            return None
    async def write(
        self,
        batch_info: Iterable[
            tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]
        ],
        value: NDBuffer, # type: ignore
        drop_axes: tuple[int, ...] = (),
    ) -> None:
        """Write *value* into the chunks described by *batch_info*.

        Tries the Rust pipeline first; any of the known unsupported-case
        errors routes the whole batch to the Python fallback instead.
        """
        try:
            if self.impl is None:
                raise UnsupportedMetadataError()
            self._raise_error_on_unsupported_batch_dtype(batch_info)
            chunks_desc = make_chunk_info_for_rust_with_indices(
                batch_info, drop_axes, value.shape
            )
        except (
            UnsupportedMetadataError,
            DiscontiguousArrayError,
            CollapsedDimensionError,
            UnsupportedDataTypeError,
            FillValueNoneError,
        ):
            # The Rust path cannot service this batch; delegate to Python.
            await self.python_impl.write(batch_info, value, drop_axes)
            return None
        else:
            # FIXME: Error if array is not in host memory
            value_np: NDArrayLike | np.ndarray = value.as_ndarray_like()
            # The Rust side requires native byte order and C-contiguous data;
            # normalize (copying if needed) before handing the buffer over.
            if not value_np.dtype.isnative:
                value_np = np.ascontiguousarray(
                    value_np, dtype=value_np.dtype.newbyteorder("=")
                )
            elif not value_np.flags.c_contiguous:
                value_np = np.ascontiguousarray(value_np)
            # The Rust call is blocking; run it off the event loop.
            await asyncio.to_thread(
                self.impl.store_chunks_with_indices,
                chunks_desc.chunk_info_with_indices,
                value_np,
                chunks_desc.write_empty_chunks,
            )
            return None
    def _raise_error_on_unsupported_batch_dtype(
        self,
        batch_info: Iterable[
            tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]
        ],
    ):
        """Raise ``UnsupportedDataTypeError`` if any spec's dtype kind is unsupported."""
        # https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L289-L293 for VSUMm
        # Further, our pipeline does not support variable-length objects due to limitations on decode_into, so object/np.dtypes.StringDType is also out
        if any(
            info.dtype.kind in {"V", "S", "U", "M", "m", "O", "T"}
            for (_, info, _, _, _) in batch_info
        ):
            raise UnsupportedDataTypeError()