-
Notifications
You must be signed in to change notification settings - Fork 255
Expand file tree
/
Copy pathstart.ts
More file actions
336 lines (302 loc) · 11.9 KB
/
start.ts
File metadata and controls
336 lines (302 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import { waitUntil } from '@vercel/functions';
import {
EntityConflictError,
ThrottleError,
WorkflowRuntimeError,
WorkflowWorldError,
} from '@workflow/errors';
import type { WorkflowInvokePayload, World } from '@workflow/world';
import {
isLegacySpecVersion,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
} from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { importKey } from '../encryption.js';
import { runtimeLogger } from '../logger.js';
import type { Serializable } from '../schemas.js';
import { dehydrateWorkflowArguments } from '../serialization.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { serializeTraceCarrier, trace } from '../telemetry.js';
import { waitedUntil } from '../util.js';
import { version as workflowCoreVersion } from '../version.js';
import { getWorkflowQueueName } from './helpers.js';
import { Run } from './run.js';
import { getWorld } from './world.js';
/** ULID generator for client-side runId generation */
const ulid = monotonicFactory();
export interface StartOptionsBase {
/**
* The world to use for the workflow run creation,
* by default the world is inferred from the environment variables.
*/
world?: World;
/**
* The spec version to use for the workflow run. Defaults to the latest version.
*/
specVersion?: number;
}
export interface StartOptionsWithDeploymentId extends StartOptionsBase {
/**
* The deployment ID to use for the workflow run.
*
* By default, this is automatically inferred from environment variables
* when deploying to Vercel.
*
* Set to `'latest'` to automatically resolve the most recent deployment
* for the current environment (same production target or git branch).
* This is currently a Vercel-specific feature.
*
* **Note:** When `deploymentId` is provided, the argument and return types become `unknown`
* since there is no guarantee the types will be consistent across deployments.
*/
deploymentId: 'latest' | (string & {});
}
export interface StartOptionsWithoutDeploymentId extends StartOptionsBase {
deploymentId?: undefined;
}
/**
* Options for starting a workflow run.
*/
export type StartOptions =
| StartOptionsWithDeploymentId
| StartOptionsWithoutDeploymentId;
/**
* Represents an imported workflow function.
*/
export type WorkflowFunction<TArgs extends unknown[], TResult> = (
...args: TArgs
) => Promise<TResult>;
/**
* Represents the generated metadata of a workflow function.
*/
export type WorkflowMetadata = { workflowId: string };
/**
* Starts a workflow run.
*
* @param workflow - The imported workflow function to start.
* @param args - The arguments to pass to the workflow (optional).
* @param options - The options for the workflow run (optional).
* @returns The unique run ID for the newly started workflow invocation.
*/
// Overloads with deploymentId - args and return type become unknown
// Uses generics so typed workflows are assignable (avoids contravariance issues),
// but the return type and args are still unknown since the deployed version may differ.
export function start<TArgs extends unknown[], TResult>(
workflow: WorkflowFunction<TArgs, TResult> | WorkflowMetadata,
args: unknown[],
options: StartOptionsWithDeploymentId
): Promise<Run<unknown>>;
export function start<TResult>(
workflow: WorkflowFunction<[], TResult> | WorkflowMetadata,
options: StartOptionsWithDeploymentId
): Promise<Run<unknown>>;
// Overloads without deploymentId - preserve type inference
export function start<TArgs extends unknown[], TResult>(
workflow: WorkflowFunction<TArgs, TResult> | WorkflowMetadata,
args: TArgs,
options?: StartOptionsWithoutDeploymentId
): Promise<Run<TResult>>;
export function start<TResult>(
workflow: WorkflowFunction<[], TResult> | WorkflowMetadata,
options?: StartOptionsWithoutDeploymentId
): Promise<Run<TResult>>;
export async function start<TArgs extends unknown[], TResult>(
workflow: WorkflowFunction<TArgs, TResult> | WorkflowMetadata,
argsOrOptions?: TArgs | StartOptions,
options?: StartOptions
) {
'use step';
return await waitedUntil(() => {
// @ts-expect-error this field is added by our client transform
const workflowName = workflow?.workflowId;
if (!workflowName) {
throw new WorkflowRuntimeError(
`'start' received an invalid workflow function. Ensure the Workflow SDK is configured correctly and the function includes a 'use workflow' directive.`,
{ slug: 'start-invalid-workflow-function' }
);
}
return trace(`workflow.start ${workflowName}`, async (span) => {
span?.setAttributes({
...Attribute.WorkflowName(workflowName),
...Attribute.WorkflowOperation('start'),
});
let args: Serializable[] = [];
let opts: StartOptions = options ?? {};
if (Array.isArray(argsOrOptions)) {
args = argsOrOptions as Serializable[];
} else if (typeof argsOrOptions === 'object') {
opts = argsOrOptions;
}
span?.setAttributes({
...Attribute.WorkflowArgumentsCount(args.length),
});
const world = opts?.world ?? (await getWorld());
let deploymentId = opts.deploymentId ?? (await world.getDeploymentId());
// When 'latest' is requested, resolve the actual latest deployment ID
// for the current deployment's environment (same production target or
// same git branch for preview deployments).
if (deploymentId === 'latest') {
if (!world.resolveLatestDeploymentId) {
throw new WorkflowRuntimeError(
"deploymentId 'latest' requires a World that implements resolveLatestDeploymentId()"
);
}
deploymentId = await world.resolveLatestDeploymentId();
}
const ops: Promise<void>[] = [];
// Generate runId client-side so we have it before serialization
// (required for future E2E encryption where runId is part of the encryption context)
const runId = `wrun_${ulid()}`;
// Serialize current trace context to propagate across queue boundary
const traceCarrier = await serializeTraceCarrier();
// Use world-declared specVersion when available (our worlds set this),
// otherwise fall back to the safe baseline that community worlds handle.
// Community worlds built against older @workflow/world reject runs with
// specVersion > their SPEC_VERSION_CURRENT via requiresNewerWorld().
const specVersion =
opts.specVersion ??
world.specVersion ??
SPEC_VERSION_SUPPORTS_EVENT_SOURCING;
const v1Compat = isLegacySpecVersion(specVersion);
// Resolve encryption key for the new run. The runId has already been
// generated above (client-generated ULID) and will be used for both
// key derivation and the run_created event. The World implementation
// uses the runId for per-run HKDF key derivation. We pass the resolved
// deploymentId (not just the raw opts) so the World can use it for
// key resolution even when deploymentId was inferred from the environment
// rather than explicitly provided in opts (e.g., in e2e test runners).
const rawKey = await world.getEncryptionKeyForRun?.(runId, {
...opts,
deploymentId,
});
const encryptionKey = rawKey ? await importKey(rawKey) : undefined;
// Create run via run_created event (event-sourced architecture)
// Pass client-generated runId - server will accept and use it
const workflowArguments = await dehydrateWorkflowArguments(
args,
runId,
encryptionKey,
ops,
globalThis,
v1Compat
);
const executionContext = {
traceCarrier,
workflowCoreVersion,
features: { encryption: !!encryptionKey },
};
// Call events.create (run_created) and queue in parallel.
// If events.create fails with 429/5xx, the run was still accepted
// via the queue and creation will be re-tried async by the runtime.
const [runCreatedResult, queueResult] = await Promise.allSettled([
world.events.create(
runId,
{
eventType: 'run_created',
specVersion,
eventData: {
deploymentId: deploymentId,
workflowName: workflowName,
input: workflowArguments,
executionContext,
},
},
{ v1Compat }
),
world.queue(
getWorkflowQueueName(workflowName),
{
runId,
traceCarrier,
...(specVersion >= SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT
? {
runInput: {
input: workflowArguments,
deploymentId,
workflowName,
specVersion,
executionContext,
},
}
: {}),
} satisfies WorkflowInvokePayload,
{
deploymentId,
specVersion,
}
),
]);
// Queue failure is always fatal — the run was not enqueued
if (queueResult.status === 'rejected') {
throw queueResult.reason;
}
// Handle events.create result
let resilientStart = false;
if (runCreatedResult.status === 'rejected') {
const err = runCreatedResult.reason;
if (EntityConflictError.is(err)) {
// 409: The run already exists. This can happen in extreme cases where
// the run creation call gets a cold start or other slowdown, and the queue
// + run_started call completes faster. We expect this to be <=1% of cases.
// In this case, we can safely return.
} else if (isRetryableStartError(err)) {
// 429 (ThrottleError) and 5xx (WorkflowWorldError with status >= 500)
// are retryable — the run was accepted via the queue and creation
// will be re-tried by the runtime when it calls run_started.
resilientStart = true;
runtimeLogger.warn(
'Run creation event failed, but the run was accepted via the queue. ' +
'The run_created event will be re-tried async by the runtime.',
{ workflowRunId: runId, error: err.message }
);
} else {
throw err;
}
} else {
const result = runCreatedResult.value;
// Assert that the run was created
if (!result.run) {
throw new WorkflowRuntimeError(
"Missing 'run' in server response for 'run_created' event"
);
}
// Verify server accepted our runId
if (!v1Compat && result.run.runId !== runId) {
throw new WorkflowRuntimeError(
`Server returned different runId than requested: expected ${runId}, got ${result.run.runId}`
);
}
}
waitUntil(
Promise.all(ops).catch((err) => {
// Ignore expected client disconnect errors (e.g., browser refresh during streaming)
const isAbortError =
err?.name === 'AbortError' || err?.name === 'ResponseAborted';
if (!isAbortError) throw err;
})
);
span?.setAttributes({
...Attribute.WorkflowRunId(runId),
...Attribute.DeploymentId(deploymentId),
...(runCreatedResult.status === 'fulfilled' &&
runCreatedResult.value.run
? Attribute.WorkflowRunStatus(runCreatedResult.value.run.status)
: {}),
});
return new Run<TResult>(runId, { resilientStart });
});
});
}
/**
* Checks if an error from events.create (run_created) is retryable,
* meaning the queue can re-try creation later via the run_started path.
* - ThrottleError (429): rate limited, will succeed later
* - WorkflowWorldError with status >= 500: server error, will succeed later
*/
function isRetryableStartError(err: unknown): boolean {
if (ThrottleError.is(err)) return true;
if (WorkflowWorldError.is(err) && err.status && err.status >= 500)
return true;
return false;
}