Skip to content

Commit 259ddeb

Browse files
maxrjoneschuckwondopre-commit-ci[bot]kylebarron
authored
Add ObjectStoreRegistry to support default stores (#549)
* Add default_object_store internal func * Remove TODO because lack of duplicate keys are useful * Also return prefix * Remove config complexity * Make _find_matching_store a method * Test default store creation with HDF5 reader * Improve typing * Protect against duplicate config options * Mark minio tests * Fix test * Specify that other schemes aren't supported * Make mypy pass * Revise codecov config * Apply suggestions from code review Co-authored-by: Chuck Daniels <cjdaniels4@gmail.com> * Revert pyproject.toml change * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Refactor as ObjectStoreRegistry * Fix typing * Improve docstrings * Fix store parsing * Update hdf reader to use ObjectStoreRegistry * Infer region from HEAD request * Remove overloads that upset mypy * Update virtualizarr/manifests/store.py Co-authored-by: Kyle Barron <kylebarron2@gmail.com> * Remove mypy from pre-commit * Use scheme and netlock as registry keys * Remove unused function * Update docstrings * Consolidate imports * Add docstring * Remove type-ignore --------- Co-authored-by: Chuck Daniels <cjdaniels4@gmail.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Kyle Barron <kylebarron2@gmail.com>
1 parent 5cb63e6 commit 259ddeb

10 files changed

Lines changed: 236 additions & 102 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ readthedocs = "rm -rf $READTHEDOCS_OUTPUT/html && cp -r docs/_build/html $READTH
201201

202202
# Define commands to run within the docs environment
203203
[tool.pixi.feature.minio.tasks]
204-
run-tests = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py --run-minio-tests --verbose" }
205-
run-tests-xml-cov = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py --run-minio-tests --verbose --cov-report=xml" }
204+
run-tests = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py --run-minio-tests --run-network-tests --verbose" }
205+
run-tests-xml-cov = { cmd = "pytest virtualizarr/tests/test_manifests/test_store.py virtualizarr/tests/test_readers/test_hdf/test_hdf_manifest_store.py --run-minio-tests --run-network-tests --verbose --cov-report=xml" }
206206

207207
[tool.setuptools_scm]
208208
fallback_version = "9999"
@@ -227,6 +227,7 @@ module = [
227227
"numcodecs.*",
228228
"ujson",
229229
"zarr",
230+
"requests",
230231
]
231232
ignore_missing_imports = true
232233

@@ -272,7 +273,6 @@ line-ending = "auto"
272273
known-first-party = ["virtualizarr"]
273274

274275
[tool.coverage.run]
275-
include = ["virtualizarr/"]
276276
omit = ["conftest.py", "virtualizarr/tests/*"]
277277

278278
[tool.coverage.report]

virtualizarr/manifests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
from virtualizarr.manifests.array import ManifestArray # type: ignore # noqa
55
from virtualizarr.manifests.group import ManifestGroup # type: ignore # noqa
66
from virtualizarr.manifests.manifest import ChunkEntry, ChunkManifest # type: ignore # noqa
7-
from virtualizarr.manifests.store import ManifestStore # type: ignore # noqa
7+
from virtualizarr.manifests.store import ManifestStore, ObjectStoreRegistry # type: ignore # noqa

virtualizarr/manifests/store.py

Lines changed: 107 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from __future__ import annotations
22

33
import pickle
4-
from collections.abc import Iterable
5-
from typing import TYPE_CHECKING, Any, Mapping
4+
from collections.abc import AsyncGenerator, Iterable
5+
from dataclasses import dataclass
6+
from typing import TYPE_CHECKING, Any
67
from urllib.parse import urlparse
78

89
from zarr.abc.store import (
@@ -12,20 +13,24 @@
1213
Store,
1314
SuffixByteRequest,
1415
)
15-
from zarr.core.buffer import Buffer
16+
from zarr.core.buffer import Buffer, default_buffer_prototype
1617
from zarr.core.buffer.core import BufferPrototype
1718

1819
from virtualizarr.manifests.array import ManifestArray
1920
from virtualizarr.manifests.group import ManifestGroup
21+
from virtualizarr.vendor.zarr.metadata import dict_to_buffer
2022

2123
if TYPE_CHECKING:
22-
from collections.abc import AsyncGenerator, Iterable
24+
from collections.abc import AsyncGenerator, Iterable, Mapping
2325
from typing import Any
2426

27+
import xarray as xr
28+
from obstore.store import (
29+
ObjectStore, # type: ignore[import-not-found]
30+
)
2531
from zarr.core.buffer import BufferPrototype
2632
from zarr.core.common import BytesLike
2733

28-
2934
__all__ = ["ManifestStore"]
3035

3136

@@ -35,21 +40,6 @@
3540
NotADirectoryError,
3641
)
3742

