Skip to content

Commit edbca60

Browse files
pinodeca and copilot-swe-agent committed
Re-implement df.loop via sub-orchestration to fix non-root continue_as_new
df.loop called continue_as_new inline in the main orchestration, so every new generation restarted from graph.root_node_id, re-executing prefix nodes on every iteration. Each df.loop() node now spawns a dedicated child sub-orchestration (execute_loop) that owns continue_as_new; the parent awaits it and runs any suffix nodes exactly once. Relies on duroxide PR #31 (parent link preserved across continue_as_new), pulled in as a git dependency until merged and released. Co-authored-by: copilot-swe-agent <copilot@github.com> Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com>
1 parent 846ebce commit edbca60

5 files changed

Lines changed: 419 additions & 56 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,14 @@ serde_json = { version = "1.0", features = ["preserve_order"] }
3434
uuid = { version = "1.0", features = ["v4", "serde"] }
3535

3636
# duroxide integration
37-
duroxide = "=0.1.29"
37+
# Using git dependency on pinodeca/continue-parent-link branch which preserves the
38+
# parent link when a sub-orchestration calls continue_as_new, unblocking the
39+
# sub-orchestration approach for df.loop.
40+
# Compatibility: duroxide-pg 0.1.34 has been verified to compile and run correctly
41+
# against this branch (same public API as 0.1.29 — PR #31 is a runtime-only change).
42+
# Once the branch is merged and a new duroxide release is published, revert both
43+
# entries back to crates.io version pins as a compatible pair.
44+
duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" }
3845
duroxide-pg = "=0.1.34"
3946
tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] }
4047

@@ -61,6 +68,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
6168
[dev-dependencies]
6269
pgrx-tests = "=0.16.1"
6370

71+
# Override the crates.io duroxide with the git branch for both the direct dependency
72+
# and duroxide-pg's transitive dependency on duroxide.
73+
[patch.crates-io]
74+
duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" }
75+
6476
[profile.dev]
6577
panic = "unwind"
6678

src/orchestrations/execute_function_graph.rs

