Skip to content

Commit 6690fa5

Browse files
authored
feat: add constraints for input patterns and improve cascade skip tests (#611)
# Add constraints and improve cascade skip functionality This PR adds database constraints to ensure `required_input_pattern` and `forbidden_input_pattern` are always JSON objects, preventing invalid pattern types. It also enhances the `_cascade_force_skip_steps` function to: 1. Return the count of skipped steps 2. Make the function idempotent (safe to call multiple times) 3. Improve event ordering tests with a more reliable trigger-based approach The changes include: - Added CHECK constraints to the `steps` table to validate pattern types - Made `_cascade_force_skip_steps` return the number of steps skipped - Added a new test for idempotent behavior of `_cascade_force_skip_steps` - Improved event ordering tests to use a trigger-based approach instead of timestamp ordering - Added tests to verify rejection of invalid pattern types (arrays, strings) - Enhanced flow shape comparison tests to detect pattern differences These changes improve data integrity and make the cascade skip functionality more robust and predictable.
1 parent 6873dbe commit 6690fa5

11 files changed

+254
-47
lines changed

pkgs/core/schemas/0050_tables_definitions.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ create table pgflow.steps (
2626
opt_start_delay int,
2727
required_input_pattern jsonb, -- JSON pattern for @> containment check (if)
2828
forbidden_input_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
29+
constraint required_input_pattern_is_object check (
30+
required_input_pattern is null or jsonb_typeof(required_input_pattern) = 'object'
31+
),
32+
constraint forbidden_input_pattern_is_object check (
33+
forbidden_input_pattern is null or jsonb_typeof(forbidden_input_pattern) = 'object'
34+
),
2935
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
3036
when_exhausted text not null default 'fail', -- What to do when handler fails after retries
3137
created_at timestamptz not null default now(),

pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp
1515
-- Create index "idx_step_states_skipped" to table: "step_states"
1616
CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
1717
-- Modify "steps" table
18-
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_exhausted_is_valid" CHECK (when_exhausted = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "required_input_pattern" jsonb NULL, ADD COLUMN "forbidden_input_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_exhausted" text NOT NULL DEFAULT 'fail';
18+
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "forbidden_input_pattern_is_object" CHECK ((forbidden_input_pattern IS NULL) OR (jsonb_typeof(forbidden_input_pattern) = 'object'::text)), ADD CONSTRAINT "required_input_pattern_is_object" CHECK ((required_input_pattern IS NULL) OR (jsonb_typeof(required_input_pattern) = 'object'::text)), ADD CONSTRAINT "when_exhausted_is_valid" CHECK (when_exhausted = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "required_input_pattern" jsonb NULL, ADD COLUMN "forbidden_input_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_exhausted" text NOT NULL DEFAULT 'fail';
1919
-- Modify "_compare_flow_shapes" function
2020
CREATE OR REPLACE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$
2121
DECLARE

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk=
1+
h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -18,4 +18,4 @@ h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
1919
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
2020
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
21-
20260214181656_pgflow_step_conditions.sql h1:uLPoOD/hPrerMACS6CThb7t7T5LKLpMMrdFXXi4ZQ5s=
21+
20260214181656_pgflow_step_conditions.sql h1:nG52qhydTJMeLTd4AoI4buATJNHdEN2C1ZJdKp+i7wE=

pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,28 @@ select pgflow.add_step('order_test', 'step_a');
1010
select pgflow.add_step('order_test', 'step_b', ARRAY['step_a']);
1111
select pgflow.add_step('order_test', 'step_c', ARRAY['step_b']);
1212

13+
-- Setup capture table for reliable insertion order tracking
14+
create temporary table skip_event_log (
15+
seq bigserial primary key,
16+
run_id uuid not null,
17+
step_slug text not null
18+
);
19+
20+
create or replace function pg_temp.capture_skip_event()
21+
returns trigger language plpgsql as $$
22+
begin
23+
if new.payload->>'event_type' = 'step:skipped' then
24+
insert into skip_event_log(run_id, step_slug)
25+
values ((new.payload->>'run_id')::uuid, new.payload->>'step_slug');
26+
end if;
27+
return new;
28+
end;
29+
$$;
30+
31+
create trigger capture_skip_event_trigger
32+
after insert on realtime.messages
33+
for each row execute function pg_temp.capture_skip_event();
34+
1335
-- Start flow
1436
with flow as (
1537
select * from pgflow.start_flow('order_test', '{}'::jsonb)
@@ -33,28 +55,14 @@ select is(
3355
);
3456

3557
-- Test 2: Events should be in dependency order (A before B before C)
36-
with ordered_events as (
37-
select
38-
inserted_at,
39-
payload->>'step_slug' as step_slug,
40-
row_number() over (order by inserted_at, payload->>'step_slug') as event_order
41-
from realtime.messages
42-
where payload->>'event_type' = 'step:skipped'
43-
and payload->>'run_id' = (select run_id::text from run_ids)
44-
),
45-
step_a_event as (
46-
select event_order from ordered_events where step_slug = 'step_a'
47-
),
48-
step_b_event as (
49-
select event_order from ordered_events where step_slug = 'step_b'
50-
),
51-
step_c_event as (
52-
select event_order from ordered_events where step_slug = 'step_c'
53-
)
54-
select ok(
55-
(select event_order from step_a_event) < (select event_order from step_b_event)
56-
AND (select event_order from step_b_event) < (select event_order from step_c_event),
57-
'Events must be in dependency order (A -> B -> C)'
58+
-- Uses trigger-based capture for reliable ordering (no timestamp tie-break issues)
59+
select results_eq(
60+
$$ select step_slug
61+
from skip_event_log
62+
where run_id = (select run_id from run_ids)
63+
order by seq $$,
64+
$$ values ('step_a'), ('step_b'), ('step_c') $$,
65+
'Should broadcast step:skipped in dependency order (step_a before step_b before step_c)'
5866
);
5967

6068
-- Clean up

pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_through_multiple_levels.test.sql

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- Test: _cascade_force_skip_steps - Cascade through multiple DAG levels
22
-- Verifies skipping A cascades through A -> B -> C chain
33
begin;
4-
select plan(8);
4+
select plan(9);
55

66
-- Reset database and create a flow: A -> B -> C
77
select pgflow_tests.reset_db();
@@ -17,10 +17,14 @@ with flow as (
1717
select run_id into temporary run_ids from flow;
1818

1919
-- Skip step_a (should cascade to step_b and step_c)
20-
select pgflow._cascade_force_skip_steps(
21-
(select run_id from run_ids),
22-
'step_a',
23-
'handler_failed'
20+
select is(
21+
(select pgflow._cascade_force_skip_steps(
22+
(select run_id from run_ids),
23+
'step_a',
24+
'handler_failed'
25+
)),
26+
3::int,
27+
'Should return count of 3 skipped steps (step_a + step_b + step_c)'
2428
);
2529

2630
-- Test 1: step_a should be skipped with handler_failed reason

pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_to_single_dependent.test.sql

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- Test: _cascade_force_skip_steps - Cascade to single dependent
22
-- Verifies skipping a step cascades to its direct dependent
33
begin;
4-
select plan(7);
4+
select plan(8);
55

66
-- Reset database and create a flow: A -> B
77
select pgflow_tests.reset_db();
@@ -16,10 +16,14 @@ with flow as (
1616
select run_id into temporary run_ids from flow;
1717

1818
-- Skip step_a (should cascade to step_b)
19-
select pgflow._cascade_force_skip_steps(
20-
(select run_id from run_ids),
21-
'step_a',
22-
'condition_unmet'
19+
select is(
20+
(select pgflow._cascade_force_skip_steps(
21+
(select run_id from run_ids),
22+
'step_a',
23+
'condition_unmet'
24+
)),
25+
2::int,
26+
'Should return count of 2 skipped steps (step_a + step_b)'
2327
);
2428

2529
-- Test 1: step_a should be skipped
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
\set ON_ERROR_STOP on
2+
\set QUIET on
3+
4+
begin;
5+
select plan(5);
6+
7+
select pgflow_tests.reset_db();
8+
9+
select pgflow.create_flow('idempotent_test');
10+
select pgflow.add_step('idempotent_test', 'map_step', '{}', max_attempts => 0, step_type => 'map', when_exhausted => 'skip');
11+
select pgflow.add_step('idempotent_test', 'dependent_step', ARRAY['map_step']);
12+
13+
select pgflow.start_flow('idempotent_test', '[1, 2, 3]'::jsonb);
14+
15+
with tasks as (
16+
select message_id, task_index
17+
from pgflow.step_tasks
18+
where flow_slug = 'idempotent_test' and step_slug = 'map_step'
19+
order by task_index
20+
)
21+
select pgflow.start_tasks('idempotent_test', array[(select message_id from tasks where task_index = 0)::bigint], pgflow_tests.ensure_worker('idempotent_test'));
22+
23+
create temporary table test_run as
24+
select run_id from pgflow.runs where flow_slug = 'idempotent_test';
25+
26+
select is(
27+
(select pgflow._cascade_force_skip_steps(
28+
(select run_id from test_run),
29+
'map_step',
30+
'condition_unmet'
31+
)),
32+
2::int,
33+
'First call should skip 2 steps (map_step + dependent_step)'
34+
);
35+
36+
create temporary table after_first as
37+
select
38+
(select remaining_steps from pgflow.runs where run_id = (select run_id from test_run)) as remaining_steps,
39+
(select count(*) from realtime.messages
40+
where payload->>'event_type' = 'step:skipped'
41+
and (payload->>'run_id')::uuid = (select run_id from test_run)) as event_count,
42+
(select count(*) from pgmq.a_idempotent_test) as archive_count;
43+
44+
select is(
45+
(select pgflow._cascade_force_skip_steps(
46+
(select run_id from test_run),
47+
'map_step',
48+
'condition_unmet'
49+
)),
50+
0::int,
51+
'Second call should return 0 (no new skips)'
52+
);
53+
54+
select is(
55+
(select remaining_steps from pgflow.runs where run_id = (select run_id from test_run)),
56+
(select remaining_steps from after_first),
57+
'remaining_steps should be unchanged after second call'
58+
);
59+
60+
select is(
61+
(select count(*) from realtime.messages
62+
where payload->>'event_type' = 'step:skipped'
63+
and (payload->>'run_id')::uuid = (select run_id from test_run)),
64+
(select event_count from after_first),
65+
'step:skipped event count should be unchanged after second call'
66+
);
67+
68+
select is(
69+
(select count(*) from pgmq.a_idempotent_test),
70+
(select archive_count from after_first),
71+
'Archive count should be unchanged after second call'
72+
);
73+
74+
select * from finish();
75+
rollback;

pkgs/core/supabase/tests/_cascade_force_skip_steps/single_step_skip.test.sql

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- Test: _cascade_force_skip_steps - Single step skip (base case)
22
-- Verifies the function can skip a single step without dependencies
33
begin;
4-
select plan(5);
4+
select plan(6);
55

66
-- Reset database and create a simple flow with no dependencies
77
select pgflow_tests.reset_db();
@@ -24,10 +24,14 @@ select is(
2424
);
2525

2626
-- Skip step_a
27-
select pgflow._cascade_force_skip_steps(
28-
(select run_id from run_ids),
29-
'step_a',
30-
'condition_unmet'
27+
select is(
28+
(select pgflow._cascade_force_skip_steps(
29+
(select run_id from run_ids),
30+
'step_a',
31+
'condition_unmet'
32+
)),
33+
1::int,
34+
'Should return count of 1 skipped step'
3135
);
3236

3337
-- Test 2: step_a should now have status 'skipped'
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
\set ON_ERROR_STOP on
2+
\set QUIET on
3+
4+
begin;
5+
select plan(4);
6+
select pgflow_tests.reset_db();
7+
8+
select pgflow.create_flow('invalid_pattern_test');
9+
10+
-- Test 1: required_input_pattern as array should fail
11+
select throws_ok(
12+
$$ select pgflow.add_step('invalid_pattern_test', 'step_array', required_input_pattern => '[]'::jsonb) $$,
13+
'new row for relation "steps" violates check constraint "required_input_pattern_is_object"',
14+
'Should reject array for required_input_pattern'
15+
);
16+
17+
-- Test 2: required_input_pattern as string should fail
18+
select throws_ok(
19+
$$ select pgflow.add_step('invalid_pattern_test', 'step_string', required_input_pattern => '"invalid"'::jsonb) $$,
20+
'new row for relation "steps" violates check constraint "required_input_pattern_is_object"',
21+
'Should reject string for required_input_pattern'
22+
);
23+
24+
-- Test 3: forbidden_input_pattern as array should fail
25+
select throws_ok(
26+
$$ select pgflow.add_step('invalid_pattern_test', 'step_forbidden_array', forbidden_input_pattern => '[]'::jsonb) $$,
27+
'new row for relation "steps" violates check constraint "forbidden_input_pattern_is_object"',
28+
'Should reject array for forbidden_input_pattern'
29+
);
30+
31+
-- Test 4: forbidden_input_pattern as string should fail
32+
select throws_ok(
33+
$$ select pgflow.add_step('invalid_pattern_test', 'step_forbidden_string', forbidden_input_pattern => '"invalid"'::jsonb) $$,
34+
'new row for relation "steps" violates check constraint "forbidden_input_pattern_is_object"',
35+
'Should reject string for forbidden_input_pattern'
36+
);
37+
38+
select finish();
39+
rollback;

pkgs/core/supabase/tests/compare_flow_shapes/pattern_differences.test.sql

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
begin;
2-
select plan(2);
2+
select plan(5);
33
select pgflow_tests.reset_db();
44

55
-- Test: Patterns with same value should match
@@ -38,5 +38,59 @@ select is(
3838
'Different requiredInputPattern values should be detected'
3939
);
4040

41+
-- Test: Different forbiddenInputPattern should be detected
42+
select is(
43+
pgflow._compare_flow_shapes(
44+
'{
45+
"steps": [
46+
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"role": "user"}}, "forbiddenInputPattern": {"defined": true, "value": {"role": "admin"}}}
47+
]
48+
}'::jsonb,
49+
'{
50+
"steps": [
51+
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"role": "user"}}, "forbiddenInputPattern": {"defined": true, "value": {"role": "banned"}}}
52+
]
53+
}'::jsonb
54+
),
55+
ARRAY['Step at index 0: forbiddenInputPattern differs ''{"value": {"role": "admin"}, "defined": true}'' vs ''{"value": {"role": "banned"}, "defined": true}''']::text[],
56+
'Different forbiddenInputPattern values should be detected'
57+
);
58+
59+
-- Test: forbiddenInputPattern defined transition should be detected
60+
select is(
61+
pgflow._compare_flow_shapes(
62+
'{
63+
"steps": [
64+
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": false}}
65+
]
66+
}'::jsonb,
67+
'{
68+
"steps": [
69+
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": true, "value": {"admin": true}}}
70+
]
71+
}'::jsonb
72+
),
73+
ARRAY['Step at index 0: forbiddenInputPattern differs ''{"defined": false}'' vs ''{"value": {"admin": true}, "defined": true}''']::text[],
74+
'forbiddenInputPattern defined transition should be detected'
75+
);
76+
77+
-- Test: requiredInputPattern defined transition should be detected
78+
select is(
79+
pgflow._compare_flow_shapes(
80+
'{
81+
"steps": [
82+
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": false}}
83+
]
84+
}'::jsonb,
85+
'{
86+
"steps": [
87+
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"status": "ready"}}, "forbiddenInputPattern": {"defined": false}}
88+
]
89+
}'::jsonb
90+
),
91+
ARRAY['Step at index 0: requiredInputPattern differs ''{"defined": false}'' vs ''{"value": {"status": "ready"}, "defined": true}''']::text[],
92+
'requiredInputPattern defined transition should be detected'
93+
);
94+
4195
select finish();
4296
rollback;

0 commit comments

Comments
 (0)