diff --git a/pyproject.toml b/pyproject.toml index 8df6bb9bf..f90ea87bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -201,8 +201,8 @@ readthedocs = "rm -rf $READTHEDOCS_OUTPUT/html && cp -r docs/_build/html $READTH # Define commands to run within the docs environment [tool.pixi.feature.minio.tasks] -run-tests = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py --run-minio-tests --verbose" } -run-tests-xml-cov = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py --run-minio-tests --verbose --cov-report=xml" } +run-tests = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py --run-minio-tests --run-network-tests --verbose" } +run-tests-xml-cov = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py --run-minio-tests --run-network-tests --verbose --cov-report=xml" } [tool.setuptools_scm] fallback_version = "9999" @@ -227,6 +227,7 @@ module = [ "numcodecs.*", "ujson", "zarr", + "requests", ] ignore_missing_imports = true @@ -272,7 +273,6 @@ line-ending = "auto" known-first-party = ["virtualizarr"] [tool.coverage.run] -include = ["virtualizarr/"] omit = ["conftest.py", "virtualizarr/tests/*"] [tool.coverage.report] diff --git a/virtualizarr/manifests/__init__.py b/virtualizarr/manifests/__init__.py index 787bb23be..5fc8f2575 100644 --- a/virtualizarr/manifests/__init__.py +++ b/virtualizarr/manifests/__init__.py @@ -4,4 +4,4 @@ from virtualizarr.manifests.array import ManifestArray # type: ignore # noqa from virtualizarr.manifests.group import ManifestGroup # type: ignore # noqa from virtualizarr.manifests.manifest import ChunkEntry, ChunkManifest # type: ignore # noqa -from virtualizarr.manifests.store import ManifestStore # type: ignore # noqa +from virtualizarr.manifests.store import ManifestStore, ObjectStoreRegistry # type: ignore # noqa diff --git a/virtualizarr/manifests/store.py b/virtualizarr/manifests/store.py index 9100019ee..ccc00b690 100644 --- a/virtualizarr/manifests/store.py +++ b/virtualizarr/manifests/store.py @@ -1,8 +1,9 @@ from __future__ import annotations import pickle -from collections.abc import Iterable -from typing import TYPE_CHECKING, Any, Mapping +from collections.abc import AsyncGenerator, Iterable +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any from urllib.parse import urlparse from zarr.abc.store import ( @@ -12,20 +13,24 @@ Store, SuffixByteRequest, ) -from zarr.core.buffer import Buffer +from zarr.core.buffer import Buffer, default_buffer_prototype from zarr.core.buffer.core import BufferPrototype from virtualizarr.manifests.array import ManifestArray from virtualizarr.manifests.group import ManifestGroup +from virtualizarr.vendor.zarr.metadata import dict_to_buffer if TYPE_CHECKING: - from collections.abc import AsyncGenerator, Iterable + from collections.abc import AsyncGenerator, Iterable, Mapping from typing import Any + import xarray as xr + from obstore.store import ( + ObjectStore, # type: ignore[import-not-found] + ) from zarr.core.buffer import BufferPrototype from zarr.core.common import BytesLike - __all__ = ["ManifestStore"] @@ -35,21 +40,6 @@ NotADirectoryError, ) -from collections.abc import AsyncGenerator -from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, TypeAlias - -from zarr.core.buffer import default_buffer_prototype - -from virtualizarr.vendor.zarr.metadata import dict_to_buffer - -if TYPE_CHECKING: - from obstore.store import ObjectStore # type: ignore[import-not-found] - - StoreDict: TypeAlias = dict[str, ObjectStore] - - import xarray as xr - @dataclass class StoreRequest: @@ -137,34 +127,83 @@ def parse_manifest_index(key: str, chunk_key_encoding: str = ".") -> tuple[int, return tuple(int(ind) for ind in parts[1].split(chunk_key_encoding)) -def find_matching_store(stores: StoreDict, request_key: str) -> StoreRequest: - """ - Find the matching store based on the store keys and the beginning of the URI strings, - to fetch data from the appropriately configured ObjectStore. +def _find_bucket_region(bucket_name: str) -> str: + import requests - Parameters: - ----------- - stores : StoreDict - A dictionary with URI prefixes for different stores as keys - request_key : str - A string to match against the dictionary keys + resp = requests.head(f"https://{bucket_name}.s3.amazonaws.com") + return resp.headers["x-amz-bucket-region"] - Returns: - -------- - StoreRequest + +def default_object_store(filepath: str) -> ObjectStore: + import obstore as obs + + parsed = urlparse(filepath) + + if parsed.scheme in ["", "file"]: + return obs.store.LocalStore() + if parsed.scheme == "s3": + bucket = parsed.netloc + return obs.store.S3Store( + bucket=bucket, + client_options={"allow_http": True}, + skip_signature=True, + virtual_hosted_style_request=False, + region=_find_bucket_region(bucket), + ) + + raise NotImplementedError(f"{parsed.scheme} is not yet supported") + + +class ObjectStoreRegistry: """ - # Sort keys by length in descending order to ensure longer, more specific matches take precedence - sorted_keys = sorted(stores.keys(), key=len, reverse=True) - - # Check each key to see if it's a prefix of the uri_string - for key in sorted_keys: - if request_key.startswith(key): - parsed_key = urlparse(request_key) - return StoreRequest(store=stores[key], key=parsed_key.path) - # if no match is found, raise an error - raise ValueError( - f"Expected the one of stores.keys() to match the data prefix, got {stores.keys()} and {request_key}" - ) + ObjectStoreRegistry maps the URL scheme and netloc to ObjectStore instances. This register allows + Zarr Store implementations (e.g., ManifestStore) to read from different ObjectStore instances. + """ + + _stores: dict[str, ObjectStore] + + @classmethod + def __init__(self, stores: dict[str, ObjectStore] | None = None): + stores = stores or {} + for store in stores.values(): + if not store.__class__.__module__.startswith("obstore"): + raise TypeError(f"expected ObjectStore class, got {store!r}") + self._stores = stores + + def register_store(self, url: str, store: ObjectStore): + """ + Register a store using the given url + + If a store with the same key existed before, it is replaced + """ + parsed = urlparse(url) + scheme = parsed.scheme or "file" + self._stores[f"{scheme}://{parsed.netloc}"] = store + + def get_store(self, url: str) -> ObjectStore: + """ + Get a suitable store for the provided URL. For example: + + - URL with scheme file:/// or no scheme will return the default LocalFS store + - URL with scheme s3://bucket/ will return the S3 store + + If no `ObjectStore` is found for the `url`, ad-hoc discovery may be executed depending on the + `url`. An `ObjectStore` may be lazily created and registered. + + Parameters: + ----------- + url : str + A url to identify the appropriate object_store instance based on the URL scheme and netloc. + Returns: + -------- + StoreRequest + """ + parsed = urlparse(url) + store = self._stores.get(f"{parsed.scheme}://{parsed.netloc}") + if not store: + store = default_object_store(url) + self.register_store(url, store) + return store class ManifestStore(Store): @@ -179,11 +218,9 @@ class ManifestStore(Store): group : ManifestGroup Root group of the store. Contains group metadata, ManifestArrays, and any subgroups. - stores : dict[prefix, :class:`obstore.store.ObjectStore`] - A mapping of url prefixes to obstore Store instances set up with the proper credentials. - - The prefixes are matched to the URIs in the ManifestArrays to determine which store to - use for making requests. + store_registry : ObjectStoreRegistry + ObjectStoreRegistry that maps the URL scheme and netloc to ObjectStore instances, + allowing ManifestStores to read from different ObjectStore instances. Warnings -------- @@ -196,57 +233,51 @@ class ManifestStore(Store): """ _group: ManifestGroup - _stores: StoreDict + _store_registry: ObjectStoreRegistry def __eq__(self, value: object): NotImplementedError def __init__( - self, - group: ManifestGroup, - *, - stores: StoreDict, # TODO: Consider using a sequence of tuples rather than a dict (see https://github.com/zarr-developers/VirtualiZarr/pull/490#discussion_r2010717898). + self, group: ManifestGroup, *, store_registry: ObjectStoreRegistry | None = None ) -> None: - """Instantiate a new ManifestStore + """Instantiate a new ManifestStore. Parameters ---------- manifest_group : ManifestGroup Manifest Group containing Group metadata and mapping variable names to ManifestArrays - stores : dict[prefix, :class:`obstore.store.ObjectStore`] - A mapping of url prefixes to obstore Store instances set up with the proper credentials. - - The prefixes are matched to the URIs in the ManifestArrays to determine which store to - use for making requests. + store_registry : ObjectStoreRegistry + A registry mapping the URL scheme and netloc to ObjectStore instances, + allowing ManifestStores to read from different ObjectStore instances. """ - for store in stores.values(): - if not store.__class__.__module__.startswith("obstore"): - raise TypeError(f"expected ObjectStore class, got {store!r}") # TODO: Don't allow stores with prefix if not isinstance(group, ManifestGroup): raise TypeError super().__init__(read_only=True) - self._stores = stores + if store_registry is None: + store_registry = ObjectStoreRegistry() + self._store_registry = store_registry self._group = group def __str__(self) -> str: - return f"ManifestStore(group={self._group}, stores={self._stores})" + return f"ManifestStore(group={self._group}, stores={self._store_registry})" def __getstate__(self) -> dict[Any, Any]: state = self.__dict__.copy() - stores = state["_stores"].copy() + stores = state["_store_registry"]._stores.copy() for k, v in stores.items(): stores[k] = pickle.dumps(v) - state["_stores"] = stores + state["_store_registry"] = stores return state def __setstate__(self, state: dict[Any, Any]) -> None: - stores = state["_stores"].copy() + stores = state["_store_registry"].copy() for k, v in stores.items(): stores[k] = pickle.loads(v) - state["_stores"] = stores + state["_store_registry"] = ObjectStoreRegistry(stores) self.__dict__.update(state) async def get( @@ -270,8 +301,10 @@ async def get( path = manifest._paths[*chunk_indexes] offset = manifest._offsets[*chunk_indexes] length = manifest._lengths[*chunk_indexes] - # Get the configured object store instance that matches the path - store_request = find_matching_store(stores=self._stores, request_key=path) + # Get the configured object store instance that matches the path + store = self._store_registry.get_store(path) + # Truncate path to match Obstore expectations + key = urlparse(path).path # Transform the input byte range to account for the chunk location in the file chunk_end_exclusive = offset + length byte_range = _transform_byte_range( @@ -280,8 +313,8 @@ async def get( # Actually get the bytes try: bytes = await obs.get_range_async( - store_request.store, - store_request.key, + store, + key, start=byte_range.start, end=byte_range.end, ) diff --git a/virtualizarr/readers/hdf/hdf.py b/virtualizarr/readers/hdf/hdf.py index 3d96e4885..c6947ccd9 100644 --- a/virtualizarr/readers/hdf/hdf.py +++ b/virtualizarr/readers/hdf/hdf.py @@ -28,6 +28,7 @@ ManifestStore, ) from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri +from virtualizarr.manifests.store import ObjectStoreRegistry, default_object_store from virtualizarr.manifests.utils import create_v3_array_metadata from virtualizarr.readers.api import VirtualBackend from virtualizarr.readers.hdf.filters import cfcodec_from_dataset, codecs_from_dataset @@ -156,16 +157,18 @@ def _construct_manifest_group( def _create_manifest_store( filepath: str, *, - prefix: str, - store: ObjectStore, + store: ObjectStore | None = None, group: str | None = None, ) -> ManifestStore: # Create a group containing dataset level metadata and all the manifest arrays + if not store: + store = default_object_store(filepath) # type: ignore manifest_group = HDFVirtualBackend._construct_manifest_group( store=store, filepath=filepath, group=group ) + registry = ObjectStoreRegistry({filepath: store}) # Convert to a manifest store - return ManifestStore(stores={prefix: store}, group=manifest_group) + return ManifestStore(store_registry=registry, group=manifest_group) @staticmethod def open_virtual_dataset( diff --git a/virtualizarr/tests/conftest.py b/virtualizarr/tests/conftest.py index baeb95a55..a73de6e37 100644 --- a/virtualizarr/tests/conftest.py +++ b/virtualizarr/tests/conftest.py @@ -1,3 +1,4 @@ +import json import time import pytest @@ -37,7 +38,7 @@ def minio_bucket(container): # Setup with guidance from https://medium.com/@sant1/using-minio-with-docker-and-python-cbbad397cb5d from minio import Minio - bucket = "mybucket" + bucket = "my-bucket" filename = "test.nc" # Initialize MinIO client client = Minio( @@ -47,6 +48,28 @@ def minio_bucket(container): secure=False, ) client.make_bucket(bucket) + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": ["s3:GetBucketLocation", "s3:ListBucket"], + "Resource": "arn:aws:s3:::my-bucket", + }, + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": [ + "s3:GetObject", + "s3:GetObjectRetention", + "s3:GetObjectLegalHold", + ], + "Resource": "arn:aws:s3:::my-bucket/*", + }, + ], + } + client.set_bucket_policy(bucket, json.dumps(policy)) yield { "port": container["port"], "endpoint": container["endpoint"], diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py index a8beeaab0..e73c5ce6d 100644 --- a/virtualizarr/tests/test_integration.py +++ b/virtualizarr/tests/test_integration.py @@ -1,7 +1,7 @@ from collections.abc import Mapping from os.path import relpath from pathlib import Path -from typing import Any, Callable, Concatenate, TypeAlias, overload +from typing import Any, Callable, Concatenate, TypeAlias import numpy as np import pytest @@ -114,16 +114,6 @@ def roundtrip_as_kerchunk_parquet(vds: xr.Dataset, tmpdir, **kwargs): return xr.open_dataset(f"{tmpdir}/refs.parquet", engine="kerchunk", **kwargs) -@overload -def roundtrip_as_in_memory_icechunk( - vdata: xr.Dataset, tmp_path: Path, **kwargs -) -> xr.Dataset: ... -@overload -def roundtrip_as_in_memory_icechunk( - vdata: xr.DataTree, tmp_path: Path, **kwargs -) -> xr.DataTree: ... - - def roundtrip_as_in_memory_icechunk( vdata: xr.Dataset | xr.DataTree, tmp_path: Path, diff --git a/virtualizarr/tests/test_manifests/test_store.py b/virtualizarr/tests/test_manifests/test_store.py index a9d4a9ca2..094b003ac 100644 --- a/virtualizarr/tests/test_manifests/test_store.py +++ b/virtualizarr/tests/test_manifests/test_store.py @@ -19,7 +19,9 @@ ManifestArray, ManifestGroup, ManifestStore, + ObjectStoreRegistry, ) +from virtualizarr.manifests.store import default_object_store from virtualizarr.manifests.utils import create_v3_array_metadata from virtualizarr.tests import ( requires_hdf5plugin, @@ -87,7 +89,8 @@ def _generate_manifest_store( arrays={"foo": manifest_array, "bar": manifest_array}, attributes={"Zarr": "Hooray!"}, ) - return ManifestStore(stores={prefix: store}, group=manifest_group) + registry = ObjectStoreRegistry({prefix: store}) + return ManifestStore(store_registry=registry, group=manifest_group) @pytest.fixture() @@ -125,6 +128,27 @@ def s3_store(minio_bucket): ) +@requires_obstore +@requires_minio +def test_default_object_store_s3(minio_bucket): + from obstore.store import S3Store + + filepath = f"s3://{minio_bucket['bucket']}/data/data.tmp" + store = default_object_store( + filepath, + ) + assert isinstance(store, S3Store) + + +@requires_obstore +def test_default_object_store_local(tmpdir): + from obstore.store import LocalStore + + filepath = f"{tmpdir}/data.tmp" + store = default_object_store(filepath) + assert isinstance(store, LocalStore) + + @requires_obstore class TestManifestStore: def test_manifest_store_properties(self, local_store): @@ -237,8 +261,6 @@ class TestToVirtualXarray: def test_single_group_to_dataset( self, manifest_array, loadable_variables, expected_loadable_variables ): - import obstore as obs - marr1 = manifest_array( shape=(3, 2, 5), chunks=(1, 2, 1), dimension_names=["x", "y", "t"] ) @@ -254,8 +276,7 @@ def test_single_group_to_dataset( attributes={"coordinates": "elevation t", "ham": "eggs"}, ) - local_store = obs.store.LocalStore() - manifest_store = ManifestStore(manifest_group, stores={"file://": local_store}) + manifest_store = ManifestStore(manifest_group) vds = manifest_store.to_virtual_dataset(loadable_variables=loadable_variables) assert set(vds.variables) == set(["T", "elevation", "t"]) diff --git a/virtualizarr/tests/test_readers/conftest.py b/virtualizarr/tests/test_readers/conftest.py index e49de78c1..d52b11b7d 100644 --- a/virtualizarr/tests/test_readers/conftest.py +++ b/virtualizarr/tests/test_readers/conftest.py @@ -433,3 +433,20 @@ def cf_array_fill_value_hdf5_file(tmp_path: Path) -> str: dset.attrs["_FillValue"] = np.array([np.nan]) return filepath + + +@pytest.fixture() +def chunked_roundtrip_hdf5_s3_file(minio_bucket, cf_array_fill_value_hdf5_file): + import obstore as obs + + store = obs.store.S3Store( + minio_bucket["bucket"], + aws_endpoint=minio_bucket["endpoint"], + access_key_id=minio_bucket["username"], + secret_access_key=minio_bucket["password"], + virtual_hosted_style_request=False, + client_options={"allow_http": True}, + ) + filepath = "data/cf_array_fill_value.nc" + obs.put(store, filepath, cf_array_fill_value_hdf5_file) + return f"s3://{minio_bucket['bucket']}/{filepath}" diff --git a/virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py b/virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py index e226e6647..4f7ff8e1c 100644 --- a/virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py +++ b/virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py @@ -2,9 +2,11 @@ import pytest import xarray as xr +from virtualizarr.manifests import ManifestArray from virtualizarr.readers.hdf import HDFVirtualBackend from virtualizarr.tests import ( requires_hdf5plugin, + requires_minio, requires_obstore, ) @@ -25,16 +27,58 @@ def basic_ds(): @requires_obstore class TestHDFManifestStore: def test_rountrip_simple_virtualdataset(self, tmpdir, basic_ds): - from obstore.store import LocalStore - "Roundtrip a dataset to/from NetCDF with the HDF reader and ManifestStore" filepath = f"{tmpdir}/basic_ds_roundtrip.nc" basic_ds.to_netcdf(filepath, engine="h5netcdf") store = HDFVirtualBackend._create_manifest_store( - filepath=filepath, store=LocalStore(), prefix="file://" + filepath=filepath, ) rountripped_ds = xr.open_dataset( store, engine="zarr", consolidated=False, zarr_format=3 ) xr.testing.assert_allclose(basic_ds, rountripped_ds) + + def test_rountrip_simple_virtualdataset_default_store(self, tmpdir, basic_ds): + "Roundtrip a dataset to/from NetCDF with the HDF reader and ManifestStore" + + filepath = f"{tmpdir}/basic_ds_roundtrip.nc" + basic_ds.to_netcdf(filepath, engine="h5netcdf") + store = HDFVirtualBackend._create_manifest_store(filepath=filepath) + rountripped_ds = xr.open_dataset( + store, engine="zarr", consolidated=False, zarr_format=3 + ) + xr.testing.assert_allclose(basic_ds, rountripped_ds) + + @requires_minio + @requires_obstore + def test_store(self, minio_bucket, chunked_roundtrip_hdf5_s3_file): + import obstore as obs + + s3store = obs.store.S3Store( + bucket=minio_bucket["bucket"], + config={ + "endpoint": minio_bucket["endpoint"], + "virtual_hosted_style_request": False, + "skip_signature": True, + }, + client_options={"allow_http": True}, + ) + store = HDFVirtualBackend._create_manifest_store( + filepath=chunked_roundtrip_hdf5_s3_file, + store=s3store, + ) + vds = store.to_virtual_dataset() + assert vds.dims == {"phony_dim_0": 5} + assert isinstance(vds["data"].data, ManifestArray) + + @requires_obstore + def test_default_store(self): + store = HDFVirtualBackend._create_manifest_store( + filepath="s3://carbonplan-share/virtualizarr/local.nc", + ) + vds = store.to_virtual_dataset() + assert vds.dims == {"time": 2920, "lat": 25, "lon": 53} + assert isinstance(vds["air"].data, ManifestArray) + for name in ["time", "lat", "lon"]: + assert isinstance(vds[name].data, np.ndarray) diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py index a0ef2b040..b25b6c1e8 100644 --- a/virtualizarr/utils.py +++ b/virtualizarr/utils.py @@ -4,6 +4,7 @@ import io from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Iterable, Optional, Union +from urllib.parse import urlparse from zarr.abc.codec import ArrayArrayCodec, BytesBytesCodec from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata @@ -28,7 +29,9 @@ class ObstoreReader: def __init__(self, store: ObjectStore, path: str) -> None: import obstore as obs - self._reader = obs.open_reader(store, path) + parsed = urlparse(path) + + self._reader = obs.open_reader(store, parsed.path) def read(self, size: int, /) -> bytes: return self._reader.read(size).to_bytes()