Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/3892.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `codec_class_map` and `codec_pipeline_class` fields to the runtime array configuration. This allows explicitly declaring the codec classes and codec pipeline class to use when reading an array, as well as dynamically swapping out the codec classes or the codec pipeline class on an existing `zarr.Array`.
5 changes: 4 additions & 1 deletion src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,7 @@ async def open_array(
zarr_format: ZarrFormat | None = None,
path: PathLike = "",
storage_options: dict[str, Any] | None = None,
config: ArrayConfigLike | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to save
) -> AnyAsyncArray:
"""Open an array using file-mode-like semantics.
Expand All @@ -1261,6 +1262,8 @@ async def open_array(
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
config : ArrayConfigLike
Declaration of the runtime configuration for the array.
**kwargs
Any keyword arguments to pass to [`create`][zarr.api.asynchronous.create].

Expand All @@ -1279,7 +1282,7 @@ async def open_array(
_warn_write_empty_chunks_kwarg()

try:
return await AsyncArray.open(store_path, zarr_format=zarr_format)
return await AsyncArray.open(store_path, zarr_format=zarr_format, config=config)
except FileNotFoundError as err:
if not store_path.read_only and mode in _CREATE_MODES:
overwrite = _infer_overwrite(mode)
Expand Down
4 changes: 4 additions & 0 deletions src/zarr/api/synchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,7 @@ def open_array(
zarr_format: ZarrFormat | None = None,
path: PathLike = "",
storage_options: dict[str, Any] | None = None,
config: ArrayConfigLike | None = None,
**kwargs: Any,
) -> AnyArray:
"""Open an array using file-mode-like semantics.
Expand All @@ -1388,6 +1389,8 @@ def open_array(
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
config : ArrayConfigLike
Declaration of the runtime configuration for the array.
**kwargs
Any keyword arguments to pass to [`create`][zarr.api.asynchronous.create].

Expand All @@ -1405,6 +1408,7 @@ def open_array(
zarr_format=zarr_format,
path=path,
storage_options=storage_options,
config=config,
**kwargs,
)
)
Expand Down
22 changes: 16 additions & 6 deletions src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
)
from zarr.codecs.bytes import BytesCodec
from zarr.codecs.crc32c_ import Crc32cCodec
from zarr.core.array_spec import ArrayConfig, ArraySpec
from zarr.core.array_spec import ArraySpec, ArraySpecConfig, parse_codec_class_map
from zarr.core.buffer import (
Buffer,
BufferPrototype,
Expand Down Expand Up @@ -319,10 +319,13 @@ def __init__(
codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(),),
index_codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(), Crc32cCodec()),
index_location: ShardingCodecIndexLocation | str = ShardingCodecIndexLocation.end,
codec_class_map: Mapping[str, type[Codec]] | None = None,
) -> None:
if codec_class_map is None:
codec_class_map = parse_codec_class_map(None)
chunk_shape_parsed = parse_shapelike(chunk_shape)
codecs_parsed = parse_codecs(codecs)
index_codecs_parsed = parse_codecs(index_codecs)
codecs_parsed = parse_codecs(codecs, codec_class_map=codec_class_map)
index_codecs_parsed = parse_codecs(index_codecs, codec_class_map=codec_class_map)
index_location_parsed = parse_index_location(index_location)

object.__setattr__(self, "chunk_shape", chunk_shape_parsed)
Expand All @@ -345,9 +348,16 @@ def __getstate__(self) -> dict[str, Any]:

def __setstate__(self, state: dict[str, Any]) -> None:
config = state["configuration"]
codec_class_map = parse_codec_class_map(None)
object.__setattr__(self, "chunk_shape", parse_shapelike(config["chunk_shape"]))
object.__setattr__(self, "codecs", parse_codecs(config["codecs"]))
object.__setattr__(self, "index_codecs", parse_codecs(config["index_codecs"]))
object.__setattr__(
self, "codecs", parse_codecs(config["codecs"], codec_class_map=codec_class_map)
)
object.__setattr__(
self,
"index_codecs",
parse_codecs(config["index_codecs"], codec_class_map=codec_class_map),
)
object.__setattr__(self, "index_location", parse_index_location(config["index_location"]))

