Skip to content

Commit 10fbf4c

Browse files
committed
initial import
0 parents  commit 10fbf4c

42 files changed

Lines changed: 5395 additions & 0 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.env.example

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# IoT Edge runtime (set automatically when running as Edge module)
2+
# LOCAL_ENV=true
3+
# DEVICE_CONNECTION_STRING=HostName=...;DeviceId=...;SharedAccessKey=...
4+
5+
# Storage
6+
STORAGE_BACKEND=local
7+
OUTPUT_BASE_PATH=./output
8+
9+
# Azure Blob Edge (only when STORAGE_BACKEND=azure-blob-edge)
10+
# AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=http;...
11+
12+
# Stream input
13+
STREAM_HOST=0.0.0.0
14+
STREAM_PORT=9100
15+
16+
# File watcher
17+
WATCH_DIR=/data/sensor
18+
19+
# Metadata
20+
CAMPAIGN_ID=
21+
PLATFORM_ID=
22+
PLATFORM_NAME=
23+
24+
# Logging
25+
LOG_LEVEL=INFO

.gitignore

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
__pycache__/
2+
*.pyc
3+
*.pyo
4+
.env
5+
.venv/
6+
venv/
7+
output/
8+
*.egg-info/
9+
dist/
10+
build/
11+
.pytest_cache/
12+
.mypy_cache/
13+
test/.test_data_cache/
14+
test/output_azure/
15+
test/test_data/

