Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 118 additions & 23 deletions clientlib/python/src/udmi/core/blob/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import abc
import base64
import logging
import os
import shutil
import tempfile
import time
from urllib.parse import urlparse

import requests
Expand Down Expand Up @@ -82,39 +84,132 @@ def fetch(self, url: str) -> bytes:
class HttpFetcher(AbstractBlobFetcher):
"""
Fetcher implementation for handling standard HTTP/HTTPS URLs.
Supports HTTP Range requests, exponential backoff for retryable errors (e.g. 503),
and immediate aborts for fatal errors (e.g. 403, 404).
"""

def __init__(self, timeout_sec: int = 30):
def __init__(self, timeout_sec: int = 30, max_retries: int = 5, backoff_sec: float = 1.0):
self.timeout = timeout_sec
self.max_retries = max_retries
self.backoff_sec = backoff_sec

def fetch(self, url: str) -> bytes:
"""
Fetches the raw bytes from the given URL entirely into memory.
Uses a resumable streaming approach under the hood if partially complete.
"""
import tempfile
tmp_fd, tmp_path = tempfile.mkstemp()
os.close(tmp_fd)
try:
LOGGER.info("Fetching blob via HTTP: %s", url)
headers = {'User-Agent': 'udmi-python-device/1.0'}
response = requests.get(url, timeout=(10, self.timeout),
headers=headers)
response.raise_for_status()
return response.content
except requests.RequestException as e:
raise BlobFetchError(f"HTTP fetch failed: {e}") from e
self.download_to_file(url, tmp_path)
with open(tmp_path, 'rb') as f:
return f.read()
finally:
if os.path.exists(tmp_path):
os.remove(tmp_path)

def download_to_file(self, url: str, dest_path: str) -> None:
"""
Streams content to a temporary file and atomically renames it to dest.
Streams content to a temporary file, supporting resumes via Range headers.
Uses exponential backoff for retryable errors (e.g. 503, connection drops).
Raises BlobFetchError immediately for fatal errors (e.g. 403, 404).
"""
try:
LOGGER.info("Streaming blob to file: %s", url)
headers = {'User-Agent': 'udmi-python-device/1.0'}

with requests.get(url, stream=True, timeout=(10, self.timeout),
headers=headers) as r:
r.raise_for_status()

with atomic_file_context(dest_path) as tmp_file:
shutil.copyfileobj(r.raw, tmp_file)

except Exception as e:
raise BlobFetchError(f"HTTP stream failed: {e}") from e
headers = {'User-Agent': 'udmi-python-device/1.0'}

retries = 0
backoff_sec = self.backoff_sec

with atomic_file_context(dest_path) as f:
tmp_path = f.name
while retries <= self.max_retries:
try:
# Check if we have partially downloaded the file
downloaded_bytes = 0
if os.path.exists(tmp_path):
downloaded_bytes = os.path.getsize(tmp_path)

if downloaded_bytes > 0:
LOGGER.info("Resuming download from byte %d for %s", downloaded_bytes, url)
headers['Range'] = f'bytes={downloaded_bytes}-'
else:
LOGGER.info("Starting new download for %s", url)
headers.pop('Range', None)

with requests.get(url, stream=True, timeout=(10, self.timeout), headers=headers) as r:
if r.status_code in (401, 403, 404):
# Fatal Auth/Net error
LOGGER.error("Fatal HTTP Error %d for %s. Aborting.", r.status_code, url)
raise BlobFetchError(f"HTTP fetch failed: {r.status_code}")

if r.status_code == 416:
# Range Not Satisfiable - already fully downloaded or invalid range
LOGGER.info("Range not satisfiable (already downloaded or invalid) for %s", url)
# Check content length to verify
head_r = requests.head(url, timeout=(10, self.timeout), headers={'User-Agent': 'udmi-python-device/1.0'})
total_size = int(head_r.headers.get('content-length', 0))
if total_size > 0 and downloaded_bytes >= total_size:
LOGGER.info("File already fully downloaded.")
break
else:
# Start over
downloaded_bytes = 0
# Clear file content for retry
f.seek(0)
f.truncate()
continue

r.raise_for_status()