38-
from collections.abc import AsyncGenerator
39-
from dataclasses import dataclass
40-
from typing import TYPE_CHECKING, Any, TypeAlias
41-
42-
from zarr.core.buffer import default_buffer_prototype
43-
44-
from virtualizarr.vendor.zarr.metadata import dict_to_buffer
45-
46-
if TYPE_CHECKING:
47-
from obstore.store import ObjectStore # type: ignore[import-not-found]
48-
49-
StoreDict: TypeAlias = dict[str, ObjectStore]
50-
51-
import xarray as xr
52-
5343

5444
@dataclass
5545
class StoreRequest:
@@ -137,34 +127,83 @@ def parse_manifest_index(key: str, chunk_key_encoding: str = ".") -> tuple[int,
137127
return tuple(int(ind) for ind in parts[1].split(chunk_key_encoding))
138128

139129

140-
def find_matching_store(stores: StoreDict, request_key: str) -> StoreRequest:
141-
"""
142-
Find the matching store based on the store keys and the beginning of the URI strings,
143-
to fetch data from the appropriately configured ObjectStore.
130+
def _find_bucket_region(bucket_name: str) -> str:
131+
import requests
144132

145-
Parameters:
146-
-----------
147-
stores : StoreDict
148-
A dictionary with URI prefixes for different stores as keys
149-
request_key : str
150-
A string to match against the dictionary keys
133+
resp = requests.head(f"https://{bucket_name}.s3.amazonaws.com")
134+
return resp.headers["x-amz-bucket-region"]
151135

152-
Returns:
153-
--------
154-
StoreRequest
136+
137+
def default_object_store(filepath: str) -> ObjectStore:
138+
import obstore as obs
139+
140+
parsed = urlparse(filepath)
141+
142+
if parsed.scheme in ["", "file"]:
143+
return obs.store.LocalStore()
144+
if parsed.scheme == "s3":
145+
bucket = parsed.netloc
146+
return obs.store.S3Store(
147+
bucket=bucket,
148+
client_options={"allow_http": True},
149+
skip_signature=True,
150+
virtual_hosted_style_request=False,
151+
region=_find_bucket_region(bucket),
152+
)
153+
154+
raise NotImplementedError(f"{parsed.scheme} is not yet supported")
155+
156+
157+
class ObjectStoreRegistry:
155158
"""
156-
# Sort keys by length in descending order to ensure longer, more specific matches take precedence
157-
sorted_keys = sorted(stores.keys(), key=len, reverse=True)
158-
159-
# Check each key to see if it's a prefix of the uri_string
160-
for key in sorted_keys:
161-
if request_key.startswith(key):
162-
parsed_key = urlparse(request_key)
163-
return StoreRequest(store=stores[key], key=parsed_key.path)
164-
# if no match is found, raise an error
165-
raise ValueError(
166-
f"Expected the one of stores.keys() to match the data prefix, got {stores.keys()} and {request_key}"
167-
)
159+
ObjectStoreRegistry maps the URL scheme and netloc to ObjectStore instances. This register allows
160+
Zarr Store implementations (e.g., ManifestStore) to read from different ObjectStore instances.
161+
"""
162+
163+
_stores: dict[str, ObjectStore]
164+
165+
@classmethod
166+
def __init__(self, stores: dict[str, ObjectStore] | None = None):
167+
stores = stores or {}
168+
for store in stores.values():
169+
if not store.__class__.__module__.startswith("obstore"):
170+
raise TypeError(f"expected ObjectStore class, got {store!r}")
171+
self._stores = stores
172+
173+
def register_store(self, url: str, store: ObjectStore):
174+
"""
175+
Register a store using the given url
176+
177+
If a store with the same key existed before, it is replaced
178+
"""
179+
parsed = urlparse(url)
180+
scheme = parsed.scheme or "file"
181+
self._stores[f"{scheme}://{parsed.netloc}"] = store
182+
183+
def get_store(self, url: str) -> ObjectStore:
184+
"""
185+
Get a suitable store for the provided URL. For example:
186+
187+
- URL with scheme file:/// or no scheme will return the default LocalFS store
188+
- URL with scheme s3://bucket/ will return the S3 store
189+
190+
If no `ObjectStore` is found for the `url`, ad-hoc discovery may be executed depending on the
191+
`url`. An `ObjectStore` may be lazily created and registered.
192+
193+
Parameters:
194+
-----------
195+
url : str
196+
A url to identify the appropriate object_store instance based on the URL scheme and netloc.
197+
Returns:
198+
--------
199+
StoreRequest
200+
"""
201+
parsed = urlparse(url)
202+
store = self._stores.get(f"{parsed.scheme}://{parsed.netloc}")
203+
if not store:
204+
store = default_object_store(url)
205+
self.register_store(url, store)
206+
return store
168207

169208

170209
class ManifestStore(Store):
@@ -179,11 +218,9 @@ class ManifestStore(Store):
179218
group : ManifestGroup
180219
Root group of the store.
181220
Contains group metadata, ManifestArrays, and any subgroups.
182-
stores : dict[prefix, :class:`obstore.store.ObjectStore`]
183-
A mapping of url prefixes to obstore Store instances set up with the proper credentials.
184-
185-
The prefixes are matched to the URIs in the ManifestArrays to determine which store to
186-
use for making requests.
221+
store_registry : ObjectStoreRegistry
222+
ObjectStoreRegistry that maps the URL scheme and netloc to ObjectStore instances,
223+
allowing ManifestStores to read from different ObjectStore instances.
187224
188225
Warnings
189226
--------
@@ -196,57 +233,51 @@ class ManifestStore(Store):
196233
"""
197234

198235
_group: ManifestGroup
199-
_stores: StoreDict
236+
_store_registry: ObjectStoreRegistry
200237

201238
def __eq__(self, value: object):
202239
NotImplementedError
203240

204241
def __init__(
205-
self,
206-
group: ManifestGroup,
207-
*,
208-
stores: StoreDict, # TODO: Consider using a sequence of tuples rather than a dict (see https://github.com/zarr-developers/VirtualiZarr/pull/490#discussion_r2010717898).
242+
self, group: ManifestGroup, *, store_registry: ObjectStoreRegistry | None = None
209243
) -> None:
210-
"""Instantiate a new ManifestStore
244+
"""Instantiate a new ManifestStore.
211245
212246
Parameters
213247
----------
214248
manifest_group : ManifestGroup
215249
Manifest Group containing Group metadata and mapping variable names to ManifestArrays
216-
stores : dict[prefix, :class:`obstore.store.ObjectStore`]
217-
A mapping of url prefixes to obstore Store instances set up with the proper credentials.
218-
219-
The prefixes are matched to the URIs in the ManifestArrays to determine which store to
220-
use for making requests.
250+
store_registry : ObjectStoreRegistry
251+
A registry mapping the URL scheme and netloc to ObjectStore instances,
252+
allowing ManifestStores to read from different ObjectStore instances.
221253
"""
222-
for store in stores.values():
223-
if not store.__class__.__module__.startswith("obstore"):
224-
raise TypeError(f"expected ObjectStore class, got {store!r}")
225254

226255
# TODO: Don't allow stores with prefix
227256
if not isinstance(group, ManifestGroup):
228257
raise TypeError
229258

230259
super().__init__(read_only=True)
231-
self._stores = stores
260+
if store_registry is None:
261+
store_registry = ObjectStoreRegistry()
262+
self._store_registry = store_registry
232263
self._group = group
233264

234265
def __str__(self) -> str:
235-
return f"ManifestStore(group={self._group}, stores={self._stores})"
266+
return f"ManifestStore(group={self._group}, stores={self._store_registry})"
236267

237268
def __getstate__(self) -> dict[Any, Any]:
238269
state = self.__dict__.copy()
239-
stores = state["_stores"].copy()
270+
stores = state["_store_registry"]._stores.copy()
240271
for k, v in stores.items():
241272
stores[k] = pickle.dumps(v)
242-
state["_stores"] = stores
273+
state["_store_registry"] = stores
243274
return state
244275

245276
def __setstate__(self, state: dict[Any, Any]) -> None:
246-
stores = state["_stores"].copy()
277+
stores = state["_store_registry"].copy()
247278
for k, v in stores.items():
248279
stores[k] = pickle.loads(v)
249-
state["_stores"] = stores
280+
state["_store_registry"] = ObjectStoreRegistry(stores)
250281
self.__dict__.update(state)
251282

252283
async def get(
@@ -270,8 +301,10 @@ async def get(
270301
path = manifest._paths[*chunk_indexes]
271302
offset = manifest._offsets[*chunk_indexes]
272303
length = manifest._lengths[*chunk_indexes]
273-
# Get the configured object store instance that matches the path
274-
store_request = find_matching_store(stores=self._stores, request_key=path)
304+
# Get the configured object store instance that matches the path
305+
store = self._store_registry.get_store(path)
306+
# Truncate path to match Obstore expectations
307+
key = urlparse(path).path
275308
# Transform the input byte range to account for the chunk location in the file
276309
chunk_end_exclusive = offset + length
277310
byte_range = _transform_byte_range(
@@ -280,8 +313,8 @@ async def get(
280313
# Actually get the bytes
281314
try:
282315
bytes = await obs.get_range_async(
283-
store_request.store,
284-
store_request.key,
316+
store,
317+
key,
285318
start=byte_range.start,
286319
end=byte_range.end,
287320
)

virtualizarr/readers/hdf/hdf.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
ManifestStore,
2929
)
3030
from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri
31+
from virtualizarr.manifests.store import ObjectStoreRegistry, default_object_store
3132
from virtualizarr.manifests.utils import create_v3_array_metadata
3233
from virtualizarr.readers.api import VirtualBackend
3334
from virtualizarr.readers.hdf.filters import cfcodec_from_dataset, codecs_from_dataset
@@ -156,16 +157,18 @@ def _construct_manifest_group(
156157
def _create_manifest_store(
157158
filepath: str,
158159
*,
159-
prefix: str,
160-
store: ObjectStore,
160+
store: ObjectStore | None = None,
161161
group: str | None = None,
162162
) -> ManifestStore:
163163
# Create a group containing dataset level metadata and all the manifest arrays
164+
if not store:
165+
store = default_object_store(filepath) # type: ignore
164166
manifest_group = HDFVirtualBackend._construct_manifest_group(
165167
store=store, filepath=filepath, group=group
166168
)
169+
registry = ObjectStoreRegistry({filepath: store})
167170
# Convert to a manifest store
168-
return ManifestStore(stores={prefix: store}, group=manifest_group)
171+
return ManifestStore(store_registry=registry, group=manifest_group)
169172

170173
@staticmethod
171174
def open_virtual_dataset(

virtualizarr/tests/conftest.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import time
23

34
import pytest
@@ -37,7 +38,7 @@ def minio_bucket(container):
3738
# Setup with guidance from https://medium.com/@sant1/using-minio-with-docker-and-python-cbbad397cb5d
3839
from minio import Minio
3940

40-
bucket = "mybucket"
41+
bucket = "my-bucket"
4142
filename = "test.nc"
4243
# Initialize MinIO client
4344
client = Minio(
@@ -47,6 +48,28 @@ def minio_bucket(container):
4748
secure=False,
4849
)
4950
client.make_bucket(bucket)
51+
policy = {
52+
"Version": "2012-10-17",
53+
"Statement": [
54+
{
55+
"Effect": "Allow",
56+
"Principal": {"AWS": "*"},
57+
"Action": ["s3:GetBucketLocation", "s3:ListBucket"],
58+
"Resource": "arn:aws:s3:::my-bucket",
59+
},
60+
{
61+
"Effect": "Allow",
62+
"Principal": {"AWS": "*"},
63+
"Action": [
64+
"s3:GetObject",
65+
"s3:GetObjectRetention",
66+
"s3:GetObjectLegalHold",
67+
],
68+
"Resource": "arn:aws:s3:::my-bucket/*",
69+
},
70+
],
71+
}
72+
client.set_bucket_policy(bucket, json.dumps(policy))
5073
yield {
5174
"port": container["port"],
5275
"endpoint": container["endpoint"],

0 commit comments

Comments
 (0)