Skip to content

Commit 3453b21

Browse files
Merge branch 'main' into fix_mask_equal_2
2 parents f213da9 + b615868 commit 3453b21

22 files changed

Lines changed: 742 additions & 357 deletions

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ There are some (relatively obsolete) documents from our exploration of zarr inte
6969
## Storage types
7070

7171
PyActiveStorage is designed to interact with various storage backends.
72-
The storage backend is automatically detected, but can still be specified using the `storage_type` argument to the `Active` constructor.
72+
The storage backend is automatically detected, but can still be specified using the `interface_type` argument to the `Active` constructor.
7373
There are two main integration points for a storage backend:
7474

7575
#. Load netCDF metadata
@@ -78,7 +78,7 @@ There are two main integration points for a storage backend:
7878
### Local file
7979

8080
The default storage backend is a local file.
81-
To use a local file, use a `storage_type` of `None`, which is its default value.
81+
To use a local file, use an `interface_type` of `None`, which is its default value.
8282
netCDF metadata is loaded using the [netCDF4](https://pypi.org/project/netCDF4/) library.
8383
The chunk reductions are implemented in `activestorage.storage` using NumPy.
8484

@@ -87,7 +87,7 @@ The chunk reductions are implemented in `activestorage.storage` using NumPy.
8787
We now have support for Active runs with netCDF4 files on S3, from [PR 89](https://github.com/NCAS-CMS/PyActiveStorage/pull/89).
8888
To achieve this we integrate with [Reductionist](https://github.com/stackhpc/reductionist-rs), an S3 Active Storage Server.
8989
Reductionist is typically deployed "near" to an S3-compatible object store and provides an API to perform numerical reductions on object data.
90-
To use Reductionist, use a `storage_type` of `s3`.
90+
To use Reductionist, use an `interface_type` of `s3`.
9191

9292
To load metadata, netCDF files are opened using `s3fs`, with `h5netcdf` used to put the open file (which is nothing more than a memory view of the netCDF file) into an hdf5/netCDF-like object format.
9393
Chunk reductions are implemented in `activestorage.reductionist`, with each operation resulting in an API request to the Reductionist server.

activestorage/active.py

Lines changed: 98 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pathlib import Path
77
from typing import Optional
88

9+
import aiohttp
910
import fsspec
1011
import numpy as np
1112
import pyfive
@@ -19,7 +20,7 @@
1920
from activestorage.storage import reduce_chunk, reduce_opens3_chunk
2021

2122

22-
def return_storage_type(uri):
23+
def return_interface_type(uri):
2324
"""
2425
Extract the gateway-protocol to infer what type of storage
2526
"""
@@ -83,14 +84,40 @@ def load_from_s3(uri, storage_options=None):
8384
return ds
8485

8586

86-
def load_from_https(uri):
87+
def get_endpoint_url(storage_options):
88+
"""
89+
Return the endpoint_url defined in storage_options, or `None` if not defined.
90+
"""
91+
if storage_options is not None:
92+
endpoint_url = storage_options.get('endpoint_url')
93+
if endpoint_url is not None:
94+
return endpoint_url
95+
client_kwargs = storage_options.get('client_kwargs')
96+
if client_kwargs:
97+
endpoint_url = client_kwargs.get('endpoint_url')
98+
if endpoint_url is not None:
99+
return endpoint_url
100+
101+
102+
def load_from_https(uri, storage_options=None):
87103
"""
88104
Load a pyfive.high_level.Dataset from a
89105
netCDF4 file on an https server (NGINX).
106+
This works for both http and https endpoints.
90107
"""
91-
# TODO need to test if NGINX server behind https://
92-
fs = fsspec.filesystem('http')
93-
http_file = fs.open(uri, 'rb')
108+
if storage_options is None:
109+
client_kwargs = {'auth': None}
110+
fs = fsspec.filesystem('http', **client_kwargs)
111+
http_file = fs.open(uri, 'rb')
112+
else:
113+
username = storage_options.get("username", None)
114+
password = storage_options.get("password", None)
115+
client_kwargs = {
116+
'auth': aiohttp.BasicAuth(username, password) if username and password else None
117+
}
118+
fs = fsspec.filesystem('http', **client_kwargs)
119+
http_file = fs.open(uri, 'rb')
120+
94121
ds = pyfive.File(http_file)
95122
print(f"Dataset loaded from https with Pyfive: {uri}")
96123
return ds
@@ -161,7 +188,7 @@ def __init__(self,
161188
dataset: Optional[str | Path | object],
162189
ncvar: str = None,
163190
axis: tuple = None,
164-
storage_type: str = None,
191+
interface_type: str = None,
165192
max_threads: int = 100,
166193
storage_options: dict = None,
167194
active_storage_url: str = None) -> None:
@@ -192,9 +219,9 @@ def __init__(self,
192219
self.ds = dataset
193220
self.uri = dataset
194221

195-
# determine the storage_type
222+
# determine the interface_type
196223
# based on what we have available
197-
if not storage_type:
224+
if not interface_type:
198225
if not input_variable:
199226
check_uri = self.uri
200227
else:
@@ -210,29 +237,29 @@ def __init__(self,
210237
else:
211238
check_uri = os.path.join(base_url,
212239
self.ds.id._filename)
213-
storage_type = return_storage_type(check_uri)
240+
interface_type = return_interface_type(check_uri)
214241

215-
# still allow for a passable storage_type
242+
# still allow for a passable interface_type
216243
# for special cases eg "special-POSIX" ie DDN
217-
if not storage_type and storage_options is not None:
218-
storage_type = urllib.parse.urlparse(dataset).scheme
219-
self.storage_type = storage_type
244+
if not interface_type and storage_options is not None:
245+
interface_type = urllib.parse.urlparse(dataset).scheme
246+
self.interface_type = interface_type
220247

221248
# set correct filename attr
222-
if input_variable and not self.storage_type:
249+
if input_variable and not self.interface_type:
223250
self.filename = self.ds
224-
elif input_variable and self.storage_type == "s3":
251+
elif input_variable and self.interface_type == "s3":
252+
self.filename = self.ds.id._filename
253+
elif input_variable and self.interface_type == "https":
225254
self.filename = self.ds.id._filename
226-
elif input_variable and self.storage_type == "https":
227-
self.filename = self.ds
228255

229256
# get storage_options
230257
self.storage_options = storage_options
231258
self.active_storage_url = active_storage_url
232259

233260
# basic check on file
234261
if not input_variable:
235-
if not os.path.isfile(self.uri) and not self.storage_type:
262+
if not os.path.isfile(self.uri) and not self.interface_type:
236263
raise ValueError(
237264
f"Must use existing file for uri. {self.uri} not found")
238265

@@ -268,14 +295,15 @@ def __load_nc_file(self):
268295
and `_filename` attribute.
269296
"""
270297
ncvar = self.ncvar
271-
if self.storage_type is None:
298+
if self.interface_type is None:
272299
nc = pyfive.File(self.uri)
273-
elif self.storage_type == "s3":
300+
elif self.interface_type == "s3":
274301
nc = load_from_s3(self.uri, self.storage_options)
275-
elif self.storage_type == "https":
276-
nc = load_from_https(self.uri)
302+
elif self.interface_type == "https":
303+
nc = load_from_https(self.uri, self.storage_options)
277304
self.filename = self.uri
278305
self.ds = nc[ncvar]
306+
print("Loaded dataset", self.ds)
279307

280308
def __get_missing_attributes(self):
281309
if self.ds is None:
@@ -366,19 +394,22 @@ def method(self, value):
366394

367395
self._method = value
368396

369-
@property
370-
def mean(self):
397+
def mean(self, axis=None):
371398
self._method = "mean"
399+
if axis is not None:
400+
self._axis = axis
372401
return self
373402

374-
@property
375-
def min(self):
403+
def min(self, axis=None):
376404
self._method = "min"
405+
if axis is not None:
406+
self._axis = axis
377407
return self
378408

379-
@property
380-
def max(self):
409+
def max(self, axis=None):
381410
self._method = "max"
411+
if axis is not None:
412+
self._axis = axis
382413
return self
383414

384415
@property
@@ -482,9 +513,13 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
482513
out = np.ma.empty(out_shape, dtype=out_dtype, order=ds._order)
483514

484515
# Create a shared session object.
485-
if self.storage_type == "s3" and self._version == 2:
516+
if self.interface_type == "s3" and self._version == 2:
486517
if self.storage_options is not None:
487518
key, secret = None, None
519+
if self.storage_options.get("anon", None) is True:
520+
print("Reductionist session for Anon S3 bucket.")
521+
session = reductionist.get_session(
522+
None, None, S3_ACTIVE_STORAGE_CACERT)
488523
if "key" in self.storage_options:
489524
key = self.storage_options["key"]
490525
if "secret" in self.storage_options:
@@ -499,6 +534,15 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
499534
session = reductionist.get_session(S3_ACCESS_KEY,
500535
S3_SECRET_KEY,
501536
S3_ACTIVE_STORAGE_CACERT)
537+
elif self.interface_type == "https" and self._version == 2:
538+
username, password = None, None
539+
if self.storage_options is not None:
540+
username = self.storage_options.get("username", None)
541+
password = self.storage_options.get("password", None)
542+
if username and password:
543+
session = reductionist.get_session(username, password, None)
544+
else:
545+
session = reductionist.get_session(None, None, None)
502546
else:
503547
session = None
504548

@@ -586,16 +630,9 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
586630

587631
def _get_endpoint_url(self):
588632
"""Return the endpoint_url of an S3 object store, or `None`"""
589-
endpoint_url = self.storage_options.get('endpoint_url')
633+
endpoint_url = get_endpoint_url(self.storage_options)
590634
if endpoint_url is not None:
591635
return endpoint_url
592-
593-
client_kwargs = self.storage_options.get('client_kwargs')
594-
if client_kwargs:
595-
endpoint_url = client_kwargs.get('endpoint_url')
596-
if endpoint_url is not None:
597-
return endpoint_url
598-
599636
return f"http://{urllib.parse.urlparse(self.filename).netloc}"
600637

601638
def _process_chunk(self,
@@ -624,8 +661,7 @@ def _process_chunk(self,
624661
# Axes over which to apply a reduction
625662
axis = self._axis
626663

627-
if self.storage_type == 's3' and self._version == 1:
628-
664+
if self.interface_type == 's3' and self._version == 1:
629665
tmp, count = reduce_opens3_chunk(ds._fh,
630666
offset,
631667
size,
@@ -639,11 +675,9 @@ def _process_chunk(self,
639675
axis=axis,
640676
method=self.method)
641677

642-
elif self.storage_type == "s3" and self._version == 2:
678+
elif self.interface_type == "s3" and self._version == 2:
643679
# S3: pass in pre-configured storage options (credentials)
644-
# print("S3 rfile is:", self.filename)
645680
parsed_url = urllib.parse.urlparse(self.filename)
646-
647681
bucket = parsed_url.netloc
648682
object = parsed_url.path
649683

@@ -652,17 +686,13 @@ def _process_chunk(self,
652686
if bucket == "":
653687
bucket = os.path.dirname(object)
654688
object = os.path.basename(object)
655-
# print("S3 bucket:", bucket)
656-
# print("S3 file:", object)
657689
if self.storage_options is None:
658690

659691
# for the moment we need to force ds.dtype to be a numpy type
660692
# Reductionist returns "count" as a list even for single elements
661693
tmp, count = reductionist.reduce_chunk(session,
662694
S3_ACTIVE_STORAGE_URL,
663-
S3_URL,
664-
bucket,
665-
object,
695+
f"{S3_URL}/{bucket}/{object}",
666696
offset,
667697
size,
668698
compressor,
@@ -675,22 +705,14 @@ def _process_chunk(self,
675705
axis,
676706
operation=self._method)
677707
else:
678-
# special case for "anon=True" buckets that work only with e.g.
679-
# fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL})
680-
# where file uri = bucketX/fileY.mc
681-
# print("S3 Storage options to Reductionist:", self.storage_options)
682708
if self.storage_options.get("anon", None) is True:
683-
bucket = os.path.dirname(parsed_url.path) # bucketX
684-
object = os.path.basename(parsed_url.path) # fileY
685-
print("S3 anon=True Bucket and File:", bucket, object)
686-
709+
bucket = os.path.dirname(parsed_url.path)
710+
object = os.path.basename(parsed_url.path)
687711
# Reductionist returns "count" as a list even for single elements
688712
tmp, count = reductionist.reduce_chunk(
689713
session,
690714
self.active_storage_url,
691-
self._get_endpoint_url(),
692-
bucket,
693-
object,
715+
f"{self._get_endpoint_url()}/{bucket}/{object}",
694716
offset,
695717
size,
696718
compressor,
@@ -702,40 +724,24 @@ def _process_chunk(self,
702724
chunk_selection,
703725
axis,
704726
operation=self._method)
705-
# this is for testing ONLY until Reductionist is able to handle https
706-
# located files; after that, we can pipe any regular https file through
707-
# to Reductionist, provided the https server is "closer" to Reductionist
708-
elif self.storage_type == "https" and self._version == 2:
709-
# build a simple session
710-
session = requests.Session()
711-
session.auth = (None, None)
712-
session.verify = False
713-
bucket = "https" # really doesn't matter
714-
715-
# note the extra "storage_type" kwarg
716-
# this currently makes Reductionist throw a wobbly
717-
# E activestorage.reductionist.ReductionistError: Reductionist error: HTTP 400: {"error": {"message": "request data is not valid", "caused_by": ["Failed to deserialize the JSON body into the target type", "storage_type: unknown field `storage_type`, expected one of `source`, `bucket`, `object`, `dtype`, `byte_order`, `offset`, `size`, `shape`, `order`, `selection`, `compression`, `filters`, `missing` at line 1 column 550"]}} # noqa
718-
719-
# Reductionist returns "count" as a list even for single elements
720-
tmp, count = reductionist.reduce_chunk(
721-
session,
722-
"https://reductionist.jasmin.ac.uk/", # Wacasoft
723-
self.filename,
724-
bucket,
725-
self.filename,
726-
offset,
727-
size,
728-
compressor,
729-
filters,
730-
self.missing,
731-
np.dtype(ds.dtype),
732-
chunks,
733-
ds._order,
734-
chunk_selection,
735-
axis,
736-
operation=self._method,
737-
storage_type="https")
738-
elif self.storage_type == 'ActivePosix' and self.version == 2:
727+
elif self.interface_type == "https" and self._version == 2:
728+
tmp, count = reductionist.reduce_chunk(session,
729+
self.active_storage_url,
730+
self.filename,
731+
offset,
732+
size,
733+
compressor,
734+
filters,
735+
self.missing,
736+
np.dtype(ds.dtype),
737+
chunks,
738+
ds._order,
739+
chunk_selection,
740+
axis,
741+
operation=self._method,
742+
interface_type="https")
743+
744+
elif self.interface_type == 'ActivePosix' and self.version == 2:
739745
# This is where the DDN Fuse and Infinia wrappers go
740746
raise NotImplementedError
741747
else:

0 commit comments

Comments
 (0)