Skip to content

Commit 6819539

Browse files
authored
fix: correctly decrement remaining_deps for multiple skipped parents (#606)
# Fix remaining_deps decrement for multiple skipped parents This PR fixes a bug where child steps with multiple skipped parent steps would not have their `remaining_deps` decremented correctly. Previously, when multiple parent steps were skipped simultaneously, the child's `remaining_deps` would only be decremented by 1 regardless of how many parents were skipped. The fix: - Added a new CTE `skipped_parent_counts` that counts how many skipped parents each child step has - Modified the `dependent_updates` query to decrement `remaining_deps` by the actual count of skipped parents - Added two test cases to verify the fix: 1. `multi_parent_skip_decrements_remaining_deps.test.sql` - Tests that when two parent steps are skipped, the child's `remaining_deps` is decremented by 2 2. `multi_parent_skip_then_complete.test.sql` - Tests a more complex scenario with three parents where two are skipped and one completes normally This ensures that workflows with conditional branches properly advance when multiple parent steps are skipped in the same cascade resolution.
1 parent 0969378 commit 6819539

File tree

6 files changed

+235
-15
lines changed

6 files changed

+235
-15
lines changed

pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,21 +197,30 @@ BEGIN
197197
false
198198
) AS _broadcast_result
199199
),
200-
-- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
200+
-- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps)
201+
skipped_parent_counts AS (
202+
-- Count how many skipped parents each child has
203+
SELECT
204+
dep.step_slug AS child_step_slug,
205+
dep.flow_slug AS child_flow_slug,
206+
COUNT(*) AS skipped_parent_count
207+
FROM skipped_steps parent
208+
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
209+
GROUP BY dep.step_slug, dep.flow_slug
210+
),
201211
dependent_updates AS (
202212
UPDATE pgflow.step_states child_state
203-
SET remaining_deps = child_state.remaining_deps - 1,
213+
SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count,
204214
-- If child is a map step and this skipped step is its only dependency,
205215
-- set initial_tasks = 0 (skipped dep = empty array)
206216
initial_tasks = CASE
207217
WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
208218
ELSE child_state.initial_tasks
209219
END
210-
FROM skipped_steps parent
211-
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
212-
JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
220+
FROM skipped_parent_counts spc
221+
JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug
213222
WHERE child_state.run_id = cascade_resolve_conditions.run_id
214-
AND child_state.step_slug = dep.step_slug
223+
AND child_state.step_slug = spc.child_step_slug
215224
),
216225
run_update AS (
217226
UPDATE pgflow.runs r

pkgs/core/supabase/migrations/20260203101513_pgflow_step_conditions.sql renamed to pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -614,21 +614,30 @@ BEGIN
614614
false
615615
) AS _broadcast_result
616616
),
617-
-- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
617+
-- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps)
618+
skipped_parent_counts AS (
619+
-- Count how many skipped parents each child has
620+
SELECT
621+
dep.step_slug AS child_step_slug,
622+
dep.flow_slug AS child_flow_slug,
623+
COUNT(*) AS skipped_parent_count
624+
FROM skipped_steps parent
625+
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
626+
GROUP BY dep.step_slug, dep.flow_slug
627+
),
618628
dependent_updates AS (
619629
UPDATE pgflow.step_states child_state
620-
SET remaining_deps = child_state.remaining_deps - 1,
630+
SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count,
621631
-- If child is a map step and this skipped step is its only dependency,
622632
-- set initial_tasks = 0 (skipped dep = empty array)
623633
initial_tasks = CASE
624634
WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
625635
ELSE child_state.initial_tasks
626636
END
627-
FROM skipped_steps parent
628-
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
629-
JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
637+
FROM skipped_parent_counts spc
638+
JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug
630639
WHERE child_state.run_id = cascade_resolve_conditions.run_id
631-
AND child_state.step_slug = dep.step_slug
640+
AND child_state.step_slug = spc.child_step_slug
632641
),
633642
run_update AS (
634643
UPDATE pgflow.runs r

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:C7dqsqxBJfaYoFyWUWyOwQEqSTbZm2ob13rGC3EvYOs=
1+
h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -18,4 +18,4 @@ h1:C7dqsqxBJfaYoFyWUWyOwQEqSTbZm2ob13rGC3EvYOs=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
1919
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
2020
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
21-
20260203101513_pgflow_step_conditions.sql h1:pom2NuFU0JsCKvdaeQzdGcVF20ZlDUd94bJmZ98hI5A=
21+
20260206115746_pgflow_step_conditions.sql h1:rIoXVl0SoVFGHdCFpAQnD6DRSHugzQODZa+UjAhA0ow=
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
-- Test: Multiple parents skipped in same iteration should decrement remaining_deps for each
2+
-- This tests the bug where remaining_deps is only decremented once even when multiple parents
3+
-- are skipped simultaneously in cascade_resolve_conditions
4+
--
5+
-- Flow structure:
6+
-- root_a (skip) \
7+
-- -> join (depends on both)
8+
-- root_b (skip) /
9+
--
10+
-- Expected behavior:
11+
-- 1. Both root_a and root_b are skipped due to unmet conditions
12+
-- 2. join.remaining_deps should be decremented by 2 (from 2 to 0)
13+
-- 3. join should become ready and start
14+
--
15+
-- Bug behavior (current):
16+
-- 1. Both root_a and root_b are skipped
17+
-- 2. join.remaining_deps is only decremented by 1 (from 2 to 1)
18+
-- 3. join stays stuck with remaining_deps = 1 forever
19+
20+
begin;
21+
select plan(5);
22+
select pgflow_tests.reset_db();
23+
24+
-- Create flow with two conditional root steps that will both be skipped
25+
select pgflow.create_flow('multi_root_skip');
26+
select pgflow.add_step(
27+
flow_slug => 'multi_root_skip',
28+
step_slug => 'root_a',
29+
required_input_pattern => '{"go": true}'::jsonb,
30+
when_unmet => 'skip'
31+
);
32+
select pgflow.add_step(
33+
flow_slug => 'multi_root_skip',
34+
step_slug => 'root_b',
35+
required_input_pattern => '{"go": true}'::jsonb,
36+
when_unmet => 'skip'
37+
);
38+
select pgflow.add_step(
39+
flow_slug => 'multi_root_skip',
40+
step_slug => 'join',
41+
deps_slugs => ARRAY['root_a', 'root_b']
42+
);
43+
44+
-- Start flow with input that does NOT match either condition
45+
with flow as (
46+
select * from pgflow.start_flow('multi_root_skip', '{"go": false}'::jsonb)
47+
)
48+
select run_id into temporary run_ids from flow;
49+
50+
-- Test 1: root_a should be skipped
51+
select is(
52+
(select status from pgflow.step_states
53+
where run_id = (select run_id from run_ids) and step_slug = 'root_a'),
54+
'skipped',
55+
'root_a should be skipped (condition unmet)'
56+
);
57+
58+
-- Test 2: root_b should be skipped
59+
select is(
60+
(select status from pgflow.step_states
61+
where run_id = (select run_id from run_ids) and step_slug = 'root_b'),
62+
'skipped',
63+
'root_b should be skipped (condition unmet)'
64+
);
65+
66+
-- Test 3: join.remaining_deps should be 0 (both parents skipped)
67+
select is(
68+
(select remaining_deps from pgflow.step_states
69+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
70+
0,
71+
'join.remaining_deps should be 0 when both parents are skipped'
72+
);
73+
74+
-- Test 4: join should be started (ready to run)
75+
select is(
76+
(select status from pgflow.step_states
77+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
78+
'started',
79+
'join should start after both parents are skipped'
80+
);
81+
82+
-- Test 5: join should have a task created
83+
select is(
84+
(select count(*)::int from pgflow.step_tasks
85+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
86+
1,
87+
'join should have one task created'
88+
);
89+
90+
drop table if exists run_ids;
91+
select finish();
92+
rollback;
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
-- Test: Multiple parents skipped then one completes should properly decrement remaining_deps
2+
-- This tests the bug where remaining_deps under-decrementing causes the join step to never start
3+
--
4+
-- Flow structure:
5+
-- branch_a (conditional, will run) \
6+
-- branch_b (skip) -> join (depends on all three)
7+
-- branch_c (skip) /
8+
--
9+
-- Expected behavior:
10+
-- 1. branch_a starts (condition met), branch_b and branch_c are skipped
11+
-- 2. join.remaining_deps should be decremented by 2 (for skipped branches) to 1
12+
-- 3. After branch_a completes, join.remaining_deps goes from 1 to 0
13+
-- 4. join should start
14+
--
15+
-- Bug behavior (current):
16+
-- 1. branch_a runs, branch_b and branch_c are skipped
17+
-- 2. join.remaining_deps is only decremented by 1 (from 3 to 2) instead of 2 (to 1)
18+
-- 3. After branch_a completes, join.remaining_deps goes from 2 to 1
19+
-- 4. join stays stuck with remaining_deps = 1 forever
20+
21+
begin;
22+
select plan(6);
23+
select pgflow_tests.reset_db();
24+
25+
-- Create flow with three conditional branches
26+
select pgflow.create_flow('multi_skip_partial');
27+
select pgflow.add_step(
28+
flow_slug => 'multi_skip_partial',
29+
step_slug => 'branch_a',
30+
required_input_pattern => '{"route": "a"}'::jsonb,
31+
when_unmet => 'skip'
32+
);
33+
select pgflow.add_step(
34+
flow_slug => 'multi_skip_partial',
35+
step_slug => 'branch_b',
36+
required_input_pattern => '{"route": "b"}'::jsonb,
37+
when_unmet => 'skip'
38+
);
39+
select pgflow.add_step(
40+
flow_slug => 'multi_skip_partial',
41+
step_slug => 'branch_c',
42+
required_input_pattern => '{"route": "c"}'::jsonb,
43+
when_unmet => 'skip'
44+
);
45+
select pgflow.add_step(
46+
flow_slug => 'multi_skip_partial',
47+
step_slug => 'join',
48+
deps_slugs => ARRAY['branch_a', 'branch_b', 'branch_c']
49+
);
50+
51+
-- Start flow with input that only matches branch_a's condition
52+
with flow as (
53+
select * from pgflow.start_flow('multi_skip_partial', '{"route": "a"}'::jsonb)
54+
)
55+
select run_id into temporary run_ids from flow;
56+
57+
-- Test 1: branch_a should be started (condition met)
58+
select is(
59+
(select status from pgflow.step_states
60+
where run_id = (select run_id from run_ids) and step_slug = 'branch_a'),
61+
'started',
62+
'branch_a should start (condition met)'
63+
);
64+
65+
-- Test 2: branch_b should be skipped
66+
select is(
67+
(select status from pgflow.step_states
68+
where run_id = (select run_id from run_ids) and step_slug = 'branch_b'),
69+
'skipped',
70+
'branch_b should be skipped (condition unmet)'
71+
);
72+
73+
-- Test 3: branch_c should be skipped
74+
select is(
75+
(select status from pgflow.step_states
76+
where run_id = (select run_id from run_ids) and step_slug = 'branch_c'),
77+
'skipped',
78+
'branch_c should be skipped (condition unmet)'
79+
);
80+
81+
-- Test 4: join.remaining_deps should be 1 (one running, two skipped)
82+
-- After skips: should be 3 - 2 = 1
83+
select is(
84+
(select remaining_deps from pgflow.step_states
85+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
86+
1,
87+
'join.remaining_deps should be 1 after two parents are skipped (one still running)'
88+
);
89+
90+
-- Complete branch_a
91+
select pgflow_tests.poll_and_complete('multi_skip_partial');
92+
93+
-- Test 5: After branch_a completes, join.remaining_deps should be 0
94+
select is(
95+
(select remaining_deps from pgflow.step_states
96+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
97+
0,
98+
'join.remaining_deps should be 0 after branch_a completes'
99+
);
100+
101+
-- Test 6: join should be started
102+
select is(
103+
(select status from pgflow.step_states
104+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
105+
'started',
106+
'join should start after all dependencies are resolved'
107+
);
108+
109+
drop table if exists run_ids;
110+
select finish();
111+
rollback;

x.md

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)