Skip to content

Commit f213d66

Browse files
Merge pull request #300 from NCAS-CMS/axis_api
Change `method: statistic` from Property to Method to allow e.g. `active.mean(axis=(0, 1))` and introduce comprehensive testing of Reductionist axis implementation
2 parents 2850213 + 8933eef commit f213d66

13 files changed

Lines changed: 521 additions & 182 deletions

activestorage/active.py

Lines changed: 76 additions & 70 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
from pathlib import Path
77
from typing import Optional
88

9+
import aiohttp
910
import fsspec
1011
import numpy as np
1112
import pyfive
@@ -83,14 +84,40 @@ def load_from_s3(uri, storage_options=None):
8384
return ds
8485

8586

86-
def load_from_https(uri):
87+
def get_endpoint_url(storage_options):
88+
"""
89+
Return the endpoint_url defined in storage_options, or `None` if not defined.
90+
"""
91+
if storage_options is not None:
92+
endpoint_url = storage_options.get('endpoint_url')
93+
if endpoint_url is not None:
94+
return endpoint_url
95+
client_kwargs = storage_options.get('client_kwargs')
96+
if client_kwargs:
97+
endpoint_url = client_kwargs.get('endpoint_url')
98+
if endpoint_url is not None:
99+
return endpoint_url
100+
101+
102+
def load_from_https(uri, storage_options=None):
87103
"""
88104
Load a pyfive.high_level.Dataset from a
89105
netCDF4 file on an https server (NGINX).
106+
This works for both http and https endpoints.
90107
"""
91-
# TODO need to test if NGINX server behind https://
92-
fs = fsspec.filesystem('http')
93-
http_file = fs.open(uri, 'rb')
108+
if storage_options is None:
109+
client_kwargs = {'auth': None}
110+
fs = fsspec.filesystem('http', **client_kwargs)
111+
http_file = fs.open(uri, 'rb')
112+
else:
113+
username = storage_options.get("username", None)
114+
password = storage_options.get("password", None)
115+
client_kwargs = {
116+
'auth': aiohttp.BasicAuth(username, password) if username and password else None
117+
}
118+
fs = fsspec.filesystem('http', **client_kwargs)
119+
http_file = fs.open(uri, 'rb')
120+
94121
ds = pyfive.File(http_file)
95122
print(f"Dataset loaded from https with Pyfive: {uri}")
96123
return ds
@@ -272,9 +299,10 @@ def __load_nc_file(self):
272299
elif self.storage_type == "s3":
273300
nc = load_from_s3(self.uri, self.storage_options)
274301
elif self.storage_type == "https":
275-
nc = load_from_https(self.uri)
302+
nc = load_from_https(self.uri, self.storage_options)
276303
self.filename = self.uri
277304
self.ds = nc[ncvar]
305+
print("Loaded dataset", self.ds)
278306

279307
def __get_missing_attributes(self):
280308
if self.ds is None:
@@ -365,19 +393,22 @@ def method(self, value):
365393

366394
self._method = value
367395

368-
@property
369-
def mean(self):
396+
def mean(self, axis=None):
370397
self._method = "mean"
398+
if axis is not None:
399+
self._axis = axis
371400
return self
372401

373-
@property
374-
def min(self):
402+
def min(self, axis=None):
375403
self._method = "min"
404+
if axis is not None:
405+
self._axis = axis
376406
return self
377407

378-
@property
379-
def max(self):
408+
def max(self, axis=None):
380409
self._method = "max"
410+
if axis is not None:
411+
self._axis = axis
381412
return self
382413

383414
@property
@@ -484,6 +515,10 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
484515
if self.storage_type == "s3" and self._version == 2:
485516
if self.storage_options is not None:
486517
key, secret = None, None
518+
if self.storage_options.get("anon", None) is True:
519+
print("Reductionist session for Anon S3 bucket.")
520+
session = reductionist.get_session(
521+
None, None, S3_ACTIVE_STORAGE_CACERT)
487522
if "key" in self.storage_options:
488523
key = self.storage_options["key"]
489524
if "secret" in self.storage_options:
@@ -498,6 +533,15 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
498533
session = reductionist.get_session(S3_ACCESS_KEY,
499534
S3_SECRET_KEY,
500535
S3_ACTIVE_STORAGE_CACERT)
536+
elif self.storage_type == "https" and self._version == 2:
537+
username, password = None, None
538+
if self.storage_options is not None:
539+
username = self.storage_options.get("username", None)
540+
password = self.storage_options.get("password", None)
541+
if username and password:
542+
session = reductionist.get_session(username, password, None)
543+
else:
544+
session = reductionist.get_session(None, None, None)
501545
else:
502546
session = None
503547

@@ -585,16 +629,9 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
585629

