Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
38b6674
[core] Combine initial run fetch, event fetch, and run_started event …
VaguelySerious Mar 31, 2026
2f51139
[core] [world] Lazy run creation on start
VaguelySerious Mar 27, 2026
0dfd424
add local and postgres world changes
VaguelySerious Mar 27, 2026
b1c2bf3
fix get
VaguelySerious Mar 27, 2026
9202723
changelog
VaguelySerious Mar 27, 2026
c129183
fix local world
VaguelySerious Mar 27, 2026
5223192
fix backend
VaguelySerious Mar 27, 2026
5033640
e2e test
VaguelySerious Mar 27, 2026
d381cd3
docs
VaguelySerious Mar 27, 2026
10838f3
test fixes
VaguelySerious Mar 27, 2026
ccba633
docs
VaguelySerious Mar 28, 2026
770c419
fix
VaguelySerious Mar 28, 2026
ee29ee9
base64?
VaguelySerious Mar 28, 2026
11631dc
change e2e test
VaguelySerious Mar 28, 2026
ddd0bfd
changeset
VaguelySerious Mar 29, 2026
3ed82c1
409 in case of cold start + 404 backoff when polling run values
VaguelySerious Mar 29, 2026
7b51efd
remove override
VaguelySerious Mar 30, 2026
624f7a8
Address review feedback: fix eventData fallback, add idempotency comm…
VaguelySerious Apr 1, 2026
88d3882
queue changes
VaguelySerious Apr 1, 2026
b804f2b
Merge branch 'main' into peter/lazy-start
VaguelySerious Apr 1, 2026
2b7fcec
fix nit
VaguelySerious Apr 1, 2026
84f1e3e
queue transport
VaguelySerious Apr 1, 2026
8e77246
Merge branch 'main' into peter/lazy-start
VaguelySerious Apr 1, 2026
d8e9c27
Fix: Duplicate import of MAX_QUEUE_DELIVERIES from './runtime/constan…
vercel[bot] Apr 1, 2026
aa2fa6e
Version Packages (beta) (#1563)
Apr 1, 2026
499caec
fix(world-postgres): use typed JSON transport that preserves Uint8Array
VaguelySerious Apr 2, 2026
52b92ef
Merge branch 'main' into peter/lazy-start
VaguelySerious Apr 2, 2026
f1e5668
Merge branch 'main' into peter/lazy-start
VaguelySerious Apr 2, 2026
b6b2f93
Only do 404 checks when we know we did resilientStart
VaguelySerious Apr 2, 2026
4817776
Merge branch 'main' into peter/lazy-start
VaguelySerious Apr 2, 2026
f94870e
update docs
VaguelySerious Apr 2, 2026
06afa8b
Merge remote-tracking branch 'origin/main' into peter/lazy-start
VaguelySerious Apr 3, 2026
afd4f5f
docs
VaguelySerious Apr 3, 2026
3e70683
toctou fix
VaguelySerious Apr 3, 2026
b3f8bab
Merge remote-tracking branch 'origin/main' into peter/lazy-start
VaguelySerious Apr 3, 2026
c99c67c
fix(world-local): use writeExclusive for run_created entity to preven…
VaguelySerious Apr 3, 2026
64797f1
fix: generalize builtin step alias to handle bare-name and full-ID lo…
VaguelySerious Apr 3, 2026
a38b29a
Revert "fix: generalize builtin step alias to handle bare-name and fu…
VaguelySerious Apr 3, 2026
4c1a73b
refactor(world-vercel): move CBOR encode/decode into CborTransport
VaguelySerious Apr 4, 2026
90b9a7f
fix(core): use runInput.specVersion for run_started in resilient path
VaguelySerious Apr 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/four-donuts-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@workflow/world-postgres": patch
"@workflow/world-vercel": patch
"@workflow/world-local": patch
"@workflow/world": patch
"@workflow/core": patch
---

Allow workflow invocation to create run if initial storage call in `start` did not succeed. Send run input through queue to enable this. Allow creating run_created and run_started events together in World, and skip first event list call by returning events directly.
2 changes: 1 addition & 1 deletion docs/content/docs/changelog/meta.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"title": "Changelog",
"pages": ["index", "eager-processing"],
"pages": ["index", "eager-processing", "resilient-start"],
"defaultOpen": false
}
327 changes: 327 additions & 0 deletions docs/content/docs/changelog/resilient-start.mdx

Large diffs are not rendered by default.

