-
-
Notifications
You must be signed in to change notification settings - Fork 398
Expand file tree
/
Copy pathtranspose.py
More file actions
123 lines (100 loc) · 4.45 KB
/
transpose.py
File metadata and controls
123 lines (100 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass, replace
from typing import TYPE_CHECKING, cast
import numpy as np
from zarr.abc.codec import ArrayArrayCodec
from zarr.core.array_spec import ArraySpec
from zarr.core.common import JSON, parse_named_configuration
if TYPE_CHECKING:
from typing import Self
from zarr.core.buffer import NDBuffer
from zarr.core.chunk_grids import ChunkGrid
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType
from zarr.core.indexing import SelectorTuple
def parse_transpose_order(data: JSON | Iterable[int]) -> tuple[int, ...]:
    """Normalize and validate a transpose ``order`` into a tuple of ints.

    Parameters
    ----------
    data
        Any iterable of integers (list, tuple, iterator, ...).

    Returns
    -------
    tuple[int, ...]
        The order as an immutable tuple.

    Raises
    ------
    TypeError
        If ``data`` is not iterable, or contains non-integer entries.
    """
    if not isinstance(data, Iterable):
        raise TypeError(f"Expected an iterable. Got {data} instead.")
    # Materialize before validating: a one-shot iterator (e.g. a generator)
    # would be exhausted by the all() check, and tuple(data) afterwards would
    # silently yield an empty tuple.
    order = tuple(cast("Iterable[int]", data))
    if not all(isinstance(a, int) for a in order):
        raise TypeError(f"Expected an iterable of integers. Got {data} instead.")
    return order
@dataclass(frozen=True)
class TransposeCodec(ArrayArrayCodec):
    """Array-to-array codec that permutes the axes of each chunk.

    Encoding applies ``transpose(order)``; decoding applies the inverse
    permutation (``argsort(order)``), restoring the original axis layout.
    """

    # Transposing rearranges elements but never changes the byte count.
    is_fixed_size = True

    # Axis permutation applied on encode; must be a permutation of range(ndim).
    order: tuple[int, ...]

    def __init__(self, *, order: Iterable[int]) -> None:
        order_parsed = parse_transpose_order(order)
        # object.__setattr__ is required because the dataclass is frozen.
        object.__setattr__(self, "order", order_parsed)

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """Build a codec instance from its JSON metadata (``name``/``configuration``)."""
        _, configuration_parsed = parse_named_configuration(data, "transpose")
        return cls(**configuration_parsed)  # type: ignore[arg-type]

    def to_dict(self) -> dict[str, JSON]:
        """Serialize this codec back to its JSON metadata form."""
        return {"name": "transpose", "configuration": {"order": tuple(self.order)}}

    def _check_order(self, ndim: int) -> None:
        """Raise ``ValueError`` unless ``self.order`` is a permutation of ``range(ndim)``.

        Shared by :meth:`validate` and :meth:`evolve_from_array_spec`, which
        previously duplicated these three checks verbatim.
        """
        if len(self.order) != ndim:
            raise ValueError(
                f"The `order` tuple must have as many entries as there are dimensions in the array. Got {self.order}."
            )
        if len(self.order) != len(set(self.order)):
            raise ValueError(
                f"There must not be duplicates in the `order` tuple. Got {self.order}."
            )
        if not all(0 <= x < ndim for x in self.order):
            raise ValueError(
                f"All entries in the `order` tuple must be between 0 and the number of dimensions in the array. Got {self.order}."
            )

    def validate(
        self,
        shape: tuple[int, ...],
        dtype: ZDType[TBaseDType, TBaseScalar],
        chunk_grid: ChunkGrid,
    ) -> None:
        """Check that ``order`` is a valid permutation of the array's axes.

        Raises
        ------
        ValueError
            If ``order`` has the wrong length, duplicates, or out-of-range entries.
        """
        self._check_order(len(shape))

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
        """Validate ``order`` against the array spec and return a (possibly new) codec.

        Raises
        ------
        ValueError
            If ``order`` is not a permutation of ``range(array_spec.ndim)``.
        """
        self._check_order(array_spec.ndim)
        order = tuple(self.order)
        # __init__ already stores a tuple, so this is normally a no-op;
        # kept as a safeguard in case `order` was assigned some other way.
        if order != self.order:
            return replace(self, order=order)
        return self

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return the chunk spec as seen after encoding (axes permuted by ``order``)."""
        return ArraySpec(
            shape=tuple(chunk_spec.shape[self.order[i]] for i in range(chunk_spec.ndim)),
            dtype=chunk_spec.dtype,
            fill_value=chunk_spec.fill_value,
            config=chunk_spec.config,
            prototype=chunk_spec.prototype,
        )

    def resolve_selection(self, selection: SelectorTuple) -> SelectorTuple | None:
        """Map a selection in decoded (original) axis order to encoded axis order.

        Decode applies transpose(inverse_order) where inverse_order = argsort(order),
        i.e. decoded[i] = encoded[inverse_order[i]]; going back,
        encoded[j] = decoded[order[j]].
        """
        if isinstance(selection, tuple):
            return tuple(selection[self.order[j]] for j in range(len(selection)))
        # Non-tuple selections (e.g. scalar selectors) pass through unchanged.
        return selection

    async def _decode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """Undo the encode-time permutation by transposing with argsort(order)."""
        inverse_order = np.argsort(self.order)
        return chunk_array.transpose(inverse_order)

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        _chunk_spec: ArraySpec,
    ) -> NDBuffer | None:
        """Apply the axis permutation to a chunk before downstream codecs run."""
        return chunk_array.transpose(self.order)

    def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        """Transposition preserves the byte length exactly."""
        return input_byte_length