Dockerfile.Linux.amd64

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
FROM python:3.12-slim
2+
3+
WORKDIR /app
4+
5+
# System dependencies
6+
RUN apt-get update && apt-get install -y --no-install-recommends \
7+
git build-essential && \
8+
rm -rf /var/lib/apt/lists/*
9+
10+
# Install oceanstream from git
11+
ARG OCEANSTREAM_BRANCH=main
12+
ARG OCEANSTREAM_REPO=https://github.com/OceanStreamIO/sd-data-ingest.git
13+
RUN git clone --depth 1 -b ${OCEANSTREAM_BRANCH} ${OCEANSTREAM_REPO} /tmp/oceanstream && \
14+
pip install --no-cache-dir /tmp/oceanstream && \
15+
rm -rf /tmp/oceanstream
16+
17+
# Python dependencies
18+
COPY requirements.txt .
19+
RUN pip install --no-cache-dir -r requirements.txt
20+
21+
# Application code
22+
COPY azure_handler/ azure_handler/
23+
COPY ingest/ ingest/
24+
COPY process/ process/
25+
COPY exports/ exports/
26+
COPY simulate/ simulate/
27+
COPY config.py main.py standalone.py ./
28+
29+
ENV PYTHONUNBUFFERED=1
30+
31+
CMD ["python", "-u", "main.py"]

Dockerfile.Linux.arm64

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
FROM python:3.12-slim-bookworm
2+
3+
WORKDIR /app
4+
5+
# System dependencies
6+
RUN apt-get update && apt-get install -y --no-install-recommends \
7+
git build-essential gcc g++ && \
8+
rm -rf /var/lib/apt/lists/*
9+
10+
# Install oceanstream from git
11+
ARG OCEANSTREAM_BRANCH=main
12+
ARG OCEANSTREAM_REPO=https://github.com/OceanStreamIO/sd-data-ingest.git
13+
RUN git clone --depth 1 -b ${OCEANSTREAM_BRANCH} ${OCEANSTREAM_REPO} /tmp/oceanstream && \
14+
pip install --no-cache-dir /tmp/oceanstream && \
15+
rm -rf /tmp/oceanstream
16+
17+
# Python dependencies
18+
COPY requirements.txt .
19+
RUN pip install --no-cache-dir -r requirements.txt
20+
21+
# Application code
22+
COPY azure_handler/ azure_handler/
23+
COPY ingest/ ingest/
24+
COPY process/ process/
25+
COPY exports/ exports/
26+
COPY simulate/ simulate/
27+
COPY config.py main.py standalone.py ./
28+
29+
ENV PYTHONUNBUFFERED=1
30+
31+
CMD ["python", "-u", "main.py"]

azure_handler/__init__.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
"""Azure IoT Hub handler — client, messaging, storage."""
2+
3+
from __future__ import annotations
4+
5+
6+
def create_client():
7+
"""Lazy import to avoid pulling azure-iot-device at module level."""
8+
from azure_handler.connect_iothub import create_client as _create
9+
return _create()
10+
11+
12+
def create_storage(backend: str = "local", base_path: str = "/app/processed"):
13+
"""Factory for storage backends."""
14+
from azure_handler.storage import LocalStorage, AzureBlobEdgeStorage
15+
16+
if backend == "azure-blob-edge":
17+
try:
18+
return AzureBlobEdgeStorage()
19+
except Exception:
20+
import logging
21+
logging.getLogger("sensorstream").warning(
22+
"Azure Blob Edge unavailable, falling back to local storage"
23+
)
24+
return LocalStorage(base_path)
25+
26+
return LocalStorage(base_path)

azure_handler/connect_iothub.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
"""IoT Hub module client creation for the sensorstream module."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
import os
7+
from typing import Optional
8+
9+
from dotenv import load_dotenv
10+
11+
load_dotenv()
12+
13+
logger = logging.getLogger("sensorstream")
14+
15+
DEVICE_CONNECTION_STRING: Optional[str] = os.getenv("DEVICE_CONNECTION_STRING")
16+
LOCAL_ENV: bool = os.getenv("LOCAL_ENV", "").lower() in ("true", "1", "yes")
17+
18+
19+
def create_client():
20+
"""Create the IoT Hub module client.
21+
22+
In IoT Edge runtime: creates from edge environment.
23+
In local/dev mode: creates from device connection string.
24+
"""
25+
from azure.iot.device import IoTHubModuleClient
26+
27+
client = None
28+
29+
try:
30+
if LOCAL_ENV:
31+
logger.info("Running in local environment using device connection string.")
32+
if not DEVICE_CONNECTION_STRING:
33+
raise ValueError("Missing DEVICE_CONNECTION_STRING environment variable.")
34+
client = IoTHubModuleClient.create_from_connection_string(DEVICE_CONNECTION_STRING)
35+
else:
36+
client = IoTHubModuleClient.create_from_edge_environment()
37+
38+
if client is None:
39+
raise ValueError("Failed to create IoTHubModuleClient.")
40+
41+
logger.info("IoT Hub module client initialized")
42+
return client
43+
44+
except Exception as e:
45+
logger.error("Could not connect: %s", e, exc_info=True)
46+
raise

azure_handler/message_handler.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""Send messages and update twin reported properties via IoT Hub."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
import logging
7+
import uuid
8+
from datetime import datetime
9+
from typing import TYPE_CHECKING, Any, Dict
10+
11+
import numpy as np
12+
import pandas as pd
13+
14+
if TYPE_CHECKING:
15+
from azure.iot.device import IoTHubModuleClient
16+
17+
logger = logging.getLogger("sensorstream")
18+
19+
20+
def _default_serializer(obj):
21+
"""Convert non-JSON-serializable objects."""
22+
if isinstance(obj, (pd.Timestamp, np.datetime64, datetime)):
23+
return obj.isoformat()
24+
if isinstance(obj, (np.integer,)):
25+
return int(obj)
26+
if isinstance(obj, (np.floating,)):
27+
return float(obj)
28+
if isinstance(obj, np.generic):
29+
return obj.item()
30+
return obj
31+
32+
33+
def serialize_for_json(obj):
34+
"""Recursively make an object JSON-serializable."""
35+
if isinstance(obj, dict):
36+
return {k: serialize_for_json(v) for k, v in obj.items()}
37+
if isinstance(obj, (list, tuple)):
38+
return [serialize_for_json(v) for v in obj]
39+
if isinstance(obj, (pd.Timestamp, np.datetime64, datetime)):
40+
return obj.isoformat()
41+
if isinstance(obj, (np.int64, np.int32)):
42+
return int(obj)
43+
if isinstance(obj, (np.float64, np.float32)):
44+
return float(obj)
45+
if isinstance(obj, np.generic):
46+
return obj.item()
47+
if isinstance(obj, set):
48+
return list(obj)
49+
return obj
50+
51+
52+
def send_to_hub(
53+
client: "IoTHubModuleClient",
54+
data: Dict[str, Any] | None = None,
55+
properties: Dict[str, Any] | None = None,
56+
output_name: str = "output1",
57+
) -> None:
58+
"""Send data to Azure IoT Hub via output message or twin patch."""
59+
from azure.iot.device import Message
60+
61+
try:
62+
if properties:
63+
properties = serialize_for_json(properties)
64+
client.patch_twin_reported_properties(properties)
65+
else:
66+
if data is None:
67+
data = {}
68+
69+
payload = json.dumps(data, default=_default_serializer)
70+
message = Message(payload)
71+
message.message_id = str(uuid.uuid4())
72+
client.send_message_to_output(message, output_name)
73+
74+
logger.info("Sent data to IoT Hub on output '%s'", output_name)
75+
except Exception as e:
76+
logger.error("Failed to send data to IoT Hub on output '%s': %s", output_name, e, exc_info=True)

0 commit comments

Comments
 (0)