Skip to content

Commit c8dce52

Browse files
[core] [world] Lazy run creation on start (#1537)
1 parent 9d19530 commit c8dce52

21 files changed

Lines changed: 964 additions & 89 deletions

File tree

.changeset/four-donuts-glow.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@workflow/world-postgres": patch
3+
"@workflow/world-vercel": patch
4+
"@workflow/world-local": patch
5+
"@workflow/world": patch
6+
"@workflow/core": patch
7+
---
8+
9+
Allow the workflow invocation to create the run if the initial storage call in `start` did not succeed. The run input is now sent through the queue to enable this. Allow the World to create the `run_created` and `run_started` events together, and skip the first event list call by returning the events directly.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"title": "Changelog",
3-
"pages": ["index", "eager-processing"],
3+
"pages": ["index", "eager-processing", "resilient-start"],
44
"defaultOpen": false
55
}

docs/content/docs/changelog/resilient-start.mdx

Lines changed: 327 additions & 0 deletions
Large diffs are not rendered by default.

packages/core/e2e/e2e.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import { setTimeout as sleep } from 'node:timers/promises';
44
import {
55
WorkflowRunCancelledError,
66
WorkflowRunFailedError,
7+
WorkflowWorldError,
78
} from '@workflow/errors';
9+
import type { World } from '@workflow/world';
810
import {
911
afterAll,
1012
assert,
@@ -2172,4 +2174,56 @@ describe('e2e', () => {
21722174
expect(returnValue.attempt).toBeGreaterThanOrEqual(1);
21732175
}
21742176
);
2177+
2178+
// ============================================================
2179+
// Resilient start: run completes even when run_created fails
2180+
// ============================================================
2181+
// TODO: Switch this to a stream-based workflow (e.g. readableStreamWorkflow)
2182+
// to also verify that serialization, flushing, and binary data work correctly
2183+
// over the queue boundary. Currently using addTenWorkflow to avoid the
2184+
// skipIf(isLocalDeployment()) barrier that stream tests require.
2185+
test(
2186+
'resilient start: addTenWorkflow completes when run_created returns 500',
2187+
{ timeout: 60_000 },
2188+
async () => {
2189+
// Get the real world and wrap it so the first events.create call
2190+
// (run_created) throws a 500 server error. The queue should still
2191+
// be dispatched with runInput, and the runtime should bootstrap
2192+
// the run via the run_started fallback path.
2193+
const realWorld = getWorld();
2194+
let createCallCount = 0;
2195+
const stubbedWorld: World = {
2196+
...realWorld,
2197+
events: {
2198+
...realWorld.events,
2199+
create: (async (...args: Parameters<World['events']['create']>) => {
2200+
createCallCount++;
2201+
if (createCallCount === 1) {
2202+
// Fail the very first call (run_created from start())
2203+
throw new WorkflowWorldError('Simulated storage outage', {
2204+
status: 500,
2205+
});
2206+
}
2207+
return realWorld.events.create(...args);
2208+
}) as World['events']['create'],
2209+
},
2210+
};
2211+
2212+
const run = await start(await e2e('addTenWorkflow'), [123], {
2213+
world: stubbedWorld,
2214+
});
2215+
2216+
// Verify the stub intercepted the run_created call (the only call
2217+
// made through the stubbed world — the server-side runtime uses its
2218+
// own world instance for run_started and subsequent events).
2219+
expect(createCallCount).toBe(1);
2220+
2221+
// The run should still complete despite run_created failing.
2222+
// The runtime's resilient start path creates the run from
2223+
// run_started, so returnValue polling may initially get
2224+
// WorkflowRunNotFoundError before the queue delivers.
2225+
const returnValue = await run.returnValue;
2226+
expect(returnValue).toBe(133);
2227+
}
2228+
);
21752229
});

