Skip to content

Commit 0bec552

Browse files
committed
refactor: delegate all parsing to oceanstream library
- ingest/adapter.py: new adapter layer delegating NMEA, CSV, CTD, ADCP parsing to oceanstream - ingest/stream_parser.py: thin re-export from adapter (backwards compat) - ingest/adcp_parser.py: thin re-export from oceanstream.adcp - ingest/stream_listener.py: import from adapter - process/pipeline.py: use adapter for all parsing, secure tar extraction with tmpdir cleanup - requirements.txt: remove individual parser deps (now via oceanstream extras) - Dockerfile.Linux.amd64/arm64: install oceanstream[geotrack,adcp] with extras - .github/copilot-instructions.md: document oceanstream dependency and adapter pattern
1 parent a3fc5c0 commit 0bec552

10 files changed

Lines changed: 627 additions & 776 deletions

.github/copilot-instructions.md

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
# Sensorstream IoT Edge Module — Project Guidelines
22

3-
Azure IoT Edge module for ingesting and processing oceanographic sensor data (CTD, GNSS) from USV and ship-based platforms. Outputs GeoParquet files with per-record metadata. Runs on edge devices (Jetson Orin, x86 Ubuntu) and locally in standalone mode.
3+
Azure IoT Edge module for ingesting and processing oceanographic sensor data (CTD, GNSS, ADCP) from USV and ship-based platforms. Outputs GeoParquet files with per-record metadata. Runs on edge devices (Jetson Orin, x86 Ubuntu) and locally in standalone mode.
44

55
## Architecture
66

77
| Module | Purpose |
88
|--------|---------|
99
| `azure_handler/` | IoT Hub client, message sending, twin sync, storage abstraction (local + Azure Blob) |
10-
| `ingest/` | Data ingestion: TCP/UDP stream listener, file watcher, IoT Edge message triggers, hex/NMEA/CSV parsing |
11-
| `process/` | Processing pipeline: file → parse → DataFrame → GeoParquet + metadata JSON |
10+
| `ingest/adapter.py` | **Adapter layer** — delegates all parsing to the `oceanstream` library (NMEA, CSV, CTD hex, ADCP) |
11+
| `ingest/stream_listener.py` | TCP/UDP stream listener, hex stream buffering, batch flushing |
12+
| `ingest/file_watcher.py` | Watchdog directory monitor for new sensor files |
13+
| `ingest/file_trigger.py` | IoT Edge message handler for file-added events |
14+
| `ingest/hex_parser.py` | Sea-Bird hex file utilities (companion file discovery, header parsing) — used by CTD stream simulator |
15+
| `ingest/stream_parser.py` | Thin re-export from `ingest.adapter` (backwards compatibility) |
16+
| `ingest/adcp_parser.py` | Thin re-export from `oceanstream.adcp` (backwards compatibility) |
17+
| `process/` | Processing pipeline: file → adapter parse → DataFrame → GeoParquet + metadata JSON |
1218
| `exports/` | Telemetry (IoT Hub D2C messages), metadata JSON generation, telemetry throttle/downsampling |
1319
| `simulate/` | Built-in simulators: file dropper, NMEA stream replayer, CTD hex stream (SBE 11plus emulation) |
1420
| `test/` | Pytest suite with blob-backed test data fixtures |
@@ -17,11 +23,27 @@ Azure IoT Edge module for ingesting and processing oceanographic sensor data (CT
1723

1824
```
1925
Sensor data (TCP/UDP stream, file drop, IoT Edge message)
20-
→ ingest/ (parse NMEA, CSV, .hex, .cnv)
21-
→ process/pipeline.py (DataFrame, enrich, validate)
26+
→ ingest/adapter.py (delegates to oceanstream: NMEA, CSV, CTD hex, ADCP)
27+
→ process/pipeline.py (DataFrame, provider enrichment, deduplication)
2228
→ exports/ (GeoParquet + metadata JSON → storage, telemetry → IoT Hub)
2329
```
2430

31+
### Oceanstream Dependency
32+
33+
All data parsing is handled by the **oceanstream** library (`oceanstream` package from `sd-data-ingest` repo). Sensorstream installs it with `[geotrack,adcp]` extras:
34+
35+
- **Local dev**: `pip install -e "../sd-data-ingest[geotrack,adcp]"`
36+
- **Production**: `pip install "oceanstream[geotrack,adcp] @ git+https://github.com/OceanStreamIO/oceanstream-cli.git"`
37+
38+
The adapter (`ingest/adapter.py`) provides:
39+
- `parse_nmea_line()` / `parse_nmea_file()` — NMEA via `oceanstream.sensors.processors.nmea_gnss`
40+
- `parse_csv_file()` / `parse_csv_line()` — CSV parsing
41+
- `parse_hex_file()` — CTD hex via `oceanstream.sensors.processors.sbe911`
42+
- `parse_cnv_file()` — CTD cnv files
43+
- `parse_adcp_file()` — ADCP via `oceanstream.adcp.processor.process_file`
44+
- `enrich_with_provider()` — Provider enrichment via `oceanstream.providers`
45+
- `detect_format()` — Line format auto-detection (NMEA/CSV/hex)
46+
2547
### Entry Points
2648

2749
| Entry Point | Purpose |

Dockerfile.Linux.amd64

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
1111
ARG OCEANSTREAM_BRANCH=main
1212
ARG OCEANSTREAM_REPO=https://github.com/OceanStreamIO/sd-data-ingest.git
1313
RUN git clone --depth 1 -b ${OCEANSTREAM_BRANCH} ${OCEANSTREAM_REPO} /tmp/oceanstream && \
14-
pip install --no-cache-dir /tmp/oceanstream && \
14+
pip install --no-cache-dir "/tmp/oceanstream[geotrack,adcp]" && \
1515
rm -rf /tmp/oceanstream
1616

1717
# Python dependencies

Dockerfile.Linux.arm64

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
1111
ARG OCEANSTREAM_BRANCH=main
1212
ARG OCEANSTREAM_REPO=https://github.com/OceanStreamIO/sd-data-ingest.git
1313
RUN git clone --depth 1 -b ${OCEANSTREAM_BRANCH} ${OCEANSTREAM_REPO} /tmp/oceanstream && \
14-
pip install --no-cache-dir /tmp/oceanstream && \
14+
pip install --no-cache-dir "/tmp/oceanstream[geotrack,adcp]" && \
1515
rm -rf /tmp/oceanstream
1616

1717
# Python dependencies

0 commit comments

Comments
 (0)