diff --git a/tests/test_agent_protocol.py b/tests/test_agent_protocol.py index 2839f04..151b05f 100644 --- a/tests/test_agent_protocol.py +++ b/tests/test_agent_protocol.py @@ -236,6 +236,141 @@ async def test_async_path_three_packets_one_chunk(self): assert seen == [RSP_READY, RSP_DATA, RSP_ACK] +# --------------------------------------------------------------------------- +# Tests: recv_packet async-leftover buffer stress +# +# These exercise the per-transport buffer that recv_packet keeps for +# bytes that arrived after a frame's delimiter — a regression class +# (PR #86) that's worth pinning down with stream-style scenarios: +# split frames across reads, large multi-packet streams, READY +# interleave, per-transport isolation, and timeout behaviour when +# the buffer has incomplete frame data. +# --------------------------------------------------------------------------- + +class TestRecvPacketAsyncLeftoverStress: + @pytest.mark.asyncio + async def test_frame_split_across_two_reads_recombines(self) -> None: + """A single frame can arrive split across two transport reads + (typical for large TCP packets that cross MTU). The parser + must accumulate until the delimiter and parse cleanly.""" + from defib.transport.mock import MockTransport + pkt = make_device_packet(RSP_INFO, b"X" * 32) + # Split the packet at an arbitrary mid-frame byte. + split = len(pkt) // 2 + t = MockTransport(flush_clears_buffer=False) + t.enqueue_rx(pkt[:split]) + t.enqueue_rx(pkt[split:]) + + cmd, data = await recv_packet(t, timeout=1.0) + assert cmd == RSP_INFO + assert data == b"X" * 32 + + @pytest.mark.asyncio + async def test_large_stream_50_packets_in_one_chunk(self) -> None: + """Stress: 50 small packets crammed into one chunk should + all come out, in order. Catches off-by-one bugs in the + leftover slicing.""" + from defib.transport.mock import MockTransport + N = 50 + payloads = [bytes([i, 0xAA, 0x55]) for i in range(N)] + stream = b"".join( + make_device_packet(RSP_DATA, b"\x00\x00" + p) for p in payloads + ) + t = MockTransport(flush_clears_buffer=False) + t.enqueue_rx(stream) + + for i in range(N): + cmd, data = await recv_packet(t, timeout=1.0) + assert cmd == RSP_DATA + assert data == b"\x00\x00" + payloads[i], f"packet {i} mismatch" + + @pytest.mark.asyncio + async def test_recv_response_skips_ready_in_leftover(self) -> None: + """The READY-skipping logic of recv_response (used by INFO, + CRC32, etc.) must work even when the READY and the real + response are coalesced into a single chunk via leftover.""" + from defib.transport.mock import MockTransport + chunk = ( + make_device_packet(RSP_READY, b"DEFIB") + + make_device_packet(RSP_READY, b"DEFIB") + + make_device_packet(RSP_INFO, b"PAYLOAD!") + + make_device_packet(RSP_READY, b"DEFIB") # trailing READY queued + ) + t = MockTransport(flush_clears_buffer=False) + t.enqueue_rx(chunk) + + cmd, data = await recv_response(t, timeout=1.0) + assert cmd == RSP_INFO + assert data == b"PAYLOAD!" + # The trailing READY is still parseable on the next call — + # leftover survived recv_response's internal recv_packet calls. + cmd2, _ = await recv_packet(t, timeout=1.0) + assert cmd2 == RSP_READY + + @pytest.mark.asyncio + async def test_per_transport_isolation(self) -> None: + """Two transports must not share leftover state. If they did, + bytes from one socket would surface on another's read.""" + from defib.transport.mock import MockTransport + pkt_a = make_device_packet(RSP_INFO, b"AAA") + pkt_b = make_device_packet(RSP_DATA, b"\x00\x00BBB") + ta = MockTransport(flush_clears_buffer=False) + tb = MockTransport(flush_clears_buffer=False) + # Two whole packets per transport — leftover gets populated. + ta.enqueue_rx(pkt_a + pkt_a) + tb.enqueue_rx(pkt_b + pkt_b) + + # Interleave reads + ca, _ = await recv_packet(ta, timeout=1.0) + cb, _ = await recv_packet(tb, timeout=1.0) + ca2, _ = await recv_packet(ta, timeout=1.0) + cb2, _ = await recv_packet(tb, timeout=1.0) + + assert ca == RSP_INFO + assert ca2 == RSP_INFO + assert cb == RSP_DATA + assert cb2 == RSP_DATA + + @pytest.mark.asyncio + async def test_incomplete_frame_in_leftover_blocks_until_timeout(self) -> None: + """A leftover containing only PART of a frame (no delimiter yet) + must wait for more data and time out cleanly if none arrives — + never spuriously return a partial frame.""" + from defib.transport.mock import MockTransport + pkt = make_device_packet(RSP_DATA, b"\x00\x00" + b"Z" * 16) + # Half the packet only, no delimiter. + t = MockTransport(flush_clears_buffer=False) + t.enqueue_rx(pkt[: len(pkt) // 2]) + + with pytest.raises(TransportTimeout): + await recv_packet(t, timeout=0.2) + + @pytest.mark.asyncio + async def test_corrupt_frame_skipped_then_recovers(self) -> None: + """A frame that fails CRC mid-stream must be discarded and the + parser must recover to the next valid frame.""" + from defib.transport.mock import MockTransport + # Build a packet, then flip a bit in the middle to corrupt + # the CRC. The parser should clear that frame and pick up the + # next valid one. + good = make_device_packet(RSP_INFO, b"GOOD") + broken = bytearray(make_device_packet(RSP_DATA, b"\x00\x00" + b"BAD!")) + broken[4] ^= 0x42 # flip a payload bit → CRC32 mismatch + ok2 = make_device_packet(RSP_ACK, bytes([ACK_OK])) + + t = MockTransport(flush_clears_buffer=False) + t.enqueue_rx(bytes(broken) + good + ok2) + + # First call: parser sees broken frame (CRC fail, discards), + # then sees the GOOD packet. + cmd, data = await recv_packet(t, timeout=1.0) + assert cmd == RSP_INFO + assert data == b"GOOD" + # Second call should hit the trailing ACK via leftover. + cmd2, _ = await recv_packet(t, timeout=1.0) + assert cmd2 == RSP_ACK + + # --------------------------------------------------------------------------- # Tests: send_packet # --------------------------------------------------------------------------- diff --git a/tests/test_power_rack.py b/tests/test_power_rack.py index 269dc7f..ec2e371 100644 --- a/tests/test_power_rack.py +++ b/tests/test_power_rack.py @@ -197,3 +197,209 @@ def raise_oserr(req: Any, timeout: float | None = None) -> None: def _explode(*args: Any, **kwargs: Any) -> Any: # noqa: ANN401 raise AssertionError("urlopen must not be called in this test") + + +# --------------------------------------------------------------------------- +# fastboot() — binary blob wire format, success/failure shapes +# --------------------------------------------------------------------------- + +def _parse_fastboot_blob(blob: bytes) -> dict[str, object]: + """Decode the binary blob the pod's /fastboot endpoint expects, so + tests can assert on what the host packed.""" + off = 0 + + def u32() -> int: + nonlocal off + v = int.from_bytes(blob[off:off + 4], "big") + off += 4 + return v + + def u16() -> int: + nonlocal off + v = int.from_bytes(blob[off:off + 2], "big") + off += 2 + return v + + def slice_(n: int) -> bytes: + nonlocal off + v = blob[off:off + n] + off += n + return v + + spl_address = u32() + ddr_step_address = u32() + uboot_address = u32() + prestep0 = slice_(u16()) + ddrstep0 = slice_(u16()) + prestep1 = slice_(u16()) + spl = slice_(u32()) + agent = slice_(u32()) + assert off == len(blob), f"trailing bytes ({len(blob) - off}) past parsed fields" + return { + "spl_address": spl_address, + "ddr_step_address": ddr_step_address, + "uboot_address": uboot_address, + "prestep0": prestep0, + "ddrstep0": ddrstep0, + "prestep1": prestep1, + "spl": spl, + "agent": agent, + } + + +class TestFastbootWireFormat: + """Round-trip the binary blob the pod's /fastboot endpoint expects. + + The C side in rack/firmware/main/http_api.c reads: + [u32 spl_address][u32 ddr_step_address][u32 uboot_address] + [u16 prestep0_len][prestep0][u16 ddrstep0_len][ddrstep0] + [u16 prestep1_len][prestep1][u32 spl_len][spl][u32 agent_len][agent] + all big-endian. Pin that down so a host/pod mismatch breaks loudly. + """ + + @pytest.mark.asyncio + async def test_packs_expected_layout( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + ctrl = RackController(host="pod", port=8080) + body = b'{"success":true,"last_phase":"done","elapsed_ms":4521}' + + with patched_urlopen(monkeypatch, body=body) as rec: + await ctrl.fastboot( + spl_address=0x04010500, + ddr_step_address=0x04013000, + uboot_address=0x41000000, + prestep0=b"\x01\x02\x03\x04", + ddrstep0=b"\x05\x06", + prestep1=None, + spl=b"S" * 256, + agent=b"A" * 128, + ) + + assert len(rec.calls) == 1 + method, url, data = rec.calls[0] + assert method == "POST" + assert url == "http://pod:8080/fastboot" + assert data is not None + parsed = _parse_fastboot_blob(data) + assert parsed["spl_address"] == 0x04010500 + assert parsed["ddr_step_address"] == 0x04013000 + assert parsed["uboot_address"] == 0x41000000 + assert parsed["prestep0"] == b"\x01\x02\x03\x04" + assert parsed["ddrstep0"] == b"\x05\x06" + assert parsed["prestep1"] == b"" # None → empty + assert parsed["spl"] == b"S" * 256 + assert parsed["agent"] == b"A" * 128 + + @pytest.mark.asyncio + async def test_prestep1_passthrough( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + ctrl = RackController(host="pod", port=8080) + prestep1 = bytes(range(64)) + with patched_urlopen(monkeypatch) as rec: + await ctrl.fastboot( + spl_address=0, ddr_step_address=0, uboot_address=0, + prestep0=b"", ddrstep0=b"", prestep1=prestep1, + spl=b"", agent=b"", + ) + parsed = _parse_fastboot_blob(rec.calls[0][2]) + assert parsed["prestep1"] == prestep1 + + @pytest.mark.asyncio + async def test_success_response_returned_verbatim( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + ctrl = RackController(host="pod", port=8080) + body = ( + b'{"success":true,"last_phase":"done",' + b'"elapsed_ms":4521,"handshake_markers":7}' + ) + with patched_urlopen(monkeypatch, body=body): + result = await ctrl.fastboot( + spl_address=0, ddr_step_address=0, uboot_address=0, + prestep0=b"", ddrstep0=b"", prestep1=None, + spl=b"", agent=b"", + ) + assert result == { + "success": True, + "last_phase": "done", + "elapsed_ms": 4521, + "handshake_markers": 7, + } + + @pytest.mark.asyncio + async def test_pod_500_returns_json_body_not_exception( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Pod returns 500 + JSON for protocol failure. The host must + surface that JSON (so callers can read failed_phase / error) + rather than raising — a HTTPError surfaces ONLY for non-JSON + responses.""" + import urllib.error + err_body = ( + b'{"success":false,"last_phase":"prestep0",' + b'"failed_phase":"prestep0","error":"PRESTEP0 HEAD",' + b'"elapsed_ms":214,"handshake_markers":5}' + ) + + def http500(req: Any, timeout: float | None = None) -> None: + raise urllib.error.HTTPError( + url=req.full_url, code=500, msg="Internal Server Error", + hdrs=None, # type: ignore[arg-type] + fp=io.BytesIO(err_body), + ) + + monkeypatch.setattr(rack_mod.urllib.request, "urlopen", http500) + ctrl = RackController(host="pod", port=8080) + result = await ctrl.fastboot( + spl_address=0, ddr_step_address=0, uboot_address=0, + prestep0=b"", ddrstep0=b"", prestep1=None, + spl=b"", agent=b"", + ) + assert result["success"] is False + assert result["failed_phase"] == "prestep0" + assert "PRESTEP0" in str(result["error"]) + + @pytest.mark.asyncio + async def test_pod_unreachable_raises_power_controller_error( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + import urllib.error + + def raise_urlerr(req: Any, timeout: float | None = None) -> None: + raise urllib.error.URLError("no route to host") + + monkeypatch.setattr(rack_mod.urllib.request, "urlopen", raise_urlerr) + ctrl = RackController(host="pod", port=8080) + with pytest.raises(PowerControllerError, match="rack unreachable"): + await ctrl.fastboot( + spl_address=0, ddr_step_address=0, uboot_address=0, + prestep0=b"", ddrstep0=b"", prestep1=None, + spl=b"", agent=b"", + ) + + @pytest.mark.asyncio + async def test_realistic_blob_size_within_pod_limit( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Pod caps body at 1 MiB (FASTBOOT_MAX_BODY). A typical + upload is profile (~140 B) + SPL (~24 KB) + agent (~17 KB) + ≈ 41 KB. Make sure our packing matches that ballpark.""" + ctrl = RackController(host="pod", port=8080) + prestep = b"\xab" * 64 + ddr = b"\xcd" * 64 + spl = b"\x90" * 24_576 + agent = b"\x55" * 17_104 + with patched_urlopen(monkeypatch) as rec: + await ctrl.fastboot( + spl_address=0x04010500, + ddr_step_address=0x04013000, + uboot_address=0x41000000, + prestep0=prestep, ddrstep0=ddr, prestep1=None, + spl=spl, agent=agent, + ) + # 3*u32 + 3*u16 + 64 + 64 + 0 + u32 + 24576 + u32 + 17104 + # = 12 + 6 + 128 + 4 + 24576 + 4 + 17104 = 41834 + assert len(rec.calls[0][2]) == 41834 + assert len(rec.calls[0][2]) < 1024 * 1024 # < FASTBOOT_MAX_BODY