Skip to content

Commit dbd63dd

Browse files
authored
feat: add conditional step execution with skip infrastructure (#572)
# Add conditional execution infrastructure to pgflow This PR adds support for conditional step execution in pgflow with a new "skip" infrastructure. The implementation includes: - New step configuration options: - `condition_pattern`: JSONB pattern for @> containment check - `when_unmet`: What to do when condition is not met (options: 'fail', 'skip', 'skip-cascade') - `when_failed`: What to do when handler fails after retries (options: 'fail', 'skip', 'skip-cascade') - New step status and tracking: - Added 'skipped' status to step states - New columns: `skip_reason`, `skipped_at` to track why and when a step was skipped - Skip reasons include: 'condition_unmet', 'handler_failed', 'dependency_skipped' - Cascade functionality: - New `cascade_skip_steps` function to skip a step and all its downstream dependents - Respects the dependency graph when skipping steps - Updates run counters and broadcasts step:skipped events - Comprehensive test coverage: - Tests for parameter validation - Tests for cascading skips through multiple dependency levels - Tests for event payload format and ordering This feature enables more flexible workflows where steps can be conditionally executed based on runtime data.
1 parent bd742c4 commit dbd63dd

File tree

82 files changed

+8362
-309
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+8362
-309
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@pgflow/core': patch
3+
'@pgflow/dsl': patch
4+
---
5+
6+
Add whenFailed option for error handling after retries exhausted (fail, skip, skip-cascade)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@pgflow/core': patch
3+
---
4+
5+
Add skip infrastructure schema for conditional execution - new columns (condition_pattern, when_unmet, when_failed, skip_reason, skipped_at), 'skipped' status, and cascade_skip_steps function

pkgs/core/schemas/0050_tables_definitions.sql

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ create table pgflow.steps (
2424
opt_base_delay int,
2525
opt_timeout int,
2626
opt_start_delay int,
27+
required_input_pattern jsonb, -- JSON pattern for @> containment check (if)
28+
forbidden_input_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
29+
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
30+
when_failed text not null default 'fail', -- What to do when handler fails after retries
2731
created_at timestamptz not null default now(),
2832
primary key (flow_slug, step_slug),
2933
unique (flow_slug, step_index), -- Ensure step_index is unique within a flow
@@ -32,7 +36,9 @@ create table pgflow.steps (
3236
constraint opt_max_attempts_is_nonnegative check (opt_max_attempts is null or opt_max_attempts >= 0),
3337
constraint opt_base_delay_is_nonnegative check (opt_base_delay is null or opt_base_delay >= 0),
3438
constraint opt_timeout_is_positive check (opt_timeout is null or opt_timeout > 0),
35-
constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0)
39+
constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0),
40+
constraint when_unmet_is_valid check (when_unmet in ('fail', 'skip', 'skip-cascade')),
41+
constraint when_failed_is_valid check (when_failed in ('fail', 'skip', 'skip-cascade'))
3642
);
3743

3844
-- Dependencies table - stores relationships between steps

