Skip to content

Commit c71eb84

Browse files
authored
fix: optimize step skipping and improve concurrent task handling (#613)
# Fix race conditions in step skipping and task failure handling

This PR addresses several race conditions and improves the robustness of the PGFlow engine:

1. Fixes a race condition in `_cascade_force_skip_steps` by removing the separate message archiving statement that ran after the skip update and selected tasks of any step skipped within the last second, which could cause conflicts with concurrent operations. The function's final `SELECT` now joins the `archived_messages` CTE instead, and the target step's messages are still expected to be archived. This prevents errors when multiple steps are skipped simultaneously.
2. Improves the `fail_task` function to properly handle concurrent task failures by acquiring locks on both the run and step rows before checking status, preventing race conditions where multiple failure callbacks could read a stale status.
3. Adds a test to verify that messages of pre-existing skipped steps are not incorrectly archived during cascade operations.
4. Enhances SQL formatting in the DSL compiler to properly escape apostrophes in condition patterns, preventing SQL syntax errors when condition patterns contain quotes.
5. Updates slug validation in the DSL to allow underscores at the beginning of slugs and provides clearer error messages for invalid slugs.

These changes make the workflow engine more resilient to concurrent operations and edge cases, particularly in high-throughput scenarios where multiple steps might be processed simultaneously.
1 parent 6690fa5 commit c71eb84

17 files changed

+460
-140
lines changed

pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,10 @@ BEGIN
108108
WHERE r.run_id = _cascade_force_skip_steps.run_id
109109
AND skipped_count.count > 0
110110
)
111-
SELECT COUNT(*) INTO v_total_skipped FROM skipped;
112-
113-
-- Archive queued/started task messages for all steps that were just skipped
114-
-- (query step_states since CTE state is no longer accessible)
115-
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
116-
FROM pgflow.step_tasks st
117-
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
118-
WHERE st.run_id = _cascade_force_skip_steps.run_id
119-
AND st.status IN ('queued', 'started')
120-
AND st.message_id IS NOT NULL
121-
AND ss.status = 'skipped'
122-
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
123-
HAVING COUNT(st.message_id) > 0;
111+
SELECT skipped_count.count
112+
INTO v_total_skipped
113+
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
114+
LEFT JOIN archived_messages ON true;
124115

125116
RETURN v_total_skipped;
126117
END;

pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ BEGIN
9393
FROM steps_with_conditions swc
9494
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
9595
)
96-
SELECT flow_slug, step_slug, required_input_pattern, forbidden_input_pattern
96+
SELECT
97+
flow_slug,
98+
step_slug,
99+
required_input_pattern,
100+
forbidden_input_pattern
97101
INTO v_first_fail
98102
FROM condition_evaluations
99103
WHERE NOT condition_met AND when_unmet = 'fail'

pkgs/core/schemas/0100_function_fail_task.sql

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,17 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
4949
RETURN;
5050
END IF;
5151

52-
-- Late callback guard: if step is not 'started', don't mutate step/run state
53-
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
54-
SELECT ss.status INTO v_prev_step_status
55-
FROM pgflow.step_states ss
52+
-- Late callback guard: lock run + step rows and use current step status
53+
-- under lock so concurrent fail_task calls cannot read stale status.
54+
SELECT ss.status, r.flow_slug INTO v_prev_step_status, v_flow_slug
55+
FROM pgflow.runs r
56+
JOIN pgflow.step_states ss ON ss.run_id = r.run_id
5657
WHERE ss.run_id = fail_task.run_id
57-
AND ss.step_slug = fail_task.step_slug;
58+
AND ss.step_slug = fail_task.step_slug
59+
FOR UPDATE OF r, ss;
5860

5961
IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
6062
-- Archive the task message if present
61-
SELECT r.flow_slug INTO v_flow_slug
62-
FROM pgflow.runs r
63-
WHERE r.run_id = fail_task.run_id;
64-
6563
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
6664
FROM pgflow.step_tasks st
6765
WHERE st.run_id = fail_task.run_id
@@ -77,18 +75,7 @@ IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
7775
RETURN;
7876
END IF;
7977

