Skip to content

Commit b26327a

Browse files
committed
Fix sub-orchestration child ID collision across loop generations
Nested loops and loop bodies containing JOIN/RACE branches hung forever because duroxide auto-generated child instance IDs ({parent}::sub::{event_id}) reset their event counter on each continue_as_new generation, so every loop generation re-derived the same child ID and collided with the previous (now terminal) child. Use schedule_sub_orchestration_with_id with a deterministic, generation-qualified child ID ({instance}::e{execution_id}::{tag}::{node_id}) at all loop/join/race spawn sites. Add nested-loop, loop-in-JOIN-branch, and non-root while-loop regression tests, and a USER_GUIDE note that each df.loop runs as its own child orchestration.
1 parent edbca60 commit b26327a

3 files changed

Lines changed: 250 additions & 10 deletions

File tree

USER_GUIDE.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,12 @@ SELECT df.start(
948948

949949
Use `@>` operator or `df.loop()` to create functions that run forever. Each iteration creates a new execution with fresh state (via continue-as-new).
950950

951+
> **Note:** Each `df.loop()` runs as its own child orchestration. The loop's iterations
952+
> (the continue-as-new generations) are scoped to that child, so any nodes *before* the
953+
> loop in the graph run exactly once and any nodes *after* the loop run once the loop
954+
> exits. When inspecting `df.instances`, a looping function therefore shows a parent
955+
> instance plus a separate child instance for the loop.
956+
951957
```sql
952958
-- Simple heartbeat every 30 seconds (using @> operator)
953959
SELECT df.start(

src/orchestrations/execute_function_graph.rs

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,23 @@ pub async fn execute_loop(ctx: OrchestrationContext, input_json: String) -> Resu
790790
.map_err(|e| format!("continue_as_new failed: {e:?}"))
791791
}
792792

793+
/// Build a deterministic, generation-qualified child instance ID for a sub-orchestration.
794+
///
795+
/// Duroxide's auto-generated child IDs (`{parent}::sub::{event_id}`) reset their event
796+
/// counter across `continue_as_new` generations. When a loop body itself spawns
797+
/// sub-orchestrations (a nested `df.loop`, or a parallel/race branch), each loop
798+
/// generation would otherwise re-derive the *same* child ID, colliding with the previous
799+
/// (now terminal) generation's child and stalling forever. Embedding the current
800+
/// execution (generation) ID plus the spawning node ID makes the child ID unique per
801+
/// generation while staying deterministic across replays of the same generation.
802+
fn child_instance_id(ctx: &OrchestrationContext, tag: &str, node_id: &str) -> String {
803+
format!(
804+
"{}::e{}::{tag}::{node_id}",
805+
ctx.instance_id(),
806+
ctx.execution_id()
807+
)
808+
}
809+
793810
async fn execute_loop_node(
794811
ctx: &OrchestrationContext,
795812
graph: &FunctionGraph,
@@ -822,8 +839,9 @@ async fn execute_loop_node(
822839
"Spawning loop sub-orchestration for node {node_id}"
823840
));
824841

842+
let child_id = child_instance_id(ctx, "loop", node_id);
825843
let raw = ctx
826-
.schedule_sub_orchestration(LOOP_NAME, loop_input)
844+
.schedule_sub_orchestration_with_id(LOOP_NAME, child_id, loop_input)
827845
.await
828846
.map_err(|e| format!("Loop sub-orchestration failed: {e}"))?;
829847

@@ -1057,8 +1075,11 @@ async fn execute_join_node(
10571075
})
10581076
.to_string();
10591077

1060-
// Build list of branch inputs
1061-
let mut branch_inputs = vec![left_input, right_input];
1078+
// Build list of (branch node id, branch input) pairs.
1079+
let mut branch_inputs: Vec<(String, String)> = vec![
1080+
(left_id.clone(), left_input),
1081+
(right_id.clone(), right_input),
1082+
];
10621083

10631084
// Check for extra nodes (join3)
10641085
if let Some(config_str) = &node.query {
@@ -1074,17 +1095,20 @@ async fn execute_join_node(
10741095
"label": exec_ctx.label
10751096
})
10761097
.to_string();
1077-
branch_inputs.push(extra_input);
1098+
branch_inputs.push((extra_id.to_string(), extra_input));
10781099
}
10791100
}
10801101
}
10811102
}
10821103
}
10831104

1084-
// Schedule sub-orchestrations and collect DurableFutures
1105+
// Schedule sub-orchestrations and collect DurableFutures. Use explicit,
1106+
// generation-qualified child IDs so a JOIN inside a loop body does not collide
1107+
// across continue_as_new generations.
10851108
let mut durable_futures = Vec::new();
1086-
for input in branch_inputs {
1087-
let fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, input);
1109+
for (branch_node_id, input) in branch_inputs {
1110+
let child_id = child_instance_id(ctx, "subtree", &branch_node_id);
1111+
let fut = ctx.schedule_sub_orchestration_with_id(SUBTREE_NAME, child_id, input);
10881112
durable_futures.push(fut);
10891113
}
10901114

@@ -1176,9 +1200,18 @@ async fn execute_race_node(
11761200
})
11771201
.to_string();
11781202

1179-
// Schedule sub-orchestrations
1180-
let left_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, left_input);
1181-
let right_fut = ctx.schedule_sub_orchestration(SUBTREE_NAME, right_input);
1203+
// Schedule sub-orchestrations with explicit, generation-qualified child IDs so a
1204+
// RACE inside a loop body does not collide across continue_as_new generations.
1205+
let left_fut = ctx.schedule_sub_orchestration_with_id(
1206+
SUBTREE_NAME,
1207+
child_instance_id(ctx, "subtree", left_id),
1208+
left_input,
1209+
);
1210+
let right_fut = ctx.schedule_sub_orchestration_with_id(
1211+
SUBTREE_NAME,
1212+
child_instance_id(ctx, "subtree", right_id),
1213+
right_input,
1214+
);
11821215

11831216
// Use ctx.select2() - first to complete wins
11841217
// select2 now returns Either2<Left, Right> instead of (winner_idx, DurableOutput)

tests/e2e/sql/24_nonroot_loop.sql

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,5 +204,206 @@ END $$;
204204
DROP TABLE _t3;
205205
DROP TABLE test_nonroot3_log;
206206

207+
-- === Test 4: Nested loop — a loop body that itself contains a loop ===
208+
--
209+
-- Each df.loop() spawns an execute_loop sub-orchestration, so a nested loop spawns a
210+
-- child execute_loop from *within* another execute_loop generation. This verifies that
211+
-- continue_as_new in the outer loop does not disturb the inner loop and vice versa.
212+
--
213+
-- Outer loop runs 2 iterations (break when outer_marker has 2 rows); each outer iteration
214+
-- runs an inner loop that inserts exactly one row and breaks immediately.
215+
-- Expected: outer_marker = 2 rows, inner_table = 2 rows.
216+
217+
DROP TABLE IF EXISTS test_nested_outer;
218+
DROP TABLE IF EXISTS test_nested_inner;
219+
CREATE TABLE test_nested_outer (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp());
220+
CREATE TABLE test_nested_inner (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp());
221+
222+
CREATE TEMP TABLE _t4 AS
223+
SELECT df.start(
224+
df.loop(
225+
'INSERT INTO test_nested_outer DEFAULT VALUES'
226+
~> df.loop(
227+
'INSERT INTO test_nested_inner DEFAULT VALUES'
228+
~> df.break()
229+
)
230+
~> (
231+
'SELECT COUNT(*) >= 2 FROM test_nested_outer'
232+
?> df.break()
233+
!> df.sleep(1)
234+
)
235+
),
236+
'test-nested-loop'
237+
) AS instance_id;
238+
239+
DO $$
240+
DECLARE
241+
v_id TEXT;
242+
v_status TEXT;
243+
v_outer INT;
244+
v_inner INT;
245+
BEGIN
246+
SELECT instance_id INTO v_id FROM _t4;
247+
RAISE NOTICE 'Test 4 - nested loop: instance %', v_id;
248+
249+
SELECT df.wait_for_completion(v_id, 90) INTO v_status;
250+
251+
IF v_status != 'completed' THEN
252+
RAISE EXCEPTION 'TEST FAILED [nested]: expected completed, got %', v_status;
253+
END IF;
254+
255+
SELECT COUNT(*) INTO v_outer FROM test_nested_outer;
256+
SELECT COUNT(*) INTO v_inner FROM test_nested_inner;
257+
258+
IF v_outer != 2 THEN
259+
RAISE EXCEPTION 'TEST FAILED [nested]: outer ran % time(s) (expected 2)', v_outer;
260+
END IF;
261+
262+
IF v_inner != 2 THEN
263+
RAISE EXCEPTION 'TEST FAILED [nested]: inner ran % time(s) (expected 2)', v_inner;
264+
END IF;
265+
266+
RAISE NOTICE 'PASSED: nested loop — outer ran twice, inner ran once per outer iteration';
267+
END $$;
268+
269+
DROP TABLE _t4;
270+
DROP TABLE test_nested_outer;
271+
DROP TABLE test_nested_inner;
272+
273+
-- === Test 5: Loop inside a JOIN branch ===
274+
--
275+
-- JOIN branches execute as execute_subtree sub-orchestrations, so a loop in a branch
276+
-- spawns an execute_loop child from *within* execute_subtree. This verifies the loop
277+
-- sub-orchestration nests correctly under a parallel branch and that the JOIN still
278+
-- completes once both branches finish.
279+
--
280+
-- Left branch inserts one row; right branch loops until join_loop has 2 rows.
281+
-- Expected: join_left = 1 row, join_loop = 2 rows.
282+
283+
DROP TABLE IF EXISTS test_join_left;
284+
DROP TABLE IF EXISTS test_join_loop;
285+
CREATE TABLE test_join_left (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp());
286+
CREATE TABLE test_join_loop (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp());
287+
288+
CREATE TEMP TABLE _t5 AS
289+
SELECT df.start(
290+
'INSERT INTO test_join_left DEFAULT VALUES'
291+
& df.loop(
292+
'INSERT INTO test_join_loop DEFAULT VALUES'
293+
~> (
294+
'SELECT COUNT(*) >= 2 FROM test_join_loop'
295+
?> df.break()
296+
!> df.sleep(1)
297+
)
298+
),
299+
'test-loop-in-join-branch'
300+
) AS instance_id;
301+
302+
DO $$
303+
DECLARE
304+
v_id TEXT;
305+
v_status TEXT;
306+
v_left INT;
307+
v_loop INT;
308+
BEGIN
309+
SELECT instance_id INTO v_id FROM _t5;
310+
RAISE NOTICE 'Test 5 - loop in JOIN branch: instance %', v_id;
311+
312+
SELECT df.wait_for_completion(v_id, 90) INTO v_status;
313+
314+
IF v_status != 'completed' THEN
315+
RAISE EXCEPTION 'TEST FAILED [loop-in-join]: expected completed, got %', v_status;
316+
END IF;
317+
318+
SELECT COUNT(*) INTO v_left FROM test_join_left;
319+
SELECT COUNT(*) INTO v_loop FROM test_join_loop;
320+
321+
IF v_left != 1 THEN
322+
RAISE EXCEPTION 'TEST FAILED [loop-in-join]: left branch ran % time(s) (expected 1)', v_left;
323+
END IF;
324+
325+
IF v_loop != 2 THEN
326+
RAISE EXCEPTION 'TEST FAILED [loop-in-join]: loop branch body ran % time(s) (expected 2)', v_loop;
327+
END IF;
328+
329+
RAISE NOTICE 'PASSED: loop in JOIN branch — left ran once, loop body ran twice';
330+
END $$;
331+
332+
DROP TABLE _t5;
333+
DROP TABLE test_join_left;
334+
DROP TABLE test_join_loop;
335+
336+
-- === Test 6: Non-root while-loop — prefix once, while-condition exit, suffix once ===
337+
--
338+
-- The earlier tests exit via df.break(); this one exits via a false while-condition
339+
-- (df.loop(body, condition)). The condition node also runs inside the loop
340+
-- sub-orchestration, so this exercises the while-false exit path across generations.
341+
--
342+
-- Graph: INSERT prefix ~> df.loop(body, 'COUNT < 3') ~> INSERT suffix
343+
-- Expected: prefix = 1 row, body = 3 rows (loop stops when count reaches 3), suffix = 1 row.
344+
345+
DROP TABLE IF EXISTS test_while_prefix;
346+
DROP TABLE IF EXISTS test_while_body;
347+
DROP TABLE IF EXISTS test_while_suffix;
348+
CREATE TABLE test_while_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp());
349+
CREATE TABLE test_while_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp());
350+
CREATE TABLE test_while_suffix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp());
351+
352+
CREATE TEMP TABLE _t6 AS
353+
SELECT df.start(
354+
df.seq(
355+
'INSERT INTO test_while_prefix DEFAULT VALUES',
356+
df.seq(
357+
df.loop(
358+
'INSERT INTO test_while_body DEFAULT VALUES' ~> df.sleep(1),
359+
'SELECT COUNT(*) < 3 FROM test_while_body'
360+
),
361+
'INSERT INTO test_while_suffix DEFAULT VALUES'
362+
)
363+
),
364+
'test-nonroot-while-loop'
365+
) AS instance_id;
366+
367+
DO $$
368+
DECLARE
369+
v_id TEXT;
370+
v_status TEXT;
371+
v_prefix INT;
372+
v_body INT;
373+
v_suffix INT;
374+
BEGIN
375+
SELECT instance_id INTO v_id FROM _t6;
376+
RAISE NOTICE 'Test 6 - non-root while loop: instance %', v_id;
377+
378+
SELECT df.wait_for_completion(v_id, 90) INTO v_status;
379+
380+
IF v_status != 'completed' THEN
381+
RAISE EXCEPTION 'TEST FAILED [nonroot-while]: expected completed, got %', v_status;
382+
END IF;
383+
384+
SELECT COUNT(*) INTO v_prefix FROM test_while_prefix;
385+
SELECT COUNT(*) INTO v_body FROM test_while_body;
386+
SELECT COUNT(*) INTO v_suffix FROM test_while_suffix;
387+
388+
IF v_prefix != 1 THEN
389+
RAISE EXCEPTION 'TEST FAILED [nonroot-while]: prefix ran % time(s) (expected 1)', v_prefix;
390+
END IF;
391+
392+
IF v_body != 3 THEN
393+
RAISE EXCEPTION 'TEST FAILED [nonroot-while]: body ran % time(s) (expected 3)', v_body;
394+
END IF;
395+
396+
IF v_suffix != 1 THEN
397+
RAISE EXCEPTION 'TEST FAILED [nonroot-while]: suffix ran % time(s) (expected 1)', v_suffix;
398+
END IF;
399+
400+
RAISE NOTICE 'PASSED: non-root while loop — prefix once, body 3x via while-condition, suffix once';
401+
END $$;
402+
403+
DROP TABLE _t6;
404+
DROP TABLE test_while_prefix;
405+
DROP TABLE test_while_body;
406+
DROP TABLE test_while_suffix;
407+
207408
RESET SESSION AUTHORIZATION;
208409
SELECT 'TEST PASSED' AS result;

0 commit comments

Comments
 (0)