Skip to content

Commit d1391e1

Browse files
pranaygpclaude
andauthored
Fix race condition allowing duplicate hook_disposed events (#1523)
* Fix race condition allowing duplicate hook_disposed events Concurrent workflow invocations could both post hook_disposed for the same hook, corrupting the event log with duplicate events. This mirrors the wait_completed race condition fixed in #1057/#1434. - world-local: Add writeExclusive lock file for hook_disposed (same pattern as wait_completed and step terminal states) - world-postgres: Use DELETE ... RETURNING to atomically detect if another caller already deleted the hook entity - suspension-handler: Improve log messages to distinguish hook-already- disposed (EntityConflictError) from run-already-completed (RunExpiredError) Fixes #1266 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Accept either EntityConflictError or HookNotFoundError in race test The concurrent hook_disposed race has two possible orderings: the loser may hit the lock file (EntityConflictError) or find the hook entity already deleted by the winner (HookNotFoundError). Both are correct. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Assert only one hook_disposed event in event log after race Verifies the losing concurrent caller didn't sneak an event in before the guard threw. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8e7083b commit d1391e1

5 files changed

Lines changed: 78 additions & 4 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@workflow/core': patch
3+
'@workflow/world-local': patch
4+
'@workflow/world-postgres': patch
5+
---
6+
7+
Fix race condition allowing duplicate `hook_disposed` events for the same hook

packages/core/src/runtime/suspension-handler.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,17 @@ export async function handleSuspension({
169169
try {
170170
await world.events.create(runId, hookDisposedEvent, { requestId });
171171
} catch (err) {
172-
if (EntityConflictError.is(err) || RunExpiredError.is(err)) {
172+
if (EntityConflictError.is(err)) {
173+
// Hook was already disposed by a concurrent invocation — safe to skip
174+
runtimeLogger.info(
175+
'Hook already disposed, skipping duplicate disposal',
176+
{
177+
workflowRunId: runId,
178+
correlationId: queueItem.correlationId,
179+
message: err.message,
180+
}
181+
);
182+
} else if (RunExpiredError.is(err)) {
173183
runtimeLogger.info(
174184
'Workflow run already completed, skipping hook disposal',
175185
{

packages/world-local/src/storage.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1952,6 +1952,42 @@ describe('Storage', () => {
19521952
name: 'EntityConflictError',
19531953
});
19541954
});
1955+
1956+
it('should reject concurrent hook_disposed for the same hook', async () => {
1957+
await createHook(storage, testRunId, {
1958+
hookId: 'hook_race_1',
1959+
token: 'hook-race-token-1',
1960+
});
1961+
1962+
const results = await Promise.allSettled([
1963+
disposeHook(storage, testRunId, 'hook_race_1'),
1964+
disposeHook(storage, testRunId, 'hook_race_1'),
1965+
]);
1966+
1967+
const fulfilled = results.filter((r) => r.status === 'fulfilled');
1968+
const rejected = results.filter((r) => r.status === 'rejected');
1969+
1970+
expect(fulfilled).toHaveLength(1);
1971+
expect(rejected).toHaveLength(1);
1972+
// Depending on timing, the loser may hit the lock file (EntityConflictError)
1973+
// or find the hook entity already deleted (HookNotFoundError).
1974+
const reason = (rejected[0] as PromiseRejectedResult).reason as {
1975+
name?: string;
1976+
};
1977+
expect(['EntityConflictError', 'HookNotFoundError']).toContain(
1978+
reason.name
1979+
);
1980+
1981+
// Verify only one hook_disposed event was written to the log
1982+
const events = await storage.events.list({
1983+
runId: testRunId,
1984+
pagination: {},
1985+
});
1986+
const hookDisposedEvents = events.data.filter(
1987+
(e) => e.eventType === 'hook_disposed'
1988+
);
1989+
expect(hookDisposedEvents).toHaveLength(1);
1990+
});
19551991
});
19561992

19571993
describe('run terminal state validation', () => {

packages/world-local/src/storage/events-storage.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,19 @@ export function createEventsStorage(
714714
hook
715715
);
716716
} else if (data.eventType === 'hook_disposed') {
717+
// hook_disposed: Deletes hook entity, rejects duplicates.
718+
// Uses writeExclusive on a lock file to atomically prevent concurrent
719+
// invocations from both disposing the same hook (TOCTOU race).
720+
const hookLockName = tag
721+
? `${data.correlationId}.disposed.${tag}`
722+
: `${data.correlationId}.disposed`;
723+
const lockPath = path.join(basedir, '.locks', 'hooks', hookLockName);
724+
const claimed = await writeExclusive(lockPath, '');
725+
if (!claimed) {
726+
throw new EntityConflictError(
727+
`Hook "${data.correlationId}" already disposed`
728+
);
729+
}
717730
// Read the hook to get its token before deleting
718731
const hookPath = taggedPath(basedir, 'hooks', data.correlationId, tag);
719732
const existingHook = await readJSONWithFallback(

packages/world-postgres/src/storage.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,11 +1039,19 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
10391039
}
10401040
}
10411041

1042-
// Handle hook_disposed event: delete hook entity
1042+
// Handle hook_disposed event: delete hook entity atomically.
1043+
// Uses DELETE ... RETURNING to ensure only one concurrent caller
1044+
// succeeds — if no rows are returned, the hook was already disposed.
10431045
if (data.eventType === 'hook_disposed' && data.correlationId) {
1044-
await drizzle
1046+
const [deleted] = await drizzle
10451047
.delete(Schema.hooks)
1046-
.where(eq(Schema.hooks.hookId, data.correlationId));
1048+
.where(eq(Schema.hooks.hookId, data.correlationId))
1049+
.returning({ hookId: Schema.hooks.hookId });
1050+
if (!deleted) {
1051+
throw new EntityConflictError(
1052+
`Hook "${data.correlationId}" already disposed`
1053+
);
1054+
}
10471055
}
10481056

10491057
// Handle wait_created event: create wait entity

0 commit comments

Comments
 (0)