Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ dependencies = [
"boto3>=1.34.0",
"pyproj>=3.7.0",
"structlog>=25.5.0",
"cast-value>=0.2.1",
"cast-value-rs>=0.4.0",
]

[dependency-groups]
Expand Down
6 changes: 6 additions & 0 deletions src/eopf_geozarr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,11 @@ def add_s2_optimization_commands(subparsers: argparse._SubParsersAction) -> None
scale-offset encoding will be re-saved as the decoded data type, i.e. floating point values.
""",
)
s2_parser.add_argument(
"--experimental-scale-offset-codec",
action="store_true",
help="Push CF scale-offset encoding into zarr codec pipeline instead of decoding to float.",
)
s2_parser.add_argument(
"--dask-cluster",
action="store_true",
Expand Down Expand Up @@ -1197,6 +1202,7 @@ def convert_s2_optimized_command(args: argparse.Namespace) -> None:
compression_level=args.compression_level,
validate_output=not args.skip_validation,
keep_scale_offset=args.keep_scale_offset,
experimental_scale_offset_codec=args.experimental_scale_offset_codec,
)

log.info("✅ S2 optimization completed", output_path=args.output_path)
Expand Down
5 changes: 5 additions & 0 deletions src/eopf_geozarr/codecs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Register eopf-geozarr's custom zarr codecs with the zarr codec registry."""

from zarr.registry import register_codec

from eopf_geozarr.codecs.scale_offset import ScaleOffset

# Make the codec resolvable by its metadata name ("scale_offset") when zarr
# reads array metadata that references it.
register_codec("scale_offset", ScaleOffset)
110 changes: 110 additions & 0 deletions src/eopf_geozarr/codecs/scale_offset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""
Zarr V3 array-to-array codec implementing the scale_offset extension.

Encode: out = (in - offset) * scale
Decode: out = (in / scale) + offset
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Self

from zarr.abc.codec import ArrayArrayCodec
from zarr.core.common import JSON, parse_named_configuration

if TYPE_CHECKING:
from zarr.core.array_spec import ArraySpec
from zarr.core.chunk_grids import ChunkGrid
from zarr.core.dtype import ZDType
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar
from zarr.core.ndbuffer import NDBuffer