80-
WITH run_lock AS (
81-
SELECT * FROM pgflow.runs
82-
WHERE pgflow.runs.run_id = fail_task.run_id
83-
FOR UPDATE
84-
),
85-
step_lock AS (
86-
SELECT * FROM pgflow.step_states
87-
WHERE pgflow.step_states.run_id = fail_task.run_id
88-
AND pgflow.step_states.step_slug = fail_task.step_slug
89-
FOR UPDATE
90-
),
91-
flow_info AS (
78+
WITH flow_info AS (
9279
SELECT r.flow_slug
9380
FROM pgflow.runs r
9481
WHERE r.run_id = fail_task.run_id

pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -430,19 +430,10 @@ BEGIN
430430
WHERE r.run_id = _cascade_force_skip_steps.run_id
431431
AND skipped_count.count > 0
432432
)
433-
SELECT COUNT(*) INTO v_total_skipped FROM skipped;
434-
435-
-- Archive queued/started task messages for all steps that were just skipped
436-
-- (query step_states since CTE state is no longer accessible)
437-
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
438-
FROM pgflow.step_tasks st
439-
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
440-
WHERE st.run_id = _cascade_force_skip_steps.run_id
441-
AND st.status IN ('queued', 'started')
442-
AND st.message_id IS NOT NULL
443-
AND ss.status = 'skipped'
444-
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
445-
HAVING COUNT(st.message_id) > 0;
433+
SELECT skipped_count.count
434+
INTO v_total_skipped
435+
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
436+
LEFT JOIN archived_messages ON true;
446437

447438
RETURN v_total_skipped;
448439
END;
@@ -532,7 +523,11 @@ BEGIN
532523
FROM steps_with_conditions swc
533524
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
534525
)
535-
SELECT flow_slug, step_slug, required_input_pattern, forbidden_input_pattern
526+
SELECT
527+
flow_slug,
528+
step_slug,
529+
required_input_pattern,
530+
forbidden_input_pattern
536531
INTO v_first_fail
537532
FROM condition_evaluations
538533
WHERE NOT condition_met AND when_unmet = 'fail'
@@ -1327,19 +1322,17 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
13271322
RETURN;
13281323
END IF;
13291324

1330-
-- Late callback guard: if step is not 'started', don't mutate step/run state
1331-
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
1332-
SELECT ss.status INTO v_prev_step_status
1333-
FROM pgflow.step_states ss
1325+
-- Late callback guard: lock run + step rows and use current step status
1326+
-- under lock so concurrent fail_task calls cannot read stale status.
1327+
SELECT ss.status, r.flow_slug INTO v_prev_step_status, v_flow_slug
1328+
FROM pgflow.runs r
1329+
JOIN pgflow.step_states ss ON ss.run_id = r.run_id
13341330
WHERE ss.run_id = fail_task.run_id
1335-
AND ss.step_slug = fail_task.step_slug;
1331+
AND ss.step_slug = fail_task.step_slug
1332+
FOR UPDATE OF r, ss;
13361333

13371334
IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
13381335
-- Archive the task message if present
1339-
SELECT r.flow_slug INTO v_flow_slug
1340-
FROM pgflow.runs r
1341-
WHERE r.run_id = fail_task.run_id;
1342-
13431336
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
13441337
FROM pgflow.step_tasks st
13451338
WHERE st.run_id = fail_task.run_id
@@ -1355,18 +1348,7 @@ IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
13551348
RETURN;
13561349
END IF;
13571350

