Skip to content

Commit fa865f8

Browse files
committed
echodata: fix bugs and close feature gaps from parity audit
Bugs fixed: - Add _save_intermediate() to EchodataProcessor for Zarr snapshots - Add _select_channel() with partial matching; include 'blackwell' in Literal - Fix geoparquet→blended signature mismatch (insitu_df, channels, etc.) - Remove unreachable 'return dt' in concat.merge_location_data - Allow string keys in validate_calibration_params (e.g. '38kHz') - Implement detect_sonar_model with binary header detection (CON0/XML0) Feature gaps closed: - Wire depth_offset parameter through compute_sv() - Add Pydantic denoise parameter models (models.py) - Add pulse-category splitting (detect_pulse_category, group_by_pulse_category) - Add time-window batch grouping (batch_key, group_by_time_window) Quality: - Consolidate ECS parser: INI-first with XML fallback - Export new models from echodata __init__ - Update test_calibrate for string key acceptance
1 parent 3060a7a commit fa865f8

10 files changed

Lines changed: 583 additions & 40 deletions

File tree

oceanstream/echodata/__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@
104104
"DenoiseConfig",
105105
"MVBSConfig",
106106
"NASCConfig",
107+
# Pydantic denoise parameter models
108+
"MaskImpulseNoise",
109+
"TransientNoiseMask",
110+
"RemoveBackgroundNoise",
111+
"MaskAttenuatedSignal",
112+
"MVBSComputeOptions",
113+
"NASCComputeOptions",
114+
"fill_missing_frequency_params",
107115
# Consolidate (depth computation)
108116
"add_depth_to_sv",
109117
"choose_depth_flags",
@@ -161,6 +169,27 @@ def __getattr__(name: str):
161169
)
162170
return locals()[name]
163171

172+
# Pydantic denoise parameter models
173+
if name in (
174+
"MaskImpulseNoise",
175+
"TransientNoiseMask",
176+
"RemoveBackgroundNoise",
177+
"MaskAttenuatedSignal",
178+
"MVBSComputeOptions",
179+
"NASCComputeOptions",
180+
"fill_missing_frequency_params",
181+
):
182+
from oceanstream.echodata.models import (
183+
MaskImpulseNoise,
184+
TransientNoiseMask,
185+
RemoveBackgroundNoise,
186+
MaskAttenuatedSignal,
187+
MVBSComputeOptions,
188+
NASCComputeOptions,
189+
fill_missing_frequency_params,
190+
)
191+
return locals()[name]
192+
164193
# Convert functions
165194
if name in ("convert_raw_files", "convert_raw_file"):
166195
from oceanstream.echodata.convert import convert_raw_files, convert_raw_file