@dataclass(frozen=True)
class ScaleOffset(ArrayArrayCodec):
    """Array-to-array codec that applies a linear scale and offset transformation.

    Encode: out = (in - offset) * scale
    Decode: out = (in / scale) + offset

    The two operations are exact inverses (up to floating-point rounding), so a
    round trip through encode/decode recovers the input.
    """

    is_fixed_size = True

    offset: float
    scale: float

    def __init__(self, *, offset: float = 0.0, scale: float = 1.0) -> None:
        # Frozen dataclass: fields must be set via object.__setattr__.
        scale_f = float(scale)
        if scale_f == 0.0:
            # A zero scale makes encode non-invertible and decode divide by
            # zero (producing inf/nan); fail fast with a clear message.
            raise ValueError("ScaleOffset scale must be nonzero")
        object.__setattr__(self, "offset", float(offset))
        object.__setattr__(self, "scale", scale_f)

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """Build a ScaleOffset from its zarr named-configuration metadata dict."""
        _, configuration_parsed = parse_named_configuration(data, "scale_offset")
        return cls(**configuration_parsed)

    def to_dict(self) -> dict[str, JSON]:
        """Serialize to the zarr named-configuration metadata form."""
        return {
            "name": "scale_offset",
            "configuration": {"offset": self.offset, "scale": self.scale},
        }

    def validate(
        self,
        shape: tuple[int, ...],
        dtype: ZDType[TBaseDType, TBaseScalar],
        chunk_grid: ChunkGrid,
    ) -> None:
        # The linear transform applies to any shape/dtype/grid; nothing to check.
        pass

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
        # Codec configuration does not depend on the array spec.
        return self

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        # Shape and dtype of the chunk are unchanged by this codec.
        return chunk_spec

    async def _decode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        return self._decode_sync(chunk_array, chunk_spec)

    def _decode_sync(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """Undo the linear transform: out = (in / scale) + offset."""
        data = chunk_array.as_numpy_array()
        decoded = (data / self.scale) + self.offset
        return chunk_spec.prototype.nd_buffer.from_numpy_array(decoded)

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer | None:
        return self._encode_sync(chunk_array, chunk_spec)

    def _encode_sync(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer | None:
        """Apply the linear transform: out = (in - offset) * scale."""
        data = chunk_array.as_numpy_array()
        encoded = (data - self.offset) * self.scale
        # Use the spec's buffer prototype, mirroring _decode_sync, rather than
        # relying on the incoming buffer's class.
        return chunk_spec.prototype.nd_buffer.from_numpy_array(encoded)

    def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        # A linear transform changes neither element count nor itemsize.
        return input_byte_length


def scale_offset_from_cf(*, scale_factor: float, add_offset: float) -> ScaleOffset:
    """
    Convert CF-convention scale_factor and add_offset to a ScaleOffset codec.

    CF convention: unpacked = packed * scale_factor + add_offset

    ScaleOffset convention:
        encode: out = (in - offset) * scale
        decode: out = (in / scale) + offset

    To match CF: offset = add_offset, scale = 1 / scale_factor.

    Raises:
        ValueError: If scale_factor is zero — the CF mapping would be
            non-invertible, and ``1 / scale_factor`` undefined.
    """
    if float(scale_factor) == 0.0:
        # Without this guard the division below raises a bare, uninformative
        # ZeroDivisionError.
        raise ValueError("scale_factor must be nonzero")
    return ScaleOffset(offset=add_offset, scale=1.0 / scale_factor)
3 changes: 3 additions & 0 deletions src/eopf_geozarr/s2_optimization/s2_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def convert_s2_optimized(
compression_level: int,
validate_output: bool,
keep_scale_offset: bool,
experimental_scale_offset_codec: bool = False,
max_retries: int = 3,
) -> xr.DataTree:
"""
Expand All @@ -199,6 +200,7 @@ def convert_s2_optimized(
compression_level: Compression level 1-9
validate_output: Whether to validate the output
keep_scale_offset: Whether to preserve scale-offset encoding of the source data.
experimental_scale_offset_codec: Push CF scale-offset into zarr codec pipeline.
max_retries: Maximum number of retries for network operations

Returns:
Expand Down Expand Up @@ -234,6 +236,7 @@ def convert_s2_optimized(
enable_sharding=enable_sharding,
crs=crs,
keep_scale_offset=keep_scale_offset,
experimental_scale_offset_codec=experimental_scale_offset_codec,
)

log.info("Created multiscale pyramids", num_groups=len(datasets))
Expand Down
29 changes: 28 additions & 1 deletion src/eopf_geozarr/s2_optimization/s2_multiscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import numpy as np
import structlog
import xarray as xr
from cast_value import CastValueRustV1
from dask import delayed
from dask.array import from_delayed
from pydantic.experimental.missing_sentinel import MISSING
Expand Down Expand Up @@ -170,6 +171,7 @@ def create_multiscale_from_datatree(
spatial_chunk: int,
crs: CRS | None = None,
keep_scale_offset: bool,
experimental_scale_offset_codec: bool = False,
) -> dict[str, dict]:
"""
Create multiscale versions preserving original structure.
Expand Down Expand Up @@ -239,6 +241,7 @@ def create_multiscale_from_datatree(
spatial_chunk=spatial_chunk,
enable_sharding=enable_sharding,
keep_scale_offset=keep_scale_offset,
experimental_scale_offset_codec=experimental_scale_offset_codec,
)
# convert float64 arrays to float32
for data_var in dataset.data_vars:
Expand Down Expand Up @@ -300,6 +303,7 @@ def create_multiscale_from_datatree(
spatial_chunk=spatial_chunk,
enable_sharding=enable_sharding,
keep_scale_offset=keep_scale_offset,
experimental_scale_offset_codec=experimental_scale_offset_codec,
)

# Strip _FillValue from DataArray encoding for downsampled levels too
Expand Down Expand Up @@ -343,6 +347,7 @@ def create_measurements_encoding(
spatial_chunk: int,
enable_sharding: bool = True,
keep_scale_offset: bool = True,
experimental_scale_offset_codec: bool = False,
) -> dict[str, XarrayDataArrayEncoding]:
"""
Create optimized encoding for a pyramid level with advanced chunking and sharding.
Expand Down Expand Up @@ -390,7 +395,29 @@ def create_measurements_encoding(
# Forward-propagate the existing encoding, minus keys that should be omitted
keep_keys = XARRAY_ENCODING_KEYS - {"compressors", "shards", "chunks"}

if not keep_scale_offset:
if experimental_scale_offset_codec and not keep_scale_offset:
# Push CF scale-offset into the zarr codec pipeline instead of
# decoding to float. The data stays as packed integers on disk,
# but zarr transparently decodes on read.
scale_factor = var_data.encoding.get("scale_factor")
add_offset = var_data.encoding.get("add_offset")
packed_dtype = var_data.encoding.get("dtype")

if scale_factor is not None and add_offset is not None and packed_dtype is not None:
from eopf_geozarr.codecs.scale_offset import scale_offset_from_cf

so_codec = scale_offset_from_cf(
scale_factor=float(scale_factor), add_offset=float(add_offset)
)
cv_codec = CastValueRustV1(
data_type=np.dtype(packed_dtype).name, rounding="nearest-even"
)
var_encoding["filters"] = (so_codec, cv_codec)

# Strip CF keys — the codecs handle encoding/decoding now
keep_keys = keep_keys - CF_SCALE_OFFSET_KEYS - {"_FillValue"}
var_encoding["fill_value"] = float("nan")
elif not keep_scale_offset:
# When stripping scale/offset, also strip _FillValue since the original
# _FillValue is in raw integer units and meaningless for decoded float data.
keep_keys = keep_keys - CF_SCALE_OFFSET_KEYS - {"_FillValue"}
Expand Down
37 changes: 37 additions & 0 deletions tests/test_s2_multiscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,43 @@ def test_calculate_simple_shard_dimensions() -> None:
assert shard_dims[1] == 768 # 3 * 256 = 768


def test_create_measurements_encoding_experimental_scale_offset_codec() -> None:
    """Test that experimental_scale_offset_codec adds ScaleOffset + CastValue filters."""
    from eopf_geozarr.codecs.scale_offset import ScaleOffset

    # Simulate a variable carrying CF-style scale-offset encoding, as xarray
    # attaches when reading a CF-encoded zarr/netCDF variable.
    temperature = xr.DataArray(
        np.arange(100, dtype="float64").reshape(10, 10),
        dims=["y", "x"],
    )
    temperature.encoding = {
        "scale_factor": 0.01,
        "add_offset": 273.15,
        "dtype": np.dtype("int16"),
    }

    result = create_measurements_encoding(
        xr.Dataset({"temperature": temperature}),
        enable_sharding=True,
        spatial_chunk=256,
        keep_scale_offset=False,
        experimental_scale_offset_codec=True,
    )

    var_encoding = result["temperature"]
    assert "filters" in var_encoding
    configured_filters = var_encoding["filters"]
    assert len(configured_filters) == 2
    so_filter = configured_filters[0]
    assert isinstance(so_filter, ScaleOffset)
    assert so_filter.offset == 273.15
    assert so_filter.scale == 1.0 / 0.01
    # The CF keys must no longer appear in the encoding dict.
    assert "scale_factor" not in var_encoding
    assert "add_offset" not in var_encoding


@pytest.mark.parametrize("keep_scale_offset", [True, False])
def test_create_measurements_encoding(keep_scale_offset: bool, sample_dataset: xr.Dataset) -> None:
"""Test measurements encoding creation with xy-aligned sharding."""
Expand Down
86 changes: 86 additions & 0 deletions tests/test_scale_offset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Test that the ScaleOffset codec combined with the CastValueRustV1 codec from the
# cast_value library works correctly. Correctness is measured by the ability of
# these two codecs to define a procedure that saves an array of floats ranging from -1000 to 1000
# as a uint16 array. The offset should be the minimum value of the array, but the scale can be 1 in
# this case.

import numpy as np
import zarr
import zarr.storage
from cast_value import CastValueRustV1

from eopf_geozarr.codecs.scale_offset import ScaleOffset, scale_offset_from_cf


def test_scale_offset_with_cast_value() -> None:
    """
    Round-trip test: write float64 data through ScaleOffset + CastValueRustV1,
    verify it is stored as uint16, and read back as the original float64 values.
    """
    original = np.linspace(-1000, 1000, 2001, dtype="float64")
    # Shifting by the minimum makes every packed value non-negative, so it
    # fits in uint16; a unit scale is enough for this value range.
    codec_offset = float(original.min())  # -1000.0
    codec_scale = 1.0

    array = zarr.open_array(
        zarr.storage.MemoryStore(),
        mode="w",
        shape=original.shape,
        dtype="float64",
        codecs=[
            ScaleOffset(offset=codec_offset, scale=codec_scale),
            CastValueRustV1(data_type="uint16", rounding="nearest-even"),
            zarr.codecs.BytesCodec(),
        ],
    )
    array[:] = original

    np.testing.assert_array_almost_equal(array[:], original)


def test_cf_scale_offset_pushed_into_codecs() -> None:
    """
    Given CF-convention scale_factor and add_offset, generate a ScaleOffset codec
    that replicates the CF behavior at the zarr chunk level, paired with a
    CastValueRustV1 codec for the packed integer dtype.

    CF convention: unpacked = packed * scale_factor + add_offset
    """
    cf_scale = 0.01
    cf_offset = 273.15
    packed_dtype = "int16"

    # The decoded ("unpacked") float values a user would see for a range of
    # packed integers.
    packed = np.arange(-1000, 1001, dtype=packed_dtype)
    unpacked = packed * cf_scale + cf_offset

    # Build the codec pair from the CF parameters and write the unpacked
    # floats through the pipeline.
    array = zarr.open_array(
        zarr.storage.MemoryStore(),
        mode="w",
        shape=unpacked.shape,
        dtype=unpacked.dtype,
        codecs=[
            scale_offset_from_cf(scale_factor=cf_scale, add_offset=cf_offset),
            CastValueRustV1(data_type=packed_dtype, rounding="nearest-even"),
            zarr.codecs.BytesCodec(),
        ],
    )
    array[:] = unpacked

    # The round trip must recover the original unpacked floats.
    np.testing.assert_array_almost_equal(array[:], unpacked)


def test_scale_offset_from_dict_round_trip() -> None:
    """ScaleOffset.to_dict / from_dict should round-trip."""
    original = ScaleOffset(offset=273.15, scale=100.0)
    rebuilt = ScaleOffset.from_dict(original.to_dict())
    assert rebuilt.offset == original.offset
    assert rebuilt.scale == original.scale
Loading
Loading