Skip to content

Commit ede7720

Browse files
authored
feat: add flow_input to StepTaskRecord and context objects (#555)
# Add flow_input to StepTaskRecord and expose it in worker context This PR adds the original flow input to the `StepTaskRecord` type and exposes it throughout the worker context. This allows dependent steps to access the original flow input parameters, which is useful for maintaining context across multi-step flows. The changes: - Add `flow_input` field to `StepTaskRecord` in core types - Expose the flow input in the `StepTaskExecution` and `StepTaskWithMessage` interfaces - Pass the flow input through the task poller to the worker context - Update test helpers to include flow input in context creation - Add tests to verify flow input is properly passed through the context This enhancement enables step implementations to reference the original parameters that started the flow, improving the ability to build steps that depend on initial configuration.
1 parent 55a0824 commit ede7720

File tree

9 files changed

+57
-9
lines changed

9 files changed

+57
-9
lines changed

pkgs/core/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export type StepTaskRecord<TFlow extends AnyFlow> = {
2525
task_index: number;
2626
input: Simplify<StepInput<TFlow, StepSlug>>;
2727
msg_id: number;
28+
flow_input: ExtractFlowInput<TFlow>;
2829
};
2930
}[Extract<keyof ExtractFlowSteps<TFlow>, string>];
3031

pkgs/edge-worker/deno.lock

Lines changed: 25 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkgs/edge-worker/src/core/context.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* DSL‐level ------------------------------------------------------------ */
2-
import type { BaseContext, AnyFlow, AllStepInputs } from '@pgflow/dsl';
2+
import type { BaseContext, AnyFlow, AllStepInputs, ExtractFlowInput } from '@pgflow/dsl';
33
import type { Json } from './types.js';
44
import type { PgmqMessageRecord } from '../queue/types.js';
55
import type { StepTaskRecord } from '../flow/types.js';
@@ -30,6 +30,7 @@ export interface StepTaskExecution<TFlow extends AnyFlow = AnyFlow> {
3030
rawMessage: PgmqMessageRecord<AllStepInputs<TFlow>>;
3131
stepTask : StepTaskRecord<TFlow>;
3232
workerConfig: Readonly<Omit<FlowWorkerConfig, 'sql'>>;
33+
flowInput: ExtractFlowInput<TFlow>;
3334
}
3435

3536
/** Message handler context for any platform */
@@ -52,6 +53,7 @@ export interface StepTaskWithMessage<TFlow extends AnyFlow> {
5253
msg_id : number;
5354
message: PgmqMessageRecord<AllStepInputs<TFlow>>;
5455
task : StepTaskRecord<TFlow>;
56+
flowInput: ExtractFlowInput<TFlow>;
5557
}
5658

5759
import { deepClone, deepFreeze } from './deepUtils.js';

pkgs/edge-worker/src/flow/StepTaskPoller.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ export class StepTaskPoller<TFlow extends AnyFlow>
103103
return {
104104
message,
105105
task,
106-
msg_id: task.msg_id
106+
msg_id: task.msg_id,
107+
flowInput: task.flow_input
107108
};
108109
})
109110
.filter((item): item is StepTaskWithMessage<TFlow> => item !== null);

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
132132
rawMessage: taskWithMessage.message,
133133
stepTask: taskWithMessage.task,
134134
workerConfig: frozenWorkerConfig, // Reuse cached frozen config
135+
flowInput: taskWithMessage.flowInput, // Original flow input for dependent steps
135136

136137
// Platform-specific resources (generic)
137138
...platformAdapter.platformResources

pkgs/edge-worker/src/test/test-helpers.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type postgres from 'postgres';
22
import type { SupabaseClient } from '@supabase/supabase-js';
3-
import type { AnyFlow, AllStepInputs } from '@pgflow/dsl';
3+
import type { AnyFlow, AllStepInputs, ExtractFlowInput } from '@pgflow/dsl';
44
import type { SupabaseResources } from '@pgflow/dsl/supabase';
55
import type { Json } from '../core/types.js';
66
import type { PgmqMessageRecord } from '../queue/types.js';
@@ -83,9 +83,10 @@ export function createSupabaseStepTaskContext<TFlow extends AnyFlow>(params: {
8383
abortSignal: AbortSignal;
8484
stepTask: StepTaskRecord<TFlow>;
8585
rawMessage: PgmqMessageRecord<AllStepInputs<TFlow>>;
86+
flowInput: ExtractFlowInput<TFlow>;
8687
workerConfig?: Readonly<Omit<FlowWorkerConfig, 'sql'>>;
8788
}): SupabaseStepTaskContext<TFlow> {
88-
const { env, sql, abortSignal, stepTask, rawMessage, workerConfig } = params;
89+
const { env, sql, abortSignal, stepTask, rawMessage, flowInput, workerConfig } = params;
8990

9091
// Validate required environment variables
9192
const supabaseUrl = env.SUPABASE_URL;
@@ -126,6 +127,7 @@ export function createSupabaseStepTaskContext<TFlow extends AnyFlow>(params: {
126127
rawMessage,
127128
stepTask,
128129
workerConfig: defaultWorkerConfig,
130+
flowInput,
129131

130132
// Supabase-specific resources (always present)
131133
sql,

pkgs/edge-worker/tests/integration/stepTaskExecutorContext.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Deno.test(
5858
step_slug: 'test_step',
5959
task_index: 0,
6060
input: { run: { data: 'test data' } },
61+
flow_input: { data: 'test data' },
6162
};
6263

6364
// Create context with mock task and message using proper flow worker context creation
@@ -77,6 +78,7 @@ Deno.test(
7778
msg_id: 123,
7879
message: mockMessage,
7980
task: mockTask,
81+
flowInput: { data: 'test data' },
8082
},
8183
});
8284

@@ -120,6 +122,7 @@ Deno.test(
120122
step_slug: 'legacy_step',
121123
task_index: 0,
122124
input: { run: { value: 42 } },
125+
flow_input: { value: 42 },
123126
};
124127

125128
// Get the step handler
@@ -142,6 +145,7 @@ Deno.test(
142145
msg_id: 456,
143146
message: mockMessage,
144147
task: mockTask,
148+
flowInput: { value: 42 },
145149
},
146150
});
147151