Lines changed: 192 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -572,59 +572,154 @@ const LOOP_MIN_ITER_DURATION: Duration = Duration::from_secs(1);
572572
/// This prevents runaway infinite loops from consuming resources indefinitely.
573573
/// At the minimum 1-second rate limit, this allows ~28 hours of looping.
574574
const MAX_LOOP_ITERATIONS: u64 = 100_000;
575-
async fn execute_loop_node(
576-
ctx: &OrchestrationContext,
577-
graph: &FunctionGraph,
578-
node: &FunctionNode,
579-
node_id: &str,
580-
results: &mut HashMap<String, String>,
581-
exec_ctx: &ExecutionContext,
582-
) -> NodeResult {
575+
576+
/// Orchestration name for the loop sub-orchestration.
577+
///
578+
/// Each `df.loop()` node spawns a child orchestration under this name. The
579+
/// child handles all iterations via `continue_as_new`; when the loop exits it
580+
/// returns a `SubtreeEnvelope` to the parent. The parent link is preserved
581+
/// across `continue_as_new` generations by duroxide (see duroxide PR #31), so
582+
/// the parent orchestration is notified when the loop finally completes.
583+
pub const LOOP_NAME: &str = "pg_durable::orchestration::execute-loop";
584+
585+
/// Build the `SubtreeEnvelope` a loop returns to its parent on exit.
586+
///
587+
/// A loop always exits with a *normal* result: a `df.break()` inside the body is the loop's
588+
/// own terminator (caught here as `NodeError::Break`), not a break that should unwind past
589+
/// the loop, so the envelope is always tagged `Normal`. `execute_loop_node` merges `results`
590+
/// back into the parent map via `parse_subtree_envelope`.
591+
fn loop_exit_envelope(result: String, results: HashMap<String, String>) -> Result<String, String> {
592+
let envelope = SubtreeEnvelope {
593+
control: Some(SubtreeControl::Normal),
594+
result,
595+
results,
596+
};
597+
serde_json::to_string(&envelope).map_err(|e| format!("Failed to serialize loop envelope: {e}"))
598+
}
599+
600+
/// Sub-orchestration that runs a single loop iteration and either returns or
601+
/// calls `continue_as_new` for the next iteration.
602+
///
603+
/// Input JSON:
604+
/// ```json
605+
/// { "instance_id": "...", "loop_node_id": "...",
606+
/// "results": "<results_json>", "vars": "<vars_json>", "label": "...",
607+
/// "iteration": 0 }
608+
/// ```
609+
///
610+
/// `load_function_graph` is called at the start of **every** generation
611+
/// (including after `continue_as_new`) so that cross-iteration security
612+
/// tampering is caught and the instance is failed — the same guarantee the
613+
/// main `execute()` orchestration provides at its generation boundary.
614+
///
615+
/// On loop exit the function returns a `SubtreeEnvelope` containing the final
616+
/// result and any named results accumulated during the loop.
617+
pub async fn execute_loop(ctx: OrchestrationContext, input_json: String) -> Result<String, String> {
618+
let input: serde_json::Value = serde_json::from_str(&input_json)
619+
.map_err(|e| format!("Failed to parse ExecuteLoop input: {e}"))?;
620+
621+
let instance_id = input["instance_id"]
622+
.as_str()
623+
.ok_or("Missing instance_id in ExecuteLoop input")?
624+
.to_string();
625+
let loop_node_id = input["loop_node_id"]
626+
.as_str()
627+
.ok_or("Missing loop_node_id in ExecuteLoop input")?
628+
.to_string();
629+
let results_json = input["results"]
630+
.as_str()
631+
.ok_or("Missing results in ExecuteLoop input")?;
632+
633+
// Iteration counter threaded across continue_as_new generations (M7).
634+
// execute_loop_node passes 0 on the first generation; fall back to 0 if the field is missing.
635+
let iteration = input["iteration"].as_u64().unwrap_or(0);
636+
637+
// re-load the graph from the database on every generation — this re-validates
638+
// submitted_by and catches cross-iteration security tampering.
639+
let graph_json = ctx
640+
.schedule_activity(activities::load_function_graph::NAME, instance_id.clone())
641+
.await?;
642+
let graph: FunctionGraph = serde_json::from_str(&graph_json)
643+
.map_err(|e| format!("Failed to parse graph in ExecuteLoop: {e}"))?;
644+
645+
let mut results: HashMap<String, String> = serde_json::from_str(results_json)
646+
.map_err(|e| format!("Failed to parse results in ExecuteLoop: {e}"))?;
647+
648+
let vars: HashMap<String, String> = if let Some(vars_str) = input["vars"].as_str() {
649+
serde_json::from_str(vars_str)
650+
.map_err(|e| format!("Failed to parse vars in ExecuteLoop: {e}"))?
651+
} else {
652+
HashMap::new()
653+
};
654+
let label: Option<String> = input["label"].as_str().map(|s| s.to_string());
655+
656+
let exec_ctx = ExecutionContext {
657+
vars,
658+
label,
659+
loop_iteration: iteration,
660+
};
661+
662+
let node = graph
663+
.nodes
664+
.get(&loop_node_id)
665+
.ok_or_else(|| format!("Loop node not found: {loop_node_id}"))?;
666+
583667
let body_id = node
584668
.left_node
585669
.as_ref()
586-
.ok_or_else(|| format!("LOOP node {node_id} has no body"))?;
670+
.ok_or_else(|| format!("LOOP node {loop_node_id} has no body"))?
671+
.clone();
587672

588-
// Capture the iteration start time so we can rate-limit `continue_as_new`
589-
// below. `utc_now()` is duroxide's deterministic clock (recorded in
590-
// history and replayed verbatim), so this remains replay-safe.
673+
// Capture iteration start time for rate-limiting continue_as_new.
591674
let iter_started = ctx.utc_now().await.ok();
592675

593676
ctx.trace_info("Executing loop iteration");
594677

595-
// The loop is the only place that catches `NodeError::Break`: a break unwinds through
596-
// every compound node in the body via `?` and is converted here into a normal loop exit.
597-
// A `Failure` still propagates out of the loop unchanged.
598-
let body_result = match Box::pin(execute_function_node_with_vars(
599-
ctx, graph, body_id, results, exec_ctx,
600-
))
678+
// The loop is where `NodeError::Break` is caught: a break unwinds through the body via
679+
// `?` and is converted here into the loop's normal exit value. A `Failure` propagates
680+
// out of the sub-orchestration unchanged.
681+
let body_result = match execute_function_node_with_vars(
682+
&ctx,
683+
&graph,
684+
&body_id,
685+
&mut results,
686+
&exec_ctx,
687+
)
601688
.await
602689
{
603690
Ok(v) => v,
604691
Err(NodeError::Break(break_value)) => {
605692
ctx.trace_info(format!(
606693
"Loop terminated by break with value: {break_value}"
607694
));
608-
store_named_result(ctx, node, &break_value, results, "LOOP");
609-
return Ok(break_value);
695+
store_named_result(&ctx, node, &break_value, &mut results, "LOOP");
696+
return loop_exit_envelope(break_value, results);
610697
}
611-
Err(e @ NodeError::Failure(_)) => return Err(e),
698+
Err(NodeError::Failure(e)) => return Err(e),
612699
};
613700

614-
// Check while-condition if present
701+
// While-condition: if present and false, exit the loop.
615702
if let Some(ref config_str) = node.query {
616703
match serde_json::from_str::<serde_json::Value>(config_str) {
617704
Ok(config) => {
618705
if let Some(condition_node_id) = config["condition_node"].as_str() {
619706
ctx.trace_info("Evaluating loop condition");
620-
let condition_result = Box::pin(execute_function_node_with_vars(
621-
ctx,
622-
graph,
707+
let condition_result = match execute_function_node_with_vars(
708+
&ctx,
709+
&graph,
623710
condition_node_id,
624-
results,
625-
exec_ctx,
626-
))
627-
.await?;
711+
&mut results,
712+
&exec_ctx,
713+
)
714+
.await
715+
{
716+
Ok(v) => v,
717+
Err(NodeError::Break(break_value)) => {
718+
store_named_result(&ctx, node, &break_value, &mut results, "LOOP");
719+
return loop_exit_envelope(break_value, results);
720+
}
721+
Err(NodeError::Failure(e)) => return Err(e),
722+
};
628723

629724
// Parse condition result to check truthiness (uses evaluate_condition to extract boolean from SQL result)
630725
let should_continue = evaluate_condition(&condition_result).unwrap_or(false);
@@ -634,17 +729,17 @@ async fn execute_loop_node(
634729

635730
if !should_continue {
636731
ctx.trace_info("Loop condition false, exiting loop");
637-
store_named_result(ctx, node, &body_result, results, "LOOP");
638-
return Ok(body_result);
732+
store_named_result(&ctx, node, &body_result, &mut results, "LOOP");
733+
return loop_exit_envelope(body_result, results);
639734
}
640735
}
641736
}
642737
Err(e) => {
643738
// M8: Malformed condition config should fail the loop rather than
644739
// silently creating an infinite loop without exit condition.
645-
return Err(NodeError::Failure(format!(
646-
"LOOP node {node_id}: failed to parse condition config: {e}"
647-
)));
740+
return Err(format!(
741+
"LOOP node {loop_node_id}: failed to parse condition config: {e}"
742+
));
648743
}
649744
}
650745
}
@@ -654,17 +749,13 @@ async fn execute_loop_node(
654749
// M7: Enforce maximum iteration count to prevent runaway infinite loops
655750
let next_iteration = exec_ctx.loop_iteration + 1;
656751
if next_iteration >= MAX_LOOP_ITERATIONS {
657-
return Err(NodeError::Failure(format!(
752+
return Err(format!(
658753
"Loop exceeded maximum iteration count of {MAX_LOOP_ITERATIONS}. \
659754
Use df.break() to exit the loop or restructure the workflow."
660-
)));
755+
));
661756
}
662757

663-
// Enforce a minimum per-iteration wall-clock duration to prevent
664-
// busy-looping (e.g. `df.loop(df.sleep(0))`). Compute the elapsed time
665-
// from the deterministic clock; if the iteration finished faster than
666-
// LOOP_MIN_ITER_DURATION, schedule a timer for the deficit so the next
667-
// continue_as_new is gated by at least that much real-clock time.
758+
// Enforce a minimum per-iteration wall-clock duration to prevent busy-looping.
668759
if let Some(started) = iter_started {
669760
if let Ok(now) = ctx.utc_now().await {
670761
let elapsed = now.duration_since(started).unwrap_or(Duration::ZERO);
@@ -679,20 +770,68 @@ async fn execute_loop_node(
679770
}
680771
}
681772

682-
// Preserve vars in continue_as_new input
683-
let new_input = FunctionInput {
684-
instance_id: graph.instance_id.clone(),
685-
label: exec_ctx.label.clone(),
686-
vars: exec_ctx.vars.clone(),
687-
loop_iteration: next_iteration,
688-
};
773+
// Another iteration needed: continue_as_new within this sub-orchestration.
774+
// The parent orchestration keeps its awaiting handle because duroxide preserves
775+
// the parent link across continue_as_new (duroxide PR #31).
776+
ctx.trace_info(format!(
777+
"Loop continuing with continue_as_new at node {loop_node_id}"
778+
));
779+
let new_results_json = serde_json::to_string(&results)
780+
.map_err(|e| format!("Failed to serialize updated results: {e}"))?;
781+
let mut new_input = input.clone();
782+
new_input["results"] = serde_json::Value::String(new_results_json);
783+
// Persist the incremented iteration counter for the next generation (M7).
784+
new_input["iteration"] = serde_json::Value::Number(next_iteration.into());
785+
let new_input_json = serde_json::to_string(&new_input)
786+
.map_err(|e| format!("Failed to serialize loop input: {e}"))?;
787+
ctx.continue_as_new(new_input_json)
788+
.await
789+
.map(|_| String::new())
790+
.map_err(|e| format!("continue_as_new failed: {e:?}"))
791+
}
792+
793+
async fn execute_loop_node(
794+
ctx: &OrchestrationContext,
795+
graph: &FunctionGraph,
796+
node: &FunctionNode,
797+
node_id: &str,
798+
results: &mut HashMap<String, String>,
799+
exec_ctx: &ExecutionContext,
800+
) -> NodeResult {
801+
// Validate that the loop has a body before spawning the sub-orchestration.
802+
node.left_node
803+
.as_ref()
804+
.ok_or_else(|| format!("LOOP node {node_id} has no body"))?;
805+
806+
let results_json =
807+
serde_json::to_string(results).map_err(|e| format!("Failed to serialize results: {e}"))?;
808+
let vars_json = serde_json::to_string(&exec_ctx.vars)
809+
.map_err(|e| format!("Failed to serialize vars: {e}"))?;
689810

690-
// duroxide 0.1.1: continue_as_new returns an awaitable future - return it directly
691-
return ctx
692-
.continue_as_new(serde_json::to_string(&new_input).unwrap_or(graph.instance_id.clone()))
811+
let loop_input = serde_json::json!({
812+
"instance_id": graph.instance_id,
813+
"loop_node_id": node_id,
814+
"results": results_json,
815+
"vars": vars_json,
816+
"label": exec_ctx.label,
817+
"iteration": 0,
818+
})
819+
.to_string();
820+
821+
ctx.trace_info(format!(
822+
"Spawning loop sub-orchestration for node {node_id}"
823+
));
824+
825+
let raw = ctx
826+
.schedule_sub_orchestration(LOOP_NAME, loop_input)
693827
.await
694-
.map(|_| body_result)
695-
.map_err(|e| NodeError::Failure(format!("continue_as_new failed: {e:?}")));
828+
.map_err(|e| format!("Loop sub-orchestration failed: {e}"))?;
829+
830+
// Merge named results from the loop sub-orchestration back into the parent map and
831+
// return the loop's final result. The loop always returns a `Normal` envelope (a break
832+
// inside the body is the loop's own terminator), so `parse_subtree_envelope` will not
833+
// re-raise a `NodeError::Break` here.
834+
parse_subtree_envelope(&raw, "LOOP", results)
696835
}
697836

698837
async fn execute_break_node(

src/registry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,5 +55,9 @@ pub fn create_orchestration_registry() -> OrchestrationRegistry {
5555
orchestrations::execute_function_graph::SUBTREE_NAME,
5656
orchestrations::execute_function_graph::execute_subtree,
5757
)
58+
.register(
59+
orchestrations::execute_function_graph::LOOP_NAME,
60+
orchestrations::execute_function_graph::execute_loop,
61+
)
5862
.build()
5963
}

0 commit comments

Comments
 (0)