oceanstream/echodata/calibrate/calibration.py

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -186,19 +186,23 @@ def validate_calibration_params(params: dict) -> bool:
186186
Validate calibration parameters dictionary.
187187
188188
Args:
189-
params: Dictionary of calibration parameters by frequency
189+
params: Dictionary of calibration parameters by frequency.
190+
Keys may be numeric (Hz) or string labels (e.g. "38kHz",
191+
"38k_short").
190192
191193
Returns:
192194
True if valid
193195
194196
Raises:
195-
ValueError: If invalid frequency type
197+
ValueError: If parameter values are wrong
196198
TypeError: If parameter types are wrong
197199
"""
198200
for freq_key, values in params.items():
199-
# Frequency keys should be numeric (int)
200-
if not isinstance(freq_key, (int, float)):
201-
raise TypeError(f"Frequency key must be numeric, got {type(freq_key)}")
201+
# Accept both numeric keys (int/float Hz) and string labels
202+
if not isinstance(freq_key, (int, float, str)):
203+
raise TypeError(
204+
f"Frequency key must be numeric or string, got {type(freq_key)}"
205+
)
202206

203207
# Values should be a dict
204208
if not isinstance(values, dict):
@@ -208,34 +212,75 @@ def validate_calibration_params(params: dict) -> bool:
208212

209213

210214
def parse_ecs_file(ecs_file: Path) -> dict[int, dict]:
211-
"""
212-
Parse Simrad ECS calibration file format.
213-
215+
"""Parse Simrad ECS calibration file format.
216+
217+
ECS files are INI-style text files with ``[ChannelN]`` sections.
218+
This parser reads them using :mod:`configparser` and returns
219+
calibration values keyed by frequency in Hz.
220+
221+
Falls back to XML parsing if INI parsing yields no sections (some
222+
third-party tools export calibration as XML).
223+
214224
Args:
215225
ecs_file: Path to .ecs file
216-
226+
217227
Returns:
218228
Dictionary of calibration values keyed by frequency (Hz)
219229
"""
230+
import configparser
231+
232+
config = configparser.ConfigParser()
233+
try:
234+
config.read(ecs_file)
235+
except configparser.Error:
236+
# Not INI format — fall through to XML fallback below
237+
config = configparser.ConfigParser()
238+
239+
params: dict[int, dict] = {}
240+
for section in config.sections():
241+
if section.startswith("Channel"):
242+
freq = config.getfloat(section, "Frequency", fallback=0)
243+
freq_hz = int(freq) if freq >= 1000 else int(freq * 1000)
244+
params[freq_hz] = {
245+
"gain": config.getfloat(section, "Gain", fallback=0),
246+
"sa_correction": config.getfloat(section, "SaCorrection", fallback=0),
247+
"beamwidth_alongship": config.getfloat(
248+
section, "BeamWidthAlongship", fallback=0
249+
),
250+
"beamwidth_athwartship": config.getfloat(
251+
section, "BeamWidthAthwartship", fallback=0
252+
),
253+
"angle_offset_alongship": config.getfloat(
254+
section, "AngleOffsetAlongship", fallback=0
255+
),
256+
"angle_offset_athwartship": config.getfloat(
257+
section, "AngleOffsetAthwartship", fallback=0
258+
),
259+
}
260+
261+
if params:
262+
return params
263+
264+
# Fallback: try XML parsing for non-standard calibration files
220265
import xml.etree.ElementTree as ET
221-
222-
tree = ET.parse(ecs_file)
266+
267+
try:
268+
tree = ET.parse(ecs_file)
269+
except ET.ParseError:
270+
logger.warning(f"ECS file {ecs_file} is neither valid INI nor XML")
271+
return {}
272+
223273
root = tree.getroot()
224-
225-
params = {}
226-
227274
for cal in root.findall(".//Calibration"):
228275
freq = int(cal.get("Frequency", 0))
229-
params[freq] = {}
230-
231-
gain_elem = cal.find("Gain")
232-
if gain_elem is not None:
233-
params[freq]["gain"] = float(gain_elem.text)
234-
235-
sa_elem = cal.find("SaCorrection")
236-
if sa_elem is not None:
237-
params[freq]["sa_correction"] = float(sa_elem.text)
238-
276+
entry: dict[str, float] = {}
277+
for tag in ("Gain", "SaCorrection"):
278+
elem = cal.find(tag)
279+
if elem is not None and elem.text:
280+
entry[tag[0].lower() + tag[1:]] = float(elem.text)
281+
if entry:
282+
params[freq] = entry
283+
239284
return params
240285

241286

