Skip to content

Commit d926e43

Browse files
authored
perf: add sync methods to codecs (#3721)
* add sync methods to codecs * Clarify memory conversion in blosc encoding Add comments to clarify memory conversion in encoding. * changelog
1 parent 974c06c commit d926e43

File tree

9 files changed

+157
-35
lines changed

9 files changed

+157
-35
lines changed

changes/3721.misc.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Adds synchronous (non-async) encoding and decoding methods to CPU-bound codecs. This is necessary for performance optimizations based on avoiding `asyncio` overhead. These new methods are described by a new protocol: `SupportsSyncCodec`.

src/zarr/abc/codec.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from abc import abstractmethod
44
from collections.abc import Mapping
5-
from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
5+
from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable
66

77
from typing_extensions import ReadOnly, TypedDict
88

@@ -32,6 +32,7 @@
3232
"CodecInput",
3333
"CodecOutput",
3434
"CodecPipeline",
35+
"SupportsSyncCodec",
3536
]
3637

3738
CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
@@ -59,6 +60,23 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]:
5960
"""The widest type of JSON-like input that could specify a codec."""
6061

6162

63+
@runtime_checkable
64+
class SupportsSyncCodec(Protocol):
65+
"""Protocol for codecs that support synchronous encode/decode.
66+
67+
Codecs implementing this protocol provide ``_decode_sync`` and ``_encode_sync``
68+
methods that perform encoding/decoding without requiring an async event loop.
69+
"""
70+
71+
def _decode_sync(
72+
self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
73+
) -> NDBuffer | Buffer: ...
74+
75+
def _encode_sync(
76+
self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
77+
) -> NDBuffer | Buffer | None: ...
78+
79+
6280
class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
6381
"""Generic base class for codecs.
6482

src/zarr/codecs/blosc.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -299,28 +299,37 @@ def _blosc_codec(self) -> Blosc:
299299
config_dict["typesize"] = self.typesize
300300
return Blosc.from_config(config_dict)
301301

302+
def _decode_sync(
303+
self,
304+
chunk_bytes: Buffer,
305+
chunk_spec: ArraySpec,
306+
) -> Buffer:
307+
return as_numpy_array_wrapper(self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype)
308+
302309
async def _decode_single(
303310
self,
304311
chunk_bytes: Buffer,
305312
chunk_spec: ArraySpec,
306313
) -> Buffer:
307-
return await asyncio.to_thread(
308-
as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype
309-
)
314+
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)
310315

311-
async def _encode_single(
316+
def _encode_sync(
312317
self,
313318
chunk_bytes: Buffer,
314319
chunk_spec: ArraySpec,
315320
) -> Buffer | None:
316321
# Since blosc only support host memory, we convert the input and output of the encoding
317322
# between numpy array and buffer
318-
return await asyncio.to_thread(
319-
lambda chunk: chunk_spec.prototype.buffer.from_bytes(
320-
self._blosc_codec.encode(chunk.as_numpy_array())
321-
),
322-
chunk_bytes,
323+
return chunk_spec.prototype.buffer.from_bytes(
324+
self._blosc_codec.encode(chunk_bytes.as_numpy_array())
323325
)
324326

327+
async def _encode_single(
328+
self,
329+
chunk_bytes: Buffer,
330+
chunk_spec: ArraySpec,
331+
) -> Buffer | None:
332+
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)
333+
325334
def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
326335
raise NotImplementedError

src/zarr/codecs/bytes.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
6565
)
6666
return self
6767

68-
async def _decode_single(
68+
def _decode_sync(
6969
self,
7070
chunk_bytes: Buffer,
7171
chunk_spec: ArraySpec,
@@ -88,7 +88,14 @@ async def _decode_single(
8888
)
8989
return chunk_array
9090

91-
async def _encode_single(
91+
async def _decode_single(
92+
self,
93+
chunk_bytes: Buffer,
94+
chunk_spec: ArraySpec,
95+
) -> NDBuffer:
96+
return self._decode_sync(chunk_bytes, chunk_spec)
97+
98+
def _encode_sync(
9299
self,
93100
chunk_array: NDBuffer,
94101
chunk_spec: ArraySpec,
@@ -109,5 +116,12 @@ async def _encode_single(
109116
nd_array = nd_array.ravel().view(dtype="B")
110117
return chunk_spec.prototype.buffer.from_array_like(nd_array)
111118

119+
async def _encode_single(
120+
self,
121+
chunk_array: NDBuffer,
122+
chunk_spec: ArraySpec,
123+
) -> Buffer | None:
124+
return self._encode_sync(chunk_array, chunk_spec)
125+
112126
def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
113127
return input_byte_length

src/zarr/codecs/crc32c_.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
3131
def to_dict(self) -> dict[str, JSON]:
3232
return {"name": "crc32c"}
3333

34-
async def _decode_single(
34+
def _decode_sync(
3535
self,
3636
chunk_bytes: Buffer,
3737
chunk_spec: ArraySpec,
@@ -51,7 +51,14 @@ async def _decode_single(
5151
)
5252
return chunk_spec.prototype.buffer.from_array_like(inner_bytes)
5353

54-
async def _encode_single(
54+
async def _decode_single(
55+
self,
56+
chunk_bytes: Buffer,
57+
chunk_spec: ArraySpec,
58+
) -> Buffer:
59+
return self._decode_sync(chunk_bytes, chunk_spec)
60+
61+
def _encode_sync(
5562
self,
5663
chunk_bytes: Buffer,
5764
chunk_spec: ArraySpec,
@@ -64,5 +71,12 @@ async def _encode_single(
6471
# Append the checksum (as bytes) to the data
6572
return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("B")))
6673

74+
async def _encode_single(
75+
self,
76+
chunk_bytes: Buffer,
77+
chunk_spec: ArraySpec,
78+
) -> Buffer | None:
79+
return self._encode_sync(chunk_bytes, chunk_spec)
80+
6781
def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
6882
return input_byte_length + 4

src/zarr/codecs/gzip.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
from dataclasses import dataclass
5+
from functools import cached_property
56
from typing import TYPE_CHECKING
67

78
from numcodecs.gzip import GZip
@@ -48,23 +49,37 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
4849
def to_dict(self) -> dict[str, JSON]:
4950
return {"name": "gzip", "configuration": {"level": self.level}}
5051

52+
@cached_property
53+
def _gzip_codec(self) -> GZip:
54+
return GZip(self.level)
55+
56+
def _decode_sync(
57+
self,
58+
chunk_bytes: Buffer,
59+
chunk_spec: ArraySpec,
60+
) -> Buffer:
61+
return as_numpy_array_wrapper(self._gzip_codec.decode, chunk_bytes, chunk_spec.prototype)
62+
5163
async def _decode_single(
5264
self,
5365
chunk_bytes: Buffer,
5466
chunk_spec: ArraySpec,
5567
) -> Buffer:
56-
return await asyncio.to_thread(
57-
as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype
58-
)
68+
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)
69+
70+
def _encode_sync(
71+
self,
72+
chunk_bytes: Buffer,
73+
chunk_spec: ArraySpec,
74+
) -> Buffer | None:
75+
return as_numpy_array_wrapper(self._gzip_codec.encode, chunk_bytes, chunk_spec.prototype)
5976

6077
async def _encode_single(
6178
self,
6279
chunk_bytes: Buffer,
6380
chunk_spec: ArraySpec,
6481
) -> Buffer | None:
65-
return await asyncio.to_thread(
66-
as_numpy_array_wrapper, GZip(self.level).encode, chunk_bytes, chunk_spec.prototype
67-
)
82+
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)
6883

6984
def compute_encoded_size(
7085
self,

src/zarr/codecs/transpose.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,20 +95,34 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
9595
prototype=chunk_spec.prototype,
9696
)
9797

98-
async def _decode_single(
98+
def _decode_sync(
9999
self,
100100
chunk_array: NDBuffer,
101101
chunk_spec: ArraySpec,
102102
) -> NDBuffer:
103-
inverse_order = np.argsort(self.order)
103+
inverse_order = tuple(int(i) for i in np.argsort(self.order))
104104
return chunk_array.transpose(inverse_order)
105105

106-
async def _encode_single(
106+
async def _decode_single(
107+
self,
108+
chunk_array: NDBuffer,
109+
chunk_spec: ArraySpec,
110+
) -> NDBuffer:
111+
return self._decode_sync(chunk_array, chunk_spec)
112+
113+
def _encode_sync(
107114
self,
108115
chunk_array: NDBuffer,
109116
_chunk_spec: ArraySpec,
110117
) -> NDBuffer | None:
111118
return chunk_array.transpose(self.order)
112119

120+
async def _encode_single(
121+
self,
122+
chunk_array: NDBuffer,
123+
_chunk_spec: ArraySpec,
124+
) -> NDBuffer | None:
125+
return self._encode_sync(chunk_array, _chunk_spec)
126+
113127
def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
114128
return input_byte_length

src/zarr/codecs/vlen_utf8.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ def to_dict(self) -> dict[str, JSON]:
4040
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
4141
return self
4242

43-
# TODO: expand the tests for this function
44-
async def _decode_single(
43+
def _decode_sync(
4544
self,
4645
chunk_bytes: Buffer,
4746
chunk_spec: ArraySpec,
@@ -55,7 +54,14 @@ async def _decode_single(
5554
as_string_dtype = decoded.astype(chunk_spec.dtype.to_native_dtype(), copy=False)
5655
return chunk_spec.prototype.nd_buffer.from_numpy_array(as_string_dtype)
5756

58-
async def _encode_single(
57+
async def _decode_single(
58+
self,
59+
chunk_bytes: Buffer,
60+
chunk_spec: ArraySpec,
61+
) -> NDBuffer:
62+
return self._decode_sync(chunk_bytes, chunk_spec)
63+
64+
def _encode_sync(
5965
self,
6066
chunk_array: NDBuffer,
6167
chunk_spec: ArraySpec,
@@ -65,6 +71,13 @@ async def _encode_single(
6571
_vlen_utf8_codec.encode(chunk_array.as_numpy_array())
6672
)
6773

74+
async def _encode_single(
75+
self,
76+
chunk_array: NDBuffer,
77+
chunk_spec: ArraySpec,
78+
) -> Buffer | None:
79+
return self._encode_sync(chunk_array, chunk_spec)
80+
6881
def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
6982
# what is input_byte_length for an object dtype?
7083
raise NotImplementedError("compute_encoded_size is not implemented for VLen codecs")
@@ -86,7 +99,7 @@ def to_dict(self) -> dict[str, JSON]:
8699
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
87100
return self
88101

89-
async def _decode_single(
102+
def _decode_sync(
90103
self,
91104
chunk_bytes: Buffer,
92105
chunk_spec: ArraySpec,
@@ -99,7 +112,14 @@ async def _decode_single(
99112
decoded = _reshape_view(decoded, chunk_spec.shape)
100113
return chunk_spec.prototype.nd_buffer.from_numpy_array(decoded)
101114

102-
async def _encode_single(
115+
async def _decode_single(
116+
self,
117+
chunk_bytes: Buffer,
118+
chunk_spec: ArraySpec,
119+
) -> NDBuffer:
120+
return self._decode_sync(chunk_bytes, chunk_spec)
121+
122+
def _encode_sync(
103123
self,
104124
chunk_array: NDBuffer,
105125
chunk_spec: ArraySpec,
@@ -109,6 +129,13 @@ async def _encode_single(
109129
_vlen_bytes_codec.encode(chunk_array.as_numpy_array())
110130
)
111131

132+
async def _encode_single(
133+
self,
134+
chunk_array: NDBuffer,
135+
chunk_spec: ArraySpec,
136+
) -> Buffer | None:
137+
return self._encode_sync(chunk_array, chunk_spec)
138+
112139
def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
113140
# what is input_byte_length for an object dtype?
114141
raise NotImplementedError("compute_encoded_size is not implemented for VLen codecs")

src/zarr/codecs/zstd.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def parse_checksum(data: JSON) -> bool:
3838
class ZstdCodec(BytesBytesCodec):
3939
"""zstd codec"""
4040

41-
is_fixed_size = True
41+
is_fixed_size = False
4242

4343
level: int = 0
4444
checksum: bool = False
@@ -71,23 +71,33 @@ def _zstd_codec(self) -> Zstd:
7171
config_dict = {"level": self.level, "checksum": self.checksum}
7272
return Zstd.from_config(config_dict)
7373

74+
def _decode_sync(
75+
self,
76+
chunk_bytes: Buffer,
77+
chunk_spec: ArraySpec,
78+
) -> Buffer:
79+
return as_numpy_array_wrapper(self._zstd_codec.decode, chunk_bytes, chunk_spec.prototype)
80+
7481
async def _decode_single(
7582
self,
7683
chunk_bytes: Buffer,
7784
chunk_spec: ArraySpec,
7885
) -> Buffer:
79-
return await asyncio.to_thread(
80-
as_numpy_array_wrapper, self._zstd_codec.decode, chunk_bytes, chunk_spec.prototype
81-
)
86+
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)
87+
88+
def _encode_sync(
89+
self,
90+
chunk_bytes: Buffer,
91+
chunk_spec: ArraySpec,
92+
) -> Buffer | None:
93+
return as_numpy_array_wrapper(self._zstd_codec.encode, chunk_bytes, chunk_spec.prototype)
8294

8395
async def _encode_single(
8496
self,
8597
chunk_bytes: Buffer,
8698
chunk_spec: ArraySpec,
8799
) -> Buffer | None:
88-
return await asyncio.to_thread(
89-
as_numpy_array_wrapper, self._zstd_codec.encode, chunk_bytes, chunk_spec.prototype
90-
)
100+
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)
91101

92102
def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
93103
raise NotImplementedError

0 commit comments

Comments
 (0)