pkgs/core/schemas/0060_tables_runtime.sql

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,20 @@ create table pgflow.step_states (
3131
remaining_deps int not null default 0 check (remaining_deps >= 0),
3232
output jsonb, -- Step output: stored atomically with status=completed transition
3333
error_message text,
34+
skip_reason text, -- Why step was skipped: condition_unmet, handler_failed, dependency_skipped
3435
created_at timestamptz not null default now(),
3536
started_at timestamptz,
3637
completed_at timestamptz,
3738
failed_at timestamptz,
39+
skipped_at timestamptz,
3840
primary key (run_id, step_slug),
3941
foreign key (flow_slug, step_slug)
4042
references pgflow.steps (flow_slug, step_slug),
41-
constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed')),
43+
constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed', 'skipped')),
4244
constraint status_and_remaining_tasks_match check (status != 'completed' or remaining_tasks = 0),
4345
-- Add constraint to ensure remaining_tasks is only set when step has started
4446
constraint remaining_tasks_state_consistency check (
45-
remaining_tasks is null or status != 'created'
47+
remaining_tasks is null or status not in ('created', 'skipped')
4648
),
4749
constraint initial_tasks_known_when_started check (
4850
status != 'started' or initial_tasks is not null
@@ -52,16 +54,29 @@ create table pgflow.step_states (
5254
constraint output_only_for_completed_or_null check (
5355
output is null or status = 'completed'
5456
),
55-
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
57+
-- skip_reason is required for skipped status and forbidden for other statuses
58+
constraint skip_reason_matches_status check (
59+
(status = 'skipped' and skip_reason is not null) or
60+
(status != 'skipped' and skip_reason is null)
61+
),
62+
constraint completed_at_or_failed_at_or_skipped_at check (
63+
(
64+
case when completed_at is not null then 1 else 0 end +
65+
case when failed_at is not null then 1 else 0 end +
66+
case when skipped_at is not null then 1 else 0 end
67+
) <= 1
68+
),
5669
constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at),
5770
constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at),
58-
constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at)
71+
constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at),
72+
constraint skipped_at_is_after_created_at check (skipped_at is null or skipped_at >= created_at)
5973
);
6074

6175
create index if not exists idx_step_states_ready on pgflow.step_states (run_id, status, remaining_deps) where status
6276
= 'created'
6377
and remaining_deps = 0;
6478
create index if not exists idx_step_states_failed on pgflow.step_states (run_id, step_slug) where status = 'failed';
79+
create index if not exists idx_step_states_skipped on pgflow.step_states (run_id, step_slug) where status = 'skipped';
6580
create index if not exists idx_step_states_flow_slug on pgflow.step_states (flow_slug);
6681
create index if not exists idx_step_states_run_id on pgflow.step_states (run_id);
6782

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
-- _cascade_force_skip_steps: Skip a step and cascade to all downstream dependents
2+
-- Used when a condition is unmet (whenUnmet: skip-cascade) or handler fails (whenFailed: skip-cascade)
3+
create or replace function pgflow._cascade_force_skip_steps(
4+
run_id uuid,
5+
step_slug text,
6+
skip_reason text
7+
)
8+
returns int
9+
language plpgsql
10+
as $$
11+
DECLARE
12+
v_flow_slug text;
13+
v_total_skipped int := 0;
14+
BEGIN
15+
-- Get flow_slug for this run
16+
SELECT r.flow_slug INTO v_flow_slug
17+
FROM pgflow.runs r
18+
WHERE r.run_id = _cascade_force_skip_steps.run_id;
19+
20+
IF v_flow_slug IS NULL THEN
21+
RAISE EXCEPTION 'Run not found: %', _cascade_force_skip_steps.run_id;
22+
END IF;
23+
24+
-- ==========================================
25+
-- SKIP STEPS IN TOPOLOGICAL ORDER
26+
-- ==========================================
27+
-- Use recursive CTE to find all downstream dependents,
28+
-- then skip them in topological order (by step_index)
29+
WITH RECURSIVE
30+
-- ---------- Find all downstream steps ----------
31+
downstream_steps AS (
32+
-- Base case: the trigger step
33+
SELECT
34+
s.flow_slug,
35+
s.step_slug,
36+
s.step_index,
37+
_cascade_force_skip_steps.skip_reason AS reason -- Original reason for trigger step
38+
FROM pgflow.steps s
39+
WHERE s.flow_slug = v_flow_slug
40+
AND s.step_slug = _cascade_force_skip_steps.step_slug
41+
42+
UNION ALL
43+
44+
-- Recursive case: steps that depend on already-found steps
45+
SELECT
46+
s.flow_slug,
47+
s.step_slug,
48+
s.step_index,
49+
'dependency_skipped'::text AS reason -- Downstream steps get this reason
50+
FROM pgflow.steps s
51+
JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug
52+
JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug
53+
),
54+
-- ---------- Deduplicate and order by step_index ----------
55+
steps_to_skip AS (
56+
SELECT DISTINCT ON (ds.step_slug)
57+
ds.flow_slug,
58+
ds.step_slug,
59+
ds.step_index,
60+
ds.reason
61+
FROM downstream_steps ds
62+
ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason)
63+
),
64+
-- ---------- Skip the steps ----------
65+
skipped AS (
66+
UPDATE pgflow.step_states ss
67+
SET status = 'skipped',
68+
skip_reason = sts.reason,
69+
skipped_at = now(),
70+
remaining_tasks = NULL -- Clear remaining_tasks for skipped steps
71+
FROM steps_to_skip sts
72+
WHERE ss.run_id = _cascade_force_skip_steps.run_id
73+
AND ss.step_slug = sts.step_slug
74+
AND ss.status IN ('created', 'started') -- Only skip non-terminal steps
75+
RETURNING
76+
ss.*,
77+
-- Broadcast step:skipped event
78+
realtime.send(
79+
jsonb_build_object(
80+
'event_type', 'step:skipped',
81+
'run_id', ss.run_id,
82+
'flow_slug', ss.flow_slug,
83+
'step_slug', ss.step_slug,
84+
'status', 'skipped',
85+
'skip_reason', ss.skip_reason,
86+
'skipped_at', ss.skipped_at
87+
),
88+
concat('step:', ss.step_slug, ':skipped'),
89+
concat('pgflow:run:', ss.run_id),
90+
false
91+
) as _broadcast_result
92+
),
93+
-- ---------- Update run counters ----------
94+
run_updates AS (
95+
UPDATE pgflow.runs r
96+
SET remaining_steps = r.remaining_steps - skipped_count.count
97+
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
98+
WHERE r.run_id = _cascade_force_skip_steps.run_id
99+
AND skipped_count.count > 0
100+
)
101+
SELECT COUNT(*) INTO v_total_skipped FROM skipped;
102+
103+
RETURN v_total_skipped;
104+
END;
105+
$$;

