Skip to content

Commit befd2d5

Browse files
committed
feat: include options in FlowShape for proper flow creation (#471)
# Include Options in FlowShape for Proper Flow Creation This PR adds support for including flow and step options in the FlowShape structure, ensuring that options defined in the DSL are properly passed through to the database during flow creation. ## Problem Fixed Previously, `_create_flow_from_shape()` was using default values instead of DSL-defined options, causing inconsistency between: - CLI path: `compileFlow()` → SQL with options - Runtime path: `extractFlowShape()` → `_create_flow_from_shape()` → defaults only ## Changes 1. Modified `FlowShape` and `StepShape` interfaces to include optional `options` fields 2. Updated `extractFlowShape()` to include flow and step options in the shape 3. Changed SQL functions to use NULL parameters as "use default" via COALESCE 4. Updated `_create_flow_from_shape()` to pass options from shape to `create_flow()` 5. Added comprehensive tests for options handling ## Design Decision Options are now included in the shape for proper flow creation, but are still excluded from shape comparison. This maintains the ability to tune options at runtime without triggering recompilation. SQL functions now treat NULL parameters as "use default" via COALESCE, ensuring defaults are defined in a single place (inside the function) rather than being hardcoded by callers.
1 parent 2268eec commit befd2d5

9 files changed

Lines changed: 436 additions & 30 deletions

File tree

PLAN.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,44 @@ async function handleEnsureCompiled(
349349

350350
---
351351

352+
## Phase 3.5: Include Options in FlowShape
353+
354+
Options (maxAttempts, baseDelay, timeout, startDelay) must be included in FlowShape for proper flow creation, while remaining excluded from shape comparison (options can be tuned at runtime without recompilation).
355+
356+
### Problem
357+
358+
`_create_flow_from_shape()` was using defaults instead of DSL-defined options, causing drift between:
359+
- CLI path: `compileFlow()` -> SQL with options
360+
- Runtime path: `extractFlowShape()` -> `_create_flow_from_shape()` -> defaults only
361+
362+
### Solution
363+
364+
1. **FlowShape includes options** - Added optional `options` field to FlowShape/StepShape
365+
2. **NULL = use default** - Modified `create_flow()` to use COALESCE internally
366+
3. **Pass options through** - Updated `_create_flow_from_shape()` to pass options from shape
367+
368+
### Key Files
369+
370+
| File | Change |
371+
|------|--------|
372+
| `pkgs/dsl/src/flow-shape.ts` | Add options to interfaces, update extractFlowShape() |
373+
| `pkgs/core/schemas/0100_function_create_flow.sql` | NULL params = use default via COALESCE |
374+
| `pkgs/core/schemas/0100_function_create_flow_from_shape.sql` | Pass options from shape |
375+
376+
### Design Decision: NULL = Use Default
377+
378+
SQL functions now treat NULL parameters as "use default" via COALESCE:
379+
380+
```sql
381+
-- In create_flow():
382+
INSERT INTO pgflow.flows (..., opt_max_attempts, ...)
383+
VALUES (..., COALESCE(max_attempts, 3), ...);
384+
```
385+
386+
This prevents drift - defaults are defined in ONE place (inside the function), not hardcoded by callers.
387+
388+
---
389+
352390
## Phase 4: Worker Configuration (Edge-Worker Package)
353391

354392
### Modify: `pkgs/edge-worker/src/core/workerConfigTypes.ts`

pkgs/core/schemas/0100_function_create_flow.sql

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
1+
-- Create a new flow with optional configuration.
2+
-- NULL parameters use defaults defined in the 'defaults' CTE below.
3+
-- This allows callers to pass NULL to explicitly use the default value.
14
create or replace function pgflow.create_flow(
25
flow_slug text,
3-
max_attempts int default 3,
4-
base_delay int default 5,
5-
timeout int default 60
6+
max_attempts int default null,
7+
base_delay int default null,
8+
timeout int default null
69
)
710
returns pgflow.flows
811
language sql
912
set search_path to ''
1013
volatile
1114
as $$
1215
WITH
16+
defaults AS (
17+
SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout
18+
),
1319
flow_upsert AS (
1420
INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout)
15-
VALUES (flow_slug, max_attempts, base_delay, timeout)
21+
SELECT
22+
flow_slug,
23+
COALESCE(max_attempts, defaults.def_max_attempts),
24+
COALESCE(base_delay, defaults.def_base_delay),
25+
COALESCE(timeout, defaults.def_timeout)
26+
FROM defaults
1627
ON CONFLICT (flow_slug) DO UPDATE
1728
SET flow_slug = pgflow.flows.flow_slug -- Dummy update
1829
RETURNING *

pkgs/core/schemas/0100_function_create_flow_from_shape.sql

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
-- Compile a flow from a JSONB shape
22
-- Creates the flow and all its steps using existing create_flow/add_step functions
3+
-- Includes options from shape (NULL values = use default)
34
create or replace function pgflow._create_flow_from_shape(
45
p_flow_slug text,
56
p_shape jsonb
@@ -12,9 +13,19 @@ as $$
1213
DECLARE
1314
v_step jsonb;
1415
v_deps text[];
16+
v_flow_options jsonb;
17+
v_step_options jsonb;
1518
BEGIN
16-
-- Create the flow with defaults
17-
PERFORM pgflow.create_flow(p_flow_slug);
19+
-- Extract flow-level options (may be null)
20+
v_flow_options := p_shape->'options';
21+
22+
-- Create the flow with options (NULL = use default)
23+
PERFORM pgflow.create_flow(
24+
p_flow_slug,
25+
(v_flow_options->>'maxAttempts')::int,
26+
(v_flow_options->>'baseDelay')::int,
27+
(v_flow_options->>'timeout')::int
28+
);
1829

1930
-- Iterate over steps in order and add each one
2031
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
@@ -24,11 +35,18 @@ BEGIN
2435
INTO v_deps
2536
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;
2637

27-
-- Add the step
38+
-- Extract step options (may be null)
39+
v_step_options := v_step->'options';
40+
41+
-- Add the step with options (NULL = use default/inherit)
2842
PERFORM pgflow.add_step(
2943
flow_slug => p_flow_slug,
3044
step_slug => v_step->>'slug',
3145
deps_slugs => v_deps,
46+
max_attempts => (v_step_options->>'maxAttempts')::int,
47+
base_delay => (v_step_options->>'baseDelay')::int,
48+
timeout => (v_step_options->>'timeout')::int,
49+
start_delay => (v_step_options->>'startDelay')::int,
3250
step_type => v_step->>'stepType'
3351
);
3452
END LOOP;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
-- Modify "create_flow" function
2+
CREATE OR REPLACE FUNCTION "pgflow"."create_flow" ("flow_slug" text, "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer) RETURNS "pgflow"."flows" LANGUAGE sql SET "search_path" = '' AS $$
3+
WITH
4+
defaults AS (
5+
SELECT 3 AS def_max_attempts, 5 AS def_base_delay, 60 AS def_timeout
6+
),
7+
flow_upsert AS (
8+
INSERT INTO pgflow.flows (flow_slug, opt_max_attempts, opt_base_delay, opt_timeout)
9+
SELECT
10+
flow_slug,
11+
COALESCE(max_attempts, defaults.def_max_attempts),
12+
COALESCE(base_delay, defaults.def_base_delay),
13+
COALESCE(timeout, defaults.def_timeout)
14+
FROM defaults
15+
ON CONFLICT (flow_slug) DO UPDATE
16+
SET flow_slug = pgflow.flows.flow_slug -- Dummy update
17+
RETURNING *
18+
),
19+
ensure_queue AS (
20+
SELECT pgmq.create(flow_slug)
21+
WHERE NOT EXISTS (
22+
SELECT 1 FROM pgmq.list_queues() WHERE queue_name = flow_slug
23+
)
24+
)
25+
SELECT f.*
26+
FROM flow_upsert f
27+
LEFT JOIN (SELECT 1 FROM ensure_queue) _dummy ON true; -- Left join ensures flow is returned
28+
$$;
29+
-- Modify "_create_flow_from_shape" function
30+
CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
31+
DECLARE
32+
v_step jsonb;
33+
v_deps text[];
34+
v_flow_options jsonb;
35+
v_step_options jsonb;
36+
BEGIN
37+
-- Extract flow-level options (may be null)
38+
v_flow_options := p_shape->'options';
39+
40+
-- Create the flow with options (NULL = use default)
41+
PERFORM pgflow.create_flow(
42+
p_flow_slug,
43+
(v_flow_options->>'maxAttempts')::int,
44+
(v_flow_options->>'baseDelay')::int,
45+
(v_flow_options->>'timeout')::int
46+
);
47+
48+
-- Iterate over steps in order and add each one
49+
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
50+
LOOP
51+
-- Convert dependencies jsonb array to text array
52+
SELECT COALESCE(array_agg(dep), '{}')
53+
INTO v_deps
54+
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;
55+
56+
-- Extract step options (may be null)
57+
v_step_options := v_step->'options';
58+
59+
-- Add the step with options (NULL = use default/inherit)
60+
PERFORM pgflow.add_step(
61+
flow_slug => p_flow_slug,
62+
step_slug => v_step->>'slug',
63+
deps_slugs => v_deps,
64+
max_attempts => (v_step_options->>'maxAttempts')::int,
65+
base_delay => (v_step_options->>'baseDelay')::int,
66+
timeout => (v_step_options->>'timeout')::int,
67+
start_delay => (v_step_options->>'startDelay')::int,
68+
step_type => v_step->>'stepType'
69+
);
70+
END LOOP;
71+
END;
72+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830=
1+
h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,3 +14,4 @@ h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830=
1414
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
1515
20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
1616
20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko=
17+
20251130164844_pgflow_temp_options_in_shape.sql h1:lbMDdu15QiBElTsvl7g0dI7flvyjngK9g68VDnCE0S0=

pkgs/core/supabase/tests/create_flow/options.test.sql

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
begin;
2-
select plan(5);
2+
select plan(7);
33
select pgflow_tests.reset_db();
44

55
-- SETUP: flow with all default values
@@ -12,6 +12,24 @@ select results_eq(
1212
'Should create flow with default opt_max_attempts, opt_base_delay and opt_timeout'
1313
);
1414

15+
-- TEST: NULL parameters should use defaults (NULL = "use default" semantics)
16+
select pgflow.create_flow('test_flow_null', null, null, null);
17+
18+
select results_eq(
19+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'test_flow_null' $$,
20+
$$ VALUES (3, 5, 60) $$,
21+
'NULL parameters should use defaults'
22+
);
23+
24+
-- TEST: Mixed NULL and explicit values
25+
select pgflow.create_flow('test_flow_mixed', max_attempts => 10, base_delay => null, timeout => 120);
26+
27+
select results_eq(
28+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'test_flow_mixed' $$,
29+
$$ VALUES (10, 5, 120) $$,
30+
'NULL parameters should use defaults while explicit values are preserved'
31+
);
32+
1533
-- SETUP: flow with overriden max_attempts
1634
select pgflow.create_flow('test_flow_2', max_attempts => 10);
1735

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
begin;
2+
select plan(6);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Compile a flow with flow-level options from shape
6+
select pgflow._create_flow_from_shape(
7+
'flow_with_options',
8+
'{
9+
"steps": [
10+
{"slug": "step1", "stepType": "single", "dependencies": []}
11+
],
12+
"options": {
13+
"maxAttempts": 5,
14+
"baseDelay": 10,
15+
"timeout": 120
16+
}
17+
}'::jsonb
18+
);
19+
20+
-- Verify flow options were applied
21+
select results_eq(
22+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_with_options' $$,
23+
$$ VALUES (5, 10, 120) $$,
24+
'Flow should have options from shape'
25+
);
26+
27+
-- Test: Compile a flow with step-level options from shape
28+
select pgflow._create_flow_from_shape(
29+
'flow_with_step_options',
30+
'{
31+
"steps": [
32+
{
33+
"slug": "step1",
34+
"stepType": "single",
35+
"dependencies": [],
36+
"options": {
37+
"maxAttempts": 7,
38+
"baseDelay": 15,
39+
"timeout": 90,
40+
"startDelay": 1000
41+
}
42+
}
43+
]
44+
}'::jsonb
45+
);
46+
47+
-- Verify step options were applied
48+
select results_eq(
49+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_with_step_options' $$,
50+
$$ VALUES (7, 15, 90, 1000) $$,
51+
'Step should have options from shape'
52+
);
53+
54+
-- Test: Compile a flow with no options (defaults should be used)
55+
select pgflow._create_flow_from_shape(
56+
'flow_no_options',
57+
'{
58+
"steps": [
59+
{"slug": "step1", "stepType": "single", "dependencies": []}
60+
]
61+
}'::jsonb
62+
);
63+
64+
-- Verify flow uses default options
65+
select results_eq(
66+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_no_options' $$,
67+
$$ VALUES (3, 5, 60) $$,
68+
'Flow without options in shape should use defaults'
69+
);
70+
71+
-- Verify step uses NULL options (inherits from flow)
72+
select results_eq(
73+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_no_options' $$,
74+
$$ VALUES (NULL::int, NULL::int, NULL::int, NULL::int) $$,
75+
'Step without options in shape should have NULL (inherit from flow)'
76+
);
77+
78+
-- Test: Compile with partial options (missing options should be NULL/default)
79+
select pgflow._create_flow_from_shape(
80+
'flow_partial_options',
81+
'{
82+
"steps": [
83+
{
84+
"slug": "step1",
85+
"stepType": "single",
86+
"dependencies": [],
87+
"options": {
88+
"timeout": 30
89+
}
90+
}
91+
],
92+
"options": {
93+
"maxAttempts": 10
94+
}
95+
}'::jsonb
96+
);
97+
98+
-- Verify partial flow options
99+
select results_eq(
100+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout FROM pgflow.flows WHERE flow_slug = 'flow_partial_options' $$,
101+
$$ VALUES (10, 5, 60) $$,
102+
'Flow with partial options should use defaults for missing options'
103+
);
104+
105+
-- Verify partial step options
106+
select results_eq(
107+
$$ SELECT opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay FROM pgflow.steps WHERE flow_slug = 'flow_partial_options' $$,
108+
$$ VALUES (NULL::int, NULL::int, 30, NULL::int) $$,
109+
'Step with partial options should have NULL for missing options'
110+
);
111+
112+
select finish();
113+
rollback;

0 commit comments

Comments
 (0)