586630
def _get_endpoint_url(self):
587631
"""Return the endpoint_url of an S3 object store, or `None`"""
588-
endpoint_url = self.storage_options.get('endpoint_url')
632+
endpoint_url = get_endpoint_url(self.storage_options)
589633
if endpoint_url is not None:
590634
return endpoint_url
591-
592-
client_kwargs = self.storage_options.get('client_kwargs')
593-
if client_kwargs:
594-
endpoint_url = client_kwargs.get('endpoint_url')
595-
if endpoint_url is not None:
596-
return endpoint_url
597-
598635
return f"http://{urllib.parse.urlparse(self.filename).netloc}"
599636

600637
def _process_chunk(self,
@@ -624,7 +661,6 @@ def _process_chunk(self,
624661
axis = self._axis
625662

626663
if self.storage_type == 's3' and self._version == 1:
627-
628664
tmp, count = reduce_opens3_chunk(ds._fh,
629665
offset,
630666
size,
@@ -640,9 +676,7 @@ def _process_chunk(self,
640676

641677
elif self.storage_type == "s3" and self._version == 2:
642678
# S3: pass in pre-configured storage options (credentials)
643-
# print("S3 rfile is:", self.filename)
644679
parsed_url = urllib.parse.urlparse(self.filename)
645-
646680
bucket = parsed_url.netloc
647681
object = parsed_url.path
648682

@@ -651,17 +685,13 @@ def _process_chunk(self,
651685
if bucket == "":
652686
bucket = os.path.dirname(object)
653687
object = os.path.basename(object)
654-
# print("S3 bucket:", bucket)
655-
# print("S3 file:", object)
656688
if self.storage_options is None:
657689

658690
# for the moment we need to force ds.dtype to be a numpy type
659691
# Reductionist returns "count" as a list even for single elements
660692
tmp, count = reductionist.reduce_chunk(session,
661693
S3_ACTIVE_STORAGE_URL,
662-
S3_URL,
663-
bucket,
664-
object,
694+
f"{S3_URL}/{bucket}/{object}",
665695
offset,
666696
size,
667697
compressor,
@@ -674,22 +704,14 @@ def _process_chunk(self,
674704
axis,
675705
operation=self._method)
676706
else:
677-
# special case for "anon=True" buckets that work only with e.g.
678-
# fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL})
679-
# where file uri = bucketX/fileY.mc
680-
# print("S3 Storage options to Reductionist:", self.storage_options)
681707
if self.storage_options.get("anon", None) is True:
682-
bucket = os.path.dirname(parsed_url.path) # bucketX
683-
object = os.path.basename(parsed_url.path) # fileY
684-
print("S3 anon=True Bucket and File:", bucket, object)
685-
708+
bucket = os.path.dirname(parsed_url.path)
709+
object = os.path.basename(parsed_url.path)
686710
# Reductionist returns "count" as a list even for single elements
687711
tmp, count = reductionist.reduce_chunk(
688712
session,
689713
self.active_storage_url,
690-
self._get_endpoint_url(),
691-
bucket,
692-
object,
714+
f"{self._get_endpoint_url()}/{bucket}/{object}",
693715
offset,
694716
size,
695717
compressor,
@@ -701,39 +723,23 @@ def _process_chunk(self,
701723
chunk_selection,
702724
axis,
703725
operation=self._method)
704-
# this is for testing ONLY until Reductionist is able to handle https
705-
# located files; after that, we can pipe any regular https file through
706-
# to Reductionist, provided the https server is "closer" to Reductionist
707726
elif self.storage_type == "https" and self._version == 2:
708-
# build a simple session
709-
session = requests.Session()
710-
session.auth = (None, None)
711-
session.verify = False
712-
bucket = "https" # really doesn't matter
713-
714-
# note the extra "storage_type" kwarg
715-
# this currently makes Reductionist throw a wobbly
716-
# E activestorage.reductionist.ReductionistError: Reductionist error: HTTP 400: {"error": {"message": "request data is not valid", "caused_by": ["Failed to deserialize the JSON body into the target type", "storage_type: unknown field `storage_type`, expected one of `source`, `bucket`, `object`, `dtype`, `byte_order`, `offset`, `size`, `shape`, `order`, `selection`, `compression`, `filters`, `missing` at line 1 column 550"]}} # noqa
717-
718-
# Reductionist returns "count" as a list even for single elements
719-
tmp, count = reductionist.reduce_chunk(
720-
session,
721-
"https://reductionist.jasmin.ac.uk/", # Wacasoft
722-
self.filename,
723-
bucket,
724-
self.filename,
725-
offset,
726-
size,
727-
compressor,
728-
filters,
729-
self.missing,
730-
np.dtype(ds.dtype),
731-
chunks,
732-
ds._order,
733-
chunk_selection,
734-
axis,
735-
operation=self._method,
736-
storage_type="https")
727+
tmp, count = reductionist.reduce_chunk(session,
728+
self.active_storage_url,
729+
f"{self.uri}",
730+
offset,
731+
size,
732+
compressor,
733+
filters,
734+
self.missing,
735+
np.dtype(ds.dtype),
736+
chunks,
737+
ds._order,
738+
chunk_selection,
739+
axis,
740+
operation=self._method,
741+
storage_type="https")
742+
737743
elif self.storage_type == 'ActivePosix' and self.version == 2:
738744
# This is where the DDN Fuse and Infinia wrappers go
739745
raise NotImplementedError

activestorage/reductionist.py

Lines changed: 19 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
"""Reductionist S3 Active Storage server storage interface module."""
22

3+
import cbor2 as cbor
34
import collections.abc
45
import http.client
56
import json
@@ -10,7 +11,6 @@
1011
import numpy as np
1112
import requests
1213

13-
REDUCTIONIST_AXIS_READY = False
1414

1515
DEBUG = 0
1616

@@ -24,16 +24,19 @@ def get_session(username: str, password: str,
2424
:returns: a client session object.
2525
"""
2626
session = requests.Session()
27+
# TODO Stack-HPC
28+
# we need to allow Anon buckets. though this
29+
# will break connection to data server
30+
# if username is None and password is None:
31+
# return session
2732
session.auth = (username, password)
2833
session.verify = cacert or False
2934
return session
3035

3136

3237
def reduce_chunk(session,
3338
server,
34-
source,
35-
bucket,
36-
object,
39+
url,
3740
offset,
3841
size,
3942
compression,
@@ -50,9 +53,7 @@ def reduce_chunk(session,
5053
5154
:param server: Reductionist server URL
5255
:param cacert: Reductionist CA certificate path
53-
:param source: S3 URL
54-
:param bucket: S3 bucket
55-
:param object: S3 object
56+
:param url: object URL
5657
:param offset: offset of data in object
5758
:param size: size of data in object
5859
:param compression: optional `numcodecs.abc.Codec` compression codec
@@ -74,9 +75,7 @@ def reduce_chunk(session,
7475
:raises ReductionistError: if the request to Reductionist fails
7576
"""
7677

77-
request_data = build_request_data(source,
78-
bucket,
79-
object,
78+
request_data = build_request_data(url,
8079
offset,
8180
size,
8281
compression,
@@ -91,7 +90,7 @@ def reduce_chunk(session,
9190
if DEBUG:
9291
print(f"Reductionist request data dictionary: {request_data}")
9392
api_operation = "sum" if operation == "mean" else operation or "select"
94-
url = f'{server}/v1/{api_operation}/'
93+
url = f'{server}/v2/{api_operation}/'
9594
response = request(session, url, request_data)
9695

9796
if response.ok:
@@ -174,9 +173,7 @@ def encode_missing(missing):
174173
assert False, "Expected missing values not found"
175174

176175

177-
def build_request_data(source: str,
178-
bucket: str,
179-
object: str,
176+
def build_request_data(url: str,
180177
offset: int,
181178
size: int,
182179
compression,
@@ -190,15 +187,13 @@ def build_request_data(source: str,
190187
storage_type=None) -> dict:
191188
"""Build request data for Reductionist API."""
192189
request_data = {
193-
'source': source,
194-
'bucket': bucket,
195-
'object': object,
190+
'interface_type': storage_type if storage_type else "s3",
191+
'url': url,
196192
'dtype': dtype.name,
197193
'byte_order': encode_byte_order(dtype),
198194
'offset': int(offset),
199195
'size': int(size),
200196
'order': order,
201-
'storage_type': storage_type,
202197
}
203198
if shape:
204199
request_data["shape"] = shape
@@ -214,11 +209,8 @@ def build_request_data(source: str,
214209
if any(missing):
215210
request_data["missing"] = encode_missing(missing)
216211

217-
if REDUCTIONIST_AXIS_READY:
212+
if axis is not None:
218213
request_data['axis'] = axis
219-
elif axis is not None and len(axis) != len(shape):
220-
raise ValueError(
221-
"Can't reduce over axis subset unitl reductionist is ready")
222214

223215
return {k: v for k, v in request_data.items() if v is not None}
224216

@@ -234,15 +226,16 @@ def request(session: requests.Session, url: str, request_data: dict):
234226

235227
def decode_result(response):
236228
"""Decode a successful response, return as a 2-tuple of (numpy array or scalar, count)."""
237-
dtype = response.headers['x-activestorage-dtype']
238-
shape = json.loads(response.headers['x-activestorage-shape'])
229+
reduction_result = cbor.loads(response.content)
230+
dtype = reduction_result['dtype']
231+
shape = reduction_result['shape'] if "shape" in reduction_result else None
239232

240233
# Result
241-
result = np.frombuffer(response.content, dtype=dtype)
234+
result = np.frombuffer(reduction_result['bytes'], dtype=dtype)
242235
result = result.reshape(shape)
243236

244237
# Counts
245-
count = json.loads(response.headers['x-activestorage-count'])
238+
count = reduction_result['count']
246239
# TODO: When reductionist is ready, we need to fix 'count'
247240

248241
# Mask the result

environment.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ channels:
77
dependencies:
88
- python >=3.10
99
- pyfive >=0.5.0 # earliest support for advanced Pyfive
10+
- cbor2
1011
- fsspec
1112
- h5netcdf
1213
- netcdf4

0 commit comments

Comments
 (0)