Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions tests/test_agent_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,141 @@ async def test_async_path_three_packets_one_chunk(self):
assert seen == [RSP_READY, RSP_DATA, RSP_ACK]


# ---------------------------------------------------------------------------
# Tests: recv_packet async-leftover buffer stress
#
# These exercise the per-transport buffer that recv_packet keeps for
# bytes that arrived after a frame's delimiter — a regression class
# (PR #86) that's worth pinning down with stream-style scenarios:
# split frames across reads, large multi-packet streams, READY
# interleave, per-transport isolation, and timeout behaviour when
# the buffer has incomplete frame data.
# ---------------------------------------------------------------------------

class TestRecvPacketAsyncLeftoverStress:
@pytest.mark.asyncio
async def test_frame_split_across_two_reads_recombines(self) -> None:
"""A single frame can arrive split across two transport reads
(typical for large TCP packets that cross MTU). The parser
must accumulate until the delimiter and parse cleanly."""
from defib.transport.mock import MockTransport
pkt = make_device_packet(RSP_INFO, b"X" * 32)
# Split the packet at an arbitrary mid-frame byte.
split = len(pkt) // 2
t = MockTransport(flush_clears_buffer=False)
t.enqueue_rx(pkt[:split])
t.enqueue_rx(pkt[split:])

cmd, data = await recv_packet(t, timeout=1.0)
assert cmd == RSP_INFO
assert data == b"X" * 32

@pytest.mark.asyncio
async def test_large_stream_50_packets_in_one_chunk(self) -> None:
"""Stress: 50 small packets crammed into one chunk should
all come out, in order. Catches off-by-one bugs in the
leftover slicing."""
from defib.transport.mock import MockTransport
N = 50
payloads = [bytes([i, 0xAA, 0x55]) for i in range(N)]
stream = b"".join(
make_device_packet(RSP_DATA, b"\x00\x00" + p) for p in payloads
)
t = MockTransport(flush_clears_buffer=False)
t.enqueue_rx(stream)

for i in range(N):
cmd, data = await recv_packet(t, timeout=1.0)
assert cmd == RSP_DATA
assert data == b"\x00\x00" + payloads[i], f"packet {i} mismatch"

@pytest.mark.asyncio
async def test_recv_response_skips_ready_in_leftover(self) -> None:
"""The READY-skipping logic of recv_response (used by INFO,
CRC32, etc.) must work even when the READY and the real
response are coalesced into a single chunk via leftover."""
from defib.transport.mock import MockTransport
chunk = (
make_device_packet(RSP_READY, b"DEFIB")
+ make_device_packet(RSP_READY, b"DEFIB")
+ make_device_packet(RSP_INFO, b"PAYLOAD!")
+ make_device_packet(RSP_READY, b"DEFIB") # trailing READY queued
)
t = MockTransport(flush_clears_buffer=False)
t.enqueue_rx(chunk)

cmd, data = await recv_response(t, timeout=1.0)
assert cmd == RSP_INFO
assert data == b"PAYLOAD!"
# The trailing READY is still parseable on the next call —
# leftover survived recv_response's internal recv_packet calls.
cmd2, _ = await recv_packet(t, timeout=1.0)
assert cmd2 == RSP_READY

@pytest.mark.asyncio
async def test_per_transport_isolation(self) -> None:
"""Two transports must not share leftover state. If they did,
bytes from one socket would surface on another's read."""
from defib.transport.mock import MockTransport
pkt_a = make_device_packet(RSP_INFO, b"AAA")
pkt_b = make_device_packet(RSP_DATA, b"\x00\x00BBB")
ta = MockTransport(flush_clears_buffer=False)
tb = MockTransport(flush_clears_buffer=False)
# Two whole packets per transport — leftover gets populated.
ta.enqueue_rx(pkt_a + pkt_a)
tb.enqueue_rx(pkt_b + pkt_b)

# Interleave reads
ca, _ = await recv_packet(ta, timeout=1.0)
cb, _ = await recv_packet(tb, timeout=1.0)
ca2, _ = await recv_packet(ta, timeout=1.0)
cb2, _ = await recv_packet(tb, timeout=1.0)

assert ca == RSP_INFO
assert ca2 == RSP_INFO
assert cb == RSP_DATA
assert cb2 == RSP_DATA