1358-
WITH run_lock AS (
1359-
SELECT * FROM pgflow.runs
1360-
WHERE pgflow.runs.run_id = fail_task.run_id
1361-
FOR UPDATE
1362-
),
1363-
step_lock AS (
1364-
SELECT * FROM pgflow.step_states
1365-
WHERE pgflow.step_states.run_id = fail_task.run_id
1366-
AND pgflow.step_states.step_slug = fail_task.step_slug
1367-
FOR UPDATE
1368-
),
1369-
flow_info AS (
1351+
WITH flow_info AS (
13701352
SELECT r.flow_slug
13711353
FROM pgflow.runs r
13721354
WHERE r.run_id = fail_task.run_id

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE=
1+
h1:jDc+2bvTL4ZYqATAMfBXbTYtMlx8RPvDUvRJjrP537w=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -18,4 +18,4 @@ h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
1919
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
2020
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
21-
20260214181656_pgflow_step_conditions.sql h1:nG52qhydTJMeLTd4AoI4buATJNHdEN2C1ZJdKp+i7wE=
21+
20260214181656_pgflow_step_conditions.sql h1:rHQnXCeZ/QGxPlChdTMxumtsTtYHr1ej183Dd+auw34=
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
\set ON_ERROR_STOP on
2+
\set QUIET on
3+
4+
begin;
5+
select plan(4);
6+
7+
select pgflow_tests.reset_db();
8+
9+
select pgflow.create_flow('cascade_skip_preexisting');
10+
select pgflow.add_step('cascade_skip_preexisting', 'target', '{}', step_type => 'map');
11+
select pgflow.add_step('cascade_skip_preexisting', 'already_skipped', '{}', step_type => 'map');
12+
13+
select pgflow.start_flow('cascade_skip_preexisting', '[1, 2]'::jsonb);
14+
15+
select ok(
16+
(
17+
select count(*) > 0
18+
from pgmq.q_cascade_skip_preexisting q
19+
join pgflow.step_tasks st on st.message_id = q.msg_id
20+
where st.flow_slug = 'cascade_skip_preexisting'
21+
and st.step_slug = 'already_skipped'
22+
),
23+
'Setup: already_skipped has queued messages before cascade call'
24+
);
25+
26+
update pgflow.step_states
27+
set status = 'skipped',
28+
skip_reason = 'preexisting_skip',
29+
skipped_at = now(),
30+
remaining_tasks = null
31+
where flow_slug = 'cascade_skip_preexisting'
32+
and step_slug = 'already_skipped';
33+
34+
select pgflow._cascade_force_skip_steps(
35+
(select run_id from pgflow.runs where flow_slug = 'cascade_skip_preexisting'),
36+
'target',
37+
'condition_unmet'
38+
);
39+
40+
select is_empty(
41+
$$
42+
select 1
43+
from pgmq.q_cascade_skip_preexisting q
44+
join pgflow.step_tasks st on st.message_id = q.msg_id
45+
where st.flow_slug = 'cascade_skip_preexisting'
46+
and st.step_slug = 'target'
47+
$$,
48+
'Target step messages should be archived'
49+
);
50+
51+
select isnt_empty(
52+
$$
53+
select 1
54+
from pgmq.q_cascade_skip_preexisting q
55+
join pgflow.step_tasks st on st.message_id = q.msg_id
56+
where st.flow_slug = 'cascade_skip_preexisting'
57+
and st.step_slug = 'already_skipped'
58+
$$,
59+
'Preexisting skipped step messages should remain queued'
60+
);
61+
62+
select is(
63+
(select status from pgflow.step_states where flow_slug = 'cascade_skip_preexisting' and step_slug = 'target'),
64+
'skipped'::text,
65+
'Target step should be marked skipped'
66+
);
67+
68+
select * from finish();
69+
rollback;

pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
begin;
2-
select plan(5);
2+
select plan(6);
33

44
select pgflow_tests.reset_db();
55

@@ -92,6 +92,17 @@ select is(
9292
'previously active messages should be in archive'
9393
);
9494

95+
select is(
96+
(
97+
select error_message
98+
from pgflow.step_states
99+
where run_id = (select run_id from run_ids)
100+
and step_slug = 'checker'
101+
),
102+
'Condition not met',
103+
'checker failure should use stable condition error message'
104+
);
105+
95106
drop table if exists run_ids;
96107
drop table if exists pre_failure_msgs;
97108

pkgs/core/supabase/tests/condition_evaluation/ifnot_root_step_pattern_matches_fail.test.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ select is(
3030
'Step with matched ifNot pattern and whenUnmet=fail should be failed'
3131
);
3232

33-
-- Test 2: Error message should indicate condition not met
33+
-- Test 2: Error message should remain stable and minimal
3434
select is(
3535
(select error_message from pgflow.step_states
3636
where run_id = (select run_id from run_ids) and step_slug = 'no_admin_step'),
3737
'Condition not met',
38-
'Error message should indicate condition not met'
38+
'Error message should use stable condition error message'
3939
);
4040

4141
-- Test 3: No task should be created for failed step

pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
-- Verifies that a root step with unmet condition and whenUnmet='fail'
33
-- causes the run to fail immediately
44
begin;
5-
select plan(4);
5+
select plan(5);
66

77
-- Reset database
88
select pgflow_tests.reset_db();
@@ -30,11 +30,12 @@ select is(
3030
'Step with unmet condition and whenUnmet=fail should be failed'
3131
);
3232

33-
-- Test 2: error_message should indicate condition unmet
34-
select ok(
33+
-- Test 2: error_message should remain stable and minimal
34+
select is(
3535
(select error_message from pgflow.step_states
36-
where run_id = (select run_id from run_ids) and step_slug = 'checked_step') ILIKE '%condition%',
37-
'Failed step should have error message about condition'
36+
where run_id = (select run_id from run_ids) and step_slug = 'checked_step'),
37+
'Condition not met',
38+
'Failed step should use stable condition error message'
3839
);
3940

4041
-- Test 3: No task should be created
@@ -52,6 +53,19 @@ select is(
5253
'Run should fail when step condition fails with fail mode'
5354
);
5455

56+
-- Test 5: Run-level error event should use same stable message
57+
select is(
58+
(
59+
select payload->>'error_message'
60+
from pgflow_tests.get_realtime_message(
61+
event_type => 'run:failed',
62+
run_id => (select run_id from run_ids)
63+
)
64+
),
65+
'Condition not met',
66+
'Run failed event should use stable condition error message'
67+
);
68+
5569
-- Clean up
5670
drop table if exists run_ids;
5771

pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
begin;
2-
select plan(6);
2+
select plan(7);
33

44
select pgflow_tests.reset_db();
55

@@ -82,6 +82,19 @@ select is(
8282
'run:failed payload should include failed status'
8383
);
8484

85+
select is(
86+
(
87+
select payload->>'error_message'
88+
from pgflow_tests.get_realtime_message(
89+
event_type => 'step:failed',
90+
run_id => (select run_id from run_ids),
91+
step_slug => 'guarded'
92+
)
93+
),
94+
'Condition not met',
95+
'step:failed payload should use stable condition error message'
96+
);
97+
8598
drop table if exists run_ids;
8699

87100
select finish();

0 commit comments

Comments
 (0)