Commit 05738ed

refactor: optimize step output storage and aggregation (#569)
# Optimize Step Output Storage for Improved Performance

This PR changes how step outputs are stored and accessed in PGFlow to improve performance and reduce redundant data processing:

1. Store aggregated outputs directly in the `step_states` table instead of recalculating them from individual task outputs
2. Add a new `output` column to `step_states` with a constraint ensuring it is only populated for completed steps
3. Implement a data backfill to populate the `output` field for existing completed steps:
   - For single steps: store the task output directly
   - For map steps: aggregate task outputs into an array
4. Update functions to use the pre-aggregated outputs:
   - `maybe_complete_run`: use `step_states.output` directly instead of recalculating
   - `start_tasks`: read dependency outputs from `step_states` instead of aggregating from tasks

This approach eliminates expensive output aggregation operations that were previously performed repeatedly, making the system more efficient.
1 parent 6cf2515 commit 05738ed
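As a rough illustration of point 2 above, the new column and its completed-only constraint could look like the following minimal sketch. This is an assumption for illustration, not the actual migration: the constraint name and exact wording in the real schema may differ.

```sql
-- Hypothetical sketch only; the real migration in this PR may differ.
ALTER TABLE pgflow.step_states
  ADD COLUMN output jsonb;

-- Ensure output is only populated for completed steps
ALTER TABLE pgflow.step_states
  ADD CONSTRAINT output_only_when_completed
  CHECK (output IS NULL OR status = 'completed');
```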

File tree

7 files changed (+688, -63 lines)

.changeset/step-output-storage.md

Lines changed: 7 additions & 0 deletions

```diff
@@ -0,0 +1,7 @@
+---
+'@pgflow/core': minor
+---
+
+Performance: Store step outputs atomically for 2x faster downstream task startup
+
+Step outputs are now stored in step_states.output when steps complete, eliminating expensive aggregation queries. Benchmarks show 2.17x improvement for Map->Map chains. Includes data migration to backfill existing completed steps.
```
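The backfill mentioned above is described in the commit message as storing the task output directly for single steps and aggregating task outputs into an array for map steps. A hedged sketch of what such a backfill could look like, reusing the aggregation pattern from the old `maybe_complete_run` code (this is not the actual migration shipped in the PR):

```sql
-- Sketch of the described backfill; the shipped migration may differ.
UPDATE pgflow.step_states ss
SET output = CASE
  WHEN s.step_type = 'map' THEN
    -- Map steps: aggregate task outputs into an array, ordered by task_index
    (SELECT COALESCE(jsonb_agg(t.output ORDER BY t.task_index), '[]'::jsonb)
     FROM pgflow.step_tasks t
     WHERE t.run_id = ss.run_id
       AND t.step_slug = ss.step_slug
       AND t.status = 'completed')
  ELSE
    -- Single steps: store the one task output directly
    (SELECT t.output
     FROM pgflow.step_tasks t
     WHERE t.run_id = ss.run_id
       AND t.step_slug = ss.step_slug
       AND t.status = 'completed'
     LIMIT 1)
END
FROM pgflow.steps s
WHERE s.flow_slug = ss.flow_slug
  AND s.step_slug = ss.step_slug
  AND ss.status = 'completed';
```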

pkgs/core/schemas/0100_function_maybe_complete_run.sql

Lines changed: 12 additions & 41 deletions

```diff
@@ -15,51 +15,22 @@ begin
   SET
     status = 'completed',
     completed_at = now(),
-    -- Only compute expensive aggregation when actually completing the run
+    -- Gather outputs from leaf steps (already stored in step_states.output by writers)
     output = (
-      -- ---------- Gather outputs from leaf steps ----------
       -- Leaf steps = steps with no dependents
-      -- For map steps: aggregate all task outputs into array
-      -- For single steps: use the single task output
       SELECT jsonb_object_agg(
-        step_slug,
-        CASE
-          WHEN step_type = 'map' THEN aggregated_output
-          ELSE single_output
-        END
+        leaf_state.step_slug,
+        leaf_state.output  -- Already aggregated by writers
       )
-      FROM (
-        SELECT DISTINCT
-          leaf_state.step_slug,
-          leaf_step.step_type,
-          -- For map steps: aggregate all task outputs
-          CASE WHEN leaf_step.step_type = 'map' THEN
-            (SELECT COALESCE(jsonb_agg(leaf_task.output ORDER BY leaf_task.task_index), '[]'::jsonb)
-             FROM pgflow.step_tasks leaf_task
-             WHERE leaf_task.run_id = leaf_state.run_id
-               AND leaf_task.step_slug = leaf_state.step_slug
-               AND leaf_task.status = 'completed')
-          END as aggregated_output,
-          -- For single steps: get the single output
-          CASE WHEN leaf_step.step_type = 'single' THEN
-            (SELECT leaf_task.output
-             FROM pgflow.step_tasks leaf_task
-             WHERE leaf_task.run_id = leaf_state.run_id
-               AND leaf_task.step_slug = leaf_state.step_slug
-               AND leaf_task.status = 'completed'
-             LIMIT 1)
-          END as single_output
-        FROM pgflow.step_states leaf_state
-        JOIN pgflow.steps leaf_step ON leaf_step.flow_slug = leaf_state.flow_slug AND leaf_step.step_slug = leaf_state.step_slug
-        WHERE leaf_state.run_id = maybe_complete_run.run_id
-          AND leaf_state.status = 'completed'
-          AND NOT EXISTS (
-            SELECT 1
-            FROM pgflow.deps dep
-            WHERE dep.flow_slug = leaf_state.flow_slug
-              AND dep.dep_slug = leaf_state.step_slug
-          )
-      ) leaf_outputs
+      FROM pgflow.step_states leaf_state
+      WHERE leaf_state.run_id = maybe_complete_run.run_id
+        AND leaf_state.status = 'completed'
+        AND NOT EXISTS (
+          SELECT 1
+          FROM pgflow.deps dep
+          WHERE dep.flow_slug = leaf_state.flow_slug
+            AND dep.dep_slug = leaf_state.step_slug
+        )
     )
   WHERE pgflow.runs.run_id = maybe_complete_run.run_id
     AND pgflow.runs.remaining_steps = 0
```
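For reference, `jsonb_object_agg` builds the run-level output as an object keyed by step slug. The shape can be reproduced standalone in plain Postgres; the step slugs and outputs below are made up for illustration:

```sql
-- Standalone illustration of the jsonb_object_agg shape used above
-- (invented slugs and outputs, no pgflow tables involved).
SELECT jsonb_object_agg(step_slug, output) AS run_output
FROM (VALUES
  ('summarize', '{"ok": true}'::jsonb),          -- single step output
  ('scrape',    '[{"id": 1}, {"id": 2}]'::jsonb) -- map step output (pre-aggregated array)
) AS leaf(step_slug, output);
-- => {"scrape": [{"id": 1}, {"id": 2}], "summarize": {"ok": true}}
```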

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 5 additions & 20 deletions

```diff
@@ -47,28 +47,13 @@ as $$
       st.run_id,
       st.step_slug,
       dep.dep_slug,
-      -- Aggregate map outputs or use single output
-      CASE
-        WHEN dep_step.step_type = 'map' THEN
-          -- Aggregate all task outputs ordered by task_index
-          -- Use COALESCE to return empty array if no tasks
-          (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
-           FROM pgflow.step_tasks dt
-           WHERE dt.run_id = st.run_id
-             AND dt.step_slug = dep.dep_slug
-             AND dt.status = 'completed')
-        ELSE
-          -- Single step: use the single task output
-          dep_task.output
-      END as dep_output
+      -- Read output directly from step_states (already aggregated by writers)
+      dep_state.output as dep_output
     from tasks st
     join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
-    join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
-    left join pgflow.step_tasks dep_task on
-      dep_task.run_id = st.run_id and
-      dep_task.step_slug = dep.dep_slug and
-      dep_task.status = 'completed'
-      and dep_step.step_type = 'single' -- Only join for single steps
+    join pgflow.step_states dep_state on
+      dep_state.run_id = st.run_id and
+      dep_state.step_slug = dep.dep_slug
   ),
   deps_outputs as (
     select
```
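The per-read aggregation removed by this diff is the `COALESCE(jsonb_agg(... ORDER BY task_index), '[]')` pattern. Its behavior can be checked standalone in plain Postgres with invented task rows:

```sql
-- Standalone illustration of the removed per-read aggregation
-- (made-up task rows, no pgflow tables involved).
SELECT COALESCE(jsonb_agg(output ORDER BY task_index), '[]'::jsonb) AS dep_output
FROM (VALUES
  (2, '"c"'::jsonb),
  (0, '"a"'::jsonb),
  (1, '"b"'::jsonb)
) AS t(task_index, output);
-- => ["a", "b", "c"]
```

With zero rows, `jsonb_agg` returns NULL and the COALESCE yields `[]`, which is why map steps with empty inputs still produced an empty array.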
Lines changed: 129 additions & 0 deletions

# step_output_storage Benchmark

Measures performance improvements from storing step outputs in `step_states.output` instead of aggregating from `step_tasks` on every read.

## Quick Start

```bash
# From pkgs/core directory
pnpm nx supabase:reset core
pnpm nx supabase:status core  # Note the port

PGPASSWORD=postgres psql -h 127.0.0.1 -p PORT -U postgres -d postgres \
  -f scripts/benchmarks/step_output_storage.sql
```

## What Changed

### OLD Code (main branch)

In `start_tasks`, when a task needs its dependency outputs:

```sql
-- For each dependent task, aggregate all completed task outputs
CASE WHEN dep_step.step_type = 'map' THEN
  (SELECT jsonb_agg(dt.output ORDER BY dt.task_index)
   FROM pgflow.step_tasks dt
   WHERE dt.run_id = ... AND dt.status = 'completed')
ELSE ...
```

**Problem**: If a map step has 500 completed tasks, and 500 downstream tasks need to start, each downstream task triggers an aggregation query over 500 rows = **250,000 row scans**.

### NEW Code (step_output_storage branch)

Outputs are stored in `step_states.output` when a step completes:

```sql
-- Just read the pre-stored output
dep_state.output as dep_output
```

**Improvement**: 500 downstream tasks each read 1 column = **500 column reads**.

## Benchmark Tests

### Test 1: `complete_task_final`

**What it measures**: Time to complete the last task of a map step.

**Setup**: Start a flow with an N-element array, complete N-1 tasks, then time the final `complete_task()`.

**Why it matters**: The final task triggers output storage. In NEW code, this aggregates once and stores the result. In OLD code, this just completes the task (aggregation happens on read).

### Test 2: `start_tasks_read_N`

**What it measures**: Time to start a single downstream task that reads an N-element dependency output.

**Setup**:
1. Map step `producer` with N tasks, all completed
2. Single step `consumer` depends on `producer`
3. Time `start_tasks()` for the consumer task

**Flow structure**:
```
[producer: map(N)] --> [consumer: single]
```

**Expected difference**:
- OLD: Aggregates N task outputs on every read
- NEW: Reads from `step_states.output` (O(1))

### Test 3: `start_tasks_batch_NxN`

**What it measures**: Time to start N downstream tasks, each reading an N-element dependency output.

**Setup**:
1. Map step `producer` with N tasks, all completed
2. Map step `consumer` depends on `producer` and has N tasks
3. Time `start_tasks()` for all N consumer tasks at once

**Flow structure**:
```
[producer: map(N)] --> [consumer: map(N)]
```

**Expected difference**:
- OLD: N tasks * aggregate(N outputs) = O(N^2) row scans
- NEW: N tasks * read(1 column) = O(N) column reads

**This is the key benchmark** - it should show the largest improvement.
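The two access patterns can also be reproduced in plain Postgres without any pgflow tables. The sketch below is an illustration under that assumption (table names and N=500 are invented here, and absolute timings will vary by machine); run it under `\timing` in psql to compare:

```sql
-- Self-contained sketch of the two access patterns (no pgflow tables).
-- 500 "producer task" rows:
CREATE TEMP TABLE producer_tasks AS
SELECT i AS task_index, to_jsonb(i) AS output
FROM generate_series(0, 499) AS i;

-- OLD shape: each of 500 "consumers" re-aggregates all 500 rows (~N^2 scans)
SELECT count(*)
FROM generate_series(1, 500) AS consumer,
LATERAL (
  SELECT jsonb_agg(output ORDER BY task_index) AS agg_output
  FROM producer_tasks
) agg;

-- NEW shape: aggregate once into a "step state", then 500 consumers
-- each read the stored value (~N reads)
CREATE TEMP TABLE producer_state AS
SELECT jsonb_agg(output ORDER BY task_index) AS output
FROM producer_tasks;

SELECT count(*)
FROM generate_series(1, 500) AS consumer
CROSS JOIN producer_state;
```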
## Configuration

Edit line 19 to change the array size:

```sql
\set ARRAY_SIZE 500   -- Default: ~3 min runtime
\set ARRAY_SIZE 100   -- Quick test: ~30 sec
\set ARRAY_SIZE 1000  -- Thorough: ~15+ min
```

## Comparing Branches

```bash
# 1. Run on step_output_storage branch
git checkout step_output_storage
pnpm nx supabase:reset core
psql ... -f scripts/benchmarks/step_output_storage.sql | tee results_new.txt

# 2. Run on main branch
git checkout main
pnpm nx supabase:reset core
psql ... -f scripts/benchmarks/step_output_storage.sql | tee results_old.txt

# 3. Compare start_tasks_batch_NxN results
grep "start_tasks_batch" results_*.txt
```

## Expected Results

| Test | OLD (main) | NEW (step_output_storage) | Improvement |
|------|------------|---------------------------|-------------|
| `complete_task_final` | ~same | ~same | minimal |
| `start_tasks_read_N` | slower | faster | 30-50% |
| `start_tasks_batch_NxN` | much slower | faster | 50-80% |

The batch test improvement grows with N because:
- OLD scales as O(N^2)
- NEW scales as O(N)
