Skip to content

Commit ddfea20

Browse files
crprashantpinodeca
authored andcommitted
Gate legacy break-sentinel fallback to pre-#148 envelopes
Address PR review (pinodeca): make SubtreeEnvelope.control an Option<SubtreeControl> so a new-binary normal subtree result is never run through the legacy {"__break__"} sentinel check. Only an absent control field (None, written by <= v0.2.2 binaries) takes the legacy fallback path; Some(Normal) passes through untouched. This closes the payload-impersonates-control-flow collision for JOIN/RACE branches whose genuine SQL result happens to be sentinel-shaped. - Make control Option<SubtreeControl>; drop Default/#[default] from the enum. Wire format is unchanged: Some(Normal) still serializes to "Normal", an absent field still deserializes (now to None), so replay is byte-identical. - Collapse the three duplicated node-status-recording arms into one (status, result) match + a single update_node_status schedule, keeping the recorded orchestration history identical. - Add regression test envelope_new_format_normal_with_sentinel_shaped_result_is_not_reraised.
1 parent a242a60 commit ddfea20

1 file changed

Lines changed: 62 additions & 58 deletions

File tree

src/orchestrations/execute_function_graph.rs

Lines changed: 62 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,14 @@ impl From<&str> for NodeError {
6868
type NodeResult = Result<String, NodeError>;
6969

7070
/// Distinguishes a normal subtree result from one that unwound via `df.break()`.
71-
/// `#[serde(default)]` on the envelope's field keeps envelopes recorded by an older binary
72-
/// (which had no control field) deserializable as `Normal` across an in-flight upgrade.
73-
#[derive(Default, serde::Serialize, serde::Deserialize)]
71+
///
72+
/// Stored as `Option<SubtreeControl>` in the envelope (see `SubtreeEnvelope::control`): a
73+
/// missing field deserializes to `None`, which unambiguously marks an envelope recorded by a
74+
/// pre-#148 binary (`<= v0.2.2`, no control field). A new binary always writes an explicit
75+
/// `Some(Normal)` / `Some(Break)`, so the legacy break-sentinel fallback can be gated to
76+
/// `None` only — keeping a user payload from impersonating control flow on a fresh envelope.
77+
#[derive(serde::Serialize, serde::Deserialize)]
7478
enum SubtreeControl {
75-
#[default]
7679
Normal,
7780
Break,
7881
}
@@ -83,8 +86,11 @@ enum SubtreeControl {
8386
/// parent can re-raise it as `NodeError::Break` rather than smuggling a sentinel in `result`.
8487
#[derive(serde::Serialize, serde::Deserialize)]
8588
struct SubtreeEnvelope {
89+
/// `None` only when deserialized from a pre-#148 envelope that had no `control` field; a
90+
/// new binary always serializes `Some(..)`. `parse_subtree_envelope` relies on this to run
91+
/// the legacy break-sentinel fallback exclusively on old envelopes.
8692
#[serde(default)]
87-
control: SubtreeControl,
93+
control: Option<SubtreeControl>,
8894
result: String,
8995
results: HashMap<String, String>,
9096
}
@@ -271,7 +277,7 @@ pub async fn execute_subtree(
271277
Ok(result) => {
272278
ctx.trace_info(format!("ExecuteSubtree: node {node_id} completed"));
273279
SubtreeEnvelope {
274-
control: SubtreeControl::Normal,
280+
control: Some(SubtreeControl::Normal),
275281
result,
276282
results,
277283
}
@@ -281,7 +287,7 @@ pub async fn execute_subtree(
281287
"ExecuteSubtree: node {node_id} broke (propagating)"
282288
));
283289
SubtreeEnvelope {
284-
control: SubtreeControl::Break,
290+
control: Some(SubtreeControl::Break),
285291
result: value,
286292
results,
287293
}
@@ -328,48 +334,24 @@ async fn execute_function_node_with_vars(
328334
// Update node with final status and result. A `Break` is control flow rather than a
329335
// failure: record the node as completed (carrying the break value) so observability is
330336
// unchanged from when break travelled as a normal `Ok` sentinel. Only `Failure` marks
331-
// the node failed.
332-
match &execute_result {
333-
Ok(result) => {
334-
let completed_input = serde_json::json!({
335-
"node_id": node_id,
336-
"status": "completed",
337-
"result": result
338-
});
339-
let _ = ctx
340-
.schedule_activity(
341-
activities::update_node_status::NAME,
342-
completed_input.to_string(),
343-
)
344-
.await;
345-
}
346-
Err(NodeError::Break(value)) => {
347-
let completed_input = serde_json::json!({
348-
"node_id": node_id,
349-
"status": "completed",
350-
"result": value
351-
});
352-
let _ = ctx
353-
.schedule_activity(
354-
activities::update_node_status::NAME,
355-
completed_input.to_string(),
356-
)
357-
.await;
358-
}
359-
Err(NodeError::Failure(err)) => {
360-
let failed_input = serde_json::json!({
361-
"node_id": node_id,
362-
"status": "failed",
363-
"result": err
364-
});
365-
let _ = ctx
366-
.schedule_activity(
367-
activities::update_node_status::NAME,
368-
failed_input.to_string(),
369-
)
370-
.await;
371-
}
372-
}
337+
// the node failed. All three arms schedule exactly one `update_node_status`, so collapse
338+
// them to a single (status, result) pair to keep the recorded history identical.
339+
let (status, status_result) = match &execute_result {
340+
Ok(result) => ("completed", result.as_str()),
341+
Err(NodeError::Break(value)) => ("completed", value.as_str()),
342+
Err(NodeError::Failure(err)) => ("failed", err.as_str()),
343+
};
344+
let status_input = serde_json::json!({
345+
"node_id": node_id,
346+
"status": status,
347+
"result": status_result,
348+
});
349+
let _ = ctx
350+
.schedule_activity(
351+
activities::update_node_status::NAME,
352+
status_input.to_string(),
353+
)
354+
.await;
373355

374356
execute_result
375357
}
@@ -778,7 +760,8 @@ const LEGACY_BREAK_SENTINEL: &str = "__break__";
778760
/// Returns `Some(value)` if `raw` is a legacy `{"__break__": true, "value": ...}` object,
779761
/// where `value` is the break value stringified exactly as the old `extract_break_value`
780762
/// produced it (the JSON value's `to_string()`, or `"null"` when absent). Returns `None` for
781-
/// any normal result. New breaks never reach this path because they carry `control = Break`.
763+
/// any normal result. Only envelopes with an absent `control` field (pre-#148 binaries) reach
764+
/// this path; anything written by the new binary carries an explicit `control` and skips it.
782765
fn parse_legacy_break_sentinel(raw: &str) -> Option<String> {
783766
let value = serde_json::from_str::<serde_json::Value>(raw).ok()?;
784767
if value.get(LEGACY_BREAK_SENTINEL).and_then(|b| b.as_bool()) != Some(true) {
@@ -806,14 +789,20 @@ fn parse_subtree_envelope(
806789
serde_json::from_str(raw).map_err(|e| format!("{context} envelope parse error: {e}"))?;
807790
parent_results.extend(envelope.results);
808791
match envelope.control {
809-
SubtreeControl::Break => Err(NodeError::Break(envelope.result)),
810-
// `Normal` also covers envelopes recorded by a pre-#148 binary (<= v0.2.2), which had
811-
// no `control` field (so it defaults to `Normal`) and instead smuggled a break as a
812-
// `{"__break__": true, ...}` sentinel inside `result`. Re-raise such a legacy sentinel
813-
// as a typed `Break` so a JOIN/RACE-in-loop break still unwinds correctly when an
814-
// orchestration started under the old binary resumes under this one, instead of being
815-
// silently swallowed and treated as a normal branch result.
816-
SubtreeControl::Normal => match parse_legacy_break_sentinel(&envelope.result) {
792+
Some(SubtreeControl::Break) => Err(NodeError::Break(envelope.result)),
793+
// A new binary always writes an explicit `control`, so `Some(Normal)` is a genuine
794+
// normal result and must NOT be run through the legacy sentinel check: otherwise a
795+
// branch whose real SQL result happens to be shaped like `{"__break__": true, ...}`
796+
// would be falsely re-raised as a `Break` — exactly the payload-impersonates-control
797+
// bug class #148 set out to remove.
798+
Some(SubtreeControl::Normal) => Ok(envelope.result),
799+
// `None` means the envelope was recorded by a pre-#148 binary (`<= v0.2.2`): it had no
800+
// `control` field and instead smuggled a break as a `{"__break__": true, ...}`
801+
// sentinel inside `result`. Re-raise such a legacy sentinel as a typed `Break` so a
802+
// JOIN/RACE-in-loop break still unwinds when an orchestration started under the old
803+
// binary resumes under this one, instead of being silently swallowed and treated as a
804+
// normal branch result.
805+
None => match parse_legacy_break_sentinel(&envelope.result) {
817806
Some(value) => Err(NodeError::Break(value)),
818807
None => Ok(envelope.result),
819808
},
@@ -1262,6 +1251,21 @@ mod tests {
12621251
);
12631252
}
12641253

1254+
#[test]
1255+
fn envelope_new_format_normal_with_sentinel_shaped_result_is_not_reraised() {
1256+
// Regression guard for the #229 review finding: a new-binary `Normal` envelope whose
1257+
// genuine result happens to be shaped like the legacy break sentinel must pass through
1258+
// untouched. The legacy fallback now runs only when `control` is absent (`None`), so a
1259+
// JOIN/RACE branch result can no longer impersonate control flow under the new binary.
1260+
let payload = legacy_sentinel(serde_json::json!("not-a-break"));
1261+
let raw = envelope_json(Some("Normal"), &payload, serde_json::json!({}));
1262+
let mut parent = HashMap::new();
1263+
assert_eq!(
1264+
expect_ok(parse_subtree_envelope(&raw, "JOIN", &mut parent)),
1265+
payload
1266+
);
1267+
}
1268+
12651269
// --- In-flight upgrade path (pre-#148 envelopes, no `control` field) ---
12661270

12671271
#[test]

0 commit comments

Comments
 (0)