Skip to content

Commit 43ba30d

Browse files
Refactor S3 handling in GeoZarr conversion: update create_s3_store to return S3 path and introduce get_s3_storage_options for configuration. Enhance tests for path handling and storage options retrieval.
1 parent 32027c0 commit 43ba30d

3 files changed

Lines changed: 64 additions & 39 deletions

File tree

eopf_geozarr/conversion/geozarr.py

Lines changed: 12 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -282,14 +282,15 @@ def recursive_copy(
282282
# Handle S3 vs local paths for zarr operations
283283
group_path = f"{output_path}/{group_prefix}"
284284
if s3_utils.is_s3_path(output_path):
285-
# For S3, use the S3 store
286-
store = s3_utils.create_s3_store(group_path)
285+
# For S3, use storage_options
286+
storage_options = s3_utils.get_s3_storage_options(group_path)
287287
ds.to_zarr(
288-
store,
288+
group_path,
289289
mode="w" if no_children else "a", # Write if no children, append otherwise
290290
consolidated=is_dataset, # Consolidate metadata if it's a dataset
291291
zarr_format=3,
292292
encoding=encoding,
293+
storage_options=storage_options,
293294
)
294295
else:
295296
ds.to_zarr(
@@ -1165,15 +1166,16 @@ def write_dataset_band_by_band_with_validation(
11651166

11661167
# Handle S3 vs local paths for zarr operations
11671168
if s3_utils.is_s3_path(output_path):
1168-
# For S3, use the S3 store
1169-
store = s3_utils.create_s3_store(output_path)
1169+
# For S3, use storage_options
1170+
storage_options = s3_utils.get_s3_storage_options(output_path)
11701171
single_var_ds.to_zarr(
1171-
store,
1172+
output_path,
11721173
mode=mode,
11731174
consolidated=False,
11741175
zarr_format=3,
11751176
encoding=var_encoding,
11761177
align_chunks=True,
1178+
storage_options=storage_options,
11771179
)
11781180
else:
11791181
single_var_ds.to_zarr(
@@ -1225,14 +1227,15 @@ def write_dataset_band_by_band_with_validation(
12251227
try:
12261228
# Handle S3 vs local paths for zarr operations
12271229
if s3_utils.is_s3_path(output_path):
1228-
# For S3, use the S3 store
1229-
store = s3_utils.create_s3_store(output_path)
1230+
# For S3, use storage_options
1231+
storage_options = s3_utils.get_s3_storage_options(output_path)
12301232
single_var_ds.to_zarr(
1231-
store,
1233+
output_path,
12321234
mode="a", # Always append for grid_mapping variables
12331235
consolidated=False,
12341236
zarr_format=3,
12351237
encoding=var_encoding,
1238+
storage_options=storage_options,
12361239
)
12371240
else:
12381241
single_var_ds.to_zarr(

eopf_geozarr/conversion/s3_utils.py

Lines changed: 30 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -51,9 +51,9 @@ def parse_s3_path(s3_path: str) -> tuple[str, str]:
5151
return bucket, key
5252

5353

54-
def create_s3_store(s3_path: str, **s3_kwargs) -> FsspecStore:
54+
def get_s3_storage_options(s3_path: str, **s3_kwargs) -> Dict[str, Any]:
5555
"""
56-
Create an S3 store for Zarr operations.
56+
Get storage options for S3 access with xarray.
5757
5858
Parameters
5959
----------
@@ -64,8 +64,8 @@ def create_s3_store(s3_path: str, **s3_kwargs) -> FsspecStore:
6464
6565
Returns
6666
-------
67-
FsspecStore
68-
Zarr store backed by S3
67+
Dict[str, Any]
68+
Storage options dictionary for xarray
6969
"""
7070
# Set up default S3 configuration
7171
default_s3_kwargs = {
@@ -84,17 +84,30 @@ def create_s3_store(s3_path: str, **s3_kwargs) -> FsspecStore:
8484
# Merge with user-provided kwargs
8585
s3_config = {**default_s3_kwargs, **s3_kwargs}
8686

87-
# Create S3 filesystem
88-
fs = s3fs.S3FileSystem(**s3_config)
89-
90-
# Remove the s3:// scheme from the path for FsspecStore
91-
bucket, key = parse_s3_path(s3_path)
92-
path_without_scheme = f"{bucket}/{key}" if key else bucket
93-
94-
# Create FsspecStore
95-
store = FsspecStore(fs=fs, path=path_without_scheme)
87+
return s3_config
88+
89+
90+
def create_s3_store(s3_path: str, **s3_kwargs) -> str:
91+
"""
92+
Create an S3 path with storage options for Zarr operations.
9693
97-
return store
94+
This function now returns the S3 path directly, to be used with
95+
xarray's storage_options parameter instead of creating a store.
96+
97+
Parameters
98+
----------
99+
s3_path : str
100+
S3 path in format s3://bucket/key
101+
**s3_kwargs
102+
Additional keyword arguments for s3fs.S3FileSystem
103+
104+
Returns
105+
-------
106+
str
107+
S3 path to be used with storage_options
108+
"""
109+
# Just return the S3 path - storage options will be handled separately
110+
return s3_path
98111

99112

100113
def write_s3_json_metadata(s3_path: str, metadata: Dict[str, Any], **s3_kwargs) -> None:
@@ -117,6 +130,7 @@ def write_s3_json_metadata(s3_path: str, metadata: Dict[str, Any], **s3_kwargs)
117130
default_s3_kwargs = {
118131
"anon": False,
119132
"use_ssl": True,
133+
"asynchronous": False, # Force synchronous mode
120134
"client_kwargs": {
121135
"region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
122136
}
@@ -155,6 +169,7 @@ def s3_path_exists(s3_path: str, **s3_kwargs) -> bool:
155169
default_s3_kwargs = {
156170
"anon": False,
157171
"use_ssl": True,
172+
"asynchronous": False, # Force synchronous mode
158173
"client_kwargs": {
159174
"region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
160175
}
@@ -234,6 +249,7 @@ def validate_s3_access(s3_path: str, **s3_kwargs) -> tuple[bool, Optional[str]]:
234249
default_s3_kwargs = {
235250
"anon": False,
236251
"use_ssl": True,
252+
"asynchronous": False, # Force synchronous mode
237253
"client_kwargs": {
238254
"region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
239255
}

eopf_geozarr/tests/test_s3_utils.py

Lines changed: 22 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99
get_s3_credentials_info,
1010
validate_s3_access,
1111
create_s3_store,
12+
get_s3_storage_options,
1213
)
1314

1415

@@ -77,22 +78,27 @@ def test_validate_s3_access_failure(mock_s3fs):
7778
assert "Access denied" in error
7879

7980

80-
@patch('eopf_geozarr.conversion.s3_utils.s3fs.S3FileSystem')
81-
@patch('eopf_geozarr.conversion.s3_utils.FsspecStore')
82-
def test_create_s3_store_path_handling(mock_fsspec_store, mock_s3fs):
83-
"""Test that create_s3_store correctly handles S3 path schemes."""
84-
mock_fs = Mock()
85-
mock_s3fs.return_value = mock_fs
86-
mock_store = Mock()
87-
mock_fsspec_store.return_value = mock_store
88-
81+
def test_create_s3_store_path_handling():
82+
"""Test that create_s3_store returns the S3 path correctly."""
8983
# Test with S3 path
90-
store = create_s3_store("s3://test-bucket/path/to/data")
91-
92-
# Verify that FsspecStore was called with path without scheme
93-
mock_fsspec_store.assert_called_once_with(fs=mock_fs, path="test-bucket/path/to/data")
84+
result = create_s3_store("s3://test-bucket/path/to/data")
85+
assert result == "s3://test-bucket/path/to/data"
9486

9587
# Test with bucket only
96-
mock_fsspec_store.reset_mock()
97-
store = create_s3_store("s3://test-bucket")
98-
mock_fsspec_store.assert_called_with(fs=mock_fs, path="test-bucket")
88+
result = create_s3_store("s3://test-bucket")
89+
assert result == "s3://test-bucket"
90+
91+
92+
def test_get_s3_storage_options():
93+
"""Test that get_s3_storage_options returns correct configuration."""
94+
with patch.dict('os.environ', {
95+
'AWS_DEFAULT_REGION': 'us-west-2',
96+
'AWS_S3_ENDPOINT': 'https://s3.example.com'
97+
}):
98+
options = get_s3_storage_options("s3://test-bucket/path")
99+
100+
assert options['anon'] is False
101+
assert options['use_ssl'] is True
102+
assert options['client_kwargs']['region_name'] == 'us-west-2'
103+
assert options['endpoint_url'] == 'https://s3.example.com'
104+
assert options['client_kwargs']['endpoint_url'] == 'https://s3.example.com'

0 commit comments

Comments (0)