Skip to content

Commit 6873dbe

Browse files
authored
fix: archive task messages for skipped steps and handle late callbacks (#609)
# Archive Task Messages for Skipped Steps This PR adds message archiving functionality to prevent "zombie" tasks from being processed after a step has been skipped or completed. Key improvements: 1. Added guards in `complete_task` and `fail_task` to handle late callbacks: - Detects when callbacks arrive after a step is no longer in 'started' state - Archives the task message to prevent stuck work - Returns current task state without mutating step or run state 2. Enhanced `_cascade_force_skip_steps` to archive queued/started task messages for skipped steps: - Ensures all task messages are archived when a step is force-skipped - Prevents workers from processing tasks for steps that are already skipped 3. Modified `fail_task` to archive sibling task messages when a step is skipped: - When a task failure causes a step to be skipped, all sibling task messages are archived - Prevents double-decrement of remaining_steps when multiple tasks fail on the same step 4. Improved `start_tasks` to reject tasks for skipped steps: - Added guard conditions to only start tasks for steps in 'started' state - Prevents race conditions where a task might be started after its step was skipped Added comprehensive tests to verify all these behaviors work correctly.
1 parent 7ae95e9 commit 6873dbe

18 files changed

+678
-29
lines changed

pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ BEGIN
9090
false
9191
) as _broadcast_result
9292
),
93+
-- ---------- Archive queued/started task messages for skipped steps ----------
94+
archived_messages AS (
95+
SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
96+
FROM pgflow.step_tasks st
97+
WHERE st.run_id = _cascade_force_skip_steps.run_id
98+
AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
99+
AND st.status IN ('queued', 'started')
100+
AND st.message_id IS NOT NULL
101+
HAVING COUNT(st.message_id) > 0
102+
),
93103
-- ---------- Update run counters ----------
94104
run_updates AS (
95105
UPDATE pgflow.runs r
@@ -100,6 +110,18 @@ BEGIN
100110
)
101111
SELECT COUNT(*) INTO v_total_skipped FROM skipped;
102112

113+
-- Archive queued/started task messages for all steps that were just skipped
114+
-- (query step_states since CTE state is no longer accessible)
115+
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
116+
FROM pgflow.step_tasks st
117+
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
118+
WHERE st.run_id = _cascade_force_skip_steps.run_id
119+
AND st.status IN ('queued', 'started')
120+
AND st.message_id IS NOT NULL
121+
AND ss.status = 'skipped'
122+
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
123+
HAVING COUNT(st.message_id) > 0;
124+
103125
RETURN v_total_skipped;
104126
END;
105127
$$;

pkgs/core/schemas/0100_function_complete_task.sql

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,30 @@ WHERE pgflow.step_states.run_id = complete_task.run_id
4040
AND pgflow.step_states.step_slug = complete_task.step_slug
4141
FOR UPDATE;
4242

43+
-- ==========================================
44+
-- GUARD: Late callback - step not started
45+
-- ==========================================
46+
-- If the step is not in 'started' state, this is a late callback.
47+
-- Do not mutate step_states or runs, archive message, return task row.
48+
IF v_step_record.status != 'started' THEN
49+
-- Archive the task message if present (prevents stuck work)
50+
PERFORM pgmq.archive(
51+
v_run_record.flow_slug,
52+
st.message_id
53+
)
54+
FROM pgflow.step_tasks st
55+
WHERE st.run_id = complete_task.run_id
56+
AND st.step_slug = complete_task.step_slug
57+
AND st.task_index = complete_task.task_index
58+
AND st.message_id IS NOT NULL;
59+
-- Return the current task row without any mutations
60+
RETURN QUERY SELECT * FROM pgflow.step_tasks
61+
WHERE pgflow.step_tasks.run_id = complete_task.run_id
62+
AND pgflow.step_tasks.step_slug = complete_task.step_slug
63+
AND pgflow.step_tasks.task_index = complete_task.task_index;
64+
RETURN;
65+
END IF;
66+
4367
-- Check for type violations AFTER acquiring locks
4468
SELECT child_step.step_slug INTO v_dependent_map_slug
4569
FROM pgflow.deps dependency