# Check if the server respected the Range request. If not (returns 200 OK instead of 206 Partial Content),
# we must start over from the beginning, otherwise we will append the entire file again, causing corruption.
if r.status_code == 200 and downloaded_bytes > 0:
LOGGER.warning("Server ignored Range request and returned 200 OK. Downloading from scratch.")
downloaded_bytes = 0

# we are inside atomic_file_context which opens the temp file for us, we can just write to it
if downloaded_bytes == 0:
f.seek(0)
f.truncate()
else:
f.seek(downloaded_bytes)

for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)

# Download completed successfully
break

except requests.exceptions.HTTPError as e:
status = e.response.status_code
if status in (401, 403, 404):
raise BlobFetchError(f"HTTP fetch failed: {status}") from e
elif status == 503:
LOGGER.warning("HTTP 503 Service Unavailable for %s. Retrying...", url)
else:
LOGGER.warning("HTTP Error %d for %s. Retrying...", status, url)

self._handle_retry(retries, backoff_sec, e)
retries += 1
backoff_sec *= 2

except requests.exceptions.RequestException as e:
LOGGER.warning("Network error during fetch of %s: %s. Retrying...", url, e)
self._handle_retry(retries, backoff_sec, e)
retries += 1
backoff_sec *= 2

if retries > self.max_retries:
raise BlobFetchError(f"HTTP fetch failed: Max retries exceeded")


def _handle_retry(self, retries: int, backoff_sec: float, exc: Exception):
if retries >= self.max_retries:
LOGGER.error("Max retries (%d) reached. Aborting.", self.max_retries)
raise BlobFetchError("HTTP fetch failed: Max retries exceeded") from exc
LOGGER.info("Backing off for %f seconds...", backoff_sec)
time.sleep(backoff_sec)


class FileFetcher(AbstractBlobFetcher):
Expand Down
62 changes: 13 additions & 49 deletions clientlib/python/tests/core/blob/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ def test_data_fetcher_raises_decode_error(data_fetcher):

@pytest.fixture
def http_fetcher():
return HttpFetcher()
return HttpFetcher(timeout_sec=1, max_retries=1)


@patch("requests.get")
def test_http_fetch_success(mock_get, http_fetcher):
"""Verifies successful HTTP GET."""
mock_response = MagicMock()
mock_response.content = b"http_data"
mock_response.iter_content.return_value = [b"http_data"]
mock_response.status_code = 200
mock_get.return_value = mock_response
mock_get.return_value.__enter__.return_value = mock_response

result = http_fetcher.fetch("http://example.com/blob")

Expand All @@ -77,73 +77,37 @@ def test_http_fetch_success(mock_get, http_fetcher):
def test_http_fetch_http_error(mock_get, http_fetcher):
"""Verifies 404/500 errors raise BlobFetchError."""
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = requests.HTTPError("404 Not Found")
mock_get.return_value = mock_response
mock_response.status_code = 404
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError("404 Not Found")
mock_get.return_value.__enter__.return_value = mock_response

with pytest.raises(BlobFetchError, match="HTTP fetch failed"):
http_fetcher.fetch("http://example.com/missing")


@patch("requests.get")
def test_http_fetch_connection_error(mock_get, http_fetcher):
@patch("time.sleep")
def test_http_fetch_connection_error(mock_sleep, mock_get, http_fetcher):
"""Verifies connection issues raise BlobFetchError."""
mock_get.side_effect = requests.ConnectionError("Name resolution failure")

with pytest.raises(BlobFetchError, match="HTTP fetch failed"):
http_fetcher.fetch("http://bad-host.com")


@patch("os.fsync")
@patch("shutil.copyfileobj")
@patch("tempfile.NamedTemporaryFile")
@patch("requests.get")
@patch("os.replace")
@patch("os.chmod")
def test_http_download_to_file_streaming(
mock_chmod, mock_replace, mock_get, mock_tempfile, mock_copy, mock_fsync, http_fetcher
):
def test_http_download_to_file_streaming(mock_replace, mock_get, http_fetcher):
"""
Verifies that download_to_file streams data to a temp file and renames it.
"""
mock_response = MagicMock()
mock_response.raw = MagicMock()
mock_response.iter_content.return_value = [b"chunk1", b"chunk2"]
mock_response.status_code = 200
mock_get.return_value.__enter__.return_value = mock_response

