Skip to content

Commit 79b3508

Browse files
DanSDS-Mode
authored and committed
fix(control): route queue-drained Approve audit row to caller, not lead (#366 twin)
When an approval is raised before any TUI has attached, it queues on `ApprovalState.queue`. When a TUI later connects, the server drains the queue into the bridge map and emits `ApprovalRequest` events; the operator's subsequent `Approve` op flows through `control::server`'s Approve handler, which is responsible for writing the `ApprovalResponse` audit row to `events.jsonl`. That handler hard-coded `&state.root.lead_id` as the actor whose `tasks/<id>/events.jsonl` received the row, so a worker or sub-lead approval response landed in the lead's audit file and the originating actor's events.jsonl was missing its `approval_response` correlation to the earlier `approval_request`. The bridge-side handler (`ApprovalBridge::respond`) was already fixed for this in #366; the queue-drain twin was missed.

The just-extracted `caller_id` local (from `bridge_entry.metadata.task_id`) is the value that should have been used — F-ARCH-12 made the variable explicit one line above the buggy call, which is what surfaced the bug.

Add a regression test `approve_op_writes_response_row_to_caller_tasks_dir` that pre-seeds the bridge with `task_id = "worker-7"`, sends an Approve op, and asserts:

1. `<run_subdir>/tasks/worker-7/events.jsonl` exists and contains an `approval_response` row with the matching `request_id`.
2. The lead's `events.jsonl` does NOT contain the worker's request_id (covering the specific pre-fix misdirection).

Also updates `pitboss-tui/tests/connect_timeout_regression.rs` to use `pitboss_cli::live_stream::*` (instead of the renamed `::stream::*`); that test landed in this branch from a parallel review pass and was written against the pre-F-ARCH-6 module name.

Discovered during the F-ARCH-12 review pass on PR #622.
1 parent e3451e0 commit 79b3508

2 files changed

Lines changed: 135 additions & 6 deletions

File tree

crates/pitboss-cli/src/control/server.rs

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1541,9 +1541,16 @@ async fn dispatch_op(
15411541
// control-socket path produces the same audit trail as
15421542
// ApprovalBridge::respond would. Matters when the approval
15431543
// was drained from the queue (no TUI at request time).
1544+
//
1545+
// #366 twin: this row must land in the *originating actor's*
1546+
// tasks dir, not the root lead's, so request/response pairs
1547+
// co-locate under one events.jsonl. `caller_id` was extracted
1548+
// from `bridge_entry.metadata.task_id` above for this purpose;
1549+
// pre-fix we passed `state.root.lead_id`, so worker/sub-lead
1550+
// approval response rows were misrouted to the lead's events.jsonl.
15441551
let _ = crate::dispatch::events::append_event(
15451552
&state.root.run_subdir,
1546-
&state.root.lead_id,
1553+
&caller_id,
15471554
&crate::dispatch::events::TaskEvent::ApprovalResponse {
15481555
at: chrono::Utc::now(),
15491556
request_id: request_id.clone(),
@@ -1994,6 +2001,128 @@ mod tests {
19942001
drop(handle);
19952002
}
19962003

2004+
/// #366 twin: when an approval drains from the queue (request was
2005+
/// raised before any TUI attached) and the operator subsequently
2006+
/// approves it via the control socket, the `ApprovalResponse` audit
2007+
/// row must land under `tasks/<requesting actor>/events.jsonl`, NOT
2008+
/// under `tasks/<root lead>/events.jsonl`. Pre-fix the handler hard-
2009+
/// coded `state.root.lead_id`, so a worker's approval response row
2010+
/// silently went to the lead's file and the worker's events.jsonl
2011+
/// was missing its `approval_response` audit entry — same bug class
2012+
/// as #366 in `ApprovalBridge::respond`, which was fixed for the
2013+
/// bridge path but not for the queue-drain path.
2014+
#[tokio::test]
2015+
async fn approve_op_writes_response_row_to_caller_tasks_dir() {
2016+
use std::time::Duration;
2017+
2018+
let dir = TempDir::new().unwrap();
2019+
let run_id = Uuid::now_v7();
2020+
let state = mk_state(dir.path(), run_id);
2021+
2022+
// Pre-seed the bridge as if the queue-drain path placed the
2023+
// entry there. The originating actor is a worker, not the lead.
2024+
let (tx, _rx) = tokio::sync::oneshot::channel();
2025+
state.root.approvals.bridge.lock().await.insert(
2026+
"req-worker-approval".into(),
2027+
crate::dispatch::state::BridgeEntry {
2028+
responder: tx,
2029+
metadata: crate::dispatch::state::ApprovalMetadata {
2030+
task_id: "worker-7".into(),
2031+
summary: "drop staging index".into(),
2032+
plan: None,
2033+
kind: crate::control::protocol::ApprovalKind::Action,
2034+
ttl_secs: None,
2035+
fallback: None,
2036+
created_at: chrono::Utc::now(),
2037+
},
2038+
},
2039+
);
2040+
2041+
let sock = dir.path().join("approve-routing.sock");
2042+
let handle = start_control_server(
2043+
sock.clone(),
2044+
"0.17.0".into(),
2045+
run_id.to_string(),
2046+
"hierarchical".into(),
2047+
state.clone(),
2048+
)
2049+
.await
2050+
.unwrap();
2051+
2052+
let mut stream = tokio::net::UnixStream::connect(&sock).await.unwrap();
2053+
let (r, mut w) = stream.split();
2054+
let mut lines = BufReader::new(r).lines();
2055+
2056+
// Same hello + replay drain as the sibling test (the server
2057+
// replays live bridge entries on Hello, #102, and that replay
2058+
// arrives before our Approve reply).
2059+
w.write_all(b"{\"op\":\"hello\",\"client_version\":\"0.17.0\"}\n")
2060+
.await
2061+
.unwrap();
2062+
let _hello = lines.next_line().await.unwrap();
2063+
let _replay = lines.next_line().await.unwrap().unwrap();
2064+
2065+
w.write_all(
2066+
b"{\"op\":\"approve\",\"request_id\":\"req-worker-approval\",\"approved\":true}\n",
2067+
)
2068+
.await
2069+
.unwrap();
2070+
let reply_line = lines.next_line().await.unwrap().unwrap();
2071+
let reply: ControlEvent = serde_json::from_str(&reply_line).unwrap();
2072+
assert!(matches!(
2073+
reply,
2074+
ControlEvent::OpAcked { ref op, .. } if op == "approve"
2075+
));
2076+
2077+
// Give the handler a brief moment to finish the audit-row
2078+
// events.jsonl write (it happens before the OpAcked reply but
2079+
// the file may not be visible until the kernel flushes — short
2080+
// poll with a deadline rather than a fixed sleep).
2081+
let worker_events = state
2082+
.root
2083+
.run_subdir
2084+
.join("tasks")
2085+
.join("worker-7")
2086+
.join("events.jsonl");
2087+
let deadline = tokio::time::Instant::now() + Duration::from_millis(500);
2088+
while !worker_events.exists() && tokio::time::Instant::now() < deadline {
2089+
tokio::time::sleep(Duration::from_millis(10)).await;
2090+
}
2091+
assert!(
2092+
worker_events.exists(),
2093+
"approval_response row should land under worker-7's tasks dir; \
2094+
instead nothing was written. Path checked: {}",
2095+
worker_events.display(),
2096+
);
2097+
let body = tokio::fs::read_to_string(&worker_events).await.unwrap();
2098+
assert!(
2099+
body.contains("\"kind\":\"approval_response\""),
2100+
"worker-7 events.jsonl missing approval_response: {body}",
2101+
);
2102+
assert!(
2103+
body.contains("\"request_id\":\"req-worker-approval\""),
2104+
"worker-7 events.jsonl missing request_id correlation: {body}",
2105+
);
2106+
2107+
// The lead's events.jsonl must NOT have received this row
2108+
// (the pre-fix bug had it landing there instead).
2109+
let lead_events = state
2110+
.root
2111+
.run_subdir
2112+
.join("tasks")
2113+
.join(&state.root.lead_id)
2114+
.join("events.jsonl");
2115+
if lead_events.exists() {
2116+
let lead_body = tokio::fs::read_to_string(&lead_events).await.unwrap();
2117+
assert!(
2118+
!lead_body.contains("req-worker-approval"),
2119+
"lead events.jsonl wrongly received worker-7's approval row: {lead_body}",
2120+
);
2121+
}
2122+
2123+
drop(handle);
2124+
}
2125+
19972126
#[tokio::test]
19982127
async fn reconnecting_tui_receives_replay_of_bridge_pending_approvals() {
19992128
// Regression for #102 — "ghost approval". A prior TUI received an

crates/pitboss-tui/tests/connect_timeout_regression.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ fn timeout_constructed_inside_async_block_does_not_panic() {
3131
// runtime context is entered. `open_run_session` returns a
3232
// `RunStreamSession` directly (not a `Result`), so the outer
3333
// `Result` is purely the timeout's elapsed-vs-completed flag.
34-
let result: Result<pitboss_cli::stream::RunStreamSession, tokio::time::error::Elapsed> =
34+
let result: Result<pitboss_cli::live_stream::RunStreamSession, tokio::time::error::Elapsed> =
3535
runtime.block_on(async {
3636
tokio::time::timeout(
3737
Duration::from_millis(50),
38-
pitboss_cli::stream::open_run_session(
38+
pitboss_cli::live_stream::open_run_session(
3939
PathBuf::from("/tmp/pitboss-tui-tests-nonexistent-run-dir"),
4040
Some(PathBuf::from(
4141
"/tmp/pitboss-tui-tests-nonexistent-control.sock",
4242
)),
43-
pitboss_cli::stream::LiveStreamMode::LiveOnly,
43+
pitboss_cli::live_stream::LiveStreamMode::LiveOnly,
4444
),
4545
)
4646
.await
@@ -75,12 +75,12 @@ fn naive_block_on_pattern_panics_when_constructed_outside_runtime() {
7575
let panicked = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
7676
runtime.block_on(tokio::time::timeout(
7777
Duration::from_millis(50),
78-
pitboss_cli::stream::open_run_session(
78+
pitboss_cli::live_stream::open_run_session(
7979
PathBuf::from("/tmp/pitboss-tui-tests-nonexistent-run-dir"),
8080
Some(PathBuf::from(
8181
"/tmp/pitboss-tui-tests-nonexistent-control.sock",
8282
)),
83-
pitboss_cli::stream::LiveStreamMode::LiveOnly,
83+
pitboss_cli::live_stream::LiveStreamMode::LiveOnly,
8484
),
8585
))
8686
}))

0 commit comments

Comments
 (0)