Skip to content

Commit 754f1f2

Browse files
pinodeca and copilot-swe-agent committed
Re-implement df.loop via sub-orchestration to fix non-root continue_as_new
df.loop called continue_as_new inline in the main orchestration, so every new generation restarted from graph.root_node_id, re-executing prefix nodes on every iteration.

Each df.loop() node now spawns a dedicated child sub-orchestration (execute_loop) that owns continue_as_new; the parent awaits it and runs any suffix nodes exactly once.

Relies on duroxide PR #31 (parent link preserved across continue_as_new), pulled in as a git dependency until merged and released.

Co-authored-by: copilot-swe-agent <copilot@github.com>
Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com>
1 parent ddfea20 commit 754f1f2

5 files changed

Lines changed: 407 additions & 62 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,14 @@ serde_json = { version = "1.0", features = ["preserve_order"] }
3434
uuid = { version = "1.0", features = ["v4", "serde"] }
3535

3636
# duroxide integration
37-
duroxide = "=0.1.29"
37+
# Using git dependency on pinodeca/continue-parent-link branch which preserves the
38+
# parent link when a sub-orchestration calls continue_as_new, unblocking the
39+
# sub-orchestration approach for df.loop.
40+
# Compatibility: duroxide-pg 0.1.34 has been verified to compile and run correctly
41+
# against this branch (same public API as 0.1.29 — PR #31 is a runtime-only change).
42+
# Once the branch is merged and a new duroxide release is published, revert both
43+
# entries back to crates.io version pins as a compatible pair.
44+
duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" }
3845
duroxide-pg = "=0.1.34"
3946
tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] }
4047

@@ -55,6 +62,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
5562
[dev-dependencies]
5663
pgrx-tests = "=0.16.1"
5764

65+
# Override the crates.io duroxide with the git branch for both the direct dependency
66+
# and duroxide-pg's transitive dependency on duroxide.
67+
[patch.crates-io]
68+
duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" }
69+
5870
[profile.dev]
5971
panic = "unwind"
6072

src/orchestrations/execute_function_graph.rs