packages/core/src/runtime.ts

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export function workflowEntrypoint(
114114
runId,
115115
traceCarrier: traceContext,
116116
requestedAt,
117+
runInput,
117118
} = WorkflowInvokePayloadSchema.parse(message_);
118119
const { requestId } = metadata;
119120
// Extract the workflow name from the topic name
@@ -239,7 +240,7 @@ export function workflowEntrypoint(
239240
let workflowStartedAt = -1;
240241
let workflowRun: WorkflowRun | undefined;
241242
// Pre-loaded events from the run_started response.
242-
// When present, we skip the events.list call to reduce TTFB.
243+
// When present, we skip the events.list call.
243244
let preloadedEvents: Event[] | undefined;
244245

245246
// --- Infrastructure: prepare the run state ---
@@ -257,7 +258,25 @@ export function workflowEntrypoint(
257258
runId,
258259
{
259260
eventType: 'run_started',
260-
specVersion: SPEC_VERSION_CURRENT,
261+
// Use the spec version from the original start() call
262+
// when available, so the resilient start path creates
263+
// the run with the correct version (not always current).
264+
specVersion:
265+
runInput?.specVersion ?? SPEC_VERSION_CURRENT,
266+
// Pass run input from queue so the server can
267+
// create the run if run_created was missed.
268+
// Uint8Array values survive the queue natively
269+
// (CBOR on world-vercel, JSON reviver on world-local).
270+
...(runInput
271+
? {
272+
eventData: {
273+
input: runInput.input,
274+
deploymentId: runInput.deploymentId,
275+
workflowName: runInput.workflowName,
276+
executionContext: runInput.executionContext,
277+
},
278+
}
279+
: {}),
261280
},
262281
{ requestId }
263282
);
@@ -268,7 +287,7 @@ export function workflowEntrypoint(
268287
}
269288
workflowRun = result.run;
270289

271-
// If the world returned events, use them to skip
290+
// If the response includes events, use them to skip
272291
// the initial events.list call and reduce TTFB.
273292
if (result.events && result.events.length > 0) {
274293
preloadedEvents = result.events;
@@ -282,13 +301,16 @@ export function workflowEntrypoint(
282301
} catch (err) {
283302
// Run was concurrently completed/failed/cancelled
284303
if (EntityConflictError.is(err) || RunExpiredError.is(err)) {
304+
// EntityConflictError: run was concurrently
305+
// completed/failed/cancelled during setup.
306+
// RunExpiredError: run already in terminal state.
307+
// In both cases, skip processing this message.
285308
runtimeLogger.info(
286309
'Run already finished during setup, skipping',
287310
{ workflowRunId: runId, message: err.message }
288311
);
289312
return;
290-
}
291-
if (err instanceof WorkflowRuntimeError) {
313+
} else if (err instanceof WorkflowRuntimeError) {
292314
runtimeLogger.error(
293315
'Fatal runtime error during workflow setup',
294316
{ workflowRunId: runId, error: err.message }
@@ -319,9 +341,11 @@ export function workflowEntrypoint(
319341
throw failErr;
320342
}
321343
return;
344+
} else {
345+
throw err;
322346
}
323-
throw err;
324347
}
348+
325349
workflowStartedAt = +workflowRun.startedAt;
326350

327351
span?.setAttributes({

packages/core/src/runtime/run.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,19 @@ export class Run<TResult> {
8787
*/
8888
private encryptionKeyPromise: Promise<CryptoKey | undefined> | null = null;
8989

90-
constructor(runId: string) {
90+
/**
91+
* When true, run_created failed and the run may not exist yet (the
92+
* resilient start path will create it via run_started). pollReturnValue
93+
* retries on WorkflowRunNotFoundError only when this flag is set so
94+
* that normal runs fail fast on 404.
95+
* @internal
96+
*/
97+
private resilientStart = false;
98+
99+
constructor(runId: string, opts?: { resilientStart?: boolean }) {
91100
this.runId = runId;
92101
this.world = getWorld();
102+
this.resilientStart = opts?.resilientStart ?? false;
93103
}
94104

95105
/**
@@ -243,6 +253,15 @@ export class Run<TResult> {
243253
* @returns The workflow return value.
244254
*/
245255
private async pollReturnValue(): Promise<TResult> {
256+
// When resilientStart is true, run_created failed and the run may
257+
// not exist yet. Retry on WorkflowRunNotFoundError up to 3 times
258+
// (1s + 3s + 6s = 10s total) to give the queue time to deliver
259+
// and the runtime to create the run via run_started.
260+
// When resilientStart is false, 404 is a real error — fail fast.
261+
let notFoundRetries = 0;
262+
const NOT_FOUND_MAX_RETRIES = this.resilientStart ? 3 : 0;
263+
const NOT_FOUND_DELAYS = [1_000, 3_000, 6_000];
264+
246265
while (true) {
247266
try {
248267
const run = await this.world.runs.get(this.runId);
@@ -270,6 +289,15 @@ export class Run<TResult> {
270289
await new Promise((resolve) => setTimeout(resolve, 1_000));
271290
continue;
272291
}
292+
if (
293+
WorkflowRunNotFoundError.is(error) &&
294+
notFoundRetries < NOT_FOUND_MAX_RETRIES
295+
) {
296+
const delay = NOT_FOUND_DELAYS[notFoundRetries]!;
297+
notFoundRetries++;
298+
await new Promise((resolve) => setTimeout(resolve, delay));
299+
continue;
300+
}
273301
throw error;
274302
}
275303
}

packages/core/src/runtime/start.test.ts

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { WorkflowRuntimeError } from '@workflow/errors';
1+
import { WorkflowRuntimeError, WorkflowWorldError } from '@workflow/errors';
22
import { SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY } from '@workflow/world';
33
import {
44
afterEach,
@@ -391,6 +391,77 @@ describe('start', () => {
391391
});
392392
});
393393

394+
describe('resilient start (run_created failure)', () => {
395+
const validWorkflow = Object.assign(() => Promise.resolve('result'), {
396+
workflowId: 'test-workflow',
397+
});
398+
399+
afterEach(() => {
400+
vi.clearAllMocks();
401+
});
402+
403+
it('should succeed when events.create throws a 500 error (queue still dispatched)', async () => {
404+
const mockQueue = vi.fn().mockResolvedValue({ messageId: null });
405+
const serverError = new WorkflowWorldError('Internal Server Error', {
406+
status: 500,
407+
});
408+
const mockEventsCreate = vi.fn().mockRejectedValue(serverError);
409+
410+
vi.mocked(getWorld).mockReturnValue({
411+
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
412+
events: { create: mockEventsCreate },
413+
queue: mockQueue,
414+
} as any);
415+
416+
// start() should NOT throw — the queue was still dispatched
417+
const run = await start(validWorkflow, [42]);
418+
expect(run.runId).toMatch(/^wrun_/);
419+
420+
// Queue should have been called with runInput
421+
expect(mockQueue).toHaveBeenCalledTimes(1);
422+
const [, queuePayload] = mockQueue.mock.calls[0];
423+
expect(queuePayload.runInput).toBeDefined();
424+
expect(queuePayload.runInput.deploymentId).toBe('deploy_123');
425+
expect(queuePayload.runInput.workflowName).toBe('test-workflow');
426+
expect(queuePayload.runInput.specVersion).toBe(SPEC_VERSION_CURRENT);
427+
});
428+
429+
it('should throw when queue fails even if events.create succeeds', async () => {
430+
const mockEventsCreate = vi.fn().mockResolvedValue({
431+
run: { runId: 'wrun_test', status: 'pending' },
432+
});
433+
const mockQueue = vi
434+
.fn()
435+
.mockRejectedValue(new Error('Queue unavailable'));
436+
437+
vi.mocked(getWorld).mockReturnValue({
438+
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
439+
events: { create: mockEventsCreate },
440+
queue: mockQueue,
441+
} as any);
442+
443+
await expect(start(validWorkflow, [])).rejects.toThrow(
444+
'Queue unavailable'
445+
);
446+
});
447+
448+
it('should throw when events.create fails with a non-retryable error (e.g. 400)', async () => {
449+
const badRequest = new WorkflowWorldError('Bad Request', {
450+
status: 400,
451+
});
452+
const mockEventsCreate = vi.fn().mockRejectedValue(badRequest);
453+
const mockQueue = vi.fn().mockResolvedValue({ messageId: null });
454+
455+
vi.mocked(getWorld).mockReturnValue({
456+
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
457+
events: { create: mockEventsCreate },
458+
queue: mockQueue,
459+
} as any);
460+
461+
await expect(start(validWorkflow, [])).rejects.toThrow('Bad Request');
462+
});
463+
});
464+
394465
describe('overload type inference', () => {
395466
// Type-only assertions that don't execute start() at runtime.
396467
// We use expectTypeOf on the function signature's return type directly.

0 commit comments

Comments
 (0)