pkgs/core/schemas/0100_function_fail_task.sql

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ DECLARE
1414
v_step_failed boolean;
1515
v_step_skipped boolean;
1616
v_when_exhausted text;
17-
v_task_exhausted boolean; -- True if task has exhausted retries
18-
v_flow_slug_for_deps text; -- Used for decrementing remaining_deps on plain skip
17+
v_task_exhausted boolean;
18+
v_flow_slug_for_deps text;
19+
v_prev_step_status text;
20+
v_flow_slug text;
1921
begin
2022

2123
-- If run is already failed, no retries allowed
@@ -47,6 +49,34 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
4749
RETURN;
4850
END IF;
4951

52+
-- Late callback guard: if step is not 'started', don't mutate step/run state
53+
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
54+
SELECT ss.status INTO v_prev_step_status
55+
FROM pgflow.step_states ss
56+
WHERE ss.run_id = fail_task.run_id
57+
AND ss.step_slug = fail_task.step_slug;
58+
59+
IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
60+
-- Archive the task message if present
61+
SELECT r.flow_slug INTO v_flow_slug
62+
FROM pgflow.runs r
63+
WHERE r.run_id = fail_task.run_id;
64+
65+
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
66+
FROM pgflow.step_tasks st
67+
WHERE st.run_id = fail_task.run_id
68+
AND st.step_slug = fail_task.step_slug
69+
AND st.task_index = fail_task.task_index
70+
AND st.message_id IS NOT NULL
71+
HAVING COUNT(st.message_id) > 0;
72+
73+
RETURN QUERY SELECT * FROM pgflow.step_tasks
74+
WHERE pgflow.step_tasks.run_id = fail_task.run_id
75+
AND pgflow.step_tasks.step_slug = fail_task.step_slug
76+
AND pgflow.step_tasks.task_index = fail_task.task_index;
77+
RETURN;
78+
END IF;
79+
5080
WITH run_lock AS (
5181
SELECT * FROM pgflow.runs
5282
WHERE pgflow.runs.run_id = fail_task.run_id
@@ -152,9 +182,13 @@ run_update AS (
152182
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
153183
ELSE NULL
154184
END,
155-
-- Decrement remaining_steps when step was skipped (not failed, run continues)
185+
-- Decrement remaining_steps only on FIRST transition to skipped
186+
-- (not when step was already skipped and a second task fails)
187+
-- Uses PL/pgSQL variable captured before CTE chain
156188
remaining_steps = CASE
157-
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
189+
WHEN (select status from maybe_fail_step) = 'skipped'
190+
AND v_prev_step_status != 'skipped'
191+
THEN pgflow.runs.remaining_steps - 1
158192
ELSE pgflow.runs.remaining_steps
159193
END
160194
WHERE pgflow.runs.run_id = fail_task.run_id
@@ -193,6 +227,17 @@ END IF;
193227

194228
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
195229
IF v_task_exhausted AND v_step_skipped THEN
230+
-- Archive all queued/started sibling task messages for this step
231+
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
232+
FROM pgflow.step_tasks st
233+
JOIN pgflow.runs r ON st.run_id = r.run_id
234+
WHERE st.run_id = fail_task.run_id
235+
AND st.step_slug = fail_task.step_slug
236+
AND st.status IN ('queued', 'started')
237+
AND st.message_id IS NOT NULL
238+
GROUP BY r.flow_slug
239+
HAVING COUNT(st.message_id) > 0;
240+
196241
-- Send broadcast event for step skipped
197242
PERFORM realtime.send(
198243
jsonb_build_object(

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,14 @@ as $$
2020
where task.flow_slug = start_tasks.flow_slug
2121
and task.message_id = any(msg_ids)
2222
and task.status = 'queued'
23-
-- MVP: Don't start tasks on failed runs
24-
and r.status != 'failed'
23+
and r.status = 'started'
24+
and exists (
25+
select 1
26+
from pgflow.step_states ss
27+
where ss.run_id = task.run_id
28+
and ss.step_slug = task.step_slug
29+
and ss.status = 'started'
30+
)
2531
),
2632
start_tasks_update as (
2733
update pgflow.step_tasks

pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql renamed to pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql

Lines changed: 103 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,16 @@ BEGIN
412412
false
413413
) as _broadcast_result
414414
),
415+
-- ---------- Archive queued/started task messages for skipped steps ----------
416+
archived_messages AS (
417+
SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
418+
FROM pgflow.step_tasks st
419+
WHERE st.run_id = _cascade_force_skip_steps.run_id
420+
AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
421+
AND st.status IN ('queued', 'started')
422+
AND st.message_id IS NOT NULL
423+
HAVING COUNT(st.message_id) > 0
424+
),
415425
-- ---------- Update run counters ----------
416426
run_updates AS (
417427
UPDATE pgflow.runs r
@@ -422,6 +432,18 @@ BEGIN
422432
)
423433
SELECT COUNT(*) INTO v_total_skipped FROM skipped;
424434

435+
-- Archive queued/started task messages for all steps that were just skipped
436+
-- (query step_states since CTE state is no longer accessible)
437+
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
438+
FROM pgflow.step_tasks st
439+
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
440+
WHERE st.run_id = _cascade_force_skip_steps.run_id
441+
AND st.status IN ('queued', 'started')
442+
AND st.message_id IS NOT NULL
443+
AND ss.status = 'skipped'
444+
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
445+
HAVING COUNT(st.message_id) > 0;
446+
425447
RETURN v_total_skipped;
426448
END;
427449
$$;
@@ -890,6 +912,30 @@ WHERE pgflow.step_states.run_id = complete_task.run_id
890912
AND pgflow.step_states.step_slug = complete_task.step_slug
891913
FOR UPDATE;
892914

915+
-- ==========================================
916+
-- GUARD: Late callback - step not started
917+
-- ==========================================
918+
-- If the step is not in 'started' state, this is a late callback.
919+
-- Do not mutate step_states or runs, archive message, return task row.
920+
IF v_step_record.status != 'started' THEN
921+
-- Archive the task message if present (prevents stuck work)
922+
PERFORM pgmq.archive(
923+
v_run_record.flow_slug,
924+
st.message_id
925+
)
926+
FROM pgflow.step_tasks st
927+
WHERE st.run_id = complete_task.run_id
928+
AND st.step_slug = complete_task.step_slug
929+
AND st.task_index = complete_task.task_index
930+
AND st.message_id IS NOT NULL;
931+
-- Return the current task row without any mutations
932+
RETURN QUERY SELECT * FROM pgflow.step_tasks
933+
WHERE pgflow.step_tasks.run_id = complete_task.run_id
934+
AND pgflow.step_tasks.step_slug = complete_task.step_slug
935+
AND pgflow.step_tasks.task_index = complete_task.task_index;
936+
RETURN;
937+
END IF;
938+
893939
-- Check for type violations AFTER acquiring locks
894940
SELECT child_step.step_slug INTO v_dependent_map_slug
895941
FROM pgflow.deps dependency
@@ -1246,8 +1292,10 @@ DECLARE
12461292
v_step_failed boolean;
12471293
v_step_skipped boolean;
12481294
v_when_exhausted text;
1249-
v_task_exhausted boolean; -- True if task has exhausted retries
1250-
v_flow_slug_for_deps text; -- Used for decrementing remaining_deps on plain skip
1295+
v_task_exhausted boolean;
1296+
v_flow_slug_for_deps text;
1297+
v_prev_step_status text;
1298+
v_flow_slug text;
12511299
begin
12521300

12531301
-- If run is already failed, no retries allowed
@@ -1279,6 +1327,34 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
12791327
RETURN;
12801328
END IF;
12811329

1330+
-- Late callback guard: if step is not 'started', don't mutate step/run state
1331+
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
1332+
SELECT ss.status INTO v_prev_step_status
1333+
FROM pgflow.step_states ss
1334+
WHERE ss.run_id = fail_task.run_id
1335+
AND ss.step_slug = fail_task.step_slug;
1336+
1337+
IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
1338+
-- Archive the task message if present
1339+
SELECT r.flow_slug INTO v_flow_slug
1340+
FROM pgflow.runs r
1341+
WHERE r.run_id = fail_task.run_id;
1342+
1343+
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
1344+
FROM pgflow.step_tasks st
1345+
WHERE st.run_id = fail_task.run_id
1346+
AND st.step_slug = fail_task.step_slug
1347+
AND st.task_index = fail_task.task_index
1348+
AND st.message_id IS NOT NULL
1349+
HAVING COUNT(st.message_id) > 0;
1350+
1351+
RETURN QUERY SELECT * FROM pgflow.step_tasks
1352+
WHERE pgflow.step_tasks.run_id = fail_task.run_id
1353+
AND pgflow.step_tasks.step_slug = fail_task.step_slug
1354+
AND pgflow.step_tasks.task_index = fail_task.task_index;
1355+
RETURN;
1356+
END IF;
1357+
12821358
WITH run_lock AS (
12831359
SELECT * FROM pgflow.runs
12841360
WHERE pgflow.runs.run_id = fail_task.run_id
@@ -1384,9 +1460,13 @@ run_update AS (
13841460
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
13851461
ELSE NULL
13861462
END,
1387-
-- Decrement remaining_steps when step was skipped (not failed, run continues)
1463+
-- Decrement remaining_steps only on FIRST transition to skipped
1464+
-- (not when step was already skipped and a second task fails)
1465+
-- Uses PL/pgSQL variable captured before CTE chain
13881466
remaining_steps = CASE
1389-
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
1467+
WHEN (select status from maybe_fail_step) = 'skipped'
1468+
AND v_prev_step_status != 'skipped'
1469+
THEN pgflow.runs.remaining_steps - 1
13901470
ELSE pgflow.runs.remaining_steps
13911471
END
13921472
WHERE pgflow.runs.run_id = fail_task.run_id
@@ -1425,6 +1505,17 @@ END IF;
14251505

14261506
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
14271507
IF v_task_exhausted AND v_step_skipped THEN
1508+
-- Archive all queued/started sibling task messages for this step
1509+
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
1510+
FROM pgflow.step_tasks st
1511+
JOIN pgflow.runs r ON st.run_id = r.run_id
1512+
WHERE st.run_id = fail_task.run_id
1513+
AND st.step_slug = fail_task.step_slug
1514+
AND st.status IN ('queued', 'started')
1515+
AND st.message_id IS NOT NULL
1516+
GROUP BY r.flow_slug
1517+
HAVING COUNT(st.message_id) > 0;
1518+
14281519
-- Send broadcast event for step skipped
14291520
PERFORM realtime.send(
14301521
jsonb_build_object(
@@ -1717,8 +1808,14 @@ with tasks as (
17171808
where task.flow_slug = start_tasks.flow_slug
17181809
and task.message_id = any(msg_ids)
17191810
and task.status = 'queued'
1720-
-- MVP: Don't start tasks on failed runs
1721-
and r.status != 'failed'
1811+
and r.status = 'started'
1812+
and exists (
1813+
select 1
1814+
from pgflow.step_states ss
1815+
where ss.run_id = task.run_id
1816+
and ss.step_slug = task.step_slug
1817+
and ss.status = 'started'
1818+
)
17221819
),
17231820
start_tasks_update as (
17241821
update pgflow.step_tasks

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:NOsAYua/Juse0euNM4usm7M9DDPL7Btt5YvbexpfllA=
1+
h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -18,4 +18,4 @@ h1:NOsAYua/Juse0euNM4usm7M9DDPL7Btt5YvbexpfllA=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
1919
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
2020
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
21-
20260206115746_pgflow_step_conditions.sql h1:rGmG0hwC40AyEoofwX9Pj1b6DNiPG+pX9aLjEMgIoSQ=
21+
20260214181656_pgflow_step_conditions.sql h1:uLPoOD/hPrerMACS6CThb7t7T5LKLpMMrdFXXi4ZQ5s=

0 commit comments

Comments
 (0)