Skip to content

Commit 3060a7a

Browse files
committed
Refactor sensor processors: rename from R2R-prefixed to instrument-model names
- Rename r2r_ctd.py → sbe911.py (SeaBird SBE-911+ CTD) - Rename r2r_fluorometer.py → eco_flntu.py (WETLabs ECO-FLNTU) - Rename r2r_winch.py → lci90i.py (LCI-90i winch) - Rename r2r_ssv.py → minisvs.py (Valeport miniSVS) - Add generic FileInfo and SensorInfo base types to processor_base.py - Make R2RFileInfo/R2RSensorInfo inherit from generic base types - Rename all corresponding test files - Update all imports across codebase - Add test_processor_registry.py for processor registration tests
1 parent 861bfab commit 3060a7a

34 files changed

Lines changed: 1955 additions & 1755 deletions

oceanstream/geotrack/processor.py

Lines changed: 512 additions & 391 deletions
Large diffs are not rendered by default.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Plan: Echodata Module Parity Audit & Remediation
2+
3+
**TL;DR**: Deep comparison of `oceanstream/echodata/` against the `saildrone-data/saildrone/` reference reveals **6 confirmed bugs**, **~8 functional gaps**, and several areas where oceanstream already exceeds the reference. Priority: fix bugs → close feature gaps → verify parity.
4+
5+
---
6+
7+
## Phase 1: Bug Fixes (Critical — blocks correctness)
8+
9+
**Bug 1 — Missing `_save_intermediate` method in `processor.py`**
10+
Called at ~3 locations but never defined. `save_intermediate=True` will raise `AttributeError`. Fix: implement the method or remove the calls.
11+
12+
**Bug 2 — Missing `_select_channel` in `seabed/detection.py`**
13+
`detect_seabed_blackwell` calls `_select_channel(ds, channel)` — function doesn't exist. The `"blackwell"` method is also missing from the `Literal` type hint. The entire blackwell path is broken.
14+
15+
**Bug 3 — Signature mismatch: `environment/geoparquet.py``environment/blended.py`**
16+
Caller passes `insitu_temp=, insitu_sal=, channel_ids=, target_depth_m=` but the callee expects `insitu_df=, channels=, target_depth=, frequency_hz=`. This path raises `TypeError`.
17+
18+
**Bug 4 — Unreachable return in `concat.py`**
19+
`return dt` after `return merged` in `merge_location_data` — dead code. Remove it.
20+
21+
**Bug 5 — Calibration key type inconsistency in `calibrate/calibration.py`**
22+
`validate_calibration_params` expects numeric frequency keys but loaders produce string keys like `"38kHz"` or `"38k_short"`. Validation will reject valid calibration data.
23+
24+
**Bug 6 — `detect_sonar_model` is a stub in `convert.py`**
25+
Always returns `"EK80"`. Non-critical but incorrect for non-EK80 data. Implement basic header detection.
26+
27+
---
28+
29+
## Phase 2: Feature Gaps (needed for saildrone-data parity)
30+
31+
| # | Gap | Reference (saildrone-data) | Status in oceanstream | Fix |
32+
|---|-----|---------------------------|----------------------|-----|
33+
| 1 | `depth_offset` not wired in main `compute_sv` | `sv_dataset.py` `compute_sv(depth_offset=0)` | Helper exists in `enrich_sv_dataset` but primary `compute_sv()` skips it | Add param and wire through |
34+
| 2 | No Pydantic denoise parameter models | `pydantic_models/denoise.py` — per-freq, per-pulse, 38kHz inheritance | Has `DenoiseConfig` dataclass + `FREQUENCY_PRESETS` but not Pydantic | Create Pydantic models with same per-freq/pulse-length schema |
35+
| 3 | No pulse-category splitting for concat/export | `export.py` / concat flow separates `short_pulse`/`long_pulse` | `concat.py` groups by day only | Add pulse-mode detection + category grouping |
36+
| 4 | No batch time-window grouping for concatenation | Prefect flow `_batch_key` day-window logic | Basic concat only | Add time-window batch utility |
37+
| 5 | No standalone `compute_and_save_nasc`/`mvbs` wrappers | `workflow.py` zarr-open→compute→save helpers | Has compute functions but no zarr-to-zarr wrappers | Add thin wrappers or ensure processor covers this |
38+
| 6 | No bathymetry gating for seabed step | `workflow.py` skips seabed detection if depth > instrument range | Processor always runs seabed detection | Add bathymetry check before seabed step |
39+
| 7 | No NASC DB persistence | `NASCPointService` PostgreSQL | Has NASC GeoParquet export instead | **Parity via different approach** — exclude |
40+
| 8 | No Azure IoT integration | `azure_iot/` module | Not present | **Out of scope** — edge deployment feature |
41+
42+
---
43+
44+
## Phase 3: Consistency & Quality Improvements
45+
46+
1. **Two incompatible ECS parsers** in `calibrate/calibration.py`: `_load_ecs_calibration` uses INI-style parsing; `parse_ecs_file` uses XML. Consolidate to one correct parser.
47+
2. **Calibration write inconsistency**: Generic `apply_calibration` and Saildrone-specific `calibrate_saildrone` write to echodata groups differently. Ensure consistency.
48+
3. **Test coverage**: Verify unit tests exist for all denoise algorithms, all seabed detection methods (including blackwell), calibration paths, and compute paths.
49+
50+
---
51+
52+
## What Oceanstream Already Has Beyond Saildrone-Data
53+
54+
Not gaps — these are advancements:
55+
- **Environment module** — Copernicus CDS integration, blended profiles, absorption/sound-speed equations (nothing equivalent in saildrone-data)
56+
- **STAC metadata** — Collection/item emission for echodata products
57+
- **NASC GeoParquet export** — Spatial Hive-partitioned output
58+
- **TOML config with frequency presets** — Centralized configuration
59+
- **Provider architecture** — Pluggable data source adapters
60+
- **Geotrack module** — Replaces and exceeds `process_gps.py`
61+
- **Composite/deltaSv seabed detection** — Additional algorithms beyond saildrone-data's main path
62+
63+
---
64+
65+
## Verification
66+
67+
1. After bugs: `make test-unit` + `ruff check . && mypy oceanstream` pass clean
68+
2. After gaps: Integration test running full pipeline (convert → calibrate → Sv → denoise → seabed → MVBS/NASC → echogram)
69+
3. Specific: `detect_seabed(method="blackwell")` runs without error; `.ecs` and `.xlsx` calibration pass validation; geoparquet→blended path completes; processor with `save_intermediate=True` writes Zarr checkpoints
70+
4. Cross-validation: Same denoise config on same test file in both projects, compare output masks
71+
72+
---
73+
74+
## Scope Decisions
75+
76+
- **In scope**: All 6 bugs, gaps 1-6, quality improvements
77+
- **Out of scope**: Azure IoT (edge feature), PostgreSQL NASC persistence (GeoParquet approach is better), Prefect orchestration (oceanstream uses class-based processor)
78+
- **Assumption**: Core denoise *algorithms* are at parity — gaps are in plumbing/config/integration
79+
- **Assumption**: Geotrack module adequately replaces `process_gps.py` — not re-audited separately
80+
81+
---
82+
83+
## Further Considerations
84+
85+
1. **Pydantic models location**: Should they live in `oceanstream/echodata/config.py` alongside existing dataclasses, or in a new `oceanstream/echodata/models.py`? Recommend new `models.py` to keep concerns separated.
86+
2. **Batch concatenation complexity**: The saildrone-data time-window grouping is tightly coupled to Prefect. Oceanstream could implement this as a pure library utility in `concat.py` that the processor class calls, making it usable outside any orchestration framework.
87+
3. **ECS parser consolidation**: ECS files from Simrad are INI-style (`.ini` variant). The XML parser path may have been added for a different calibration file format — need to verify before removing it.
Lines changed: 13 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,113 +1,13 @@
1-
from __future__ import annotations
2-
3-
from dataclasses import dataclass, field
4-
from pathlib import Path
5-
from typing import Dict, Optional
6-
7-
8-
def _parse_simple_kv_file(path: Path) -> Dict[str, str]:
9-
"""Parse a simple `key: value` text file into a dictionary."""
10-
11-
text = path.read_text(encoding="utf-8")
12-
result: Dict[str, str] = {}
13-
for line in text.splitlines():
14-
stripped = line.strip()
15-
if not stripped or ":" not in stripped:
16-
continue
17-
key, value = stripped.split(":", 1)
18-
result[key.strip()] = value.strip()
19-
return result
20-
21-
22-
@dataclass
23-
class R2RFileInfo:
24-
"""Structured representation of `file-info.txt` contents."""
25-
26-
campaign_id: Optional[str] = None
27-
cruise_id: Optional[str] = None
28-
platform: Optional[str] = None
29-
start_time: Optional[str] = None
30-
end_time: Optional[str] = None
31-
extra: Dict[str, str] = field(default_factory=dict)
32-
33-
34-
@dataclass
35-
class R2RSensorInfo:
36-
"""Structured representation of `bag-info.txt` contents."""
37-
38-
sensor_type: Optional[str] = None
39-
sensor_id: Optional[str] = None
40-
description: Optional[str] = None
41-
extra: Dict[str, str] = field(default_factory=dict)
42-
43-
44-
def parse_file_info(path: Path) -> R2RFileInfo:
45-
"""Parse an R2R `file-info.txt` file into :class:`R2RFileInfo`."""
46-
47-
if not path.exists():
48-
raise FileNotFoundError(path)
49-
50-
raw = _parse_simple_kv_file(path)
51-
52-
def pop_first(keys: list[str]) -> Optional[str]:
53-
for key in keys:
54-
if key in raw:
55-
return raw.pop(key)
56-
return None
57-
58-
campaign_id = pop_first(["Campaign", "campaign_id", "campaign"])
59-
cruise_id = pop_first(["Cruise", "cruise_id"])
60-
platform = pop_first(["Platform", "Ship", "Vessel"])
61-
start_time = pop_first(["Start time", "StartTime", "start_time"])
62-
end_time = pop_first(["End time", "EndTime", "end_time"])
63-
64-
return R2RFileInfo(
65-
campaign_id=campaign_id,
66-
cruise_id=cruise_id,
67-
platform=platform,
68-
start_time=start_time,
69-
end_time=end_time,
70-
extra=raw,
71-
)
72-
73-
74-
def parse_bag_info(path: Path) -> R2RSensorInfo:
75-
"""Parse an R2R `bag-info.txt` file into :class:`R2RSensorInfo`."""
76-
77-
if not path.exists():
78-
raise FileNotFoundError(path)
79-
80-
raw = _parse_simple_kv_file(path)
81-
82-
def pop_first(keys: list[str]) -> Optional[str]:
83-
for key in keys:
84-
if key in raw:
85-
return raw.pop(key)
86-
return None
87-
88-
sensor_type = pop_first([
89-
"R2R-DeviceType", # R2R standard field for device type
90-
"Sensor Type",
91-
"sensor_type",
92-
"Instrument",
93-
"instrument",
94-
])
95-
sensor_id = pop_first([
96-
"R2R-DeviceModel", # R2R standard field for device model
97-
"Sensor ID",
98-
"sensor_id",
99-
"SerialNumber",
100-
"serial_number",
101-
])
102-
description = pop_first([
103-
"Internal-Sender-Description", # R2R description field
104-
"Description",
105-
"description",
106-
])
107-
108-
return R2RSensorInfo(
109-
sensor_type=sensor_type,
110-
sensor_id=sensor_id,
111-
description=description,
112-
extra=raw,
113-
)
1+
"""R2R metadata — re-exported from canonical location.
2+
3+
All classes and functions are defined in :mod:`oceanstream.providers.r2r_metadata`.
4+
This module re-exports them for backward-compatible imports.
5+
"""
6+
7+
from oceanstream.providers.r2r_metadata import ( # noqa: F401
8+
R2RFileInfo,
9+
R2RSensorInfo,
10+
parse_bag_info,
11+
parse_file_info,
12+
)
13+
from oceanstream.sensors.processor_base import FileInfo, SensorInfo # noqa: F401

0 commit comments

Comments
 (0)