@@ -187,13 +191,15 @@ Deno.test(
187191
step_slug: 'check_raw',
188192
task_index: 0,
189193
input: { run: {} },
194+
flow_input: {},
190195
};
191196

192197
// Create context - for this test we need a mock taskWithMessage
193198
const mockTaskWithMessage = {
194199
msg_id: 789,
195200
message: mockMessage,
196201
task: mockTask,
202+
flowInput: {},
197203
};
198204

199205
const context = createFlowWorkerContext({
@@ -247,13 +253,15 @@ Deno.test(
247253
step_slug: 'check_clients',
248254
task_index: 0,
249255
input: { run: {} },
256+
flow_input: {},
250257
};
251258

252259
// Create context with Supabase env vars
253260
const mockTaskWithMessage = {
254261
msg_id: 999,
255262
message: mockMessage,
256263
task: mockTask,
264+
flowInput: {},
257265
};
258266

259267
const context = createFlowWorkerContext({
@@ -328,6 +336,7 @@ Deno.test(
328336
step_slug: 'fetch_data',
329337
task_index: 0,
330338
input: { run: { id: 123 } },
339+
flow_input: { id: 123 },
331340
};
332341

333342
const context = createFlowWorkerContext({
@@ -338,6 +347,7 @@ Deno.test(
338347
msg_id: 456,
339348
message: mockMessageForComplex,
340349
task: mockTaskForComplex,
350+
flowInput: { id: 123 },
341351
},
342352
});
343353

pkgs/edge-worker/tests/unit/contextUtils.test.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,18 @@ const mockStepMessage: PgmqMessageRecord<{ run: { test: string } }> = {
4747
headers: null,
4848
};
4949

50+
// Mock flow input
51+
const mockFlowInput = { test: 'flow-input' };
52+
5053
// Mock step task (using generic typing)
5154
const mockStepTask = {
5255
flow_slug: 'test-flow',
5356
run_id: 'run-456',
5457
step_slug: 'test-step',
5558
input: { run: { test: 'input' } },
56-
msg_id: 123
59+
msg_id: 123,
60+
flow_input: mockFlowInput,
61+
task_index: 0
5762
} as StepTaskRecord<never>;
5863

5964
Deno.test('createSupabaseMessageContext - creates context with all Supabase resources', () => {
@@ -104,15 +109,17 @@ Deno.test('createSupabaseStepTaskContext - creates context with step task', () =
104109
abortSignal: mockAbortSignal,
105110
stepTask: mockStepTask,
106111
rawMessage: mockStepMessage,
112+
flowInput: mockFlowInput,
107113
});
108-
114+
109115
// Check all properties exist
110116
assertEquals(context.env, fullEnv);
111117
assertEquals(context.sql, mockSql);
112118
assertEquals(context.shutdownSignal, mockAbortSignal);
113119
assertEquals(context.stepTask, mockStepTask);
114120
assertEquals(context.rawMessage, mockStepMessage);
115-
121+
assertEquals(context.flowInput, mockFlowInput);
122+
116123
// Supabase client should always be present
117124
assertExists(context.supabase);
118125
});

pkgs/example-flows/src/example-flow.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export const stepTaskRecord: StepTaskRecord<typeof ExampleFlow> = {
4141
// normalStep: { doubledValueArray: [1, 2, 3] }, --- this should be an error
4242
},
4343
msg_id: 1,
44+
flow_input: { value: 23 },
4445
};
4546

4647
// export const yolo: { value: number } = { value: 23, otherValue: 'yolo' };

0 commit comments

Comments
 (0)