Skip to content

Commit 8c77a83

Browse files
authored
feat: make flowInput lazy-loaded to prevent data duplication in map steps (#560)
# Asymmetric Handler Signatures for Flow Composition

This PR implements asymmetric handler signatures to simplify flow composition and improve performance:

- Root steps now receive flow input directly: `(flowInput, ctx) => ...`
- Dependent steps receive only dependency outputs: `(deps, ctx) => ...`
- Flow input in dependent steps is now accessed via `await ctx.flowInput` (async/lazy-loaded)
- Lazy loading prevents data duplication for map steps processing large arrays
- Only root non-map steps receive flow_input from SQL to reduce data transfer
- Added FlowInputProvider class to handle caching and lazy loading of flow input
- Updated examples, tests, and documentation to reflect the new pattern

This change enables functional composition and simplifies types for future subflows by making handler signatures more intuitive and reducing unnecessary data duplication.
1 parent 37402eb commit 8c77a83

29 files changed

+509
-108
lines changed

.changeset/asymmetric-handler-signatures.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ BREAKING: Asymmetric handler signatures - remove `run` key from step inputs
1010

1111
- Root steps: `(flowInput, ctx) => ...` - flow input directly as first param
1212
- Dependent steps: `(deps, ctx) => ...` - only dependency outputs as first param
13-
- Access flow input in dependent steps via `ctx.flowInput`
13+
- Access flow input in dependent steps via `await ctx.flowInput` (async/lazy-loaded)
14+
- Lazy loading prevents data duplication for map steps processing large arrays
1415
- Enables functional composition and simplifies types for future subflows

pkgs/cli/examples/analyze_website.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ const AnalyzeWebsite = new Flow<WebsiteAnalysisInput>({
5454
.step(
5555
{ slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] },
5656
async (_deps, ctx) => {
57-
console.log(`Saving results to database for: ${ctx.flowInput.url}`);
57+
const flowInput = await ctx.flowInput;
58+
console.log(`Saving results to database for: ${flowInput.url}`);
5859
// In a real implementation, this would save to a database
5960
return {
6061
status: 'success',

pkgs/cli/src/commands/install/create-example-worker.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,28 @@ import { log, confirm } from '@clack/prompts';
44
import chalk from 'chalk';
55
import { getVersion } from '../../utils/get-version.js';
66

7-
const INDEX_TS_TEMPLATE = `import { EdgeWorker } from '@pgflow/edge-worker';
7+
const INDEX_TS_TEMPLATE = `/**
8+
* To run this worker locally:
9+
*
10+
* 1. Start the Edge Runtime (in one terminal):
11+
* npx supabase functions serve --no-verify-jwt
12+
*
13+
* 2. Start the worker (in another terminal):
14+
* curl http://localhost:54321/functions/v1/greet-user-worker
15+
*
16+
* 3. Trigger a flow run (in Supabase Studio SQL Editor):
17+
* SELECT * FROM pgflow.start_flow(
18+
* flow_slug => 'greetUser',
19+
* input => '{"firstName": "Alice", "lastName": "Smith"}'::jsonb
20+
* );
21+
*
22+
* 4. Check run status:
23+
* SELECT * FROM pgflow.runs
24+
* WHERE flow_slug = 'greetUser'
25+
* ORDER BY started_at DESC
26+
* LIMIT 1;
27+
*/
28+
import { EdgeWorker } from '@pgflow/edge-worker';
829
import { GreetUser } from '../../flows/greet-user.ts';
930
1031
EdgeWorker.start(GreetUser);

pkgs/client/__tests__/e2e/full-stack-dsl.test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ describe('Full Stack DSL Integration', () => {
111111
tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
112112
expect(tasks).toHaveLength(1);
113113
expect(tasks[0].step_slug).toBe('process');
114-
expect(tasks[0].flow_input).toEqual(input);
114+
// Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
115+
expect(tasks[0].flow_input).toBeNull();
115116
expect(tasks[0].input.fetch).toEqual(fetchOutput); // Critical: dependency output included
116117

117118
const processOutput = {
@@ -132,7 +133,8 @@ describe('Full Stack DSL Integration', () => {
132133
tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
133134
expect(tasks).toHaveLength(1);
134135
expect(tasks[0].step_slug).toBe('save');
135-
expect(tasks[0].flow_input).toEqual(input);
136+
// Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
137+
expect(tasks[0].flow_input).toBeNull();
136138

137139
// The save step only depends on process, so it should only have process output
138140
// This is correct behavior - transitive dependencies are not automatically included

pkgs/client/__tests__/e2e/happy-path-e2e.test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ describe('Happy Path E2E Integration', () => {
8888
tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
8989
expect(tasks).toHaveLength(1);
9090
expect(tasks[0].step_slug).toBe('process');
91-
expect(tasks[0].flow_input).toEqual(input);
91+
// Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
92+
expect(tasks[0].flow_input).toBeNull();
9293
expect(tasks[0].input.fetch).toEqual(fetchOutput);
9394

9495
const processOutput = {
@@ -127,7 +128,8 @@ describe('Happy Path E2E Integration', () => {
127128

128129
log('Save task input:', JSON.stringify(tasks[0].input, null, 2));
129130

130-
expect(tasks[0].flow_input).toEqual(input);
131+
// Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
132+
expect(tasks[0].flow_input).toBeNull();
131133
expect(tasks[0].input.process).toEqual(processOutput);
132134
// Note: save step only depends on process, so fetch output is not included (correct behavior)
133135
expect(tasks[0].input.fetch).toBeUndefined();

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,14 @@ as $$
172172
st.message_id as msg_id,
173173
st.task_index as task_index,
174174
-- flow_input: Original run input for worker context
175-
-- This allows workers to access the original flow input directly
176-
-- without parsing it from the constructed 'input' field
177-
r.input as flow_input
175+
-- Only included for root non-map steps to avoid data duplication.
176+
-- Root map steps: flowInput IS the array, useless to include
177+
-- Dependent steps: lazy load via ctx.flowInput when needed
178+
CASE
179+
WHEN step.step_type != 'map' AND step.deps_count = 0
180+
THEN r.input
181+
ELSE NULL
182+
END as flow_input
178183
from tasks st
179184
join runs r on st.run_id = r.run_id
180185
join pgflow.steps step on

pkgs/core/src/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ export type { Json };
1616
* Same as pgflow.step_task_record type, but with not-null fields and type argument for payload.
1717
* The input type is automatically inferred based on the step_slug using a discriminated union.
1818
* This ensures that each step only receives inputs from its declared dependencies and the flow's run input.
19+
*
20+
* Note: flow_input is nullable because start_tasks only includes it for root non-map steps.
21+
* For dependent and map steps, flow_input is NULL to avoid data duplication.
22+
* Workers can access the original flow input via ctx.flowInput (lazy loaded).
1923
*/
2024
export type StepTaskRecord<TFlow extends AnyFlow> = {
2125
[StepSlug in Extract<keyof ExtractFlowSteps<TFlow>, string>]: {
@@ -25,7 +29,7 @@ export type StepTaskRecord<TFlow extends AnyFlow> = {
2529
task_index: number;
2630
input: Simplify<StepInput<TFlow, StepSlug>>;
2731
msg_id: number;
28-
flow_input: ExtractFlowInput<TFlow>;
32+
flow_input: ExtractFlowInput<TFlow> | null;
2933
};
3034
}[Extract<keyof ExtractFlowSteps<TFlow>, string>];
3135

pkgs/core/supabase/migrations/20251223110504_pgflow_add_flow_input_column.sql renamed to pkgs/core/supabase/migrations/20251225163110_pgflow_add_flow_input_column.sql

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,14 @@ with tasks as (
166166
st.message_id as msg_id,
167167
st.task_index as task_index,
168168
-- flow_input: Original run input for worker context
169-
-- This allows workers to access the original flow input directly
170-
-- without parsing it from the constructed 'input' field
171-
r.input as flow_input
169+
-- Only included for root non-map steps to avoid data duplication.
170+
-- Root map steps: flowInput IS the array, useless to include
171+
-- Dependent steps: lazy load via ctx.flowInput when needed
172+
CASE
173+
WHEN step.step_type != 'map' AND step.deps_count = 0
174+
THEN r.input
175+
ELSE NULL
176+
END as flow_input
172177
from tasks st
173178
join runs r on st.run_id = r.run_id
174179
join pgflow.steps step on

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:32eppXsursErjjXT7SfNzyRPfw4iiiL1Pnafyy4DSvk=
1+
h1:SkrHj8RTAbqZRxs3/TvkZ0FV1l9SeNX2Cxba02DAPAk=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,4 +14,4 @@ h1:32eppXsursErjjXT7SfNzyRPfw4iiiL1Pnafyy4DSvk=
1414
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
1515
20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw=
1616
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
17-
20251223110504_pgflow_add_flow_input_column.sql h1:HJ9GJc4xh68JT82JINf946RS/jQNt9jDhvIH4VA40c8=
17+
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
begin;
2+
select plan(6);
3+
select pgflow_tests.reset_db();
4+
5+
-- =========================================================================
6+
-- Test: Conditional flow_input in start_tasks
7+
--
8+
-- Only root non-map steps receive flow_input.
9+
-- All other step types (root map, dependent non-map, dependent map) get NULL.
10+
-- This optimization prevents data duplication for large array processing.
11+
-- =========================================================================
12+
13+
-- Test 1: Root non-map step receives flow_input
14+
select pgflow.create_flow('root_step_flow');
15+
select pgflow.add_step('root_step_flow', 'root_step');
16+
select pgflow.start_flow('root_step_flow', '{"user_id": "abc123"}'::jsonb);
17+
18+
select pgflow_tests.ensure_worker('root_step_flow');
19+
20+
with msgs as (
21+
select * from pgmq.read_with_poll('root_step_flow', 10, 5, 1, 50) limit 1
22+
),
23+
msg_ids as (
24+
select array_agg(msg_id) as ids from msgs
25+
),
26+
started_tasks as (
27+
select * from pgflow.start_tasks(
28+
'root_step_flow',
29+
(select ids from msg_ids),
30+
'11111111-1111-1111-1111-111111111111'::uuid
31+
)
32+
)
33+
select is(
34+
(select flow_input from started_tasks),
35+
'{"user_id": "abc123"}'::jsonb,
36+
'Root non-map step should receive flow_input'
37+
);
38+
39+
-- Test 2: Dependent non-map step receives NULL flow_input
40+
select pgflow_tests.reset_db();
41+
select pgflow.create_flow('dep_flow');
42+
select pgflow.add_step('dep_flow', 'first');
43+
select pgflow.add_step('dep_flow', 'second', ARRAY['first']);
44+
select pgflow.start_flow('dep_flow', '{"original": "input"}'::jsonb);
45+
46+
select pgflow_tests.ensure_worker('dep_flow');
47+
48+
-- Complete first step
49+
with poll_result as (
50+
select * from pgflow_tests.read_and_start('dep_flow', 1, 1)
51+
)
52+
select pgflow.complete_task(
53+
run_id,
54+
step_slug,
55+
task_index,
56+
'{"first_result": "done"}'::jsonb
57+
) from poll_result;
58+
59+
-- Start second step and verify flow_input is NULL
60+
select pgflow_tests.ensure_worker('dep_flow', '22222222-2222-2222-2222-222222222222'::uuid);
61+
with msgs as (
62+
select * from pgmq.read_with_poll('dep_flow', 10, 5, 1, 50) limit 1
63+
),
64+
msg_ids as (
65+
select array_agg(msg_id) as ids from msgs
66+
),
67+
started_tasks as (
68+
select * from pgflow.start_tasks(
69+
'dep_flow',
70+
(select ids from msg_ids),
71+
'22222222-2222-2222-2222-222222222222'::uuid
72+
)
73+
)
74+
select is(
75+
(select flow_input from started_tasks),
76+
NULL::jsonb,
77+
'Dependent non-map step should receive NULL flow_input (lazy loaded)'
78+
);
79+
80+
-- Test 3: Root map step receives NULL flow_input
81+
-- (flowInput IS the array, useless to include - workers get element via task.input)
82+
select pgflow_tests.reset_db();
83+
select pgflow.create_flow('root_map_flow');
84+
select pgflow.add_step('root_map_flow', 'map_step', '{}', null, null, null, null, 'map');
85+
select pgflow.start_flow('root_map_flow', '[1, 2, 3]'::jsonb);
86+
87+
select pgflow_tests.ensure_worker('root_map_flow');
88+
89+
with msgs as (
90+
select * from pgmq.read_with_poll('root_map_flow', 10, 5, 3, 50) limit 1
91+
),
92+
msg_ids as (
93+
select array_agg(msg_id) as ids from msgs
94+
),
95+
started_tasks as (
96+
select * from pgflow.start_tasks(
97+
'root_map_flow',
98+
(select ids from msg_ids),
99+
'11111111-1111-1111-1111-111111111111'::uuid
100+
)
101+
)
102+
select is(
103+
(select flow_input from started_tasks limit 1),
104+
NULL::jsonb,
105+
'Root map step should receive NULL flow_input (flowInput is the array itself)'
106+
);
107+
108+
-- Test 4: Dependent map step receives NULL flow_input
109+
select pgflow_tests.reset_db();
110+
select pgflow.create_flow('dep_map_flow');
111+
select pgflow.add_step('dep_map_flow', 'fetch_items', '{}', null, null, null, null, 'single');
112+
select pgflow.add_step('dep_map_flow', 'process_item', ARRAY['fetch_items'], null, null, null, null, 'map');
113+
select pgflow.start_flow('dep_map_flow', '{"config": "value"}'::jsonb);
114+
115+
select pgflow_tests.ensure_worker('dep_map_flow');
116+
117+
-- Complete first step with array
118+
with poll_result as (
119+
select * from pgflow_tests.read_and_start('dep_map_flow', 1, 1)
120+
)
121+
select pgflow.complete_task(
122+
run_id,
123+
step_slug,
124+
task_index,
125+
'["a", "b", "c"]'::jsonb
126+
) from poll_result;
127+
128+
-- Start map step and verify flow_input is NULL
129+
select pgflow_tests.ensure_worker('dep_map_flow', '33333333-3333-3333-3333-333333333333'::uuid);
130+
with msgs as (
131+
select * from pgmq.read_with_poll('dep_map_flow', 10, 5, 3, 50) limit 1
132+
),
133+
msg_ids as (
134+
select array_agg(msg_id) as ids from msgs
135+
),
136+
started_tasks as (
137+
select * from pgflow.start_tasks(
138+
'dep_map_flow',
139+
(select ids from msg_ids),
140+
'33333333-3333-3333-3333-333333333333'::uuid
141+
)
142+
)
143+
select is(
144+
(select flow_input from started_tasks limit 1),
145+
NULL::jsonb,
146+
'Dependent map step should receive NULL flow_input (lazy loaded)'
147+
);
148+
149+
-- Test 5: Multiple parallel root non-map steps all receive flow_input
150+
select pgflow_tests.reset_db();
151+
select pgflow.create_flow('parallel_flow');
152+
select pgflow.add_step('parallel_flow', 'step1');
153+
select pgflow.add_step('parallel_flow', 'step2'); -- parallel step, no deps
154+
select pgflow.start_flow('parallel_flow', '{"batch": "test"}'::jsonb);
155+
156+
select pgflow_tests.ensure_worker('parallel_flow');
157+
158+
with msgs as (
159+
select * from pgmq.read_with_poll('parallel_flow', 10, 5, 2, 50)
160+
),
161+
msg_ids as (
162+
select array_agg(msg_id) as ids from msgs
163+
),
164+
started_tasks as (
165+
select * from pgflow.start_tasks(
166+
'parallel_flow',
167+
(select ids from msg_ids),
168+
'11111111-1111-1111-1111-111111111111'::uuid
169+
)
170+
)
171+
select ok(
172+
(select bool_and(flow_input = '{"batch": "test"}'::jsonb) from started_tasks),
173+
'All parallel root non-map steps should receive flow_input'
174+
);
175+
176+
-- Test 6: Mixed batch - only root non-map steps receive flow_input
177+
-- This tests a complex scenario with different step types in same batch
178+
select pgflow_tests.reset_db();
179+
select pgflow.create_flow('mixed_flow');
180+
select pgflow.add_step('mixed_flow', 'root_single'); -- root non-map
181+
select pgflow.add_step('mixed_flow', 'root_array'); -- root non-map (array is just single that returns array)
182+
select pgflow.start_flow('mixed_flow', '{"data": "value"}'::jsonb);
183+
184+
select pgflow_tests.ensure_worker('mixed_flow');
185+
186+
-- Both are root non-map steps, both should get flow_input
187+
with msgs as (
188+
select * from pgmq.read_with_poll('mixed_flow', 10, 5, 2, 50)
189+
),
190+
msg_ids as (
191+
select array_agg(msg_id) as ids from msgs
192+
),
193+
started_tasks as (
194+
select * from pgflow.start_tasks(
195+
'mixed_flow',
196+
(select ids from msg_ids),
197+
'11111111-1111-1111-1111-111111111111'::uuid
198+
)
199+
)
200+
select is(
201+
(select count(*)::int from started_tasks where flow_input is not null),
202+
2,
203+
'Both root non-map steps should receive non-null flow_input'
204+
);
205+
206+
select finish();
207+
rollback;

0 commit comments

Comments
 (0)