| 1 | +# Sensorstream IoT Edge Module — Project Guidelines |
| 2 | + |
| 3 | +Azure IoT Edge module for ingesting and processing oceanographic sensor data (CTD, GNSS) from USV and ship-based platforms. Outputs GeoParquet files with per-record metadata. Runs on edge devices (Jetson Orin, x86 Ubuntu) and locally in standalone mode. |
| 4 | + |
| 5 | +## Architecture |
| 6 | + |
| 7 | +| Module | Purpose | |
| 8 | +|--------|---------| |
| 9 | +| `azure_handler/` | IoT Hub client, message sending, twin sync, storage abstraction (local + Azure Blob) | |
| 10 | +| `ingest/` | Data ingestion: TCP/UDP stream listener, file watcher, IoT Edge message triggers, hex/NMEA/CSV parsing | |
| 11 | +| `process/` | Processing pipeline: file → parse → DataFrame → GeoParquet + metadata JSON | |
| 12 | +| `exports/` | Telemetry (IoT Hub D2C messages), metadata JSON generation, telemetry throttle/downsampling | |
| 13 | +| `simulate/` | Built-in simulators: file dropper, NMEA stream replayer, CTD hex stream (SBE 11plus emulation) | |
| 14 | +| `test/` | Pytest suite with blob-backed test data fixtures | |
| 15 | + |
| 16 | +### Data Flow |
| 17 | + |
| 18 | +``` |
| 19 | +Sensor data (TCP/UDP stream, file drop, IoT Edge message) |
| 20 | + → ingest/ (parse NMEA, CSV, .hex, .cnv) |
| 21 | + → process/pipeline.py (DataFrame, enrich, validate) |
| 22 | + → exports/ (GeoParquet + metadata JSON → storage, telemetry → IoT Hub) |
| 23 | +``` |
| 24 | + |
| 25 | +### Entry Points |
| 26 | + |
| 27 | +| Entry Point | Purpose | |
| 28 | +|-------------|---------| |
| 29 | +| `main.py` | IoT Edge module — runs under `aziot-edge` runtime with twin config | |
| 30 | +| `standalone.py` | CLI tool — local processing, file watching, stream listening, simulators | |
| 31 | +| `simulate/__main__.py` | `python -m simulate` — run simulators directly | |
| 32 | + |
| 33 | +## Configuration |
| 34 | + |
| 35 | +All config flows through the `EdgeConfig` dataclass in `config.py`: |
| 36 | + |
| 37 | +- **IoT Edge mode**: `EdgeConfig.from_twin_and_env(desired_properties)` — twin props + env vars |
| 38 | +- **Standalone mode**: `EdgeConfig.from_standalone(**kwargs)` — env vars + CLI args |
| 39 | +- **Live updates**: `config.update_from_twin(patch)` — applies twin patches in place, increments `_config_version` |
| 40 | + |
| 41 | +Key config fields: |
| 42 | + |
| 43 | +| Field | Type | Default | Description | |
| 44 | +|-------|------|---------|-------------| |
| 45 | +| `input_mode` | `stream \| file \| both` | `both` | Which ingest sources to activate | |
| 46 | +| `stream_format` | `nmea \| csv \| hex \| auto` | `auto` | Expected stream data format | |
| 47 | +| `stream_protocol` | `tcp \| udp \| auto` | `auto` | Network protocol | |
| 48 | +| `stream_port` | `int` | `9100` | Listen/connect port | |
| 49 | +| `stream_connect_mode` | `server \| client` | `server` | TCP server (listen) or client (connect) | |
| 50 | +| `watch_dir` | `str` | `/data/sensor` | Directory to watch for new files | |
| 51 | +| `watch_patterns` | `str` | `*.csv,*.txt,*.hex,*.cnv,*.tar.gz` | Glob patterns for file watcher | |
| 52 | +| `batch_interval_seconds` | `int` | `60` | Stream batch flush interval | |
| 53 | +| `batch_max_records` | `int` | `1000` | Stream batch flush size | |
| 54 | +| `telemetry_downsample_seconds` | `int` | `30` | Min interval between telemetry messages | |
| 55 | +| `storage_backend` | `azure-blob-edge \| local` | `azure-blob-edge` | Output storage | |
| 56 | +| `output_base_path` | `str` | `/app/processed` | Local output root | |
| 57 | + |
| 58 | +Twin property names map 1:1 to config field names, except `Log_Level` → `log_level` (via `_TWIN_KEY_MAP`). |
| 59 | + |
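The live-update behaviour described above can be sketched as follows. This is a minimal illustration, not the actual `EdgeConfig`: the class name `MiniEdgeConfig`, the reduced field set, the `log_level` default, and the choice to silently skip unknown keys are all assumptions made for the example. Only `update_from_twin`, `_config_version`, and the `Log_Level` entry in `_TWIN_KEY_MAP` come from the text.

```python
from __future__ import annotations

from dataclasses import dataclass, fields
from typing import Any

# Only the Log_Level -> log_level mapping is stated in the guidelines.
_TWIN_KEY_MAP = {"Log_Level": "log_level"}


@dataclass
class MiniEdgeConfig:
    """Hypothetical stand-in for EdgeConfig with a handful of fields."""

    input_mode: str = "both"
    stream_port: int = 9100
    batch_interval_seconds: int = 60
    log_level: str = "INFO"  # default assumed for this sketch
    _config_version: int = 0

    def update_from_twin(self, patch: dict[str, Any]) -> list[str]:
        """Apply a desired-properties patch in place; return the changed field names."""
        known = {f.name for f in fields(self) if not f.name.startswith("_")}
        changed: list[str] = []
        for key, value in patch.items():
            name = _TWIN_KEY_MAP.get(key, key)
            if name not in known:
                continue  # assumption: ignore $version and unknown keys
            if getattr(self, name) != value:
                setattr(self, name, value)
                changed.append(name)
        if changed:
            self._config_version += 1  # bump once per patch that changed something
        return changed


config = MiniEdgeConfig()
changed = config.update_from_twin({"Log_Level": "DEBUG", "stream_port": 9200, "$version": 7})
```

Returning the list of changed fields is a design choice of the sketch; it makes it easy for a caller to decide which components (listener, throttle, logger) need to be reconfigured.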
| 60 | +## Supported Data Formats |
| 61 | + |
| 62 | +| Format | Extension | Parser | Notes | |
| 63 | +|--------|-----------|--------|-------| |
| 64 | +| NMEA 0183 | `.txt` | `pynmea2` | GGA, RMC, VTG, ZDA sentences; timestamped or raw | |
| 65 | +| CSV | `.csv` | pandas | Auto-detect columns; EMSO, R2R, generic | |
| 66 | +| Sea-Bird CNV | `.cnv` | pandas | Processed CTD with header metadata | |
| 67 | +| Sea-Bird HEX | `.hex` + `.hdr` + `.XMLCON` | `seabirdscientific` + `gsw` | Raw CTD — frequency → T/C/P/S/depth calibration | |
| 68 | +| tar.gz | `.tar.gz`, `.tgz` | `tarfile` | Extracts and processes contained files | |
| 69 | + |
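As a rough illustration of how the table above could drive dispatch, the sketch below maps a file name to a format label. The function name `detect_file_format` and the label strings are hypothetical; the real module may detect formats differently (for example by inspecting content).

```python
from __future__ import annotations

from pathlib import Path

# Hypothetical labels; the extensions are the ones listed in the table.
_SUFFIX_TO_FORMAT = {
    ".txt": "nmea",
    ".csv": "csv",
    ".cnv": "cnv",
    ".hex": "hex",
    ".tgz": "archive",
}


def detect_file_format(path: Path) -> str | None:
    """Return a format label based on the file extension, or None if unsupported."""
    name = path.name.lower()
    if name.endswith(".tar.gz"):
        # Path.suffix would only report ".gz", so handle the double extension first.
        return "archive"
    return _SUFFIX_TO_FORMAT.get(path.suffix.lower())


labels = [detect_file_format(Path(n)) for n in ("cast.CNV", "bundle.tar.gz", "notes.json")]
```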
| 70 | +### Hex File Parsing |
| 71 | + |
| 72 | +The hex parser (`ingest/hex_parser.py`) requires companion files in the same directory: |
| 73 | +- `.hex` — raw scan data (hex-encoded frequency counts) |
| 74 | +- `.hdr` — cast metadata (lat/lon, station, time, bytes_per_scan) |
| 75 | +- `.XMLCON` — instrument configuration and calibration coefficients |
| 76 | + |
| 77 | +`find_hex_group(hex_path)` locates companions via case-insensitive stem matching. `parse_hex_file()` uses `seabirdscientific` for frequency→engineering-unit conversion and `gsw` for derived quantities (depth, salinity). |
| 78 | + |
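Case-insensitive stem matching of the kind described above might look like the following sketch. The helper name `find_companions` and its return shape are assumptions for illustration; the signature and behaviour of the real `find_hex_group()` are not shown in this document.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def find_companions(hex_path: Path) -> dict[str, Path]:
    """Return files sharing hex_path's stem, keyed by lower-cased suffix."""
    wanted = {".hex", ".hdr", ".xmlcon"}
    stem = hex_path.stem.lower()
    group: dict[str, Path] = {}
    for candidate in sorted(hex_path.parent.iterdir()):
        if not candidate.is_file():
            continue
        suffix = candidate.suffix.lower()
        if candidate.stem.lower() == stem and suffix in wanted:
            group.setdefault(suffix, candidate)  # keep the first match per suffix
    return group


# Usage: mixed-case companions are still grouped with the .hex file.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("11901.hex", "11901.HDR", "11901.XMLCON", "other.hdr"):
        (root / name).touch()
    group = find_companions(root / "11901.hex")
    found = {suffix: path.name for suffix, path in group.items()}
```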
| 79 | +## Code Style |
| 80 | + |
| 81 | +- **Python**: 3.11+ with `from __future__ import annotations` in all files |
| 82 | +- **Async**: `asyncio` throughout — all pipeline functions are `async def` |
| 83 | +- **Logging**: `logging.getLogger("sensorstream")` — never bare `print()` |
| 84 | +- **Type hints**: Use throughout; `TYPE_CHECKING` blocks for heavy imports (pandas, geopandas) |
| 85 | +- **Optional deps**: Guard with try/except and module-level flags (e.g. `HAS_SBS` for seabirdscientific) |
| 86 | +- **Docstrings**: Module-level docstrings on every `.py` file; NumPy-style for public functions |
| 87 | +- **Line length**: ~100 characters (no formal linter configured yet) |
| 88 | + |
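The optional-dependency rule above can be sketched like this. The import name is assumed to match the package name `seabirdscientific`, and `require_sbs()` is a hypothetical helper added only to show how the flag might be consumed.

```python
from __future__ import annotations

import logging

logger = logging.getLogger("sensorstream")

try:
    import seabirdscientific  # noqa: F401  (import name assumed)

    HAS_SBS = True
except ImportError:
    HAS_SBS = False
    logger.debug("seabirdscientific not installed; .hex parsing disabled")


def require_sbs() -> None:
    """Hypothetical guard: fail early with a clear message when hex support is missing."""
    if not HAS_SBS:
        raise RuntimeError("seabirdscientific is not installed; cannot parse .hex files")
```

Callers can then check `HAS_SBS` before attempting hex parsing, or call the guard at the top of a hex-specific function.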
| 89 | +## Build and Test |
| 90 | + |
| 91 | +```bash |
| 92 | +# Create venv and install |
| 93 | +python3.12 -m venv .venv |
| 94 | +.venv/bin/pip install -r requirements.txt |
| 95 | + |
| 96 | +# Run tests (always use .venv/bin/python directly — pyenv can interfere) |
| 97 | +.venv/bin/python -m pytest test/ -v --tb=short |
| 98 | + |
| 99 | +# Run standalone pipeline |
| 100 | +.venv/bin/python standalone.py --input-dir ./test_data --output-dir ./output --campaign-id test |
| 101 | + |
| 102 | +# Run CTD stream simulator |
| 103 | +.venv/bin/python standalone.py --simulate-ctd ./test_data/ctd/hex/11901.hex --output-dir ./output |
| 104 | +``` |
| 105 | + |
| 106 | +### Test Data |
| 107 | + |
| 108 | +Test data lives in Azure Blob Storage, not in the repo: |
| 109 | + |
| 110 | +- **Account**: `ne1osvmdevtest` |
| 111 | +- **Container**: `sensorstream-test` |
| 112 | +- **Prefix**: `raw/` (mirrors the `test_data/` directory structure) |
| 113 | + |
| 114 | +The `test/conftest.py` session fixtures automatically: |
| 115 | +1. Download from blob to `.test_data_cache/` on first run |
| 116 | +2. Use size-based freshness checks on subsequent runs |
| 117 | +3. Fall back to local `test_data/` if no Azure connection |
| 118 | + |
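A size-based freshness check, as in step 2 above, could be as simple as the sketch below. The function name `needs_download` is hypothetical and the remote size is passed in as a plain integer, so no Azure SDK call is assumed here.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def needs_download(local_path: Path, remote_size: int) -> bool:
    """Return True when the cached file is missing or its size differs from the blob's."""
    if not local_path.is_file():
        return True
    return local_path.stat().st_size != remote_size


with tempfile.TemporaryDirectory() as tmp:
    cached = Path(tmp) / "11901.hex"
    cached.write_bytes(b"12345")  # 5 bytes on disk
    results = (
        needs_download(Path(tmp) / "missing.hex", 5),  # not cached yet
        needs_download(cached, 5),                     # same size: treated as fresh
        needs_download(cached, 9),                     # size changed: re-download
    )
```

One limitation of comparing sizes only: a blob whose content changes without changing length would not be refreshed.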
| 119 | +**Connection string**: Set `AZURE_CONNECTION_STRING` in `.env` or environment. The conftest also checks `../sd-data-ingest/.env`. |
| 120 | + |
| 121 | +Key fixtures (all session-scoped): |
| 122 | +- `test_data_dir` — root of synced test data |
| 123 | +- `hex_dir`, `hex_file` — Sea-Bird cast 11901 hex data |
| 124 | +- `ctd_csv_file`, `ctd_cnv_file` — EMSO CSV and Sea-Bird CNV |
| 125 | +- `gnss_nmea_file` — R2R GNSS NMEA track |
| 126 | + |
| 127 | +### Test Files |
| 128 | + |
| 129 | +| File | Count | Scope | |
| 130 | +|------|-------|-------| |
| 131 | +| `test_config.py` | 10 | EdgeConfig creation, twin updates, literal validation | |
| 132 | +| `test_stream_parser.py` | 17 | NMEA/CSV parsing, format detection | |
| 133 | +| `test_pipeline.py` | 4 | Pipeline with inline data (no external files) | |
| 134 | +| `test_file_watcher.py` | 4 | FileWatcher events | |
| 135 | +| `test_simulators.py` | 3 | File/stream simulators | |
| 136 | +| `test_hex_parser.py` | 25 | Hex parsing, calibration, pipeline integration | |
| 137 | +| `test_ctd_stream.py` | 9 | CTD stream simulator, hex format detection | |
| 138 | +| `test_throttle.py` | 16 | Telemetry downsampling, config fields | |
| 139 | +| `test_azure_e2e.py` | — | Standalone E2E script (`python test/test_azure_e2e.py`) | |
| 140 | + |
| 141 | +## Docker |
| 142 | + |
| 143 | +Two Dockerfiles for IoT Edge deployment: |
| 144 | + |
| 145 | +- `Dockerfile.Linux.amd64` — x86_64 (Ubuntu-based dev machines, cloud VMs) |
| 146 | +- `Dockerfile.Linux.arm64` — ARM64 (Jetson Orin edge devices) |
| 147 | + |
| 148 | +Both install dependencies from `requirements.txt` and copy the full project into the image. Entry point: `python -u main.py`. |
| 149 | + |
| 150 | +Build via Streambase CLI: |
| 151 | +```bash |
| 152 | +streambase module build -e <env> -m iotedge-sensorstream -o Linux -a amd64 -t latest |
| 153 | +``` |
| 154 | + |
| 155 | +## IoT Edge Integration |
| 156 | + |
| 157 | +### Input Messages |
| 158 | + |
| 159 | +Receives `sensorfileadded` messages from the `filenotifier` module: |
| 160 | +```json |
| 161 | +{"event": "fileadd", "path": "/data/raw/ctd/cast.cnv", "size": 17500} |
| 162 | +``` |
| 163 | + |
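A defensive parse of the payload shown above might look like this sketch. `FileAddedEvent` and `parse_file_event` are hypothetical names, and the choice to return `None` for malformed or unrelated messages is an assumption rather than the module's documented behaviour.

```python
from __future__ import annotations

import json
from dataclasses import dataclass


@dataclass(frozen=True)
class FileAddedEvent:
    path: str
    size: int


def parse_file_event(payload: bytes | str) -> FileAddedEvent | None:
    """Return a FileAddedEvent for a well-formed 'fileadd' payload, else None."""
    try:
        body = json.loads(payload)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None
    if not isinstance(body, dict) or body.get("event") != "fileadd":
        return None
    path = body.get("path")
    if not isinstance(path, str) or not path:
        return None
    size = body.get("size", 0)
    return FileAddedEvent(path=path, size=size if isinstance(size, int) else 0)


event = parse_file_event(
    b'{"event": "fileadd", "path": "/data/raw/ctd/cast.cnv", "size": 17500}'
)
```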
| 164 | +### Output Messages |
| 165 | + |
| 166 | +Sends D2C telemetry to IoT Hub (downsampled per `telemetry_downsample_seconds`): |
| 167 | +- Individual sensor records (if `telemetry_send_records` is true) |
| 168 | +- Periodic summaries (if `telemetry_send_summaries` is true) |
| 169 | + |
| 170 | +### Twin Desired Properties |
| 171 | + |
| 172 | +All `EdgeConfig` fields can be set via twin desired properties. The module echoes the accepted values back through twin reported properties. |
| 173 | + |
| 174 | +## Environment Variables |
| 175 | + |
| 176 | +| Variable | Default | Description | |
| 177 | +|----------|---------|-------------| |
| 178 | +| `AZURE_CONNECTION_STRING` | — | Azure Storage connection string (for tests and E2E) | |
| 179 | +| `STORAGE_BACKEND` | `azure-blob-edge` | Output storage backend; set to `local` for standalone runs | |
| 180 | +| `OUTPUT_BASE_PATH` | `/app/processed` | Output directory | |
| 181 | +| `PROCESSED_CONTAINER_NAME` | `sensordata` | Blob container for processed output | |
| 182 | +| `WATCH_DIR` | `/data/sensor` | File watch directory | |
| 183 | +| `STREAM_HOST` | `0.0.0.0` | Stream listener bind address | |
| 184 | +| `STREAM_PORT` | `9100` | Stream listener port | |
| 185 | +| `CAMPAIGN_ID` | — | Campaign identifier for output partitioning | |
| 186 | +| `PLATFORM_ID` | — | Platform/vessel identifier | |
| 187 | +| `LOG_LEVEL` | `INFO` | Logging level | |
| 188 | + |
| 189 | +## Conventions |
| 190 | + |
| 191 | +- **Storage abstraction**: All file I/O goes through `StorageBackend` (ABC in `azure_handler/storage.py`). Implementations: `LocalStorage`, `AzureBlobEdgeStorage`. Never write files directly. |
| 192 | +- **Processing results**: `process_file()` and `process_stream_batch()` return a `dict` with the keys `status` (`ok`/`error`/`skipped`), `record_count`, `file_type`, and `processing_time_ms`. |
| 193 | +- **GeoParquet output**: One `.parquet` + one `.metadata.json` per input file, written to `{output_base_path}/{campaign_id}/`. |
| 194 | +- **Telemetry throttle**: `TelemetryThrottle` in `exports/throttle.py` rate-limits D2C messages. Window is twin-controllable via `telemetry_downsample_seconds`. |
| 195 | +- **No setup.py/pyproject.toml**: This is a deployable module, not a pip-installable library. Dependencies are in `requirements.txt`. |
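
To make the telemetry-throttle convention above concrete, here is a minimal rate limiter under stated assumptions. `SimpleThrottle` is a hypothetical class, not the real `TelemetryThrottle`; it only shows the "at most one message per window" idea, with an injectable clock so it can be tested without sleeping.

```python
from __future__ import annotations

import time
from typing import Callable


class SimpleThrottle:
    """Allow at most one message per `min_interval_seconds` window."""

    def __init__(
        self,
        min_interval_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.min_interval_seconds = min_interval_seconds
        self._clock = clock
        self._last_sent: float | None = None

    def should_send(self) -> bool:
        """Return True and start a new window if enough time has passed."""
        now = self._clock()
        if self._last_sent is None or now - self._last_sent >= self.min_interval_seconds:
            self._last_sent = now
            return True
        return False


# Usage with a fake clock: readings arrive at t = 0, 10, 30, 45 and 61 seconds.
times = iter([0.0, 10.0, 30.0, 45.0, 61.0])
throttle = SimpleThrottle(30, clock=lambda: next(times))
decisions = [throttle.should_send() for _ in range(5)]
```

Because the window is a plain attribute, a twin patch for `telemetry_downsample_seconds` could be applied in this sketch by assigning to `min_interval_seconds` at runtime.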