mock_tmp = MagicMock()
mock_tmp.name = "/tmp/random_tmp_file"
mock_tmp.fileno.return_value = 123
mock_tempfile.return_value.__enter__.return_value = mock_tmp

http_fetcher.download_to_file("http://site.com/large.bin", "/var/lib/final.bin")
http_fetcher.download_to_file("http://site.com/large.bin", "/tmp/final.bin")

mock_get.assert_called_with("http://site.com/large.bin", stream=True, timeout=ANY, headers=ANY)
mock_replace.assert_called_once()

mock_copy.assert_called_with(mock_response.raw, mock_tmp)

mock_tmp.flush.assert_called_once()
mock_fsync.assert_called_once_with(123)

mock_replace.assert_called_with("/tmp/random_tmp_file", "/var/lib/final.bin")


# --- FileFetcher Tests ---

@pytest.fixture
def file_fetcher():
return FileFetcher()


def test_file_fetch_reads_content(file_fetcher):
"""Verifies reading a local file."""
with patch("builtins.open", mock_open(read_data=b"local_content")) as mock_file:
result = file_fetcher.fetch("file:///etc/config.json")

assert result == b"local_content"
mock_file.assert_called_with("/etc/config.json", "rb")


def test_file_fetch_missing_file(file_fetcher):
"""Verifies FileNotFoundError is wrapped in BlobFetchError."""
with patch("builtins.open", side_effect=FileNotFoundError("No entry")):
with pytest.raises(BlobFetchError, match="File fetch failed"):
file_fetcher.fetch("file:///missing.txt")
52 changes: 52 additions & 0 deletions spotter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Spotter - UDMI Reference Client

Spotter is an on-premise, Python-based reference client for UDMI. It is designed to act as a real, UDMI-compliant device and as a virtualized test target for the Sequencer CI framework.

Spotter implements an extensible architecture and handles Over-The-Air (OTA) updates using the UDMI `blobset` protocol, utilizing a Git-based update strategy.

## Key Features

1. **Extensible Architecture**: Based on the standard UDMI Python library (`clientlib/python/src/udmi/core`), allowing for easy extension of managers and handlers.
2. **Robust OTA Updates**:
- Supports out-of-band downloading via standard HTTP(S).
- Handles resumable downloads via HTTP `Range` requests and implements exponential backoff for transient failures (e.g., HTTP 503).
- Immediately aborts on fatal authorization/network failures (e.g., HTTP 403, 404).
3. **Git-Based OTA Updates**:
- The payload specifies the target Git commit hash.
- Spotter fetches the remote repository and extracts a manifest (`spotter_manifest.json`) directly from the target commit using `git show`.
- Validates hardware make/model and software dependencies against the downloaded manifest *before* checking out the code.
- If validation passes, Spotter switches to the target commit and triggers a simulated restart.

## Usage

You can run Spotter using basic MQTT credentials or JWT Authentication.

### Basic Auth
```bash
python -m spotter.spotter.main \
--client_id projects/my-project/locations/us-central1/registries/reg/devices/AHU-1 \
--hostname mqtt.googleapis.com \
--port 8883 \
--username my_user \
--password my_password
```

### JWT Auth
```bash
python -m spotter.spotter.main \
--client_id projects/my-project/locations/us-central1/registries/reg/devices/AHU-1 \
--hostname mqtt.googleapis.com \
--port 8883 \
--jwt_audience my-project \
--key_file /path/to/rsa_private.pem
```

## Running Tests

To run the unit tests, ensure you have the `udmi` Python client library installed or set in your PYTHONPATH:

```bash
cd ../clientlib/python
export PYTHONPATH="src:../../gencode/python:../.."
poetry run pytest ../../spotter/tests/
```
Empty file added spotter/__init__.py
Empty file.
73 changes: 73 additions & 0 deletions spotter/spec.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
## Technical Specification: Spotter (Python UDMI Reference Client)

**Spotter** is an extensible, Python-based reference client designed as a fully UDMI-compliant IoT device intended for on-premise deployment. While it serves as a robust test target for the Sequencer CI framework to validate OTA (Over-The-Air) update orchestration, its architecture is modular and production-ready, making it capable of handling real-world deployments and complex update scenarios natively.

