From 7d259558ff0eeeab85a62a95fd962d35546974a6 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Sat, 2 May 2026 15:47:41 +0900 Subject: [PATCH] Add additional tests for event consumer fixes for hook/sleep/step race conditions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes a false-positive unconsumed event during replay of a `for await (const payload of hook) { await step() }` pattern. The EventsConsumer could advance to a step_created event before the workflow code registered the step consumer. The previous deferred unconsumed check chained onto the promise queue once and waited 100ms, but missed a second round of async work (hook payload deserialization) triggered by the first drain's resolve(). Fix: after the initial queue drain, yield to the event loop (setTimeout(0)) so microtask chains propagate (e.g., step resolve → for-await resumes → createHookPromise → new deserialization), then re-chain onto the latest queue before starting the 100ms timeout. Adds regression tests across workflow.test.ts, hook-sleep-interaction.test.ts, e2e.test.ts, and a new workbench example to cover hook/sleep/step race conditions. Co-Authored-By: Pranay Prakash Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/core/e2e/e2e.test.ts | 41 ++++ packages/core/src/events-consumer.ts | 10 +- .../core/src/hook-sleep-interaction.test.ts | 165 ++++++++++++++- packages/core/src/workflow.test.ts | 193 ++++++++++++++++++ workbench/example/workflows/99_e2e.ts | 37 ++++ 5 files changed, 444 insertions(+), 2 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index a9809a552c..671063fa7a 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -2340,6 +2340,47 @@ describe('e2e', () => { } ); + test( + 'hookWithSleepFinalStepWorkflow - step only on final payload', + { timeout: 120_000 }, + async () => { + // Regression test for the v0chat incident. 
Mirrors the production + // shape: a hook + fire-and-forget sleep, where the step runs only + // once the final (done) payload arrives. Replay ends up with two + // `hook_received` events followed by a single `step_created`, which + // is the race window for the deferred unconsumed-event check. + const token = Math.random().toString(36).slice(2); + + const run = await start(await e2e('hookWithSleepFinalStepWorkflow'), [ + token, + ]); + + // Wait for the hook to register. + await new Promise((resolve) => setTimeout(resolve, 5_000)); + + let hook = await getHookByToken(token); + expect(hook.runId).toBe(run.runId); + await resumeHook(hook, { type: 'msg', id: 1 }); + + // Let the workflow replay and suspend before the next payload so the + // final event log contains two `hook_received` entries before any + // `step_created` — the exact replay shape from production. + await new Promise((resolve) => setTimeout(resolve, 3_000)); + + hook = await getHookByToken(token); + await resumeHook(hook, { type: 'final', id: 2, done: true }); + + const returnValue = await run.returnValue; + expect(returnValue).toEqual({ + seen: [1, 2], + finalResult: { processed: true, type: 'final', id: 2 }, + }); + + const { json: runData } = await cliInspectJson(`runs ${run.runId}`); + expect(runData.status).toBe('completed'); + } + ); + test( 'sleepInLoopWorkflow - sleep inside loop with steps actually delays each iteration', { timeout: 60_000 }, diff --git a/packages/core/src/events-consumer.ts b/packages/core/src/events-consumer.ts index 683d67888b..d00e18ec80 100644 --- a/packages/core/src/events-consumer.ts +++ b/packages/core/src/events-consumer.ts @@ -1,6 +1,14 @@ import type { Event } from '@workflow/world'; import { eventsLogger } from './logger.js'; +/** + * Delay before firing the deferred unconsumed-event check after the promise + * queue has drained. 
Must be long enough for cross-VM microtask chains to + * propagate (resolve in host → workflow code in VM → subscribe call back + * in host). Any subscribe() arriving during this window cancels the check. + */ +export const DEFERRED_CHECK_DELAY_MS = 100; + export enum EventConsumerResult { /** * Callback consumed the event, but should not be removed from the callbacks list @@ -138,7 +146,7 @@ export class EventsConsumer { this.pendingUnconsumedCheck = null; this.onUnconsumedEvent(currentEvent); } - }, 100); + }, DEFERRED_CHECK_DELAY_MS); }); } }; diff --git a/packages/core/src/hook-sleep-interaction.test.ts b/packages/core/src/hook-sleep-interaction.test.ts index a706628b81..67661aa028 100644 --- a/packages/core/src/hook-sleep-interaction.test.ts +++ b/packages/core/src/hook-sleep-interaction.test.ts @@ -1,3 +1,4 @@ +import { WorkflowRuntimeError } from '@workflow/errors'; import { withResolvers } from '@workflow/utils'; import type { Event } from '@workflow/world'; import * as nanoid from 'nanoid'; @@ -34,12 +35,23 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { const ulid = monotonicFactory(() => context.globalThis.Math.random()); const workflowStartedAt = context.globalThis.Date.now(); const promiseQueueHolder = { current: Promise.resolve() }; + // Forward onUnconsumedEvent through ctx.onWorkflowError so tests that wire + // onWorkflowError to a discontinuation promise (see runWithDiscontinuation) + // actually observe false-positive unconsumed-event detections instead of + // silently dropping them. 
+ const ctxRef: { current?: WorkflowOrchestratorContext } = {}; const ctx: WorkflowOrchestratorContext = { runId: 'wrun_test', encryptionKey: undefined, globalThis: context.globalThis, eventsConsumer: new EventsConsumer(events, { - onUnconsumedEvent: () => {}, + onUnconsumedEvent: (event) => { + ctxRef.current?.onWorkflowError( + new WorkflowRuntimeError( + `Unconsumed event in event log: eventType=${event.eventType}, correlationId=${event.correlationId}, eventId=${event.eventId}. This indicates a corrupted or invalid event log.` + ) + ); + }, getPromiseQueue: () => promiseQueueHolder.current, }), invocationsQueue: new Map(), @@ -56,6 +68,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { }, pendingDeliveries: 0, }; + ctxRef.current = ctx; return ctx; } @@ -599,6 +612,156 @@ function defineTests(mode: 'sync' | 'async') { }); }); + describe(`hook + sleep with step per payload ${label}`, () => { + it('should not trigger unconsumed event error when for-await loop calls a step per hook payload', async () => { + // Reproduces CI failure: hookWithSleepWorkflow event log had alternating + // hook_received + step lifecycle events. During replay, the EventsConsumer + // advances past the second step_created before the for-await loop has + // called processPayload (and registered the step consumer). The deferred + // unconsumed check must wait for the new async work (hook payload + // deserialization) before declaring the event orphaned. 
+ await setupHydrateMock(); + const ops: Promise[] = []; + const [payload1, payload2, stepResult1, stepResult2] = await Promise.all([ + dehydrateStepReturnValue( + { type: 'subscribe', id: 1 }, + 'wrun_test', + undefined, + ops + ), + dehydrateStepReturnValue( + { type: 'done', done: true }, + 'wrun_test', + undefined, + ops + ), + dehydrateStepReturnValue( + { processed: true, type: 'subscribe', id: 1 }, + 'wrun_test', + undefined, + ops + ), + dehydrateStepReturnValue( + { processed: true, type: 'done' }, + 'wrun_test', + undefined, + ops + ), + ]); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'hook_created', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { token: 'test-token', isWebhook: false }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'wait_created', + correlationId: `wait_${CORR_IDS[1]}`, + eventData: { resumeAt: new Date('2099-01-01') }, + createdAt: new Date(), + }, + // First hook payload → step lifecycle + { + eventId: 'evnt_2', + runId: 'wrun_test', + eventType: 'hook_received', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { payload: payload1 }, + createdAt: new Date(), + }, + { + eventId: 'evnt_3', + runId: 'wrun_test', + eventType: 'step_created', + correlationId: `step_${CORR_IDS[2]}`, + eventData: { stepName: 'processPayload', input: payload1 }, + createdAt: new Date(), + }, + { + eventId: 'evnt_4', + runId: 'wrun_test', + eventType: 'step_started', + correlationId: `step_${CORR_IDS[2]}`, + eventData: {}, + createdAt: new Date(), + }, + { + eventId: 'evnt_5', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: `step_${CORR_IDS[2]}`, + eventData: { result: stepResult1 }, + createdAt: new Date(), + }, + // Second hook payload → step lifecycle + { + eventId: 'evnt_6', + runId: 'wrun_test', + eventType: 'hook_received', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { payload: payload2 }, + createdAt: new Date(), + 
}, + { + eventId: 'evnt_7', + runId: 'wrun_test', + eventType: 'step_created', + correlationId: `step_${CORR_IDS[3]}`, + eventData: { stepName: 'processPayload', input: payload2 }, + createdAt: new Date(), + }, + { + eventId: 'evnt_8', + runId: 'wrun_test', + eventType: 'step_started', + correlationId: `step_${CORR_IDS[3]}`, + eventData: {}, + createdAt: new Date(), + }, + { + eventId: 'evnt_9', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: `step_${CORR_IDS[3]}`, + eventData: { result: stepResult2 }, + createdAt: new Date(), + }, + ]); + + const createHook = createCreateHook(ctx); + const sleep = createSleep(ctx); + const useStep = createUseStep(ctx); + + const { result, error } = await runWithDiscontinuation(ctx, async () => { + const hook = createHook(); + void sleep('1d'); + + const processPayload = useStep<[any], any>('processPayload'); + const results: any[] = []; + + for await (const payload of hook) { + const processed = await processPayload(payload); + results.push(processed); + if ((payload as any).done) break; + } + + return results; + }); + + expect(error).toBeUndefined(); + expect(result).toEqual([ + { processed: true, type: 'subscribe', id: 1 }, + { processed: true, type: 'done' }, + ]); + }); + }); + describe(`hook only (no concurrent pending entity) ${label}`, () => { it('should deliver all hook payloads and reach step when no sleep or incomplete step exists', async () => { await setupHydrateMock(); diff --git a/packages/core/src/workflow.test.ts b/packages/core/src/workflow.test.ts index eca37555c8..7d321b267a 100644 --- a/packages/core/src/workflow.test.ts +++ b/packages/core/src/workflow.test.ts @@ -1,13 +1,16 @@ import { types } from 'node:util'; import { HookConflictError, WorkflowRuntimeError } from '@workflow/errors'; import type { Event, WorkflowRun } from '@workflow/world'; +import { monotonicFactory } from 'ulid'; import { assert, describe, expect, it, vi } from 'vitest'; +import { DEFERRED_CHECK_DELAY_MS } from 
'./events-consumer.js'; import type { WorkflowSuspension } from './global.js'; import { dehydrateStepReturnValue, dehydrateWorkflowArguments, hydrateWorkflowReturnValue, } from './serialization.js'; +import { createContext } from './vm/index.js'; import { runWorkflow } from './workflow.js'; // No encryption key = encryption disabled @@ -4484,4 +4487,194 @@ describe('runWorkflow', () => { ); expect(hydrated).toBe('third'); }); + + it('should not trigger unconsumed event error for for-await hook loop + unawaited sleep when hydrate latency exceeds the deferred-check window', async () => { + // Regression test for the v0chat production incident: a workflow that + // combines `createHook()`, fire-and-forget `sleep()`, and a per-payload + // step inside `for await` would falsely reject with + // `Corrupted event log` during replay. When hydrate latency exceeds the + // EventsConsumer's deferred-check timer, the second round of async work + // (hydrate payload #2 → for-await resumes → step subscribe) completes + // after the timer fires, so step_created is briefly orphaned. + // + // Event log shape mirrors the production run: + // run_created, run_started, + // hook_created, wait_created, + // hook_received #1 (not-done), + // hook_received #2 (done) — buffered during replay, + // step_created — orphaned between hook_received #2 consume and step subscribe, + // step_started, step_completed + const ops: Promise[] = []; + const workflowRunId = 'wrun_test'; + const startedAt = new Date('2024-01-01T00:00:00.000Z'); + + const workflowRun: WorkflowRun = { + runId: workflowRunId, + workflowName: 'workflow', + status: 'running', + input: await dehydrateWorkflowArguments( + [], + workflowRunId, + noEncryptionKey, + ops + ), + createdAt: startedAt, + updatedAt: startedAt, + startedAt, + deploymentId: 'test-deployment', + }; + + // Derive deterministic correlation IDs using the same seeded ULID + // factory runWorkflow uses internally, so events match what the runtime + // expects. 
+ const seed = `${workflowRunId}:${workflowRun.workflowName}:${+startedAt}`; + const vm = createContext({ seed, fixedTimestamp: +startedAt }); + const ulid = monotonicFactory(() => vm.globalThis.Math.random()); + const hookCorr = `hook_${ulid(+startedAt)}`; + const waitCorr = `wait_${ulid(+startedAt)}`; + const stepCorr = `step_${ulid(+startedAt)}`; + + const payload1 = await dehydrateStepReturnValue( + { done: false }, + workflowRunId, + noEncryptionKey, + ops + ); + const payload2 = await dehydrateStepReturnValue( + { done: true }, + workflowRunId, + noEncryptionKey, + ops + ); + const stepResult = await dehydrateStepReturnValue( + 'ok', + workflowRunId, + noEncryptionKey, + ops + ); + + const events: Event[] = [ + { + eventId: 'evnt-0', + runId: workflowRunId, + eventType: 'run_created', + createdAt: startedAt, + }, + { + eventId: 'evnt-1', + runId: workflowRunId, + eventType: 'run_started', + createdAt: startedAt, + }, + { + eventId: 'evnt-2', + runId: workflowRunId, + eventType: 'hook_created', + correlationId: hookCorr, + eventData: { token: 'tok', isWebhook: false }, + createdAt: startedAt, + }, + { + eventId: 'evnt-3', + runId: workflowRunId, + eventType: 'wait_created', + correlationId: waitCorr, + eventData: { resumeAt: new Date('2099-01-01') }, + createdAt: startedAt, + }, + { + eventId: 'evnt-4', + runId: workflowRunId, + eventType: 'hook_received', + correlationId: hookCorr, + eventData: { payload: payload1 }, + createdAt: startedAt, + }, + { + eventId: 'evnt-5', + runId: workflowRunId, + eventType: 'hook_received', + correlationId: hookCorr, + eventData: { payload: payload2 }, + createdAt: startedAt, + }, + { + eventId: 'evnt-STEP-CREATED', + runId: workflowRunId, + eventType: 'step_created', + correlationId: stepCorr, + eventData: { stepName: 'doStep', input: payload2 }, + createdAt: startedAt, + }, + { + eventId: 'evnt-7', + runId: workflowRunId, + eventType: 'step_started', + correlationId: stepCorr, + createdAt: startedAt, + }, + { + eventId: 
'evnt-8', + runId: workflowRunId, + eventType: 'step_completed', + correlationId: stepCorr, + eventData: { result: stepResult }, + createdAt: startedAt, + }, + ]; + + // Mock hydrate with a delay larger than the deferred-check window. In + // production this corresponds to slower-than-expected encrypted payload + // decryption (cold cache, contended CPU, etc.); the fix must not rely on + // hydrate completing within the timer window. + const BUFFER_FOR_TEST_MS = 50; + const hydrateDelayMs = DEFERRED_CHECK_DELAY_MS + BUFFER_FOR_TEST_MS; + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + const spy = vi + .spyOn(serialization, 'hydrateStepReturnValue') + .mockImplementation(async (...args) => { + await new Promise((r) => setTimeout(r, hydrateDelayMs)); + return originalHydrate(...args); + }); + + try { + const workflowCode = ` + const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")]; + const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")]; + const doStep = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("doStep"); + async function workflow() { + const hook = createHook(); + // Fire-and-forget timeout, not awaited — mirrors the production + // agent-stop pattern. + void sleep("1d"); + for await (const payload of hook) { + if (payload && payload.done) { + await doStep(payload); + break; + } + } + return "finished"; + } + ${getWorkflowTransformCode('workflow')}`; + + // On an unfixed EventsConsumer this rejects with a WorkflowRuntimeError + // (the false-positive "Unconsumed event" error). With the fix it runs + // to completion. 
+ const result = await runWorkflow( + workflowCode, + workflowRun, + events, + noEncryptionKey + ); + const hydrated = await hydrateWorkflowReturnValue( + result, + workflowRunId, + noEncryptionKey + ); + expect(hydrated).toBe('finished'); + } finally { + spy.mockRestore(); + } + }, 30_000); }); diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 8fcc8d9173..3f460865d3 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -1514,6 +1514,43 @@ export async function hookWithSleepWorkflow(token: string) { ////////////////////////////////////////////////////////// +/** + * https://github.com/vercel/workflow/pull/1528 Regression test for false-positive + * unconsumed event in for-await hook loops with steps: a hook iteration with + * an unawaited sleep where the step is only invoked on the final payload. + * The replay event log ends up with two `hook_received` events before a + * single `step_created`, which is the exact shape that triggered the + * false-positive "Corrupted event log" error in production. + */ +export async function hookWithSleepFinalStepWorkflow(token: string) { + 'use workflow'; + + type Payload = { type: string; id?: number; done?: boolean }; + + using hook = createHook<Payload>({ token }); + + // Fire-and-forget timeout — the "concurrent pending entity" that interacts + // with the hook iteration during replay. + void sleep('1d'); + + const seen: number[] = []; + let finalResult: any; + + for await (const payload of hook) { + if (typeof payload.id === 'number') { + seen.push(payload.id); + } + if (payload.done) { + finalResult = await processPayload(payload); + break; + } + } + + return { seen, finalResult }; +} + +////////////////////////////////////////////////////////// + async function addNumbers(a: number, b: number) { 'use step'; return a + b;