Skip to content

Commit 1744d31

Browse files
committed
fix fixed decrements for remaining_deps
1 parent 0969378 commit 1744d31

6 files changed

Lines changed: 235 additions & 15 deletions

File tree

pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,21 +197,30 @@ BEGIN
197197
false
198198
) AS _broadcast_result
199199
),
200-
-- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
200+
-- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps)
201+
skipped_parent_counts AS (
202+
-- Count how many skipped parents each child has
203+
SELECT
204+
dep.step_slug AS child_step_slug,
205+
dep.flow_slug AS child_flow_slug,
206+
COUNT(*) AS skipped_parent_count
207+
FROM skipped_steps parent
208+
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
209+
GROUP BY dep.step_slug, dep.flow_slug
210+
),
201211
dependent_updates AS (
202212
UPDATE pgflow.step_states child_state
203-
SET remaining_deps = child_state.remaining_deps - 1,
213+
SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count,
204214
-- If child is a map step and this skipped step is its only dependency,
205215
-- set initial_tasks = 0 (skipped dep = empty array)
206216
initial_tasks = CASE
207217
WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
208218
ELSE child_state.initial_tasks
209219
END
210-
FROM skipped_steps parent
211-
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
212-
JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
220+
FROM skipped_parent_counts spc
221+
JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug
213222
WHERE child_state.run_id = cascade_resolve_conditions.run_id
214-
AND child_state.step_slug = dep.step_slug
223+
AND child_state.step_slug = spc.child_step_slug
215224
),
216225
run_update AS (
217226
UPDATE pgflow.runs r

pkgs/core/supabase/migrations/20260203101513_pgflow_step_conditions.sql renamed to pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -614,21 +614,30 @@ BEGIN
614614
false
615615
) AS _broadcast_result
616616
),
617-
-- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
617+
-- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps)
618+
skipped_parent_counts AS (
619+
-- Count how many skipped parents each child has
620+
SELECT
621+
dep.step_slug AS child_step_slug,
622+
dep.flow_slug AS child_flow_slug,
623+
COUNT(*) AS skipped_parent_count
624+
FROM skipped_steps parent
625+
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
626+
GROUP BY dep.step_slug, dep.flow_slug
627+
),
618628
dependent_updates AS (
619629
UPDATE pgflow.step_states child_state
620-
SET remaining_deps = child_state.remaining_deps - 1,
630+
SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count,
621631
-- If child is a map step and this skipped step is its only dependency,
622632
-- set initial_tasks = 0 (skipped dep = empty array)
623633
initial_tasks = CASE
624634
WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
625635
ELSE child_state.initial_tasks
626636
END
627-
FROM skipped_steps parent
628-
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
629-
JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
637+
FROM skipped_parent_counts spc
638+
JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug
630639
WHERE child_state.run_id = cascade_resolve_conditions.run_id
631-
AND child_state.step_slug = dep.step_slug
640+
AND child_state.step_slug = spc.child_step_slug
632641
),
633642
run_update AS (
634643
UPDATE pgflow.runs r

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:C7dqsqxBJfaYoFyWUWyOwQEqSTbZm2ob13rGC3EvYOs=
1+
h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -18,4 +18,4 @@ h1:C7dqsqxBJfaYoFyWUWyOwQEqSTbZm2ob13rGC3EvYOs=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
1919
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
2020
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
21-
20260203101513_pgflow_step_conditions.sql h1:pom2NuFU0JsCKvdaeQzdGcVF20ZlDUd94bJmZ98hI5A=
21+
20260206115746_pgflow_step_conditions.sql h1:rIoXVl0SoVFGHdCFpAQnD6DRSHugzQODZa+UjAhA0ow=
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
-- Test: Multiple parents skipped in same iteration should decrement remaining_deps for each
2+
-- This tests the bug where remaining_deps is only decremented once even when multiple parents
3+
-- are skipped simultaneously in cascade_resolve_conditions
4+
--
5+
-- Flow structure:
6+
-- root_a (skip) \
7+
-- -> join (depends on both)
8+
-- root_b (skip) /
9+
--
10+
-- Expected behavior:
11+
-- 1. Both root_a and root_b are skipped due to unmet conditions
12+
-- 2. join.remaining_deps should be decremented by 2 (from 2 to 0)
13+
-- 3. join should become ready and start
14+
--
15+
-- Bug behavior (current):
16+
-- 1. Both root_a and root_b are skipped
17+
-- 2. join.remaining_deps is only decremented by 1 (from 2 to 1)
18+
-- 3. join stays stuck with remaining_deps = 1 forever
19+
20+
begin;
21+
select plan(5);
22+
select pgflow_tests.reset_db();
23+
24+
-- Create flow with two conditional root steps that will both be skipped
25+
select pgflow.create_flow('multi_root_skip');
26+
select pgflow.add_step(
27+
flow_slug => 'multi_root_skip',
28+
step_slug => 'root_a',
29+
required_input_pattern => '{"go": true}'::jsonb,
30+
when_unmet => 'skip'
31+
);
32+
select pgflow.add_step(
33+
flow_slug => 'multi_root_skip',
34+
step_slug => 'root_b',
35+
required_input_pattern => '{"go": true}'::jsonb,
36+
when_unmet => 'skip'
37+
);
38+
select pgflow.add_step(
39+
flow_slug => 'multi_root_skip',
40+
step_slug => 'join',
41+
deps_slugs => ARRAY['root_a', 'root_b']
42+
);
43+
44+
-- Start flow with input that does NOT match either condition
45+
with flow as (
46+
select * from pgflow.start_flow('multi_root_skip', '{"go": false}'::jsonb)
47+
)
48+
select run_id into temporary run_ids from flow;
49+
50+
-- Test 1: root_a should be skipped
51+
select is(
52+
(select status from pgflow.step_states
53+
where run_id = (select run_id from run_ids) and step_slug = 'root_a'),
54+
'skipped',
55+
'root_a should be skipped (condition unmet)'
56+
);
57+
58+
-- Test 2: root_b should be skipped
59+
select is(
60+
(select status from pgflow.step_states
61+
where run_id = (select run_id from run_ids) and step_slug = 'root_b'),
62+
'skipped',
63+
'root_b should be skipped (condition unmet)'
64+
);
65+
66+
-- Test 3: join.remaining_deps should be 0 (both parents skipped)
67+
select is(
68+
(select remaining_deps from pgflow.step_states
69+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
70+
0,
71+
'join.remaining_deps should be 0 when both parents are skipped'
72+
);
73+
74+
-- Test 4: join should be started (ready to run)
75+
select is(
76+
(select status from pgflow.step_states
77+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
78+
'started',
79+
'join should start after both parents are skipped'
80+
);
81+
82+
-- Test 5: join should have a task created
83+
select is(
84+
(select count(*)::int from pgflow.step_tasks
85+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
86+
1,
87+
'join should have one task created'
88+
);
89+
90+
drop table if exists run_ids;
91+
select finish();
92+
rollback;
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
-- Test: Multiple parents skipped then one completes should properly decrement remaining_deps
2+
-- This tests the bug where remaining_deps under-decrementing causes the join step to never start
3+
--
4+
-- Flow structure:
5+
-- branch_a (conditional, will run) \
6+
-- branch_b (skip) -> join (depends on all three)
7+
-- branch_c (skip) /
8+
--
9+
-- Expected behavior:
10+
-- 1. branch_a starts (condition met), branch_b and branch_c are skipped
11+
-- 2. join.remaining_deps should be decremented by 2 (for skipped branches) to 1
12+
-- 3. After branch_a completes, join.remaining_deps goes from 1 to 0
13+
-- 4. join should start
14+
--
15+
-- Bug behavior (current):
16+
-- 1. branch_a runs, branch_b and branch_c are skipped
17+
-- 2. join.remaining_deps is only decremented by 1 (from 3 to 2) instead of 2 (to 1)
18+
-- 3. After branch_a completes, join.remaining_deps goes from 2 to 1
19+
-- 4. join stays stuck with remaining_deps = 1 forever
20+
21+
begin;
22+
select plan(6);
23+
select pgflow_tests.reset_db();
24+
25+
-- Create flow with three conditional branches
26+
select pgflow.create_flow('multi_skip_partial');
27+
select pgflow.add_step(
28+
flow_slug => 'multi_skip_partial',
29+
step_slug => 'branch_a',
30+
required_input_pattern => '{"route": "a"}'::jsonb,
31+
when_unmet => 'skip'
32+
);
33+
select pgflow.add_step(
34+
flow_slug => 'multi_skip_partial',
35+
step_slug => 'branch_b',
36+
required_input_pattern => '{"route": "b"}'::jsonb,
37+
when_unmet => 'skip'
38+
);
39+
select pgflow.add_step(
40+
flow_slug => 'multi_skip_partial',
41+
step_slug => 'branch_c',
42+
required_input_pattern => '{"route": "c"}'::jsonb,
43+
when_unmet => 'skip'
44+
);
45+
select pgflow.add_step(
46+
flow_slug => 'multi_skip_partial',
47+
step_slug => 'join',
48+
deps_slugs => ARRAY['branch_a', 'branch_b', 'branch_c']
49+
);
50+
51+
-- Start flow with input that only matches branch_a's condition
52+
with flow as (
53+
select * from pgflow.start_flow('multi_skip_partial', '{"route": "a"}'::jsonb)
54+
)
55+
select run_id into temporary run_ids from flow;
56+
57+
-- Test 1: branch_a should be started (condition met)
58+
select is(
59+
(select status from pgflow.step_states
60+
where run_id = (select run_id from run_ids) and step_slug = 'branch_a'),
61+
'started',
62+
'branch_a should start (condition met)'
63+
);
64+
65+
-- Test 2: branch_b should be skipped
66+
select is(
67+
(select status from pgflow.step_states
68+
where run_id = (select run_id from run_ids) and step_slug = 'branch_b'),
69+
'skipped',
70+
'branch_b should be skipped (condition unmet)'
71+
);
72+
73+
-- Test 3: branch_c should be skipped
74+
select is(
75+
(select status from pgflow.step_states
76+
where run_id = (select run_id from run_ids) and step_slug = 'branch_c'),
77+
'skipped',
78+
'branch_c should be skipped (condition unmet)'
79+
);
80+
81+
-- Test 4: join.remaining_deps should be 1 (one running, two skipped)
82+
-- After skips: should be 3 - 2 = 1
83+
select is(
84+
(select remaining_deps from pgflow.step_states
85+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
86+
1,
87+
'join.remaining_deps should be 1 after two parents are skipped (one still running)'
88+
);
89+
90+
-- Complete branch_a
91+
select pgflow_tests.poll_and_complete('multi_skip_partial');
92+
93+
-- Test 5: After branch_a completes, join.remaining_deps should be 0
94+
select is(
95+
(select remaining_deps from pgflow.step_states
96+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
97+
0,
98+
'join.remaining_deps should be 0 after branch_a completes'
99+
);
100+
101+
-- Test 6: join should be started
102+
select is(
103+
(select status from pgflow.step_states
104+
where run_id = (select run_id from run_ids) and step_slug = 'join'),
105+
'started',
106+
'join should start after all dependencies are resolved'
107+
);
108+
109+
drop table if exists run_ids;
110+
select finish();
111+
rollback;

x.md

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)