diff --git a/packages/mongodb/src/storage.ts b/packages/mongodb/src/storage.ts index 2eebb84..6fe74a7 100644 --- a/packages/mongodb/src/storage.ts +++ b/packages/mongodb/src/storage.ts @@ -21,6 +21,7 @@ import type { import { WorkflowAPIError } from '@workflow/errors'; import { MongoClient } from 'mongodb'; import { monotonicFactory } from 'ulid'; +import { debug } from './utils.js'; // Monotonic ULID factory ensures IDs are always increasing const generateUlid = monotonicFactory(); @@ -560,14 +561,7 @@ export async function createStorage(config: MongoStorageConfig = {}): Promise<{ // ========================================================================= hooks: { async create(runId, data: CreateHookRequest, params): Promise { - // Check for token uniqueness - const existing = await hooksCollection.findOne({ token: data.token }); - if (existing) { - throw new WorkflowAPIError( - `Hook with token ${data.token} already exists`, - { status: 409 } - ); - } + debug('hooks.create called:', { runId, hookId: data.hookId, token: data.token }); const now = new Date(); const hook: HookDocument = { @@ -581,27 +575,54 @@ export async function createStorage(config: MongoStorageConfig = {}): Promise<{ createdAt: now, }; - await hooksCollection.insertOne(hook); + try { + // Use atomic insert - the unique index on token enforces uniqueness + await hooksCollection.insertOne(hook); + debug('hooks.create succeeded:', { hookId: data.hookId, token: data.token }); + } catch (error: unknown) { + // Check for duplicate key error (code 11000) + if ( + error && + typeof error === 'object' && + 'code' in error && + (error as { code: number }).code === 11000 + ) { + debug('hooks.create duplicate token:', { token: data.token }); + throw new WorkflowAPIError( + `Hook with token ${data.token} already exists`, + { status: 409 } + ); + } + debug('hooks.create error:', error); + throw error; + } + return filterHookData(hook as Hook, params?.resolveData); }, async get(hookId, params): Promise { + debug('hooks.get called:', { hookId }); const hook = await hooksCollection.findOne({ hookId }); if (!hook) { + debug('hooks.get not found:', { hookId }); throw new WorkflowAPIError(`Hook not found: ${hookId}`, { status: 404, }); } + debug('hooks.get found:', { hookId, token: hook.token }); return filterHookData(hook as Hook, params?.resolveData); }, async getByToken(token, params): Promise { + debug('hooks.getByToken called:', { token }); const hook = await hooksCollection.findOne({ token }); if (!hook) { + debug('hooks.getByToken not found:', { token }); throw new WorkflowAPIError(`Hook not found for token: ${token}`, { status: 404, }); } + debug('hooks.getByToken found:', { hookId: hook.hookId, runId: hook.runId }); return filterHookData(hook as Hook, params?.resolveData); }, diff --git a/packages/testing/src/hooks.ts b/packages/testing/src/hooks.ts index 4fbc36c..2f8419b 100644 --- a/packages/testing/src/hooks.ts +++ b/packages/testing/src/hooks.ts @@ -237,5 +237,126 @@ export function hookCleanupTests(options: HookTestOptions) { await expect(storage.hooks.getByToken(token)).rejects.toThrow(); } }); + + test('webhook flow: creates and retrieves hooks with random tokens', async () => { + // This test mimics the e2e webhookWorkflow test pattern: + // 1. Generate random tokens (like Math.random().toString(36).slice(2)) + // 2. Create hooks with those tokens + // 3. Immediately look them up by token + // 4. Verify they can be found + + const generateToken = () => Math.random().toString(36).slice(2); + const tokens = [generateToken(), generateToken(), generateToken()]; + + const run = await storage.runs.create({ + deploymentId: 'test-deployment', + workflowName: 'webhookWorkflow', + input: tokens, + }); + + // Update to running status (simulating workflow execution) + await storage.runs.update(run.runId, { status: 'running' }); + + // Create hooks for each token (simulating workflow.waitForWebhook()) + const createdHooks = []; + for (let i = 0; i < tokens.length; i++) { + const hook = await storage.hooks.create(run.runId, { + hookId: `whook_webhook-${i}-${Date.now()}`, + token: tokens[i], + metadata: { index: i, tokenLength: tokens[i].length }, + }); + createdHooks.push(hook); + } + + // Immediately verify all hooks can be retrieved by token + // This is the critical test - the webhook endpoint must find hooks by token + for (let i = 0; i < tokens.length; i++) { + const retrieved = await storage.hooks.getByToken(tokens[i]); + expect(retrieved.token).toBe(tokens[i]); + expect(retrieved.runId).toBe(run.runId); + expect(retrieved.hookId).toBe(createdHooks[i].hookId); + } + + // Clean up + await storage.runs.update(run.runId, { status: 'completed', output: {} }); + }); + + test('webhook flow: handles concurrent hook creation and lookup', async () => { + // Test concurrent operations to catch any race conditions + + const run = await storage.runs.create({ + deploymentId: 'test-deployment', + workflowName: 'concurrent-webhook-test', + input: [], + }); + + await storage.runs.update(run.runId, { status: 'running' }); + + // Create multiple hooks concurrently + const tokens = Array.from({ length: 5 }, () => Math.random().toString(36).slice(2)); + const createPromises = tokens.map((token, i) => + storage.hooks.create(run.runId, { + hookId: `whook_concurrent-${i}-${Date.now()}`, + token, + metadata: { index: i }, + }) + ); + + await Promise.all(createPromises); + + // Look up all hooks concurrently + const lookupPromises = tokens.map((token) => storage.hooks.getByToken(token)); + const retrievedHooks = await Promise.all(lookupPromises); + + // Verify all hooks were found + for (let i = 0; i < tokens.length; i++) { + expect(retrievedHooks[i].token).toBe(tokens[i]); + expect(retrievedHooks[i].runId).toBe(run.runId); + } + + // Clean up + await storage.runs.update(run.runId, { status: 'completed', output: {} }); + }); + + test('webhook flow: getByToken returns 404 for non-existent token', async () => { + const nonExistentToken = `non-existent-${Date.now()}-${Math.random().toString(36).slice(2)}`; + + await expect(storage.hooks.getByToken(nonExistentToken)).rejects.toThrow(); + + // Also verify the error has the correct status + try { + await storage.hooks.getByToken(nonExistentToken); + expect.fail('Should have thrown'); + } catch (error) { + expect(error).toBeInstanceOf(WorkflowAPIError); + expect((error as WorkflowAPIError).status).toBe(404); + } + }); + + test('webhook flow: getByToken finds hook immediately after creation', async () => { + // This test verifies there's no delay between hook creation and visibility + + const run = await storage.runs.create({ + deploymentId: 'test-deployment', + workflowName: 'immediate-lookup-test', + input: [], + }); + + const token = `immediate-${Date.now()}-${Math.random().toString(36).slice(2)}`; + + // Create and immediately lookup - this should always succeed + await storage.hooks.create(run.runId, { + hookId: `whook_immediate-${Date.now()}`, + token, + metadata: {}, + }); + + // No delay between create and getByToken + const retrieved = await storage.hooks.getByToken(token); + expect(retrieved.token).toBe(token); + + // Clean up + await storage.runs.update(run.runId, { status: 'completed', output: {} }); + }); }); }