Skip to content

Commit 961a3ae

Browse files
committed
fix: zarr v3 compat for Azure storage + update storage tests
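- echodata/storage: remove the FSStore return type hint; branch on zarr v2/v3 in get_azure_zarr_store (FSStore for v2, fsspec FSMap for v3); add a save_dataset_to_azure helper; make open_sv_from_azure accept a direct zarr_path; handle missing consolidated metadata in zarr v3
- echodata/calibrate/saildrone: pending calibration updates (in this commit: detect_pulse_mode handles transmit_duration_nominal in either (channel, ping_time) or (ping_time, channel) dimension order)
- echodata/compute/nasc_export: pending compute updates (in this commit: write the Hive-partitioned GeoParquet with pyarrow.parquet.write_to_dataset instead of gdf.to_parquet)
- tests: update TestGetAzureZarrStore to dynamically test whichever zarr code path is active (v2 FSStore or v3 get_mapper)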
1 parent 93affa8 commit 961a3ae

4 files changed

Lines changed: 126 additions & 40 deletions

File tree

oceanstream/echodata/calibrate/saildrone.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,22 @@ def detect_pulse_mode(echodata: "EchoData", atol: float = 5e-6) -> list[str]:
145145
Raises:
146146
ValueError: If a channel has mixed pulse durations
147147
"""
148-
td = echodata["Sonar/Beam_group1"].transmit_duration_nominal # (ping_time, channel)
148+
td = echodata["Sonar/Beam_group1"].transmit_duration_nominal
149+
# Dims may be (channel, ping_time) in echopype 0.11.x or
150+
# (ping_time, channel) in older versions.
149151

150152
# Get first ping values as baseline
151153
first = td.isel(ping_time=0).values.astype(float)
154+
155+
# Ensure td_vals is (channel, ping_time) for iteration
152156
td_vals = td.values.astype(float)
157+
if td.dims[0] != "channel" and "channel" in td.dims:
158+
# (ping_time, channel) → transpose to (channel, ping_time)
159+
td_vals = td_vals.T
153160

154-
# Check for consistency across pings
155-
for ch, col in enumerate(td_vals.T):
161+
# Check for consistency across pings per channel
162+
for ch in range(len(first)):
163+
col = td_vals[ch] # all pings for this channel
156164
if not np.allclose(col, first[ch], atol=atol):
157165
uniq = np.unique(np.round(col, 6))
158166
if len(uniq) == 1:

oceanstream/echodata/compute/nasc_export.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,13 @@ def export_nasc_to_geoparquet(
8080
# Write Hive-partitioned GeoParquet
8181
partition_cols = ["campaign_id", "lat_bin", "lon_bin"]
8282

83-
gdf.to_parquet(
84-
output_dir / "nasc",
85-
engine="pyarrow",
86-
partition_cols=partition_cols,
83+
import pyarrow.parquet as pq
84+
85+
table = gdf.to_arrow()
86+
pq.write_to_dataset(
87+
table,
88+
root_path=str(output_dir / "nasc"),
89+
partitioning=partition_cols,
8790
existing_data_behavior="overwrite_or_ignore",
8891
)
8992

oceanstream/echodata/storage.py

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def get_azure_zarr_store(
139139
container: Optional[str] = None,
140140
mode: str = "w",
141141
connection_string: str | None = None,
142-
) -> "zarr.storage.FSStore":
142+
):
143143
"""Get a Zarr store backed by Azure Blob Storage.
144144
145145
Args:
@@ -149,7 +149,7 @@ def get_azure_zarr_store(
149149
connection_string: Explicit connection string.
150150
151151
Returns:
152-
zarr.storage.FSStore configured for Azure
152+
Zarr store configured for Azure (FSStore for zarr v2, FSMap for v3)
153153
154154
Example:
155155
store = get_azure_zarr_store("echodata/test/data.zarr")
@@ -163,7 +163,12 @@ def get_azure_zarr_store(
163163
fs = get_azure_filesystem(connection_string)
164164
full_path = f"{container}/{path}"
165165

166-
return zarr.storage.FSStore(full_path, fs=fs, mode=mode)
166+
# zarr v2 has FSStore, zarr v3 removed it
167+
if hasattr(zarr.storage, "FSStore"):
168+
return zarr.storage.FSStore(full_path, fs=fs, mode=mode)
169+
else:
170+
# zarr v3: use fsspec.FSMap which xarray's to_zarr() accepts
171+
return fs.get_mapper(full_path)
167172

168173

169174
def get_zarr_store_uri(
@@ -315,6 +320,40 @@ def save_product_to_azure(
315320
return get_zarr_store_uri(path, container, connection_string=connection_string)
316321

317322

323+
def save_dataset_to_azure(
324+
dataset: "xr.Dataset",
325+
zarr_path: str,
326+
container: Optional[str] = None,
327+
connection_string: str | None = None,
328+
) -> str:
329+
"""Save an xarray Dataset to an arbitrary zarr path on Azure Blob Storage.
330+
331+
This is a generic helper for callers that manage their own path layout
332+
(e.g. ``cruise_id/days/2023-01-01_Sv.zarr``). For campaign-structured
333+
storage prefer :func:`save_sv_to_azure` or :func:`save_product_to_azure`.
334+
335+
Args:
336+
dataset: xarray Dataset to save
337+
zarr_path: Path inside the container (e.g. "HB2302/file_Sv.zarr")
338+
container: Azure container (default: from credentials)
339+
340+
Returns:
341+
Azure URI of saved zarr store
342+
"""
343+
store = get_azure_zarr_store(
344+
zarr_path, container=container, connection_string=connection_string,
345+
)
346+
347+
logger.info(f"Saving dataset to Azure: {zarr_path}")
348+
import xarray as xr_mod
349+
if isinstance(dataset, xr_mod.Dataset):
350+
from oceanstream.echodata.utils.encoding import fix_chunking
351+
dataset = fix_chunking(dataset)
352+
dataset.to_zarr(store, mode="w")
353+
354+
return get_zarr_store_uri(zarr_path, container, connection_string=connection_string)
355+
356+
318357
def open_echodata_from_azure(
319358
campaign_id: str,
320359
filename: str,
@@ -346,26 +385,42 @@ def open_echodata_from_azure(
346385

347386

348387
def open_sv_from_azure(
349-
campaign_id: str,
350-
filename: str,
388+
campaign_id: str | None = None,
389+
filename: str | None = None,
351390
container: Optional[str] = None,
352391
chunks: Optional[dict] = None,
353392
connection_string: str | None = None,
393+
*,
394+
zarr_path: str | None = None,
354395
) -> "xr.Dataset":
355396
"""Open Sv dataset from Azure Blob Storage.
356-
397+
398+
Can be called in two ways:
399+
# Structured (campaign-based):
400+
open_sv_from_azure(campaign_id="HB2302", filename="file1")
401+
402+
# Direct path:
403+
open_sv_from_azure(zarr_path="HB2302/file1_Sv.zarr", container="processed")
404+
357405
Args:
358-
campaign_id: Campaign identifier
359-
filename: Base filename
360-
container: Azure container
406+
campaign_id: Campaign identifier (used with *filename*)
407+
filename: Base filename (used with *campaign_id*)
408+
container: Azure container (default: from credentials)
361409
chunks: Dask chunking for lazy loading
362-
410+
zarr_path: Direct path to the zarr store inside the container.
411+
When provided, *campaign_id* and *filename* are ignored.
412+
363413
Returns:
364414
xarray Dataset with Sv data
365415
"""
366416
import xarray as xr
367-
368-
path = build_echodata_path(campaign_id, f"{filename}_Sv", stage="calibrated")
417+
418+
if zarr_path is not None:
419+
path = zarr_path
420+
elif campaign_id is not None and filename is not None:
421+
path = build_echodata_path(campaign_id, f"{filename}_Sv", stage="calibrated")
422+
else:
423+
raise ValueError("Provide either zarr_path or both campaign_id and filename.")
369424

370425
conn_str, default_container = get_azure_credentials(connection_string)
371426
container = container or default_container
@@ -374,10 +429,16 @@ def open_sv_from_azure(
374429
logger.info(f"Opening Sv from Azure: {full_path}")
375430

376431
storage_options = {"connection_string": conn_str}
377-
432+
433+
open_kw: dict = {"storage_options": storage_options}
378434
if chunks:
379-
return xr.open_zarr(full_path, chunks=chunks, storage_options=storage_options)
380-
return xr.open_zarr(full_path, storage_options=storage_options)
435+
open_kw["chunks"] = chunks
436+
437+
ds = xr.open_zarr(full_path, **open_kw)
438+
if not ds.data_vars:
439+
# Zarr v3 stores lack consolidated metadata — retry without it
440+
ds = xr.open_zarr(full_path, consolidated=False, **open_kw)
441+
return ds
381442

382443

383444
def list_campaign_data(

oceanstream/tests/unit/echodata/test_storage.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -167,41 +167,55 @@ class TestGetAzureZarrStore:
167167

168168
@patch.dict("os.environ", {"AZURE_CONNECTION_STRING": "test-conn"}, clear=True)
169169
@patch("oceanstream.echodata.storage.get_azure_filesystem")
170-
@patch("zarr.storage.FSStore")
171-
def test_get_zarr_store(self, mock_fsstore, mock_get_fs):
170+
def test_get_zarr_store(self, mock_get_fs):
172171
"""Test getting a Zarr store for Azure."""
172+
import zarr
173173
from oceanstream.echodata.storage import get_azure_zarr_store
174174

175175
mock_fs = MagicMock()
176176
mock_get_fs.return_value = mock_fs
177-
mock_store = MagicMock()
178-
mock_fsstore.return_value = mock_store
179177

180-
store = get_azure_zarr_store("echodata/test/file.zarr")
181-
182-
mock_fsstore.assert_called_once_with(
183-
"oceanstream-data/echodata/test/file.zarr", fs=mock_fs, mode="w"
184-
)
185-
assert store == mock_store
178+
if hasattr(zarr.storage, "FSStore"):
179+
mock_store = MagicMock()
180+
with patch("zarr.storage.FSStore", return_value=mock_store) as mock_fsstore:
181+
store = get_azure_zarr_store("echodata/test/file.zarr")
182+
mock_fsstore.assert_called_once_with(
183+
"oceanstream-data/echodata/test/file.zarr", fs=mock_fs, mode="w"
184+
)
185+
assert store == mock_store
186+
else:
187+
mock_mapper = MagicMock()
188+
mock_fs.get_mapper.return_value = mock_mapper
189+
store = get_azure_zarr_store("echodata/test/file.zarr")
190+
mock_fs.get_mapper.assert_called_once_with(
191+
"oceanstream-data/echodata/test/file.zarr"
192+
)
193+
assert store == mock_mapper
186194

187195
@patch.dict(
188196
"os.environ",
189197
{"AZURE_CONNECTION_STRING": "conn", "AZURE_CONTAINER_NAME": "custom"},
190198
clear=True,
191199
)
192200
@patch("oceanstream.echodata.storage.get_azure_filesystem")
193-
@patch("zarr.storage.FSStore")
194-
def test_get_zarr_store_custom_container(self, mock_fsstore, mock_get_fs):
201+
def test_get_zarr_store_custom_container(self, mock_get_fs):
195202
"""Test getting Zarr store with custom container."""
203+
import zarr
196204
from oceanstream.echodata.storage import get_azure_zarr_store
197205

198-
mock_get_fs.return_value = MagicMock()
199-
mock_fsstore.return_value = MagicMock()
200-
201-
get_azure_zarr_store("path/file.zarr", container="override")
206+
mock_fs = MagicMock()
207+
mock_get_fs.return_value = mock_fs
202208

203-
args = mock_fsstore.call_args
204-
assert args[0][0] == "override/path/file.zarr"
209+
if hasattr(zarr.storage, "FSStore"):
210+
with patch("zarr.storage.FSStore", return_value=MagicMock()) as mock_fsstore:
211+
get_azure_zarr_store("path/file.zarr", container="override")
212+
args = mock_fsstore.call_args
213+
assert args[0][0] == "override/path/file.zarr"
214+
else:
215+
mock_fs.get_mapper.return_value = MagicMock()
216+
get_azure_zarr_store("path/file.zarr", container="override")
217+
args = mock_fs.get_mapper.call_args
218+
assert args[0][0] == "override/path/file.zarr"
205219

206220

207221
class TestSaveEchodataToAzure:

0 commit comments

Comments
 (0)