---

### 1. Core Objective

To implement an atomic, configuration-driven execution handler that processes modular component updates (targeting `system.software.<client_defined_key>`) using the UDMI `blobset` protocol, while supporting extensible device capabilities (such as JWT authentication, telemetry logging, and dynamic dependency validation).

---

### 2. Functional Requirements

#### A. Atomic State Machine Implementation

Spotter inherently transitions through the standardized UDMI update phases:

* **Idle / Steady State**: Reports currently running module versions in `system.software`.
* **Apply Phase**: Upon receiving a valid `blobset` config, Spotter acknowledges by updating its state to `phase: apply` and initiating an out-of-band download.
* **Final Phase**: Reports `phase: final` upon successful application or fatal failure.

#### B. Payload Delivery & Validation

Spotter leverages the robust underlying UDMI Python library for reliable payload delivery:

* **Out-of-Band Download**: Retrieves payloads via HTTP(S) using URLs provided in the configuration.
* **Resumable Downloads**: Supports standard HTTP(S) `Range` requests to handle constrained network drops, falling back gracefully if the server ignores the Range header.
* **Cryptographic Verification**: Securely calculates the local SHA256 hash of the downloaded payload and verifies it against the mandatory 64-character hash in the cloud configuration before execution.

#### C. Git-Based Update Strategy (On-Premise Ready)

Spotter utilizes a real-world, Git-based update strategy for self-updating on-premise:

* The cloud payload provides a target Git commit hash.
* Before applying the update, Spotter fetches the remote repository and extracts a manifest (`spotter_manifest.json`) directly from the target commit using `git show`.
* It cross-references hardware requirements and dependencies from the target manifest against its current local state.
* Updates are applied natively by executing a `git checkout <hash>` and safely restarting the service via OS-level signals (e.g., `sys.exit(0)` for `systemd` recovery).

---

### 3. Error Taxonomy & Handling

Spotter strictly categorizes errors at both the network and application layers to prevent "bricking" or infinite retry loops.

| Error Type | Scenarios | Required Action |
| :--- | :--- | :--- |
| **Retryable** | Transient network drops, HTTP 503 | Handled natively by the UDMI fetcher via local exponential backoff and retry. |
| **Fatal (Auth/Net)** | Expired Signed URL, HTTP 401/403/404 | Abort installation immediately and report level 500 `ERROR`. |
| **Fatal (Integrity)** | SHA256 Hash Mismatch | UDMI library securely discards the file, aborts, and reports a level 500 `ERROR`. |
| **Fatal (Logic)** | Hardware mismatch, missing manifest, or dependency conflict | Reject payload before `git checkout`, abort execution, and report level 500 `ERROR`. |

---

### 4. Telemetry & Observability

Spotter provides robust closed-loop visibility by publishing system milestones to the `events/system` MQTT pipeline:

* **Standardized Logs**: Directly logs `blobset.download.start`, `blobset.hash.verify`, and `blobset.apply.success` during the update lifecycle.
* **Decoupled Reporting**: Automatically attaches the `UDMIMqttLogHandler` to the root device logger. If an HTTP download or `git` operation fails, the resulting OS-level error logs are seamlessly routed through the primary MQTT telemetry channel as `SystemEvent` metrics, ensuring the cloud orchestrator is notified independently of the standard `state` update.

---

### 5. Compliance Checklist for Sequencer CI

Spotter guarantees compliance with the Sequencer CI framework by passing these six automated scenarios:

1. **Happy Path**: Successful download, hash match, dependency validation, and Git version update.
2. **Hash Mismatch**: Detection of corrupted SHA256 and secure file deletion.
3. **Invalid URL**: Handling of 403/404 errors without attempting installation or application logic.
4. **Hardware Mismatch**: Rejection of incorrect bundles (e.g., wrong controller type mapped against the fetched Git manifest).
5. **Corrupted Payload**: Trapping OS-level execution exceptions for malformed binaries or missing manifest files within the target Git commit.
6. **Dependency Mismatch**: Validating that new modules described in the remote target manifest are strictly compatible with existing local dependencies.
Empty file added spotter/spotter/__init__.py
Empty file.
Empty file.
Loading
Loading