diff --git a/CHANGELOG.md b/CHANGELOG.md index b02c9da1..cb3f2377 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,64 @@ All notable behavioural changes to `strands-robots` are logged here. Follows [Keep a Changelog](https://keepachangelog.com/) conventions. +## Unreleased - #228 (AWS IoT provisioning hardening) + +### Changed: default presigned-URL TTL for camera offload + +``CameraOffloader.presign_ttl`` default is now **60 seconds** (was 3600s). +A 1-hour ceiling (``MAX_PRESIGN_TTL_SECONDS``) is enforced; values above +the cap are clamped with a ``WARNING``. The change shrinks the replay +window for a captured ``strands//camera//ref`` MQTT message +from one hour to one minute. + +Migration: deployments whose downstream consumers (review UIs, +recording pipelines that fetch on a delay) need >60 seconds of validity +should opt in explicitly: + +```bash +export STRANDS_MESH_CAMERA_PRESIGN_TTL=3600 # legacy 1h +``` + +or pass ``presign_ttl=3600`` to ``CameraOffloader(...)`` / ``enable_for_mesh(...)``. + +### Added: AWS IoT provisioning hardening + +Applies to ``strands_robots.mesh.iot.provision`` and +``strands_robots.mesh.iot.camera_offload``: + +- **CA pinning** — ``AmazonRootCA1.pem`` is verified against an + in-tree pin tuple (``_AMAZON_ROOT_CA1_PINS``) at download AND on + every on-disk re-use. Defeats CA-substitution MITM. Operators can + add additional pins via ``STRANDS_MESH_CA_PINS`` (comma-separated + 64-char lowercase hex). The break-glass ``STRANDS_MESH_DISABLE_CA_PIN=true`` + (case-insensitive) writes a ``.unverified`` sidecar marker (mode + ``0o600``) for audit traceability. +- **Strict thing-name regex** (``^[a-zA-Z0-9_-]{1,128}$``, + ``re.fullmatch``) applied symmetrically across ``provision_robot``, + ``provision_operator``, and ``teardown_thing``. Rejects path + separators, dots, spaces, NUL, non-ASCII, and trailing + `` +``/`` ``/`` ``. Pre-existing AWS IoT Things containing ``:`` + must be renamed (we deliberately reject ``:`` due to NTFS / classic + Mac filesystem semantics). +- **IoT policy scope** — robot/operator policies use explicit + per-thing topic prefixes; no ``Resource: '*'`` on Receive. + ``OperatorPublishToFleet``'s ``*/cmd`` wildcard is documented and + pinned as a deliberate design choice (``test_publish_to_fleet_wildcard_is_deliberate``). +- **Per-recv TLS timeout bound** via custom ``HTTPSHandler`` (defeats + malicious-broker connection-stalling). +- **``teardown_thing(cert_dir=...)`` kwarg** for parity with + ``provision_robot``/``provision_operator`` (closes stale-credential + leak on non-default ``cert_dir`` deployments). + +New env vars (documented in README Configuration matrix): +``STRANDS_MESH_CA_PINS``, ``STRANDS_MESH_DISABLE_CA_PIN``, +``STRANDS_MESH_CAMERA_PRESIGN_TTL``. + +Known follow-ups: #249 (camera privacy kill-switch + S3 ACL), +#251 (chunked-read parity in ``_ensure_ca``), #259 (kwarg negative-TTL +WARNING symmetry), #260 (warn on re-use of break-glass-written CA). + ## Unreleased - #178 (LiberoOffScreenRenderEngine retired) ### Removed: ``LiberoOffScreenRenderEngine`` simulation backend (BREAKING) diff --git a/README.md b/README.md index 76853304..ded0d1f1 100644 --- a/README.md +++ b/README.md @@ -501,6 +501,9 @@ agent.tool.gr00t_inference(action="stop", port=8000) | `ZENOH_CONNECT` | Comma-separated list of remote Zenoh endpoints to connect to | - | | `ZENOH_LISTEN` | Comma-separated list of endpoints for the local Zenoh listener | - | | `STRANDS_MESH_AUDIT_DIR` | Directory for the safety audit log (`mesh_audit.jsonl`) | `~/.strands_robots/` | +| `STRANDS_MESH_CA_PINS` | Comma-separated SHA-256 hex pins, **additive** to the bundled Amazon Root CA1 pin tuple. Operator break-glass for an AWS-side root rotation that arrives before the next `strands-robots` release ships the new pin. Must match `^[0-9a-fA-F]{64}$` per entry. | unset | +| `STRANDS_MESH_DISABLE_CA_PIN` | Set to `true` (case-insensitive) to skip CA pin verification on the *download* path only. The on-disk re-use path always raw-checks the pin regardless. Last-resort break-glass; prefer `STRANDS_MESH_CA_PINS` for rotations. | `false` | +| `STRANDS_MESH_CAMERA_PRESIGN_TTL` | Default TTL (seconds) for S3 presigned URLs emitted on the IoT camera-offload path. Capped at 3600s (1h); a `0` is clamped to 1s. Non-integer values fall back to the default with a WARNING. | `60` | | `GROOT_API_TOKEN` | API token for GR00T inference service | - | | `STRANDS_ROBOT_MODE` | Override `Robot()` factory mode detection (`sim`, `real`, `auto`) | `auto` | | `STRANDS_TRUST_REMOTE_CODE` | Set to `1` to opt into HuggingFace `trust_remote_code` for `lerobot_local` policies | unset | diff --git a/strands_robots/mesh/iot/__init__.py b/strands_robots/mesh/iot/__init__.py index 3390d1c2..6da8bc7e 100644 --- a/strands_robots/mesh/iot/__init__.py +++ b/strands_robots/mesh/iot/__init__.py @@ -2,10 +2,10 @@ This subpackage owns the cloud-side concerns of the mesh: -- :mod:`provision` — single-Thing bootstrap (cert + policy + Thing). -- :mod:`bootstrap` — account-wide bootstrap (Rules + Lambda + +- :mod:`provision` — single-Thing bootstrap (cert + policy + Thing). +- :mod:`bootstrap` — account-wide bootstrap (Rules + Lambda + DynamoDB audit + Fleet Provisioning template). -- :mod:`shadow` — Device Shadow named-shadow mirror of presence. +- :mod:`shadow` — Device Shadow named-shadow mirror of presence. - :mod:`camera_offload` — S3-backed camera frame offload. The wire-level transport (:class:`IotMqttTransport`) lives in @@ -20,14 +20,14 @@ from strands_robots.mesh.iot import bootstrap_account, provision_robot # 1. Once per AWS account/region: spin up Rules / Lambda / DynamoDB / - # Fleet Provisioning template / etc. + # Fleet Provisioning template / etc. bootstrap_account() # 2. Per robot: issue a cert + attach the strands-robot policy. p = provision_robot("so100-arm-01") # 3. Run the robot under the iot transport. - # (export the env vars from p.env_vars() and `Robot()` Just Works.) + # (export the env vars from p.env_vars() and `Robot()` Just Works.) For a single-robot dev setup, step 1 is optional — without it E-stop fan-out and DynamoDB audit just don't activate; everything else still diff --git a/strands_robots/mesh/iot/camera_offload.py b/strands_robots/mesh/iot/camera_offload.py index f75974c3..7f5a8ed6 100644 --- a/strands_robots/mesh/iot/camera_offload.py +++ b/strands_robots/mesh/iot/camera_offload.py @@ -37,7 +37,23 @@ ``STRANDS_MESH_CAMERA_S3_PREFIX`` Optional prefix inside the bucket (defaults to ``""``). ``STRANDS_MESH_CAMERA_PRESIGN_TTL`` - Seconds the presigned GET URL stays valid (default 3600). + Seconds the presigned GET URL stays valid. Defaults to + :data:`DEFAULT_PRESIGN_TTL_SECONDS` (60s); clamped at + :data:`MAX_PRESIGN_TTL_SECONDS` (1 hour) to prevent accidental + day- or week-long URLs. Pass ``presign_ttl=N`` to override + explicitly; the env-var fallback only applies when the kwarg is + ``None``. + +Bucket-ownership threat model +----------------------------- +The S3 PutObject path in :meth:`CameraOffloader._upload_frame` does +not pass an ``ACL=`` kwarg. The contract for the offload bucket is +that the operator configures it with object-ownership control +``BucketOwnerEnforced`` (and a bucket policy that denies public +ACLs); that enforcement is out of scope for this library because +deployments differ on whether the bucket is shared with non-mesh +producers. A future code-side ``ACL="private"`` + ``ChecksumAlgorithm`` +hardening is tracked in #249. """ from __future__ import annotations @@ -50,7 +66,13 @@ logger = logging.getLogger(__name__) -DEFAULT_PRESIGN_TTL_SECONDS = 3600 +# Lifetime of the presigned GET URL we hand out for each camera frame. +# Kept deliberately short -- anyone who briefly captures a /ref MQTT message +# can use the URL inside this window. ``STRANDS_MESH_CAMERA_PRESIGN_TTL`` +# overrides for higher-latency consumers, clamped to MAX_PRESIGN_TTL_SECONDS +# (1 hour) to prevent accidental day- or week-long URLs. +DEFAULT_PRESIGN_TTL_SECONDS = 60 +MAX_PRESIGN_TTL_SECONDS = 3600 class CameraOffloader: @@ -70,9 +92,46 @@ def __init__( ) -> None: self.bucket = bucket or os.getenv("STRANDS_MESH_CAMERA_S3_BUCKET", "") self.prefix = (prefix or os.getenv("STRANDS_MESH_CAMERA_S3_PREFIX") or "").strip("/") - self.presign_ttl = presign_ttl or int( - os.getenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", str(DEFAULT_PRESIGN_TTL_SECONDS)) - ) + if presign_ttl is not None: + ttl_raw = presign_ttl + else: + raw_env = os.getenv("STRANDS_MESH_CAMERA_PRESIGN_TTL") + if raw_env is None or raw_env == "": + ttl_raw = DEFAULT_PRESIGN_TTL_SECONDS + else: + try: + ttl_raw = int(raw_env) + except ValueError: + logger.warning( + "[camera_offload] STRANDS_MESH_CAMERA_PRESIGN_TTL=%r is not an integer; using default %ds", + raw_env, + DEFAULT_PRESIGN_TTL_SECONDS, + ) + ttl_raw = DEFAULT_PRESIGN_TTL_SECONDS + if ttl_raw > MAX_PRESIGN_TTL_SECONDS: + logger.warning( + "[camera_offload] STRANDS_MESH_CAMERA_PRESIGN_TTL=%d > %d cap; clamping", + ttl_raw, + MAX_PRESIGN_TTL_SECONDS, + ) + ttl_raw = MAX_PRESIGN_TTL_SECONDS + if ttl_raw < 1: + # Issue #262: WARN on any sub-1 value EXCEPT exactly 0. + # ``presign_ttl=0`` is the documented kwarg-vs-env-precedence + # sentinel pinned by ``test_presign_ttl_none_vs_zero.py`` (R1 + # fix). ``presign_ttl=-99`` is unambiguously a bug at the call + # site -- no caller deliberately wants a negative TTL clamped + # to 1 -- and we surface it. The env-var path always WARNs + # (operator-side bug if STRANDS_MESH_CAMERA_PRESIGN_TTL=-99). + if presign_ttl is None or presign_ttl != 0: + source = "env" if presign_ttl is None else "kwarg" + logger.warning( + "[camera_offload] presign_ttl=%d < 1 floor; clamping to 1s (source=%s)", + ttl_raw, + source, + ) + ttl_raw = 1 + self.presign_ttl = ttl_raw self.region = region or os.getenv("AWS_REGION", os.getenv("AWS_DEFAULT_REGION")) self._s3: Any | None = None @@ -120,7 +179,7 @@ def upload_frame(self, peer_id: str, cam_name: str, jpeg_bytes: bytes, ts: float Body=jpeg_bytes, ContentType="image/jpeg", ) - except Exception as exc: + except Exception as exc: # noqa: BLE001 -- boto3 raises ClientError, EndpointConnectionError, NoCredentialsError, etc.; offload is best-effort logger.debug("[camera_offload] put_object %s failed: %s", key, exc) return None @@ -130,7 +189,7 @@ def upload_frame(self, peer_id: str, cam_name: str, jpeg_bytes: bytes, ts: float Params={"Bucket": self.bucket, "Key": key}, ExpiresIn=self.presign_ttl, ) - except Exception as exc: + except Exception as exc: # noqa: BLE001 -- boto3 ClientError / NoCredentialsError; presign is best-effort logger.debug("[camera_offload] presign %s failed: %s", key, exc) url = None @@ -181,7 +240,7 @@ def _publish_cameras_once_with_offload() -> None: # call it to preserve any user customisation that might have been added). try: original() - except Exception as exc: + except Exception as exc: # noqa: BLE001 -- original is user-customised; offload must not block on user code logger.debug("[camera_offload] original _publish_cameras_once raised: %s", exc) # Now do the S3 offload + ref publish per camera. @@ -195,13 +254,14 @@ def _publish_cameras_once_with_offload() -> None: try: obs = inner.get_observation() - except Exception: + except Exception as exc: # noqa: BLE001 -- LeRobot get_observation() may raise hardware-specific errors + logger.debug("[camera_offload] get_observation failed: %s", exc) return try: import cv2 - except Exception: - logger.debug("[camera_offload] cv2 unavailable — skipping S3 upload") + except ImportError: + logger.debug("[camera_offload] cv2 unavailable -- skipping S3 upload") return transport = current_transport() @@ -231,8 +291,17 @@ def _publish_cameras_once_with_offload() -> None: if ref is None: continue ref["shape"] = list(shape) + # Publish the /ref topic via the transport layer. + # On ``iot`` the IoT Policy's AllowOwnTopics statement + # bounds writes to ``strands//*`` (covers + # ``camera/*/ref`` via the trailing wildcard); on + # ``bridge`` the Zenoh ACL adds a LAN-side gate on top. + # ``enable_for_mesh`` early-returns unless the active + # backend is one of those two, so the publish reaches + # the wire only when at least one of these gates is in + # force. transport.put(f"strands/{mesh.peer_id}/camera/{cam_name}/ref", ref) - except Exception as exc: + except Exception as exc: # noqa: BLE001 -- numpy / cv2 / transport.put can raise diverse errors per frame; offload is best-effort logger.debug( "[camera_offload] %s/%s offload failed: %s", mesh.peer_id, diff --git a/strands_robots/mesh/iot/provision.py b/strands_robots/mesh/iot/provision.py index 64d894ce..efb6747e 100644 --- a/strands_robots/mesh/iot/provision.py +++ b/strands_robots/mesh/iot/provision.py @@ -6,7 +6,7 @@ from strands_robots.mesh.iot import provision_robot provision_robot("so100-arm-01") -…and the function: +...and the function: 1. Creates an AWS IoT Thing named ``so100-arm-01``. 2. Generates an X.509 keypair + cert (AWS-issued, ``CreateKeysAndCertificate``). @@ -38,14 +38,16 @@ strands-robots iot provision so100-arm-01 strands-robots iot provision-operator bedrock-agent-01 - strands-robots iot teardown so100-arm-01 # cleanup + strands-robots iot teardown so100-arm-01 # cleanup """ from __future__ import annotations +import hashlib import json import logging import os +import re import urllib.request from dataclasses import dataclass from pathlib import Path @@ -56,6 +58,41 @@ _AMAZON_ROOT_CA1_URL = "https://www.amazontrust.com/repository/AmazonRootCA1.pem" +# Pinned SHA-256 fingerprints of the canonical Amazon Root CA1 PEM bytes. +# Pinning prevents a network-level attacker (DNS hijack, captive portal, +# BGP, malicious local proxy) from substituting a rogue CA at the URL. +# +# Recompute when AWS rotates the root:: +# +# python -c "import hashlib, urllib.request as u; \ +# print(hashlib.sha256(u.urlopen( \ +# 'https://www.amazontrust.com/repository/AmazonRootCA1.pem' \ +# ).read()).hexdigest())" +# +# Last verified 2026-05. +# +# this is now a TUPLE so a CA rotation can ship as a code change +# that adds the new pin in advance and removes the old one in a follow- +# up after rollout. Operators can also extend the accepted pins via +# ``STRANDS_MESH_CA_PINS`` (comma-separated 64-char lowercase hex). The +# env var augments the built-in tuple; it does not replace it. +_AMAZON_ROOT_CA1_PINS: tuple[str, ...] = ("2c43952ee9e000ff2acc4e2ed0897c0a72ad5fa72c3d934e81741cbd54f05bd1",) +# The legacy ``_AMAZON_ROOT_CA1_SHA256`` alias was deleted. +# CodeQL #229 flagged it as unused after every reader was wired +# through ``_resolve_ca_pins`` / ``_AMAZON_ROOT_CA1_PINS``. Internal +# code references the tuple directly; error messages now format the +# full pin set via ``_resolve_ca_pins`` so operators see every +# accepted pin (not just the canonical first one). + +# Regex: 64 hex chars, lowercase. Matches what hashlib.sha256(...).hexdigest() +# emits and rejects anything else (operator typos surface immediately). +_PIN_RE = re.compile(r"^[0-9a-f]{64}$") + +# Cap the CA download response to a generous multiple of the real ~1.4 KiB +# certificate. Defeats body-size DoS attacks (a captive portal returning a +# multi-megabyte HTML "login page" instead of the expected PEM). +_CA_FETCH_MAX_BYTES = 64 * 1024 + DEFAULT_CERT_DIR = Path.home() / ".strands_robots" / "iot" ROBOT_POLICY_NAME = "strands-robot" OPERATOR_POLICY_NAME = "strands-operator" @@ -127,6 +164,23 @@ def export_lines(self) -> list[str]: ], }, { + # This wildcard middle segment is the OPERATOR'S thing-name + # (the recipient of the response). The robot legitimately + # needs to publish to the operator's own response inbox to + # complete the request/response RPC pattern. Scoping + # tighter (e.g. ``${iot:Connection.Thing.ThingName}/...``) + # would force the operator to know each robot's name to + # route responses, breaking the topic contract. + # + # The trailing ``/*`` is the per-turn id (UUID per + # call). The middle wildcard is the only legitimate broadening. + # + # Defence-in-depth: the operator-side ACL (``AllowOwnSubscriptions`` + # at line 187) restricts each operator to subscribing only to + # ``strands/${ThingName}/...``, so a robot publishing to + # ``strands//response/`` lands on a + # topic nobody is authorised to subscribe to (the message + # is silently dropped by the broker per the IoT Core contract). "Sid": "AllowResponseToAnyOperator", "Effect": "Allow", "Action": "iot:Publish", @@ -154,11 +208,21 @@ def export_lines(self) -> list[str]: ], }, { - "Sid": "AllowReceiveOthers", + # Tightly scoped Receive: a robot only sees the messages + # delivered to topics it actually subscribes to (own /cmd, own + # /response/*, broadcast, safety/estop, +/presence). Previously + # this was a wildcard ``iot:Receive`` on ``strands/*``, which + # would have let any robot eavesdrop on the entire fleet's + # traffic -- including other robots' commands and responses. + "Sid": "AllowReceiveScoped", "Effect": "Allow", "Action": "iot:Receive", "Resource": [ - "arn:aws:iot:*:*:topic/strands/*", + "arn:aws:iot:*:*:topic/strands/${iot:Connection.Thing.ThingName}/cmd", + "arn:aws:iot:*:*:topic/strands/${iot:Connection.Thing.ThingName}/response/*", + "arn:aws:iot:*:*:topic/strands/broadcast", + "arn:aws:iot:*:*:topic/strands/safety/estop", + "arn:aws:iot:*:*:topic/strands/+/presence", ], }, { @@ -184,6 +248,19 @@ def export_lines(self) -> list[str]: "Resource": "arn:aws:iot:*:*:client/${iot:Connection.Thing.ThingName}", }, { + # Deliberate wildcard: any operator credential can publish + # ``cmd`` to any robot (``strands/*/cmd``). The system has + # no per-operator-to-per-robot binding by design -- the + # threat model of a compromised operator is equivalent to a + # compromised fleet command authority. Mitigations: short- + # lived certs (rotation via ``provision_operator`` re-run), + # the OperatorShadow attribute condition that gates shadow + # reads, and operational audit (``mesh_audit.jsonl`` logs + # every command dispatch). A per-robot operator scope would + # require a per-robot policy document, which explodes the + # policy count linearly with fleet size. Pinned as + # intentional by test_iot_policy_scope.py::TestOperatorPolicy + # ::test_publish_to_fleet_wildcard_is_deliberate. "Sid": "OperatorPublishToFleet", "Effect": "Allow", "Action": ["iot:Publish", "iot:RetainPublish"], @@ -203,15 +280,24 @@ def export_lines(self) -> list[str]: ], }, { + # Operator monitoring scope. Operators can subscribe to fleet + # state (presence/state/health) and safety events but NOT to + # other operators' command/response streams. The policy used to + # include a wildcard ``strands/*`` in Receive which exposed all + # fleet traffic to every operator credential. "Sid": "OperatorObserveFleet", "Effect": "Allow", "Action": ["iot:Subscribe", "iot:Receive"], "Resource": [ - "arn:aws:iot:*:*:topic/strands/*", + "arn:aws:iot:*:*:topic/strands/+/presence", "arn:aws:iot:*:*:topicfilter/strands/+/presence", + "arn:aws:iot:*:*:topic/strands/+/state", "arn:aws:iot:*:*:topicfilter/strands/+/state", + "arn:aws:iot:*:*:topic/strands/+/health", "arn:aws:iot:*:*:topicfilter/strands/+/health", + "arn:aws:iot:*:*:topic/strands/+/safety/event", "arn:aws:iot:*:*:topicfilter/strands/+/safety/event", + "arn:aws:iot:*:*:topic/strands/safety/estop", "arn:aws:iot:*:*:topicfilter/strands/safety/estop", ], }, @@ -233,6 +319,43 @@ def export_lines(self) -> list[str]: # Public API +# Thing names flow into S3 keys, IoT ARNs, and on-disk cert filenames. +# Our regex is a STRICT SUBSET of AWS IoT's accepted charset (AWS allows +# ``:`` per the AWS IoT docs; we deliberately reject it because ``:`` is +# a stream separator on NTFS and a directory separator on classic Mac, +# and our cert files are written to ``cert_dir / f"{thing_name}.pem"``). +# Operators who use ``:`` in pre-existing AWS IoT Thing names need to +# rename the Thing or maintain a mapping; we choose the safer subset +# here over compatibility with every legal AWS Thing name. +_THING_NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{1,128}$") + + +def _validate_thing_name(thing_name: str) -> None: + """Raise :class:`ValueError` when *thing_name* is unsafe for use as a + filesystem component AND as an AWS IoT Thing name. + + The accepted pattern is ``^[a-zA-Z0-9_-]{1,128}$``: alphanumerics, + dash and underscore, length 1-128. This is a **strict subset** of + AWS IoT's accepted Thing-name charset (AWS allows ``:`` server-side; + we reject it because of NTFS / classic Mac filesystem semantics). + Anything else (slashes, colons, dots, spaces, NUL, non-ASCII,...) + is rejected -- a slip in upstream validation can never produce a + path such as ``../../../etc/foo`` reaching + ``cert_dir / f"{thing_name}.pem"``. + + Operators with pre-existing AWS IoT Things whose names contain + ``:`` will hit a ``ValueError`` here. Rename the Thing or maintain + an external mapping; the strict charset is intentional. + """ + if not isinstance(thing_name, str) or not thing_name: + raise ValueError(f"thing_name must be a non-empty string, got {thing_name!r}") + if not _THING_NAME_RE.fullmatch(thing_name): + raise ValueError( + f"thing_name={thing_name!r} contains invalid characters; " + "allowed: ASCII letters, digits, '-', '_'; max 128 chars." + ) + + def provision_robot( thing_name: str, *, @@ -242,13 +365,21 @@ def provision_robot( ) -> ProvisionedThing: """Provision a robot Thing and write its credentials to disk. + Validates *thing_name* against ``^[a-zA-Z0-9_-]{1,128}$`` before any + AWS call. The pattern is a **strict subset** of AWS IoT's accepted + Thing-name charset (AWS server-side accepts ``:`` as well; we reject + it for filesystem-path safety on NTFS / classic Mac where ``:`` is a + stream / directory separator). Operators with pre-existing AWS IoT + Things containing ``:`` must rename or maintain a mapping; the + error message will direct them here. + Args: thing_name: The Thing name. MUST equal the intended Mesh peer_id — the IoT Policy uses ``${iot:Connection.Thing.ThingName}`` for topic ACL substitution. Should be DNS-safe (alphanumeric + ``-_``). region: AWS region. Defaults to the default boto3 session region. cert_dir: Where to write certs. Defaults to ``~/.strands_robots/iot``. - attributes: Optional thing-attribute dict (≤3 keys, ≤800 chars total). + attributes: Optional thing-attribute dict (<=3 keys, <=800 chars total). Returns: :class:`ProvisionedThing` describing the artefacts. @@ -264,6 +395,8 @@ def provision_robot( recoverable). Old certs from prior runs remain on the Thing — call :func:`teardown_thing` to clean them up. """ + + _validate_thing_name(thing_name) boto3 = _require_boto3() iot = boto3.client("iot", region_name=region) region = iot.meta.region_name @@ -337,6 +470,8 @@ def provision_operator( (``strands-operator``) which can publish ``cmd`` / ``broadcast`` and observe the whole fleet. """ + + _validate_thing_name(thing_name) boto3 = _require_boto3() iot = boto3.client("iot", region_name=region) region = iot.meta.region_name @@ -386,20 +521,45 @@ def provision_operator( ) -def teardown_thing(thing_name: str, *, region: str | None = None) -> None: +def teardown_thing( + thing_name: str, + *, + region: str | None = None, + cert_dir: Path | str | None = None, +) -> None: """Detach + delete every cert attached to *thing_name*, then delete the Thing. - Cleans up the cert files in ``DEFAULT_CERT_DIR`` if they're named after - this Thing. Does NOT delete the policies — those are shared across all - robots and removing them would break siblings. + Cleans up the cert files under *cert_dir* (defaults to + :data:`DEFAULT_CERT_DIR`) if they're named after this Thing. Pass the + same ``cert_dir`` you used at provision time so the on-disk cert and key + are removed instead of orphaned. Does NOT delete the policies — those + are shared across all robots and removing them would break siblings. Idempotent: missing Thing or no certs is a silent success. + + Note: + ``cert_dir`` is treated as trusted operator input -- it is not + validated beyond ``Path()`` coercion. Do not pass LLM-generated + or otherwise untrusted values; this is a privileged provisioning API. """ + _validate_thing_name(thing_name) boto3 = _require_boto3() iot = boto3.client("iot", region_name=region) + # Paginate principals: ``list_thing_principals`` returns up to 8 + # principals per call (AWS IoT default page size); a Thing with more + # than 8 attached certs (rare but possible after multiple + # provision_robot calls) would otherwise leave certs orphaned. + # Fallback to single-call when the client doesn't expose + # ``get_paginator`` (test mocks, custom shims). + principals: list[str] = [] try: - principals = iot.list_thing_principals(thingName=thing_name).get("principals", []) + if hasattr(iot, "get_paginator"): + paginator = iot.get_paginator("list_thing_principals") + for page in paginator.paginate(thingName=thing_name): + principals.extend(page.get("principals", [])) + else: + principals = list(iot.list_thing_principals(thingName=thing_name).get("principals", [])) except iot.exceptions.ResourceNotFoundException: logger.info("[teardown] thing %s not found, skipping", thing_name) principals = [] @@ -408,18 +568,18 @@ def teardown_thing(thing_name: str, *, region: str | None = None) -> None: cert_id = cert_arn.rsplit("/", 1)[-1] try: iot.detach_thing_principal(thingName=thing_name, principal=cert_arn) - except Exception as exc: + except Exception as exc: # noqa: BLE001 -- iot.exceptions.ClientError / BotoCoreError; teardown is idempotent best-effort logger.debug("[teardown] detach %s from %s: %s", cert_id, thing_name, exc) # Detach all attached policies first try: for pol in iot.list_attached_policies(target=cert_arn).get("policies", []): iot.detach_policy(policyName=pol["policyName"], target=cert_arn) - except Exception as exc: + except Exception as exc: # noqa: BLE001 -- iot.exceptions.ClientError / BotoCoreError; teardown is idempotent best-effort logger.debug("[teardown] detach policies from %s: %s", cert_id, exc) try: iot.update_certificate(certificateId=cert_id, newStatus="INACTIVE") iot.delete_certificate(certificateId=cert_id, forceDelete=True) - except Exception as exc: + except Exception as exc: # noqa: BLE001 -- iot.exceptions.ClientError / BotoCoreError; teardown is idempotent best-effort logger.warning("[teardown] could not delete cert %s: %s", cert_id, exc) # Delete the Thing @@ -429,9 +589,13 @@ def teardown_thing(thing_name: str, *, region: str | None = None) -> None: except iot.exceptions.ResourceNotFoundException: pass - # Remove local cert files - for suffix in (".cert.pem", ".private.key", ".public.key"): - p = DEFAULT_CERT_DIR / f"{thing_name}{suffix}" + # Remove local cert files. Honour a custom ``cert_dir`` so we don't + # orphan certs provisioned with ``provision_robot(..., cert_dir=...)``. + # ``_create_cert`` only writes ``.cert.pem`` and ``.private.key`` -- a + # ``.public.key`` suffix was dead code and is intentionally dropped. + target_cert_dir = Path(cert_dir) if cert_dir else DEFAULT_CERT_DIR + for suffix in (".cert.pem", ".private.key"): + p = target_cert_dir / f"{thing_name}{suffix}" if p.exists(): try: p.unlink() @@ -552,18 +716,317 @@ def _create_cert(iot: Any, cert_path: Path, key_path: Path) -> tuple[str, str]: return cert_arn, cert_id +# Issue #261: one-WARN-per-process gate for unverified-origin CA re-use. +_UNVERIFIED_CA_WARNED: set[Path] = set() + + def _ensure_ca(ca_path: Path) -> None: - """Download the Amazon Root CA1 to *ca_path* if not already present.""" + """Ensure a verified copy of Amazon Root CA1 lives at *ca_path*. + + Behaviour: + + * If *ca_path* already exists, re-check its bytes against the pinned + SHA-256. A mismatch raises :class:`RuntimeError` and leaves the file + untouched -- the caller decides whether to delete and retry. + * Otherwise download the CA over HTTPS, cap the body at + :data:`_CA_FETCH_MAX_BYTES`, verify the pin, and write the result + with mode ``0o644``. + + Pinning defeats a network-level adversary (DNS hijack, captive portal, + BGP route attacks, malicious corporate proxy) that could substitute a + rogue CA at the canonical URL. + + Break glass: setting ``STRANDS_MESH_DISABLE_CA_PIN=true`` skips the pin + check. A WARNING is logged on every disabled run. This exists for + proxy environments that legitimately re-encode the certificate; it + should never be set in production. + """ if ca_path.exists() and ca_path.stat().st_size > 0: + # Existing-file branch: ALWAYS perform the raw hash compare, + # regardless of STRANDS_MESH_DISABLE_CA_PIN. The break-glass + # exists to allow re-encoding proxies on the *download* path + # (lines below) -- it must NOT silently re-use a rogue CA from + # a prior compromised provisioning run. Operators who need + # to refresh a re-encoded cert can delete the file and let + # the download path run with the override set. + # O_NOFOLLOW to prevent TOCTOU symlink-swap + # Issue #251: chunked-read loop (mirrors verify_ca_pin). Single + # ``os.read(fd, 10MB)`` returns *up to* the requested byte count + # so on interrupted syscalls / unusual filesystems the read can + # return short, in which case ``_hash_matches_pin(existing)`` + # hashes a partial file and rejects (fail-closed, OK) -- but + # the surrounding error message says "failed pin check" rather + # than the truthful "short read", which is hostile to the + # operator triaging the issue. The chunked loop drains the file + # or hits the 10 MiB cap, matching ``verify_ca_pin`` posture. + try: + nofollow = getattr(os, "O_NOFOLLOW", 0) + fd = os.open(ca_path, os.O_RDONLY | nofollow) + try: + chunks: list[bytes] = [] + remaining = 10 * 1024 * 1024 # 10 MiB bound + while remaining > 0: + buf = os.read(fd, min(65536, remaining)) + if not buf: + break + chunks.append(buf) + remaining -= len(buf) + existing = b"".join(chunks) + finally: + os.close(fd) + except OSError as exc: + raise RuntimeError(f"AmazonRootCA1 at {ca_path} unreadable or symlink: {exc}") from exc + if not _hash_matches_pin(existing): + logger.warning( + "[provision] existing CA at %s does NOT match pinned SHA-256. " + "Refusing to use it (STRANDS_MESH_DISABLE_CA_PIN does not " + "apply to the on-disk re-use path). Delete the file to " + "force re-download.", + ca_path, + ) + accepted = ", ".join(sorted(_resolve_ca_pins())) + raise RuntimeError(f"AmazonRootCA1 at {ca_path} failed pin check; accepted pins: {accepted}") + # Issue #261: WARN if this CA was originally downloaded under + # the STRANDS_MESH_DISABLE_CA_PIN break-glass. The pin check above + # passed (so the bytes match a known good pin), but the operator + # should be aware that an unverified-origin CA is being re-used + # in case they want to refresh it via the canonical (pinned) path. + # Emit one WARNING per process (gated on a module-level set). + marker = ca_path.with_suffix(ca_path.suffix + ".unverified") + if marker.exists() and ca_path not in _UNVERIFIED_CA_WARNED: + _UNVERIFIED_CA_WARNED.add(ca_path) + logger.warning( + "[provision] re-using CA at %s that was originally downloaded " + "with STRANDS_MESH_DISABLE_CA_PIN=true (sidecar marker %s " + "exists). The pin check on the on-disk bytes passed, but the " + "ORIGIN of those bytes was not pin-verified. Delete both files " + "and re-run without the break-glass to refresh via the canonical path.", + ca_path, + marker, + ) return - logger.info("[provision] downloading Amazon Root CA1 → %s", ca_path) - with urllib.request.urlopen(_AMAZON_ROOT_CA1_URL) as resp: - ca_path.write_bytes(resp.read()) + + logger.info("[provision] downloading Amazon Root CA1 -> %s (pinned)", ca_path) + # per-socket timeout via a custom HTTPSHandler. + # + # The previous implementation called ``socket.setdefaulttimeout(15.0)`` + # for the duration of the urlopen and restored it in ``finally``. + # That is process-global -- every other thread doing socket I/O + # during the CA download window observes the foreign 15s default + # (boto3, Zenoh keepalives, requests pools all assume None). The + # ``urllib.request.build_opener`` path here installs a one-shot + # ``HTTPSHandler`` whose ``https_open`` builds connections via + # ``socket.create_connection(timeout=...)`` so the per-recv deadline + # sticks to that one socket only. No process-global mutation. + body = _download_with_per_socket_timeout(_AMAZON_ROOT_CA1_URL, 15.0, _CA_FETCH_MAX_BYTES + 1) + if len(body) > _CA_FETCH_MAX_BYTES: + raise RuntimeError(f"AmazonRootCA1 download exceeded {_CA_FETCH_MAX_BYTES} bytes -- refusing") + + if not _verify_ca_bytes(body): + accepted = ", ".join(sorted(_resolve_ca_pins())) + raise RuntimeError( + "AmazonRootCA1 SHA-256 mismatch -- refusing to write rogue CA. " + f"Got {hashlib.sha256(body).hexdigest()}; accepted pins: {accepted}" + ) + + ca_path.write_bytes(body) try: os.chmod(ca_path, 0o644) except OSError: pass + # Issue #261: when the break-glass STRANDS_MESH_DISABLE_CA_PIN was + # active during this download, write a sidecar marker so future + # _ensure_ca invocations can WARN about re-using an unverified CA + # even when the env var is no longer set. + if os.getenv("STRANDS_MESH_DISABLE_CA_PIN", "").strip().lower() == "true": + marker = ca_path.with_suffix(ca_path.suffix + ".unverified") + try: + marker.write_text( + "# This CA was downloaded with STRANDS_MESH_DISABLE_CA_PIN=true.\n" + "# Future _ensure_ca calls on this host will WARN until this\n" + "# marker is removed (e.g. by deleting the CA + re-running with\n" + "# the pin enforced).\n" + ) + # Owner-only: marker is a local sentinel read only by this + # process via _ensure_ca; no other user needs read access. + # Tightens CodeQL py/overly-permissive-file-permission alert + # vs the prior 0o644 default. + os.chmod(marker, 0o600) + except OSError: + # Best-effort marker write: an unwritable cert_dir already + # surfaced via the preceding write_bytes/write_text path. + # Failing to chmod the marker should not abort provisioning; + # the WARN-on-reuse contract is degraded-but-honest. + logger.debug("[provision] CA-unverified marker chmod failed -- continuing", exc_info=True) + + +def _resolve_ca_pins() -> frozenset[str]: + """Return the full set of accepted Amazon Root CA1 SHA-256 pins. + + Combines the built-in :data:`_AMAZON_ROOT_CA1_PINS` tuple with any + operator-supplied pins from ``STRANDS_MESH_CA_PINS`` + (comma-separated, 64-char lowercase hex; invalid entries are + rejected with a WARNING and skipped). The env-var path lets a + fleet operator stage a new pin ahead of a code-level rotation without a flag-day deploy. The built-in tuple is always + included; the env var is additive only. + """ + pins = set(_AMAZON_ROOT_CA1_PINS) + raw = os.getenv("STRANDS_MESH_CA_PINS", "").strip() + if raw: + for entry in raw.split(","): + normalised = entry.strip().lower() + if not normalised: + continue + if not _PIN_RE.fullmatch(normalised): + logger.warning( + "[provision] STRANDS_MESH_CA_PINS entry %r is not a valid 64-char lowercase hex SHA-256; skipping.", + entry, + ) + continue + pins.add(normalised) + return frozenset(pins) + + +def _download_with_per_socket_timeout(url: str, timeout_s: float, max_bytes: int) -> bytes: + """Download *url* with a per-socket recv timeout -- no process-global mutation. + + ``socket.setdefaulttimeout`` is a process-global. While its + try/finally restore is correct, every other thread doing socket I/O + during the urlopen observes the foreign default. We install a one- + shot ``HTTPSHandler`` whose ``https_open`` constructs HTTPSConnection + instances with the timeout baked in, so the deadline is per-socket + and never visible to other code paths. + + Raises ``RuntimeError`` on socket timeout (slow-loris responder / + hostile proxy) with a message pointing at the break-glass env var. + """ + import http.client + import urllib.error + + class _TimedHTTPSHandler(urllib.request.HTTPSHandler): + """HTTPSHandler whose connection factory bakes in *timeout_s*. + + urllib.request's default HTTPSHandler builds an HTTPSConnection + without an explicit timeout -- only the urlopen(timeout=) value + is forwarded, and that argument only covers connect + TLS + handshake. A per-connection timeout on the HTTPSConnection + itself propagates to recv() / sendall() and bounds wall-clock + for the whole transaction. + """ + + def https_open(self, req: urllib.request.Request) -> Any: + return self.do_open(self._connection_factory, req) + + @staticmethod + def _connection_factory(host: str, **kwargs: Any) -> http.client.HTTPSConnection: + kwargs["timeout"] = timeout_s + return http.client.HTTPSConnection(host, **kwargs) + + opener = urllib.request.build_opener(_TimedHTTPSHandler()) + try: + with opener.open(url, timeout=timeout_s) as resp: # noqa: S310 -- HTTPS + pinned + return resp.read(max_bytes) + except (TimeoutError, urllib.error.URLError) as exc: + # urllib wraps socket.timeout in URLError on some Python versions; + # both surface as a RuntimeError pointing at the break-glass. + if isinstance(exc, urllib.error.URLError) and not isinstance(exc.reason, TimeoutError): + raise + raise RuntimeError( + "AmazonRootCA1 download timed out -- possible slow-loris " + "responder or hostile proxy. Set " + "STRANDS_MESH_DISABLE_CA_PIN=true and retry only after " + "confirming the network path is trustworthy." + ) from exc + + +def _hash_matches_pin(body: bytes) -> bool: + """Return True iff *body*'s SHA-256 matches any accepted pin. + + Consults the full pin set returned by :func:`_resolve_ca_pins` + (built-in :data:`_AMAZON_ROOT_CA1_PINS` plus any + ``STRANDS_MESH_CA_PINS`` entries). Pure check -- does not honour + ``STRANDS_MESH_DISABLE_CA_PIN`` (that's the contract that makes + :func:`verify_ca_pin` safe for ops scripts). + """ + digest = hashlib.sha256(body).hexdigest() + return digest in _resolve_ca_pins() + + +def _verify_ca_bytes(body: bytes) -> bool: + """Return True if *body* may be used as the Amazon Root CA1. + + This is the **provisioning-side** check used by :func:`_ensure_ca`. + It honours the ``STRANDS_MESH_DISABLE_CA_PIN`` break-glass env var: + when set, the function returns True for any input and logs a WARNING + so the override surfaces in routine audits. Operators set the + override only for proxy environments that legitimately re-encode the + cert; production deployments must leave it unset. + + Forensic / ops scripts that want ground truth should call + :func:`verify_ca_pin` instead -- that function never honours the + break-glass. + """ + if os.getenv("STRANDS_MESH_DISABLE_CA_PIN", "").strip().lower() == "true": + logger.warning("[provision] STRANDS_MESH_DISABLE_CA_PIN=true -- CA pin check skipped") + return True + return _hash_matches_pin(body) + + +def verify_ca_pin(ca_path: Path) -> bool: + """Public helper: does the CA file at *ca_path* match the pinned hash? + + This function NEVER honours the ``STRANDS_MESH_DISABLE_CA_PIN`` + break-glass -- its job is to tell the caller the truth about whether + the file on disk is the canonical Amazon Root CA1. If it returned + True under the break-glass an attacker who set the env var on a + compromised host would defeat exactly the forensic check operators + rely on. + + mirrors the ``O_NOFOLLOW`` discipline that + ``_ensure_ca`` uses on the on-disk re-use path. Without it, + an attacker who can race a symlink between the operator-supplied + ``ca_path`` and this read can redirect ``read_bytes()`` to a hash- + matching decoy, defeating exactly this verifier. The asymmetric + posture (``_ensure_ca`` defends, ``verify_ca_pin`` does not) was + the actual gap; this closes it. + + Returns False on any I/O error (missing file, permission denied, + symlinked path, etc.). The caller should treat False as "do not + trust this CA". + """ + import os + + try: + if ca_path.is_symlink(): + logger.warning( + "[provision] verify_ca_pin: refusing %s -- it is a SYMLINK " + "(target=%r). CA files must be regular files at the canonical path.", + ca_path, + os.readlink(ca_path), + ) + return False + flags = os.O_RDONLY + nofollow = getattr(os, "O_NOFOLLOW", 0) + fd = os.open(str(ca_path), flags | nofollow) + try: + # Bound the read at 1 MiB -- the AWS Root CA1 PEM is < 2 KiB; + # anything larger is a suspicious file we should not pin against. + chunks: list[bytes] = [] + remaining = 1 * 1024 * 1024 + while remaining > 0: + buf = os.read(fd, min(65536, remaining)) + if not buf: + break + chunks.append(buf) + remaining -= len(buf) + content = b"".join(chunks) + finally: + os.close(fd) + return _hash_matches_pin(content) + except OSError: + return False + def _discover_endpoint(iot: Any) -> str: """Return the iot:Data-ATS endpoint for this region+account.""" diff --git a/strands_robots/mesh/iot/shadow.py b/strands_robots/mesh/iot/shadow.py index 71e05b89..b8fb2428 100644 --- a/strands_robots/mesh/iot/shadow.py +++ b/strands_robots/mesh/iot/shadow.py @@ -19,7 +19,7 @@ :class:`ShadowMirror` exposes :meth:`update` (call from anywhere) and a ready-to-wire convenience :func:`enable_for_mesh` that binds it to the heartbeat path automatically. Today the heartbeat path is -:meth:`Mesh._heartbeat_loop` which calls ``put(strands/{peer}/presence, ...)`` +:meth:`Mesh._heartbeat_loop` which calls ``put(strands/{peer}/presence,...)`` — we hook in by registering a publish-side observer that mirrors any presence write to the shadow update topic. @@ -63,7 +63,7 @@ class ShadowMirror: mirror.update(current_transport(), {"connected": True, "robot_type": "so100"}) The ``update`` call wraps your dict in the canonical - ``{"state": {"reported": ...}}`` envelope and publishes via the active + ``{"state": {"reported":...}}`` envelope and publishes via the active transport's ``put()``. No retain — shadows are stored server-side. """ diff --git a/tests/mesh/test_camera_acl.py b/tests/mesh/test_camera_acl.py new file mode 100644 index 00000000..ec214d86 --- /dev/null +++ b/tests/mesh/test_camera_acl.py @@ -0,0 +1,62 @@ +"""Camera-frame access-control tests. + +Covers :class:`CameraOffloader` presigned-URL TTL semantics: + +* The short default TTL of 60 seconds, as a posture pin. +* The 1-hour ceiling that clamps over-eager operator overrides. +* The None-vs-explicit-0 distinction (env fallback only when None; + explicit 0 is treated as an operator value and clamped to 1). + +The privacy kill-switch (``STRANDS_MESH_CAMERA_DISABLED``) and the S3 +PutObject ACL hardening were dropped from PR #228 R2 because the +intended publish-side gate was never landed in production code; the +prior tests passed for incidental reasons (short-circuiting at the +inner-None guard rather than any kill-switch guard) and gave false +reassurance. Both items are tracked in the deferred follow-up issue +#249 and will land with their own pin tests there. +""" + +from __future__ import annotations + +from strands_robots.mesh.iot.camera_offload import ( + DEFAULT_PRESIGN_TTL_SECONDS, + MAX_PRESIGN_TTL_SECONDS, + CameraOffloader, +) + + +class TestPresignTTL: + def test_default_is_60s(self, monkeypatch): + monkeypatch.delenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", raising=False) + off = CameraOffloader(bucket="test-bucket") + assert off.presign_ttl == 60 + # Pin the constant so a future regression that bumps it back to 3600 + # fails this test loudly. + assert DEFAULT_PRESIGN_TTL_SECONDS == 60 + + def test_env_override_within_cap_passes_through(self, monkeypatch): + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "120") + off = CameraOffloader(bucket="test-bucket") + assert off.presign_ttl == 120 + + def test_env_override_above_cap_clamps(self, monkeypatch): + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "86400") # 1 day + off = CameraOffloader(bucket="test-bucket") + assert off.presign_ttl == MAX_PRESIGN_TTL_SECONDS # clamped + + def test_kwarg_override_above_cap_clamps(self, monkeypatch): + monkeypatch.delenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", raising=False) + off = CameraOffloader(bucket="test-bucket", presign_ttl=999_999) + assert off.presign_ttl == MAX_PRESIGN_TTL_SECONDS + + def test_zero_or_negative_clamped_up(self, monkeypatch): + monkeypatch.delenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", raising=False) + off = CameraOffloader(bucket="test-bucket", presign_ttl=0) + # presign_ttl=0 is explicitly passed (not None) -> clamped to floor of 1 + assert off.presign_ttl == 1 + + def test_negative_clamped_up(self, monkeypatch): + monkeypatch.delenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", raising=False) + off = CameraOffloader(bucket="test-bucket", presign_ttl=-5) + # Negative values are clamped to 1 + assert off.presign_ttl == 1 diff --git a/tests/mesh/test_camera_schema.py b/tests/mesh/test_camera_schema.py index a7a30b10..4a13610a 100644 --- a/tests/mesh/test_camera_schema.py +++ b/tests/mesh/test_camera_schema.py @@ -83,7 +83,11 @@ def test_resolve_camera_hz_invalid_disables(fake_robot_with_camera, monkeypatch) def test_publish_cameras_once_calls_put(fake_robot_with_camera): - """One frame is read per camera and forwarded via mesh_session.put.""" + """One frame is read per camera and forwarded via mesh_session.put. + + Note: outgoing camera frames are wrapped in a signed envelope; we + unwrap them here so the rest of the assertions stay readable. + """ from strands_robots.mesh import Mesh m = Mesh(fake_robot_with_camera, peer_id="test-cam-6") diff --git a/tests/mesh/test_iot_bootstrap.py b/tests/mesh/test_iot_bootstrap.py index f229608b..88a269c5 100644 --- a/tests/mesh/test_iot_bootstrap.py +++ b/tests/mesh/test_iot_bootstrap.py @@ -244,3 +244,293 @@ def test_lambda_zip_size_reasonable(self): zb = _build_lambda_zip() # Lambda source is ~2 KB; zipped should be well under 10 KB. assert 500 < len(zb) < 10_000 + + +# === Coverage gaps: create-paths for _ensure_* helpers === + + +@pytest.fixture(autouse=False) +def _no_sleep(monkeypatch): + """IAM role propagation has a `time.sleep(8)`; mock it in tests.""" + monkeypatch.setattr("time.sleep", lambda *a, **kw: None) + + +class TestEnsureLambdaRoleCreate: + """Cover the `_ensure_lambda_role` create-path (skipped covered above).""" + + def test_creates_with_correct_trust_and_policies(self, _no_sleep): + from strands_robots.mesh.iot.bootstrap import ( + ESTOP_LAMBDA_ROLE, + BootstrappedAccount, + _ensure_lambda_role, + ) + + class _NotFound(Exception): + pass + + iam = MagicMock() + iam.exceptions = MagicMock() + iam.exceptions.NoSuchEntityException = _NotFound + iam.get_role.side_effect = _NotFound("no role") + iam.create_role.return_value = {"Role": {"Arn": "arn:iam:role/created"}} + + a = BootstrappedAccount(region="us-west-2", account_id="123") + arn = _ensure_lambda_role(iam, a) + + assert arn == "arn:iam:role/created" + # Trust policy must allow lambda.amazonaws.com to AssumeRole + create_kwargs = iam.create_role.call_args.kwargs + import json as _json + + trust = _json.loads(create_kwargs["AssumeRolePolicyDocument"]) + assert trust["Statement"][0]["Principal"] == {"Service": "lambda.amazonaws.com"} + # AWS basic execution + our publish policy attached + iam.attach_role_policy.assert_called_once() + iam.put_role_policy.assert_called_once() + publish_kwargs = iam.put_role_policy.call_args.kwargs + assert publish_kwargs["PolicyName"] == "strands-mesh-iot-publish" + # Inline policy must scope iot:Publish to strands/* topics + policy = _json.loads(publish_kwargs["PolicyDocument"]) + publish_stmt = next(s for s in policy["Statement"] if "iot:Publish" in s["Action"]) + assert any("strands/*" in r for r in publish_stmt["Resource"]) + assert f"iam:{ESTOP_LAMBDA_ROLE}" in a.created + + +class TestEnsureEstopLambdaCreate: + def test_creates_when_missing(self, _no_sleep): + from strands_robots.mesh.iot.bootstrap import ( + ESTOP_LAMBDA_NAME, + BootstrappedAccount, + _ensure_estop_lambda, + ) + + class _NotFound(Exception): + pass + + lam = MagicMock() + lam.exceptions = MagicMock() + lam.exceptions.ResourceNotFoundException = _NotFound + lam.get_function.side_effect = _NotFound() + lam.create_function.return_value = {"FunctionArn": "arn:lambda:created"} + + a = BootstrappedAccount(region="us-west-2", account_id="123") + arn = _ensure_estop_lambda(lam, "arn:role", a) + + assert arn == "arn:lambda:created" + kw = lam.create_function.call_args.kwargs + # Sanity: handler, runtime, version-tagged description + assert kw["Handler"] == "lambda_function.lambda_handler" + assert kw["Runtime"] == "python3.12" + assert "[v" in kw["Description"], "description must carry version tag" + # Source must be the zipped lambda we built + assert kw["Code"]["ZipFile"][:2] == b"PK" + assert f"lambda:{ESTOP_LAMBDA_NAME}" in a.created + + def test_force_update_replaces_stale_version(self, _no_sleep): + from strands_robots.mesh.iot.bootstrap import ( + BootstrappedAccount, + _ensure_estop_lambda, + ) + + lam = MagicMock() + # Simulate an existing function with a stale version description + lam.get_function.return_value = { + "Configuration": { + "Description": "strands-mesh: defence-in-depth E-stop fan-out [v0]", + "FunctionArn": "arn:lambda:existing", + } + } + a = BootstrappedAccount(region="us-west-2", account_id="123") + arn = _ensure_estop_lambda(lam, "arn:role", a, force_update=True) + + assert arn == "arn:lambda:existing" + lam.update_function_code.assert_called_once() + lam.update_function_configuration.assert_called_once() + assert any("(updated)" in entry for entry in a.created) + + def test_warns_on_stale_version_without_force_update(self, _no_sleep, caplog): + import logging + + from strands_robots.mesh.iot.bootstrap import ( + ESTOP_LAMBDA_NAME, + BootstrappedAccount, + _ensure_estop_lambda, + ) + + lam = MagicMock() + lam.get_function.return_value = { + "Configuration": { + "Description": "strands-mesh: defence-in-depth E-stop fan-out [v0]", + "FunctionArn": "arn:lambda:existing", + } + } + a = BootstrappedAccount(region="us-west-2", account_id="123") + with caplog.at_level(logging.WARNING, logger="strands_robots.mesh.iot.bootstrap"): + arn = _ensure_estop_lambda(lam, "arn:role", a, force_update=False) + + assert arn == "arn:lambda:existing" + # No update call -- only WARNING + lam.update_function_code.assert_not_called() + assert any("stale version" in m for m in caplog.messages) + assert f"lambda:{ESTOP_LAMBDA_NAME}" in a.skipped + + +class TestEnsureSafetyToDynamoDbRuleCreate: + """The create-path: builds the IoT Rule action when the rule is missing.""" + + def test_creates_rule_with_correct_sql_and_action(self): + from strands_robots.mesh.iot.bootstrap import ( + BootstrappedAccount, + _ensure_safety_to_dynamodb_rule, + ) + + class _NotFound(Exception): + pass + + iot = MagicMock() + iot.exceptions = MagicMock() + iot.exceptions.ResourceNotFoundException = _NotFound + iot.exceptions.UnauthorizedException = type("UE", (Exception,), {}) + iot.get_topic_rule.side_effect = _NotFound() + iot.create_topic_rule.return_value = None + # After creation, get_topic_rule is called again to retrieve the ARN + iot.list_topic_rules.return_value = { + "rules": [{"ruleName": "strands_safety_to_dynamodb", "ruleArn": "arn:rule:safety"}] + } + + a = BootstrappedAccount(region="us-west-2", account_id="123") + with patch( + "strands_robots.mesh.iot.bootstrap._ensure_iot_action_role", + return_value="arn:iam:action-role", + ): + arn = _ensure_safety_to_dynamodb_rule(iot, "arn:t:safety", a) + + assert arn # non-empty + iot.create_topic_rule.assert_called_once() + kw = iot.create_topic_rule.call_args.kwargs + # SQL select on safety/event topic + sql = kw["topicRulePayload"]["sql"] + assert "safety/event" in sql + # DynamoDBv2 action wired + actions = kw["topicRulePayload"]["actions"] + assert any("dynamoDBv2" in a or "dynamoDBv2" in str(a) for a in actions) + + +class TestEnsureIotActionRoleCreate: + """Tests the `_ensure_iot_action_role` create-path (require_boto3 wrapper).""" + + def test_creates_role_with_dynamodb_putitem_policy(self, _no_sleep, monkeypatch): + from strands_robots.mesh.iot.bootstrap import ( + BootstrappedAccount, + _ensure_iot_action_role, + ) + + class _NotFound(Exception): + pass + + iam = MagicMock() + iam.exceptions = MagicMock() + iam.exceptions.NoSuchEntityException = _NotFound + iam.get_role.side_effect = _NotFound() + iam.create_role.return_value = {"Role": {"Arn": "arn:iam:action"}} + + boto3_mock = MagicMock() + boto3_mock.client.return_value = iam + monkeypatch.setattr("strands_robots.mesh.iot.bootstrap._require_boto3", lambda: boto3_mock) + + a = BootstrappedAccount(region="us-west-2", account_id="123") + arn = _ensure_iot_action_role(a) + + assert arn == "arn:iam:action" + # Trust must allow iot.amazonaws.com + import json as _json + + trust = _json.loads(iam.create_role.call_args.kwargs["AssumeRolePolicyDocument"]) + assert trust["Statement"][0]["Principal"] == {"Service": "iot.amazonaws.com"} + # Inline policy must scope DynamoDB:PutItem to the safety table + inline = _json.loads(iam.put_role_policy.call_args.kwargs["PolicyDocument"]) + stmt = inline["Statement"][0] + assert stmt["Action"] == ["dynamodb:PutItem"] + assert "strands-mesh-safety-events" in stmt["Resource"] + assert any("iam:strands-mesh-iot-action-role" in entry for entry in a.created) + + +class TestEnsureProvisioningTemplateCreate: + """Cover the provisioning-template create-path -- one of the largest + untested code blocks in this module.""" + + def test_creates_template_with_thing_resource(self, _no_sleep): + from strands_robots.mesh.iot.bootstrap import ( + PROVISIONING_TEMPLATE, + BootstrappedAccount, + _ensure_provisioning_template, + ) + + class _NotFound(Exception): + pass + + iot = MagicMock() + iot.exceptions = MagicMock() + iot.exceptions.ResourceNotFoundException = _NotFound + iot.describe_provisioning_template.side_effect = _NotFound() + iot.create_provisioning_template.return_value = {"templateArn": "arn:iot:template:provisioning"} + + a = BootstrappedAccount(region="us-west-2", account_id="123") + with patch( + "strands_robots.mesh.iot.bootstrap._ensure_provisioning_role", + return_value="arn:iam:provisioning", + ): + arn = _ensure_provisioning_template(iot, a) + + assert arn == "arn:aws:iot:us-west-2:123:provisioningtemplate/strands-mesh-fleet-provisioning" + kw = iot.create_provisioning_template.call_args.kwargs + assert kw["templateName"] == PROVISIONING_TEMPLATE + assert kw["enabled"] is True + # Body must reference AWS::IoT::Thing + body_str = kw["templateBody"] + assert "AWS::IoT::Thing" in body_str + assert "AWS::IoT::Certificate" in body_str + assert "AWS::IoT::Policy" in body_str + assert f"iot-prov-template:{PROVISIONING_TEMPLATE}" in a.created + + def test_skips_when_template_present(self): + from strands_robots.mesh.iot.bootstrap import ( + PROVISIONING_TEMPLATE, + BootstrappedAccount, + _ensure_provisioning_template, + ) + + iot = MagicMock() + iot.describe_provisioning_template.return_value = {"templateArn": "arn:iot:template:existing"} + a = BootstrappedAccount(region="us-west-2", account_id="123") + arn = _ensure_provisioning_template(iot, a) + assert arn == "arn:aws:iot:us-west-2:123:provisioningtemplate/strands-mesh-fleet-provisioning" + iot.create_provisioning_template.assert_not_called() + assert f"iot-prov-template:{PROVISIONING_TEMPLATE}" in a.skipped + + +class TestEnsureProvisioningRoleCreate: + def test_creates_role_with_provisioning_policy(self, _no_sleep, monkeypatch): + from strands_robots.mesh.iot.bootstrap import ( + BootstrappedAccount, + _ensure_provisioning_role, + ) + + class _NotFound(Exception): + pass + + iam = MagicMock() + iam.exceptions = MagicMock() + iam.exceptions.NoSuchEntityException = _NotFound + iam.get_role.side_effect = _NotFound() + iam.create_role.return_value = {"Role": {"Arn": "arn:iam:provisioning"}} + + boto3_mock = MagicMock() + boto3_mock.client.return_value = iam + monkeypatch.setattr("strands_robots.mesh.iot.bootstrap._require_boto3", lambda: boto3_mock) + + a = BootstrappedAccount(region="us-west-2", account_id="123") + arn = _ensure_provisioning_role(a) + assert arn == "arn:iam:provisioning" + # Managed policy attachment for fleet provisioning + iam.attach_role_policy.assert_called() diff --git a/tests/mesh/test_iot_ca_pin.py b/tests/mesh/test_iot_ca_pin.py new file mode 100644 index 00000000..8cd0d4df --- /dev/null +++ b/tests/mesh/test_iot_ca_pin.py @@ -0,0 +1,245 @@ +"""Tests for Amazon Root CA1 pin verification + size-cap fetch. + +The provisioner downloads ``AmazonRootCA1.pem`` over HTTPS and pins its +SHA-256 fingerprint before writing the file to disk. These tests exercise: + +* :func:`_verify_ca_bytes` accepts the canonical bytes and rejects any + one-byte modification. +* :func:`_ensure_ca` raises on a rogue download, on a tampered on-disk + copy, and on responses larger than :data:`_CA_FETCH_MAX_BYTES`. +* The :envvar:`STRANDS_MESH_DISABLE_CA_PIN` break-glass env var bypasses + the check (with a WARNING log) for proxy environments that legitimately + re-encode the cert. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from strands_robots.mesh.iot import provision + +# Known-good copy of AmazonRootCA1.pem. If Amazon rotates this root the value +# below + provision._AMAZON_ROOT_CA1_PINS must both update together. +_REAL_CA = b"""-----BEGIN CERTIFICATE----- +MIIDQTCCAimgAwIBAgITBmyfz5m/jAo54vB4ikPmljZbyjANBgkqhkiG9w0BAQsF +ADA5MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6 +b24gUm9vdCBDQSAxMB4XDTE1MDUyNjAwMDAwMFoXDTM4MDExNzAwMDAwMFowOTEL +MAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv +b3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj +ca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM +9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw +IFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6 +VOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L +93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm +jgSubJrIqg0CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMC +AYYwHQYDVR0OBBYEFIQYzIU07LwMlJQuCFmcx7IQTgoIMA0GCSqGSIb3DQEBCwUA +A4IBAQCY8jdaQZChGsV2USggNiMOruYou6r4lK5IpDB/G/wkjUu0yKGX9rbxenDI +U5PMCCjjmCXPI6T53iHTfIUJrU6adTrCC2qJeHZERxhlbI1Bjjt/msv0tadQ1wUs +N+gDS63pYaACbvXy8MWy7Vu33PqUXHeeE6V/Uq2V8viTO96LXFvKWlJbYK8U90vv +o/ufQJVtMVT8QtPHRh8jrdkPSHCa2XV4cdFyQzR1bldZwgJcJmApzyMZFo6IQ6XU +5MsI+yMRQ+hDKXJioaldXgjUkK642M4UwtBV8ob2xJNDd2ZhwLnoQdeXeGADbkpy +rqXRfboQnoZsG4q5WTP468SQvvG5 +-----END CERTIFICATE----- +""" + + +class TestPinVerification: + def test_real_bytes_match(self): + # Real bytes must match the constant in provision (else the constant + # is wrong and every deployment will fail closed -- which is good). + assert provision._verify_ca_bytes(_REAL_CA) is True + + def test_modified_bytes_rejected(self): + # Flip one byte -> must fail. + tampered = bytearray(_REAL_CA) + tampered[100] ^= 0x01 + assert provision._verify_ca_bytes(bytes(tampered)) is False + + def test_empty_bytes_rejected(self): + assert provision._verify_ca_bytes(b"") is False + + def test_breakglass_override_bypasses(self, monkeypatch): + monkeypatch.setenv("STRANDS_MESH_DISABLE_CA_PIN", "true") + assert provision._verify_ca_bytes(b"any garbage") is True + + def test_verify_ca_pin_path_helper(self, tmp_path): + good = tmp_path / "ca.pem" + good.write_bytes(_REAL_CA) + assert provision.verify_ca_pin(good) is True + + bad = tmp_path / "rogue.pem" + bad.write_bytes(b"-----FAKE CA-----") + assert provision.verify_ca_pin(bad) is False + + def test_verify_missing_file_returns_false(self, tmp_path): + assert provision.verify_ca_pin(tmp_path / "nope.pem") is False + + +class TestEnsureCA: + def test_existing_clean_file_skipped(self, tmp_path): + ca_path = tmp_path / "ca.pem" + ca_path.write_bytes(_REAL_CA) + # Should NOT make a network call + with patch("strands_robots.mesh.iot.provision.urllib.request.urlopen") as mock_url: + provision._ensure_ca(ca_path) + mock_url.assert_not_called() + + def test_existing_tampered_file_raises(self, tmp_path): + ca_path = tmp_path / "ca.pem" + ca_path.write_bytes(b"rogue cert content") + with patch("strands_robots.mesh.iot.provision.urllib.request.urlopen") as mock_url: + with pytest.raises(RuntimeError, match="failed pin check"): + provision._ensure_ca(ca_path) + mock_url.assert_not_called() + + def test_download_writes_when_pin_matches(self, tmp_path): + ca_path = tmp_path / "ca.pem" + mock_resp = MagicMock() + mock_resp.read.return_value = _REAL_CA + mock_resp.__enter__ = lambda self: self + mock_resp.__exit__ = lambda self, *a: None + with patch("strands_robots.mesh.iot.provision.urllib.request.urlopen", return_value=mock_resp): + provision._ensure_ca(ca_path) + assert ca_path.read_bytes() == _REAL_CA + + def test_download_rogue_cert_rejected(self, tmp_path): + # the download path is _download_with_per_socket_timeout, + # which builds its own opener (no setdefaulttimeout). Patch the + # helper directly so the test stays focused on the pin-mismatch + # rejection rather than urllib internals. + ca_path = tmp_path / "ca.pem" + with patch( + "strands_robots.mesh.iot.provision._download_with_per_socket_timeout", + return_value=b"-----BEGIN ROGUE CERTIFICATE-----\n", + ): + with pytest.raises(RuntimeError, match="SHA-256 mismatch"): + provision._ensure_ca(ca_path) + assert not ca_path.exists(), "rogue cert must NOT be written to disk" + + def test_download_oversized_rejected(self, tmp_path): + # patch _download_with_per_socket_timeout directly. The + # body-size cap is enforced after the download returns. + ca_path = tmp_path / "ca.pem" + big = b"X" * (provision._CA_FETCH_MAX_BYTES + 100) + with patch( + "strands_robots.mesh.iot.provision._download_with_per_socket_timeout", + return_value=big, + ): + with pytest.raises(RuntimeError, match="exceeded"): + provision._ensure_ca(ca_path) + assert not ca_path.exists() + + +class TestVerifyCaPinSymlink: + """verify_ca_pin must not follow symlinks. + + Symlink-following on the on-disk CA path is a TOCTOU gap: an attacker + could race a symlink into place after _ensure_ca downloads but before + verify_ca_pin reads. Refusing O_NOFOLLOW at the verify layer closes + the window. + """ + + def test_symlinked_ca_path_returns_false(self, tmp_path): + from strands_robots.mesh.iot.provision import verify_ca_pin + + target = tmp_path / "real_ca.pem" + target.write_bytes(b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n") + symlink = tmp_path / "ca_link.pem" + symlink.symlink_to(target) + + # verify_ca_pin must refuse to read through the symlink + assert verify_ca_pin(symlink) is False + + +class TestMultiPinRotation: + """Pin-tuple rotation regression: a follow-up cannot collapse the + tuple back to a string without breaking this test. + + AGENTS.md > Review Learnings (#85) > "Pin regression tests for + reviewed fixes" -- the move from a single ``str`` to a + ``tuple[str, ...]`` exists *so that* a CA rotation can ship as a + code-only deploy that adds the new pin alongside the old one. + Without an explicit test that exercises the multi-entry path, every + existing test still passes when someone "simplifies" the tuple back + to a string -- and the rotation contract silently breaks. + """ + + def test_tuple_supports_multiple_pins(self, monkeypatch): + import hashlib + + # Synthesize a second pin pointing at a fictional rotated CA. + future_bytes = b"future-rotated-ca" + future_pin = hashlib.sha256(future_bytes).hexdigest() + + # Append the new pin to the live tuple via monkeypatch to mirror + # the rotation path: existing pin still accepted, new pin also + # accepted. _hash_matches_pin reads the module-level constant + # via _resolve_ca_pins, so the patch is visible. + monkeypatch.setattr( + provision, + "_AMAZON_ROOT_CA1_PINS", + provision._AMAZON_ROOT_CA1_PINS + (future_pin,), + ) + + # Original canonical CA bytes still pass. + assert provision._hash_matches_pin(_REAL_CA) is True + # New rotated CA bytes also pass. + assert provision._hash_matches_pin(future_bytes) is True + # Something that matches neither pin is still rejected. + assert provision._hash_matches_pin(b"unrelated bytes") is False + + +class TestUnverifiedMarkerPermissions: + """The CA-unverified sidecar marker must be owner-only (mode 0o600). + + AGENTS.md > Review Learnings (#85) > "Pin regression tests for + reviewed fixes" + CodeQL py/overly-permissive-file-permission + (alert #273): the marker is a local sentinel read only by this + process via ``_ensure_ca`` to WARN about re-using a CA downloaded + under the ``STRANDS_MESH_DISABLE_CA_PIN=true`` break-glass. No + other user needs read access; world-readable mode (0o644) was + flagged by CodeQL on the previous round and tightened to 0o600 in + this commit. + """ + + def test_marker_written_owner_only_when_breakglass_active(self, tmp_path, monkeypatch): + import stat + + ca_path = tmp_path / "ca.pem" + monkeypatch.setenv("STRANDS_MESH_DISABLE_CA_PIN", "true") + with patch( + "strands_robots.mesh.iot.provision._download_with_per_socket_timeout", + return_value=b"any-bytes-the-pin-bypass-accepts", + ): + provision._ensure_ca(ca_path) + + marker = ca_path.with_suffix(ca_path.suffix + ".unverified") + assert marker.exists(), ( + "_ensure_ca must write the .unverified marker when the break-glass is active so subsequent runs can WARN." + ) + + mode = stat.S_IMODE(marker.stat().st_mode) + assert mode == 0o600, ( + f"marker mode is 0o{mode:o}; must be 0o600 (owner-only). " + "World-readable permissions on the unverified-CA sentinel " + "were flagged by CodeQL py/overly-permissive-file-permission." + ) + + def test_marker_not_written_when_breakglass_inactive(self, tmp_path, monkeypatch): + ca_path = tmp_path / "ca.pem" + # Break-glass NOT set -- marker must not appear. + monkeypatch.delenv("STRANDS_MESH_DISABLE_CA_PIN", raising=False) + with patch( + "strands_robots.mesh.iot.provision._download_with_per_socket_timeout", + return_value=_REAL_CA, + ): + provision._ensure_ca(ca_path) + + marker = ca_path.with_suffix(ca_path.suffix + ".unverified") + assert not marker.exists(), ( + "marker must only appear when the break-glass was active " + "during the download; the canonical-CA path must not leak " + "the sentinel." + ) diff --git a/tests/mesh/test_iot_camera_offload.py b/tests/mesh/test_iot_camera_offload.py index 3cc579a2..10a5422a 100644 --- a/tests/mesh/test_iot_camera_offload.py +++ b/tests/mesh/test_iot_camera_offload.py @@ -7,7 +7,17 @@ from unittest.mock import MagicMock, patch -from strands_robots.mesh.iot.camera_offload import ( +import pytest + +# numpy is only used by three frame-encoding tests below. Use +# pytest.importorskip so this module is collected (and the bulk +# of the suite still runs) on environments that ship without numpy. +# AGENTS.md > Review Learnings (#85): test import paths must match +# production -- the camera_offload production code does not require +# numpy at module-import time, so neither should this file. +np = pytest.importorskip("numpy") + +from strands_robots.mesh.iot.camera_offload import ( # noqa: E402 # importorskip must precede CameraOffloader, enable_for_mesh, ) @@ -37,9 +47,11 @@ def test_prefix_strips_slashes(self): assert c.prefix == "foo/bar" def test_default_presign_ttl(self, monkeypatch): + # Default presigned-URL TTL is 60 s, deliberately short to limit + # the window during which a leaked /ref message can be replayed. monkeypatch.delenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", raising=False) c = CameraOffloader(bucket="b") - assert c.presign_ttl == 3600 + assert c.presign_ttl == 60 def test_env_presign_ttl(self, monkeypatch): monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "60") @@ -77,7 +89,7 @@ def test_uploads_and_returns_ref(self, monkeypatch): assert ref["t"] == 12345.6 assert ref["s3_uri"].startswith("s3://frames/so100-01/wrist/") assert ref["presigned_url"] == "https://signed.example/" - assert ref["expires_at"] == 12345.6 + 3600 + assert ref["expires_at"] == 12345.6 + 60 # default TTL = 60 s # Verify the put_object call shape. s3.put_object.assert_called_once() @@ -242,3 +254,320 @@ def test_wrapper_no_op_when_no_cameras_in_config(self, monkeypatch): enable_for_mesh(mesh) mesh._publish_cameras_once() # must not raise + + +# === Coverage-gap tests for upload_frame error paths and edge cases === + + +class TestCameraOffloaderTTLBounds: + """The presign TTL is clamped to ``[1, MAX_PRESIGN_TTL_SECONDS=3600]``. + Below the floor or above the cap, we clamp loudly (or silently for the + floor since 0/negative is operator-supplied bad input). + """ + + def test_ttl_above_cap_is_clamped_to_3600(self, monkeypatch, caplog): + import logging + + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "999999") + with caplog.at_level(logging.WARNING, logger="strands_robots.mesh.iot.camera_offload"): + c = CameraOffloader(bucket="b") + assert c.presign_ttl == 3600 + assert any("clamping" in m for m in caplog.messages), f"expected WARNING about clamping; got {caplog.messages}" + + def test_ttl_zero_clamped_to_one(self): + c = CameraOffloader(bucket="b", presign_ttl=0) + # ttl=0 means "always-expired URL" which is useless; clamp to 1. + # presign_ttl=0 is explicit (not None), so the env-var fallback is + # skipped and the < 1 floor clamps it to exactly 1. See + # tests/mesh/test_presign_ttl_none_vs_zero.py for the full matrix. + assert c.presign_ttl == 1 + + def test_ttl_negative_env_clamped(self, monkeypatch): + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "-99") + c = CameraOffloader(bucket="b") + assert c.presign_ttl == 1 + + +class TestCameraOffloaderClientLazy: + """`_client()` is lazy and gracefully degrades when boto3 is missing.""" + + def test_boto3_missing_returns_none(self, monkeypatch): + c = CameraOffloader(bucket="b") + # Force ImportError inside _client by removing boto3 from sys.modules + # and blocking its import. + import builtins + import sys + + original_import = builtins.__import__ + + def blocked(name, *a, **kw): + if name == "boto3": + raise ImportError("simulated boto3 missing") + return original_import(name, *a, **kw) + + monkeypatch.setattr(builtins, "__import__", blocked) + sys.modules.pop("boto3", None) + assert c._client() is None + # Subsequent calls also return None (cached miss is acceptable; we + # just check the public contract: never raises, always returns None + # when boto3 absent). + assert c._client() is None + + def test_client_is_cached(self): + c = CameraOffloader(bucket="b", region="us-west-2") + with patch("boto3.client") as boto_client: + mock_s3 = MagicMock() + boto_client.return_value = mock_s3 + assert c._client() is mock_s3 + # Second call must not invoke boto3.client again. + assert c._client() is mock_s3 + assert boto_client.call_count == 1 + + +class TestUploadFrameErrorPaths: + """`upload_frame` returns None on every error condition; never raises.""" + + def test_no_bucket_returns_none(self, monkeypatch): + monkeypatch.delenv("STRANDS_MESH_CAMERA_S3_BUCKET", raising=False) + c = CameraOffloader() + assert c.upload_frame("r1", "front", b"jpeg-bytes", 1234.5) is None + + def test_put_object_error_returns_none(self): + c = CameraOffloader(bucket="b") + c._s3 = MagicMock() + c._s3.put_object.side_effect = RuntimeError("simulated ClientError") + assert c.upload_frame("r1", "front", b"jpeg", 1.0) is None + + def test_presign_failure_returns_ref_with_null_url(self): + c = CameraOffloader(bucket="b") + c._s3 = MagicMock() + c._s3.put_object.return_value = {} + c._s3.generate_presigned_url.side_effect = RuntimeError("simulated NoCredentialsError") + ref = c.upload_frame("r1", "front", b"jpeg", 1.0) + assert ref is not None + # Upload succeeded, but presign failed -> ref carries a null URL + assert ref["s3_uri"] == "s3://b/r1/front/1000000000.jpg" + assert ref["presigned_url"] is None + assert ref["expires_at"] == 1.0 + c.presign_ttl + + def test_happy_path_returns_full_ref(self): + c = CameraOffloader(bucket="b", prefix="frames", presign_ttl=120) + c._s3 = MagicMock() + c._s3.generate_presigned_url.return_value = "https://example.com/signed" + ref = c.upload_frame("r1", "front", b"jpeg", 100.0) + assert ref == { + "peer_id": "r1", + "cam": "front", + "t": 100.0, + "encoding": "jpeg", + "s3_uri": "s3://b/frames/r1/front/100000000000.jpg", + "presigned_url": "https://example.com/signed", + "expires_at": 100.0 + 120, + } + c._s3.put_object.assert_called_once_with( + Bucket="b", + Key="frames/r1/front/100000000000.jpg", + Body=b"jpeg", + ContentType="image/jpeg", + ) + + +class TestS3KeyForLayout: + def test_no_prefix(self): + c = CameraOffloader(bucket="b") + assert c.s3_key_for("r1", "front", 12345) == "r1/front/12345.jpg" + + def test_with_prefix(self): + c = CameraOffloader(bucket="b", prefix="fleet-a/raw") + assert c.s3_key_for("r1", "front", 12345) == "fleet-a/raw/r1/front/12345.jpg" + + +class TestEnableForMeshGuards: + """`enable_for_mesh` short-circuits cleanly on every wrong-state path.""" + + def test_returns_none_on_zenoh_backend(self, monkeypatch): + from strands_robots.mesh.transport import factory as fac + + monkeypatch.setattr(fac, "current_backend", lambda: "zenoh") + mesh = MagicMock() + assert enable_for_mesh(mesh) is None + # Original method should NOT have been monkey-patched + # (mesh._publish_cameras_once is a MagicMock attr, not the patched fn) + # We verify by asserting no setattr occurred. + + def test_returns_none_when_bucket_missing(self, monkeypatch): + from strands_robots.mesh.transport import factory as fac + + monkeypatch.setattr(fac, "current_backend", lambda: "iot") + monkeypatch.delenv("STRANDS_MESH_CAMERA_S3_BUCKET", raising=False) + mesh = MagicMock() + assert enable_for_mesh(mesh) is None + + def test_wires_up_when_bucket_set(self, monkeypatch): + from strands_robots.mesh.transport import factory as fac + + monkeypatch.setattr(fac, "current_backend", lambda: "iot") + monkeypatch.setenv("STRANDS_MESH_CAMERA_S3_BUCKET", "wired-bucket") + + mesh = MagicMock() + original = mesh._publish_cameras_once + off = enable_for_mesh(mesh) + assert off is not None + assert off.bucket == "wired-bucket" + # The patched method replaces the original + assert mesh._publish_cameras_once is not original + + +# === The patched _publish_cameras_once_with_offload closure === + + +class TestPatchedPublishClosure: + """Cover the inner closure ``_publish_cameras_once_with_offload`` end-to-end. + Builds a Mesh stub that satisfies the closure's duck-type requirements. + """ + + def _make_mesh_with_camera(self, monkeypatch, bucket="b"): + from strands_robots.mesh.transport import factory as fac + + monkeypatch.setattr(fac, "current_backend", lambda: "iot") + monkeypatch.setenv("STRANDS_MESH_CAMERA_S3_BUCKET", bucket) + + # Stub transport that the closure queries + transport = MagicMock() + transport.is_alive.return_value = True + monkeypatch.setattr(fac, "current_transport", lambda: transport) + + # Mock S3 so upload_frame succeeds + offloader = CameraOffloader(bucket=bucket) + offloader._s3 = MagicMock() + offloader._s3.generate_presigned_url.return_value = "https://example.com/signed" + + # Build a mesh stub + mesh = MagicMock() + mesh.peer_id = "robot-x" + # Connected inner robot with one camera + frame = np.zeros((480, 640, 3), dtype=np.uint8) + mesh.robot.robot.is_connected = True + mesh.robot.robot.config.cameras = {"front": object()} + mesh.robot.robot.get_observation.return_value = {"front": frame} + return mesh, offloader, transport + + def test_closure_uploads_and_publishes_ref(self, monkeypatch): + mesh, off, transport = self._make_mesh_with_camera(monkeypatch) + # Wire up + off_returned = enable_for_mesh(mesh, offloader=off) + assert off_returned is off + # Trigger the patched publish + mesh._publish_cameras_once() + # transport.put must have been called exactly once for the ref topic + calls = [c for c in transport.put.call_args_list if "/ref" in c.args[0]] + assert len(calls) == 1, f"expected 1 ref publish, got {transport.put.call_args_list}" + topic, ref = calls[0].args + assert topic == "strands/robot-x/camera/front/ref" + assert ref["peer_id"] == "robot-x" + assert ref["cam"] == "front" + assert ref["s3_uri"].startswith("s3://b/robot-x/front/") + assert ref["shape"] == [480, 640, 3] + assert ref["presigned_url"] == "https://example.com/signed" + + def test_closure_skips_disconnected_robot(self, monkeypatch): + mesh, off, transport = self._make_mesh_with_camera(monkeypatch) + mesh.robot.robot.is_connected = False + enable_for_mesh(mesh, offloader=off) + mesh._publish_cameras_once() + # No /ref publish when disconnected + assert not any("/ref" in c.args[0] for c in transport.put.call_args_list) + + def test_closure_skips_when_transport_dead(self, monkeypatch): + mesh, off, transport = self._make_mesh_with_camera(monkeypatch) + transport.is_alive.return_value = False + enable_for_mesh(mesh, offloader=off) + mesh._publish_cameras_once() + assert not any("/ref" in c.args[0] for c in transport.put.call_args_list) + + def test_closure_skips_camera_with_no_frame(self, monkeypatch): + mesh, off, transport = self._make_mesh_with_camera(monkeypatch) + # Two cameras configured, but only one has data + frame = np.zeros((480, 640, 3), dtype=np.uint8) + mesh.robot.robot.config.cameras = {"front": object(), "back": object()} + mesh.robot.robot.get_observation.return_value = {"front": frame, "back": None} + enable_for_mesh(mesh, offloader=off) + mesh._publish_cameras_once() + ref_calls = [c for c in transport.put.call_args_list if "/ref" in c.args[0]] + assert len(ref_calls) == 1 + assert ref_calls[0].args[0] == "strands/robot-x/camera/front/ref" + + def test_closure_handles_observation_failure(self, monkeypatch, caplog): + import logging + + mesh, off, transport = self._make_mesh_with_camera(monkeypatch) + mesh.robot.robot.get_observation.side_effect = RuntimeError("hardware glitch") + enable_for_mesh(mesh, offloader=off) + with caplog.at_level(logging.DEBUG, logger="strands_robots.mesh.iot.camera_offload"): + # Must not raise -- offload is best-effort + mesh._publish_cameras_once() + assert not any("/ref" in c.args[0] for c in transport.put.call_args_list) + + def test_closure_skips_dtype_dtype_promotion_to_uint8(self, monkeypatch): + """Float frames are silently coerced to uint8 before JPEG-encoding.""" + mesh, off, transport = self._make_mesh_with_camera(monkeypatch) + # Use a float32 frame with valid uint8 range + frame = np.zeros((100, 100, 3), dtype=np.float32) + mesh.robot.robot.get_observation.return_value = {"front": frame} + enable_for_mesh(mesh, offloader=off) + mesh._publish_cameras_once() + ref_calls = [c for c in transport.put.call_args_list if "/ref" in c.args[0]] + assert len(ref_calls) == 1 + + +# ---------------------------------------------------------------------- +# Issue #262: presign_ttl negative kwarg WARNING (asymmetric clamp fix) +# ---------------------------------------------------------------------- + + +class TestNegativeKwargWarns: + """Pin: ``presign_ttl=-99`` (unambiguous bug at call site) emits a + WARNING when clamped to 1, but ``presign_ttl=0`` (documented + sentinel pinned by R1 fix) does NOT. + """ + + def test_negative_kwarg_emits_warning(self, caplog): + from strands_robots.mesh.iot.camera_offload import CameraOffloader + + caplog.clear() + with caplog.at_level("WARNING"): + off = CameraOffloader(bucket="test-bucket", presign_ttl=-99) + + assert off.presign_ttl == 1 + warns = [r for r in caplog.records if "presign_ttl" in r.message] + assert len(warns) == 1, f"expected 1 WARNING, got {warns}" + assert "source=kwarg" in warns[0].message + assert "-99" in warns[0].message + + def test_zero_kwarg_no_warning(self, caplog): + """Sentinel value 0 (kwarg-vs-env-precedence pin from R1) must + NOT emit a WARNING -- documented deliberate caller value. + """ + from strands_robots.mesh.iot.camera_offload import CameraOffloader + + caplog.clear() + with caplog.at_level("WARNING"): + off = CameraOffloader(bucket="test-bucket", presign_ttl=0) + + assert off.presign_ttl == 1 + warns = [r for r in caplog.records if "presign_ttl" in r.message] + assert len(warns) == 0, f"expected no WARNING for sentinel 0, got {warns}" + + def test_negative_env_emits_warning(self, monkeypatch, caplog): + """Env-var path always WARNs (operator-side bug).""" + from strands_robots.mesh.iot.camera_offload import CameraOffloader + + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "-99") + caplog.clear() + with caplog.at_level("WARNING"): + off = CameraOffloader(bucket="test-bucket") + + assert off.presign_ttl == 1 + warns = [r for r in caplog.records if "presign_ttl" in r.message] + assert len(warns) == 1 + assert "source=env" in warns[0].message diff --git a/tests/mesh/test_iot_policy_scope.py b/tests/mesh/test_iot_policy_scope.py new file mode 100644 index 00000000..89c89d52 --- /dev/null +++ b/tests/mesh/test_iot_policy_scope.py @@ -0,0 +1,129 @@ +"""Tests pinning the scope of the canonical IoT policies. + +The robot and operator policies in :mod:`provision` deliberately avoid the +``strands/*`` wildcard for ``iot:Receive`` so neither role can eavesdrop +on the entire fleet's mesh traffic. These tests assert that scope: + +* Robot ``Receive`` covers only the robot's own ``/cmd``, own + ``/response/*``, ``broadcast``, ``safety/estop``, and ``+/presence``. +* Operator ``Receive`` covers monitoring topics (``presence``, ``state``, + ``health``, ``safety/event``, ``safety/estop``) and not the + command/response streams of other operators. + +A future refactor that re-introduces the wildcard will fail these tests +loudly, surfacing the regression in code review. +""" + +from __future__ import annotations + +from strands_robots.mesh.iot.provision import ( + _OPERATOR_POLICY_DOC, + _ROBOT_POLICY_DOC, +) + + +def _statements_by_sid(doc: dict) -> dict[str, dict]: + return {st.get("Sid", ""): st for st in doc["Statement"]} + + +class TestRobotPolicy: + def test_no_unconditional_receive_wildcard(self): + """Robot policy must NOT contain iot:Receive on strands/*.""" + for st in _ROBOT_POLICY_DOC["Statement"]: + actions = st.get("Action") + if isinstance(actions, str): + actions = [actions] + if not any(a == "iot:Receive" or a == "iot:*" for a in actions): + continue + resources = st.get("Resource") + if isinstance(resources, str): + resources = [resources] + for r in resources: + # Wildcard on strands/* would expose the entire fleet; + # specific-suffix patterns are OK. + assert not r.endswith(":topic/strands/*"), f"Found wildcard Receive resource: {r!r}" + + def test_scoped_receive_present(self): + """The replacement statement must exist and cover only the topics + robots actually subscribe to.""" + sids = _statements_by_sid(_ROBOT_POLICY_DOC) + assert "AllowReceiveScoped" in sids, "scoped-Receive statement missing" + st = sids["AllowReceiveScoped"] + resources = st["Resource"] + # Must include own cmd + own response + broadcast + safety + presence. + joined = "\n".join(resources) + assert "${iot:Connection.Thing.ThingName}/cmd" in joined + assert "${iot:Connection.Thing.ThingName}/response/*" in joined + assert "/strands/broadcast" in joined + assert "/strands/safety/estop" in joined + assert "/strands/+/presence" in joined + + def test_publish_still_scoped_to_own_thing(self): + """Sanity: publish remains scoped to the robot's own topics.""" + sids = _statements_by_sid(_ROBOT_POLICY_DOC) + st = sids["AllowOwnTopics"] + for r in st["Resource"]: + assert "${iot:Connection.Thing.ThingName}/" in r + + def test_no_receive_on_arbitrary_camera(self): + """A robot must not be able to subscribe to another robot's camera.""" + for st in _ROBOT_POLICY_DOC["Statement"]: + actions = st.get("Action") + if isinstance(actions, str): + actions = [actions] + if "iot:Subscribe" not in actions and "iot:Receive" not in actions: + continue + resources = st.get("Resource") + if isinstance(resources, str): + resources = [resources] + for r in resources: + assert "/camera/" not in r, f"Camera subscription leaked into robot policy: {r!r}" + + +class TestOperatorPolicy: + def test_no_unconditional_receive_wildcard(self): + """Operator policy must not allow Receive on strands/* either.""" + sids = _statements_by_sid(_OPERATOR_POLICY_DOC) + st = sids["OperatorObserveFleet"] + for r in st["Resource"]: + assert not r.endswith(":topic/strands/*"), f"Operator wildcard Receive: {r!r}" + + def test_scoped_to_monitoring_topics(self): + sids = _statements_by_sid(_OPERATOR_POLICY_DOC) + st = sids["OperatorObserveFleet"] + joined = "\n".join(st["Resource"]) + assert "/strands/+/presence" in joined + assert "/strands/+/state" in joined + assert "/strands/+/health" in joined + assert "/strands/+/safety/event" in joined + assert "/strands/safety/estop" in joined + + def test_no_camera_or_input_in_operator_observe(self): + sids = _statements_by_sid(_OPERATOR_POLICY_DOC) + st = sids["OperatorObserveFleet"] + for r in st["Resource"]: + assert "/camera/" not in r + assert "/input/" not in r + + def test_publish_to_fleet_wildcard_is_deliberate(self): + """Pin: OperatorPublishToFleet uses ``strands/*/cmd`` wildcard by design. + + The system has no per-operator-to-per-robot binding. A compromised + operator credential has equivalent scope to a compromised fleet + command authority. Mitigations are short-lived certs, the + OperatorShadow attribute condition, and the operational audit log. + A per-robot operator scope would require one policy document per + robot, scaling policy count linearly with fleet size. + + If this test breaks, someone narrowed the operator publish scope -- + verify the corresponding transport/dispatch code still routes + commands correctly. + """ + sids = _statements_by_sid(_OPERATOR_POLICY_DOC) + st = sids["OperatorPublishToFleet"] + resources = st["Resource"] + # The wildcard ``strands/*/cmd`` must exist for the operator to + # address any robot without a per-robot policy. + assert any(r.endswith(":topic/strands/*/cmd") for r in resources), ( + "OperatorPublishToFleet must retain the */cmd wildcard (deliberate design choice)" + ) diff --git a/tests/mesh/test_iot_provision.py b/tests/mesh/test_iot_provision.py index 023f95e4..b6e9544e 100644 --- a/tests/mesh/test_iot_provision.py +++ b/tests/mesh/test_iot_provision.py @@ -29,6 +29,33 @@ # Test fixtures +@pytest.fixture +def bypass_ca(monkeypatch): + """Opt-in CA-pin bypass for tests that exercise ``provision_robot`` + / ``provision_operator`` orchestration. + + Stub ``_ensure_ca`` to a no-op so a test does not need to pre-seed + a real pinned CA file or mock urllib. CA pinning behaviour itself + has dedicated coverage in ``test_iot_ca_pin.py`` -- including a + regression test that the on-disk re-use path always raw-checks the + pin even when the ``STRANDS_MESH_DISABLE_CA_PIN`` break-glass is + set. Tests that don't go through the provisioning entry points + don't get the bypass, so a future refactor that drops the + ``_ensure_ca`` call from ``provision_robot`` would still surface in + a test that exercises the production call path. + + NB: we deliberately do NOT use ``STRANDS_MESH_DISABLE_CA_PIN=true`` + here. The break-glass only applies to the *download* path; the + on-disk re-use path always raw-checks the pin, so a pre-seeded + ``fake-ca`` file would be (correctly) rejected. + """ + monkeypatch.setattr( + "strands_robots.mesh.iot.provision._ensure_ca", + lambda ca_path: None, + ) + yield + + @pytest.fixture def tmp_cert_dir(tmp_path): """Isolated cert dir so we don't write to ~/.strands_robots.""" @@ -173,6 +200,7 @@ def test_skips_when_present(self, fake_iot_client): class TestProvisionRobot: + pytestmark = pytest.mark.usefixtures("bypass_ca") """End-to-end provisioning with all AWS calls mocked.""" def test_writes_certs_with_correct_permissions(self, fake_iot_client, tmp_cert_dir, monkeypatch): @@ -181,8 +209,6 @@ def test_writes_certs_with_correct_permissions(self, fake_iot_client, tmp_cert_d "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - # Skip the urlopen call for the CA — write a fake one. - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") result = provision_robot( "test-robot-01", @@ -204,7 +230,6 @@ def test_attaches_policy_to_cert_and_thing(self, fake_iot_client, tmp_cert_dir, "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") provision_robot("r", cert_dir=tmp_cert_dir) @@ -221,7 +246,6 @@ def test_env_vars_helper(self, fake_iot_client, tmp_cert_dir, monkeypatch): "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") result = provision_robot("e2", cert_dir=tmp_cert_dir) env = result.env_vars() @@ -238,7 +262,6 @@ def test_injects_strands_mesh_role_attribute(self, fake_iot_client, tmp_cert_dir "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") provision_robot("my-robot", cert_dir=tmp_cert_dir) @@ -252,7 +275,6 @@ def test_preserves_user_attributes(self, fake_iot_client, tmp_cert_dir, monkeypa "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") provision_robot("my-robot", cert_dir=tmp_cert_dir, attributes={"hw": "so100"}) @@ -267,7 +289,6 @@ def test_user_can_override_role_attribute(self, fake_iot_client, tmp_cert_dir, m "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") provision_robot("my-robot", cert_dir=tmp_cert_dir, attributes={"strands-mesh-role": "custom"}) @@ -277,6 +298,7 @@ def test_user_can_override_role_attribute(self, fake_iot_client, tmp_cert_dir, m class TestProvisionOperator: + pytestmark = pytest.mark.usefixtures("bypass_ca") """Operator provisioning uses the operator policy, not the robot policy.""" def test_uses_operator_policy(self, fake_iot_client, tmp_cert_dir, monkeypatch): @@ -284,7 +306,6 @@ def test_uses_operator_policy(self, fake_iot_client, tmp_cert_dir, monkeypatch): "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") result = provision_operator("ops-1", cert_dir=tmp_cert_dir) @@ -331,6 +352,7 @@ def test_env_vars_keys(self): class TestCleanupStaleCerts: + pytestmark = pytest.mark.usefixtures("bypass_ca") """Re-running provision_robot must not accumulate certs. Regression coverage for the security-relevant bug found in cycle 9 of @@ -347,7 +369,6 @@ def test_cleanup_runs_before_new_cert_issuance(self, fake_iot_client, tmp_cert_d "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") # Pretend the Thing already has an old cert attached. old_cert_arn = "arn:aws:iot:us-west-2:123:cert/old-cert-id-aaaaa" @@ -373,7 +394,6 @@ def test_cleanup_swallows_cert_delete_failures(self, fake_iot_client, tmp_cert_d "strands_robots.mesh.iot.provision._require_boto3", lambda: MagicMock(client=lambda *a, **kw: fake_iot_client), ) - (tmp_cert_dir / "AmazonRootCA1.pem").write_text("fake-ca") fake_iot_client.list_thing_principals.return_value = { "principals": ["arn:aws:iot:us-west-2:123:cert/cant-delete"] @@ -404,3 +424,98 @@ class _NotFound(Exception): assert n == 0 iot.detach_thing_principal.assert_not_called() iot.delete_certificate.assert_not_called() + + +class TestThingNameStrictSubset: + """docstring previously claimed AWS-IoT-compatible. + The regex is in fact a strict subset (no colon). Pin the contract. + """ + + def test_alphanumerics_accepted(self): + from strands_robots.mesh.iot import provision + + provision._validate_thing_name("robot-01") # must NOT raise + provision._validate_thing_name("R") + provision._validate_thing_name("a" * 128) + + def test_colon_rejected_even_though_aws_allows(self): + """AWS IoT permits ``:`` in Thing names; we deliberately do not.""" + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("robot:01") + + def test_path_traversal_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("../../etc/passwd") + + def test_too_long_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("a" * 129) + + def test_empty_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="non-empty"): + provision._validate_thing_name("") + + def test_slash_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("robot/01") + + def test_dot_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("robot.01") + + +class TestValidateThingNameFullmatch: + """Regression: ``_validate_thing_name`` must use ``re.fullmatch`` so a + trailing newline / CR / form-feed / EOL character is rejected. + + ``re.match(r'^[a-zA-Z0-9_-]{1,128}$', s)`` accepts ``'robot\\n'`` + because in non-MULTILINE mode ``$`` matches *just before a trailing + newline*. The PR description for #228 explicitly claims the regex + is "anchored, not just `match`" — these tests pin that contract. + + A bypass surface exists wherever ``thing_name`` is interpolated + into a filesystem path or an AWS API call (cert files under + ``cert_dir``, IoT topic ARNs, S3 keys via ``mesh.peer_id``). + """ + + def test_trailing_newline_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("robot\n") + + def test_trailing_carriage_return_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("robot\r") + + def test_embedded_newline_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("robot\nfoo") + + def test_trailing_tab_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("robot\t") + + def test_trailing_form_feed_rejected(self): + from strands_robots.mesh.iot import provision + + with pytest.raises(ValueError, match="invalid characters"): + provision._validate_thing_name("robot\x0c") diff --git a/tests/mesh/test_presign_ttl_none_vs_zero.py b/tests/mesh/test_presign_ttl_none_vs_zero.py new file mode 100644 index 00000000..d09da9c2 --- /dev/null +++ b/tests/mesh/test_presign_ttl_none_vs_zero.py @@ -0,0 +1,99 @@ +"""Regression test for presign_ttl=0 vs presign_ttl=None. + +Bug: ``presign_ttl or int(os.getenv(...))`` treats 0 as falsy and +silently falls back to the env-var default. An operator who explicitly +passes ``presign_ttl=0`` (intending "minimum possible") gets 60s instead +of the 1s floor. This test pins the fix: explicit None check. + +Thread: PR #228, camera_offload.py:80 +""" + +from __future__ import annotations + +from strands_robots.mesh.iot.camera_offload import ( + DEFAULT_PRESIGN_TTL_SECONDS, + CameraOffloader, +) + + +class TestPresignTTLNoneVsZero: + """Pin the distinction between presign_ttl=None and presign_ttl=0.""" + + def test_none_falls_back_to_env_default(self, monkeypatch): + """presign_ttl=None means 'use env var or built-in default'.""" + monkeypatch.delenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", raising=False) + off = CameraOffloader(bucket="b", presign_ttl=None) + assert off.presign_ttl == DEFAULT_PRESIGN_TTL_SECONDS + + def test_none_falls_back_to_env_override(self, monkeypatch): + """presign_ttl=None + env var set -> uses env var.""" + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "300") + off = CameraOffloader(bucket="b", presign_ttl=None) + assert off.presign_ttl == 300 + + def test_zero_is_explicit_and_clamps_to_one(self, monkeypatch): + """presign_ttl=0 is an explicit value, NOT a fallback trigger. + + The < 1 clamp brings it to 1. This is the regression guard for + the ``or``-based falsy bug. + """ + monkeypatch.delenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", raising=False) + off = CameraOffloader(bucket="b", presign_ttl=0) + # Must NOT be DEFAULT_PRESIGN_TTL_SECONDS (60); must be 1. + assert off.presign_ttl == 1 + + def test_zero_ignores_env_var(self, monkeypatch): + """Explicit presign_ttl=0 overrides the env var entirely.""" + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "300") + off = CameraOffloader(bucket="b", presign_ttl=0) + # The explicit kwarg takes precedence over env var. + assert off.presign_ttl == 1 + + def test_explicit_positive_value_ignores_env(self, monkeypatch): + """Explicit presign_ttl=120 overrides env var.""" + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "300") + off = CameraOffloader(bucket="b", presign_ttl=120) + assert off.presign_ttl == 120 + + def test_negative_clamps_to_one(self, monkeypatch): + """Negative values are clamped to 1 floor.""" + monkeypatch.delenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", raising=False) + off = CameraOffloader(bucket="b", presign_ttl=-10) + assert off.presign_ttl == 1 + + +class TestEnvVarMalformed: + """Non-integer STRANDS_MESH_CAMERA_PRESIGN_TTL must not crash __init__. + + Regression: ``int(os.getenv(...))`` would raise ValueError on a typo'd + env var ('forever', '1m', whitespace), bricking CameraOffloader at + construction time with a confusing traceback. We fall back to the + documented default with a WARNING instead. + """ + + def test_non_numeric_falls_back_with_warning(self, monkeypatch, caplog): + import logging + + from strands_robots.mesh.iot.camera_offload import ( + DEFAULT_PRESIGN_TTL_SECONDS, + CameraOffloader, + ) + + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "forever") + with caplog.at_level(logging.WARNING, logger="strands_robots.mesh.iot.camera_offload"): + c = CameraOffloader(bucket="b") + assert c.presign_ttl == DEFAULT_PRESIGN_TTL_SECONDS + assert any("is not an integer" in m for m in caplog.messages), ( + f"expected WARNING about non-integer env var; got {caplog.messages}" + ) + + def test_empty_string_falls_back_silently(self, monkeypatch): + """Empty string should be treated as 'unset' (no warning), not crash.""" + from strands_robots.mesh.iot.camera_offload import ( + DEFAULT_PRESIGN_TTL_SECONDS, + CameraOffloader, + ) + + monkeypatch.setenv("STRANDS_MESH_CAMERA_PRESIGN_TTL", "") + c = CameraOffloader(bucket="b") + assert c.presign_ttl == DEFAULT_PRESIGN_TTL_SECONDS diff --git a/tests/mesh/test_teardown_thing_validation.py b/tests/mesh/test_teardown_thing_validation.py new file mode 100644 index 00000000..6acf6297 --- /dev/null +++ b/tests/mesh/test_teardown_thing_validation.py @@ -0,0 +1,220 @@ +"""Regression test: teardown_thing validates thing_name before any AWS/FS call. + +Addresses review thread on provision.py:320 (PR #228 R3) -- _validate_thing_name +was applied to provision_robot and provision_operator but NOT to teardown_thing, +leaving a path-traversal vector via ``DEFAULT_CERT_DIR / f"{thing_name}.pem"``. +""" + +import pytest + + +class TestTeardownThingValidation: + """teardown_thing must reject unsafe thing_name values.""" + + def test_path_traversal_rejected(self): + """thing_name containing '../' must raise ValueError before any I/O.""" + from strands_robots.mesh.iot.provision import teardown_thing + + with pytest.raises(ValueError, match="invalid characters"): + teardown_thing("../../etc/passwd") + + def test_dots_rejected(self): + """thing_name containing '.' must raise ValueError.""" + from strands_robots.mesh.iot.provision import teardown_thing + + with pytest.raises(ValueError, match="invalid characters"): + teardown_thing("robot.v2") + + def test_colons_rejected(self): + """thing_name containing ':' must raise ValueError.""" + from strands_robots.mesh.iot.provision import teardown_thing + + with pytest.raises(ValueError, match="invalid characters"): + teardown_thing("robot:alpha") + + def test_empty_rejected(self): + """Empty thing_name must raise ValueError.""" + from strands_robots.mesh.iot.provision import teardown_thing + + with pytest.raises(ValueError, match="non-empty string"): + teardown_thing("") + + def test_valid_name_passes_validation(self, monkeypatch): + """Valid thing_name passes validation, reaches boto3 import.""" + from strands_robots.mesh.iot import provision + + # Mock _require_boto3 to avoid real AWS calls + mock_called = [] + + def fake_require_boto3(): + mock_called.append(True) + raise ImportError("boto3 not available in test") + + monkeypatch.setattr(provision, "_require_boto3", fake_require_boto3) + + with pytest.raises(ImportError, match="boto3 not available"): + provision.teardown_thing("valid-robot-name_123") + + assert mock_called, "_require_boto3 should be called after validation passes" + + +class TestTeardownThingCertDirParity: + """teardown_thing must honour a custom cert_dir to match provision_robot. + + Regression for the asymmetry where provision_robot accepted ``cert_dir=`` + but teardown_thing was hardcoded to DEFAULT_CERT_DIR -- callers who + provisioned with a custom cert_dir would silently leak .cert.pem and + .private.key on disk forever after teardown. + """ + + def test_cert_dir_kwarg_unlinks_local_files(self, tmp_path, monkeypatch): + """teardown_thing(thing, cert_dir=tmp) must unlink files under tmp, + not under DEFAULT_CERT_DIR.""" + from strands_robots.mesh.iot import provision + + # Seed cert + key files in a custom dir + custom_dir = tmp_path / "iot" + custom_dir.mkdir() + cert = custom_dir / "test-robot.cert.pem" + key = custom_dir / "test-robot.private.key" + cert.write_text("FAKE CERT") + key.write_text("FAKE KEY") + assert cert.exists() and key.exists() + + # Stub boto3 + the iot client so teardown reaches the unlink loop + # without making real AWS calls. + fake_iot = type("FakeIoT", (), {})() + + class _NotFound(Exception): + pass + + fake_iot.exceptions = type( + "FakeExc", + (), + {"ResourceNotFoundException": _NotFound}, + )() + fake_iot.list_thing_principals = lambda **kw: {"principals": []} + fake_iot.delete_thing = lambda **kw: None + + fake_boto3 = type("FakeBoto3", (), {})() + fake_boto3.client = lambda *a, **kw: fake_iot + monkeypatch.setattr(provision, "_require_boto3", lambda: fake_boto3) + + provision.teardown_thing("test-robot", cert_dir=custom_dir) + + # Both files should now be gone under the custom dir. + assert not cert.exists(), "cert.pem leaked under custom cert_dir" + assert not key.exists(), "private.key leaked under custom cert_dir" + + def test_no_public_key_suffix_attempted(self, tmp_path, monkeypatch): + """``.public.key`` was dead code (``_create_cert`` never writes it). + teardown_thing should not attempt to unlink it.""" + from strands_robots.mesh.iot import provision + + custom_dir = tmp_path / "iot" + custom_dir.mkdir() + + attempted: list[str] = [] + original_unlink = type(custom_dir).unlink + + def _track_unlink(self, *a, **kw): + attempted.append(self.name) + return original_unlink(self, *a, **kw) + + monkeypatch.setattr(type(custom_dir), "unlink", _track_unlink) + + fake_iot = type("FakeIoT", (), {})() + + class _NotFound(Exception): + pass + + fake_iot.exceptions = type( + "FakeExc", + (), + {"ResourceNotFoundException": _NotFound}, + )() + fake_iot.list_thing_principals = lambda **kw: {"principals": []} + fake_iot.delete_thing = lambda **kw: None + fake_boto3 = type("FakeBoto3", (), {})() + fake_boto3.client = lambda *a, **kw: fake_iot + monkeypatch.setattr(provision, "_require_boto3", lambda: fake_boto3) + + # Pre-seed only the suffixes _create_cert actually writes. + (custom_dir / "robot.cert.pem").write_text("c") + (custom_dir / "robot.private.key").write_text("k") + + provision.teardown_thing("robot", cert_dir=custom_dir) + + # We should never have attempted a `.public.key` suffix. + assert not any(name.endswith(".public.key") for name in attempted), ( + f"unexpected .public.key unlink attempt in {attempted}" + ) + + +class TestTeardownThingDocstringShape: + """Pin: teardown_thing's docstring must render with consistent indentation. + + Regression marker for two related defects in the same docstring: + + 1. R7 docstring-typo bug (a literal `n` glyph between the body and the + `Note:` section -- the failure mode of an editor inserting `n` instead + of `\n`). + 2. R7-fix indentation bug: the original repair left the body at 8 spaces, + the `Note:` heading at 4, and the Note body at 12 -- which `cleandoc` + resolved to a body indented as if it were a literal blockquote with the + Note body double-indented underneath. Sphinx / pdoc / IDE help renderers + treat that as a malformed Google-style docstring. + + The pin tests assert on *post-cleandoc structure*, not adjacent substrings, + because the original R7-fix pin asserted only on substring presence and + missed the whole indentation regression. A pin test must reject the same + failure mode it was added to prevent. + """ + + def test_no_stray_n_literal_in_docstring(self): + """The docstring must not contain a bare `n Note:` artefact.""" + from strands_robots.mesh.iot.provision import teardown_thing + + ds = teardown_thing.__doc__ + assert ds is not None, "teardown_thing.__doc__ went missing" + # The R7 typo manifested as a literal 'n Note:' on a line by itself + # (where '\n Note:' was intended). Pin the absence of that artefact. + assert "n Note:" not in ds, "stray 'n' before Note: section -- R7 docstring typo regression" + # And keep the Note section itself, since the original R7 fix was + # adding the cert_dir trust note. + assert "Note:" in ds, "Note: section must remain" + assert "trusted operator input" in ds, "cert_dir trust note (R7) must remain" + + def test_cleandoc_renders_consistent_indentation(self): + """After ``inspect.cleandoc``, body, ``Note:`` heading, and Note body + must use the Google-style indent ladder: body and heading at 0, Note + body at 4. The R7-fix indentation bug rendered body at 4 (blockquote) + and Note body at 8 (double-indented). + """ + import inspect + + from strands_robots.mesh.iot.provision import teardown_thing + + cleaned = inspect.cleandoc(teardown_thing.__doc__ or "") + lines = cleaned.split("\n") + + # Summary line at column 0. + assert lines[0].startswith("Detach + delete"), "summary line missing" + assert not lines[0].startswith(" "), "summary line must be at column 0" + + # Body paragraph ("Cleans up the cert files") must be at column 0, + # not indented as a literal blockquote. + body_line = next(ln for ln in lines if ln.lstrip().startswith("Cleans up the cert files")) + assert body_line == body_line.lstrip(), ( + f"docstring body must render at column 0 after cleandoc; " + f"got {len(body_line) - len(body_line.lstrip())} leading spaces" + ) + + # `Note:` heading at column 0. + note_heading = next(ln for ln in lines if ln.rstrip() == "Note:") + assert note_heading == "Note:", f"`Note:` heading must render at column 0; got {note_heading!r}" + + # Note body at exactly 4 spaces (one indent level under heading). + note_body_line = next(ln for ln in lines if ln.lstrip().startswith("``cert_dir`` is treated")) + leading = len(note_body_line) - len(note_body_line.lstrip()) + assert leading == 4, f"Note body must render at 4 spaces (Google-style); got {leading}"