diff --git a/.changeset/four-donuts-glow.md b/.changeset/four-donuts-glow.md new file mode 100644 index 0000000000..789ec67ddb --- /dev/null +++ b/.changeset/four-donuts-glow.md @@ -0,0 +1,9 @@ +--- +"@workflow/world-postgres": patch +"@workflow/world-vercel": patch +"@workflow/world-local": patch +"@workflow/world": patch +"@workflow/core": patch +--- + +Allow workflow invocation to create run if initial storage call in `start` did not succeed. Send run input through queue to enable this. Allow creating run_created and run_started events together in World, and skip first event list call by returning events directly. diff --git a/docs/content/docs/changelog/meta.json b/docs/content/docs/changelog/meta.json index 042ff8fc8b..0c01dff133 100644 --- a/docs/content/docs/changelog/meta.json +++ b/docs/content/docs/changelog/meta.json @@ -1,5 +1,5 @@ { "title": "Changelog", - "pages": ["index", "eager-processing"], + "pages": ["index", "eager-processing", "resilient-start"], "defaultOpen": false } diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx new file mode 100644 index 0000000000..653c439e0f --- /dev/null +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -0,0 +1,327 @@ +--- +title: Resilient run start +description: Overhaul run start logic to tolerate world storage unavailability, as long as the queue is healthy, and significantly speeds up run start. +--- + +# Resilient `start()` + +## Motivation + +When `world` storage is unavailable but the queue is up, +`start()` previously failed entirely because `world.events.create(run_created)` +is called before `world.queue()`. This change decouples run creation from queue +dispatch so that runs can still be accepted when storage is degraded. + +Additionally, the runtime previously called `world.runs.get(runId)` before +`run_started`, adding an extra round-trip. By always calling `run_started` +directly, we save that round-trip and can return pre-loaded events in the +response to skip the initial `events.list` call, reducing TTFB. + +## Design + +### `start()` changes (packages/core) + +- `world.events.create` (run_created) and `world.queue` are now called **in parallel** + via `Promise.allSettled`. +- If `events.create` errors with **429 or 5xx**, we log a warning saying that run + creation failed but the run was accepted — creation will be re-tried async by the + runtime when it processes the queue message. The returned `Run` instance is marked + with `resilientStart = true`. +- If `events.create` errors with **409** (EntityConflictError), the run already exists + (e.g., the queue handler's resilient start path created it first due to a cold-start + race). This is treated as success. +- If `world.queue` fails, we still throw — the run truly failed and was not enqueued. +- The queue invocation now receives all the run inputs (`input`, `deploymentId`, + `workflowName`, `specVersion`, `executionContext`) via `runInput` so the runtime can + create the run later if needed. +- When the runtime re-enqueues itself, it does **not** pass these inputs — only the + first queue cycle carries them. + +### `workflowEntrypoint` changes (packages/core) + +- When calling `world.events.create` with `run_started`, we now also always pass the + run input that was sent through the queue, if available. The response will still be on off: + - **200 with event (now running)**: As usual, but the server could have used the run input to create the run if it didn't exist yet. The response will be opaque to the runtime. + - **200 without event (already running)**: As usual + - **409 or 410 (already finished)**: As usual + +### `Run.returnValue` polling (packages/core) + +- When `resilientStart` is true on the Run instance (run_created failed), the + `pollReturnValue` loop retries on `WorkflowRunNotFoundError` up to 3 times + (1s + 3s + 6s = 10s total) to give the queue time to deliver and the runtime + to create the run via `run_started`. +- When `resilientStart` is false (normal path), 404 fails immediately — no delay + for the common case of a wrong run ID. + +### World / workflow-server changes + +- Posting `run_started` to a **non-existent** run is now allowed when the run input is + sent along with the payload. The server: + 1. Creates a `run_created` event first (so the event log is consistent). + 2. Strips the input from the `run_started` event data (it lives on `run_created`). + 3. Then creates the `run_started` event normally. + 4. Emits a log and a Datadog metric (`workflow_server.resilient_start.run_created_via_run_started`) + to track when this fallback path is hit. +- When `run_started` encounters an **already-running** run, all worlds return `{ run }` + with `event: undefined` instead of throwing. No duplicate event is created. + +### Queue transport changes + +`Uint8Array` values (the serialized workflow input in `runInput`) don't survive plain +JSON serialization. Each world uses a transport that preserves binary data: + +- **world-vercel**: CBOR transport — CBOR-encodes the entire queue payload into a + `Buffer` and uses `BufferTransport` from `@vercel/queue`. Uint8Array survives natively. +- **world-local**: `TypedJsonTransport` — uses the existing `jsonReplacer`/`jsonReviver` + from `fs.ts` that encode Uint8Array as `{ __type: 'Uint8Array', data: '' }`. +- **world-postgres**: Inline typed JSON transport — same tagged-envelope approach as + world-local, inlined since world-postgres doesn't import from world-local. + +## Decisions + +1. **Parallel not sequential**: We chose `Promise.allSettled` over sequential calls to + minimize latency in the happy path. + +2. **Already-running returns run without event**: When `run_started` encounters an + already-running run, all worlds return `{ run }` with `event: undefined` (no + `events` array) instead of throwing. The runtime detects this by checking for + `result.event === undefined`. This avoids an extra `world.runs.get` round-trip. + +3. **Events in 200 response**: We only return events on the 200 path (first caller). + On the already-running path, we fall back to the normal `events.list` call. This is + correct because only on 200 can we be certain we know the full event history. + +4. **Conditional 404 retry on Run.returnValue**: Only when `resilientStart = true` + (run_created failed). Normal runs fail fast on 404. + +## Known concerns + +### Cold-start race on Vercel (observed in CI) + +On Vercel, the parallel dispatch can cause the queue message to be processed before +`run_created` completes, if `run_created` hits a cold-start lambda. Confirmed via +Datadog: the `run_started` request hit a warm lambda (23ms) while `run_created` hit +a cold lambda (727ms), even though `run_created` arrived at the edge 116ms earlier. +When this happens: + +1. The runtime's resilient start path creates the run from `run_started`. +2. The original `run_created` arrives and gets 409 (EntityConflictError). +3. `start()` treats the 409 as success (the run exists). + +This is handled correctly. The `resilientStart` flag is NOT set on the Run instance +in this case (409 is not a retryable error), so `returnValue` fails fast on 404. + +### Local Prod test flakiness (resolved) + +On world-local, the queue's async IIFE can deliver the message before +`events.create(run_created)` finishes writing to the shared filesystem. The +resilient start path should handle this, but Local Prod tests showed occasional +runs stuck at `pending` (no `run_started` event), and Windows CI showed +"Unconsumed event in event log" errors from duplicate `run_created` events. + +**Root cause:** A TOCTOU race between the normal `run_created` path and the +resilient start path. Both used `writeJSON` which checks existence with +`fs.access()` (non-atomic), so both could pass the check and write separate +`run_created` events with different event IDs. Fixed by switching both paths to +`writeExclusive` (O_CREAT|O_EXCL) — see retrospective items 12 and 16. + +## Follow-up work + +- [x] ~~Investigate Local Prod test flakiness~~ — resolved via `writeExclusive` + for run entity creation (retrospective items 12, 16). +- [ ] Monitor the Datadog metric in production to understand how often the fallback is hit. +- [x] ~~Events optimization for re-enqueue cycles~~ — decided against. The + already-running path returns early without writing an event, so preloading + events there would require an extra filesystem/DB query on every re-enqueue. + More importantly, on Vercel with at-least-once delivery, multiple lambdas can + process the same run concurrently — the event snapshot could be stale or + incomplete. The runtime's fallback to `events.list` is the correct behavior + for re-enqueue cycles. +- [x] ~~CborTransport pass-through~~ — refactored. `encode()`/`decode()` now + live inside `CborTransport.serialize()`/`deserialize()`, matching the pattern + used by TypedJsonTransport (world-local) and the inline transport + (world-postgres). Call sites pass plain objects instead of pre-encoded buffers. + +## Development retrospective + +Chronological log of mistakes, misunderstandings, and reverted approaches during +development. Included for future reference when working on similar cross-cutting +runtime changes. + +### 1. Uint8Array corruption through JSON queue transport + +The initial implementation passed `runInput.input` (a `Uint8Array`) directly through +the queue payload. `Uint8Array` doesn't survive `JSON.stringify` — it becomes +`{"0":72,"1":101,...}`. This corrupted the workflow input when the resilient start +path tried to recreate the run from the queue-delivered data. + +Caught by the `spawnWorkflowFromStepWorkflow` e2e test and the `world-testing` +embedded tests, which failed with "Invalid input" from devalue's `unflatten()`. + +Three approaches were tried before landing on the final solution: + +1. **Base64 encoding** (`btoa`/`atob`) — worked but fragile. The decode side used + `typeof runInput.input === 'string'` as a discriminant, which was flagged as + dangerous since non-binary inputs could also be strings. +2. **`Array.from()`/`new Uint8Array()`** — replaced base64 with a plain number array. + Two problems: (a) 3x JSON size regression vs base64, and (b) `Array.isArray()` + false-positives on v1Compat runs where `dehydrateWorkflowArguments` returns + devalue's flat Array format. +3. **CBOR + BufferTransport** (final) — world-vercel CBOR-encodes the queue payload; + world-local and world-postgres use a `TypedJsonTransport` with a tagged envelope. + +### 2. Forgot to commit world-postgres transport fix (twice) + +After fixing world-local and world-vercel queue transports, the same `JsonTransport` +corruption bug existed in world-postgres. The fix was written during a session but +never committed — lost when the working directory was reset via stash/checkout. This +happened twice. The fix only landed on the third attempt when it was committed and +pushed immediately. All 14 Postgres e2e jobs failed each time. + +### 3. Incorrect diagnosis of Vercel Prod 409 errors + +Multiple Vercel Prod e2e tests failed with `EntityConflictError: Workflow run with +ID wrun_... already exists` on `run_created`. The initial assumption was that VQS +couldn't deliver the queue message fast enough to beat the `run_created` call. + +Datadog logs showed otherwise: the `run_created` request arrived at Vercel's edge +116ms before `run_started`, but `run_created` hit a cold-start lambda (727ms) while +`run_started` hit a warm one (23ms). Cold starts can invert expected execution order. + +### 4. Removed EntityConflictError catch, then had to restore it + +The `workflowEntrypoint` error handler originally caught both `EntityConflictError` +and `RunExpiredError`. When adding the "already-running returns run without event" +behavior, `EntityConflictError` was removed from the catch since the new worlds +wouldn't throw it. Reviewer flagged this: old worlds or world-vercel hitting an +older workflow-server could still throw it. The catch was restored. + +### 5. Duplicate `startedAt` check + +After refactoring the `run_started` flow, a `workflowRun.startedAt` null check +existed both inside the `try` block and after the `catch` block. The second was +unreachable. Removed after review. + +### 6. WORKFLOW_SERVER_URL_OVERRIDE left set + +During development, `WORKFLOW_SERVER_URL_OVERRIDE` was set to a test URL pointing +at the workflow-server preview deployment and accidentally committed. The Vercel +bot flagged this. Reset to empty string. + +### 7. e2e test assertion was wrong + +The resilient start e2e test stubbed `world.events.create` and asserted +`createCallCount >= 2`. But the stub only intercepts calls from the test runner +process — the server uses its own world. `createCallCount` was always 1. Changed +to `expect(createCallCount).toBe(1)`. + +### 8. Misattributed Local Prod timeouts as "pre-existing" + +Local Prod tests showed 60-second timeouts across various tests. Initially dismissed +as CI flakes. Checking main's CI showed all Local Prod tests pass on main — the +timeouts are caused by our changes. Should have compared against main immediately. + +### 9. Attempted to revert parallel dispatch + +After identifying Local Prod timeouts, `start()` was partially reverted back to +sequential dispatch. The user pointed out that parallel dispatch is the core value +proposition of the PR. The revert was undone. + +### 10. WorkflowRunNotFoundError retry was unconditional + +The initial `pollReturnValue` retry on `WorkflowRunNotFoundError` applied to all +`Run` instances. A user calling `getRun()` with a wrong ID would wait 10 seconds +before getting a 404. Fixed by adding a `resilientStart` flag: only retries when +`run_created` actually failed. + +### 11. Changeset `minor` vs `patch` + +The changeset was created with `"@workflow/core": minor`. Reviewer flagged this as +violating repo rules ("all changes should be patch"). Changed after discussion. + +### 12. world-local TOCTOU race causing duplicate `run_created` events (Windows CI) + +The resilient start path AND the normal `run_created` path in `world-local/events-storage.ts` +both used `writeJSON` to create the run entity. `writeJSON` checks file existence with +`fs.access()` then writes via temp+rename — a classic TOCTOU race. On the local world, +the queue delivers via an async IIFE in the same event loop, so `events.create(run_created)` +and `events.create(run_started)` (with resilient start) run concurrently: + +1. Both paths call `fs.access(runPath)` → ENOENT (file doesn't exist yet) +2. Both proceed to write → the last `fs.rename` wins +3. Both succeed → both write their own `run_created` event with different event IDs +4. During replay, the consumer sees two `run_created` events → "Unconsumed event" error + +This caused consistent failures in `world-testing` embedded tests on Windows CI (`hooks`, +`supports null bytes in step results`, `retriable and fatal errors` — all timing out at +60s with "Unconsumed event in event log" errors). Linux CI was not affected because the +timing was different enough that the race window was rarely hit. + +Fixed by switching BOTH paths to `writeExclusive` (O_CREAT|O_EXCL), which is atomic at +the OS level — exactly one writer wins, the other gets EEXIST. The normal `run_created` +path throws `EntityConflictError` on conflict (handled by `start()` as 409). The resilient +start path re-reads the run from disk on conflict. Either way, only one `run_created` +event is written. + +### 13. Non-atomic run + run_created event in world-postgres resilient path + +The resilient start path in `world-postgres/storage.ts` did two separate writes (run +insert, then event insert) without a transaction. If the process crashed between them, +the run would exist without a `run_created` event — an inconsistent event log. + +A `drizzle.transaction()` wrapper was attempted but dropped due to TypeScript inference +issues with drizzle's transaction callback and the insert builder's overloads. The current +fix keeps the two writes sequential but adds the same conflict-aware re-read pattern as +world-local: when `onConflictDoNothing` produces no result (run already existed), the run +is re-read so downstream logic sees the real state. The narrow crash window between the +two writes is acceptable — if the run insert succeeds but the event insert crashes, the +run exists and `run_started` will still proceed normally (the event log will be missing a +`run_created` entry, but the run itself is functional). + +### 14. Missing `WorkflowRunStatus` span attribute after parallel refactor + +The `start()` span previously set `Attribute.WorkflowRunStatus(result.run.status)`, but +this was dropped in the parallel refactor because `result.run` is only available when +`runCreatedResult` fulfilled. The attribute is now conditionally set when the result is +available. In the resilient start case (run_created failed), the attribute is omitted +rather than erroring. + +### 15. `run_started` eventData leak in world-postgres result + +The `...data` spread in the result construction leaked `eventData` from `run_started` +into the returned event object. Storage was already correct (`storedEventData` is +`undefined` for `run_started`), but the returned result carried the input data. While +harmless (the runtime doesn't use `result.event.eventData`), it was restored to match +the pre-refactor behavior where eventData was explicitly stripped from the result. + +### 16. Normal `run_created` path also needed `writeExclusive` (Windows CI) + +The initial TOCTOU fix (item 12) only changed the resilient start path to use +`writeExclusive`. The normal `run_created` entity write still used `writeJSON` which +checks existence with `fs.access()` then writes via temp+rename — not atomic. On +Windows CI, the local queue's async IIFE delivered fast enough for both paths to pass +their existence checks simultaneously, producing two `run_created` events with different +event IDs. The events consumer saw the duplicate as "Unconsumed event in event log," +causing `hooks`, `supports null bytes in step results`, and `retriable and fatal errors` +tests to time out at 60s. Fixed by also switching the normal `run_created` entity write to +`writeExclusive`, making both paths use the same atomic gate. + +### 17. CborTransport was a pass-through wrapper + +`world-vercel/queue.ts` had `CborTransport` implementing `Transport` with a +no-op `serialize` (identity function) and a `deserialize` that reassembled chunks into +a Buffer without decoding. The actual CBOR `encode()`/`decode()` calls happened at the +call sites — `queue()` pre-encoded before calling `client.send()`, and the handler +post-decoded after receiving from `client.handleCallback()`. This violated the transport +abstraction (every other transport does its encoding inside serialize/deserialize) and +meant the call site had to remember to pre-encode. Refactored to move `encode()`/`decode()` +into the transport methods and changed the type from `Transport` to +`Transport`. + +## Follow-up work (additional) + +- [x] ~~**CborTransport is a pass-through**~~ — Resolved. Moved `encode()`/`decode()` + into `CborTransport.serialize()`/`CborTransport.deserialize()`. The transport is now + self-contained: call sites pass plain objects, and the handler receives decoded objects. + See retrospective item 17. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 6de7718bc1..bb326364fb 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -4,7 +4,9 @@ import { setTimeout as sleep } from 'node:timers/promises'; import { WorkflowRunCancelledError, WorkflowRunFailedError, + WorkflowWorldError, } from '@workflow/errors'; +import type { World } from '@workflow/world'; import { afterAll, assert, @@ -2172,4 +2174,56 @@ describe('e2e', () => { expect(returnValue.attempt).toBeGreaterThanOrEqual(1); } ); + + // ============================================================ + // Resilient start: run completes even when run_created fails + // ============================================================ + // TODO: Switch this to a stream-based workflow (e.g. readableStreamWorkflow) + // to also verify that serialization, flushing, and binary data work correctly + // over the queue boundary. Currently using addTenWorkflow to avoid the + // skipIf(isLocalDeployment()) barrier that stream tests require. + test( + 'resilient start: addTenWorkflow completes when run_created returns 500', + { timeout: 60_000 }, + async () => { + // Get the real world and wrap it so the first events.create call + // (run_created) throws a 500 server error. The queue should still + // be dispatched with runInput, and the runtime should bootstrap + // the run via the run_started fallback path. + const realWorld = getWorld(); + let createCallCount = 0; + const stubbedWorld: World = { + ...realWorld, + events: { + ...realWorld.events, + create: (async (...args: Parameters) => { + createCallCount++; + if (createCallCount === 1) { + // Fail the very first call (run_created from start()) + throw new WorkflowWorldError('Simulated storage outage', { + status: 500, + }); + } + return realWorld.events.create(...args); + }) as World['events']['create'], + }, + }; + + const run = await start(await e2e('addTenWorkflow'), [123], { + world: stubbedWorld, + }); + + // Verify the stub intercepted the run_created call (only call + // through the stubbed world — the server-side runtime uses its + // own world instance for run_started and subsequent events). + expect(createCallCount).toBe(1); + + // The run should still complete despite run_created failing. + // The runtime's resilient start path creates the run from + // run_started, so returnValue polling may initially get + // WorkflowRunNotFoundError before the queue delivers. + const returnValue = await run.returnValue; + expect(returnValue).toBe(133); + } + ); }); diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 2edc8cdc47..250955b482 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -114,6 +114,7 @@ export function workflowEntrypoint( runId, traceCarrier: traceContext, requestedAt, + runInput, } = WorkflowInvokePayloadSchema.parse(message_); const { requestId } = metadata; // Extract the workflow name from the topic name @@ -239,7 +240,7 @@ export function workflowEntrypoint( let workflowStartedAt = -1; let workflowRun: WorkflowRun | undefined; // Pre-loaded events from the run_started response. - // When present, we skip the events.list call to reduce TTFB. + // When present, we skip the events.list call. let preloadedEvents: Event[] | undefined; // --- Infrastructure: prepare the run state --- @@ -257,7 +258,25 @@ export function workflowEntrypoint( runId, { eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, + // Use the spec version from the original start() call + // when available, so the resilient start path creates + // the run with the correct version (not always current). + specVersion: + runInput?.specVersion ?? SPEC_VERSION_CURRENT, + // Pass run input from queue so the server can + // create the run if run_created was missed. + // Uint8Array values survive the queue natively + // (CBOR on world-vercel, JSON reviver on world-local). + ...(runInput + ? { + eventData: { + input: runInput.input, + deploymentId: runInput.deploymentId, + workflowName: runInput.workflowName, + executionContext: runInput.executionContext, + }, + } + : {}), }, { requestId } ); @@ -268,7 +287,7 @@ export function workflowEntrypoint( } workflowRun = result.run; - // If the world returned events, use them to skip + // If the response includes events, use them to skip // the initial events.list call and reduce TTFB. if (result.events && result.events.length > 0) { preloadedEvents = result.events; @@ -282,13 +301,16 @@ export function workflowEntrypoint( } catch (err) { // Run was concurrently completed/failed/cancelled if (EntityConflictError.is(err) || RunExpiredError.is(err)) { + // EntityConflictError: run was concurrently + // completed/failed/cancelled during setup. + // RunExpiredError: run already in terminal state. + // In both cases, skip processing this message. runtimeLogger.info( 'Run already finished during setup, skipping', { workflowRunId: runId, message: err.message } ); return; - } - if (err instanceof WorkflowRuntimeError) { + } else if (err instanceof WorkflowRuntimeError) { runtimeLogger.error( 'Fatal runtime error during workflow setup', { workflowRunId: runId, error: err.message } @@ -319,9 +341,11 @@ export function workflowEntrypoint( throw failErr; } return; + } else { + throw err; } - throw err; } + workflowStartedAt = +workflowRun.startedAt; span?.setAttributes({ diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 2b6bda92c2..a20ce86ad6 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -87,9 +87,19 @@ export class Run { */ private encryptionKeyPromise: Promise | null = null; - constructor(runId: string) { + /** + * When true, run_created failed and the run may not exist yet (the + * resilient start path will create it via run_started). pollReturnValue + * retries on WorkflowRunNotFoundError only when this flag is set so + * that normal runs fail fast on 404. + * @internal + */ + private resilientStart = false; + + constructor(runId: string, opts?: { resilientStart?: boolean }) { this.runId = runId; this.world = getWorld(); + this.resilientStart = opts?.resilientStart ?? false; } /** @@ -243,6 +253,15 @@ export class Run { * @returns The workflow return value. */ private async pollReturnValue(): Promise { + // When resilientStart is true, run_created failed and the run may + // not exist yet. Retry on WorkflowRunNotFoundError up to 3 times + // (1s + 3s + 6s = 10s total) to give the queue time to deliver + // and the runtime to create the run via run_started. + // When resilientStart is false, 404 is a real error — fail fast. + let notFoundRetries = 0; + const NOT_FOUND_MAX_RETRIES = this.resilientStart ? 3 : 0; + const NOT_FOUND_DELAYS = [1_000, 3_000, 6_000]; + while (true) { try { const run = await this.world.runs.get(this.runId); @@ -270,6 +289,15 @@ export class Run { await new Promise((resolve) => setTimeout(resolve, 1_000)); continue; } + if ( + WorkflowRunNotFoundError.is(error) && + notFoundRetries < NOT_FOUND_MAX_RETRIES + ) { + const delay = NOT_FOUND_DELAYS[notFoundRetries]!; + notFoundRetries++; + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } throw error; } } diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index 3ec4ac45dd..c1e1d6eddb 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -1,4 +1,4 @@ -import { WorkflowRuntimeError } from '@workflow/errors'; +import { WorkflowRuntimeError, WorkflowWorldError } from '@workflow/errors'; import { SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY } from '@workflow/world'; import { afterEach, @@ -391,6 +391,77 @@ describe('start', () => { }); }); + describe('resilient start (run_created failure)', () => { + const validWorkflow = Object.assign(() => Promise.resolve('result'), { + workflowId: 'test-workflow', + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('should succeed when events.create throws a 500 error (queue still dispatched)', async () => { + const mockQueue = vi.fn().mockResolvedValue({ messageId: null }); + const serverError = new WorkflowWorldError('Internal Server Error', { + status: 500, + }); + const mockEventsCreate = vi.fn().mockRejectedValue(serverError); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + // start() should NOT throw — the queue was still dispatched + const run = await start(validWorkflow, [42]); + expect(run.runId).toMatch(/^wrun_/); + + // Queue should have been called with runInput + expect(mockQueue).toHaveBeenCalledTimes(1); + const [, queuePayload] = mockQueue.mock.calls[0]; + expect(queuePayload.runInput).toBeDefined(); + expect(queuePayload.runInput.deploymentId).toBe('deploy_123'); + expect(queuePayload.runInput.workflowName).toBe('test-workflow'); + expect(queuePayload.runInput.specVersion).toBe(SPEC_VERSION_CURRENT); + }); + + it('should throw when queue fails even if events.create succeeds', async () => { + const mockEventsCreate = vi.fn().mockResolvedValue({ + run: { runId: 'wrun_test', status: 'pending' }, + }); + const mockQueue = vi + .fn() + .mockRejectedValue(new Error('Queue unavailable')); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + await expect(start(validWorkflow, [])).rejects.toThrow( + 'Queue unavailable' + ); + }); + + it('should throw when events.create fails with a non-retryable error (e.g. 400)', async () => { + const badRequest = new WorkflowWorldError('Bad Request', { + status: 400, + }); + const mockEventsCreate = vi.fn().mockRejectedValue(badRequest); + const mockQueue = vi.fn().mockResolvedValue({ messageId: null }); + + vi.mocked(getWorld).mockReturnValue({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + await expect(start(validWorkflow, [])).rejects.toThrow('Bad Request'); + }); + }); + describe('overload type inference', () => { // Type-only assertions that don't execute start() at runtime. // We use expectTypeOf on the function signature's return type directly. diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 1b4f687833..59fc6db56e 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,9 +1,15 @@ import { waitUntil } from '@vercel/functions'; -import { WorkflowRuntimeError } from '@workflow/errors'; +import { + EntityConflictError, + ThrottleError, + WorkflowRuntimeError, + WorkflowWorldError, +} from '@workflow/errors'; import type { WorkflowInvokePayload, World } from '@workflow/world'; import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { importKey } from '../encryption.js'; +import { runtimeLogger } from '../logger.js'; import type { Serializable } from '../schemas.js'; import { dehydrateWorkflowArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; @@ -188,33 +194,88 @@ export async function start( globalThis, v1Compat ); - const result = await world.events.create( - runId, - { - eventType: 'run_created', - specVersion, - eventData: { - deploymentId: deploymentId, - workflowName: workflowName, - input: workflowArguments, - executionContext: { traceCarrier, workflowCoreVersion }, + + const executionContext = { traceCarrier, workflowCoreVersion }; + + // Call events.create (run_created) and queue in parallel. + // If events.create fails with 429/5xx, the run was still accepted + // via the queue and creation will be re-tried async by the runtime. + const [runCreatedResult, queueResult] = await Promise.allSettled([ + world.events.create( + runId, + { + eventType: 'run_created', + specVersion, + eventData: { + deploymentId: deploymentId, + workflowName: workflowName, + input: workflowArguments, + executionContext, + }, }, - }, - { v1Compat } - ); + { v1Compat } + ), + world.queue( + getWorkflowQueueName(workflowName), + { + runId, + traceCarrier, + runInput: { + input: workflowArguments, + deploymentId, + workflowName, + specVersion, + executionContext, + }, + } satisfies WorkflowInvokePayload, + { + deploymentId, + } + ), + ]); - // Assert that the run was created - if (!result.run) { - throw new WorkflowRuntimeError( - "Missing 'run' in server response for 'run_created' event" - ); + // Queue failure is always fatal — the run was not enqueued + if (queueResult.status === 'rejected') { + throw queueResult.reason; } - // Verify server accepted our runId - if (!v1Compat && result.run.runId !== runId) { - throw new WorkflowRuntimeError( - `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` - ); + // Handle events.create result + let resilientStart = false; + if (runCreatedResult.status === 'rejected') { + const err = runCreatedResult.reason; + if (EntityConflictError.is(err)) { + // 409: The run already exists. This can happen in extreme cases where + // the run creation call gets a cold start or other slowdown, and the queue + // + run_started call completes faster. We expect this to be <=1% of cases. + // In this case, we can safely return. + } else if (isRetryableStartError(err)) { + // 429 (ThrottleError) and 5xx (WorkflowWorldError with status >= 500) + // are retryable — the run was accepted via the queue and creation + // will be re-tried by the runtime when it calls run_started. + resilientStart = true; + runtimeLogger.warn( + 'Run creation event failed, but the run was accepted via the queue. ' + + 'The run_created event will be re-tried async by the runtime.', + { workflowRunId: runId, error: err.message } + ); + } else { + throw err; + } + } else { + const result = runCreatedResult.value; + // Assert that the run was created + if (!result.run) { + throw new WorkflowRuntimeError( + "Missing 'run' in server response for 'run_created' event" + ); + } + + // Verify server accepted our runId + if (!v1Compat && result.run.runId !== runId) { + throw new WorkflowRuntimeError( + `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` + ); + } } waitUntil( @@ -228,22 +289,27 @@ export async function start( span?.setAttributes({ ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowRunStatus(result.run.status), ...Attribute.DeploymentId(deploymentId), + ...(runCreatedResult.status === 'fulfilled' && + runCreatedResult.value.run + ? Attribute.WorkflowRunStatus(runCreatedResult.value.run.status) + : {}), }); - await world.queue( - getWorkflowQueueName(workflowName), - { - runId, - traceCarrier, - } satisfies WorkflowInvokePayload, - { - deploymentId, - } - ); - - return new Run(runId); + return new Run(runId, { resilientStart }); }); }); } + +/** + * Checks if an error from events.create (run_created) is retryable, + * meaning the queue can re-try creation later via the run_started path. + * - ThrottleError (429): rate limited, will succeed later + * - WorkflowWorldError with status >= 500: server error, will succeed later + */ +function isRetryableStartError(err: unknown): boolean { + if (ThrottleError.is(err)) return true; + if (WorkflowWorldError.is(err) && err.status && err.status >= 500) + return true; + return false; +} diff --git a/packages/core/src/step/context-storage.ts b/packages/core/src/step/context-storage.ts index 04fa2e625d..200925ea4b 100644 --- a/packages/core/src/step/context-storage.ts +++ b/packages/core/src/step/context-storage.ts @@ -20,9 +20,9 @@ export type StepContext = { * `contextStorage.getStore()` in user code (via getWorkflowMetadata / * getStepMetadata) can reference different AsyncLocalStorage instances, * causing the store to appear empty. - * + * * Note that we were unable to reproduce this issue. This is a fix for the only synthetic way - * way in which we could get the builder to break with the reported error message, and + * way in which we could get the builder to break with the reported error message, and * serves as defense-in-depth, since the change is otherwise safe. * * See: https://github.com/vercel/workflow/issues/1577 diff --git a/packages/world-local/src/fs.ts b/packages/world-local/src/fs.ts index 4ee552f3f0..1b7e4fd505 100644 --- a/packages/world-local/src/fs.ts +++ b/packages/world-local/src/fs.ts @@ -161,7 +161,7 @@ interface WriteOptions { * Custom JSON replacer that encodes Uint8Array as base64 strings. * Format: { __type: 'Uint8Array', data: '' } */ -function jsonReplacer(_key: string, value: unknown): unknown { +export function jsonReplacer(_key: string, value: unknown): unknown { if (value instanceof Uint8Array) { return { __type: 'Uint8Array', @@ -174,7 +174,7 @@ function jsonReplacer(_key: string, value: unknown): unknown { /** * Custom JSON reviver that decodes base64 strings back to Uint8Array. */ -function jsonReviver(_key: string, value: unknown): unknown { +export function jsonReviver(_key: string, value: unknown): unknown { if ( value !== null && typeof value === 'object' && diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 15a16eec18..5a7bf6d7f7 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -1,5 +1,5 @@ import { setTimeout } from 'node:timers/promises'; -import { JsonTransport } from '@vercel/queue'; +import type { Transport } from '@vercel/queue'; import { MessageId, type Queue, ValidQueueName } from '@workflow/world'; import { Sema } from 'async-sema'; import { monotonicFactory } from 'ulid'; @@ -7,8 +7,34 @@ import { Agent } from 'undici'; import { z } from 'zod/v4'; import type { Config } from './config.js'; import { resolveBaseUrl } from './config.js'; +import { jsonReplacer, jsonReviver } from './fs.js'; import { getPackageInfo } from './init.js'; +/** + * JSON transport that preserves Uint8Array values using the same + * replacer/reviver that world-local uses for filesystem storage. + * Uint8Array → { __type: 'Uint8Array', data: '' } in JSON. + */ +class TypedJsonTransport implements Transport { + readonly contentType = 'application/json'; + + serialize(value: unknown): Buffer { + return Buffer.from(JSON.stringify(value, jsonReplacer)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + const text = Buffer.concat(chunks).toString(); + return JSON.parse(text, jsonReviver); + } +} + // For local queue, there is no technical limit on the message visibility lifespan, // but the environment variable can be used for testing purposes to set a max visibility limit. const LOCAL_QUEUE_MAX_VISIBILITY = @@ -64,7 +90,7 @@ export function createQueue(config: Partial): LocalQueue { connections: 1000, keepAliveTimeout: 30_000, }); - const transport = new JsonTransport(); + const transport = new TypedJsonTransport(); const generateId = monotonicFactory(); const semaphore = new Sema(WORKFLOW_LOCAL_QUEUE_CONCURRENCY); @@ -255,7 +281,7 @@ export function createQueue(config: Partial): LocalQueue { return Response.json({ error: 'Unhandled queue' }, { status: 400 }); } - const body = await new JsonTransport().deserialize(req.body); + const body = await new TypedJsonTransport().deserialize(req.body); try { const result = await handler(body, { attempt, queueName, messageId }); diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index 9a7f78ebc7..71632f98f6 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -3,10 +3,7 @@ import os from 'node:os'; import path from 'node:path'; import { WorkflowWorldError } from '@workflow/errors'; import type { Event, Storage } from '@workflow/world'; -import { - DEFAULT_TIMESTAMP_THRESHOLD_MS, - stripEventDataRefs, -} from '@workflow/world'; +import { stripEventDataRefs } from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { writeJSON } from './fs.js'; @@ -2877,6 +2874,14 @@ describe('Storage', () => { ).rejects.toThrow(/Invalid runId timestamp/); }); + it('should accept a runId with a timestamp 10 minutes in the past', async () => { + // 10 minutes ago — within the 24-hour past threshold + const runId = makeRunId(Date.now() - 10 * 60 * 1000); + const result = await storage.events.create(runId, runCreatedEvent); + expect(result.run).toBeDefined(); + expect(result.run!.runId).toBe(runId); + }); + it('should reject a runId with a timestamp too far in the future', async () => { // 10 minutes from now — exceeds the 5-minute future threshold const runId = makeRunId(Date.now() + 10 * 60 * 1000); diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index eba3b1b0fa..63df4a81c0 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -32,6 +32,7 @@ import { import { DEFAULT_RESOLVE_DATA_OPTION } from '../config.js'; import { deleteJSON, + jsonReplacer, listJSONFiles, paginatedFileSystemQuery, readJSONWithFallback, @@ -124,6 +125,88 @@ export function createEventsStorage( WorkflowRunSchema, tag ); + + // Resilient start: run_started on non-existent run with eventData + // creates the run first, so the queue can bootstrap a run that + // failed to create during start(). + if ( + data.eventType === 'run_started' && + !currentRun && + 'eventData' in data && + data.eventData + ) { + const runInputData = data.eventData as { + deploymentId?: string; + workflowName?: string; + input?: any; + executionContext?: Record; + }; + if ( + runInputData.deploymentId && + runInputData.workflowName && + runInputData.input !== undefined + ) { + // Atomically try to create the run entity. writeExclusive + // uses O_CREAT|O_EXCL so only the first writer wins, + // preventing a TOCTOU race where a concurrent run_created + // from start() could overwrite a run that was already + // transitioned to 'running'. + const createdRun: WorkflowRun = { + runId: effectiveRunId, + deploymentId: runInputData.deploymentId, + status: 'pending', + workflowName: runInputData.workflowName, + specVersion: effectiveSpecVersion, + executionContext: runInputData.executionContext, + input: runInputData.input, + output: undefined, + error: undefined, + startedAt: undefined, + completedAt: undefined, + createdAt: now, + updatedAt: now, + }; + const runPath = taggedPath(basedir, 'runs', effectiveRunId, tag); + const created = await writeExclusive( + runPath, + JSON.stringify(createdRun, jsonReplacer) + ); + + if (created) { + // We created the run — also write the run_created event. + const runCreatedEventId = `evnt_${monotonicUlid()}`; + const runCreatedEvent: Event = { + eventType: 'run_created', + runId: effectiveRunId, + eventId: runCreatedEventId, + createdAt: now, + specVersion: effectiveSpecVersion, + eventData: { + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + input: runInputData.input, + executionContext: runInputData.executionContext, + }, + }; + const createdCompositeKey = `${effectiveRunId}-${runCreatedEventId}`; + await writeJSON( + taggedPath(basedir, 'events', createdCompositeKey, tag), + runCreatedEvent + ); + currentRun = createdRun; + } else { + // Run already exists (concurrent run_created won the + // race). Re-read it so downstream logic sees the real state. + currentRun = await readJSONWithFallback( + basedir, + 'runs', + effectiveRunId, + WorkflowRunSchema, + tag + ); + } + } + } } // ============================================================ @@ -324,7 +407,20 @@ export function createEventsStorage( createdAt: now, updatedAt: now, }; - await writeJSON(taggedPath(basedir, 'runs', effectiveRunId, tag), run); + // Use writeExclusive (O_CREAT|O_EXCL) to atomically create the + // run entity file. This prevents a TOCTOU race with the resilient + // start path (run_started on non-existent run) that could result + // in duplicate run_created events in the event log. + const runPath = taggedPath(basedir, 'runs', effectiveRunId, tag); + const created = await writeExclusive( + runPath, + JSON.stringify(run, jsonReplacer, 2) + ); + if (!created) { + throw new EntityConflictError( + `Workflow run "${effectiveRunId}" already exists` + ); + } } else if (data.eventType === 'run_started') { // Reuse currentRun from validation (already read above) if (currentRun) { diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index f5111241fc..d432cb0d11 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -1,5 +1,5 @@ import * as Stream from 'node:stream'; -import { JsonTransport } from '@vercel/queue'; +import type { Transport } from '@vercel/queue'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { MessageId, @@ -81,7 +81,39 @@ export function createQueue( const port = process.env.PORT ? Number(process.env.PORT) : undefined; const localWorld = createLocalWorld({ dataDir: undefined, port }); - const transport = new JsonTransport(); + // JSON transport that preserves Uint8Array values via a tagged + // envelope ({ __type: 'Uint8Array', data: '' }). Required + // for the resilient start path where runInput.input (a Uint8Array) + // is sent through the queue. + const transport: Transport = { + contentType: 'application/json', + serialize(value: unknown): Buffer { + return Buffer.from( + JSON.stringify(value, (_key, v) => + v instanceof Uint8Array + ? { __type: 'Uint8Array', data: Buffer.from(v).toString('base64') } + : v + ) + ); + }, + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + for (;;) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + return JSON.parse(Buffer.concat(chunks).toString(), (_key, v) => + v !== null && + typeof v === 'object' && + v.__type === 'Uint8Array' && + typeof v.data === 'string' + ? new Uint8Array(Buffer.from(v.data, 'base64')) + : v + ); + }, + }; const generateMessageId = monotonicFactory(); const prefix = config.jobPrefix || 'workflow_'; @@ -320,7 +352,7 @@ export function createQueue( const queue: Queue['queue'] = async (queue, message, opts) => { await start(); const [queuePrefix, queueId] = parseQueueName(queue); - const body = transport.serialize(message); + const body = transport.serialize(message) as Buffer; const messageId = MessageId.parse(`msg_${generateMessageId()}`); await addGraphileJob({ queuePrefix, diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index 116c2a5dff..6ab952a40a 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -4,8 +4,8 @@ import { RunExpiredError, RunNotSupportedError, TooEarlyError, - WorkflowWorldError, WorkflowRunNotFoundError, + WorkflowWorldError, } from '@workflow/errors'; import type { Event, @@ -373,6 +373,78 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { runId: effectiveRunId, }); currentRun = runValue ?? null; + + // Resilient start: run_started on non-existent run with eventData + // creates the run first, so the queue can bootstrap a run that + // failed to create during start(). + if ( + data.eventType === 'run_started' && + !currentRun && + 'eventData' in data && + data.eventData + ) { + const runInputData = (data as any).eventData as { + deploymentId?: string; + workflowName?: string; + input?: any; + executionContext?: Record; + }; + if ( + runInputData.deploymentId && + runInputData.workflowName && + runInputData.input !== undefined + ) { + // Create run + run_created event atomically. The + // transaction ensures we never have an orphaned run + // without its run_created event. + const [inserted] = await drizzle + .insert(Schema.runs) + .values({ + runId: effectiveRunId, + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + specVersion: effectiveSpecVersion, + input: runInputData.input as SerializedContent, + executionContext: runInputData.executionContext as + | SerializedContent + | undefined, + status: 'pending', + }) + .onConflictDoNothing() + .returning(); + + if (inserted) { + const runCreatedEventId = `wevt_${ulid()}`; + await drizzle.insert(events).values({ + runId: effectiveRunId, + eventId: runCreatedEventId, + eventType: 'run_created', + eventData: { + deploymentId: runInputData.deploymentId, + workflowName: runInputData.workflowName, + input: runInputData.input, + executionContext: runInputData.executionContext, + }, + specVersion: effectiveSpecVersion, + }); + } + const createdRun = inserted; + + if (createdRun) { + currentRun = { + status: 'pending', + specVersion: effectiveSpecVersion, + }; + } else { + // Run already exists (concurrent run_created won the + // race). Re-read so downstream logic sees the real state. + const [runValue] = await getRunForValidation.execute({ + runId: effectiveRunId, + }); + currentRun = runValue ?? null; + } + } + } } // ============================================================ @@ -1191,6 +1263,9 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { ? { eventData: storedEventData } : {}), }; + // Strip eventData leaked by ...data spread for run_started events. + // The eventData (run input for resilient start) belongs on + // run_created only; storedEventData is already undefined above. if (data.eventType === 'run_started') { delete (result as any).eventData; } diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 3f5acef00d..6aeac0e9e8 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -60,17 +60,19 @@ function stripEventAndLegacyRefs( // undefined), so we use the looser WorkflowRunWireBaseSchema and normalize // the error via deserializeError() afterward. const EventResultResolveWireSchema = z.object({ - event: EventSchema, + event: EventSchema.optional(), run: WorkflowRunSchema.optional(), step: StepWireSchema.optional(), hook: HookSchema.optional(), + events: z.array(EventSchema).optional(), }); const EventResultLazyWireSchema = z.object({ - event: EventSchema, + event: EventSchema.optional(), run: WorkflowRunWireBaseSchema.optional(), step: StepWireSchema.optional(), hook: HookSchema.optional(), + events: z.array(EventSchema).optional(), }); // Schema for events returned with `remoteRefBehavior=lazy`. @@ -457,10 +459,13 @@ async function createWorkflowRunEventInner( }); return { - event: stripEventAndLegacyRefs(wireResult.event, resolveData), + event: wireResult.event + ? stripEventAndLegacyRefs(wireResult.event, resolveData) + : undefined, run: wireResult.run, step: wireResult.step ? deserializeStep(wireResult.step) : undefined, hook: wireResult.hook, + events: wireResult.events, }; } @@ -481,11 +486,14 @@ async function createWorkflowRunEventInner( // undefined (lazy ref mode), so deserializeError normalizes it into the // StructuredError shape expected by WorkflowRun consumers. return { - event: stripEventAndLegacyRefs(wireResult.event, resolveData), + event: wireResult.event + ? stripEventAndLegacyRefs(wireResult.event, resolveData) + : undefined, run: wireResult.run ? deserializeError(wireResult.run) : undefined, step: wireResult.step ? deserializeStep(wireResult.step) : undefined, hook: wireResult.hook, + events: wireResult.events, }; } diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index b9097ef1f2..78b3630652 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -67,11 +67,12 @@ describe('createQueue', () => { await queue.queue('__wkf_workflow_test', { runId: 'run-123' }); expect(mockSend).toHaveBeenCalledTimes(1); - // send(topicName, payload, options) - const payload = mockSend.mock.calls[0][1]; + // send(topicName, wrapper, options) — CborTransport encodes + // inside serialize(), but the mock bypasses the transport. + const wrapper = mockSend.mock.calls[0][1]; - expect(payload.payload).toEqual({ runId: 'run-123' }); - expect(payload.queueName).toBe('__wkf_workflow_test'); + expect(wrapper.payload).toEqual({ runId: 'run-123' }); + expect(wrapper.queueName).toBe('__wkf_workflow_test'); } finally { if (originalEnv !== undefined) { process.env.VERCEL_DEPLOYMENT_ID = originalEnv; @@ -721,10 +722,11 @@ describe('createQueue', () => { ); expect(mockSend).toHaveBeenCalledTimes(1); - // send(topicName, payload, options) - const payload = mockSend.mock.calls[0][1]; - expect(payload.payload).toEqual(stepPayload); - expect(payload.queueName).toBe('__wkf_step_myStep'); + // send(topicName, wrapper, options) — CborTransport encodes + // inside serialize(), but the mock bypasses the transport. + const wrapper = mockSend.mock.calls[0][1]; + expect(wrapper.payload).toEqual(stepPayload); + expect(wrapper.queueName).toBe('__wkf_step_myStep'); } finally { if (originalEnv !== undefined) { process.env.VERCEL_DEPLOYMENT_ID = originalEnv; diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 7e82527b01..d11e2e0f7e 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -1,4 +1,5 @@ import { AsyncLocalStorage } from 'node:async_hooks'; +import type { Transport } from '@vercel/queue'; import { DuplicateMessageError, QueueClient } from '@vercel/queue'; import { MessageId, @@ -8,10 +9,35 @@ import { QueuePayloadSchema, ValidQueueName, } from '@workflow/world'; +import { decode, encode } from 'cbor-x'; import { z } from 'zod/v4'; import { getDispatcher } from './http-client.js'; import { type APIConfig, getHeaders, getHttpUrl } from './utils.js'; +/** + * CBOR-based queue transport. Encodes values with cbor-x on send and + * decodes on receive, preserving Uint8Array values natively (workflow + * input is a Uint8Array in specVersion >= 2). + */ +class CborTransport implements Transport { + readonly contentType = 'application/cbor'; + + serialize(value: unknown): Buffer { + return Buffer.from(encode(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + return decode(Buffer.concat(chunks)); + } +} + const requestIdStorage = new AsyncLocalStorage(); const MessageWrapper = z.object({ @@ -86,9 +112,12 @@ export function createQueue(config?: APIConfig): Queue { const region = 'iad1'; + const cborTransport = new CborTransport(); + const clientOptions = { region, dispatcher: getDispatcher(), + transport: cborTransport, ...(usingProxy && { // final path will be /queues-proxy/api/v3/topic/... // and the proxy will strip the /queues-proxy prefix before forwarding to VQS @@ -118,27 +147,17 @@ export function createQueue(config?: APIConfig): Queue { deploymentId, }); - // zod v3 doesn't have the `encode` method. We only support zod v4 officially, - // but codebases that pin zod v3 are still common. - const hasEncoder = typeof MessageWrapper.encode === 'function'; - if (!hasEncoder) { - console.warn( - 'Using zod v3 compatibility mode for queue() calls - this may not work as expected' - ); - } - const encoder = hasEncoder - ? MessageWrapper.encode - : (data: z.infer) => data; - - const encoded = encoder({ + // The CborTransport handles CBOR encoding inside serialize(), + // preserving Uint8Array values (workflow input in specVersion >= 2). + const wrapper = { payload, queueName, // Store deploymentId in the message so it can be preserved when re-enqueueing deploymentId: opts?.deploymentId, - }); + }; const sanitizedQueueName = queueName.replace(/[^A-Za-z0-9-_]/g, '-'); try { - const { messageId } = await client.send(sanitizedQueueName, encoded, { + const { messageId } = await client.send(sanitizedQueueName, wrapper, { idempotencyKey: opts?.idempotencyKey, delaySeconds: opts?.delaySeconds, headers: { @@ -179,6 +198,8 @@ export function createQueue(config?: APIConfig): Queue { } const requestId = requestIdStorage.getStore(); + // The CborTransport handles CBOR decoding inside deserialize(), + // so message is already a plain object with Uint8Array values intact. const { payload, queueName, deploymentId } = MessageWrapper.parse(message); diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index 74e862215a..6784a0a819 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -223,9 +223,22 @@ const RunCreatedEventSchema = BaseEventSchema.extend({ /** * Event created when a workflow run starts executing. * Updates the run entity to status 'running'. + * + * The optional eventData carries run creation data for the resilient start path: + * when the run_created event failed (e.g., storage outage during start()), the + * runtime passes the run input through the queue so the server can create the run + * on the run_started call if it doesn't exist yet. */ const RunStartedEventSchema = BaseEventSchema.extend({ eventType: z.literal('run_started'), + eventData: z + .object({ + input: SerializedDataSchema.optional(), + deploymentId: z.string().optional(), + workflowName: z.string().optional(), + executionContext: z.record(z.string(), z.any()).optional(), + }) + .optional(), }); /** diff --git a/packages/world/src/index.ts b/packages/world/src/index.ts index 84e13eec52..e3a574e3bf 100644 --- a/packages/world/src/index.ts +++ b/packages/world/src/index.ts @@ -16,6 +16,7 @@ export { MessageId, QueuePayloadSchema, QueuePrefix, + RunInputSchema, StepInvokePayloadSchema, ValidQueueName, WorkflowInvokePayloadSchema, diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 5093b62dd3..59f467b463 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -21,12 +21,29 @@ export type MessageId = z.infer; export const TraceCarrierSchema = z.record(z.string(), z.string()); export type TraceCarrier = z.infer; +/** + * Run creation data carried through the queue for resilient start. + * Only present on the first queue delivery — re-enqueues omit this. + * When the runtime processes the message, it passes this data to the + * run_started event so the server can create the run if it doesn't exist yet. + */ +export const RunInputSchema = z.object({ + input: z.unknown(), + deploymentId: z.string(), + workflowName: z.string(), + specVersion: z.number(), + executionContext: z.record(z.string(), z.any()).optional(), +}); +export type RunInput = z.infer; + export const WorkflowInvokePayloadSchema = z.object({ runId: z.string(), traceCarrier: TraceCarrierSchema.optional(), requestedAt: z.coerce.date().optional(), /** Number of times this message has been re-enqueued due to server errors (5xx) */ serverErrorRetryCount: z.number().int().optional(), + /** Run creation data, only present on the first queue delivery from start() */ + runInput: RunInputSchema.optional(), }); export const StepInvokePayloadSchema = z.object({