
Commit 5451d17

heyong4725, imwyvern, and claude authored
fix(coordinator): validate AddNode daemon reply, forward errors to CLI (rescue of #1757) (#1873)
Rescues @imwyvern's #1757 against current main, addressing both items in @phil-opp's 2026-04-28 review (changes-requested).

The bug (#1682)
================

`binaries/coordinator/src/lib.rs:1558` (pre-fix) had:

    Ok(_) => {
        dataflow.node_to_daemon.insert(...);
        dataflow.descriptor.nodes.push(...);
        dataflow.nodes.insert(...);
        Ok(ControlRequestReply::NodeAdded { ... })
    }

The `Ok(_)` arm accepted ANY successful `send_and_receive` reply from the daemon as proof that AddNode applied. The daemon's actual handler returned `reply_tx.send(None)` (a null reply), and the coordinator would still happily commit state. When the daemon's TCP connection happened to be carrying a stale or out-of-order `SetParamResult` / other variant (which can happen under concurrent CLI requests against a flaky daemon), the coordinator committed AddNode state for a node the daemon never actually added.

The user-visible symptom from #1682:

    $ dora node add <node>
    <times out>
    $ dora node list
    <node> shows as present in descriptor
    <subsequent commands corrupt or hang>

The fix
========

Three coordinated changes:

1. `libraries/message/src/daemon_to_coordinator.rs`: new `DaemonCoordinatorReply::AddNodeResult(Result<(), String>)` variant so the daemon can identify the reply specifically.

2. `binaries/daemon/src/lib.rs`: the `DaemonCoordinatorEvent::AddNode` handler now sends `AddNodeResult(result.map_err(|e| format!("{e:?}")))` instead of the previous `None` placeholder.

3. `binaries/coordinator/src/lib.rs`:
   * `Ok(reply_raw) =>` arm now calls a new `ensure_add_node_applied` helper that pattern-matches the reply against `AddNodeResult`.
   * The validator returns `Err(eyre!(...))` on either an explicit daemon failure or a wrong-variant reply.
   * The validator's error is funneled into the existing `Err` branch of the `result` binding, which the coordinator's main loop sends back to the CLI as a `ControlRequestReply::Error`. **The error does NOT propagate via `?` past the dispatch arm.** This addresses phil's concern that the original PR's `?` would tear down the coordinator's main loop on a recoverable per-request failure.

Addressing @phil-opp's review
==============================

Two items from the 2026-04-28 review:

* "This brings the whole coordinator down on error, no? We should instead forward the result to the CLI and let it deal with the failure."
  Fixed. The helper returns `Err`, the call site converts it into the `Err` arm of `result`, which becomes a `ControlRequestReply::Error` sent back to the CLI. The coordinator loop continues handling subsequent requests.

* "These tests are not testing anything non-trivial. Ideally, there would be a test that fails before this PR and succeeds after this PR."
  The three new tests now explicitly cover:
  - happy path (`AddNodeResult(Ok)` accepted)
  - daemon-rejection path (`AddNodeResult(Err)` rejected with named operation + node id)
  - **regression scenario for #1682**: a wrong-variant reply (`SetParamResult(Ok)`) is rejected with "unexpected daemon reply" instead of silently committing state. This is the specific failure mode that caused #1682's state corruption.

  A full end-to-end test against a mock daemon would require extracting the AddNode handler from the `start_inner` async loop, which is out of scope for the rescue. The helper-level tests cover the contract regression-style: if the validator is removed or weakened, these tests fail.

What did NOT change
====================

* No new public API. The daemon-to-coordinator protocol gains one reply variant, consistent with the existing `SetParamResult` / `DeleteParamResult` pattern in the same enum.
* No semver-affecting changes to user-facing types.
* No behavioral change for the happy path: a successful AddNode still returns `ControlRequestReply::NodeAdded { dataflow_id, node_id }` to the CLI exactly as before.
* No new dependencies.

Verification
=============

    cargo fmt --all -- --check
    cargo clippy --all --exclude dora-{node-api,operator-api,ros2-bridge}-python -- -D warnings
    cargo test -p dora-coordinator --lib add_node_reply    (3/3 new tests pass)
    cargo test -p dora-coordinator -p dora-daemon -p dora-message --lib    (103+/all pass)
    cargo check --examples

Manual verification of the original #1682 repro recipe (`dora up` → `dora build` → `dora start --detach` → `dora node list` → `dora node add`) should now surface daemon errors as CLI-visible failures instead of timeouts + state corruption.

Co-authored-by: imwyvern <imwyvern@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
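The forwarding pattern described in the commit message (validate the reply, commit state only on success, and turn a failure into a reply value rather than an early return) can be sketched in isolation. This is a minimal, std-only illustration, not the coordinator's code: `DaemonReply`, `CliReply`, `handle_add_node`, and the `known_nodes` vector are hypothetical stand-ins for the real dora types, and plain `String` errors replace `eyre`.

```rust
// Hypothetical, simplified stand-ins for the real dora message types.
#[derive(Debug, PartialEq)]
enum DaemonReply {
    AddNodeResult(Result<(), String>),
    SetParamResult(Result<(), String>),
}

#[derive(Debug, PartialEq)]
enum CliReply {
    NodeAdded { node_id: String },
    Error(String),
}

/// Accept only a successful `AddNodeResult`; every other reply is an error.
fn ensure_add_node_applied(reply: &DaemonReply, node_id: &str) -> Result<(), String> {
    match reply {
        DaemonReply::AddNodeResult(Ok(())) => Ok(()),
        DaemonReply::AddNodeResult(Err(err)) => {
            Err(format!("daemon failed to add node `{node_id}`: {err}"))
        }
        other => Err(format!(
            "unexpected daemon reply for AddNode on node `{node_id}`: {other:?}"
        )),
    }
}

/// Handle one request. State is committed only after validation, and a
/// failure becomes a reply value instead of an early return via `?`.
fn handle_add_node(
    known_nodes: &mut Vec<String>,
    reply: &DaemonReply,
    node_id: &str,
) -> CliReply {
    match ensure_add_node_applied(reply, node_id) {
        Ok(()) => {
            known_nodes.push(node_id.to_string());
            CliReply::NodeAdded { node_id: node_id.to_string() }
        }
        Err(e) => CliReply::Error(e),
    }
}

fn main() {
    let mut known_nodes = Vec::new();
    let replies = [
        DaemonReply::SetParamResult(Ok(())), // stale or wrong-variant reply
        DaemonReply::AddNodeResult(Err("failed to spawn node".into())),
        DaemonReply::AddNodeResult(Ok(())),
    ];
    for reply in &replies {
        // The loop keeps running after each rejected request.
        println!("{:?}", handle_add_node(&mut known_nodes, reply, "filter"));
    }
    // Only the validated reply committed state.
    assert_eq!(known_nodes, vec!["filter".to_string()]);
}
```

Because `handle_add_node` returns a value for every outcome, the caller's loop never sees an `Err` it would need to propagate, which is the property the review asked for.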
1 parent 54605dc commit 5451d17

3 files changed

Lines changed: 153 additions & 26 deletions


binaries/coordinator/src/lib.rs

Lines changed: 138 additions & 25 deletions
@@ -1555,31 +1555,58 @@ async fn start_inner(
     .send_and_receive(&msg)
     .await
 {
-    Ok(_) => {
-        dataflow
-            .node_to_daemon
-            .insert(
-                node_id.clone(),
-                did,
-            );
-        // Update the stored descriptor
-        // and resolved nodes so
-        // `dora info` reflects the
-        // new node.
-        dataflow
-            .descriptor
-            .nodes
-            .push(original_node);
-        dataflow.nodes.insert(
-            node_id.clone(),
-            resolved_node,
-        );
-        Ok(
-            ControlRequestReply::NodeAdded {
-                dataflow_id,
-                node_id,
-            },
-        )
+    Ok(reply_raw) => {
+        // Validate the daemon reply is
+        // specifically an `AddNodeResult`
+        // (not just any non-error reply)
+        // before committing state. Without
+        // this, a `SetParamResult` or an
+        // explicit `AddNodeResult(Err)`
+        // would still be reported as
+        // applied and corrupt the dataflow
+        // state (#1682, rescue of #1757).
+        // The validator's error is folded
+        // into the `Err` arm of `result`
+        // (via explicit `Err(e) => Err(e)`
+        // below, no `?`), which the
+        // coordinator's main loop sends
+        // back to the CLI as
+        // `ControlRequestReply::Error`.
+        // Addresses phil-opp's review of
+        // #1757 (do not tear down the
+        // event loop on a recoverable
+        // per-request failure).
+        match ensure_add_node_applied(
+            &reply_raw, &node_id,
+        ) {
+            Ok(()) => {
+                dataflow
+                    .node_to_daemon
+                    .insert(
+                        node_id.clone(),
+                        did,
+                    );
+                // Update the stored descriptor
+                // and resolved nodes so
+                // `dora info` reflects the
+                // new node.
+                dataflow
+                    .descriptor
+                    .nodes
+                    .push(original_node);
+                dataflow.nodes.insert(
+                    node_id.clone(),
+                    resolved_node,
+                );
+                Ok(
+                    ControlRequestReply::NodeAdded {
+                        dataflow_id,
+                        node_id,
+                    },
+                )
+            }
+            Err(e) => Err(e),
+        }
     }
     Err(e) => Err(eyre!(
         "daemon dispatch failed: {e}"
@@ -1625,6 +1652,15 @@ async fn start_inner(
 Some(conn) => {
     match conn.send_and_receive(&msg).await {
         Ok(_) => {
+            // TODO: validate the daemon reply
+            // is specifically a successful
+            // `RemoveNodeResult`, parallel to
+            // the AddNode fix (#1682, rescue of
+            // #1757). Same `Ok(_)` shape can
+            // accept a stale or wrong-variant
+            // reply and corrupt state. Tracked
+            // separately to keep this rescue
+            // panel-faithful to #1757's scope.
             // Clean up coordinator state
             // (inverse of AddNode inserts)
             if let Some(dataflow) =
@@ -3414,6 +3450,27 @@ fn build_set_param_message_from_raw_json(
         .map_err(Into::into)
 }

+/// Validate that the daemon's reply to `DaemonCoordinatorEvent::AddNode`
+/// is a successful `AddNodeResult`. Returns `Err` for both an explicit
+/// daemon failure and an unexpected reply variant; callers should forward
+/// the error to the CLI as a `ControlRequestReply::Error` and NOT use `?`
+/// to bubble out of the coordinator's main loop. Rescue of #1757,
+/// addresses #1682.
+fn ensure_add_node_applied(
+    reply_raw: &[u8],
+    node_id: &dora_core::config::NodeId,
+) -> eyre::Result<()> {
+    match serde_json::from_slice(reply_raw)? {
+        DaemonCoordinatorReply::AddNodeResult(Ok(())) => Ok(()),
+        DaemonCoordinatorReply::AddNodeResult(Err(err)) => {
+            Err(eyre!("daemon failed to add node `{node_id}`: {err}"))
+        }
+        other => Err(eyre!(
+            "unexpected daemon reply for AddNode on node `{node_id}`: {other:?}"
+        )),
+    }
+}
+
 fn ensure_set_param_forward_applied(
     reply_raw: &[u8],
     node_id: &dora_core::config::NodeId,
@@ -4710,6 +4767,62 @@ mod tests {
         assert!(err.to_string().contains("failed to apply SetParam"));
     }

+    // -------------------------------------------------------------------
+    // AddNode reply validation (rescue of #1757, addresses #1682)
+    // -------------------------------------------------------------------
+    //
+    // The bug: coordinator's `Ok(_) =>` arm in the AddNode dispatch
+    // (lib.rs:1558 pre-fix) accepted any successful `send_and_receive`
+    // reply and committed dataflow state, even when the daemon returned
+    // an `AddNodeResult(Err(...))` or a stale reply from a different
+    // request. The three tests below pin the validator's contract: it
+    // must accept ONLY a successful `AddNodeResult` and forward every
+    // other shape as an error to the call site (which then surfaces it
+    // to the CLI without bringing down the coordinator's main loop).
+
+    #[test]
+    fn add_node_reply_accepts_daemon_success() {
+        let reply = serde_json::to_vec(&DaemonCoordinatorReply::AddNodeResult(Ok(()))).unwrap();
+        let node_id: dora_core::config::NodeId = "filter".to_string().into();
+
+        ensure_add_node_applied(&reply, &node_id).expect("successful AddNode reply should pass");
+    }
+
+    #[test]
+    fn add_node_reply_reports_daemon_rejection() {
+        let reply = serde_json::to_vec(&DaemonCoordinatorReply::AddNodeResult(Err(
+            "failed to spawn node".to_string(),
+        )))
+        .unwrap();
+        let node_id: dora_core::config::NodeId = "filter".to_string().into();
+
+        let err = ensure_add_node_applied(&reply, &node_id)
+            .expect_err("daemon rejection should fail AddNode forwarding");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("failed to add node") && msg.contains("filter"),
+            "error must name the operation and node: {msg}"
+        );
+    }
+
+    #[test]
+    fn add_node_reply_rejects_wrong_reply_variant() {
+        // This is the regression scenario for #1682: the daemon returned
+        // a stale or otherwise unrelated reply variant (here:
+        // `SetParamResult(Ok)`). Before the fix, the coordinator's
+        // `Ok(_) =>` arm would accept this and commit state for a node
+        // the daemon never actually added.
+        let reply = serde_json::to_vec(&DaemonCoordinatorReply::SetParamResult(Ok(()))).unwrap();
+        let node_id: dora_core::config::NodeId = "filter".to_string().into();
+
+        let err = ensure_add_node_applied(&reply, &node_id)
+            .expect_err("unexpected reply variant should fail AddNode forwarding");
+        assert!(
+            err.to_string().contains("unexpected daemon reply"),
+            "error must call out the wrong-reply-type failure mode: {err}"
+        );
+    }
+
     #[test]
     fn delete_param_forward_reply_rejects_unexpected_reply_variant() {
         let reply = serde_json::to_vec(&DaemonCoordinatorReply::SetParamResult(Ok(()))).unwrap();
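The TODO left in the RemoveNode arm describes the same `Ok(_)` gap. One way the follow-up could look is a shared validator parameterised by the expected variant. This is only a sketch under assumptions: the `RemoveNodeResult(Result<(), String>)` variant is named in the TODO but its actual definition is not shown in this commit, `ensure_reply_applied` and `ensure_remove_node_applied` are hypothetical names, and the enum below is a std-only stand-in for `DaemonCoordinatorReply` without the JSON decoding step.

```rust
// Std-only stand-in for the reply enum. `RemoveNodeResult` is ASSUMED to
// have the same shape as `AddNodeResult`; the commit does not show it.
#[derive(Debug, PartialEq)]
enum DaemonReply {
    AddNodeResult(Result<(), String>),
    RemoveNodeResult(Result<(), String>),
    SetParamResult(Result<(), String>),
}

/// Hypothetical shared validator. `extract` returns the inner result when
/// the reply is the expected variant and `None` for every other variant.
fn ensure_reply_applied(
    operation: &str,
    node_id: &str,
    reply: &DaemonReply,
    extract: impl Fn(&DaemonReply) -> Option<&Result<(), String>>,
) -> Result<(), String> {
    match extract(reply) {
        Some(Ok(())) => Ok(()),
        Some(Err(err)) => Err(format!(
            "daemon failed to apply {operation} on node `{node_id}`: {err}"
        )),
        None => Err(format!(
            "unexpected daemon reply for {operation} on node `{node_id}`: {reply:?}"
        )),
    }
}

/// Hypothetical RemoveNode counterpart of `ensure_add_node_applied`.
fn ensure_remove_node_applied(reply: &DaemonReply, node_id: &str) -> Result<(), String> {
    ensure_reply_applied("RemoveNode", node_id, reply, |r| match r {
        DaemonReply::RemoveNodeResult(result) => Some(result),
        _ => None,
    })
}

fn main() {
    let replies = [
        DaemonReply::RemoveNodeResult(Ok(())),
        DaemonReply::RemoveNodeResult(Err("no such node".into())),
        DaemonReply::AddNodeResult(Ok(())), // wrong variant for RemoveNode
        DaemonReply::SetParamResult(Ok(())), // wrong variant for RemoveNode
    ];
    for reply in &replies {
        println!("{:?}", ensure_remove_node_applied(reply, "filter"));
    }
}
```

As with AddNode, the call site would need to clean up coordinator state only in the `Ok(())` case and forward the `Err` case to the CLI.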

binaries/daemon/src/lib.rs

Lines changed: 7 additions & 1 deletion
@@ -1770,7 +1770,13 @@ impl Daemon {
     if let Err(err) = &result {
         tracing::error!(%dataflow_id, %node_id, "AddNode failed: {err:?}");
     }
-    let _ = reply_tx.send(None);
+    // Return a specific `AddNodeResult` variant so the
+    // coordinator can validate the reply against its
+    // expected request, instead of treating any non-error
+    // reply as success (#1682, rescue of #1757).
+    let reply =
+        DaemonCoordinatorReply::AddNodeResult(result.map_err(|err| format!("{err:?}")));
+    let _ = reply_tx.send(Some(reply));
     RunStatus::Continue
 }
 DaemonCoordinatorEvent::RemoveNode {
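The daemon-side change swaps a `None` reply for a typed one whose error is flattened to a `String` with `{err:?}`. A minimal, std-only sketch of that conversion follows. The names here are assumptions for illustration: `SpawnError` is a made-up error type, `reply_for_add_node` is a hypothetical helper, and `std::sync::mpsc` stands in for whatever channel `reply_tx` actually is in the daemon.

```rust
use std::sync::mpsc;

// Simplified stand-in for `DaemonCoordinatorReply`.
#[derive(Debug, PartialEq)]
enum DaemonReply {
    AddNodeResult(Result<(), String>),
}

// Made-up error type; the field is only read through `Debug` formatting.
#[derive(Debug)]
#[allow(dead_code)]
struct SpawnError {
    reason: &'static str,
}

/// Build the typed reply: success passes through, a failure is rendered
/// with `Debug` formatting so it can travel as a plain `String`.
fn reply_for_add_node(result: Result<(), SpawnError>) -> Option<DaemonReply> {
    Some(DaemonReply::AddNodeResult(
        result.map_err(|err| format!("{err:?}")),
    ))
}

fn main() {
    let (reply_tx, reply_rx) = mpsc::channel();

    // Mirrors `let _ = reply_tx.send(Some(reply));` from the diff.
    let failed = Err(SpawnError { reason: "binary not found" });
    let _ = reply_tx.send(reply_for_add_node(failed));

    match reply_rx.recv().unwrap() {
        Some(DaemonReply::AddNodeResult(Err(msg))) => {
            println!("daemon rejected AddNode: {msg}")
        }
        other => println!("other reply: {other:?}"),
    }
}
```

Sending the error as a `String` keeps the reply serialisable without requiring the error type itself to cross the wire, at the cost of losing its structure on the receiving side.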

libraries/message/src/daemon_to_coordinator.rs

Lines changed: 8 additions & 0 deletions
@@ -202,6 +202,14 @@ pub enum DaemonCoordinatorReply {
         notify: Option<tokio::sync::oneshot::Sender<()>>,
     },
     Logs(Result<Vec<u8>, String>),
+    /// Reply for `DaemonCoordinatorEvent::AddNode`. Previously the daemon
+    /// returned `None` and the coordinator accepted any successful TCP
+    /// response as proof that AddNode applied, even a `SetParamResult` or
+    /// other unrelated reply, committing state for a node the daemon
+    /// may have rejected (#1682). This variant lets the coordinator
+    /// pattern-match a specific reply and forward daemon errors to the
+    /// CLI instead of corrupting the dataflow state. Rescue of #1757.
+    AddNodeResult(Result<(), String>),
     RestartNodeResult(Result<(), String>),
     StopNodeResult(Result<(), String>),
     SetParamResult(Result<(), String>),
