Commit 19cd921

fix: validate CTD numeric fields before sending to echosounder
- Coerce temperature/salinity/pressure/depth/sound_speed to float before including in ctdOutput payload
- Skip corrupt values (e.g. '1.!60') instead of forwarding as strings
- Add ADCP file patterns to watch_patterns config
- README updates for CTD monitoring and config docs
1 parent 6f74d1e commit 19cd921

6 files changed

Lines changed: 145 additions & 20 deletions

Dockerfile.Linux.arm64

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
 # Install oceanstream from git
 ARG OCEANSTREAM_BRANCH=main
 ARG OCEANSTREAM_REPO=https://github.com/OceanStreamIO/oceanstream-cli.git
+ARG OCEANSTREAM_CACHEBUST=1
 RUN git clone --depth 1 -b ${OCEANSTREAM_BRANCH} ${OCEANSTREAM_REPO} /tmp/oceanstream && \
     pip install --no-cache-dir "/tmp/oceanstream[geotrack,adcp]" && \
     rm -rf /tmp/oceanstream

README.md

Lines changed: 90 additions & 15 deletions
@@ -20,6 +20,7 @@ Sensor data (TCP/UDP stream, file drop, IoT Edge message)
 | Sea-Bird CNV | `.cnv` | pandas | Processed CTD with header metadata |
 | Sea-Bird HEX | `.hex` + `.hdr` + `.XMLCON` | seabirdscientific + gsw | Raw CTD frequency → T/C/P/S/depth |
 | RDI ADCP | `.raw` | dolfyn | Beam→earth transform, ensemble averaging, u/v/w velocities |
+| Nortek AD2CP | `.ad2cp` | oceanstream | Echosounder Sv (volume backscatter) and/or velocity data |
 | tar.gz | `.tar.gz`, `.tgz` | tarfile | Extracts and processes contained files |
 
 ## Quick Start
@@ -84,20 +85,89 @@ Configuration is driven by IoT Hub module twin desired properties. All `EdgeConf
 
 ## Configuration
 
-All config flows through the `EdgeConfig` dataclass in `config.py`:
+All config flows through the `EdgeConfig` dataclass in `config.py`. In IoT Edge mode, every field is readable/writable via module twin desired properties (changes apply live). In standalone mode, fields come from environment variables and CLI args.
 
-| Field | Default | Description |
-|-------|---------|-------------|
-| `input_mode` | `both` | `stream`, `file`, or `both` |
-| `stream_format` | `auto` | `nmea`, `csv`, `hex`, or `auto` |
-| `stream_port` | `9100` | TCP/UDP listen port |
-| `watch_dir` | `/data/sensor` | Directory to watch for new files |
-| `batch_interval_seconds` | `60` | Stream batch flush interval |
-| `telemetry_downsample_seconds` | `30` | Min interval between D2C messages |
-| `storage_backend` | `azure-blob-edge` | `azure-blob-edge` or `local` |
-| `output_base_path` | `/app/processed` | Output root directory |
+### Input Mode
 
-See `config.py` for the full list. Standalone mode uses env vars and CLI args; IoT Edge mode uses twin desired properties.
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `input_mode` | `stream` \| `file` \| `both` | `both` | Which ingest sources to activate |
+
+### Network Stream
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `stream_protocol` | `tcp` \| `udp` \| `auto` | `auto` | Network protocol for stream listener |
+| `stream_host` | string | `0.0.0.0` | Bind address (server mode) or remote host (client mode) |
+| `stream_port` | int | `9100` | Listen port (server) or connect port (client) |
+| `stream_format` | `nmea` \| `csv` \| `hex` \| `auto` | `auto` | Expected data format on the stream |
+| `stream_connect_mode` | `server` \| `client` | `server` | TCP server (listen) or client (connect to remote) |
+
+### File Watcher
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `watch_dir` | string | `/data/sensor` | Directory to watch for new files |
+| `watch_patterns` | string | `*.csv,*.txt,*.hex,*.cnv,*.raw,*.ad2cp,*.tar.gz` | Comma-separated glob patterns |
+| `watch_polling` | bool | `false` | Use polling instead of inotify (required for SMB/NFS mounts) |
+| `watch_poll_interval` | int | `2` | Seconds between polls when `watch_polling` is true |
+| `backfill_minutes` | int | `0` | On startup, queue files modified within the last N minutes. `0` = skip all existing files (only process new arrivals). Set to e.g. `60` to reprocess the last hour of data after a restart |
+
+### Batching
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `batch_interval_seconds` | int | `60` | Stream batch flush interval (seconds) |
+| `batch_max_records` | int | `1000` | Stream batch flush when this many records buffered |
+
+### Metadata
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `campaign_id` | string | `""` | Campaign identifier — used as blob container name and output path prefix |
+| `platform_id` | string | `""` | Platform/vessel identifier |
+| `platform_name` | string | `""` | Human-readable platform name |
+| `provider` | string | `auto` | Oceanstream data provider for enrichment (`auto`, `generic`, or a named provider) |
+
+### Telemetry
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `telemetry_interval_seconds` | int | `300` | Periodic telemetry summary interval |
+| `telemetry_send_records` | bool | `true` | Send individual sensor records to IoT Hub |
+| `telemetry_send_summaries` | bool | `true` | Send periodic summary messages to IoT Hub |
+| `telemetry_downsample_seconds` | int | `30` | Minimum interval between D2C telemetry messages |
+
+### CTD File Monitor
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `ctd_enabled` | bool | `false` | Enable polling a CTD file for latest readings |
+| `ctd_file_path` | string | `/mnt/ctd/latest_ctd.csv` | Path to the CTD CSV file (updated in place by logger) |
+| `ctd_poll_interval_seconds` | int | `30` | How often to read the CTD file |
+| `ctd_observatory` | string | `munkholmen` | Observatory name for CTD provider enrichment |
+
+### Storage
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `storage_backend` | `azure-blob-edge` \| `local` | `azure-blob-edge` | Output storage backend |
+| `output_base_path` | string | `/app/processed` | Root path for local storage backend |
+| `processed_container` | string | `sensordata` | Subfolder within the campaign container for processed output |
+
+### Logging
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `log_level` | string | `INFO` | Logging level (`DEBUG`, `INFO`, `WARNING`, `ERROR`) |
+
+### Startup Behaviour
+
+By default (`backfill_minutes: 0`), the module does **not** process existing files when it starts. It only processes files that arrive after startup via the file watcher, stream listener, or IoT Edge messages. This prevents reprocessing the entire dataset on every container restart.
+
+To backfill recent data after a restart, set `backfill_minutes` to the desired window (e.g. `60` for the last hour). Only files whose modification time falls within that window are queued. This is useful when the module was down briefly and you want to catch up on missed files.
+
+See `config.py` for implementation details.
 
 ## Connecting Sensors
 
@@ -203,18 +273,23 @@ Test data is stored in Azure Blob Storage (`sensorstream-test` container) and do
 
 **Output**: D2C telemetry to IoT Hub (rate-limited by `telemetry_downsample_seconds`), GeoParquet + metadata JSON to blob storage.
 
-**Twin**: All config fields are readable/writable via module twin. Changes are applied live without restart.
+**Twin**: All config fields are readable/writable via module twin desired properties. Changes are applied live without restart. Twin property names map 1:1 to config field names, except `Log_Level` → `log_level`.
+
+**Backfill on restart**: By default the module skips existing files. Set `backfill_minutes` in the twin to process recent files after a restart (e.g. `60` for the last hour).
 
 ## Environment Variables
 
 | Variable | Default | Description |
 |----------|---------|-------------|
-| `AZURE_CONNECTION_STRING` || Storage connection string (tests, E2E) |
+| `AZURE_STORAGE_CONNECTION_STRING` || Azure Blob Storage connection string (edge blob or cloud) |
 | `STORAGE_BACKEND` | `azure-blob-edge` | `local` for standalone |
 | `OUTPUT_BASE_PATH` | `/app/processed` | Output directory |
+| `PROCESSED_CONTAINER_NAME` | `sensordata` | Subfolder name within campaign container |
 | `WATCH_DIR` | `/data/sensor` | File watch directory |
+| `STREAM_HOST` | `0.0.0.0` | Stream listener bind address |
 | `STREAM_PORT` | `9100` | Stream listener port |
-| `CAMPAIGN_ID` || Campaign identifier |
+| `CAMPAIGN_ID` || Campaign identifier for output partitioning |
+| `PLATFORM_ID` || Platform/vessel identifier |
 | `LOG_LEVEL` | `INFO` | Logging level |
 
 ## Docker
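
The `watch_patterns` field documented in the README hunk above is a single comma-separated string of glob patterns. The following is a minimal sketch of how such a string could be split and matched against a file name. `split_patterns` and `matches` are hypothetical helper names, and `fnmatch` is used here for illustration only; the module itself passes each pattern to `Path.rglob`, as the `ingest/file_watcher.py` hunk shows.

```python
from fnmatch import fnmatch

# Default value of watch_patterns, copied from the README table in this commit.
DEFAULT_PATTERNS = "*.csv,*.txt,*.hex,*.cnv,*.raw,*.ad2cp,*.tar.gz"


def split_patterns(value: str) -> list[str]:
    """Split a comma-separated pattern string, dropping empty entries."""
    return [p.strip() for p in value.split(",") if p.strip()]


def matches(filename: str, patterns: list[str]) -> bool:
    """Return True if the file name matches at least one glob pattern."""
    return any(fnmatch(filename, p) for p in patterns)


patterns = split_patterns(DEFAULT_PATTERNS)
print(matches("cast01.ad2cp", patterns))  # an AD2CP file is picked up
print(matches("notes.md", patterns))      # an unrelated file is ignored
```

Stripping whitespace around each entry means a value such as `*.csv, *.raw` behaves the same as `*.csv,*.raw`.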

config.py

Lines changed: 2 additions & 0 deletions
@@ -100,6 +100,7 @@ def _parse_dict(val: Any) -> Optional[Dict[str, Any]]:
     "telemetry_downsample_seconds",
     "watch_poll_interval",
     "ctd_poll_interval_seconds",
+    "backfill_minutes",
 }
 
 _FLOAT_FIELDS: set[str] = set()
@@ -126,6 +127,7 @@ class EdgeConfig:
     watch_patterns: str = "*.csv,*.txt,*.hex,*.cnv,*.raw,*.ad2cp,*.tar.gz"
     watch_polling: bool = False
     watch_poll_interval: int = 2
+    backfill_minutes: int = 0
 
     # --- Batching ---
     batch_interval_seconds: int = 60

exports/telemetry.py

Lines changed: 21 additions & 0 deletions
@@ -76,3 +76,24 @@ def send_record_telemetry(
     }
 
     send_to_hub(client, data=payload, output_name="output1")
+
+    # Forward CTD readings on a dedicated output for module-to-module routing
+    source = record.get("source", "")
+    if source in ("ctd_latest", "oceanlab_munkholmen") and any(
+        k in record for k in ("temperature", "salinity", "pressure")
+    ):
+        ctd_payload: dict = {
+            "type": "ctd_environment",
+            "time": record.get("time"),
+            "latitude": record.get("latitude"),
+            "longitude": record.get("longitude"),
+        }
+        # Coerce numeric CTD fields — skip corrupt values (e.g. "1.!60")
+        for key in ("temperature", "salinity", "pressure", "depth", "sound_speed"):
+            val = record.get(key)
+            if val is not None:
+                try:
+                    ctd_payload[key] = float(val)
+                except (ValueError, TypeError):
+                    pass  # omit corrupt value
+        send_to_hub(client, data=ctd_payload, output_name="ctdOutput")
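
The coercion loop added in this hunk can be exercised on its own. Below is a minimal standalone sketch of the same idea, assuming a plain dict as the record. `coerce_ctd_fields` is a hypothetical helper name, not a function in the repository, and the `math.isfinite` check is an extra assumption that goes beyond the commit: `float()` accepts strings such as `"nan"` and `"inf"`, which the committed code would forward.

```python
import math
from typing import Any, Mapping

# Numeric CTD keys, copied from the loop in exports/telemetry.py.
CTD_NUMERIC_KEYS = ("temperature", "salinity", "pressure", "depth", "sound_speed")


def coerce_ctd_fields(record: Mapping[str, Any]) -> dict[str, float]:
    """Return only the CTD fields that parse as finite floats."""
    clean: dict[str, float] = {}
    for key in CTD_NUMERIC_KEYS:
        val = record.get(key)
        if val is None:
            continue
        try:
            number = float(val)
        except (ValueError, TypeError):
            continue  # corrupt value such as "1.!60": omit the key
        # Assumption beyond the commit: also drop NaN and infinity.
        if math.isfinite(number):
            clean[key] = number
    return clean


record = {
    "temperature": "7.25",
    "salinity": "1.!60",   # corrupt reading, dropped
    "pressure": 12,
    "depth": None,         # missing reading, dropped
    "sound_speed": "nan",  # parses as float but is not finite, dropped
}
print(coerce_ctd_fields(record))  # {'temperature': 7.25, 'pressure': 12.0}
```

Omitting a bad key, rather than sending `None` or the raw string, keeps every value in the payload numeric, which is the stated goal of the commit.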

ingest/adapter.py

Lines changed: 10 additions & 3 deletions
@@ -467,9 +467,16 @@ def parse_ad2cp_file(
         "AD2CP %s has no echosounder data — trying velocity reader",
         raw_path.name,
     )
-    from oceanstream.adcp.processor import (
-        process_ad2cp_velocity_file as _os_process_velocity,
-    )
+    try:
+        from oceanstream.adcp.processor import (
+            process_ad2cp_velocity_file as _os_process_velocity,
+        )
+    except ImportError:
+        logger.warning(
+            "AD2CP %s: velocity reader not yet available — skipping",
+            raw_path.name,
+        )
+        return pd.DataFrame()
 
     try:
         return _os_process_velocity(raw_path)
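
The guarded import in this hunk follows a common pattern for optional dependencies: attempt the import, log a warning if it fails, and let the caller fall back. Here is a minimal sketch of that pattern using only the standard library. `load_optional` is a hypothetical helper, and the module names in the usage lines are placeholders, not the oceanstream API.

```python
import importlib
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("optional_import_demo")


def load_optional(module_name: str, attr: str) -> Optional[Callable[..., Any]]:
    """Return module_name.attr, or None if the module or attribute is missing."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        logger.warning("%s not available, skipping", module_name)
        return None
    # `from module import attr` raises ImportError for a missing name;
    # returning None here gives the caller the same fallback path.
    return getattr(module, attr, None)


sqrt = load_optional("math", "sqrt")                 # present: a callable
missing = load_optional("no_such_module_xyz", "f")   # absent: None
print(sqrt(9.0) if sqrt else "skipped", missing)
```

Returning a sentinel (here `None`, in the commit an empty `pd.DataFrame()`) lets the rest of the pipeline carry on when the optional reader is not installed.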

ingest/file_watcher.py

Lines changed: 21 additions & 2 deletions
@@ -93,25 +93,44 @@ async def stop(self) -> None:
         logger.info("File watcher stopped")
 
     async def scan_existing(self) -> int:
-        """Scan watch_dir for existing files and queue them.
+        """Scan watch_dir for recently created files and queue them.
+
+        Only files modified within the last ``config.backfill_minutes`` are
+        queued. If ``backfill_minutes`` is 0 (the default), no existing
+        files are queued — only newly arriving files trigger processing.
 
         Returns the number of files queued.
         """
+        backfill = self.config.backfill_minutes
+        if backfill <= 0:
+            return 0
+
         watch_path = Path(self.config.watch_dir)
         if not watch_path.exists():
             return 0
 
+        import time as _time
+        cutoff = _time.time() - backfill * 60
         patterns = [p.strip() for p in self.config.watch_patterns.split(",")]
         count = 0
+        skipped = 0
         for pattern in patterns:
             for file_path in sorted(watch_path.rglob(pattern)):
                 if file_path.is_file() and str(file_path) not in self._seen:
                     self._seen.add(str(file_path))
+                    if file_path.stat().st_mtime < cutoff:
+                        skipped += 1
+                        continue
                     await self.queue.put(("file", str(file_path)))
                     count += 1
 
+        if skipped:
+            logger.info(
+                "Skipped %d files older than %d min from %s",
+                skipped, backfill, watch_path,
+            )
         if count:
-            logger.info("Queued %d existing files from %s", count, watch_path)
+            logger.info("Queued %d recent files from %s", count, watch_path)
         return count
 
     async def _on_new_file(self, file_path: str, patterns: list[str]) -> None:
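
The backfill filter in `scan_existing` compares each file's modification time with a cutoff of "now minus `backfill_minutes`". Below is a minimal synchronous sketch of that selection logic, assuming a throwaway temporary directory. `recent_files` and `demo` are hypothetical names, and the sketch leaves out the queue and the `_seen` bookkeeping of the real watcher.

```python
import os
import tempfile
import time
from pathlib import Path
from typing import Optional


def recent_files(
    watch_dir: Path,
    patterns: list[str],
    backfill_minutes: int,
    now: Optional[float] = None,
) -> list[Path]:
    """Return files under watch_dir modified within the backfill window."""
    if backfill_minutes <= 0:
        return []  # 0 (the default) means: skip all existing files
    now = time.time() if now is None else now
    cutoff = now - backfill_minutes * 60
    seen: set[Path] = set()
    selected: list[Path] = []
    for pattern in patterns:
        for path in sorted(watch_dir.rglob(pattern)):
            if path in seen or not path.is_file():
                continue
            seen.add(path)
            if path.stat().st_mtime >= cutoff:
                selected.append(path)
    return selected


def demo() -> tuple[list[str], list[str]]:
    """Create one fresh and one two-hour-old file, then filter them."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "fresh.csv").write_text("a")
        stale = root / "stale.csv"
        stale.write_text("b")
        old = time.time() - 2 * 60 * 60
        os.utime(stale, (old, old))  # backdate the modification time
        last_hour = [p.name for p in recent_files(root, ["*.csv"], 60)]
        disabled = [p.name for p in recent_files(root, ["*.csv"], 0)]
        return last_hour, disabled


print(demo())  # (['fresh.csv'], [])
```

Because the filter uses `st_mtime`, a file that was copied in with a preserved old timestamp falls outside the window even if it arrived recently.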
