Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d71b343
Add default_object_store internal func
maxrjones Apr 12, 2025
d1faa78
Remove TODO because lack of duplicate keys are useful
maxrjones Apr 12, 2025
c360707
Also return prefix
maxrjones Apr 12, 2025
938ff3b
Remove config complexity
maxrjones Apr 12, 2025
7ed96f4
Make _find_matching_store a method
maxrjones Apr 12, 2025
89f3f3a
Test default store creation with HDF5 reader
maxrjones Apr 12, 2025
34f9d0c
Improve typing
maxrjones Apr 12, 2025
7094212
Protect against duplicate config options
maxrjones Apr 12, 2025
78f483f
Mark minio tests
maxrjones Apr 12, 2025
6c901bf
Fix test
maxrjones Apr 12, 2025
cd5fc34
Specify that other schemes aren't supported
maxrjones Apr 13, 2025
6899e79
Make mypy pass
maxrjones Apr 13, 2025
1bf5495
Revise codecov config
maxrjones Apr 13, 2025
bc26ebb
Apply suggestions from code review
maxrjones Apr 15, 2025
e503df6
Revert pyproject.toml change
maxrjones Apr 15, 2025
62fd065
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 15, 2025
d8e5fd5
Refactor as ObjectStoreRegistry
maxrjones Apr 17, 2025
53a26ee
Fix typing
maxrjones Apr 17, 2025
4bd9e2b
Improve docstrings
maxrjones Apr 17, 2025
77a0124
Fix store parsing
maxrjones Apr 17, 2025
4e0b92c
Update hdf reader to use ObjectStoreRegistry
maxrjones Apr 17, 2025
ca48b6c
Infer region from HEAD request
maxrjones Apr 17, 2025
bc41e9b
Remove overloads that upset mypy
maxrjones Apr 17, 2025
ad7c003
Update virtualizarr/manifests/store.py
maxrjones Apr 18, 2025
1ee867b
Remove mypy from pre-commit
maxrjones Apr 18, 2025
e96f6c8
Use scheme and netlock as registry keys
maxrjones Apr 18, 2025
2d84e2c
Remove unused function
maxrjones Apr 18, 2025
7e9233f
Update docstrings
maxrjones Apr 18, 2025
e3a6267
Consolidate imports
maxrjones Apr 18, 2025
9fe8af3
Add docstring
maxrjones Apr 18, 2025
f183f9d
Remove type-ignore
maxrjones Apr 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ readthedocs = "rm -rf $READTHEDOCS_OUTPUT/html && cp -r docs/_build/html $READTH

# Define commands to run within the docs environment
[tool.pixi.feature.minio.tasks]
run-tests = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py --run-minio-tests --verbose" }
run-tests-xml-cov = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py --run-minio-tests --verbose --cov-report=xml" }
run-tests = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py --run-minio-tests --run-network-tests --verbose" }
run-tests-xml-cov = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py --run-minio-tests --run-network-tests --verbose --cov-report=xml" }

[tool.setuptools_scm]
fallback_version = "9999"
Expand All @@ -227,6 +227,7 @@ module = [
"numcodecs.*",
"ujson",
"zarr",
"requests",
]
ignore_missing_imports = true

Expand Down Expand Up @@ -272,7 +273,6 @@ line-ending = "auto"
known-first-party = ["virtualizarr"]

[tool.coverage.run]
include = ["virtualizarr/"]
omit = ["conftest.py", "virtualizarr/tests/*"]

[tool.coverage.report]
Expand Down
2 changes: 1 addition & 1 deletion virtualizarr/manifests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
from virtualizarr.manifests.array import ManifestArray # type: ignore # noqa
from virtualizarr.manifests.group import ManifestGroup # type: ignore # noqa
from virtualizarr.manifests.manifest import ChunkEntry, ChunkManifest # type: ignore # noqa
from virtualizarr.manifests.store import ManifestStore # type: ignore # noqa
from virtualizarr.manifests.store import ManifestStore, ObjectStoreRegistry # type: ignore # noqa
181 changes: 107 additions & 74 deletions virtualizarr/manifests/store.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import pickle
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Mapping
from collections.abc import AsyncGenerator, Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from zarr.abc.store import (
Expand All @@ -12,20 +13,24 @@
Store,
SuffixByteRequest,
)
from zarr.core.buffer import Buffer
from zarr.core.buffer import Buffer, default_buffer_prototype
from zarr.core.buffer.core import BufferPrototype

from virtualizarr.manifests.array import ManifestArray
from virtualizarr.manifests.group import ManifestGroup
from virtualizarr.vendor.zarr.metadata import dict_to_buffer

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable
from collections.abc import AsyncGenerator, Iterable, Mapping
from typing import Any

import xarray as xr
from obstore.store import (
ObjectStore, # type: ignore[import-not-found]
)
from zarr.core.buffer import BufferPrototype
from zarr.core.common import BytesLike


__all__ = ["ManifestStore"]


Expand All @@ -35,21 +40,6 @@
NotADirectoryError,
)

from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, TypeAlias

from zarr.core.buffer import default_buffer_prototype

