Skip to content

Commit 7ae95e9

Browse files
authored
fix: improve condition handling and event emission for failed steps (#607)
# Improved Condition Handling and Task Failure Management This PR enhances the condition handling and task failure management in PGFlow with several key improvements: 1. Added default values for condition handling modes: - `whenUnmet` defaults to 'skip' - `whenExhausted` defaults to 'fail' 2. Improved failure handling for unmet conditions: - Added realtime event broadcasting for step and run failures - Implemented automatic archiving of queued messages when a run fails due to unmet conditions 3. Enhanced task failure handling: - Made task failure handling more robust and idempotent for replayed failures - Fixed the order of operations in skip-cascade scenarios to ensure proper propagation - Added proper handling of taskless map steps after dependency skips 4. Added comprehensive test coverage: - Tests for condition evaluation failure events - Tests for message archiving on failure - Tests for idempotent task failure handling - Tests for proper downstream step activation after skips These changes improve the reliability and observability of workflow execution, particularly in failure scenarios.
1 parent 6819539 commit 7ae95e9

21 files changed

+1130
-97
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
create or replace function pgflow._archive_task_message(
2+
p_run_id uuid,
3+
p_step_slug text,
4+
p_task_index int
5+
)
6+
returns void
7+
language sql
8+
volatile
9+
set search_path to ''
10+
as $$
11+
SELECT pgmq.archive(
12+
r.flow_slug,
13+
ARRAY_AGG(st.message_id)
14+
)
15+
FROM pgflow.step_tasks st
16+
JOIN pgflow.runs r ON st.run_id = r.run_id
17+
WHERE st.run_id = p_run_id
18+
AND st.step_slug = p_step_slug
19+
AND st.task_index = p_task_index
20+
AND st.message_id IS NOT NULL
21+
GROUP BY r.flow_slug
22+
HAVING COUNT(st.message_id) > 0;
23+
$$;

pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,43 @@ BEGIN
116116
failed_at = now()
117117
WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id;
118118

119+
PERFORM realtime.send(
120+
jsonb_build_object(
121+
'event_type', 'step:failed',
122+
'run_id', cascade_resolve_conditions.run_id,
123+
'step_slug', v_first_fail.step_slug,
124+
'status', 'failed',
125+
'error_message', 'Condition not met',
126+
'failed_at', now()
127+
),
128+
concat('step:', v_first_fail.step_slug, ':failed'),
129+
concat('pgflow:run:', cascade_resolve_conditions.run_id),
130+
false
131+
);
132+
133+
PERFORM realtime.send(
134+
jsonb_build_object(
135+
'event_type', 'run:failed',
136+
'run_id', cascade_resolve_conditions.run_id,
137+
'flow_slug', v_first_fail.flow_slug,
138+
'status', 'failed',
139+
'error_message', 'Condition not met',
140+
'failed_at', now()
141+
),
142+
'run:failed',
143+
concat('pgflow:run:', cascade_resolve_conditions.run_id),
144+
false
145+
);
146+
147+
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
148+
FROM pgflow.step_tasks st
149+
JOIN pgflow.runs r ON st.run_id = r.run_id
150+
WHERE st.run_id = cascade_resolve_conditions.run_id
151+
AND st.status IN ('queued', 'started')
152+
AND st.message_id IS NOT NULL
153+
GROUP BY r.flow_slug
154+
HAVING COUNT(st.message_id) > 0;
155+
119156
RETURN false;
120157
END IF;
121158

@@ -180,6 +217,7 @@ BEGIN
180217
FROM unmet_skip_steps uss
181218
WHERE ss.run_id = cascade_resolve_conditions.run_id
182219
AND ss.step_slug = uss.step_slug
220+
AND ss.status = 'created'
183221
RETURNING
184222
ss.*,
185223
realtime.send(

pkgs/core/schemas/0100_function_complete_task.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,12 @@ IF v_dependent_map_slug IS NOT NULL THEN
146146
AND st.task_index = complete_task.task_index
147147
AND st.message_id IS NOT NULL;
148148

149-
-- Return empty result
150-
RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
149+
-- Return the failed task row (API contract: always return task row)
150+
RETURN QUERY
151+
SELECT * FROM pgflow.step_tasks st
152+
WHERE st.run_id = complete_task.run_id
153+
AND st.step_slug = complete_task.step_slug
154+
AND st.task_index = complete_task.task_index;
151155
RETURN;
152156
END IF;
153157

pkgs/core/schemas/0100_function_create_flow_from_shape.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ BEGIN
4848
timeout => (v_step_options->>'timeout')::int,
4949
start_delay => (v_step_options->>'startDelay')::int,
5050
step_type => v_step->>'stepType',
51-
when_unmet => v_step->>'whenUnmet',
52-
when_exhausted => v_step->>'whenExhausted',
51+
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
52+
when_exhausted => COALESCE(v_step->>'whenExhausted', 'fail'),
5353
required_input_pattern => CASE
5454
WHEN (v_step->'requiredInputPattern'->>'defined')::boolean
5555
THEN v_step->'requiredInputPattern'->'value'

pkgs/core/schemas/0100_function_fail_task.sql

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -140,45 +140,42 @@ maybe_fail_step AS (
140140
WHERE pgflow.step_states.run_id = fail_task.run_id
141141
AND pgflow.step_states.step_slug = fail_task.step_slug
142142
RETURNING pgflow.step_states.*
143+
),
144+
run_update AS (
145+
-- Update run status: only fail when when_exhausted='fail' and step was failed
146+
UPDATE pgflow.runs
147+
SET status = CASE
148+
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
149+
ELSE status
150+
END,
151+
failed_at = CASE
152+
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
153+
ELSE NULL
154+
END,
155+
-- Decrement remaining_steps when step was skipped (not failed, run continues)
156+
remaining_steps = CASE
157+
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
158+
ELSE pgflow.runs.remaining_steps
159+
END
160+
WHERE pgflow.runs.run_id = fail_task.run_id
161+
RETURNING pgflow.runs.status
143162
)
144-
-- Update run status: only fail when when_exhausted='fail' and step was failed
145-
UPDATE pgflow.runs
146-
SET status = CASE
147-
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
148-
ELSE status
149-
END,
150-
failed_at = CASE
151-
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
152-
ELSE NULL
153-
END,
154-
-- Decrement remaining_steps when step was skipped (not failed, run continues)
155-
remaining_steps = CASE
156-
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
157-
ELSE pgflow.runs.remaining_steps
158-
END
159-
WHERE pgflow.runs.run_id = fail_task.run_id
160-
RETURNING (status = 'failed') INTO v_run_failed;
163+
SELECT
164+
COALESCE((SELECT status = 'failed' FROM run_update), false),
165+
COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false),
166+
COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false),
167+
COALESCE((SELECT is_exhausted FROM task_status), false)
168+
INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted;
161169