Lines changed: 180 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -526,81 +526,156 @@ async fn execute_wait_schedule_node(
526526
/// deficit so an empty-bodied loop can't busy-spin via continue_as_new.
527527
const LOOP_MIN_ITER_DURATION: Duration = Duration::from_secs(1);
528528

529-
async fn execute_loop_node(
530-
ctx: &OrchestrationContext,
531-
graph: &FunctionGraph,
532-
node: &FunctionNode,
533-
node_id: &str,
534-
results: &mut HashMap<String, String>,
535-
exec_ctx: &ExecutionContext,
536-
) -> NodeResult {
529+
/// Orchestration name for the loop sub-orchestration.
530+
///
531+
/// Each `df.loop()` node spawns a child orchestration under this name. The
532+
/// child handles all iterations via `continue_as_new`; when the loop exits it
533+
/// returns a `SubtreeEnvelope` to the parent. The parent link is preserved
534+
/// across `continue_as_new` generations by duroxide (see duroxide PR #31), so
535+
/// the parent orchestration is notified when the loop finally completes.
536+
pub const LOOP_NAME: &str = "pg_durable::orchestration::execute-loop";
537+
538+
/// Build the `SubtreeEnvelope` a loop returns to its parent on exit.
539+
///
540+
/// A loop always exits with a *normal* result: a `df.break()` inside the body is the loop's
541+
/// own terminator (caught here as `NodeError::Break`), not a break that should unwind past
542+
/// the loop, so the envelope is always tagged `Normal`. `execute_loop_node` merges `results`
543+
/// back into the parent map via `parse_subtree_envelope`.
544+
fn loop_exit_envelope(
545+
result: String,
546+
results: HashMap<String, String>,
547+
) -> Result<String, String> {
548+
let envelope = SubtreeEnvelope {
549+
control: Some(SubtreeControl::Normal),
550+
result,
551+
results,
552+
};
553+
serde_json::to_string(&envelope).map_err(|e| format!("Failed to serialize loop envelope: {e}"))
554+
}
555+
556+
/// Sub-orchestration that runs a single loop iteration and either returns or
557+
/// calls `continue_as_new` for the next iteration.
558+
///
559+
/// Input JSON:
560+
/// ```json
561+
/// { "instance_id": "...", "loop_node_id": "...",
562+
/// "results": "<results_json>", "vars": "<vars_json>", "label": "..." }
563+
/// ```
564+
///
565+
/// `load_function_graph` is called at the start of **every** generation
566+
/// (including after `continue_as_new`) so that cross-iteration security
567+
/// tampering is caught and the instance is failed — the same guarantee the
568+
/// main `execute()` orchestration provides at its generation boundary.
569+
///
570+
/// On loop exit the function returns a `SubtreeEnvelope` containing the final
571+
/// result and any named results accumulated during the loop.
572+
pub async fn execute_loop(ctx: OrchestrationContext, input_json: String) -> Result<String, String> {
573+
let input: serde_json::Value = serde_json::from_str(&input_json)
574+
.map_err(|e| format!("Failed to parse ExecuteLoop input: {e}"))?;
575+
576+
let instance_id = input["instance_id"]
577+
.as_str()
578+
.ok_or("Missing instance_id in ExecuteLoop input")?
579+
.to_string();
580+
let loop_node_id = input["loop_node_id"]
581+
.as_str()
582+
.ok_or("Missing loop_node_id in ExecuteLoop input")?
583+
.to_string();
584+
let results_json = input["results"]
585+
.as_str()
586+
.ok_or("Missing results in ExecuteLoop input")?;
587+
588+
// re-load the graph from the database on every generation — this re-validates
589+
// submitted_by and catches cross-iteration security tampering.
590+
let graph_json = ctx
591+
.schedule_activity(activities::load_function_graph::NAME, instance_id.clone())
592+
.await?;
593+
let graph: FunctionGraph = serde_json::from_str(&graph_json)
594+
.map_err(|e| format!("Failed to parse graph in ExecuteLoop: {e}"))?;
595+
596+
let mut results: HashMap<String, String> = serde_json::from_str(results_json)
597+
.map_err(|e| format!("Failed to parse results in ExecuteLoop: {e}"))?;
598+
599+
let vars: HashMap<String, String> = if let Some(vars_str) = input["vars"].as_str() {
600+
serde_json::from_str(vars_str)
601+
.map_err(|e| format!("Failed to parse vars in ExecuteLoop: {e}"))?
602+
} else {
603+
HashMap::new()
604+
};
605+
let label: Option<String> = input["label"].as_str().map(|s| s.to_string());
606+
607+
let exec_ctx = ExecutionContext { vars, label };
608+
609+
let node = graph
610+
.nodes
611+
.get(&loop_node_id)
612+
.ok_or_else(|| format!("Loop node not found: {loop_node_id}"))?;
613+
537614
let body_id = node
538615
.left_node
539616
.as_ref()
540-
.ok_or_else(|| format!("LOOP node {node_id} has no body"))?;
617+
.ok_or_else(|| format!("LOOP node {loop_node_id} has no body"))?
618+
.clone();
541619

542-
// Capture the iteration start time so we can rate-limit `continue_as_new`
543-
// below. `utc_now()` is duroxide's deterministic clock (recorded in
544-
// history and replayed verbatim), so this remains replay-safe.
620+
// Capture iteration start time for rate-limiting continue_as_new.
545621
let iter_started = ctx.utc_now().await.ok();
546622

547623
ctx.trace_info("Executing loop iteration");
548624

549-
// The loop is the only place that catches `NodeError::Break`: a break unwinds through
550-
// every compound node in the body via `?` and is converted here into a normal loop exit.
551-
// A `Failure` still propagates out of the loop unchanged.
552-
let body_result = match Box::pin(execute_function_node_with_vars(
553-
ctx, graph, body_id, results, exec_ctx,
554-
))
555-
.await
556-
{
557-
Ok(v) => v,
558-
Err(NodeError::Break(break_value)) => {
559-
ctx.trace_info(format!(
560-
"Loop terminated by break with value: {break_value}"
561-
));
562-
store_named_result(ctx, node, &break_value, results, "LOOP");
563-
return Ok(break_value);
564-
}
565-
Err(e @ NodeError::Failure(_)) => return Err(e),
566-
};
625+
// The loop is where `NodeError::Break` is caught: a break unwinds through the body via
626+
// `?` and is converted here into the loop's normal exit value. A `Failure` propagates
627+
// out of the sub-orchestration unchanged.
628+
let body_result =
629+
match execute_function_node_with_vars(&ctx, &graph, &body_id, &mut results, &exec_ctx).await
630+
{
631+
Ok(v) => v,
632+
Err(NodeError::Break(break_value)) => {
633+
ctx.trace_info(format!(
634+
"Loop terminated by break with value: {break_value}"
635+
));
636+
store_named_result(&ctx, node, &break_value, &mut results, "LOOP");
637+
return loop_exit_envelope(break_value, results);
638+
}
639+
Err(NodeError::Failure(e)) => return Err(e),
640+
};
567641

568-
// Check while-condition if present
642+
// While-condition: if present and false, exit the loop.
569643
if let Some(ref config_str) = node.query {
570644
if let Ok(config) = serde_json::from_str::<serde_json::Value>(config_str) {
571645
if let Some(condition_node_id) = config["condition_node"].as_str() {
572646
ctx.trace_info("Evaluating loop condition");
573-
let condition_result = Box::pin(execute_function_node_with_vars(
574-
ctx,
575-
graph,
647+
let condition_result = match execute_function_node_with_vars(
648+
&ctx,
649+
&graph,
576650
condition_node_id,
577-
results,
578-
exec_ctx,
579-
))
580-
.await?;
651+
&mut results,
652+
&exec_ctx,
653+
)
654+
.await
655+
{
656+
Ok(v) => v,
657+
Err(NodeError::Break(break_value)) => {
658+
store_named_result(&ctx, node, &break_value, &mut results, "LOOP");
659+
return loop_exit_envelope(break_value, results);
660+
}
661+
Err(NodeError::Failure(e)) => return Err(e),
662+
};
581663

582-
// Parse condition result to check truthiness (uses evaluate_condition to extract boolean from SQL result)
583664
let should_continue = evaluate_condition(&condition_result).unwrap_or(false);
584665
ctx.trace_info(format!(
585666
"Loop condition evaluated to: {condition_result} (continue={should_continue})"
586667
));
587668

588669
if !should_continue {
589670
ctx.trace_info("Loop condition false, exiting loop");
590-
store_named_result(ctx, node, &body_result, results, "LOOP");
591-
return Ok(body_result);
671+
store_named_result(&ctx, node, &body_result, &mut results, "LOOP");
672+
return loop_exit_envelope(body_result, results);
592673
}
593674
}
594675
}
595676
}
596677

597-
ctx.trace_info("Continuing as new for next loop iteration");
598-
599-
// Enforce a minimum per-iteration wall-clock duration to prevent
600-
// busy-looping (e.g. `df.loop(df.sleep(0))`). Compute the elapsed time
601-
// from the deterministic clock; if the iteration finished faster than
602-
// LOOP_MIN_ITER_DURATION, schedule a timer for the deficit so the next
603-
// continue_as_new is gated by at least that much real-clock time.
678+
// Enforce a minimum per-iteration wall-clock duration to prevent busy-looping.
604679
if let Some(started) = iter_started {
605680
if let Ok(now) = ctx.utc_now().await {
606681
let elapsed = now.duration_since(started).unwrap_or(Duration::ZERO);
@@ -615,19 +690,65 @@ async fn execute_loop_node(
615690
}
616691
}
617692

618-
// Preserve vars in continue_as_new input
619-
let new_input = FunctionInput {
620-
instance_id: graph.instance_id.clone(),
621-
label: exec_ctx.label.clone(),
622-
vars: exec_ctx.vars.clone(),
623-
};
693+
// Another iteration needed: continue_as_new within this sub-orchestration.
694+
// The parent orchestration keeps its awaiting handle because duroxide preserves
695+
// the parent link across continue_as_new (duroxide PR #31).
696+
ctx.trace_info(format!(
697+
"Loop continuing with continue_as_new at node {loop_node_id}"
698+
));
699+
let new_results_json = serde_json::to_string(&results)
700+
.map_err(|e| format!("Failed to serialize updated results: {e}"))?;
701+
let mut new_input = input.clone();
702+
new_input["results"] = serde_json::Value::String(new_results_json);
703+
let new_input_json = serde_json::to_string(&new_input)
704+
.map_err(|e| format!("Failed to serialize loop input: {e}"))?;
705+
ctx.continue_as_new(new_input_json)
706+
.await
707+
.map(|_| String::new())
708+
.map_err(|e| format!("continue_as_new failed: {e:?}"))
709+
}
710+
711+
async fn execute_loop_node(
712+
ctx: &OrchestrationContext,
713+
graph: &FunctionGraph,
714+
node: &FunctionNode,
715+
node_id: &str,
716+
results: &mut HashMap<String, String>,
717+
exec_ctx: &ExecutionContext,
718+
) -> NodeResult {
719+
// Validate that the loop has a body before spawning the sub-orchestration.
720+
node.left_node
721+
.as_ref()
722+
.ok_or_else(|| format!("LOOP node {node_id} has no body"))?;
723+
724+
let results_json =
725+
serde_json::to_string(results).map_err(|e| format!("Failed to serialize results: {e}"))?;
726+
let vars_json = serde_json::to_string(&exec_ctx.vars)
727+
.map_err(|e| format!("Failed to serialize vars: {e}"))?;
624728

625-
// duroxide 0.1.1: continue_as_new returns an awaitable future - return it directly
626-
return ctx
627-
.continue_as_new(serde_json::to_string(&new_input).unwrap_or(graph.instance_id.clone()))
729+
let loop_input = serde_json::json!({
730+
"instance_id": graph.instance_id,
731+
"loop_node_id": node_id,
732+
"results": results_json,
733+
"vars": vars_json,
734+
"label": exec_ctx.label,
735+
})
736+
.to_string();
737+
738+
ctx.trace_info(format!(
739+
"Spawning loop sub-orchestration for node {node_id}"
740+
));
741+
742+
let raw = ctx
743+
.schedule_sub_orchestration(LOOP_NAME, loop_input)
628744
.await
629-
.map(|_| body_result)
630-
.map_err(|e| NodeError::Failure(format!("continue_as_new failed: {e:?}")));
745+
.map_err(|e| format!("Loop sub-orchestration failed: {e}"))?;
746+
747+
// Merge named results from the loop sub-orchestration back into the parent map and
748+
// return the loop's final result. The loop always returns a `Normal` envelope (a break
749+
// inside the body is the loop's own terminator), so `parse_subtree_envelope` will not
750+
// re-raise a `NodeError::Break` here.
751+
parse_subtree_envelope(&raw, "LOOP", results)
631752
}
632753

633754
async fn execute_break_node(

src/registry.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,5 +55,9 @@ pub fn create_orchestration_registry() -> OrchestrationRegistry {
5555
orchestrations::execute_function_graph::SUBTREE_NAME,
5656
orchestrations::execute_function_graph::execute_subtree,
5757
)
58+
.register(
59+
orchestrations::execute_function_graph::LOOP_NAME,
60+
orchestrations::execute_function_graph::execute_loop,
61+
)
5862
.build()
5963
}

0 commit comments

Comments
 (0)