Skip to content

Commit e300caa

Browse files
committed
fix(coordinator): validate remove node daemon replies
Signed-off-by: GHX5T-SOL <200635707+GHX5T-SOL@users.noreply.github.com>
1 parent 5451d17 commit e300caa

3 files changed

Lines changed: 102 additions & 31 deletions

File tree

binaries/coordinator/src/lib.rs

Lines changed: 86 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,35 +1651,39 @@ async fn start_inner(
16511651
match daemon_connections.get_mut(daemon_id) {
16521652
Some(conn) => {
16531653
match conn.send_and_receive(&msg).await {
1654-
Ok(_) => {
1655-
// TODO: validate the daemon reply
1656-
// is specifically a successful
1657-
// `RemoveNodeResult`, parallel to
1658-
// the AddNode fix (#1682, rescue of
1659-
// #1757). Same `Ok(_)` shape can
1660-
// accept a stale or wrong-variant
1661-
// reply and corrupt state. Tracked
1662-
// separately to keep this rescue
1663-
// panel-faithful to #1757's scope.
1664-
// Clean up coordinator state
1665-
// (inverse of AddNode inserts)
1666-
if let Some(dataflow) =
1667-
running_dataflows
1668-
.get_mut(&dataflow_id)
1669-
{
1670-
dataflow
1671-
.node_to_daemon
1672-
.remove(&node_id);
1673-
dataflow
1674-
.descriptor
1675-
.nodes
1676-
.retain(|n| n.id != node_id);
1677-
dataflow.nodes.remove(&node_id);
1654+
Ok(reply_raw) => {
1655+
match ensure_remove_node_applied(
1656+
&reply_raw, &node_id,
1657+
) {
1658+
Ok(()) => {
1659+
// Clean up coordinator state
1660+
// (inverse of AddNode inserts)
1661+
if let Some(dataflow) =
1662+
running_dataflows
1663+
.get_mut(&dataflow_id)
1664+
{
1665+
dataflow
1666+
.node_to_daemon
1667+
.remove(&node_id);
1668+
dataflow
1669+
.descriptor
1670+
.nodes
1671+
.retain(|n| {
1672+
n.id != node_id
1673+
});
1674+
dataflow
1675+
.nodes
1676+
.remove(&node_id);
1677+
}
1678+
Ok(
1679+
ControlRequestReply::NodeRemoved {
1680+
dataflow_id,
1681+
node_id,
1682+
},
1683+
)
1684+
}
1685+
Err(e) => Err(e),
16781686
}
1679-
Ok(ControlRequestReply::NodeRemoved {
1680-
dataflow_id,
1681-
node_id,
1682-
})
16831687
}
16841688
Err(e) => Err(eyre!(
16851689
"daemon dispatch failed: {e}"
@@ -3501,6 +3505,21 @@ fn ensure_delete_param_forward_applied(
35013505
}
35023506
}
35033507

3508+
fn ensure_remove_node_applied(
3509+
reply_raw: &[u8],
3510+
node_id: &dora_core::config::NodeId,
3511+
) -> eyre::Result<()> {
3512+
match serde_json::from_slice(reply_raw)? {
3513+
DaemonCoordinatorReply::RemoveNodeResult(Ok(())) => Ok(()),
3514+
DaemonCoordinatorReply::RemoveNodeResult(Err(err)) => {
3515+
Err(eyre!("daemon failed to remove node `{node_id}`: {err}"))
3516+
}
3517+
other => Err(eyre!(
3518+
"unexpected daemon reply for RemoveNode on node `{node_id}`: {other:?}"
3519+
)),
3520+
}
3521+
}
3522+
35043523
fn schedule_param_replay_for_ready_dataflow(
35053524
dataflow_id: DataflowId,
35063525
dataflow: &RunningDataflow,
@@ -4833,6 +4852,45 @@ mod tests {
48334852
assert!(err.to_string().contains("unexpected daemon reply"));
48344853
}
48354854

4855+
#[test]
4856+
fn remove_node_reply_accepts_daemon_success() {
4857+
let reply = serde_json::to_vec(&DaemonCoordinatorReply::RemoveNodeResult(Ok(()))).unwrap();
4858+
let node_id: dora_core::config::NodeId = "camera".to_string().into();
4859+
4860+
ensure_remove_node_applied(&reply, &node_id)
4861+
.expect("successful RemoveNode reply should pass");
4862+
}
4863+
4864+
#[test]
4865+
fn remove_node_reply_reports_daemon_rejection() {
4866+
let reply = serde_json::to_vec(&DaemonCoordinatorReply::RemoveNodeResult(Err(
4867+
"node `camera` not found in running dataflow".to_string(),
4868+
)))
4869+
.unwrap();
4870+
let node_id: dora_core::config::NodeId = "camera".to_string().into();
4871+
4872+
let err = ensure_remove_node_applied(&reply, &node_id)
4873+
.expect_err("daemon rejection should fail RemoveNode forwarding");
4874+
let msg = err.to_string();
4875+
assert!(
4876+
msg.contains("failed to remove node") && msg.contains("camera"),
4877+
"error must name the operation and node: {msg}"
4878+
);
4879+
}
4880+
4881+
#[test]
4882+
fn remove_node_reply_rejects_wrong_reply_variant() {
4883+
let reply = serde_json::to_vec(&DaemonCoordinatorReply::SetParamResult(Ok(()))).unwrap();
4884+
let node_id: dora_core::config::NodeId = "camera".to_string().into();
4885+
4886+
let err = ensure_remove_node_applied(&reply, &node_id)
4887+
.expect_err("unexpected reply variant should fail RemoveNode forwarding");
4888+
assert!(
4889+
err.to_string().contains("unexpected daemon reply"),
4890+
"error must call out the wrong-reply-type failure mode: {err}"
4891+
);
4892+
}
4893+
48364894
// -------------------------------------------------------------------
48374895
// Spawn timeout watchdog (rescue of #1593)
48384896
// -------------------------------------------------------------------

binaries/daemon/src/lib.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,8 +1785,12 @@ impl Daemon {
17851785
grace_duration,
17861786
} => {
17871787
tracing::info!(%dataflow_id, %node_id, "removing node from running dataflow");
1788-
if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
1789-
let _ = dataflow.stop_single_node(&node_id, &self.clock, grace_duration);
1788+
let result: eyre::Result<()> = (|| {
1789+
let dataflow = self
1790+
.running
1791+
.get_mut(&dataflow_id)
1792+
.ok_or_else(|| eyre!("no running dataflow with ID `{dataflow_id}`"))?;
1793+
dataflow.stop_single_node(&node_id, &self.clock, grace_duration)?;
17901794

17911795
// Clean up routing tables: remove all mappings where this
17921796
// node is a source, and close inputs on downstream nodes.
@@ -1818,8 +1822,16 @@ impl Daemon {
18181822
// Remove from stored descriptor (inverse of AddNode
18191823
// push) so descriptor-based lookups stay consistent.
18201824
dataflow.descriptor.nodes.retain(|n| n.id != node_id);
1825+
Ok(())
1826+
})();
1827+
1828+
if let Err(err) = &result {
1829+
tracing::error!(%dataflow_id, %node_id, "RemoveNode failed: {err:?}");
18211830
}
1822-
let _ = reply_tx.send(None);
1831+
let reply = DaemonCoordinatorReply::RemoveNodeResult(
1832+
result.map_err(|err| format!("{err:?}")),
1833+
);
1834+
let _ = reply_tx.send(Some(reply));
18231835
RunStatus::Continue
18241836
}
18251837
DaemonCoordinatorEvent::AddMapping {

libraries/message/src/daemon_to_coordinator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ pub enum DaemonCoordinatorReply {
212212
AddNodeResult(Result<(), String>),
213213
RestartNodeResult(Result<(), String>),
214214
StopNodeResult(Result<(), String>),
215+
RemoveNodeResult(Result<(), String>),
215216
SetParamResult(Result<(), String>),
216217
DeleteParamResult(Result<(), String>),
217218
StartTopicDebugStreamResult(Result<(), String>),

0 commit comments

Comments
 (0)