Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions packages/mongodb/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type {
import { WorkflowAPIError } from '@workflow/errors';
import { MongoClient } from 'mongodb';
import { monotonicFactory } from 'ulid';
import { debug } from './utils.js';

// Monotonic ULID factory ensures IDs are always increasing
const generateUlid = monotonicFactory();
Expand Down Expand Up @@ -560,14 +561,7 @@ export async function createStorage(config: MongoStorageConfig = {}): Promise<{
// =========================================================================
hooks: {
async create(runId, data: CreateHookRequest, params): Promise<Hook> {
// Check for token uniqueness
const existing = await hooksCollection.findOne({ token: data.token });
if (existing) {
throw new WorkflowAPIError(
`Hook with token ${data.token} already exists`,
{ status: 409 }
);
}
debug('hooks.create called:', { runId, hookId: data.hookId, token: data.token });

const now = new Date();
const hook: HookDocument = {
Expand All @@ -581,27 +575,54 @@ export async function createStorage(config: MongoStorageConfig = {}): Promise<{
createdAt: now,
};

await hooksCollection.insertOne(hook);
try {
// Use atomic insert - the unique index on token enforces uniqueness
await hooksCollection.insertOne(hook);
debug('hooks.create succeeded:', { hookId: data.hookId, token: data.token });
} catch (error: unknown) {
// Check for duplicate key error (code 11000)
if (
error &&
typeof error === 'object' &&
'code' in error &&
(error as { code: number }).code === 11000
) {
debug('hooks.create duplicate token:', { token: data.token });
throw new WorkflowAPIError(
`Hook with token ${data.token} already exists`,
{ status: 409 }
);
}
debug('hooks.create error:', error);
throw error;
}

return filterHookData(hook as Hook, params?.resolveData);
},

async get(hookId, params): Promise<Hook> {
debug('hooks.get called:', { hookId });
const hook = await hooksCollection.findOne({ hookId });
if (!hook) {
debug('hooks.get not found:', { hookId });
throw new WorkflowAPIError(`Hook not found: ${hookId}`, {
status: 404,
});
}
debug('hooks.get found:', { hookId, token: hook.token });
return filterHookData(hook as Hook, params?.resolveData);
},

async getByToken(token, params): Promise<Hook> {
debug('hooks.getByToken called:', { token });
const hook = await hooksCollection.findOne({ token });
if (!hook) {
debug('hooks.getByToken not found:', { token });
throw new WorkflowAPIError(`Hook not found for token: ${token}`, {
status: 404,
});
}
debug('hooks.getByToken found:', { hookId: hook.hookId, runId: hook.runId });
return filterHookData(hook as Hook, params?.resolveData);
},

Expand Down
121 changes: 121 additions & 0 deletions packages/testing/src/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,5 +237,126 @@ export function hookCleanupTests(options: HookTestOptions) {
await expect(storage.hooks.getByToken(token)).rejects.toThrow();
}
});

test('webhook flow: creates and retrieves hooks with random tokens', async () => {
// This test mimics the e2e webhookWorkflow test pattern:
// 1. Generate random tokens (like Math.random().toString(36).slice(2))
// 2. Create hooks with those tokens
// 3. Immediately look them up by token
// 4. Verify they can be found

const generateToken = () => Math.random().toString(36).slice(2);
const tokens = [generateToken(), generateToken(), generateToken()];

const run = await storage.runs.create({
deploymentId: 'test-deployment',
workflowName: 'webhookWorkflow',
input: tokens,
});

// Update to running status (simulating workflow execution)
await storage.runs.update(run.runId, { status: 'running' });

// Create hooks for each token (simulating workflow.waitForWebhook())
const createdHooks = [];
for (let i = 0; i < tokens.length; i++) {
const hook = await storage.hooks.create(run.runId, {
hookId: `whook_webhook-${i}-${Date.now()}`,
token: tokens[i],
metadata: { index: i, tokenLength: tokens[i].length },
});
createdHooks.push(hook);
}

// Immediately verify all hooks can be retrieved by token
// This is the critical test - the webhook endpoint must find hooks by token
for (let i = 0; i < tokens.length; i++) {
const retrieved = await storage.hooks.getByToken(tokens[i]);
expect(retrieved.token).toBe(tokens[i]);
expect(retrieved.runId).toBe(run.runId);
expect(retrieved.hookId).toBe(createdHooks[i].hookId);
}

// Clean up
await storage.runs.update(run.runId, { status: 'completed', output: {} });
});

test('webhook flow: handles concurrent hook creation and lookup', async () => {
// Test concurrent operations to catch any race conditions

const run = await storage.runs.create({
deploymentId: 'test-deployment',
workflowName: 'concurrent-webhook-test',
input: [],
});

await storage.runs.update(run.runId, { status: 'running' });

// Create multiple hooks concurrently
const tokens = Array.from({ length: 5 }, () => Math.random().toString(36).slice(2));
const createPromises = tokens.map((token, i) =>
storage.hooks.create(run.runId, {
hookId: `whook_concurrent-${i}-${Date.now()}`,
token,
metadata: { index: i },
})
);

await Promise.all(createPromises);

// Look up all hooks concurrently
const lookupPromises = tokens.map((token) => storage.hooks.getByToken(token));
const retrievedHooks = await Promise.all(lookupPromises);

// Verify all hooks were found
for (let i = 0; i < tokens.length; i++) {
expect(retrievedHooks[i].token).toBe(tokens[i]);
expect(retrievedHooks[i].runId).toBe(run.runId);
}

// Clean up
await storage.runs.update(run.runId, { status: 'completed', output: {} });
});

test('webhook flow: getByToken returns 404 for non-existent token', async () => {
const nonExistentToken = `non-existent-${Date.now()}-${Math.random().toString(36).slice(2)}`;

await expect(storage.hooks.getByToken(nonExistentToken)).rejects.toThrow();

// Also verify the error has the correct status
try {
await storage.hooks.getByToken(nonExistentToken);
expect.fail('Should have thrown');
} catch (error) {
expect(error).toBeInstanceOf(WorkflowAPIError);
expect((error as WorkflowAPIError).status).toBe(404);
}
});

test('webhook flow: getByToken finds hook immediately after creation', async () => {
// This test verifies there's no delay between hook creation and visibility

const run = await storage.runs.create({
deploymentId: 'test-deployment',
workflowName: 'immediate-lookup-test',
input: [],
});

const token = `immediate-${Date.now()}-${Math.random().toString(36).slice(2)}`;

// Create and immediately lookup - this should always succeed
await storage.hooks.create(run.runId, {
hookId: `whook_immediate-${Date.now()}`,
token,
metadata: {},
});

// No delay between create and getByToken
const retrieved = await storage.hooks.getByToken(token);
expect(retrieved.token).toBe(token);

// Clean up
await storage.runs.update(run.runId, { status: 'completed', output: {} });
});
});
}