Commit 5e06a7c

pranaygp and claude authored
Materialize waits as entities to prevent duplicate wait_completed events (#1057)
* Handle 409 conflict when completing waits that were already completed

  When multiple concurrent workflow invocations race to complete the same wait, the server returns 409 (conflict) for duplicates. This change handles the 409 gracefully in both runtime.ts (sleep elapsed check) and runs.ts (wakeUpRun), preventing crashes and treating already-completed waits as successful.

  Also updates event-sourcing docs to reflect that waits are now materialized as entities in storage with atomic completion guarantees.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Point e2e tests at workflow-server preview for wait materialization branch

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Materialize waits as entities in local and postgres worlds to prevent duplicate wait_completed events

  Adds Wait type/schema to the shared world package and implements wait entity materialization in both local (filesystem) and postgres world implementations, matching the DynamoDB behavior. wait_created creates a wait entity with status 'waiting', and wait_completed transitions it to 'completed' with guards that reject duplicates (409).

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Fix waitId to use composite key for consistency with postgres world

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Clean up wait entities on terminal run states and register waits migration

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Clear WORKFLOW_SERVER_URL_OVERRIDE for merge

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Update changeset for all affected packages and restore server URL override

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Clear WORKFLOW_SERVER_URL_OVERRIDE now that server PR is merged

  The server-side changes (vercel/workflow-server#265) have been merged to main and deployed to production, so we no longer need to point at the preview URL.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0946dad commit 5e06a7c

13 files changed

Lines changed: 432 additions & 29 deletions


.changeset/wait-complete-guard.md

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+---
+"@workflow/core": patch
+"@workflow/world": patch
+"@workflow/world-local": patch
+"@workflow/world-postgres": patch
+---
+
+Materialize waits as entities to prevent duplicate wait_completed events
+
+- `@workflow/core`: Handle 409 conflict gracefully when creating wait_completed events, preventing crashes when multiple concurrent invocations race to complete the same wait
+- `@workflow/world`: Add `Wait` type, `WaitSchema`, and `WaitStatusSchema` exports; add optional `wait` field to `EventResult`
+- `@workflow/world-local`: Materialize wait entities on wait_created/wait_completed with duplicate detection; clean up waits on terminal run states
+- `@workflow/world-postgres`: Add `workflow_waits` table with `wait_status` enum; materialize wait entities with conditional writes for duplicate prevention; clean up waits on terminal run states

docs/content/docs/how-it-works/event-sourcing.mdx

Lines changed: 2 additions & 2 deletions
@@ -33,7 +33,7 @@ In the Workflow DevKit, the following entity types are managed through events:
 - **Runs**: Workflow execution instances (materialized in storage)
 - **Steps**: Individual atomic operations within a workflow (materialized in storage)
 - **Hooks**: Suspension points that can receive external data (materialized in storage)
-- **Waits**: Sleep or delay operations (tracked via events only, not materialized)
+- **Waits**: Sleep or delay operations (materialized in storage)
 
 ## Entity Lifecycles
 
@@ -151,7 +151,7 @@ flowchart TD
 - `completed`: Delay period has elapsed, workflow can resume
 
 <Callout type="info">
-Unlike Runs, Steps, and Hooks, waits are conceptual entities tracked only through events. There is no separate "Wait" record in storage that can be queried—the wait state is derived entirely from the `wait_created` and `wait_completed` events in the event log.
+Like Runs, Steps, and Hooks, waits are materialized as entities in storage. When a `wait_created` event is processed, a wait entity is created with status `waiting`. When a `wait_completed` event is processed, the wait entity is atomically transitioned to `completed` — this guarantees that a wait can only be completed exactly once, even if multiple concurrent invocations attempt to complete it simultaneously.
 </Callout>
 
 ## Event Types Reference
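
Reviewer note: the callout in the docs hunk above describes a two-state lifecycle (`waiting` to `completed`) in which the second completion attempt is rejected. A minimal in-memory sketch of that rule follows. `InMemoryWaitStore` and `WaitStateError` are hypothetical names introduced for illustration and are not part of the Workflow DevKit; the 404 and 409 status codes mirror the ones used in the local world diff in this commit.

```typescript
// Illustrative only: hypothetical types, not the Workflow DevKit API.
type WaitStatus = 'waiting' | 'completed';

interface WaitRecord {
  waitId: string;
  runId: string;
  status: WaitStatus;
  resumeAt?: Date;
  completedAt?: Date;
}

// Stand-in for an API error that carries an HTTP-style status code.
class WaitStateError extends Error {
  readonly status: number;
  constructor(message: string, status: number) {
    super(message);
    this.status = status;
  }
}

class InMemoryWaitStore {
  private readonly waits = new Map<string, WaitRecord>();

  // wait_created: materialize the entity with status 'waiting'.
  create(runId: string, correlationId: string, resumeAt?: Date): WaitRecord {
    const waitId = `${runId}-${correlationId}`;
    if (this.waits.has(waitId)) {
      throw new WaitStateError(`Wait "${correlationId}" already exists`, 409);
    }
    const wait: WaitRecord = { waitId, runId, status: 'waiting', resumeAt };
    this.waits.set(waitId, wait);
    return wait;
  }

  // wait_completed: allow the 'waiting' -> 'completed' transition only once.
  complete(runId: string, correlationId: string, now: Date = new Date()): WaitRecord {
    const waitId = `${runId}-${correlationId}`;
    const existing = this.waits.get(waitId);
    if (!existing) {
      throw new WaitStateError(`Wait "${correlationId}" not found`, 404);
    }
    if (existing.status === 'completed') {
      throw new WaitStateError(`Wait "${correlationId}" already completed`, 409);
    }
    const completed: WaitRecord = { ...existing, status: 'completed', completedAt: now };
    this.waits.set(waitId, completed);
    return completed;
  }
}
```

Because JavaScript runs each synchronous method to completion, the check and the write in `complete` cannot interleave here. A real storage backend has to provide that atomicity itself, for example with a conditional write.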

packages/core/src/runtime.ts

Lines changed: 20 additions & 6 deletions
@@ -220,12 +220,26 @@ export function workflowEntrypoint(
 
           // Create all wait_completed events
           for (const waitEvent of waitsToComplete) {
-            const result = await world.events.create(
-              runId,
-              waitEvent
-            );
-            // Add the event to the events array so the workflow can see it
-            events.push(result.event!);
+            try {
+              const result = await world.events.create(
+                runId,
+                waitEvent
+              );
+              // Add the event to the events array so the workflow can see it
+              events.push(result.event!);
+            } catch (err) {
+              if (WorkflowAPIError.is(err) && err.status === 409) {
+                runtimeLogger.info(
+                  'Wait already completed, skipping',
+                  {
+                    workflowRunId: runId,
+                    correlationId: waitEvent.correlationId,
+                  }
+                );
+                continue;
+              }
+              throw err;
+            }
           }
 
           const result = await trace(
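
Reviewer note: the hunk above treats a 409 from `world.events.create` as "another invocation already completed this wait" and skips it, while rethrowing anything else. The same pattern can be sketched in isolation as below. `completeWaits`, `CreateEvent`, and `isConflict` are hypothetical names; the real code uses `WorkflowAPIError.is(err)` rather than the duck-typed status check assumed here.

```typescript
// Illustrative only: hypothetical shapes, not the @workflow/core API.
interface WaitCompletedEvent {
  eventType: 'wait_completed';
  correlationId: string;
}

type CreateEvent = (
  runId: string,
  event: WaitCompletedEvent
) => Promise<WaitCompletedEvent>;

// Assumption: conflict errors expose a numeric `status` of 409.
function isConflict(err: unknown): boolean {
  return err instanceof Error && (err as Error & { status?: number }).status === 409;
}

async function completeWaits(
  runId: string,
  waits: WaitCompletedEvent[],
  createEvent: CreateEvent
): Promise<{ appended: WaitCompletedEvent[]; skipped: string[] }> {
  const appended: WaitCompletedEvent[] = [];
  const skipped: string[] = [];
  for (const waitEvent of waits) {
    try {
      // Only events this invocation actually created are appended.
      appended.push(await createEvent(runId, waitEvent));
    } catch (err) {
      if (isConflict(err)) {
        // Another invocation won the race; nothing to append.
        skipped.push(waitEvent.correlationId);
        continue;
      }
      throw err; // Any other failure still aborts the loop.
    }
  }
  return { appended, skipped };
}
```

One consequence worth noting for review: a skipped wait is not added to the in-memory event list by this loop, so the invocation that lost the race does not see that `wait_completed` event from this code path.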
Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
+import { WorkflowAPIError } from '@workflow/errors';
+import type { Event, World } from '@workflow/world';
+import { describe, expect, it, vi } from 'vitest';
+
+// Mock version module to avoid missing generated file
+vi.mock('../version.js', () => ({ version: '0.0.0-test' }));
+
+import { wakeUpRun } from './runs.js';
+
+describe('wakeUpRun', () => {
+  function createMockWorld(
+    overrides: {
+      run?: Partial<
+        ReturnType<World['runs']['get']> extends Promise<infer T> ? T : never
+      >;
+      events?: Event[];
+      createError?: Error;
+    } = {}
+  ): World {
+    const run = {
+      runId: 'wrun_123',
+      workflowName: 'test-workflow',
+      status: 'running' as const,
+      specVersion: 2,
+      input: [],
+      createdAt: new Date(),
+      updatedAt: new Date(),
+      startedAt: new Date(),
+      deploymentId: 'test-deployment',
+      ...overrides.run,
+    };
+
+    const events = overrides.events ?? [];
+
+    return {
+      runs: {
+        get: vi.fn().mockResolvedValue(run),
+      },
+      events: {
+        list: vi.fn().mockResolvedValue({
+          data: events,
+          hasMore: false,
+          cursor: null,
+        }),
+        create: overrides.createError
+          ? vi.fn().mockRejectedValue(overrides.createError)
+          : vi.fn().mockResolvedValue({ event: {} }),
+      },
+      queue: vi.fn().mockResolvedValue(undefined),
+    } as unknown as World;
+  }
+
+  it('should count 409 conflict as a successful stop', async () => {
+    const events: Event[] = [
+      {
+        eventId: 'evnt_0',
+        runId: 'wrun_123',
+        eventType: 'wait_created',
+        correlationId: 'wait_abc',
+        eventData: { resumeAt: new Date('2024-01-01T00:00:01.000Z') },
+        createdAt: new Date(),
+      },
+    ];
+
+    const conflict = new WorkflowAPIError('Wait already completed', {
+      status: 409,
+    });
+
+    const world = createMockWorld({ events, createError: conflict });
+    const result = await wakeUpRun(world, 'wrun_123');
+
+    expect(result.stoppedCount).toBe(1);
+    expect(world.queue).toHaveBeenCalled();
+  });
+
+  it('should throw for non-409 errors', async () => {
+    const events: Event[] = [
+      {
+        eventId: 'evnt_0',
+        runId: 'wrun_123',
+        eventType: 'wait_created',
+        correlationId: 'wait_abc',
+        eventData: { resumeAt: new Date('2024-01-01T00:00:01.000Z') },
+        createdAt: new Date(),
+      },
+    ];
+
+    const serverError = new WorkflowAPIError('Internal server error', {
+      status: 500,
+    });
+
+    const world = createMockWorld({ events, createError: serverError });
+
+    await expect(wakeUpRun(world, 'wrun_123')).rejects.toThrow(AggregateError);
+  });
+});

packages/core/src/runtime/runs.ts

Lines changed: 6 additions & 1 deletion
@@ -1,3 +1,4 @@
+import { WorkflowAPIError } from '@workflow/errors';
 import { hydrateWorkflowArguments } from '../serialization.js';
 import {
   type Event,
@@ -183,7 +184,11 @@ export async function wakeUpRun(
         await world.events.create(runId, eventData, { v1Compat: compatMode });
         stoppedCount++;
       } catch (err) {
-        errors.push(err instanceof Error ? err : new Error(String(err)));
+        if (WorkflowAPIError.is(err) && err.status === 409) {
+          stoppedCount++;
+        } else {
+          errors.push(err instanceof Error ? err : new Error(String(err)));
+        }
       }
     }
 
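
Reviewer note: in `wakeUpRun` a 409 is counted as a successful stop instead of being recorded as an error. A rough standalone sketch of that tallying logic is below. `stopAll` and `StopResult` are hypothetical names. Per the test added in this commit, `wakeUpRun` rejects with an `AggregateError` when non-409 errors occur; this sketch returns the collected errors instead so that it stays self-contained.

```typescript
// Illustrative only: hypothetical helper, not the @workflow/core implementation.
interface StopResult {
  stoppedCount: number;
  errors: Error[];
}

async function stopAll(attempts: Array<() => Promise<void>>): Promise<StopResult> {
  let stoppedCount = 0;
  const errors: Error[] = [];
  for (const attempt of attempts) {
    try {
      await attempt();
      stoppedCount++;
    } catch (err) {
      // Assumption: conflict errors expose a numeric `status` of 409.
      const status =
        err instanceof Error ? (err as Error & { status?: number }).status : undefined;
      if (status === 409) {
        // Already completed elsewhere: the wait is stopped either way.
        stoppedCount++;
      } else {
        errors.push(err instanceof Error ? err : new Error(String(err)));
      }
    }
  }
  return { stoppedCount, errors };
}
```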

packages/world-local/src/storage/events-storage.ts

Lines changed: 93 additions & 4 deletions
@@ -7,6 +7,7 @@ import type {
   SerializedData,
   Step,
   Storage,
+  Wait,
   WorkflowRun,
 } from '@workflow/world';
 import {
@@ -16,6 +17,7 @@ import {
   requiresNewerWorld,
   SPEC_VERSION_CURRENT,
   StepSchema,
+  WaitSchema,
   WorkflowRunSchema,
 } from '@workflow/world';
 import { DEFAULT_RESOLVE_DATA_OPTION } from '../config.js';
@@ -31,6 +33,25 @@ import { getObjectCreatedAt, monotonicUlid } from './helpers.js';
 import { deleteAllHooksForRun } from './hooks-storage.js';
 import { handleLegacyEvent } from './legacy.js';
 
+/**
+ * Helper function to delete all waits associated with a workflow run.
+ * Called when a run reaches a terminal state.
+ */
+async function deleteAllWaitsForRun(
+  basedir: string,
+  runId: string
+): Promise<void> {
+  const waitsDir = path.join(basedir, 'waits');
+  const files = await listJSONFiles(waitsDir);
+
+  for (const file of files) {
+    if (file.startsWith(`${runId}-`)) {
+      const waitPath = path.join(waitsDir, `${file}.json`);
+      await deleteJSON(waitPath);
+    }
+  }
+}
+
 /**
  * Creates the events storage implementation using the filesystem.
  * Implements the Storage['events'] interface with create, list, and listByCorrelationId operations.
@@ -156,7 +177,8 @@ export function createEventsStorage(basedir: string): Storage['events'] {
           // Creating new entities on terminal runs is not allowed
           if (
             data.eventType === 'step_created' ||
-            data.eventType === 'hook_created'
+            data.eventType === 'hook_created' ||
+            data.eventType === 'wait_created'
           ) {
             throw new WorkflowAPIError(
               `Cannot create new entities on run in terminal state "${currentRun.status}"`,
@@ -240,6 +262,7 @@ export function createEventsStorage(basedir: string): Storage['events'] {
       let run: WorkflowRun | undefined;
       let step: Step | undefined;
       let hook: Hook | undefined;
+      let wait: Wait | undefined;
 
       // Create/update entity based on event type (event-sourced architecture)
       // Run lifecycle events
@@ -312,7 +335,10 @@ export function createEventsStorage(basedir: string): Storage['events'] {
             updatedAt: now,
           };
           await writeJSON(runPath, run, { overwrite: true });
-          await deleteAllHooksForRun(basedir, effectiveRunId);
+          await Promise.all([
+            deleteAllHooksForRun(basedir, effectiveRunId),
+            deleteAllWaitsForRun(basedir, effectiveRunId),
+          ]);
         }
       } else if (data.eventType === 'run_failed' && 'eventData' in data) {
         const failedData = data.eventData as {
@@ -346,7 +372,10 @@ export function createEventsStorage(basedir: string): Storage['events'] {
             updatedAt: now,
           };
           await writeJSON(runPath, run, { overwrite: true });
-          await deleteAllHooksForRun(basedir, effectiveRunId);
+          await Promise.all([
+            deleteAllHooksForRun(basedir, effectiveRunId),
+            deleteAllWaitsForRun(basedir, effectiveRunId),
+          ]);
         }
       } else if (data.eventType === 'run_cancelled') {
         // Reuse currentRun from validation (already read above)
@@ -369,7 +398,10 @@ export function createEventsStorage(basedir: string): Storage['events'] {
             updatedAt: now,
           };
           await writeJSON(runPath, run, { overwrite: true });
-          await deleteAllHooksForRun(basedir, effectiveRunId);
+          await Promise.all([
+            deleteAllHooksForRun(basedir, effectiveRunId),
+            deleteAllWaitsForRun(basedir, effectiveRunId),
+          ]);
         }
       } else if (
         // Step lifecycle events
@@ -611,6 +643,62 @@ export function createEventsStorage(basedir: string): Storage['events'] {
           `${data.correlationId}.json`
         );
         await deleteJSON(hookPath);
+      } else if (data.eventType === 'wait_created' && 'eventData' in data) {
+        // wait_created: Creates wait entity with status 'waiting'
+        const waitData = data.eventData as {
+          resumeAt?: Date;
+        };
+        const waitCompositeKey = `${effectiveRunId}-${data.correlationId}`;
+        const waitPath = path.join(
+          basedir,
+          'waits',
+          `${waitCompositeKey}.json`
+        );
+        const existingWait = await readJSON(waitPath, WaitSchema);
+        if (existingWait) {
+          throw new WorkflowAPIError(
+            `Wait "${data.correlationId}" already exists`,
+            { status: 409 }
+          );
+        }
+        wait = {
+          waitId: waitCompositeKey,
+          runId: effectiveRunId,
+          status: 'waiting',
+          resumeAt: waitData.resumeAt,
+          completedAt: undefined,
+          createdAt: now,
+          updatedAt: now,
+          specVersion: effectiveSpecVersion,
+        };
+        await writeJSON(waitPath, wait);
+      } else if (data.eventType === 'wait_completed') {
+        // wait_completed: Transitions wait to 'completed', rejects duplicates
+        const waitCompositeKey = `${effectiveRunId}-${data.correlationId}`;
+        const waitPath = path.join(
+          basedir,
+          'waits',
+          `${waitCompositeKey}.json`
+        );
+        const existingWait = await readJSON(waitPath, WaitSchema);
+        if (!existingWait) {
+          throw new WorkflowAPIError(`Wait "${data.correlationId}" not found`, {
+            status: 404,
+          });
+        }
+        if (existingWait.status === 'completed') {
+          throw new WorkflowAPIError(
+            `Wait "${data.correlationId}" already completed`,
+            { status: 409 }
+          );
+        }
+        wait = {
+          ...existingWait,
+          status: 'completed',
+          completedAt: now,
+          updatedAt: now,
+        };
+        await writeJSON(waitPath, wait, { overwrite: true });
       }
       // Note: hook_received events are stored in the event log but don't
       // modify the Hook entity (which doesn't have a payload field)
@@ -629,6 +717,7 @@ export function createEventsStorage(basedir: string): Storage['events'] {
         run,
         step,
         hook,
+        wait,
       };
     },
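
Reviewer note: the local world stores each wait under a composite key of the form `${runId}-${correlationId}` and cleans up by filtering on the `${runId}-` prefix. The sketch below isolates those two operations over plain strings, without touching the filesystem. `waitKey` and `keysForRun` are hypothetical helper names, not functions from `@workflow/world-local`.

```typescript
// Illustrative only: hypothetical helpers mirroring the key scheme in the diff.

// Build the composite key used as both waitId and file name stem.
function waitKey(runId: string, correlationId: string): string {
  return `${runId}-${correlationId}`;
}

// Select the keys that belong to a run by prefix, as the cleanup helper does.
function keysForRun(allKeys: string[], runId: string): string[] {
  const prefix = `${runId}-`;
  return allKeys.filter((key) => key.startsWith(prefix));
}

const keys = [
  waitKey('wrun_1', 'wait_a'),
  waitKey('wrun_12', 'wait_b'),
  waitKey('wrun_1', 'wait_c'),
];

// The trailing hyphen in the prefix keeps 'wrun_12-...' out of the 'wrun_1' set.
const forRun1 = keysForRun(keys, 'wrun_1');
console.log(forRun1);
```

The prefix match is only safe if one run ID can never be a hyphen-terminated prefix of another (for example `a` and `a-b`). Whether run IDs can contain hyphens is not visible in this diff, so that may be worth confirming.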

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+CREATE TYPE "public"."wait_status" AS ENUM('waiting', 'completed');--> statement-breakpoint
+CREATE TABLE IF NOT EXISTS "workflow"."workflow_waits" (
+	"wait_id" varchar PRIMARY KEY NOT NULL,
+	"run_id" varchar NOT NULL,
+	"status" "wait_status" NOT NULL,
+	"resume_at" timestamp,
+	"completed_at" timestamp,
+	"created_at" timestamp DEFAULT now() NOT NULL,
+	"updated_at" timestamp DEFAULT now() NOT NULL,
+	"spec_version" integer
+);
+--> statement-breakpoint
+CREATE INDEX IF NOT EXISTS "workflow_waits_run_id_index" ON "workflow"."workflow_waits" USING btree ("run_id");
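
Reviewer note: the changeset says the postgres world uses "conditional writes for duplicate prevention", but that code is not shown on this page. One common way to express such a write against the table above is an `UPDATE` guarded by `status = 'waiting'`, checking the affected row count. The sketch below is an assumption about the general technique, not the actual `@workflow/world-postgres` implementation. `Queryable` and `completeWaitOnce` are hypothetical, and `Queryable` is shaped like a minimal subset of a typical Postgres client.

```typescript
// Illustrative only: hypothetical interface and helper.
interface Queryable {
  query(text: string, params: unknown[]): Promise<{ rowCount: number | null }>;
}

// Only a row still in 'waiting' matches, so at most one caller sees rowCount 1.
const COMPLETE_WAIT_SQL = `
  UPDATE "workflow"."workflow_waits"
     SET "status" = 'completed',
         "completed_at" = now(),
         "updated_at" = now()
   WHERE "wait_id" = $1
     AND "status" = 'waiting'
`;

// Returns true if this caller performed the transition, false otherwise.
async function completeWaitOnce(db: Queryable, waitId: string): Promise<boolean> {
  const result = await db.query(COMPLETE_WAIT_SQL, [waitId]);
  return (result.rowCount ?? 0) === 1;
}
```

A caller would map `false` to a 409 if the row exists and is already completed, or to a 404 if the row is absent. Telling those two cases apart needs a follow-up read, which this sketch omits.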

packages/world-postgres/src/drizzle/migrations/meta/_journal.json

Lines changed: 7 additions & 0 deletions
@@ -50,6 +50,13 @@
       "when": 1768500000000,
       "tag": "0006_add_error_cbor",
       "breakpoints": true
+    },
+    {
+      "idx": 7,
+      "version": "7",
+      "when": 1769500000000,
+      "tag": "0007_add_waits_table",
+      "breakpoints": true
     }
   ]
 }