162-
-- Capture when_exhausted mode and check if step was skipped for later processing
170+
-- Capture when_exhausted mode for later skip handling
163171
SELECT s.when_exhausted INTO v_when_exhausted
164172
FROM pgflow.steps s
165173
JOIN pgflow.runs r ON r.flow_slug = s.flow_slug
166-
WHERE r.run_id = fail_task.run_id
167-
AND s.step_slug = fail_task.step_slug;
168-
169-
SELECT (status = 'skipped') INTO v_step_skipped
170-
FROM pgflow.step_states
171-
WHERE pgflow.step_states.run_id = fail_task.run_id
172-
AND pgflow.step_states.step_slug = fail_task.step_slug;
173-
174-
-- Check if step failed by querying the step_states table
175-
SELECT (status = 'failed') INTO v_step_failed
176-
FROM pgflow.step_states
177-
WHERE pgflow.step_states.run_id = fail_task.run_id
178-
AND pgflow.step_states.step_slug = fail_task.step_slug;
174+
WHERE r.run_id = fail_task.run_id
175+
AND s.step_slug = fail_task.step_slug;
179176

180177
-- Send broadcast event for step failure if the step was failed
181-
IF v_step_failed THEN
178+
IF v_task_exhausted AND v_step_failed THEN
182179
PERFORM realtime.send(
183180
jsonb_build_object(
184181
'event_type', 'step:failed',
@@ -194,8 +191,8 @@ IF v_step_failed THEN
194191
);
195192
END IF;
196193

197-
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
198-
IF v_step_skipped THEN
194+
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
195+
IF v_task_exhausted AND v_step_skipped THEN
199196
-- Send broadcast event for step skipped
200197
PERFORM realtime.send(
201198
jsonb_build_object(
@@ -237,11 +234,26 @@ END IF;
237234
AND dep.dep_slug = fail_task.step_slug
238235
AND child_state.step_slug = dep.step_slug;
239236

240-
-- Start any steps that became ready after decrementing remaining_deps
241-
PERFORM pgflow.start_ready_steps(fail_task.run_id);
237+
-- Evaluate conditions on newly-ready dependent steps
238+
-- This must happen before cascade_complete_taskless_steps so that
239+
-- skipped steps can set initial_tasks=0 for their map dependents
240+
IF NOT pgflow.cascade_resolve_conditions(fail_task.run_id) THEN
241+
-- Run was failed due to a condition with when_unmet='fail'
242+
-- Archive the failed task's message before returning
243+
PERFORM pgflow._archive_task_message(fail_task.run_id, fail_task.step_slug, fail_task.task_index);
244+
-- Return the task row (API contract)
245+
RETURN QUERY SELECT * FROM pgflow.step_tasks
246+
WHERE pgflow.step_tasks.run_id = fail_task.run_id
247+
AND pgflow.step_tasks.step_slug = fail_task.step_slug
248+
AND pgflow.step_tasks.task_index = fail_task.task_index;
249+
RETURN;
250+
END IF;
242251

243252
-- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep)
244253
PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id);
254+
255+
-- Start steps that became ready after condition resolution and taskless completion
256+
PERFORM pgflow.start_ready_steps(fail_task.run_id);
245257
END IF;
246258

247259
-- Try to complete the run (remaining_steps may now be 0)

pkgs/core/src/database-types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,10 @@ export type Database = {
406406
[_ in never]: never
407407
}
408408
Functions: {
409+
_archive_task_message: {
410+
Args: { p_run_id: string; p_step_slug: string; p_task_index: number }
411+
Returns: undefined
412+
}
409413
_cascade_force_skip_steps: {
410414
Args: { run_id: string; skip_reason: string; step_slug: string }
411415
Returns: number

0 commit comments

Comments
 (0)