@pytest.mark.asyncio
async def test_incomplete_frame_in_leftover_blocks_until_timeout(self) -> None:
"""A leftover containing only PART of a frame (no delimiter yet)
must wait for more data and time out cleanly if none arrives —
never spuriously return a partial frame."""
from defib.transport.mock import MockTransport
pkt = make_device_packet(RSP_DATA, b"\x00\x00" + b"Z" * 16)
# Half the packet only, no delimiter.
t = MockTransport(flush_clears_buffer=False)
t.enqueue_rx(pkt[: len(pkt) // 2])

with pytest.raises(TransportTimeout):
await recv_packet(t, timeout=0.2)

@pytest.mark.asyncio
async def test_corrupt_frame_skipped_then_recovers(self) -> None:
"""A frame that fails CRC mid-stream must be discarded and the
parser must recover to the next valid frame."""
from defib.transport.mock import MockTransport
# Build a packet, then flip a bit in the middle to corrupt
# the CRC. The parser should clear that frame and pick up the
# next valid one.
good = make_device_packet(RSP_INFO, b"GOOD")
broken = bytearray(make_device_packet(RSP_DATA, b"\x00\x00" + b"BAD!"))
broken[4] ^= 0x42 # flip a payload bit → CRC32 mismatch
ok2 = make_device_packet(RSP_ACK, bytes([ACK_OK]))

t = MockTransport(flush_clears_buffer=False)
t.enqueue_rx(bytes(broken) + good + ok2)

# First call: parser sees broken frame (CRC fail, discards),
# then sees the GOOD packet.
cmd, data = await recv_packet(t, timeout=1.0)
assert cmd == RSP_INFO
assert data == b"GOOD"
# Second call should hit the trailing ACK via leftover.
cmd2, _ = await recv_packet(t, timeout=1.0)
assert cmd2 == RSP_ACK


# ---------------------------------------------------------------------------
# Tests: send_packet
# ---------------------------------------------------------------------------
Expand Down
206 changes: 206 additions & 0 deletions tests/test_power_rack.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,209 @@ def raise_oserr(req: Any, timeout: float | None = None) -> None:

def _explode(*args: Any, **kwargs: Any) -> Any: # noqa: ANN401
raise AssertionError("urlopen must not be called in this test")


# ---------------------------------------------------------------------------
# fastboot() — binary blob wire format, success/failure shapes
# ---------------------------------------------------------------------------

def _parse_fastboot_blob(blob: bytes) -> dict[str, object]:
"""Decode the binary blob the pod's /fastboot endpoint expects, so
tests can assert on what the host packed."""
off = 0

def u32() -> int:
nonlocal off
v = int.from_bytes(blob[off:off + 4], "big")
off += 4
return v

def u16() -> int:
nonlocal off
v = int.from_bytes(blob[off:off + 2], "big")
off += 2
return v

def slice_(n: int) -> bytes:
nonlocal off
v = blob[off:off + n]
off += n
return v

spl_address = u32()
ddr_step_address = u32()
uboot_address = u32()
prestep0 = slice_(u16())
ddrstep0 = slice_(u16())
prestep1 = slice_(u16())
spl = slice_(u32())
agent = slice_(u32())
assert off == len(blob), f"trailing bytes ({len(blob) - off}) past parsed fields"
return {
"spl_address": spl_address,
"ddr_step_address": ddr_step_address,
"uboot_address": uboot_address,
"prestep0": prestep0,
"ddrstep0": ddrstep0,
"prestep1": prestep1,
"spl": spl,
"agent": agent,
}


class TestFastbootWireFormat:
"""Round-trip the binary blob the pod's /fastboot endpoint expects.

The C side in rack/firmware/main/http_api.c reads:
[u32 spl_address][u32 ddr_step_address][u32 uboot_address]
[u16 prestep0_len][prestep0][u16 ddrstep0_len][ddrstep0]
[u16 prestep1_len][prestep1][u32 spl_len][spl][u32 agent_len][agent]
all big-endian. Pin that down so a host/pod mismatch breaks loudly.
"""

@pytest.mark.asyncio
async def test_packs_expected_layout(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
ctrl = RackController(host="pod", port=8080)
body = b'{"success":true,"last_phase":"done","elapsed_ms":4521}'

with patched_urlopen(monkeypatch, body=body) as rec:
await ctrl.fastboot(
spl_address=0x04010500,
ddr_step_address=0x04013000,
uboot_address=0x41000000,
prestep0=b"\x01\x02\x03\x04",
ddrstep0=b"\x05\x06",
prestep1=None,
spl=b"S" * 256,
agent=b"A" * 128,
)

assert len(rec.calls) == 1
method, url, data = rec.calls[0]
assert method == "POST"
assert url == "http://pod:8080/fastboot"
assert data is not None
parsed = _parse_fastboot_blob(data)
assert parsed["spl_address"] == 0x04010500
assert parsed["ddr_step_address"] == 0x04013000
assert parsed["uboot_address"] == 0x41000000
assert parsed["prestep0"] == b"\x01\x02\x03\x04"
assert parsed["ddrstep0"] == b"\x05\x06"
assert parsed["prestep1"] == b"" # None → empty
assert parsed["spl"] == b"S" * 256
assert parsed["agent"] == b"A" * 128

@pytest.mark.asyncio
async def test_prestep1_passthrough(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
ctrl = RackController(host="pod", port=8080)
prestep1 = bytes(range(64))
with patched_urlopen(monkeypatch) as rec:
await ctrl.fastboot(
spl_address=0, ddr_step_address=0, uboot_address=0,
prestep0=b"", ddrstep0=b"", prestep1=prestep1,
spl=b"", agent=b"",
)
parsed = _parse_fastboot_blob(rec.calls[0][2])
assert parsed["prestep1"] == prestep1

@pytest.mark.asyncio
async def test_success_response_returned_verbatim(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
ctrl = RackController(host="pod", port=8080)
body = (
b'{"success":true,"last_phase":"done",'
b'"elapsed_ms":4521,"handshake_markers":7}'
)
with patched_urlopen(monkeypatch, body=body):
result = await ctrl.fastboot(
spl_address=0, ddr_step_address=0, uboot_address=0,
prestep0=b"", ddrstep0=b"", prestep1=None,
spl=b"", agent=b"",
)
assert result == {
"success": True,
"last_phase": "done",
"elapsed_ms": 4521,
"handshake_markers": 7,
}

@pytest.mark.asyncio
async def test_pod_500_returns_json_body_not_exception(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Pod returns 500 + JSON for protocol failure. The host must
surface that JSON (so callers can read failed_phase / error)
rather than raising — a HTTPError surfaces ONLY for non-JSON
responses."""
import urllib.error
err_body = (
b'{"success":false,"last_phase":"prestep0",'
b'"failed_phase":"prestep0","error":"PRESTEP0 HEAD",'
b'"elapsed_ms":214,"handshake_markers":5}'
)

def http500(req: Any, timeout: float | None = None) -> None:
raise urllib.error.HTTPError(
url=req.full_url, code=500, msg="Internal Server Error",
hdrs=None, # type: ignore[arg-type]
fp=io.BytesIO(err_body),
)

monkeypatch.setattr(rack_mod.urllib.request, "urlopen", http500)
ctrl = RackController(host="pod", port=8080)
result = await ctrl.fastboot(
spl_address=0, ddr_step_address=0, uboot_address=0,
prestep0=b"", ddrstep0=b"", prestep1=None,
spl=b"", agent=b"",
)
assert result["success"] is False
assert result["failed_phase"] == "prestep0"
assert "PRESTEP0" in str(result["error"])

@pytest.mark.asyncio
async def test_pod_unreachable_raises_power_controller_error(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
import urllib.error

def raise_urlerr(req: Any, timeout: float | None = None) -> None:
raise urllib.error.URLError("no route to host")

monkeypatch.setattr(rack_mod.urllib.request, "urlopen", raise_urlerr)
ctrl = RackController(host="pod", port=8080)
with pytest.raises(PowerControllerError, match="rack unreachable"):
await ctrl.fastboot(
spl_address=0, ddr_step_address=0, uboot_address=0,
prestep0=b"", ddrstep0=b"", prestep1=None,
spl=b"", agent=b"",
)

@pytest.mark.asyncio
async def test_realistic_blob_size_within_pod_limit(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Pod caps body at 1 MiB (FASTBOOT_MAX_BODY). A typical
upload is profile (~140 B) + SPL (~24 KB) + agent (~17 KB)
≈ 41 KB. Make sure our packing matches that ballpark."""
ctrl = RackController(host="pod", port=8080)
prestep = b"\xab" * 64
ddr = b"\xcd" * 64
spl = b"\x90" * 24_576
agent = b"\x55" * 17_104
with patched_urlopen(monkeypatch) as rec:
await ctrl.fastboot(
spl_address=0x04010500,
ddr_step_address=0x04013000,
uboot_address=0x41000000,
prestep0=prestep, ddrstep0=ddr, prestep1=None,
spl=spl, agent=agent,
)
# 3*u32 + 3*u16 + 64 + 64 + 0 + u32 + 24576 + u32 + 17104
# = 12 + 6 + 128 + 4 + 24576 + 4 + 17104 = 41834
assert len(rec.calls[0][2]) == 41834
assert len(rec.calls[0][2]) < 1024 * 1024 # < FASTBOOT_MAX_BODY
Loading