Skip to content

Commit 55a0824

Browse files
authored
feat: add flow_input field to step_task_record for direct access to original input (#554)
# Add flow_input field to step_task_record for direct access to original run input

This PR adds a new `flow_input` field to the `step_task_record` type, which provides workers direct access to the original flow input without having to parse it from the constructed `input` field.

Key changes:
- Added `flow_input` attribute to the `pgflow.step_task_record` composite type
- Updated the `start_tasks` function to populate this field with the original run input
- Added database migration script to apply these changes
- Added tests to verify the `flow_input` field is correctly populated for:
  - Root steps
  - Dependent steps
  - Map steps (all tasks receive the same original array)
  - Multiple tasks in a batch

This enhancement simplifies worker implementation by providing direct access to the original flow input, which is particularly useful for context that needs to be available to all steps.
1 parent d7e77fd commit 55a0824

6 files changed

Lines changed: 332 additions & 3 deletions

File tree

pkgs/core/schemas/0040_types.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ create type pgflow.step_task_record as (
55
step_slug text,
66
input jsonb,
77
msg_id bigint,
8-
task_index int
8+
task_index int,
9+
flow_input jsonb
910
);

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,11 @@ as $$
172172
coalesce(dep_out.deps_output, '{}'::jsonb)
173173
END as input,
174174
st.message_id as msg_id,
175-
st.task_index as task_index
175+
st.task_index as task_index,
176+
-- flow_input: Original run input for worker context
177+
-- This allows workers to access the original flow input directly
178+
-- without parsing it from the constructed 'input' field
179+
r.input as flow_input
176180
from tasks st
177181
join runs r on st.run_id = r.run_id
178182
join pgflow.steps step on

pkgs/core/src/database-types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,7 @@ export type Database = {
613613
input: Json | null
614614
msg_id: number | null
615615
task_index: number | null
616+
flow_input: Json | null
616617
}
617618
}
618619
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
-- Migration: add a "flow_input" attribute to pgflow.step_task_record and
-- populate it in start_tasks, so workers can read the original run input
-- directly instead of parsing it back out of the constructed "input" value.

-- Modify "step_task_record" composite type
ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "flow_input" jsonb;

