Commit 06ea538
fix(broker): harden dashboard replay against silent live-channel drops (#1224)
* fix(broker): harden dashboard replay against silent live-channel drops
Pear observed relay_inbound messages that reached the hosted Relaycast
observer but never showed up in the local Electron UI. Root cause: Pear's
Electron main process holds one long-lived WS connection to the broker's
/ws endpoint, and the broker's replay buffer (used to backfill missed
events on reconnect) shares its capacity with every event kind it emits,
including worker_stream (one message per PTY output chunk). A burst of
terminal output could, in principle, evict a relay_inbound event before a
reconnecting client ever asked for it.
worker_stream/delivery_active were already excluded from replay-buffer
storage on main, which closes that specific eviction path. Regression
tests are added to lock that behavior in (a worker_stream/delivery_active
flood no longer evicts an earlier relay_inbound entry) and to document, at
the ReplayBuffer level, that the buffer itself is kind-agnostic FIFO —
callers must keep ephemeral kinds out of it.
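A minimal sketch of that contract, using only the standard library. The `Kind` enum, the `record` helper, and the buffer layout are hypothetical stand-ins rather than the broker's actual types; the sketch only illustrates that a kind-agnostic FIFO evicts by age, so the durable/ephemeral filter has to live in the caller.

```rust
use std::collections::VecDeque;

/// Hypothetical event kinds, named after the kinds in this commit message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind {
    RelayInbound,
    WorkerStream,
    DeliveryActive,
}

/// Kind-agnostic bounded FIFO: the oldest entry is evicted regardless of kind.
pub struct ReplayBuffer {
    capacity: usize,
    entries: VecDeque<(u64, Kind)>,
    next_seq: u64,
}

impl ReplayBuffer {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, entries: VecDeque::new(), next_seq: 1 }
    }

    /// Stores an event under the next 1-based seq, evicting the oldest if full.
    pub fn push(&mut self, kind: Kind) -> u64 {
        if self.entries.len() == self.capacity {
            self.entries.pop_front();
        }
        let seq = self.next_seq;
        self.next_seq += 1;
        self.entries.push_back((seq, kind));
        seq
    }

    pub fn contains_kind(&self, kind: Kind) -> bool {
        self.entries.iter().any(|(_, k)| *k == kind)
    }
}

/// Caller-side guard (assumed shape): ephemeral kinds never reach the buffer.
pub fn is_durable(kind: Kind) -> bool {
    !matches!(kind, Kind::WorkerStream | Kind::DeliveryActive)
}

/// Records an event only if it is durable; returns the assigned seq if stored.
pub fn record(buf: &mut ReplayBuffer, kind: Kind) -> Option<u64> {
    if is_durable(kind) {
        Some(buf.push(kind))
    } else {
        None
    }
}
```

With the guard in place, a flood of `WorkerStream` events leaves an earlier `RelayInbound` entry untouched; pushing the same flood directly through `push` evicts it.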
The bigger, previously-unaddressed gap: the *live* broadcast::channel that
feeds the dashboard WS loop is bounded independently of the replay buffer
(capacity 512). A worker_stream burst can still overflow it for a
momentarily slow reader, producing RecvError::Lagged — which the broker
only logged server-side. The client's connection stayed open and it never
learned anything was dropped, so it silently drifted out of sync with no
prompt to resync. This is fixed by having the WS loop, on Lagged, consult
the replay buffer (which independently retains durable events) to backfill
whatever the broadcast channel dropped, and only fall back to an explicit
replay_gap frame if the replay buffer has also aged the range out.
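The recovery decision described above can be sketched without the async machinery. This is an illustration under stated assumptions, not the broker's code: `Frame`, `recover_after_lag`, and the slice of retained seqs are hypothetical, and emitting the gap frame before the still-available events is a guess about ordering.

```rust
/// Hypothetical frames the WS loop could send after a lag.
#[derive(Debug, PartialEq, Eq)]
pub enum Frame {
    /// A durable event backfilled from the replay buffer.
    Event { seq: u64 },
    /// Explicit notice that part of the range is no longer recoverable.
    ReplayGap { oldest_available: u64 },
}

/// `retained` holds the seqs of durable events still in the replay buffer,
/// in ascending order. `last_forwarded_seq` is the highest seq the client
/// has already been sent.
pub fn recover_after_lag(retained: &[u64], last_forwarded_seq: u64) -> Vec<Frame> {
    let mut frames = Vec::new();

    // The client needs everything from last_forwarded_seq + 1 onward. If the
    // oldest retained seq is beyond that, the replay buffer has aged part of
    // the range out and the gap must be reported explicitly.
    if let Some(&oldest) = retained.first() {
        if last_forwarded_seq.saturating_add(1) < oldest {
            frames.push(Frame::ReplayGap { oldest_available: oldest });
        }
    }

    // Backfill whatever the replay buffer still holds past the cursor.
    frames.extend(
        retained
            .iter()
            .filter(|seq| **seq > last_forwarded_seq)
            .map(|seq| Frame::Event { seq: *seq }),
    );
    frames
}
```

In the common case the replay buffer still covers the range, so the client sees only backfilled events and never learns that the live channel lagged.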
Also fixed a genuine off-by-one in ReplayBuffer::replay_since: since_seq <
oldest incorrectly flagged a gap whenever a brand-new client (since_seq=0,
the default) connected after at least one durable event had ever been
recorded, even with zero evictions, because sequence numbers are 1-based.
Every first-time dashboard connection was getting a spurious replay_gap
frame. Changed to since_seq + 1 < oldest.
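The two comparisons side by side, as a small sketch. The function names are hypothetical; only the comparisons themselves come from this commit.

```rust
/// Pre-fix check: reports a gap whenever the cursor is below the oldest
/// retained seq, which is always true for since_seq = 0 with 1-based seqs.
pub fn has_gap_before_fix(since_seq: u64, oldest: u64) -> bool {
    since_seq < oldest
}

/// Post-fix check: the client next needs since_seq + 1, so a gap exists only
/// if that seq is already older than the oldest retained entry.
pub fn has_gap_after_fix(since_seq: u64, oldest: u64) -> bool {
    since_seq + 1 < oldest
}
```

For a brand-new client (`since_seq = 0`) and a buffer whose oldest entry is seq 1, the old check reports a gap and the new one does not. Once seqs 1 and 2 have actually been evicted (`oldest = 3`), both report a gap.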
Finally, the replay_gap frame (both the WS push and the HTTP
/api/events/replay endpoint) now includes droppedCount, the number of
durable events unrecoverably lost, so a consuming client can gauge gap
severity instead of only getting a boolean.
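How droppedCount is derived is not spelled out here. One plausible computation, assuming durable seqs are contiguous and 1-based, is the size of the range between the client's cursor and the oldest retained seq. The function below is a hypothetical sketch under that assumption.

```rust
/// Number of durable events the client can no longer recover, assuming
/// contiguous 1-based seqs: the seqs in since_seq + 1 ..= oldest - 1.
/// Saturating arithmetic keeps an out-of-range cursor from underflowing.
pub fn dropped_count(since_seq: u64, oldest: u64) -> u64 {
    oldest.saturating_sub(since_seq.saturating_add(1))
}
```

A cursor of 0 against an oldest retained seq of 3 yields 2 (seqs 1 and 2 are gone); a cursor at or past `oldest - 1` yields 0.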
Test plan:
- cargo check -p agent-relay-broker
- cargo clippy -p agent-relay-broker -- -D warnings (clean)
- cargo fmt -p agent-relay-broker -- --check (clean)
- cargo test -p agent-relay-broker --lib (808 passed, 0 failed)
* fix(broker): address PR review findings (overflow + parse efficiency)
- replay_buffer.rs: since_seq.saturating_add(1) < oldest instead of
since_seq + 1 < oldest. A client-supplied since_seq is untrusted (e.g.
the sinceSeq query param); u64::MAX would overflow the plain addition,
panicking in debug/overflow-checked builds and wrapping to 0 in release
(falsely reporting a gap for a cursor that's actually ahead of
everything retained). Flagged independently by gemini-code-assist and
chatgpt-codex-connector. Added a regression test.
- listen_api.rs: the dashboard WS live loop was parsing every broadcast
message into a full serde_json::Value just to read the optional seq
field, allocating a full tree (including large terminal-output strings
on worker_stream chunks) to check one field. Replaced with a minimal
MessageSeq { seq: Option<u64> } struct + extract_seq() helper, so serde
skips unknown fields (including large strings) without allocating them.
Flagged by gemini-code-assist. Added unit tests.
Verified: cargo check, cargo clippy -D warnings, cargo fmt --check all
clean; cargo test --lib: 812 passed, 0 failed.
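The overflow finding in the first bullet can be reproduced with plain integer methods. In this sketch `has_gap` uses the saturating form from the fix, while `has_gap_wrapping` uses `wrapping_add` to imitate what unchecked release-mode addition would do; both function names are hypothetical.

```rust
/// Gap check with the saturating form: u64::MAX + 1 clamps to u64::MAX.
pub fn has_gap(since_seq: u64, oldest: u64) -> bool {
    since_seq.saturating_add(1) < oldest
}

/// Imitates plain `+` in a release build without overflow checks:
/// u64::MAX + 1 wraps to 0, which is below any 1-based oldest seq.
pub fn has_gap_wrapping(since_seq: u64, oldest: u64) -> bool {
    since_seq.wrapping_add(1) < oldest
}
```

A cursor of `u64::MAX` is ahead of everything retained, so `has_gap` correctly reports no gap, while the wrapping variant reports one.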
* fix(broker): eliminate TOCTOU race in catch_up_after_lag cutoff
current_seq() was read before replay_since() in catch_up_after_lag. A
durable event pushed (or an eviction) between those two calls could leave
the resulting cutoff_seq stale relative to what replay_since actually
returned, potentially pairing a replay_gap frame's seq with a mismatched
oldestAvailable/droppedCount, or causing the entry.seq <= cutoff_seq
filter to wrongly drop entries replay_since had already committed to
returning.
Fixed by deriving cutoff_seq from the entries snapshot itself (the last
entry's seq, falling back to last_forwarded_seq when empty), which is
what replay_since actually returned under a single atomic read lock —
removing both the separate current_seq() call and the now-redundant
filter. Flagged independently by coderabbitai and cubic-dev-ai.
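A simplified sketch of the fixed derivation. The buffer here stores bare seqs behind a `std::sync::RwLock`, and the signatures are assumptions rather than the real ones; the point is only that the cutoff comes from the same snapshot that `replay_since` returned.

```rust
use std::sync::RwLock;

/// Simplified stand-in for the replay buffer: retained seqs, ascending.
#[derive(Default)]
pub struct ReplayBuffer {
    seqs: RwLock<Vec<u64>>,
}

impl ReplayBuffer {
    pub fn push(&self, seq: u64) {
        self.seqs.write().unwrap().push(seq);
    }

    /// Snapshot of every retained seq greater than `since_seq`, taken under
    /// a single read lock.
    pub fn replay_since(&self, since_seq: u64) -> Vec<u64> {
        self.seqs
            .read()
            .unwrap()
            .iter()
            .copied()
            .filter(|seq| *seq > since_seq)
            .collect()
    }
}

/// Returns the entries to forward and the cutoff seq. The cutoff is the last
/// entry of the snapshot, or `last_forwarded_seq` when the snapshot is empty,
/// so it cannot disagree with what is actually forwarded.
pub fn catch_up_after_lag(buf: &ReplayBuffer, last_forwarded_seq: u64) -> (Vec<u64>, u64) {
    let entries = buf.replay_since(last_forwarded_seq);
    let cutoff_seq = entries.last().copied().unwrap_or(last_forwarded_seq);
    (entries, cutoff_seq)
}
```

Because there is no second read of the buffer, a push that lands after the snapshot simply belongs to the next round: it is neither counted in this cutoff nor filtered out of the returned entries.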
Added catch_up_after_lag_cutoff_is_consistent_with_returned_entries,
proving the returned cutoff always matches the highest seq actually
forwarded even when more durable events land after last_forwarded_seq was
decided but before recovery runs.
Verified: cargo check, cargo clippy -D warnings, cargo fmt --check all
clean; cargo test --lib: 813 passed, 0 failed.
* test(broker): exercise the TOCTOU race with genuine concurrency
cubic flagged that catch_up_after_lag_cutoff_is_consistent_with_returned_entries
inserted all durable events before catch_up_after_lag ran, so it never
actually raced anything -- the old current_seq()-first implementation
would have passed it too. That is correct: the previous test only proved the
current implementation's static invariant, not the interleaving that
motivated the fix.
Added a_push_racing_between_cutoff_read_and_replay_since_produces_a_stale_cutoff,
which uses two real tokio tasks and a oneshot-channel handshake to
guarantee (deterministically, not via timing) that a durable push lands
strictly between a current_seq() snapshot and a subsequent replay_since()
call -- reconstructing the pre-fix computation inline under genuine
concurrency, since the fix removes that separate snapshot step from
catch_up_after_lag entirely (so there's no window left in the current
implementation to inject a hook into without adding test-only
instrumentation to production code). This proves the underlying hazard is
real under concurrent execution, then confirms catch_up_after_lag itself
does not lose the racing event when run against the resulting buffer
state.
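The handshake idea can be shown with standard-library threads and `mpsc` channels in place of tokio tasks and a oneshot channel (a deliberate substitution to keep the sketch dependency-free). Everything here is hypothetical and reconstructs the pre-fix sequence inline: take a `current_seq()`-style snapshot, force a push, then take the `replay_since()`-style read.

```rust
use std::sync::{mpsc, Arc, RwLock};
use std::thread;

/// Runs the pre-fix sequence with a push forced between its two reads.
/// Returns (stale cutoff, seqs seen by the later replay read).
pub fn run_pre_fix_race() -> (u64, Vec<u64>) {
    // Retained durable seqs before the race starts.
    let buf = Arc::new(RwLock::new(vec![1u64, 2, 3]));
    let (snapshot_taken_tx, snapshot_taken_rx) = mpsc::channel::<()>();
    let (push_done_tx, push_done_rx) = mpsc::channel::<()>();

    let writer_buf = Arc::clone(&buf);
    let writer = thread::spawn(move || {
        // Wait until the reader has taken its cutoff snapshot...
        snapshot_taken_rx.recv().unwrap();
        // ...then land a durable push strictly inside the window.
        writer_buf.write().unwrap().push(4);
        push_done_tx.send(()).unwrap();
    });

    // Pre-fix step 1: separate "current seq" snapshot. The read guard is
    // released at the end of this statement.
    let stale_cutoff = buf.read().unwrap().last().copied().unwrap();
    snapshot_taken_tx.send(()).unwrap();

    // Block until the racing push has definitely happened (no timing involved).
    push_done_rx.recv().unwrap();

    // Pre-fix step 2: the replay read, which now sees the racing event.
    let replayed = buf.read().unwrap().clone();
    writer.join().unwrap();
    (stale_cutoff, replayed)
}
```

The returned cutoff is 3 while the replayed entries include seq 4, so a filter of the form `seq <= cutoff` would drop the racing event. Deriving the cutoff from the last replayed entry gives 4 instead.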
Verified: cargo check, cargo clippy -D warnings, cargo fmt --check all
clean; cargo test --lib: 814 passed, 0 failed (new test re-run 5x to
confirm it's deterministic, not flaky).
1 parent f300426 commit 06ea538
2 files changed
Lines changed: 638 additions & 15 deletions