pkgs/core/schemas/0100_function_add_step.sql

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ create or replace function pgflow.add_step(
66
base_delay int default null,
77
timeout int default null,
88
start_delay int default null,
9-
step_type text default 'single'
9+
step_type text default 'single',
10+
required_input_pattern jsonb default null,
11+
forbidden_input_pattern jsonb default null,
12+
when_unmet text default 'skip',
13+
when_failed text default 'fail'
1014
)
1115
returns pgflow.steps
1216
language plpgsql
@@ -22,7 +26,7 @@ BEGIN
2226
-- 0 dependencies (root map - maps over flow input array)
2327
-- 1 dependency (dependent map - maps over dependency output array)
2428
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
25-
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
29+
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
2630
add_step.step_slug,
2731
COALESCE(array_length(add_step.deps_slugs, 1), 0),
2832
array_to_string(add_step.deps_slugs, ', ');
@@ -36,18 +40,23 @@ BEGIN
3640
-- Create the step
3741
INSERT INTO pgflow.steps (
3842
flow_slug, step_slug, step_type, step_index, deps_count,
39-
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay
43+
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
44+
required_input_pattern, forbidden_input_pattern, when_unmet, when_failed
4045
)
4146
VALUES (
4247
add_step.flow_slug,
4348
add_step.step_slug,
4449
COALESCE(add_step.step_type, 'single'),
45-
next_idx,
50+
next_idx,
4651
COALESCE(array_length(add_step.deps_slugs, 1), 0),
4752
add_step.max_attempts,
4853
add_step.base_delay,
4954
add_step.timeout,
50-
add_step.start_delay
55+
add_step.start_delay,
56+
add_step.required_input_pattern,
57+
add_step.forbidden_input_pattern,
58+
add_step.when_unmet,
59+
add_step.when_failed
5160
)
5261
ON CONFLICT ON CONSTRAINT steps_pkey
5362
DO UPDATE SET step_slug = EXCLUDED.step_slug
@@ -59,7 +68,7 @@ BEGIN
5968
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
6069
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
6170
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
62-
71+
6372
RETURN result_step;
6473
END;
6574
$$;

0 commit comments

Comments
 (0)