54 changes: 54 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { setTimeout as sleep } from 'node:timers/promises';
import {
WorkflowRunCancelledError,
WorkflowRunFailedError,
WorkflowWorldError,
} from '@workflow/errors';
import type { World } from '@workflow/world';
import {
afterAll,
assert,
Expand Down Expand Up @@ -2172,4 +2174,56 @@ describe('e2e', () => {
expect(returnValue.attempt).toBeGreaterThanOrEqual(1);
}
);

// ============================================================
// Resilient start: run completes even when run_created fails
// ============================================================
// TODO: Switch this to a stream-based workflow (e.g. readableStreamWorkflow)
// to also verify that serialization, flushing, and binary data work correctly
// over the queue boundary. Currently using addTenWorkflow to avoid the
// skipIf(isLocalDeployment()) barrier that stream tests require.
test(
'resilient start: addTenWorkflow completes when run_created returns 500',
{ timeout: 60_000 },
async () => {
// Get the real world and wrap it so the first events.create call
// (run_created) throws a 500 server error. The queue should still
// be dispatched with runInput, and the runtime should bootstrap
// the run via the run_started fallback path.
const realWorld = getWorld();
let createCallCount = 0;
const stubbedWorld: World = {
...realWorld,
events: {
...realWorld.events,
create: (async (...args: Parameters<World['events']['create']>) => {
createCallCount++;
if (createCallCount === 1) {
// Fail the very first call (run_created from start())
throw new WorkflowWorldError('Simulated storage outage', {
status: 500,
});
}
return realWorld.events.create(...args);
}) as World['events']['create'],
},
};

const run = await start(await e2e('addTenWorkflow'), [123], {
world: stubbedWorld,
});

// Verify the stub intercepted the run_created call (only call
// through the stubbed world — the server-side runtime uses its
// own world instance for run_started and subsequent events).
expect(createCallCount).toBe(1);

// The run should still complete despite run_created failing.
// The runtime's resilient start path creates the run from
// run_started, so returnValue polling may initially get
// WorkflowRunNotFoundError before the queue delivers.
const returnValue = await run.returnValue;
expect(returnValue).toBe(133);
}
);
});
36 changes: 30 additions & 6 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ export function workflowEntrypoint(
runId,
traceCarrier: traceContext,
requestedAt,
runInput,
} = WorkflowInvokePayloadSchema.parse(message_);
const { requestId } = metadata;
// Extract the workflow name from the topic name
Expand Down Expand Up @@ -239,7 +240,7 @@ export function workflowEntrypoint(
let workflowStartedAt = -1;
let workflowRun: WorkflowRun | undefined;
// Pre-loaded events from the run_started response.
// When present, we skip the events.list call to reduce TTFB.
// When present, we skip the events.list call.
let preloadedEvents: Event[] | undefined;

// --- Infrastructure: prepare the run state ---
Expand All @@ -257,7 +258,25 @@ export function workflowEntrypoint(
runId,
{
eventType: 'run_started',
specVersion: SPEC_VERSION_CURRENT,
// Use the spec version from the original start() call
// when available, so the resilient start path creates
// the run with the correct version (not always current).
specVersion:
runInput?.specVersion ?? SPEC_VERSION_CURRENT,
// Pass run input from queue so the server can
// create the run if run_created was missed.
// Uint8Array values survive the queue natively
// (CBOR on world-vercel, JSON reviver on world-local).
...(runInput
? {
eventData: {
input: runInput.input,
deploymentId: runInput.deploymentId,
workflowName: runInput.workflowName,
executionContext: runInput.executionContext,
},
}
: {}),
},
{ requestId }
);
Expand All @@ -268,7 +287,7 @@ export function workflowEntrypoint(
}
workflowRun = result.run;

// If the world returned events, use them to skip
// If the response includes events, use them to skip
// the initial events.list call and reduce TTFB.
if (result.events && result.events.length > 0) {
preloadedEvents = result.events;
Expand All @@ -282,13 +301,16 @@ export function workflowEntrypoint(
} catch (err) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking (behavior change): The old code caught both EntityConflictError and RunExpiredError here. The new code only catches RunExpiredError. This means if events.create('run_started') throws EntityConflictError (e.g., duplicate eventId from a concurrent request), it will now propagate to the queue handler and cause a retry — previously it was silently consumed.

Is this intentional? The design doc says already-running returns { run } without throwing, but EntityConflictError can come from other sources (e.g., DB unique constraint on the event ID). If intentional, add a comment explaining why EntityConflictError is no longer expected here. If not, it should be re-added.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved. EntityConflictError was restored in the catch block. Current code at runtime.ts:284:

if (EntityConflictError.is(err) || RunExpiredError.is(err)) {
  runtimeLogger.info(
    'Run already finished during setup, skipping',
    { workflowRunId: runId, message: err.message }
  );
  return;
}

See retrospective item 4 in resilient-start.mdx for context on why it was removed and then restored.

// Run was concurrently completed/failed/cancelled
if (EntityConflictError.is(err) || RunExpiredError.is(err)) {
// EntityConflictError: run was concurrently
// completed/failed/cancelled during setup.
// RunExpiredError: run already in terminal state.
// In both cases, skip processing this message.
runtimeLogger.info(
'Run already finished during setup, skipping',
{ workflowRunId: runId, message: err.message }
);
return;
}
if (err instanceof WorkflowRuntimeError) {
} else if (err instanceof WorkflowRuntimeError) {
runtimeLogger.error(
'Fatal runtime error during workflow setup',
{ workflowRunId: runId, error: err.message }
Expand Down Expand Up @@ -319,9 +341,11 @@ export function workflowEntrypoint(
throw failErr;
}
return;
} else {
throw err;
}
throw err;
}

workflowStartedAt = +workflowRun.startedAt;

span?.setAttributes({
Expand Down
30 changes: 29 additions & 1 deletion packages/core/src/runtime/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,19 @@ export class Run<TResult> {
*/
private encryptionKeyPromise: Promise<CryptoKey | undefined> | null = null;

constructor(runId: string) {
/**
* When true, run_created failed and the run may not exist yet (the
* resilient start path will create it via run_started). pollReturnValue
* retries on WorkflowRunNotFoundError only when this flag is set so
* that normal runs fail fast on 404.
* @internal
*/
private resilientStart = false;

constructor(runId: string, opts?: { resilientStart?: boolean }) {
this.runId = runId;
this.world = getWorld();
this.resilientStart = opts?.resilientStart ?? false;
}

/**
Expand Down Expand Up @@ -243,6 +253,15 @@ export class Run<TResult> {
* @returns The workflow return value.
*/
private async pollReturnValue(): Promise<TResult> {
// When resilientStart is true, run_created failed and the run may
// not exist yet. Retry on WorkflowRunNotFoundError up to 3 times
// (1s + 3s + 6s = 10s total) to give the queue time to deliver
// and the runtime to create the run via run_started.
// When resilientStart is false, 404 is a real error — fail fast.
let notFoundRetries = 0;
const NOT_FOUND_MAX_RETRIES = this.resilientStart ? 3 : 0;
const NOT_FOUND_DELAYS = [1_000, 3_000, 6_000];

while (true) {
try {
const run = await this.world.runs.get(this.runId);
Expand Down Expand Up @@ -270,6 +289,15 @@ export class Run<TResult> {
await new Promise((resolve) => setTimeout(resolve, 1_000));
continue;
}
if (
WorkflowRunNotFoundError.is(error) &&
notFoundRetries < NOT_FOUND_MAX_RETRIES
) {
const delay = NOT_FOUND_DELAYS[notFoundRetries]!;
notFoundRetries++;
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
Expand Down
73 changes: 72 additions & 1 deletion packages/core/src/runtime/start.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkflowRuntimeError } from '@workflow/errors';
import { WorkflowRuntimeError, WorkflowWorldError } from '@workflow/errors';
import { SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY } from '@workflow/world';
import {
afterEach,
Expand Down Expand Up @@ -391,6 +391,77 @@ describe('start', () => {
});
});

describe('resilient start (run_created failure)', () => {
const validWorkflow = Object.assign(() => Promise.resolve('result'), {
workflowId: 'test-workflow',
});

afterEach(() => {
vi.clearAllMocks();
});

it('should succeed when events.create throws a 500 error (queue still dispatched)', async () => {
const mockQueue = vi.fn().mockResolvedValue({ messageId: null });
const serverError = new WorkflowWorldError('Internal Server Error', {
status: 500,
});
const mockEventsCreate = vi.fn().mockRejectedValue(serverError);

vi.mocked(getWorld).mockReturnValue({
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
events: { create: mockEventsCreate },
queue: mockQueue,
} as any);

// start() should NOT throw — the queue was still dispatched
const run = await start(validWorkflow, [42]);
expect(run.runId).toMatch(/^wrun_/);

// Queue should have been called with runInput
expect(mockQueue).toHaveBeenCalledTimes(1);
const [, queuePayload] = mockQueue.mock.calls[0];
expect(queuePayload.runInput).toBeDefined();
expect(queuePayload.runInput.deploymentId).toBe('deploy_123');
expect(queuePayload.runInput.workflowName).toBe('test-workflow');
expect(queuePayload.runInput.specVersion).toBe(SPEC_VERSION_CURRENT);
});

it('should throw when queue fails even if events.create succeeds', async () => {
const mockEventsCreate = vi.fn().mockResolvedValue({
run: { runId: 'wrun_test', status: 'pending' },
});
const mockQueue = vi
.fn()
.mockRejectedValue(new Error('Queue unavailable'));

vi.mocked(getWorld).mockReturnValue({
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
events: { create: mockEventsCreate },
queue: mockQueue,
} as any);

await expect(start(validWorkflow, [])).rejects.toThrow(
'Queue unavailable'
);
});

it('should throw when events.create fails with a non-retryable error (e.g. 400)', async () => {
const badRequest = new WorkflowWorldError('Bad Request', {
status: 400,
});
const mockEventsCreate = vi.fn().mockRejectedValue(badRequest);
const mockQueue = vi.fn().mockResolvedValue({ messageId: null });

vi.mocked(getWorld).mockReturnValue({
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
events: { create: mockEventsCreate },
queue: mockQueue,
} as any);

await expect(start(validWorkflow, [])).rejects.toThrow('Bad Request');
});
});

describe('overload type inference', () => {
// Type-only assertions that don't execute start() at runtime.
// We use expectTypeOf on the function signature's return type directly.
Expand Down
Loading
Loading