# Use instance-local lru_cache to avoid memory leaks
Expand Down Expand Up @@ -737,7 +747,7 @@ def _get_index_chunk_spec(self, chunks_per_shard: tuple[int, ...]) -> ArraySpec:
shape=chunks_per_shard + (2,),
dtype=UInt64(endianness="little"),
fill_value=MAX_UINT_64,
config=ArrayConfig(
config=ArraySpecConfig(
order="C", write_empty_chunks=False
), # Note: this is hard-coded for simplicity -- it is not surfaced into user code,
prototype=default_buffer_prototype(),
Expand Down
73 changes: 58 additions & 15 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec
from zarr.codecs.zstd import ZstdCodec
from zarr.core._info import ArrayInfo
from zarr.core.array_spec import ArrayConfig, ArrayConfigLike, ArraySpec, parse_array_config
from zarr.core.array_spec import (
ArrayConfig,
ArrayConfigLike,
ArraySpec,
ArraySpecConfig,
parse_array_config,
)
from zarr.core.attributes import Attributes
from zarr.core.buffer import (
BufferPrototype,
Expand Down Expand Up @@ -122,6 +128,7 @@
ChunkGridMetadata,
RectilinearChunkGridMetadata,
RegularChunkGridMetadata,
parse_codecs,
parse_node_type_array,
resolve_chunks,
)
Expand Down Expand Up @@ -197,13 +204,19 @@ def _chunk_sizes_from_shape(
return tuple(result)


def parse_array_metadata(data: Any) -> ArrayMetadata:
if isinstance(data, ArrayMetadata):
def parse_array_metadata(data: object, codec_class_map: Mapping[str, type[Codec]]) -> ArrayMetadata:
if isinstance(data, ArrayV3Metadata):
new_codecs = parse_codecs(
[c.to_dict() for c in data.codecs], codec_class_map=codec_class_map
)
return replace(data, codecs=new_codecs)
elif isinstance(data, ArrayV2Metadata):
# V2 arrays get their codecs from numcodecs, for now. The codec class map is not used.
return data
elif isinstance(data, dict):
zarr_format = data.get("zarr_format")
if zarr_format == 3:
meta_out = ArrayV3Metadata.from_dict(data)
meta_out = ArrayV3Metadata.from_dict(data, codec_class_map=codec_class_map)
if len(meta_out.storage_transformers) > 0:
msg = (
f"Array metadata contains storage transformers: {meta_out.storage_transformers}."
Expand All @@ -218,20 +231,31 @@ def parse_array_metadata(data: Any) -> ArrayMetadata:
raise TypeError # pragma: no cover


def create_codec_pipeline(metadata: ArrayMetadata, *, store: Store | None = None) -> CodecPipeline:
def create_codec_pipeline(
metadata: ArrayMetadata,
*,
store: Store | None = None,
config: ArrayConfig | None = None,
) -> CodecPipeline:
pipeline_class: type[CodecPipeline]
if config is not None:
pipeline_class = config.codec_pipeline_class
else:
pipeline_class = get_pipeline_class()

if store is not None:
try:
return get_pipeline_class().from_array_metadata_and_store(
return pipeline_class.from_array_metadata_and_store(
array_metadata=metadata, store=store
)
except NotImplementedError:
pass

if isinstance(metadata, ArrayV3Metadata):
return get_pipeline_class().from_codecs(metadata.codecs)
return pipeline_class.from_codecs(metadata.codecs)
elif isinstance(metadata, ArrayV2Metadata):
v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
return get_pipeline_class().from_codecs([v2_codec])
return pipeline_class.from_codecs([v2_codec])
raise TypeError # pragma: no cover


Expand Down Expand Up @@ -353,8 +377,10 @@ def __init__(
store_path: StorePath,
config: ArrayConfigLike | None = None,
) -> None:
metadata_parsed = parse_array_metadata(metadata)
config_parsed = parse_array_config(config)
metadata_parsed = parse_array_metadata(
metadata, codec_class_map=config_parsed.codec_class_map
)

object.__setattr__(self, "metadata", metadata_parsed)
object.__setattr__(self, "store_path", store_path)
Expand All @@ -363,7 +389,9 @@ def __init__(
object.__setattr__(
self,
"codec_pipeline",
create_codec_pipeline(metadata=metadata_parsed, store=store_path.store),
create_codec_pipeline(
metadata=metadata_parsed, store=store_path.store, config=config_parsed
),
)

# this overload defines the function signature when zarr_format is 2
Expand Down Expand Up @@ -779,6 +807,7 @@ def _create_metadata_v3(
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: DimensionNamesLike = None,
attributes: dict[str, JSON] | None = None,
codec_class_map: Mapping[str, type[Codec]] | None = None,
) -> ArrayV3Metadata:
"""Create an instance of ArrayV3Metadata."""
filters: tuple[ArrayArrayCodec, ...]
Expand Down Expand Up @@ -816,6 +845,7 @@ def _create_metadata_v3(
codecs=codecs_parsed, # type: ignore[arg-type]
dimension_names=tuple(dimension_names) if dimension_names else None,
attributes=attributes or {},
codec_class_map=codec_class_map,
)

@classmethod
Expand Down Expand Up @@ -863,6 +893,7 @@ async def _create_v3(
codecs=codecs,
dimension_names=dimension_names,
attributes=attributes,
codec_class_map=config.codec_class_map,
)

array = cls(metadata=metadata, store_path=store_path, config=config)
Expand Down Expand Up @@ -987,14 +1018,18 @@ def from_dict(
ValueError
If the dictionary data is invalid or incompatible with either Zarr format 2 or 3 array creation.
"""
metadata = parse_array_metadata(data)
from zarr.core.array_spec import parse_codec_class_map

metadata = parse_array_metadata(data, codec_class_map=parse_codec_class_map(None))
return cls(metadata=metadata, store_path=store_path)

@classmethod
async def open(
cls,
store: StoreLike,
zarr_format: ZarrFormat | None = 3,
*,
config: ArrayConfigLike | None = None,
) -> AnyAsyncArray:
"""
Async method to open an existing Zarr array from a given store.
Expand All @@ -1007,6 +1042,8 @@ async def open(
for a description of all valid StoreLike values.
zarr_format : ZarrFormat | None, optional
The Zarr format version (default is 3).
config : ArrayConfigLike | None, optional
Runtime configuration for the array (default is None).

Returns
-------
Expand Down Expand Up @@ -1038,7 +1075,7 @@ async def example():
metadata_dict = await get_array_metadata(store_path, zarr_format=zarr_format)
# TODO: remove this cast when we have better type hints
_metadata_dict = cast("ArrayMetadataJSON_V3", metadata_dict)
return cls(store_path=store_path, metadata=_metadata_dict)
return cls(store_path=store_path, metadata=_metadata_dict, config=config)

@property
def store(self) -> Store:
Expand Down Expand Up @@ -4704,7 +4741,7 @@ async def init_array(
chunk_key_encoding: ChunkKeyEncodingLike | None = None,
dimension_names: DimensionNamesLike = None,
overwrite: bool = False,
config: ArrayConfigLike | None = None,
config: ArrayConfig | None = None,
) -> AnyAsyncArray:
"""Create and persist an array metadata document.

Expand Down Expand Up @@ -4942,6 +4979,7 @@ async def init_array(
codecs=codecs_out,
dimension_names=dimension_names,
attributes=attributes,
codec_class_map=config.codec_class_map if config is not None else None,
)

arr = AsyncArray(metadata=meta, store_path=store_path, config=config)
Expand Down Expand Up @@ -5139,7 +5177,7 @@ async def create_array(
chunk_key_encoding=chunk_key_encoding,
dimension_names=dimension_names,
overwrite=overwrite,
config=config,
config=parse_array_config(config),
)


Expand Down Expand Up @@ -5769,11 +5807,16 @@ def _get_chunk_spec(
spec = chunk_grid[chunk_coords]
if spec is None:
raise IndexError(f"Chunk coordinates {chunk_coords} are out of bounds.")
spec_config = ArraySpecConfig(
order=array_config.order,
read_missing_chunks=array_config.read_missing_chunks,
write_empty_chunks=array_config.write_empty_chunks,
)
return ArraySpec(
shape=spec.codec_shape,
dtype=metadata.dtype,
fill_value=metadata.fill_value,
config=array_config,
config=spec_config,
prototype=prototype,
)

Expand Down
Loading
Loading