diff --git a/src/defib/power/rack.py b/src/defib/power/rack.py index f55e46c..fa92530 100644 --- a/src/defib/power/rack.py +++ b/src/defib/power/rack.py @@ -89,6 +89,81 @@ async def close(self) -> None: # Stateless HTTP — nothing to release. return None + async def tftp_put( + self, + name: str, + data: bytes, + timeout: float = 60.0, + ) -> dict[str, object]: + """Stage a file for the pod's local TFTP server. + + The pod hosts a tiny TFTP server on its W5500 interface + (``192.168.1.1:69``) — the camera's U-Boot can fetch staged + files directly over the local LAN with no host-side TFTP + server and no NAT44 in the data path. + + Args: + name: filename the camera will request via ``tftp ``. + Must not contain ``/``; the pod rejects path-traversal. + data: raw file bytes. PSRAM-allocated on the pod; max size + is reported by the pod's ``GET /tftp`` (``max_size_bytes``). + timeout: HTTP timeout in seconds. + + Returns: + JSON from the pod, e.g. ``{"name": "uImage", "size": 2055676}``. + On size/OOM the pod returns 4xx/503 with an ``error`` field — + surfaced as ``PowerControllerError`` with the response body. + """ + if "/" in name or not name: + raise PowerControllerError(f"bad TFTP filename: {name!r}") + url = f"http://{self._host}:{self._port}/tftp/{name}" + logger.info("rack POST %s (%d bytes)", url, len(data)) + return await asyncio.to_thread(self._http_send_sync, "POST", url, data, timeout) + + async def tftp_delete(self, name: str, timeout: float = 10.0) -> dict[str, object]: + """Drop a staged TFTP file on the pod.""" + if "/" in name: + raise PowerControllerError(f"bad TFTP filename: {name!r}") + path = f"/tftp/{name}" if name else "/tftp" + url = f"http://{self._host}:{self._port}{path}" + return await asyncio.to_thread(self._http_send_sync, "DELETE", url, b"", timeout) + + async def tftp_clear(self, timeout: float = 10.0) -> dict[str, object]: + """Drop all staged TFTP files on the pod.""" + url = f"http://{self._host}:{self._port}/tftp" + return await asyncio.to_thread(self._http_send_sync, "DELETE", url, b"", timeout) + + async def tftp_list(self, timeout: float = 10.0) -> dict[str, object]: + """List staged TFTP files + PSRAM headroom.""" + url = f"http://{self._host}:{self._port}/tftp" + return await asyncio.to_thread(self._http_send_sync, "GET", url, None, timeout) + + @staticmethod + def _http_send_sync( + method: str, url: str, body: bytes | None, timeout: float, + ) -> dict[str, object]: + req = urllib.request.Request(url, method=method) + if body is not None: + req.data = body + req.add_header("Content-Type", "application/octet-stream") + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + payload = resp.read() + except urllib.error.HTTPError as e: + detail = e.read().decode("utf-8", "replace")[:300] + raise PowerControllerError( + f"rack HTTP {e.code} on {method} {url}: {detail}" + ) from e + except (urllib.error.URLError, TimeoutError, OSError) as e: + raise PowerControllerError( + f"rack unreachable at {url}: {e}" + ) from e + try: + result = json.loads(payload) + except json.JSONDecodeError: + return {} + return result if isinstance(result, dict) else {} + async def fastboot( self, spl_address: int, diff --git a/tests/test_power_rack.py b/tests/test_power_rack.py index ec2e371..12f373a 100644 --- a/tests/test_power_rack.py +++ b/tests/test_power_rack.py @@ -403,3 +403,106 @@ async def test_realistic_blob_size_within_pod_limit( # = 12 + 6 + 128 + 4 + 24576 + 4 + 17104 = 41834 assert len(rec.calls[0][2]) == 41834 assert len(rec.calls[0][2]) < 1024 * 1024 # < FASTBOOT_MAX_BODY + + +# --------------------------------------------------------------------------- +# tftp_put / tftp_delete / tftp_clear / tftp_list — pod-hosted TFTP staging +# --------------------------------------------------------------------------- + +class TestTftpStaging: + """RackController.tftp_put / delete / clear / list — the host wrapper + around the pod's POST /tftp/, DELETE /tftp/, GET /tftp. + + Lets defib (or any caller) stage firmware into the pod's PSRAM so the + camera's U-Boot can fetch it from 192.168.1.1 over the local LAN — + no host-side TFTP server required.""" + + @pytest.mark.asyncio + async def test_tftp_put_posts_correct_url_and_body( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + ctrl = RackController(host="pod", port=8080) + body = b'{"name":"uImage","size":2048}' + with patched_urlopen(monkeypatch, body=body) as rec: + r = await ctrl.tftp_put("uImage", b"X" * 2048) + assert len(rec.calls) == 1 + method, url, data = rec.calls[0] + assert method == "POST" + assert url == "http://pod:8080/tftp/uImage" + assert data == b"X" * 2048 + assert r == {"name": "uImage", "size": 2048} + + @pytest.mark.asyncio + async def test_tftp_put_rejects_path_traversal(self) -> None: + ctrl = RackController(host="pod", port=8080) + with pytest.raises(PowerControllerError, match="bad TFTP filename"): + await ctrl.tftp_put("../etc/passwd", b"x") + with pytest.raises(PowerControllerError, match="bad TFTP filename"): + await ctrl.tftp_put("sub/path", b"x") + with pytest.raises(PowerControllerError, match="bad TFTP filename"): + await ctrl.tftp_put("", b"x") + + @pytest.mark.asyncio + async def test_tftp_delete_one( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + ctrl = RackController(host="pod", port=8080) + with patched_urlopen(monkeypatch, body=b'{"ok":true}') as rec: + await ctrl.tftp_delete("uImage") + method, url, _ = rec.calls[0] + assert method == "DELETE" + assert url == "http://pod:8080/tftp/uImage" + + @pytest.mark.asyncio + async def test_tftp_clear_all( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + ctrl = RackController(host="pod", port=8080) + with patched_urlopen(monkeypatch, body=b'{"ok":true}') as rec: + await ctrl.tftp_clear() + method, url, _ = rec.calls[0] + assert method == "DELETE" + assert url == "http://pod:8080/tftp" # no trailing / + + @pytest.mark.asyncio + async def test_tftp_list_returns_dict( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + ctrl = RackController(host="pod", port=8080) + body = ( + b'{"files":[{"name":"uImage","size":2048,"reads":0}],' + b'"max_size_bytes":8388608,"max_slots":4,' + b'"psram_free_bytes":6291456,"psram_largest_free_block":4194304}' + ) + with patched_urlopen(monkeypatch, body=body) as rec: + r = await ctrl.tftp_list() + method, url, _ = rec.calls[0] + assert method == "GET" + assert url == "http://pod:8080/tftp" + assert r["files"][0]["name"] == "uImage" + assert r["max_size_bytes"] == 8388608 + + @pytest.mark.asyncio + async def test_tftp_put_oom_returns_typed_error( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Pod returns 503 + JSON body when PSRAM allocation fails. + Surface that as a PowerControllerError with the body so callers + can fall back to host TFTP or chunked upload.""" + import urllib.error + err_body = ( + b'{"error":"oom","requested":8388608,' + b'"psram_largest_free":4194304}' + ) + + def http503(req: Any, timeout: float | None = None) -> None: + raise urllib.error.HTTPError( + url=req.full_url, code=503, msg="Service Unavailable", + hdrs=None, # type: ignore[arg-type] + fp=io.BytesIO(err_body), + ) + + monkeypatch.setattr(rack_mod.urllib.request, "urlopen", http503) + ctrl = RackController(host="pod", port=8080) + with pytest.raises(PowerControllerError, match="503"): + await ctrl.tftp_put("rootfs", b"X" * 4096)