-- Modify "start_tasks" function
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
with tasks as (
  -- Queued tasks addressed by the given message ids, excluding failed runs.
  select
    task.flow_slug,
    task.run_id,
    task.step_slug,
    task.task_index,
    task.message_id
  from pgflow.step_tasks as task
  join pgflow.runs r on r.run_id = task.run_id
  where task.flow_slug = start_tasks.flow_slug
    and task.message_id = any(msg_ids)
    and task.status = 'queued'
    -- MVP: Don't start tasks on failed runs
    and r.status != 'failed'
),
start_tasks_update as (
  -- Claim the tasks: bump attempt count, mark started, record the worker.
  update pgflow.step_tasks
  set
    attempts_count = attempts_count + 1,
    status = 'started',
    started_at = now(),
    last_worker_id = worker_id
  from tasks
  where step_tasks.message_id = tasks.message_id
    and step_tasks.flow_slug = tasks.flow_slug
    and step_tasks.status = 'queued'
),
runs as (
  -- Original run input for every run touched by this batch.
  select
    r.run_id,
    r.input
  from pgflow.runs r
  where r.run_id in (select run_id from tasks)
),
deps as (
  -- One row per (task, dependency): the dependency's completed output.
  -- Map dependencies are aggregated into an array ordered by task_index;
  -- single-step dependencies use the lone completed task's output.
  select
    st.run_id,
    st.step_slug,
    dep.dep_slug,
    CASE
      WHEN dep_step.step_type = 'map' THEN
        -- COALESCE yields an empty array when the map produced no tasks.
        (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
         FROM pgflow.step_tasks dt
         WHERE dt.run_id = st.run_id
           AND dt.step_slug = dep.dep_slug
           AND dt.status = 'completed')
      ELSE
        dep_task.output
    END as dep_output
  from tasks st
  join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
  join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
  left join pgflow.step_tasks dep_task on
    dep_task.run_id = st.run_id and
    dep_task.step_slug = dep.dep_slug and
    dep_task.status = 'completed'
    and dep_step.step_type = 'single' -- Only join for single steps
),
deps_outputs as (
  -- Collapse dependencies into one object per task: {"dep_slug": output, ...}
  select
    d.run_id,
    d.step_slug,
    jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
    count(*) as dep_count
  from deps d
  group by d.run_id, d.step_slug
),
timeouts as (
  -- Visibility-timeout delay per message: step timeout (falling back to the
  -- flow-level timeout) plus 2 seconds of slack.
  select
    task.message_id,
    task.flow_slug,
    coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
  from tasks task
  join pgflow.flows flow on flow.flow_slug = task.flow_slug
  join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages.
-- NOTE(review): this CTE is not referenced by the final SELECT. PostgreSQL
-- evaluates a SELECT-only CTE only as far as the parent query demands its
-- output, so confirm pgflow.set_vt_batch actually executes here (cross-check
-- against the visibility-timeout tests).
set_vt_batch as (
  select pgflow.set_vt_batch(
    start_tasks.flow_slug,
    array_agg(t.message_id order by t.message_id),
    array_agg(t.vt_delay order by t.message_id)
  )
  from timeouts t
)
select
  st.flow_slug,
  st.run_id,
  st.step_slug,
  -- Input construction, by step type:
  --   map steps     -> one RAW array element per task (picked by task_index)
  --   non-map steps -> {"run": <run input>} merged with dependency outputs
  CASE
    WHEN step.step_type = 'map' THEN
      CASE
        WHEN step.deps_count = 0 THEN
          -- Root map: element at task_index of the run's input array.
          -- Returns NULL if the run input is not an array; start_flow
          -- validates the input up front so the flow fails there.
          jsonb_array_element(r.input, st.task_index)
        ELSE
          -- Dependent map: map steps have exactly one dependency (enforced
          -- by add_step), so deps_output is {"dep_name": [..]} with a single
          -- key. jsonb_each unwraps that single value (the raw array) and we
          -- index into it at task_index.
          (SELECT jsonb_array_element(value, st.task_index)
           FROM jsonb_each(dep_out.deps_output)
           LIMIT 1)
      END
    ELSE
      -- Non-map: structured object with named keys, e.g.
      -- {"run": {...}, "dep1": {...}, "dep2": {...}}.
      -- deps_output defaults to an empty object when there are no deps.
      jsonb_build_object('run', r.input) ||
      coalesce(dep_out.deps_output, '{}'::jsonb)
  END as input,
  st.message_id as msg_id,
  st.task_index as task_index,
  -- flow_input: the original run input, exposed directly for worker context
  -- so workers need not parse it out of the constructed 'input' field.
  r.input as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
  step.flow_slug = st.flow_slug and
  step.step_slug = st.step_slug
left join deps_outputs dep_out on
  dep_out.run_id = st.run_id and
  dep_out.step_slug = st.step_slug
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:yUEa4QnI0kEuEtxWNiNgRgf9GENvG1FkUmzwlu0vLm0=
1+
h1:nPCLpuNY7xbRl3lNa/ns7GG1Uf3twA0VSK7DIVBhzsc=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,3 +14,4 @@ h1:yUEa4QnI0kEuEtxWNiNgRgf9GENvG1FkUmzwlu0vLm0=
1414
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
1515
20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw=
1616
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
17+
20251223052347_pgflow_add_flow_input_column.sql h1:RwQBPOMml4cCR6qeqd5kVQKhxXAbFqkHEJa0ruXYiTo=
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
begin;
select plan(5);
select pgflow_tests.reset_db();

-- ---------------------------------------------------------------------------
-- Test 1: a root step's flow_input matches the original run input.
-- ---------------------------------------------------------------------------
select pgflow.create_flow('simple_flow');
select pgflow.add_step('simple_flow', 'root_step');
select pgflow.start_flow('simple_flow', '{"user_id": "abc123", "config": {"debug": true}}'::jsonb);

select pgflow_tests.ensure_worker('simple_flow');

with polled as (
  select * from pgmq.read_with_poll('simple_flow', 10, 5, 1, 50) limit 1
),
batch as (
  select array_agg(msg_id) as ids from polled
),
claimed as (
  select * from pgflow.start_tasks(
    'simple_flow',
    (select ids from batch),
    '11111111-1111-1111-1111-111111111111'::uuid
  )
)
select is(
  (select flow_input from claimed),
  '{"user_id": "abc123", "config": {"debug": true}}'::jsonb,
  'Root step flow_input should match original run input'
);

-- ---------------------------------------------------------------------------
-- Test 2: a dependent step also carries the original run input.
-- ---------------------------------------------------------------------------
select pgflow_tests.reset_db();
select pgflow.create_flow('dep_flow');
select pgflow.add_step('dep_flow', 'first');
select pgflow.add_step('dep_flow', 'second', ARRAY['first']);
select pgflow.start_flow('dep_flow', '{"original": "input"}'::jsonb);

select pgflow_tests.ensure_worker('dep_flow');

-- Complete the first step so the dependent step becomes startable.
with poll_result as (
  select * from pgflow_tests.read_and_start('dep_flow', 1, 1)
)
select pgflow.complete_task(
  run_id,
  step_slug,
  task_index,
  '{"first_result": "done"}'::jsonb
) from poll_result;

-- Start the dependent step with a second worker and check flow_input.
select pgflow_tests.ensure_worker('dep_flow', '22222222-2222-2222-2222-222222222222'::uuid);
with polled as (
  select * from pgmq.read_with_poll('dep_flow', 10, 5, 1, 50) limit 1
),
batch as (
  select array_agg(msg_id) as ids from polled
),
claimed as (
  select * from pgflow.start_tasks(
    'dep_flow',
    (select ids from batch),
    '22222222-2222-2222-2222-222222222222'::uuid
  )
)
select is(
  (select flow_input from claimed),
  '{"original": "input"}'::jsonb,
  'Dependent step flow_input should match original run input'
);

-- ---------------------------------------------------------------------------
-- Tests 3 & 4: a root map step's tasks all share flow_input, and it is the
-- whole original array — not the per-task element.
-- ---------------------------------------------------------------------------
select pgflow_tests.reset_db();
select pgflow.create_flow('map_flow');
select pgflow.add_step('map_flow', 'map_step', '{}', null, null, null, null, 'map');
select pgflow.start_flow('map_flow', '[1, 2, 3]'::jsonb);

select pgflow_tests.ensure_worker('map_flow');

-- Persist the started tasks in a temp table so several assertions can reuse them.
create temp table map_started_tasks as
with polled as (
  select * from pgmq.read_with_poll('map_flow', 10, 5, 3, 50) order by msg_id
),
batch as (
  select array_agg(msg_id order by msg_id) as ids from polled
)
select * from pgflow.start_tasks(
  'map_flow',
  (select ids from batch),
  '11111111-1111-1111-1111-111111111111'::uuid
);

select is(
  (select count(distinct flow_input)::int from map_started_tasks),
  1,
  'All map tasks should have same flow_input'
);

select is(
  (select flow_input from map_started_tasks limit 1),
  '[1, 2, 3]'::jsonb,
  'Map task flow_input should be original array, not individual element'
);

drop table map_started_tasks;

-- ---------------------------------------------------------------------------
-- Test 5: every task started in one batch carries the correct flow_input.
-- ---------------------------------------------------------------------------
select pgflow_tests.reset_db();
select pgflow.create_flow('multi_flow');
select pgflow.add_step('multi_flow', 'step1');
select pgflow.add_step('multi_flow', 'step2'); -- parallel step, no deps
select pgflow.start_flow('multi_flow', '{"batch": "test"}'::jsonb);

select pgflow_tests.ensure_worker('multi_flow');

-- Both parallel root tasks are queued; read and start both at once.
with polled as (
  select * from pgmq.read_with_poll('multi_flow', 10, 5, 2, 50)
),
batch as (
  select array_agg(msg_id) as ids from polled
),
claimed as (
  select * from pgflow.start_tasks(
    'multi_flow',
    (select ids from batch),
    '11111111-1111-1111-1111-111111111111'::uuid
  )
)
select ok(
  (select bool_and(flow_input = '{"batch": "test"}'::jsonb) from claimed),
  'All tasks in batch should have correct flow_input'
);

select finish();
rollback;

0 commit comments

Comments
 (0)