|
| 1 | +# iotedge-sensorstream |
| 2 | + |
| 3 | +Azure IoT Edge module for ingesting and processing oceanographic sensor data (CTD, GNSS) from USV and ship-based platforms. Outputs GeoParquet files with per-record metadata JSON. Runs on edge devices (Jetson Orin, x86 Ubuntu) and locally in standalone mode. |
| 4 | + |
| 5 | +## Data Flow |
| 6 | + |
| 7 | +``` |
| 8 | +Sensor data (TCP/UDP stream, file drop, IoT Edge message) |
| 9 | + → ingest/ (parse NMEA, CSV, .hex, .cnv) |
| 10 | + → process/pipeline.py (DataFrame → enrich → validate) |
| 11 | + → exports/ (GeoParquet + metadata JSON → storage, telemetry → IoT Hub) |
| 12 | +``` |
| 13 | + |
| 14 | +## Supported Formats |
| 15 | + |
| 16 | +| Format | Extension | Parser | Notes | |
| 17 | +|--------|-----------|--------|-------| |
| 18 | +| NMEA 0183 | `.txt` | pynmea2 | GGA, RMC, VTG, ZDA; timestamped or raw | |
| 19 | +| CSV | `.csv` | pandas | Auto-detect columns; EMSO, R2R, generic | |
| 20 | +| Sea-Bird CNV | `.cnv` | pandas | Processed CTD with header metadata | |
| 21 | +| Sea-Bird HEX | `.hex` + `.hdr` + `.XMLCON` | seabirdscientific + gsw | Raw CTD frequency → T/C/P/S/depth | |
| 22 | +| tar.gz | `.tar.gz`, `.tgz` | tarfile | Extracts and processes contained files | |
| 23 | + |
| 24 | +## Quick Start |
| 25 | + |
| 26 | +### Standalone (local processing) |
| 27 | + |
| 28 | +```bash |
| 29 | +python3.12 -m venv .venv |
| 30 | +.venv/bin/pip install -r requirements.txt |
| 31 | + |
| 32 | +# Process files from a directory |
| 33 | +.venv/bin/python standalone.py \ |
| 34 | + --input-dir ./test_data \ |
| 35 | + --output-dir ./output \ |
| 36 | + --campaign-id my_cruise |
| 37 | + |
| 38 | +# Watch a directory for new files |
| 39 | +.venv/bin/python standalone.py \ |
| 40 | + --watch /tmp/sensorstream-watch \ |
| 41 | + --output-dir ./output |
| 42 | + |
| 43 | +# Simulate a CTD hex stream over TCP |
| 44 | +.venv/bin/python standalone.py \ |
| 45 | + --simulate-ctd ./test_data/ctd/hex/11901.hex \ |
| 46 | + --output-dir ./output |
| 47 | +``` |
| 48 | + |
| 49 | +### IoT Edge |
| 50 | + |
| 51 | +Deploy via the [Streambase CLI](https://github.com/OceanStreamIO/streambase): |
| 52 | + |
| 53 | +```bash |
| 54 | +streambase module build -e <env> -m iotedge-sensorstream -o Linux -a amd64 -t latest |
| 55 | +streambase device apply -e <env> -d <device-id> |
| 56 | +``` |
| 57 | + |
| 58 | +Configuration is driven by IoT Hub module twin desired properties. All `EdgeConfig` fields can be set via the twin and are reported back as reported properties. |
| 59 | + |
| 60 | +## Architecture |
| 61 | + |
| 62 | +| Module | Purpose | |
| 63 | +|--------|---------| |
| 64 | +| `azure_handler/` | IoT Hub client, message sending, twin sync, storage abstraction | |
| 65 | +| `ingest/` | TCP/UDP stream listener, file watcher, IoT Edge triggers, hex/NMEA/CSV parsing | |
| 66 | +| `process/` | Processing pipeline: file → DataFrame → GeoParquet + metadata JSON | |
| 67 | +| `exports/` | D2C telemetry, metadata JSON generation, telemetry throttle/downsampling | |
| 68 | +| `simulate/` | Built-in simulators: file dropper, NMEA stream replayer, CTD hex stream | |
| 69 | + |
| 70 | +### Entry Points |
| 71 | + |
| 72 | +| File | Purpose | |
| 73 | +|------|---------| |
| 74 | +| `main.py` | IoT Edge module — runs under aziot-edge runtime | |
| 75 | +| `standalone.py` | CLI — local processing, file watching, stream listening, simulators | |
| 76 | +| `simulate/__main__.py` | `python -m simulate` — run simulators directly | |
| 77 | + |
| 78 | +## Configuration |
| 79 | + |
| 80 | +All config flows through the `EdgeConfig` dataclass in `config.py`: |
| 81 | + |
| 82 | +| Field | Default | Description | |
| 83 | +|-------|---------|-------------| |
| 84 | +| `input_mode` | `both` | `stream`, `file`, or `both` | |
| 85 | +| `stream_format` | `auto` | `nmea`, `csv`, `hex`, or `auto` | |
| 86 | +| `stream_port` | `9100` | TCP/UDP listen port | |
| 87 | +| `watch_dir` | `/data/sensor` | Directory to watch for new files | |
| 88 | +| `batch_interval_seconds` | `60` | Stream batch flush interval | |
| 89 | +| `telemetry_downsample_seconds` | `30` | Min interval between D2C messages | |
| 90 | +| `storage_backend` | `azure-blob-edge` | `azure-blob-edge` or `local` | |
| 91 | +| `output_base_path` | `/app/processed` | Output root directory | |
| 92 | + |
| 93 | +See `config.py` for the full list. Standalone mode uses env vars and CLI args; IoT Edge mode uses twin desired properties. |
| 94 | + |
| 95 | +## Testing |
| 96 | + |
| 97 | +```bash |
| 98 | +# Run all tests |
| 99 | +.venv/bin/python -m pytest test/ -v --tb=short |
| 100 | + |
| 101 | +# Run Azure E2E test (standalone script) |
| 102 | +.venv/bin/python test/test_azure_e2e.py |
| 103 | +``` |
| 104 | + |
| 105 | +Test data is stored in Azure Blob Storage (`sensorstream-test` container) and downloaded automatically on first run. Set `AZURE_CONNECTION_STRING` in `.env` or environment. Tests fall back to local `test_data/` if no connection is available. |
| 106 | + |
| 107 | +**90 tests** across 8 files: config, stream parsing, pipeline, file watcher, simulators, hex parsing, CTD stream, and telemetry throttling. |
| 108 | + |
| 109 | +## IoT Edge Integration |
| 110 | + |
| 111 | +**Input**: Receives `sensorfileadded` messages from the `filenotifier` module: |
| 112 | +```json |
| 113 | +{"event": "fileadd", "path": "/data/raw/ctd/cast.cnv", "size": 17500} |
| 114 | +``` |
| 115 | + |
| 116 | +**Output**: D2C telemetry to IoT Hub (rate-limited by `telemetry_downsample_seconds`), GeoParquet + metadata JSON to blob storage. |
| 117 | + |
| 118 | +**Twin**: All config fields are readable/writable via module twin. Changes are applied live without restart. |
| 119 | + |
| 120 | +## Environment Variables |
| 121 | + |
| 122 | +| Variable | Default | Description | |
| 123 | +|----------|---------|-------------| |
| 124 | +| `AZURE_CONNECTION_STRING` | — | Storage connection string (tests, E2E) | |
| 125 | +| `STORAGE_BACKEND` | `azure-blob-edge` | `local` for standalone | |
| 126 | +| `OUTPUT_BASE_PATH` | `/app/processed` | Output directory | |
| 127 | +| `WATCH_DIR` | `/data/sensor` | File watch directory | |
| 128 | +| `STREAM_PORT` | `9100` | Stream listener port | |
| 129 | +| `CAMPAIGN_ID` | — | Campaign identifier | |
| 130 | +| `LOG_LEVEL` | `INFO` | Logging level | |
| 131 | + |
| 132 | +## Docker |
| 133 | + |
| 134 | +Two Dockerfiles for IoT Edge deployment: |
| 135 | + |
| 136 | +- `Dockerfile.Linux.amd64` — x86_64 |
| 137 | +- `Dockerfile.Linux.arm64` — ARM64 (Jetson Orin) |
| 138 | + |
| 139 | +## License |
| 140 | + |
| 141 | +See [LICENSE](LICENSE). |
0 commit comments