from virtualizarr.vendor.zarr.metadata import dict_to_buffer

if TYPE_CHECKING:
from obstore.store import ObjectStore # type: ignore[import-not-found]

StoreDict: TypeAlias = dict[str, ObjectStore]

import xarray as xr


@dataclass
class StoreRequest:
Expand Down Expand Up @@ -137,34 +127,83 @@ def parse_manifest_index(key: str, chunk_key_encoding: str = ".") -> tuple[int,
return tuple(int(ind) for ind in parts[1].split(chunk_key_encoding))


def find_matching_store(stores: StoreDict, request_key: str) -> StoreRequest:
"""
Find the matching store based on the store keys and the beginning of the URI strings,
to fetch data from the appropriately configured ObjectStore.
def _find_bucket_region(bucket_name: str) -> str:
Comment thread
maxrjones marked this conversation as resolved.
import requests

Parameters:
-----------
stores : StoreDict
A dictionary with URI prefixes for different stores as keys
request_key : str
A string to match against the dictionary keys
resp = requests.head(f"https://{bucket_name}.s3.amazonaws.com")
Comment thread
maxrjones marked this conversation as resolved.
return resp.headers["x-amz-bucket-region"]

Returns:
--------
StoreRequest

def default_object_store(filepath: str) -> ObjectStore:
import obstore as obs

parsed = urlparse(filepath)

if parsed.scheme in ["", "file"]:
return obs.store.LocalStore()
if parsed.scheme == "s3":
bucket = parsed.netloc
return obs.store.S3Store(
bucket=bucket,
client_options={"allow_http": True},
skip_signature=True,
virtual_hosted_style_request=False,
region=_find_bucket_region(bucket),
)

raise NotImplementedError(f"{parsed.scheme} is not yet supported")


class ObjectStoreRegistry:
"""
# Sort keys by length in descending order to ensure longer, more specific matches take precedence
sorted_keys = sorted(stores.keys(), key=len, reverse=True)

# Check each key to see if it's a prefix of the uri_string
for key in sorted_keys:
if request_key.startswith(key):
parsed_key = urlparse(request_key)
return StoreRequest(store=stores[key], key=parsed_key.path)
# if no match is found, raise an error
raise ValueError(
f"Expected the one of stores.keys() to match the data prefix, got {stores.keys()} and {request_key}"
)
ObjectStoreRegistry maps the URL scheme and netloc to ObjectStore instances. This register allows
Zarr Store implementations (e.g., ManifestStore) to read from different ObjectStore instances.
"""

_stores: dict[str, ObjectStore]

@classmethod
def __init__(self, stores: dict[str, ObjectStore] | None = None):
stores = stores or {}
for store in stores.values():
if not store.__class__.__module__.startswith("obstore"):
raise TypeError(f"expected ObjectStore class, got {store!r}")
self._stores = stores

def register_store(self, url: str, store: ObjectStore):
"""
Register a store using the given url

If a store with the same key existed before, it is replaced
"""
parsed = urlparse(url)
scheme = parsed.scheme or "file"
self._stores[f"{scheme}://{parsed.netloc}"] = store

def get_store(self, url: str) -> ObjectStore:
"""
Get a suitable store for the provided URL. For example:

- URL with scheme file:/// or no scheme will return the default LocalFS store
- URL with scheme s3://bucket/ will return the S3 store

If no `ObjectStore` is found for the `url`, ad-hoc discovery may be executed depending on the
`url`. An `ObjectStore` may be lazily created and registered.

Parameters:
-----------
url : str
A url to identify the appropriate object_store instance based on the URL scheme and netloc.
Returns:
--------
StoreRequest
"""
parsed = urlparse(url)
store = self._stores.get(f"{parsed.scheme}://{parsed.netloc}")
if not store:
store = default_object_store(url)
self.register_store(url, store)
return store


class ManifestStore(Store):
Expand All @@ -179,11 +218,9 @@ class ManifestStore(Store):
group : ManifestGroup
Root group of the store.
Contains group metadata, ManifestArrays, and any subgroups.
stores : dict[prefix, :class:`obstore.store.ObjectStore`]
A mapping of url prefixes to obstore Store instances set up with the proper credentials.

The prefixes are matched to the URIs in the ManifestArrays to determine which store to
use for making requests.
store_registry : ObjectStoreRegistry
ObjectStoreRegistry that maps the URL scheme and netloc to ObjectStore instances,
allowing ManifestStores to read from different ObjectStore instances.

Warnings
--------
Expand All @@ -196,57 +233,51 @@ class ManifestStore(Store):
"""

_group: ManifestGroup
_stores: StoreDict
_store_registry: ObjectStoreRegistry

def __eq__(self, value: object):
NotImplementedError

def __init__(
self,
group: ManifestGroup,
*,
stores: StoreDict, # TODO: Consider using a sequence of tuples rather than a dict (see https://github.com/zarr-developers/VirtualiZarr/pull/490#discussion_r2010717898).
self, group: ManifestGroup, *, store_registry: ObjectStoreRegistry | None = None
) -> None:
"""Instantiate a new ManifestStore
"""Instantiate a new ManifestStore.

Parameters
----------
manifest_group : ManifestGroup
Manifest Group containing Group metadata and mapping variable names to ManifestArrays
stores : dict[prefix, :class:`obstore.store.ObjectStore`]
A mapping of url prefixes to obstore Store instances set up with the proper credentials.

The prefixes are matched to the URIs in the ManifestArrays to determine which store to
use for making requests.
store_registry : ObjectStoreRegistry
A registry mapping the URL scheme and netloc to ObjectStore instances,
allowing ManifestStores to read from different ObjectStore instances.
"""
for store in stores.values():
if not store.__class__.__module__.startswith("obstore"):
raise TypeError(f"expected ObjectStore class, got {store!r}")

# TODO: Don't allow stores with prefix
if not isinstance(group, ManifestGroup):
raise TypeError

super().__init__(read_only=True)
self._stores = stores
if store_registry is None:
store_registry = ObjectStoreRegistry()
self._store_registry = store_registry
self._group = group

def __str__(self) -> str:
return f"ManifestStore(group={self._group}, stores={self._stores})"
return f"ManifestStore(group={self._group}, stores={self._store_registry})"

def __getstate__(self) -> dict[Any, Any]:
state = self.__dict__.copy()
stores = state["_stores"].copy()
stores = state["_store_registry"]._stores.copy()
for k, v in stores.items():
stores[k] = pickle.dumps(v)
state["_stores"] = stores
state["_store_registry"] = stores
return state

def __setstate__(self, state: dict[Any, Any]) -> None:
stores = state["_stores"].copy()
stores = state["_store_registry"].copy()
for k, v in stores.items():
stores[k] = pickle.loads(v)
state["_stores"] = stores
state["_store_registry"] = ObjectStoreRegistry(stores)
self.__dict__.update(state)

async def get(
Expand All @@ -270,8 +301,10 @@ async def get(
path = manifest._paths[*chunk_indexes]
offset = manifest._offsets[*chunk_indexes]
length = manifest._lengths[*chunk_indexes]
# Get the configured object store instance that matches the path
store_request = find_matching_store(stores=self._stores, request_key=path)
# Get the configured object store instance that matches the path
store = self._store_registry.get_store(path)
# Truncate path to match Obstore expectations
key = urlparse(path).path
# Transform the input byte range to account for the chunk location in the file
chunk_end_exclusive = offset + length
byte_range = _transform_byte_range(
Expand All @@ -280,8 +313,8 @@ async def get(
# Actually get the bytes
try:
bytes = await obs.get_range_async(
store_request.store,
store_request.key,
store,
key,
start=byte_range.start,
end=byte_range.end,
)
Expand Down
9 changes: 6 additions & 3 deletions virtualizarr/readers/hdf/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
ManifestStore,
)
from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri
from virtualizarr.manifests.store import ObjectStoreRegistry, default_object_store
from virtualizarr.manifests.utils import create_v3_array_metadata
from virtualizarr.readers.api import VirtualBackend
from virtualizarr.readers.hdf.filters import cfcodec_from_dataset, codecs_from_dataset
Expand Down Expand Up @@ -156,16 +157,18 @@ def _construct_manifest_group(
def _create_manifest_store(
filepath: str,
*,
prefix: str,
store: ObjectStore,
store: ObjectStore | None = None,
Comment on lines -159 to +160
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sharkinsspatial with using the scheme + bucket for the ObjectStoreRegistry keys, we no longer need the prefix argument for _create_manifest_store since it can be inferred from the filepath. 🙇 to @kylebarron for this suggested approach

group: str | None = None,
) -> ManifestStore:
# Create a group containing dataset level metadata and all the manifest arrays
if not store:
store = default_object_store(filepath) # type: ignore
manifest_group = HDFVirtualBackend._construct_manifest_group(
store=store, filepath=filepath, group=group
)
registry = ObjectStoreRegistry({filepath: store})
# Convert to a manifest store
return ManifestStore(stores={prefix: store}, group=manifest_group)
return ManifestStore(store_registry=registry, group=manifest_group)

@staticmethod
def open_virtual_dataset(
Expand Down
25 changes: 24 additions & 1 deletion virtualizarr/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import time

import pytest
Expand Down Expand Up @@ -37,7 +38,7 @@ def minio_bucket(container):
# Setup with guidance from https://medium.com/@sant1/using-minio-with-docker-and-python-cbbad397cb5d
from minio import Minio

bucket = "mybucket"
bucket = "my-bucket"
filename = "test.nc"
# Initialize MinIO client
client = Minio(
Expand All @@ -47,6 +48,28 @@ def minio_bucket(container):
secure=False,
)
client.make_bucket(bucket)
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["s3:GetBucketLocation", "s3:ListBucket"],
"Resource": "arn:aws:s3:::my-bucket",
},
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": [
"s3:GetObject",
"s3:GetObjectRetention",
"s3:GetObjectLegalHold",
],
"Resource": "arn:aws:s3:::my-bucket/*",
},
],
}
client.set_bucket_policy(bucket, json.dumps(policy))
yield {
"port": container["port"],
"endpoint": container["endpoint"],
Expand Down
Loading