oceanstream/echodata/compute/sv.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def compute_sv(
2424
use_dask: bool = True,
2525
add_depth: bool = True,
2626
add_location: bool = True,
27+
depth_offset: float = 0.0,
2728
waveform_mode: str = "CW",
2829
encode_mode: str = "complex",
2930
) -> "xr.Dataset":
@@ -40,6 +41,9 @@ def compute_sv(
4041
use_dask: Enable Dask for large files
4142
add_depth: Add depth coordinate to output
4243
add_location: Add lat/lon coordinates to output
44+
depth_offset: Transducer depth offset in metres (e.g. depth below
45+
waterline). Passed to echopype ``add_depth`` when platform
46+
metadata doesn't already provide it.
4347
waveform_mode: Waveform mode for EK80 ('CW' for narrowband, 'BB' for broadband)
4448
encode_mode: Encode mode for EK80 ('complex' or 'power')
4549
@@ -90,10 +94,15 @@ def compute_sv_task(echodata_path: Path) -> xr.Dataset:
9094
logger.info("Computing Sv")
9195
ds_Sv = ep.calibrate.compute_Sv(echodata, **compute_kwargs)
9296

93-
# Add depth coordinate
97+
# Add depth coordinate using platform-aware flag selection
9498
if add_depth:
9599
logger.info("Adding depth coordinate")
96-
ds_Sv = ep.consolidate.add_depth(ds_Sv)
100+
try:
101+
flags = _choose_depth_flags(echodata, depth_offset=depth_offset)
102+
ds_Sv = ep.consolidate.add_depth(ds_Sv, echodata, **flags)
103+
except (KeyError, ValueError, TypeError) as e:
104+
logger.warning(f"Depth-flag add_depth failed ({e}), using simple add_depth")
105+
ds_Sv = ep.consolidate.add_depth(ds_Sv)
97106

98107
# Add location coordinates
99108
if add_location:

oceanstream/echodata/concat.py

Lines changed: 142 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22
33
Groups raw files by UTC date and concatenates them for daily processing,
44
enabling efficient denoising and MVBS/NASC computation over 24-hour periods.
5+
6+
Includes utilities for:
7+
- Pulse-category splitting (short_pulse / long_pulse) based on frequency
8+
combinations in processed Zarr stores.
9+
- Time-window batch grouping for multi-day concatenation windows.
510
"""
611

712
from __future__ import annotations
813

914
import logging
1015
import re
1116
from collections import defaultdict
12-
from datetime import datetime
17+
from datetime import datetime, timedelta
1318
from pathlib import Path
1419
from typing import TYPE_CHECKING, Optional
1520

@@ -394,4 +399,139 @@ def merge_location_data(ds: "xr.Dataset", location_data: list[dict]) -> "xr.Data
394399
merged = merged.reset_coords("time", drop=True)
395400

396401
return merged
397-
return dt
402+
403+
404+
# ============================================================================
405+
# Pulse-category splitting utilities
406+
# ============================================================================
407+
408+
# Well-known pulse categories for Saildrone EK80.
409+
# Key = friendly name, value = comma-joined sorted frequency_nominal strings.
410+
PULSE_CATEGORY_CONFIG: dict[str, dict[str, Optional[str]]] = {
411+
"short_pulse": {"freq_key": "38000.0,200000.0"},
412+
"long_pulse": {"freq_key": "38000.0"},
413+
"exported_ds": {"freq_key": None}, # catch-all
414+
}
415+
416+
417+
def detect_pulse_category(ds: "xr.Dataset") -> str:
418+
"""Classify a Sv dataset into a pulse category.
419+
420+
Classification is based on the sorted frequency_nominal values present
421+
in the ``channel`` dimension, matching the Saildrone EK80 convention:
422+
423+
- ``"short_pulse"`` → 38 kHz + 200 kHz (dual-frequency, short CW pulse)
424+
- ``"long_pulse"`` → 38 kHz only (single-frequency, long CW pulse)
425+
- ``"exported_ds"`` → anything else
426+
427+
Args:
428+
ds: Sv xarray.Dataset with a ``frequency_nominal`` coordinate or
429+
variable.
430+
431+
Returns:
432+
One of ``"short_pulse"``, ``"long_pulse"``, or ``"exported_ds"``.
433+
"""
434+
import numpy as np
435+
436+
if "frequency_nominal" in ds:
437+
freqs = np.sort(
438+
np.unique(ds["frequency_nominal"].values.astype(float))
439+
)
440+
elif "channel" in ds.dims:
441+
freqs = np.sort(
442+
np.unique(ds["channel"].values.astype(float))
443+
)
444+
else:
445+
return "exported_ds"
446+
447+
freq_str = ",".join(f"{f:.1f}" for f in freqs)
448+
449+
for category, cfg in PULSE_CATEGORY_CONFIG.items():
450+
if cfg["freq_key"] is None or freq_str == cfg["freq_key"]:
451+
return category
452+
453+
return "exported_ds"
454+
455+
456+
def group_by_pulse_category(
457+
paths: list[Path],
458+
) -> dict[str, list[Path]]:
459+
"""Group Zarr store paths by pulse category.
460+
461+
Opens each Zarr lazily to read ``frequency_nominal`` and assigns the
462+
file to a pulse category.
463+
464+
Args:
465+
paths: List of Sv Zarr store paths.
466+
467+
Returns:
468+
``{category: [path, ...]}`` mapping.
469+
"""
470+
import xarray as xr
471+
472+
groups: dict[str, list[Path]] = defaultdict(list)
473+
for p in paths:
474+
try:
475+
ds = xr.open_zarr(p)
476+
cat = detect_pulse_category(ds)
477+
except Exception:
478+
logger.warning(f"Could not classify {p}, assigning to exported_ds")
479+
cat = "exported_ds"
480+
groups[cat].append(p)
481+
return dict(groups)
482+
483+
484+
# ============================================================================
485+
# Time-window batch grouping
486+
# ============================================================================
487+
488+
def batch_key(
489+
ts: datetime,
490+
window_days: int = 1,
491+
) -> str:
492+
"""Return a filename-safe key that anchors *ts* to a fixed time window.
493+
494+
Args:
495+
ts: Timestamp (usually a file start time).
496+
window_days: Width of the batching window in days.
497+
498+
Returns:
499+
``"YYYY-MM-DD"`` for single-day windows, or
500+
``"YYYY-MM-DD_to_YYYY-MM-DD"`` for multi-day windows.
501+
502+
Examples:
503+
>>> batch_key(datetime(2023, 8, 10), 1)
504+
'2023-08-10'
505+
>>> batch_key(datetime(2023, 8, 10), 3)
506+
'2023-08-09_to_2023-08-11'
507+
"""
508+
anchor = datetime(ts.year, ts.month, ts.day)
509+
510+
if window_days <= 1:
511+
return f"{anchor:%Y-%m-%d}"
512+
513+
# Floor to start of rolling window
514+
anchor -= timedelta(days=(anchor - datetime.min).days % window_days)
515+
end = anchor + timedelta(days=window_days - 1)
516+
return f"{anchor:%Y-%m-%d}_to_{end:%Y-%m-%d}"
517+
518+
519+
def group_by_time_window(
520+
files: list[tuple[Path, datetime]],
521+
window_days: int = 1,
522+
) -> dict[str, list[Path]]:
523+
"""Group files into time-window batches.
524+
525+
Args:
526+
files: List of ``(path, start_time)`` tuples.
527+
window_days: Width of each batch window in days.
528+
529+
Returns:
530+
``{batch_key_str: [path, ...]}`` mapping, sorted by key.
531+
"""
532+
groups: dict[str, list[Path]] = defaultdict(list)
533+
for path, ts in files:
534+
key = batch_key(ts, window_days)
535+
groups[key].append(path)
536+
537+
return dict(sorted(groups.items()))

oceanstream/echodata/convert.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,27 @@ def detect_sonar_model(raw_file: Path) -> str:
221221
"EK80" or "EK60" based on file contents
222222
223223
Note:
224-
Currently returns "EK80" as default since Saildrone uses EK80.
225-
Future: implement actual detection based on file header.
224+
Detection reads the first datagram header.
225+
Falls back to "EK80" if the header is unrecognised.
226226
"""
227-
# TODO: Implement actual detection
228-
# For now, default to EK80 (Saildrone standard)
229-
logger.debug(f"Auto-detecting sonar model for {raw_file.name}, defaulting to EK80")
227+
# Simrad raw files start with a datagram whose 4-byte type field
228+
# identifies the instrument:
229+
# EK60 / ES60 → b'CON0' (little-endian ASCII at offset 4)
230+
# EK80 / ES80 → b'XML0'
231+
try:
232+
with open(raw_file, "rb") as fh:
233+
# Skip the 4-byte datagram length and read the 4-byte type.
234+
fh.read(4) # datagram length
235+
tag = fh.read(4)
236+
if tag == b"CON0":
237+
logger.debug(f"{raw_file.name}: detected EK60 (CON0 header)")
238+
return "EK60"
239+
if tag == b"XML0":
240+
logger.debug(f"{raw_file.name}: detected EK80 (XML0 header)")
241+
return "EK80"
242+
logger.debug(f"{raw_file.name}: unrecognised header {tag!r}, defaulting to EK80")
243+
except OSError:
244+
logger.debug(f"Cannot read {raw_file.name}, defaulting to EK80")
230245
return "EK80"
231246

232247

0 commit comments

Comments
 (0)