diff --git a/package-lock.json b/package-lock.json index c1dd6c6302..512eb94f8e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36496,6 +36496,7 @@ "version": "1.0.0", "license": "SEE LICENSE IN LICENSE FILE IN GIT REPOSITORY", "dependencies": { + "@aws-sdk/client-sqs": "3.993.0", "@modelcontextprotocol/sdk": "1.26.0", "@nangohq/authz": "file:../authz", "@nangohq/billing": "file:../billing", diff --git a/packages/jobs/lib/app.ts b/packages/jobs/lib/app.ts index 3603348765..ecf962d67f 100644 --- a/packages/jobs/lib/app.ts +++ b/packages/jobs/lib/app.ts @@ -7,12 +7,14 @@ import { destroy as destroyLogs, otlp } from '@nangohq/logs'; import { getOtlpRoutes } from '@nangohq/shared'; import { getLogger, initSentry, once, report, stringifyError } from '@nangohq/utils'; +import { orchestratorClient } from './clients.js'; import { envs } from './env.js'; import { LambdaInvocationsProcessor } from './invocations/lambda.processor.js'; import { Processor } from './processor/processor.js'; import { getDefaultFleet, startFleets, stopFleets } from './runtime/runtimes.js'; import { server } from './server.js'; import { pubsub } from './utils/pubsub.js'; +import { DispatchQueueConsumer } from './webhook/dispatch-queue/consumer.js'; const logger = getLogger('Jobs'); @@ -38,6 +40,19 @@ try { const processor = new Processor(orchestratorUrl); const invocationsProcessor = new LambdaInvocationsProcessor(); + const webhookDispatchConsumer = envs.NANGO_TASK_DISPATCH_QUEUE_URL + ? new DispatchQueueConsumer({ + queueUrl: envs.NANGO_TASK_DISPATCH_QUEUE_URL, + orchestratorClient, + webhookMaxConcurrency: envs.WEBHOOK_ENVIRONMENT_MAX_CONCURRENCY, + consumerConcurrency: envs.NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY, + maxMessages: envs.NANGO_TASK_DISPATCH_MAX_MESSAGES, + waitTimeSeconds: envs.NANGO_TASK_DISPATCH_WAIT_TIME_SECONDS, + visibilityTimeoutSeconds: envs.NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS, + maxAgeMs: envs.NANGO_TASK_DISPATCH_MAX_AGE_SECONDS * 1000 + }) + : undefined; + // We are using a setTimeout because we don't want overlapping setInterval if the DB is down let healthCheck: NodeJS.Timeout | undefined; let healthCheckFailures = 0; @@ -79,6 +94,9 @@ try { await db.readOnly.destroy(); await destroyKvstore(); await invocationsProcessor.stop(); + if (webhookDispatchConsumer) { + await webhookDispatchConsumer.stop(); + } console.info('Closed'); @@ -109,6 +127,11 @@ try { invocationsProcessor.start(); + if (webhookDispatchConsumer) { + webhookDispatchConsumer.start(); + logger.info('webhook dispatch queue consumer started'); + } + void otlp.register(getOtlpRoutes); } catch (err) { logger.error(stringifyError(err)); diff --git a/packages/jobs/lib/webhook/dispatch-queue/consumer.ts b/packages/jobs/lib/webhook/dispatch-queue/consumer.ts new file mode 100644 index 0000000000..93036ef4c2 --- /dev/null +++ b/packages/jobs/lib/webhook/dispatch-queue/consumer.ts @@ -0,0 +1,242 @@ +import { DeleteMessageCommand, ReceiveMessageCommand, SQSClient } from '@aws-sdk/client-sqs'; +import tracer from 'dd-trace'; +import * as z from 'zod'; + +import { logContextGetter } from '@nangohq/logs'; +import { isDuplicateTaskNameClientError, jsonSchema } from '@nangohq/nango-orchestrator'; +import { Err, Ok, getLogger, metrics, report } from '@nangohq/utils'; + +import { envs } from '../../env.js'; + +import type { Message } from '@aws-sdk/client-sqs'; +import type { OrchestratorClient } from '@nangohq/nango-orchestrator'; +import type { WebhookDispatchMessage } from '@nangohq/types'; +import type { Result } from '@nangohq/utils'; + +const logger = getLogger('jobs.webhook.dispatch-queue.consumer'); + +const messageSchema: z.ZodType = z.object({ + version: z.literal(1), + kind: z.literal('webhook'), + taskName: z.string().min(1), + createdAt: z.string().min(1), + accountId: z.number(), + integrationId: z.number(), + provider: z.string(), + parentSyncName: z.string(), + activityLogId: z.string(), + webhookName: z.string(), + connection: z.object({ + id: z.number(), + connection_id: z.string(), + provider_config_key: z.string(), + environment_id: z.number() + }), + payload: jsonSchema +}); + +export interface DispatchQueueConsumerProps { + queueUrl: string; + orchestratorClient: OrchestratorClient; + webhookMaxConcurrency: number; + consumerConcurrency: number; + maxMessages: number; + waitTimeSeconds: number; + visibilityTimeoutSeconds: number; + maxAgeMs: number; + sqs?: SQSClient; +} + +export class DispatchQueueConsumer { + private readonly sqs: SQSClient; + private readonly queueUrl: string; + private readonly orchestratorClient: OrchestratorClient; + private readonly webhookMaxConcurrency: number; + private readonly consumerConcurrency: number; + private readonly maxMessages: number; + private readonly waitTimeSeconds: number; + private readonly visibilityTimeoutSeconds: number; + private readonly maxAgeMs: number; + private readonly abortController = new AbortController(); + private loopPromises: Promise[] = []; + + constructor(props: DispatchQueueConsumerProps) { + this.queueUrl = props.queueUrl; + this.orchestratorClient = props.orchestratorClient; + this.webhookMaxConcurrency = props.webhookMaxConcurrency; + this.consumerConcurrency = props.consumerConcurrency; + this.maxMessages = props.maxMessages; + this.waitTimeSeconds = props.waitTimeSeconds; + this.visibilityTimeoutSeconds = props.visibilityTimeoutSeconds; + this.maxAgeMs = props.maxAgeMs; + this.sqs = props.sqs ?? new SQSClient(envs.AWS_REGION ? { region: envs.AWS_REGION } : {}); + } + + start(): void { + if (this.loopPromises.length > 0) { + return; + } + logger.info(`webhook dispatch consumer subscribing to ${this.queueUrl}`, { consumerConcurrency: this.consumerConcurrency }); + this.loopPromises = Array.from({ length: this.consumerConcurrency }, () => this.pollLoop()); + } + + async stop(): Promise { + this.abortController.abort(); + if (this.loopPromises.length > 0) { + await Promise.allSettled(this.loopPromises); + this.loopPromises = []; + } + this.sqs.destroy(); + } + + private async pollLoop(): Promise { + const signal = this.abortController.signal; + while (!signal.aborted) { + try { + const result = await this.sqs.send( + new ReceiveMessageCommand({ + QueueUrl: this.queueUrl, + MaxNumberOfMessages: this.maxMessages, + WaitTimeSeconds: this.waitTimeSeconds, + VisibilityTimeout: this.visibilityTimeoutSeconds, + MessageAttributeNames: ['All'], + MessageSystemAttributeNames: ['SentTimestamp', 'ApproximateReceiveCount'] + }), + { abortSignal: signal } + ); + + const messages = result.Messages ?? []; + if (messages.length === 0) continue; + + // Process all messages in a batch concurrently. Each SQS receive is already + // capped at maxMessages (<=10) so no extra semaphore is needed. + await Promise.all(messages.map((msg) => this.processMessage(msg))); + } catch (err) { + if (err instanceof Error && err.name === 'AbortError') break; + report(new Error('webhook dispatch consumer receive failed', { cause: err })); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } + } + + private async processMessage(msg: Message): Promise { + const active = tracer.scope().active(); + const span = tracer.startSpan('jobs.webhook.dispatch_queue.process', { + ...(active ? { childOf: active } : {}) + }); + + return await tracer.scope().activate(span, async () => { + try { + if (msg.Body === undefined || !msg.ReceiptHandle) { + return; + } + + const parsed = this.parseMessage(msg.Body); + if (parsed.isErr()) { + span.setTag('poison_pill', true); + span.setTag('poison_pill_reason', parsed.error.message); + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_POISON_PILL); + // Parse failures are poison pills — redelivering won't help. Delete and move on. + await this.tryDeleteMessage(msg.ReceiptHandle); + return; + } + + const message = parsed.value; + const environmentId = message.connection.environment_id; + span.setTag('taskName', message.taskName); + span.setTag('provider', message.provider); + span.setTag('environmentId', environmentId); + + const sentTimestampMs = Number(msg.Attributes?.['SentTimestamp'] ?? '0'); + if (sentTimestampMs > 0) { + const dwellMs = Date.now() - sentTimestampMs; + metrics.duration(metrics.Types.WEBHOOK_DISPATCH_DWELL_MS, dwellMs, { provider: message.provider }); + + if (this.maxAgeMs > 0 && dwellMs > this.maxAgeMs) { + span.setTag('stale', true); + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_STALE, 1, { accountId: message.accountId }); + const logCtx = logContextGetter.get({ id: message.activityLogId, accountId: message.accountId }); + await logCtx.warn('Webhook was discarded: it spent too long in the queue and was not processed.', { dwell_ms: dwellMs }); + await this.tryDeleteMessage(msg.ReceiptHandle); + return; + } + } + + const scheduleRes = await this.orchestratorClient.executeWebhook({ + name: message.taskName, + group: { key: `webhook:environment:${environmentId}`, maxConcurrency: this.webhookMaxConcurrency }, + args: { + webhookName: message.webhookName, + parentSyncName: message.parentSyncName, + connection: message.connection, + activityLogId: message.activityLogId, + input: message.payload + } + }); + + if (scheduleRes.isErr()) { + if (isDuplicateTaskNameClientError(scheduleRes.error)) { + span.setTag('duplicate_task_name', true); + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_CONSUME_SUCCESS, 1, { provider: message.provider }); + await this.tryDeleteMessage(msg.ReceiptHandle); + return; + } + + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_CONSUME_FAILURE, 1, { provider: message.provider }); + span.setTag('error', true); + span.setTag('error.type', scheduleRes.error.name); + span.setTag('error.message', scheduleRes.error.message); + + const responsePayload = getClientErrorResponsePayload(scheduleRes.error); + if (responsePayload) { + span.setTag('error.details', responsePayload); + } + + // Don't delete — let SQS redeliver and eventually DLQ. + return; + } + + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_CONSUME_SUCCESS, 1, { provider: message.provider }); + + await this.tryDeleteMessage(msg.ReceiptHandle); + } finally { + span.finish(); + } + }); + } + + private parseMessage(body: string): Result { + try { + const json = JSON.parse(body); + const result = messageSchema.safeParse(json); + if (!result.success) { + return Err('invalid_schema'); + } + return Ok(result.data); + } catch (_err) { + return Err('json_parse'); + } + } + + private async tryDeleteMessage(receiptHandle: string): Promise { + try { + await this.sqs.send(new DeleteMessageCommand({ QueueUrl: this.queueUrl, ReceiptHandle: receiptHandle })); + } catch (err) { + report(new Error('webhook dispatch consumer delete failed', { cause: err })); + } + } +} + +function getClientErrorResponsePayload(err: { payload?: unknown }): string | null { + const payload = err.payload; + if (!payload || typeof payload !== 'object' || !('response' in payload)) { + return null; + } + + const responsePayload = payload.response; + if (responsePayload === undefined) { + return null; + } + + return JSON.stringify(responsePayload); +} diff --git a/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts b/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts new file mode 100644 index 0000000000..001251179d --- /dev/null +++ b/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts @@ -0,0 +1,315 @@ +import { DeleteMessageCommand, ReceiveMessageCommand } from '@aws-sdk/client-sqs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { Err, Ok } from '@nangohq/utils'; + +vi.mock('../../env.js', () => ({ + envs: { + AWS_REGION: undefined + } +})); + +import { DispatchQueueConsumer } from './consumer.js'; + +import type { SQSClient } from '@aws-sdk/client-sqs'; +import type { OrchestratorClient } from '@nangohq/nango-orchestrator'; +import type { WebhookDispatchMessage } from '@nangohq/types'; + +function buildMessage(overrides: Partial = {}): WebhookDispatchMessage { + return { + version: 1, + kind: 'webhook', + taskName: 'webhook:abc123', + createdAt: '2026-04-23T00:00:00.000Z', + accountId: 1, + integrationId: 3, + provider: 'github', + parentSyncName: 'sync-1', + activityLogId: 'log-1', + webhookName: 'push', + connection: { id: 42, connection_id: 'conn-1', provider_config_key: 'github-dev', environment_id: 2 }, + payload: { hello: 'world' }, + ...overrides + }; +} + +function abortError(): Error { + const error = new Error('aborted'); + error.name = 'AbortError'; + return error; +} + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +interface Harness { + consumer: DispatchQueueConsumer; + sqsSend: ReturnType; + sqsDestroy: ReturnType; + orchestratorExecuteWebhook: ReturnType; +} + +function makeHarness( + opts: { + messages?: WebhookDispatchMessage[]; + badBody?: string; + consumerConcurrency?: number; + maxAgeMs?: number; + sqsSend?: ReturnType; + } = {} +): Harness { + const messages = opts.messages ?? []; + const bodyQueue: { Body: string; ReceiptHandle: string; Attributes: Record }[] = []; + for (let i = 0; i < messages.length; i++) { + bodyQueue.push({ Body: JSON.stringify(messages[i]), ReceiptHandle: `rh-${i}`, Attributes: { SentTimestamp: String(Date.now() - 500) } }); + } + if (opts.badBody !== undefined) { + bodyQueue.push({ Body: opts.badBody, ReceiptHandle: `rh-bad`, Attributes: { SentTimestamp: String(Date.now()) } }); + } + + const sqsSend = + opts.sqsSend ?? + vi.fn(async (command: unknown) => { + await new Promise((resolve) => setImmediate(resolve)); + if (command instanceof ReceiveMessageCommand) { + const messages = bodyQueue.splice(0, bodyQueue.length); + return { Messages: messages }; + } + if (command instanceof DeleteMessageCommand) { + return {}; + } + throw new Error(`unexpected command ${String(command)}`); + }); + + const sqsDestroy = vi.fn(); + const sqs = { send: sqsSend, destroy: sqsDestroy } as unknown as SQSClient; + + const orchestratorExecuteWebhook = vi.fn(); + orchestratorExecuteWebhook.mockResolvedValue(Ok({ taskId: 'task-1', retryKey: 'rk-1' })); + const orchestratorClient = { executeWebhook: orchestratorExecuteWebhook } as unknown as OrchestratorClient; + + const consumer = new DispatchQueueConsumer({ + sqs, + queueUrl: 'http://queue', + orchestratorClient, + webhookMaxConcurrency: 500, + consumerConcurrency: opts.consumerConcurrency ?? 1, + maxMessages: 10, + waitTimeSeconds: 0, + visibilityTimeoutSeconds: 30, + maxAgeMs: opts.maxAgeMs ?? 0 + }); + + return { consumer, sqsSend, sqsDestroy, orchestratorExecuteWebhook }; +} + +function getDeleteCalls(h: Harness) { + return h.sqsSend.mock.calls.filter((c) => c[0] instanceof DeleteMessageCommand); +} + +async function runOnce(h: Harness, waitFor: () => void | Promise): Promise { + h.consumer.start(); + await vi.waitFor(waitFor); + await h.consumer.stop(); +} + +describe('DispatchQueueConsumer', () => { + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it('schedules a valid message and deletes it on success', async () => { + const msg = buildMessage(); + const h = makeHarness({ messages: [msg] }); + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledTimes(1); + const call = h.orchestratorExecuteWebhook.mock.calls[0]?.[0]; + expect(call).toMatchObject({ + name: msg.taskName, + group: { key: 'webhook:environment:2', maxConcurrency: 500 }, + args: { + webhookName: msg.webhookName, + parentSyncName: msg.parentSyncName, + connection: msg.connection, + activityLogId: msg.activityLogId, + input: msg.payload + } + }); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + expect(h.sqsDestroy).toHaveBeenCalledOnce(); + }); + + it('treats duplicate task-name scheduling errors as already processed and deletes the message', async () => { + const h = makeHarness({ messages: [buildMessage()] }); + h.orchestratorExecuteWebhook.mockResolvedValueOnce( + Err({ + name: 'duplicate_task_name', + message: 'Task with name already exists', + payload: { taskName: 'webhook:abc123' } + }) + ); + + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledTimes(1); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + }); + + it('does not delete when orchestrator returns a non-duplicate error', async () => { + const h = makeHarness({ messages: [buildMessage()] }); + h.orchestratorExecuteWebhook.mockResolvedValueOnce(Err({ name: 'boom', message: 'boom', payload: null })); + + await runOnce(h, () => { + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledTimes(1); + }); + + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledTimes(1); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(0); + }); + + it('deletes a poison-pill message without calling orchestrator', async () => { + const h = makeHarness({ badBody: 'not-json' }); + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).not.toHaveBeenCalled(); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + }); + + it('treats an empty-body message as a poison pill and deletes it', async () => { + const h = makeHarness({ badBody: '' }); + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).not.toHaveBeenCalled(); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + }); + + it('rejects a schema-invalid message as poison and deletes it', async () => { + const invalid = { ...buildMessage(), kind: 'wrong' }; + const h = makeHarness({ badBody: JSON.stringify(invalid) }); + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).not.toHaveBeenCalled(); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + }); + + it('still deletes successfully scheduled messages during graceful shutdown', async () => { + const message = buildMessage(); + const scheduled = deferred>(); + let received = false; + const sqsSend = vi.fn(async (command: unknown, options?: { abortSignal?: AbortSignal }) => { + await new Promise((resolve) => setImmediate(resolve)); + if (command instanceof ReceiveMessageCommand) { + if (!received) { + received = true; + return { + Messages: [{ Body: JSON.stringify(message), ReceiptHandle: 'rh-0', Attributes: { SentTimestamp: String(Date.now() - 500) } }] + }; + } + + if (options?.abortSignal?.aborted) { + throw abortError(); + } + return { Messages: [] }; + } + + if (command instanceof DeleteMessageCommand) { + if (options?.abortSignal?.aborted) { + throw abortError(); + } + return {}; + } + + throw new Error(`unexpected command ${String(command)}`); + }); + + const h = makeHarness({ messages: [message], sqsSend }); + h.orchestratorExecuteWebhook.mockReturnValueOnce(scheduled.promise); + + h.consumer.start(); + await vi.waitFor(() => { + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledOnce(); + }); + + const stopPromise = h.consumer.stop(); + scheduled.resolve(Ok({ taskId: 'task-1', retryKey: 'rk-1' })); + await stopPromise; + + const deleteCalls = h.sqsSend.mock.calls.filter((c) => c[0] instanceof DeleteMessageCommand); + expect(deleteCalls).toHaveLength(1); + expect(deleteCalls[0]?.[1]).toBeUndefined(); + }); + + it('deletes a stale message without calling orchestrator', async () => { + const h = makeHarness({ messages: [buildMessage()], maxAgeMs: 100 }); + // SentTimestamp in makeHarness is Date.now() - 500, which exceeds maxAgeMs of 100ms + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).not.toHaveBeenCalled(); + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + it('starts one poll loop per configured consumerConcurrency', async () => { + let receiveCalls = 0; + const firstReceives = [deferred(), deferred()]; + const sqsSend = vi.fn(async (command: unknown, options?: { abortSignal?: AbortSignal }) => { + if (command instanceof ReceiveMessageCommand) { + const index = receiveCalls++; + const pending = firstReceives[index]; + if (pending) { + return await new Promise((resolve, reject) => { + const onAbort = () => reject(abortError()); + options?.abortSignal?.addEventListener('abort', onAbort, { once: true }); + pending.promise.then(() => resolve({ Messages: [] }), reject); + }); + } + + if (options?.abortSignal?.aborted) { + throw abortError(); + } + return { Messages: [] }; + } + + if (command instanceof DeleteMessageCommand) { + return {}; + } + + throw new Error(`unexpected command ${String(command)}`); + }); + + const h = makeHarness({ consumerConcurrency: 2, sqsSend }); + h.consumer.start(); + + await vi.waitFor(() => { + expect(receiveCalls).toBe(2); + }); + + await h.consumer.stop(); + expect(h.sqsDestroy).toHaveBeenCalledOnce(); + }); +}); diff --git a/packages/orchestrator/lib/clients/client.integration.test.ts b/packages/orchestrator/lib/clients/client.integration.test.ts index 8fc27b0928..174d4eba18 100644 --- a/packages/orchestrator/lib/clients/client.integration.test.ts +++ b/packages/orchestrator/lib/clients/client.integration.test.ts @@ -163,6 +163,41 @@ describe('OrchestratorClient', async () => { }); }); + describe('immediate', () => { + it('should return a structured duplicate-name error when task name already exists', async () => { + const name = nanoid(); + const groupKey = nanoid(); + const request = { + name, + group: { key: groupKey, maxConcurrency: 0 }, + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'action' as const, + actionName: nanoid(), + connection: { + id: 123, + connection_id: 'C', + provider_config_key: 'P', + environment_id: 456 + }, + activityLogId: '789', + input: { foo: 'bar' } + } + }; + + const first = await client.immediate(request); + expect(first.isOk()).toBe(true); + + const duplicate = await client.immediate(request); + expect(duplicate.isErr()).toBe(true); + if (duplicate.isErr()) { + expect(duplicate.error.name).toBe('duplicate_task_name'); + expect(duplicate.error.payload).toEqual({}); + } + }); + }); + describe('executeAction', () => { it('should be successful when action task succeed', async () => { const groupKey = nanoid(); diff --git a/packages/orchestrator/lib/clients/client.ts b/packages/orchestrator/lib/clients/client.ts index 7ba34d5fb0..7104a8cc8c 100644 --- a/packages/orchestrator/lib/clients/client.ts +++ b/packages/orchestrator/lib/clients/client.ts @@ -65,6 +65,15 @@ export class OrchestratorClient { public async immediate(props: ImmediateProps): Promise> { const res = await this.routeFetch(postImmediateRoute)({ body: props }); if ('error' in res) { + const duplicateMessage = getDuplicateTaskNameMessage(res.error.payload); + if (duplicateMessage !== null) { + return Err({ + name: 'duplicate_task_name', + message: duplicateMessage || 'Task with this name already exists', + payload: {} + }); + } + return Err({ name: res.error.code, message: res.error.message || `Error scheduling immediate task`, @@ -255,7 +264,7 @@ export class OrchestratorClient { public async executeWebhook(props: ExecuteWebhookProps): Promise { const { args, ...rest } = props; - const schedulingProps = { + const schedulingProps: ImmediateProps = { ...rest, retry: { count: 0, max: 0 }, timeoutSettingsInSecs: { @@ -517,3 +526,31 @@ export class OrchestratorClient { } } } + +function getDuplicateTaskNameMessage(payload: unknown): string | null { + if (!payload || typeof payload !== 'object' || !('error' in payload)) { + return null; + } + + const response = payload as { + error?: { + code?: string; + message?: string; + }; + }; + + if (response.error?.code !== 'duplicate_task_name') { + return null; + } + + return response.error.message || ''; +} + +export function isDuplicateTaskNameClientError(err: unknown): boolean { + if (!err || typeof err !== 'object') { + return false; + } + + const error = err as { name?: string }; + return error.name === 'duplicate_task_name'; +} diff --git a/packages/orchestrator/lib/clients/client.unit.test.ts b/packages/orchestrator/lib/clients/client.unit.test.ts new file mode 100644 index 0000000000..67522d8252 --- /dev/null +++ b/packages/orchestrator/lib/clients/client.unit.test.ts @@ -0,0 +1,100 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { OrchestratorClient } from './client.js'; + +import type { ImmediateProps } from './types.js'; + +function buildImmediateRequest(): ImmediateProps { + return { + name: 'task-1', + group: { key: 'group-1', maxConcurrency: 0 }, + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'action', + actionName: 'action-1', + connection: { + id: 123, + connection_id: 'connection-1', + provider_config_key: 'provider-config-key-1', + environment_id: 456 + }, + activityLogId: 'activity-log-1', + input: { foo: 'bar' }, + async: false + } + }; +} + +describe('OrchestratorClient immediate', () => { + afterEach(() => { + vi.restoreAllMocks(); + vi.unstubAllGlobals(); + }); + + it('maps duplicate-name conflicts from the API while preserving existing retries', async () => { + const fetchMock = vi.fn().mockImplementation(() => { + return new Response( + JSON.stringify({ + error: { + code: 'duplicate_task_name', + message: 'task already exists' + } + }), + { status: 409, headers: { 'content-type': 'application/json' } } + ); + }); + vi.stubGlobal('fetch', fetchMock); + + const client = new OrchestratorClient({ baseUrl: 'http://orchestrator.test' }); + const res = await client.immediate(buildImmediateRequest()); + + expect(res.isErr()).toBe(true); + if (res.isErr()) { + expect(res.error.name).toBe('duplicate_task_name'); + expect(res.error.payload).toEqual({}); + } + expect(fetchMock).toHaveBeenCalledTimes(3); + }); + + it('retries transient 5xx responses', async () => { + const fetchMock = vi + .fn() + .mockResolvedValueOnce( + new Response(JSON.stringify({ error: { code: 'server_error', message: 'temporary failure' } }), { + status: 500, + headers: { 'content-type': 'application/json' } + }) + ) + .mockResolvedValueOnce( + new Response(JSON.stringify({ error: { code: 'server_error', message: 'temporary failure' } }), { + status: 500, + headers: { 'content-type': 'application/json' } + }) + ) + .mockResolvedValueOnce(new Response(JSON.stringify({ taskId: 'task-1', retryKey: 'retry-key-1' }), { status: 200 })); + vi.stubGlobal('fetch', fetchMock); + + const client = new OrchestratorClient({ baseUrl: 'http://orchestrator.test' }); + const res = await client.immediate(buildImmediateRequest()); + + expect(res.isOk()).toBe(true); + expect(fetchMock).toHaveBeenCalledTimes(3); + }); + + it('preserves existing retries on non-immediate route errors', async () => { + const fetchMock = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ error: { code: 'schedule_not_found', message: 'missing schedule' } }), { + status: 404, + headers: { 'content-type': 'application/json' } + }) + ); + vi.stubGlobal('fetch', fetchMock); + + const client = new OrchestratorClient({ baseUrl: 'http://orchestrator.test' }); + const res = await client.pauseSync({ scheduleName: 'schedule-1' }); + + expect(res.isErr()).toBe(true); + expect(fetchMock).toHaveBeenCalledTimes(3); + }); +}); diff --git a/packages/orchestrator/lib/routes/v1/postImmediate.ts b/packages/orchestrator/lib/routes/v1/postImmediate.ts index 491ed21aea..e79e4700e0 100644 --- a/packages/orchestrator/lib/routes/v1/postImmediate.ts +++ b/packages/orchestrator/lib/routes/v1/postImmediate.ts @@ -1,5 +1,6 @@ import * as z from 'zod'; +import { isDuplicateTaskNameError } from '@nangohq/scheduler'; import { validateRequest } from '@nangohq/utils'; import { actionArgsSchema, onEventArgsSchema, syncAbortArgsSchema, syncArgsSchema, webhookArgsSchema } from '../../clients/validate.js'; @@ -34,7 +35,7 @@ export type PostImmediate = Endpoint<{ }; args: JsonValue & { type: TaskType }; }; - Error: ApiError<'immediate_failed'>; + Error: ApiError<'immediate_failed' | 'duplicate_task_name'>; Success: { taskId: string; retryKey: string }; }>; @@ -111,6 +112,16 @@ const handler = (scheduler: Scheduler) => { heartbeatTimeoutSecs: res.locals.parsedBody.timeoutSettingsInSecs.heartbeat }); if (task.isErr()) { + if (isDuplicateTaskNameError(task.error)) { + res.status(409).json({ + error: { + code: 'duplicate_task_name', + message: task.error.message + } + }); + return; + } + res.status(500).json({ error: { code: 'immediate_failed', message: task.error.message } }); return; } diff --git a/packages/scheduler/lib/errors.ts b/packages/scheduler/lib/errors.ts new file mode 100644 index 0000000000..a37033b51f --- /dev/null +++ b/packages/scheduler/lib/errors.ts @@ -0,0 +1,10 @@ +export class DuplicateTaskNameError extends Error { + constructor() { + super('Task with this name already exists'); + this.name = 'DuplicateTaskNameError'; + } +} + +export function isDuplicateTaskNameError(err: unknown): boolean { + return err instanceof DuplicateTaskNameError; +} diff --git a/packages/scheduler/lib/index.ts b/packages/scheduler/lib/index.ts index f9a9a6b45f..fb61b8e07e 100644 --- a/packages/scheduler/lib/index.ts +++ b/packages/scheduler/lib/index.ts @@ -1,5 +1,6 @@ export * from './scheduler.js'; export * from './types.js'; +export * from './errors.js'; export * from './db/helpers.test.js'; export * from './db/client.js'; export * from './utils/format.js'; diff --git a/packages/scheduler/lib/models/tasks.integration.test.ts b/packages/scheduler/lib/models/tasks.integration.test.ts index bd23c60da5..08d3457022 100644 --- a/packages/scheduler/lib/models/tasks.integration.test.ts +++ b/packages/scheduler/lib/models/tasks.integration.test.ts @@ -4,6 +4,7 @@ import { nanoid } from '@nangohq/utils'; import * as tasks from './tasks.js'; import { getTestDbClient } from '../db/helpers.test.js'; +import { isDuplicateTaskNameError } from '../errors.js'; import { taskStates } from '../types.js'; import type { Task, TaskState } from '../types.js'; @@ -84,6 +85,17 @@ describe('Task', () => { expect(res.tasks[1]?.name).toBe('Also not capped'); expect(res.cappedGroupKeys).toEqual([props.groupKey]); }); + it('should error on unique-name collision', async () => { + const name = `dup-${nanoid()}`; + const first = await tasks.create(db, [{ ...props, name }]); + expect(first.isOk()).toBe(true); + + const second = await tasks.create(db, [{ ...props, name }]); + expect(second.isErr()).toBe(true); + if (second.isErr()) { + expect(isDuplicateTaskNameError(second.error)).toBe(true); + } + }); it('should have their heartbeat updated', async () => { const t = await startTask(db); await new Promise((resolve) => void setTimeout(resolve, 20)); diff --git a/packages/scheduler/lib/models/tasks.ts b/packages/scheduler/lib/models/tasks.ts index 0c2f995cb7..bcf52dd2d9 100644 --- a/packages/scheduler/lib/models/tasks.ts +++ b/packages/scheduler/lib/models/tasks.ts @@ -2,6 +2,7 @@ import { uuidv4, uuidv7 } from 'uuidv7'; import { Err, Ok, metrics, stringToHash, stringifyError } from '@nangohq/utils'; +import { DuplicateTaskNameError } from '../errors.js'; import { taskStates } from '../types.js'; import { SCHEDULES_TABLE } from './schedules.js'; import { envs } from '../env.js'; @@ -172,21 +173,33 @@ export async function create( metrics.increment(metrics.Types.ORCH_TASKS_DROPPED, droppedCount, { primitive, reason: 'task_cap' }); } const toInsert = Array.from(toInsertPerGroup.values()).flat(); - const inserted: Task[] = []; + const tasks: Task[] = []; while (toInsert.length) { const chunk = toInsert.splice(0, TASKS_INSERT_BATCH_SIZE); const batch = await db.from(TASKS_TABLE).insert(chunk.map(DbTask.to)).returning('*'); - inserted.push(...batch.map(DbTask.from)); + tasks.push(...batch.map(DbTask.from)); } return Ok({ - tasks: inserted, + tasks, cappedGroupKeys: Array.from(cappedGroupCounts.keys()) }); } catch (err) { + if (isTasksUniqueNameViolation(err)) { + return Err(new DuplicateTaskNameError()); + } return Err(new Error(`Error creating tasks: ${stringifyError(err)}`)); } } +function isTasksUniqueNameViolation(err: unknown): boolean { + if (!err || typeof err !== 'object') { + return false; + } + + const error = err as { code?: string; constraint?: string; message?: string }; + return error.code === '23505' && error.constraint === 'tasks_unique_name'; +} + // Coalesce concurrent queueSizes queries for the same group keys. // When multiple immediate() calls target the same group key concurrently, // they share a single DB query instead of each running their own count. diff --git a/packages/scheduler/lib/scheduler.integration.test.ts b/packages/scheduler/lib/scheduler.integration.test.ts index d3d3783a89..9858e64e92 100644 --- a/packages/scheduler/lib/scheduler.integration.test.ts +++ b/packages/scheduler/lib/scheduler.integration.test.ts @@ -4,6 +4,7 @@ import { nanoid } from '@nangohq/utils'; import { getTestDbClient } from './db/helpers.test.js'; import { envs } from './env.js'; +import { isDuplicateTaskNameError } from './errors.js'; import { Scheduler } from './scheduler.js'; import type { TaskProps } from './models/tasks.js'; @@ -70,6 +71,32 @@ describe('Scheduler', () => { await immediate(scheduler); expect(callbacks.CREATED).toHaveBeenCalledOnce(); }); + it('should return a duplicate-name error when an immediate task already exists', async () => { + const name = `dup-${nanoid()}`; + const groupKey = nanoid(); + + await immediate(scheduler, { taskProps: { name, groupKey } }); + + const duplicate = await scheduler.immediate({ + name, + payload: {}, + groupKey, + groupMaxConcurrency: 0, + retryMax: 1, + retryCount: 0, + createdToStartedTimeoutSecs: 3600, + startedToCompletedTimeoutSecs: 3600, + heartbeatTimeoutSecs: 600, + ownerKey: null, + retryKey: null + }); + + expect(duplicate.isErr()).toBe(true); + if (duplicate.isErr()) { + expect(isDuplicateTaskNameError(duplicate.error)).toBe(true); + } + expect(callbacks.CREATED).toHaveBeenCalledOnce(); + }); it('should call callback when task is started', async () => { const task = await immediate(scheduler); (await scheduler.dequeue({ groupKeyPattern: task.groupKey, limit: 1 })).unwrap(); diff --git a/packages/server/lib/webhook/dispatch-queue/client.ts b/packages/server/lib/webhook/dispatch-queue/client.ts new file mode 100644 index 0000000000..e63336c519 --- /dev/null +++ b/packages/server/lib/webhook/dispatch-queue/client.ts @@ -0,0 +1,26 @@ +import { SQSClient } from '@aws-sdk/client-sqs'; + +import { DispatchQueuePublisher } from './publisher.js'; +import { envs } from '../../env.js'; + +function buildPublisher(): DispatchQueuePublisher | null { + const url = envs.NANGO_TASK_DISPATCH_QUEUE_URL; + if (!url) { + return null; + } + if (new URL(url).pathname.replace(/\/$/, '').endsWith('.fifo')) { + throw new Error('Webhook dispatch queue must be a Standard SQS queue; FIFO queues would serialize environment traffic.'); + } + return new DispatchQueuePublisher({ + sqs: new SQSClient(envs.AWS_REGION ? { region: envs.AWS_REGION } : {}), + queueUrl: url, + batchSize: envs.NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE, + publishConcurrency: envs.NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY + }); +} + +/** + * Singleton DispatchQueuePublisher, or null when `NANGO_TASK_DISPATCH_QUEUE_URL` + * is unset (self-hosted / local dev). Callers must check for null before use. + */ +export const dispatchQueuePublisher: DispatchQueuePublisher | null = buildPublisher(); diff --git a/packages/server/lib/webhook/dispatch-queue/client.unit.test.ts b/packages/server/lib/webhook/dispatch-queue/client.unit.test.ts new file mode 100644 index 0000000000..79bfeb5f1d --- /dev/null +++ b/packages/server/lib/webhook/dispatch-queue/client.unit.test.ts @@ -0,0 +1,52 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +describe('dispatchQueuePublisher', () => { + afterEach(() => { + vi.resetModules(); + vi.unmock('../../env.js'); + }); + + it('wires batch size and publish concurrency from envs', async () => { + vi.doMock('../../env.js', () => ({ + envs: { + AWS_REGION: 'eu-west-1', + NANGO_TASK_DISPATCH_QUEUE_URL: 'https://sqs.us-west-2.amazonaws.com/123456789012/nango-task-dispatch-development', + NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: 8, + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 3 + } + })); + + const { dispatchQueuePublisher } = await import('./client.js'); + const publisher = dispatchQueuePublisher as any; + + expect(publisher).not.toBeNull(); + expect(publisher.batchSize).toBe(8); + expect(publisher.publishConcurrency).toBe(3); + }); + + it('rejects fifo queue urls', async () => { + vi.doMock('../../env.js', () => ({ + envs: { + AWS_REGION: 'eu-west-1', + NANGO_TASK_DISPATCH_QUEUE_URL: 'https://sqs.us-west-2.amazonaws.com/123456789012/nango-task-dispatch-development.fifo', + NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: 10, + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 2 + } + })); + + await expect(import('./client.js')).rejects.toThrow('Webhook dispatch queue must be a Standard SQS queue'); + }); + + it('rejects fifo queue urls with trailing slash', async () => { + vi.doMock('../../env.js', () => ({ + envs: { + AWS_REGION: 'eu-west-1', + NANGO_TASK_DISPATCH_QUEUE_URL: 'https://sqs.us-west-2.amazonaws.com/123456789012/nango-task-dispatch-development.fifo/', + NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: 10, + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 2 + } + })); + + await expect(import('./client.js')).rejects.toThrow('Webhook dispatch queue must be a Standard SQS queue'); + }); +}); diff --git a/packages/server/lib/webhook/dispatch-queue/publisher.ts b/packages/server/lib/webhook/dispatch-queue/publisher.ts new file mode 100644 index 0000000000..ccc8c5b339 --- /dev/null +++ b/packages/server/lib/webhook/dispatch-queue/publisher.ts @@ -0,0 +1,232 @@ +import { SendMessageBatchCommand } from '@aws-sdk/client-sqs'; +import tracer from 'dd-trace'; + +import { metrics, report } from '@nangohq/utils'; + +import { runWithConcurrencyLimit } from '../runWithConcurrencyLimit.js'; + +import type { SQSClient, SendMessageBatchCommandOutput, SendMessageBatchRequestEntry } from '@aws-sdk/client-sqs'; +import type { WebhookDispatchMessage } from '@nangohq/types'; + +const SQS_BATCH_MAX_ENTRIES = 10; +const DEFAULT_PUBLISH_CONCURRENCY = 10; +const SQS_BATCH_MAX_BYTES = 1_048_576; + +export interface DispatchQueuePublisherProps { + sqs: SQSClient; + queueUrl: string; + /** Max entries per SQS SendMessageBatch request. AWS caps this at 10. */ + batchSize?: number; + /** Max concurrent SQS SendMessageBatch requests. Defaults to 10. */ + publishConcurrency?: number; +} + +export interface PublishResult { + enqueued: number; + failed: number; + failedActivityLogIds: string[]; +} + +export interface PreparedDispatchMessage { + message: WebhookDispatchMessage; + byteSize: number; +} + +interface BatchPublishResult extends PublishResult { + retriedEntries: number; +} + +export class DispatchQueuePublisher { + private readonly sqs: SQSClient; + private readonly queueUrl: string; + private readonly batchSize: number; + private readonly publishConcurrency: number; + + constructor(props: DispatchQueuePublisherProps) { + this.sqs = props.sqs; + this.queueUrl = props.queueUrl; + const configuredBatchSize = props.batchSize ?? SQS_BATCH_MAX_ENTRIES; + if (configuredBatchSize < 1) { + throw new RangeError(`batchSize must be > 0`); + } + this.batchSize = Math.min(configuredBatchSize, SQS_BATCH_MAX_ENTRIES); + + const configuredPublishConcurrency = props.publishConcurrency ?? DEFAULT_PUBLISH_CONCURRENCY; + if (configuredPublishConcurrency < 1) { + throw new RangeError(`publishConcurrency must be > 0`); + } + this.publishConcurrency = configuredPublishConcurrency; + } + + /** + * Publish a list of dispatch messages in batches. Batches are fired in parallel up to + * `publishConcurrency`; failed entries within a batch are retried once inline. Any + * entries still failing after the retry are counted as `failed`. Regular SQS send + * failures and partial failures are returned in the counts instead of throwing so the + * caller can treat them as a metric/trace concern, not an HTTP 500 (provider retries + * are worse). + */ + async publish(messages: PreparedDispatchMessage[], messageGroupId: string): Promise { + if (messages.length === 0) { + return { enqueued: 0, failed: 0, failedActivityLogIds: [] }; + } + + const activeSpan = tracer.scope().active(); + const firstMessage = messages[0]!.message; + const batches = chunk(messages, this.batchSize); + const span = tracer.startSpan('webhook.dispatch.publish', { + ...(activeSpan ? { childOf: activeSpan } : {}), + tags: { + 'nango.accountId': firstMessage.accountId, + 'nango.environmentId': firstMessage.connection.environment_id, + 'nango.integrationId': firstMessage.integrationId, + 'nango.provider': firstMessage.provider, + 'nango.providerConfigKey': firstMessage.connection.provider_config_key, + 'nango.messageCount': messages.length, + 'nango.batchCount': batches.length + } + }); + + return await tracer.scope().activate(span, async () => { + try { + const results = await runWithConcurrencyLimit(batches, this.publishConcurrency, async (batch) => { + return await this.sendBatch(batch, messageGroupId); + }); + + const enqueued = results.reduce((sum, r) => sum + r.enqueued, 0); + const failed = results.reduce((sum, r) => sum + r.failed, 0); + const failedActivityLogIds = results.flatMap((r) => r.failedActivityLogIds); + const retriedEntries = results.reduce((sum, r) => sum + r.retriedEntries, 0); + const retriedBatches = results.filter((r) => r.retriedEntries > 0).length; + + span.setTag('nango.enqueued', enqueued); + span.setTag('nango.failed', failed); + span.setTag('nango.retriedEntries', retriedEntries); + span.setTag('nango.retriedBatches', retriedBatches); + + const provider = firstMessage.provider; + + if (enqueued > 0) { + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_PUBLISH_SUCCESS, enqueued, { provider }); + } + if (failed > 0) { + const error = new Error(`Failed to enqueue ${failed} webhook dispatch message${failed === 1 ? '' : 's'}`); + span.setTag('error', error); + span.setTag('nango.partialFailure', true); + + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_PUBLISH_FAILURE, failed, { provider }); + } + + return { enqueued, failed, failedActivityLogIds }; + } catch (err) { + span.setTag('error', err); + throw err; + } finally { + span.finish(); + } + }); + } + + private async sendBatch(batch: PreparedDispatchMessage[], messageGroupId: string): Promise { + const entries = batch.map((message, idx) => toEntry(message, idx, messageGroupId)); + + const first = await this.trySend(entries); + const failedIndices = first.failedIds.flatMap((id) => { + const index = entryIdToIndex(id); + if (index === null) { + report(new Error('webhook_dispatch_invalid_failed_entry_id'), { entryId: id }); + return []; + } + + return [index]; + }); + const invalidFailedCount = first.failedIds.length - failedIndices.length; + if (failedIndices.length === 0) { + return { + enqueued: batch.length - invalidFailedCount, + failed: invalidFailedCount, + failedActivityLogIds: [], + retriedEntries: 0 + }; + } + + const retryEntries = failedIndices.map((i) => entries[i]).filter((e): e is SendMessageBatchRequestEntry => e !== undefined); + const second = await this.trySend(retryEntries); + + const enqueued = batch.length - invalidFailedCount - second.failedIds.length; + return { + enqueued, + failed: invalidFailedCount + second.failedIds.length, + failedActivityLogIds: second.failedIds.flatMap((id) => { + const index = entryIdToIndex(id); + if (index === null) { + report(new Error('webhook_dispatch_invalid_failed_entry_id'), { entryId: id }); + return []; + } + + const activityLogId = batch[index]?.message.activityLogId; + return activityLogId ? [activityLogId] : []; + }), + retriedEntries: failedIndices.length + }; + } + + private async trySend(entries: SendMessageBatchRequestEntry[]): Promise<{ failedIds: string[] }> { + if (entries.length === 0) return { failedIds: [] }; + try { + const response: SendMessageBatchCommandOutput = await this.sqs.send(new SendMessageBatchCommand({ QueueUrl: this.queueUrl, Entries: entries })); + const failedIds = (response.Failed ?? []).map((f) => f.Id).filter((id): id is string => typeof id === 'string'); + return { failedIds }; + } catch (err) { + tracer.scope().active()?.setTag('sqs.send.error', err); + return { failedIds: entries.map((e) => e.Id!).filter((id): id is string => typeof id === 'string') }; + } + } +} + +function toEntry(message: PreparedDispatchMessage, index: number, messageGroupId: string): SendMessageBatchRequestEntry { + return { + Id: indexToEntryId(index), + MessageBody: JSON.stringify(message.message), + MessageGroupId: messageGroupId + }; +} + +function indexToEntryId(index: number): string { + return `m${index}`; +} + +function entryIdToIndex(id: string): number | null { + if (!id.startsWith('m')) { + return null; + } + + const index = Number.parseInt(id.slice(1), 10); + return Number.isNaN(index) ? null : index; +} + +function chunk(items: PreparedDispatchMessage[], size: number): PreparedDispatchMessage[][] { + const chunks: PreparedDispatchMessage[][] = []; + let currentChunk: PreparedDispatchMessage[] = []; + let currentChunkBytes = 0; + + for (const item of items) { + const exceedsBatchSize = currentChunk.length >= size; + const exceedsBatchBytes = currentChunkBytes + item.byteSize > SQS_BATCH_MAX_BYTES; + + if (currentChunk.length > 0 && (exceedsBatchSize || exceedsBatchBytes)) { + chunks.push(currentChunk); + currentChunk = []; + currentChunkBytes = 0; + } + + currentChunk.push(item); + currentChunkBytes += item.byteSize; + } + + if (currentChunk.length > 0) { + chunks.push(currentChunk); + } + + return chunks; +} diff --git a/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts b/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts new file mode 100644 index 0000000000..e2fd759543 --- /dev/null +++ b/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts @@ -0,0 +1,355 @@ +import { SendMessageBatchCommand } from '@aws-sdk/client-sqs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const tracerMocks = vi.hoisted(() => { + const span = { + setTag: vi.fn().mockReturnThis(), + finish: vi.fn() + }; + const activeSpan = { + traceId: 'active-trace', + setTag: vi.fn().mockReturnThis() + }; + + return { + activeSpan, + active: vi.fn(() => activeSpan), + activate: vi.fn(async (_span, callback: () => Promise) => await callback()), + startSpan: vi.fn(() => span), + span, + dogstatsd: { + increment: vi.fn(), + decrement: vi.fn(), + gauge: vi.fn(), + histogram: vi.fn(), + distribution: vi.fn() + } + }; +}); + +const utilsMocks = vi.hoisted(() => ({ + report: vi.fn() +})); + +vi.mock('dd-trace', () => { + return { + default: { + scope: () => ({ active: tracerMocks.active, activate: tracerMocks.activate }), + startSpan: tracerMocks.startSpan, + dogstatsd: tracerMocks.dogstatsd + } + }; +}); + +vi.mock('@nangohq/utils', async (importOriginal) => { + const actual = await importOriginal(); + + if (!actual || typeof actual !== 'object') { + throw new Error('Invalid @nangohq/utils mock'); + } + + return { + ...actual, + report: utilsMocks.report, + metrics: { + Types: { + WEBHOOK_DISPATCH_PUBLISH_SUCCESS: 'nango.webhook.dispatch_queue.publish.success', + WEBHOOK_DISPATCH_PUBLISH_FAILURE: 'nango.webhook.dispatch_queue.publish.failure' + }, + increment: tracerMocks.dogstatsd.increment + } + }; +}); + +import { DispatchQueuePublisher } from './publisher.js'; + +import type { PreparedDispatchMessage } from './publisher.js'; +import type { SQSClient, SendMessageBatchCommandOutput } from '@aws-sdk/client-sqs'; +import type { WebhookDispatchMessage } from '@nangohq/types'; + +function buildMessage(overrides: Partial = {}): WebhookDispatchMessage { + return { + version: 1, + kind: 'webhook', + taskName: 'webhook:abc123', + createdAt: '2026-04-23T00:00:00.000Z', + accountId: 1, + integrationId: 3, + provider: 'github', + parentSyncName: 'sync-1', + activityLogId: 'log-1', + webhookName: 'sync-1', + connection: { id: 42, connection_id: 'conn-1', provider_config_key: 'github-dev', environment_id: 2 }, + payload: { hello: 'world' }, + ...overrides + }; +} + +function buildPreparedMessage({ + messageOverrides, + byteSize +}: { + messageOverrides?: Partial; + byteSize?: number; +} = {}): PreparedDispatchMessage { + const message = buildMessage(messageOverrides); + + return { + message, + byteSize: byteSize ?? Buffer.byteLength(JSON.stringify(message), 'utf8') + }; +} + +function makeSqsMock( + responder: (command: SendMessageBatchCommand, callIndex: number) => SendMessageBatchCommandOutput | Promise +): { sqs: SQSClient; send: ReturnType } { + const send = vi.fn(); + let callIndex = 0; + send.mockImplementation(async (command: unknown) => { + if (!(command instanceof SendMessageBatchCommand)) { + throw new Error(`Unexpected command: ${String(command)}`); + } + return await responder(command, callIndex++); + }); + return { sqs: { send } as unknown as SQSClient, send }; +} + +function successfulBatchResponse(command: SendMessageBatchCommand): SendMessageBatchCommandOutput { + return { + $metadata: {}, + Successful: (command.input.Entries ?? []).map((e) => ({ Id: e.Id!, MessageId: 'x', MD5OfMessageBody: 'x' })), + Failed: [] + }; +} + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +describe('DispatchQueuePublisher', () => { + beforeEach(() => { + vi.restoreAllMocks(); + utilsMocks.report.mockClear(); + tracerMocks.active.mockClear(); + tracerMocks.activate.mockClear(); + tracerMocks.startSpan.mockClear(); + tracerMocks.span.setTag.mockClear(); + tracerMocks.span.finish.mockClear(); + tracerMocks.activeSpan.setTag.mockClear(); + tracerMocks.dogstatsd.increment.mockClear(); + tracerMocks.dogstatsd.decrement.mockClear(); + tracerMocks.dogstatsd.gauge.mockClear(); + tracerMocks.dogstatsd.histogram.mockClear(); + tracerMocks.dogstatsd.distribution.mockClear(); + }); + + it('throws on invalid batchSize', () => { + const { sqs } = makeSqsMock(() => ({ $metadata: {}, Successful: [], Failed: [] })); + expect(() => new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', batchSize: 0 })).toThrow('batchSize must be > 0'); + expect(() => new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', batchSize: -1 })).toThrow('batchSize must be > 0'); + }); + + it('throws on invalid publishConcurrency', () => { + const { sqs } = makeSqsMock(() => ({ $metadata: {}, Successful: [], Failed: [] })); + expect(() => new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', publishConcurrency: 0 })).toThrow('publishConcurrency must be > 0'); + }); + + it('returns {enqueued:0,failed:0} for empty input', async () => { + const { sqs, send } = makeSqsMock(() => ({ $metadata: {}, Successful: [], Failed: [] })); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const res = await publisher.publish([], 'account:1:env:2'); + expect(res).toEqual({ enqueued: 0, failed: 0, failedActivityLogIds: [] }); + expect(send.mock.calls).toHaveLength(0); + }); + + it('chunks into batches of <=10 for a 25-message input', async () => { + const { sqs, send } = makeSqsMock((cmd) => successfulBatchResponse(cmd)); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const messages = Array.from({ length: 25 }, () => buildPreparedMessage()); + const res = await publisher.publish(messages, 'account:1:env:2'); + expect(res).toEqual({ enqueued: 25, failed: 0, failedActivityLogIds: [] }); + expect(send.mock.calls).toHaveLength(3); + const entryCounts = send.mock.calls.map((call) => (call[0] as SendMessageBatchCommand).input.Entries?.length); + expect(entryCounts).toEqual([10, 10, 5]); + expect(tracerMocks.startSpan).toHaveBeenCalledWith('webhook.dispatch.publish', { + childOf: expect.objectContaining({ traceId: 'active-trace' }), + tags: expect.objectContaining({ + 'nango.accountId': 1, + 'nango.environmentId': 2, + 'nango.integrationId': 3, + 'nango.provider': 'github', + 'nango.providerConfigKey': 'github-dev', + 'nango.messageCount': 25, + 'nango.batchCount': 3 + }) + }); + expect(tracerMocks.activate).toHaveBeenCalledWith(tracerMocks.span, expect.any(Function)); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.enqueued', 25); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.failed', 0); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedEntries', 0); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedBatches', 0); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledTimes(1); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledWith('nango.webhook.dispatch_queue.publish.success', 25, { provider: 'github' }); + expect(tracerMocks.span.finish).toHaveBeenCalledTimes(1); + }); + + it('sets MessageGroupId on every entry', async () => { + const groupId = 'account:7:env:9'; + const seen: string[] = []; + const bodies: string[] = []; + const { sqs } = makeSqsMock((cmd) => { + for (const entry of cmd.input.Entries ?? []) { + if (entry.MessageGroupId) seen.push(entry.MessageGroupId); + if (entry.MessageBody) bodies.push(entry.MessageBody); + } + return successfulBatchResponse(cmd); + }); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const first = buildPreparedMessage(); + const second = buildPreparedMessage({ messageOverrides: { activityLogId: 'log-2' } }); + + await publisher.publish([first, second], groupId); + expect(seen).toEqual([groupId, groupId]); + expect(bodies).toEqual([JSON.stringify(first.message), JSON.stringify(second.message)]); + }); + + it('splits batches when cumulative bytes exceed the SQS request limit', async () => { + const { sqs, send } = makeSqsMock((cmd) => successfulBatchResponse(cmd)); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + + const res = await publisher.publish( + [buildPreparedMessage({ byteSize: 600_000 }), buildPreparedMessage({ byteSize: 300_000 }), buildPreparedMessage({ byteSize: 300_000 })], + 'account:1:env:2' + ); + + expect(res).toEqual({ enqueued: 3, failed: 0, failedActivityLogIds: [] }); + expect(send.mock.calls).toHaveLength(2); + const entryCounts = send.mock.calls.map((call) => (call[0] as SendMessageBatchCommand).input.Entries?.length); + expect(entryCounts).toEqual([2, 1]); + }); + + it('limits concurrent batch publishes to publishConcurrency', async () => { + const responses = Array.from({ length: 4 }, () => deferred()); + let inFlight = 0; + let maxInFlight = 0; + + const { sqs, send } = makeSqsMock((_, call) => { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + return responses[call]!.promise.finally(() => { + inFlight -= 1; + }); + }); + + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', batchSize: 1, publishConcurrency: 2 }); + const publishPromise = publisher.publish( + Array.from({ length: 4 }, () => buildPreparedMessage()), + 'account:1:env:2' + ); + + await vi.waitFor(() => { + expect(send.mock.calls).toHaveLength(2); + }); + expect(maxInFlight).toBe(2); + + responses[0]!.resolve(successfulBatchResponse(send.mock.calls[0]![0] as SendMessageBatchCommand)); + responses[1]!.resolve(successfulBatchResponse(send.mock.calls[1]![0] as SendMessageBatchCommand)); + + await vi.waitFor(() => { + expect(send.mock.calls).toHaveLength(4); + }); + expect(maxInFlight).toBe(2); + + responses[2]!.resolve(successfulBatchResponse(send.mock.calls[2]![0] as SendMessageBatchCommand)); + responses[3]!.resolve(successfulBatchResponse(send.mock.calls[3]![0] as SendMessageBatchCommand)); + + await expect(publishPromise).resolves.toEqual({ enqueued: 4, failed: 0, failedActivityLogIds: [] }); + }); + + it('retries failed entries once and reports remaining failures', async () => { + const responses: SendMessageBatchCommandOutput[] = [ + { + $metadata: {}, + Successful: [{ Id: 'm0', MessageId: 'ok', MD5OfMessageBody: 'x' }], + Failed: [ + { Id: 'm1', SenderFault: false, Code: 'InternalError', Message: 'boom' }, + { Id: 'm2', SenderFault: false, Code: 'InternalError', Message: 'boom' } + ] + }, + // second call is the retry of m1, m2 — only m1 recovers + { + $metadata: {}, + Successful: [{ Id: 'm1', MessageId: 'ok', MD5OfMessageBody: 'x' }], + Failed: [{ Id: 'm2', SenderFault: false, Code: 'InternalError', Message: 'still broken' }] + } + ]; + const { sqs, send } = makeSqsMock((_n, call) => responses[call]!); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const res = await publisher.publish( + [ + buildPreparedMessage(), + buildPreparedMessage({ messageOverrides: { activityLogId: 'log-2' } }), + buildPreparedMessage({ messageOverrides: { activityLogId: 'log-3' } }) + ], + 'account:1:env:2' + ); + expect(res).toEqual({ enqueued: 2, failed: 1, failedActivityLogIds: ['log-3'] }); + expect(send.mock.calls).toHaveLength(2); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.enqueued', 2); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.failed', 1); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedEntries', 2); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedBatches', 1); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.partialFailure', true); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('error', expect.any(Error)); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledTimes(2); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledWith('nango.webhook.dispatch_queue.publish.success', 2, { provider: 'github' }); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledWith('nango.webhook.dispatch_queue.publish.failure', 1, { provider: 'github' }); + expect(tracerMocks.span.finish).toHaveBeenCalledTimes(1); + }); + + it('treats an SDK throw as all-entries-failed and still retries', async () => { + let call = 0; + const { sqs, send } = makeSqsMock(() => { + call++; + if (call === 1) throw new Error('network'); + return { $metadata: {}, Successful: [{ Id: 'm0', MessageId: 'ok', MD5OfMessageBody: 'x' }], Failed: [] }; + }); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const res = await publisher.publish([buildPreparedMessage()], 'account:1:env:2'); + expect(res).toEqual({ enqueued: 1, failed: 0, failedActivityLogIds: [] }); + expect(send.mock.calls).toHaveLength(2); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedEntries', 1); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedBatches', 1); + expect(tracerMocks.span.setTag.mock.calls.filter(([tag]) => tag === 'error')).toHaveLength(0); + expect(tracerMocks.activeSpan.setTag).toHaveBeenCalledWith('sqs.send.error', expect.any(Error)); + }); + + it('reports malformed failed entry ids without crashing retries', async () => { + const { sqs } = makeSqsMock((_n, call) => { + if (call === 0) { + return { + $metadata: {}, + Successful: [], + Failed: [{ Id: 'bogus', SenderFault: false, Code: 'InternalError', Message: 'boom' }] + }; + } + + return { + $metadata: {}, + Successful: [], + Failed: [{ Id: 'bogus', SenderFault: false, Code: 'InternalError', Message: 'still boom' }] + }; + }); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + + const res = await publisher.publish([buildPreparedMessage()], 'account:1:env:2'); + + expect(res).toEqual({ enqueued: 0, failed: 1, failedActivityLogIds: [] }); + }); +}); diff --git a/packages/server/lib/webhook/internal-nango.ts b/packages/server/lib/webhook/internal-nango.ts index f002c8f039..375104f8c9 100644 --- a/packages/server/lib/webhook/internal-nango.ts +++ b/packages/server/lib/webhook/internal-nango.ts @@ -1,13 +1,70 @@ +import { createHash } from 'node:crypto'; + import get from 'lodash-es/get.js'; -import { connectionService, getSyncConfigsByConfigIdForWebhook } from '@nangohq/shared'; +import { OtlpSpan } from '@nangohq/logs'; +import { NangoError, connectionService, getSyncConfigsByConfigIdForWebhook } from '@nangohq/shared'; +import { errorToObject, metrics, report } from '@nangohq/utils'; import { envs } from '../env.js'; +import { runWithConcurrencyLimit } from './runWithConcurrencyLimit.js'; import { getOrchestrator } from '../utils/utils.js'; +import { dispatchQueuePublisher } from './dispatch-queue/client.js'; + +import type { DispatchQueuePublisher, PreparedDispatchMessage } from './dispatch-queue/publisher.js'; +import type { LogContext, LogContextGetter } from '@nangohq/logs'; +import type { + ConnectionInternal, + DBConnectionDecrypted, + DBEnvironment, + DBIntegrationDecrypted, + DBPlan, + DBSyncConfig, + DBTeam, + Metadata, + WebhookDispatchMessage +} from '@nangohq/types'; -import type { LogContextGetter } from '@nangohq/logs'; -import type { Config } from '@nangohq/shared'; -import type { ConnectionInternal, DBConnectionDecrypted, DBEnvironment, DBIntegrationDecrypted, DBPlan, DBTeam, Metadata } from '@nangohq/types'; +const LARGE_FANOUT_THRESHOLD = 200; +const LOG_CONTEXT_CREATE_CONCURRENCY = 25; +const SQS_MAX_MESSAGE_BODY_BYTES = 1_048_576; + +interface MatchedExecution { + syncConfig: DBSyncConfig; + webhook: string; + connection: DBConnectionDecrypted | ConnectionInternal; +} + +interface QueuedExecution { + syncConfig: DBSyncConfig; + webhook: string; + connection: DBConnectionDecrypted | ConnectionInternal; + kind: 'queued'; + logCtx: Awaited>; + preparedMessage: PreparedDispatchMessage; +} + +interface FailedQueuedExecution extends MatchedExecution { + kind: 'failed'; + error: unknown; +} + +function computeTaskName({ + environmentId, + providerConfigKey, + parentSyncName, + connectionId, + activityLogId +}: { + environmentId: number; + providerConfigKey: string; + parentSyncName: string; + connectionId: number; + activityLogId: string; +}): string { + const hash = createHash('sha256').update(`${environmentId}:${providerConfigKey}:${parentSyncName}:${connectionId}:${activityLogId}`).digest('hex'); + return `webhook:env:${environmentId}:connection:${connectionId}:${hash.slice(0, 32)}`; +} export class InternalNango { readonly team: DBTeam; @@ -108,11 +165,46 @@ export class InternalNango { // use webhookTypeValue if provided (direct value from headers), otherwise extract from body const type = webhookTypeValue || (webhookType ? get(body, webhookType) : undefined); - const orchestrator = getOrchestrator(); + const publisher = envs.WEBHOOK_INGRESS_USE_DISPATCH_QUEUE ? dispatchQueuePublisher : null; + + if (publisher) { + await this.dispatchViaQueue({ + publisher, + connections, + syncConfigsWithWebhooks, + body, + type, + webhookHeaderValue + }); + } else { + await this.dispatchViaOrchestrator({ connections, syncConfigsWithWebhooks, body, type, webhookHeaderValue }); + } + + const connectionMetadata = connections.reduce>((acc, connection) => { + acc[connection.connection_id] = 'metadata' in connection ? connection.metadata : null; + return acc; + }, {}); + + return { connectionIds: connections.map((connection) => connection.connection_id), connectionMetadata }; + } + + private async dispatchViaOrchestrator({ + connections, + syncConfigsWithWebhooks, + body, + type, + webhookHeaderValue + }: { + connections: (DBConnectionDecrypted | ConnectionInternal)[]; + syncConfigsWithWebhooks: DBSyncConfig[]; + body: Record; + type: string | undefined; + webhookHeaderValue: string | undefined; + }): Promise { + const executions: { syncConfig: DBSyncConfig; webhook: string; connection: DBConnectionDecrypted | ConnectionInternal; logCtx: LogContext }[] = []; for (const syncConfig of syncConfigsWithWebhooks) { const { webhook_subscriptions } = syncConfig; - if (!webhook_subscriptions) { continue; } @@ -126,21 +218,25 @@ export class InternalNango { if (type === webhook || webhookHeaderValue === webhook || webhook === '*') { for (const connection of connections) { - await orchestrator.triggerWebhook({ - account: this.team, - environment: this.environment, - integration: this.integration as Config, - connection, - webhookName: webhook, - syncConfig, - input: body, - maxConcurrency: envs.WEBHOOK_ENVIRONMENT_MAX_CONCURRENCY, - logContextGetter: this.logContextGetter - }); + try { + const logCtx = await this.logContextGetter.create( + { operation: { type: 'webhook', action: 'incoming' }, expiresAt: new Date(Date.now() + 15 * 60 * 1000).toISOString() }, + { + account: this.team, + environment: this.environment, + integration: { id: this.integration.id!, name: this.integration.unique_key, provider: this.integration.provider }, + connection: { id: connection.id, name: connection.connection_id }, + syncConfig: { id: syncConfig.id, name: syncConfig.sync_name } + } + ); + logCtx.attachSpan(new OtlpSpan(logCtx.operation)); + executions.push({ syncConfig, webhook, connection, logCtx }); + } catch (err) { + report(err, { context: 'dispatchViaOrchestrator log context creation failed' }); + } } triggered = true; - if (webhook === '*') { // Only trigger once since it will match all webhooks break; @@ -149,11 +245,237 @@ export class InternalNango { } } - const connectionMetadata = connections.reduce>((acc, connection) => { - acc[connection.connection_id] = 'metadata' in connection ? connection.metadata : null; - return acc; - }, {}); + await this.dispatchExecutionsViaOrchestrator(executions, body); + } - return { connectionIds: connections.map((connection) => connection.connection_id), connectionMetadata }; + private async dispatchExecutionsViaOrchestrator( + executions: { syncConfig: DBSyncConfig; webhook: string; connection: DBConnectionDecrypted | ConnectionInternal; logCtx: LogContext }[], + body: Record + ): Promise { + const orchestrator = getOrchestrator(); + for (const { syncConfig, webhook, connection, logCtx } of executions) { + await orchestrator.triggerWebhook({ + connection, + webhookName: webhook, + syncConfig, + input: body, + maxConcurrency: envs.WEBHOOK_ENVIRONMENT_MAX_CONCURRENCY, + logCtx + }); + } + } + + private async dispatchViaQueue({ + publisher, + connections, + syncConfigsWithWebhooks, + body, + type, + webhookHeaderValue + }: { + publisher: DispatchQueuePublisher; + connections: (DBConnectionDecrypted | ConnectionInternal)[]; + syncConfigsWithWebhooks: DBSyncConfig[]; + body: Record; + type: string | undefined; + webhookHeaderValue: string | undefined; + }): Promise { + const matchedExecutions: MatchedExecution[] = []; + + for (const syncConfig of syncConfigsWithWebhooks) { + const { webhook_subscriptions } = syncConfig; + if (!webhook_subscriptions) continue; + + let matched = false; + for (const webhook of webhook_subscriptions) { + if (matched) break; + + if (type === webhook || webhookHeaderValue === webhook || webhook === '*') { + for (const connection of connections) { + matchedExecutions.push({ syncConfig, webhook, connection }); + } + + matched = true; + } + } + } + + if (matchedExecutions.length === 0) return; + + const queuePreparationResults = await runWithConcurrencyLimit( + matchedExecutions, + LOG_CONTEXT_CREATE_CONCURRENCY, + async ({ syncConfig, webhook, connection }) => { + let logCtx: Awaited> | null = null; + + try { + // Create a log context per (syncConfig × connection × webhook) triple before publishing + // so the runner picks up the same activityLogId it would have in the direct-schedule path. + logCtx = await this.logContextGetter.create( + { operation: { type: 'webhook', action: 'incoming' }, expiresAt: new Date(Date.now() + 15 * 60 * 1000).toISOString() }, + { + account: this.team, + environment: this.environment, + integration: { id: this.integration.id!, name: this.integration.unique_key, provider: this.integration.provider }, + connection: { id: connection.id, name: connection.connection_id }, + syncConfig: { id: syncConfig.id, name: syncConfig.sync_name } + } + ); + logCtx.attachSpan(new OtlpSpan(logCtx.operation)); + + const message: WebhookDispatchMessage = { + version: 1, + kind: 'webhook', + taskName: computeTaskName({ + environmentId: this.environment.id, + providerConfigKey: this.integration.unique_key, + parentSyncName: syncConfig.sync_name, + connectionId: connection.id, + activityLogId: logCtx.id + }), + createdAt: new Date().toISOString(), + accountId: this.team.id, + integrationId: this.integration.id!, + provider: this.integration.provider, + parentSyncName: syncConfig.sync_name, + activityLogId: logCtx.id, + webhookName: webhook, + connection: { + id: connection.id, + connection_id: connection.connection_id, + provider_config_key: connection.provider_config_key, + environment_id: connection.environment_id + }, + payload: body + }; + const serializedBody = JSON.stringify(message); + const preparedMessage: PreparedDispatchMessage = { + message, + byteSize: Buffer.byteLength(serializedBody, 'utf8') + }; + + return { + kind: 'queued' as const, + logCtx, + syncConfig, + webhook, + connection, + preparedMessage + }; + } catch (err) { + if (logCtx) { + const formattedError = err instanceof NangoError ? err : new NangoError('webhook_failure', { error: errorToObject(err) }); + + await logCtx.error('The webhook failed during queue preparation', { + error: err, + webhook, + connection: connection.connection_id, + integration: connection.provider_config_key + }); + await logCtx.enrichOperation({ error: formattedError }); + await logCtx.failed(); + } + + return { + kind: 'failed' as const, + error: err, + syncConfig, + webhook, + connection + }; + } + } + ); + + const queuedExecutions = queuePreparationResults.filter((result): result is QueuedExecution => result.kind === 'queued'); + const failedQueuedExecutions = queuePreparationResults.filter((result): result is FailedQueuedExecution => result.kind === 'failed'); + + for (const { error, syncConfig, webhook, connection } of failedQueuedExecutions) { + report(error, { + error: 'The webhook could not be prepared for queue dispatch', + provider: this.integration.provider, + accountId: this.team.id, + environmentId: this.environment.id, + syncConfigId: syncConfig.id, + syncName: syncConfig.sync_name, + webhook, + connectionId: connection.id, + connection: connection.connection_id, + integration: connection.provider_config_key + }); + } + + if (queuedExecutions.length === 0) { + return; + } + + if (matchedExecutions.length > LARGE_FANOUT_THRESHOLD) { + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_LARGE_FANOUT, 1, { + provider: this.integration.provider, + accountId: this.team.id, + environmentId: this.environment.id + }); + } + + const queueEligibleExecutions = queuedExecutions.filter(({ preparedMessage }) => preparedMessage.byteSize <= SQS_MAX_MESSAGE_BODY_BYTES); + const oversizedExecutions = queuedExecutions.filter(({ preparedMessage }) => preparedMessage.byteSize > SQS_MAX_MESSAGE_BODY_BYTES); + + let unmappedFailureCount = 0; + + if (queueEligibleExecutions.length > 0) { + const messageGroupId = `account:${this.team.id}:env:${this.environment.id}`; + const publishResult = await publisher.publish( + queueEligibleExecutions.map(({ preparedMessage }) => preparedMessage), + messageGroupId + ); + const failedActivityLogIds = new Set(publishResult.failedActivityLogIds); + unmappedFailureCount = publishResult.failed - failedActivityLogIds.size; + + for (const { + preparedMessage: { message }, + logCtx + } of queueEligibleExecutions) { + if (!failedActivityLogIds.has(message.activityLogId) && unmappedFailureCount === 0) { + void logCtx.info('The webhook was successfully queued for execution', { + action: message.webhookName, + connection: message.connection.connection_id, + integration: message.connection.provider_config_key + }); + continue; + } + + const error = new NangoError('webhook_failure', { + error: 'The webhook could not be queued for execution', + taskName: message.taskName + }); + + await logCtx.error('The webhook failed to queue for execution', { + error, + webhook: message.webhookName, + connection: message.connection.connection_id, + integration: message.connection.provider_config_key + }); + await logCtx.enrichOperation({ error }); + await logCtx.failed(); + } + } + + if (oversizedExecutions.length > 0) { + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_BYPASS_OVERSIZE, oversizedExecutions.length, { + provider: this.integration.provider, + accountId: this.team.id, + environmentId: this.environment.id + }); + + await this.dispatchExecutionsViaOrchestrator(oversizedExecutions, body); + } + + if (unmappedFailureCount > 0) { + report(new Error('webhook_dispatch_fanout_unmapped_failures'), { + unmappedFailureCount, + accountId: this.team.id, + environmentId: this.environment.id + }); + } } } diff --git a/packages/server/lib/webhook/internal-nango.unit.test.ts b/packages/server/lib/webhook/internal-nango.unit.test.ts new file mode 100644 index 0000000000..d258fe8813 --- /dev/null +++ b/packages/server/lib/webhook/internal-nango.unit.test.ts @@ -0,0 +1,363 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const mocks = vi.hoisted(() => { + return { + envs: { + WEBHOOK_INGRESS_USE_DISPATCH_QUEUE: true, + WEBHOOK_ENVIRONMENT_MAX_CONCURRENCY: 7 + }, + dispatchQueueClient: { dispatchQueuePublisher: null as any }, + triggerWebhook: vi.fn(), + report: vi.fn(), + metricsIncrement: vi.fn(), + getConnectionsByEnvironmentAndConfig: vi.fn(), + getSyncConfigsByConfigIdForWebhook: vi.fn() + }; +}); + +vi.mock('../env.js', () => ({ envs: mocks.envs })); +vi.mock('./dispatch-queue/client.js', () => mocks.dispatchQueueClient); +vi.mock('../utils/utils.js', () => ({ getOrchestrator: () => ({ triggerWebhook: mocks.triggerWebhook }) })); +vi.mock('@nangohq/utils', async (importOriginal) => { + const actual = await importOriginal(); + + if (!actual || typeof actual !== 'object' || !('metrics' in actual)) { + throw new Error('Invalid @nangohq/utils mock: missing metrics export'); + } + + const { metrics } = actual; + if (!metrics || typeof metrics !== 'object' || !('Types' in metrics) || !metrics.Types || typeof metrics.Types !== 'object') { + throw new Error('Invalid @nangohq/utils mock: missing metrics.Types export'); + } + + return { + ...(actual as object), + report: mocks.report, + metrics: { + ...metrics, + Types: { + ...metrics.Types, + WEBHOOK_DISPATCH_BYPASS_OVERSIZE: 'nango.webhook.dispatch_queue.bypass_oversize' + }, + increment: mocks.metricsIncrement + } + }; +}); +vi.mock('@nangohq/shared', () => { + class NangoError extends Error { + public payload: Record; + + constructor(type: string, payload: Record) { + super(type); + this.name = type; + this.payload = payload; + } + } + + return { + NangoError, + connectionService: { + getConnectionsByEnvironmentAndConfig: mocks.getConnectionsByEnvironmentAndConfig + }, + getSyncConfigsByConfigIdForWebhook: mocks.getSyncConfigsByConfigIdForWebhook + }; +}); + +import { InternalNango } from './internal-nango.js'; + +function createLogCtx(id: string) { + return { + id, + operation: { id }, + attachSpan: vi.fn(), + info: vi.fn().mockResolvedValue(true), + error: vi.fn().mockResolvedValue(true), + enrichOperation: vi.fn().mockResolvedValue(undefined), + failed: vi.fn().mockResolvedValue(undefined) + }; +} + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +function makeInternalNango(logContexts: ReturnType[], logContextGetterOverride?: any) { + const logContextGetter = + logContextGetterOverride ?? + ({ + create: vi.fn().mockImplementation(() => { + const next = logContexts.shift(); + if (!next) { + throw new Error('Missing log context'); + } + return next; + }) + } as any); + + return { + nango: new InternalNango({ + team: { id: 1 } as any, + environment: { id: 2 } as any, + plan: undefined, + integration: { id: 3, unique_key: 'github-dev', provider: 'github' } as any, + logContextGetter + }), + logContextGetter + }; +} + +describe('InternalNango queue dispatch', () => { + beforeEach(() => { + vi.clearAllMocks(); + mocks.envs.WEBHOOK_INGRESS_USE_DISPATCH_QUEUE = true; + mocks.dispatchQueueClient.dispatchQueuePublisher = null; + mocks.getConnectionsByEnvironmentAndConfig.mockResolvedValue([ + { id: 11, connection_id: 'conn-1', provider_config_key: 'github-dev', environment_id: 2, metadata: null }, + { id: 12, connection_id: 'conn-2', provider_config_key: 'github-dev', environment_id: 2, metadata: null } + ]); + mocks.getSyncConfigsByConfigIdForWebhook.mockResolvedValue([{ id: 21, sync_name: 'sync-1', webhook_subscriptions: ['push'] }]); + mocks.triggerWebhook.mockResolvedValue(undefined); + }); + + it('logs queued successes and marks failed publishes as failed operations', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 1, failed: 1, failedActivityLogIds: ['log-2'] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx1 = createLogCtx('log-1'); + const logCtx2 = createLogCtx('log-2'); + const { nango } = makeInternalNango([logCtx1, logCtx2]); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).toHaveBeenCalledTimes(1); + expect(logCtx1.info).toHaveBeenCalledWith('The webhook was successfully queued for execution', { + action: 'push', + connection: 'conn-1', + integration: 'github-dev' + }); + expect(logCtx2.error).toHaveBeenCalledWith('The webhook failed to queue for execution', { + error: expect.any(Error), + webhook: 'push', + connection: 'conn-2', + integration: 'github-dev' + }); + expect(logCtx2.enrichOperation).toHaveBeenCalledWith({ error: expect.any(Error) }); + expect(logCtx2.failed).toHaveBeenCalledOnce(); + }); + + it('falls back to direct orchestrator dispatch when the feature flag is off', async () => { + mocks.envs.WEBHOOK_INGRESS_USE_DISPATCH_QUEUE = false; + const publisher = { publish: vi.fn() }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx1 = createLogCtx('log-1'); + const logCtx2 = createLogCtx('log-2'); + const { nango, logContextGetter } = makeInternalNango([logCtx1, logCtx2]); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(mocks.triggerWebhook).toHaveBeenCalledTimes(2); + expect(logContextGetter.create).toHaveBeenCalledTimes(2); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('dispatches successful orchestrator executions even when one log context creation fails', async () => { + mocks.envs.WEBHOOK_INGRESS_USE_DISPATCH_QUEUE = false; + const publisher = { publish: vi.fn() }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx2 = createLogCtx('log-2'); + const logContextGetter = { + create: vi.fn().mockRejectedValueOnce(new Error('transient db failure')).mockResolvedValueOnce(logCtx2) + } as any; + const { nango } = makeInternalNango([], logContextGetter); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(mocks.triggerWebhook).toHaveBeenCalledTimes(1); + expect(mocks.triggerWebhook).toHaveBeenCalledWith(expect.objectContaining({ logCtx: logCtx2 })); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('creates log contexts concurrently before publishing queue messages', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 2, failed: 0, failedActivityLogIds: [] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const blockers = [deferred(), deferred()]; + const contexts = [createLogCtx('log-1'), createLogCtx('log-2')]; + let createIndex = 0; + let inFlight = 0; + let maxInFlight = 0; + const logContextGetter = { + create: vi.fn().mockImplementation(async () => { + const index = createIndex++; + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + await blockers[index]!.promise; + inFlight -= 1; + return contexts[index]!; + }) + } as any; + const { nango } = makeInternalNango([], logContextGetter); + + const execution = nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + await vi.waitFor(() => { + expect(logContextGetter.create).toHaveBeenCalledTimes(2); + }); + expect(maxInFlight).toBe(2); + + blockers[0]!.resolve(); + blockers[1]!.resolve(); + + const result = await execution; + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).toHaveBeenCalledOnce(); + }); + + it('publishes successful executions even when one log context creation fails', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 1, failed: 0, failedActivityLogIds: [] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx = createLogCtx('log-2'); + const logContextGetter = { + create: vi.fn().mockRejectedValueOnce(new Error('transient db failure')).mockResolvedValueOnce(logCtx) + } as any; + const { nango } = makeInternalNango([], logContextGetter); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).toHaveBeenCalledTimes(1); + expect(publisher.publish).toHaveBeenCalledWith( + [ + expect.objectContaining({ + message: expect.objectContaining({ + activityLogId: 'log-2', + webhookName: 'push', + connection: expect.objectContaining({ connection_id: 'conn-2' }) + }) + }) + ], + 'account:1:env:2' + ); + expect(logCtx.info).toHaveBeenCalledWith('The webhook was successfully queued for execution', { + action: 'push', + connection: 'conn-2', + integration: 'github-dev' + }); + expect(mocks.report).toHaveBeenCalledWith(expect.any(Error), { + error: 'The webhook could not be prepared for queue dispatch', + provider: 'github', + accountId: 1, + environmentId: 2, + syncConfigId: 21, + syncName: 'sync-1', + webhook: 'push', + connectionId: 11, + connection: 'conn-1', + integration: 'github-dev' + }); + }); + + it('fails the log context when queue preparation fails after creation', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 1, failed: 0, failedActivityLogIds: [] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const failedLogCtx = createLogCtx('log-1'); + failedLogCtx.attachSpan.mockImplementation(() => { + throw new Error('attach span failed'); + }); + const successfulLogCtx = createLogCtx('log-2'); + const { nango } = makeInternalNango([failedLogCtx, successfulLogCtx]); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).toHaveBeenCalledWith( + [ + expect.objectContaining({ + message: expect.objectContaining({ + activityLogId: 'log-2', + connection: expect.objectContaining({ connection_id: 'conn-2' }) + }) + }) + ], + 'account:1:env:2' + ); + expect(failedLogCtx.error).toHaveBeenCalledWith('The webhook failed during queue preparation', { + error: expect.any(Error), + webhook: 'push', + connection: 'conn-1', + integration: 'github-dev' + }); + expect(failedLogCtx.enrichOperation).toHaveBeenCalledWith({ error: expect.any(Error) }); + expect(failedLogCtx.failed).toHaveBeenCalledOnce(); + expect(successfulLogCtx.info).toHaveBeenCalledWith('The webhook was successfully queued for execution', { + action: 'push', + connection: 'conn-2', + integration: 'github-dev' + }); + }); + + it('marks all executions as failed and reports when publish has unmapped failures', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 0, failed: 2, failedActivityLogIds: [] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx1 = createLogCtx('log-1'); + const logCtx2 = createLogCtx('log-2'); + const { nango } = makeInternalNango([logCtx1, logCtx2]); + + await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(logCtx1.info).not.toHaveBeenCalled(); + expect(logCtx2.info).not.toHaveBeenCalled(); + expect(logCtx1.failed).toHaveBeenCalledOnce(); + expect(logCtx2.failed).toHaveBeenCalledOnce(); + expect(mocks.report).toHaveBeenCalledWith(expect.any(Error), { + unmappedFailureCount: 2, + accountId: 1, + environmentId: 2 + }); + }); + + it('dispatches oversized messages directly to the orchestrator and emits a metric', async () => { + const publisher = { publish: vi.fn() }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx1 = createLogCtx('log-1'); + const logCtx2 = createLogCtx('log-2'); + const { nango } = makeInternalNango([logCtx1, logCtx2]); + + const result = await nango.executeScriptForWebhooks({ body: { payload: 'x'.repeat(1_100_000) }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).not.toHaveBeenCalled(); + expect(mocks.triggerWebhook).toHaveBeenCalledTimes(2); + expect(mocks.metricsIncrement).toHaveBeenCalledWith('nango.webhook.dispatch_queue.bypass_oversize', 2, { + provider: 'github', + accountId: 1, + environmentId: 2 + }); + }); +}); diff --git a/packages/server/lib/webhook/runWithConcurrencyLimit.ts b/packages/server/lib/webhook/runWithConcurrencyLimit.ts new file mode 100644 index 0000000000..b4cb645898 --- /dev/null +++ b/packages/server/lib/webhook/runWithConcurrencyLimit.ts @@ -0,0 +1,20 @@ +export async function runWithConcurrencyLimit(items: T[], concurrency: number, worker: (item: T, index: number) => Promise): Promise { + const results: R[] = new Array(items.length); + const workerCount = Math.min(concurrency, items.length); + let nextIndex = 0; + + await Promise.all( + Array.from({ length: workerCount }, async () => { + while (true) { + const index = nextIndex++; + if (index >= items.length) { + return; + } + + results[index] = await worker(items[index]!, index); + } + }) + ); + + return results; +} diff --git a/packages/server/package.json b/packages/server/package.json index a8fca8b0d1..43ad1a7c2f 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -19,6 +19,7 @@ }, "license": "SEE LICENSE IN LICENSE FILE IN GIT REPOSITORY", "dependencies": { + "@aws-sdk/client-sqs": "3.993.0", "@modelcontextprotocol/sdk": "1.26.0", "@nangohq/authz": "file:../authz", "@nangohq/usage": "file:../usage", diff --git a/packages/shared/lib/clients/orchestrator.ts b/packages/shared/lib/clients/orchestrator.ts index 75d244dd8d..379f56d968 100644 --- a/packages/shared/lib/clients/orchestrator.ts +++ b/packages/shared/lib/clients/orchestrator.ts @@ -3,7 +3,6 @@ import ms from 'ms'; import { v4 as uuid } from 'uuid'; import db from '@nangohq/database'; -import { OtlpSpan } from '@nangohq/logs'; import { Err, Ok, errorToObject, getCheckpointKey, getFrequencyMs, stringifyError } from '@nangohq/utils'; import { hardDeleteCheckpoints } from '../index.js'; @@ -33,16 +32,7 @@ import type { VoidReturn } from '@nangohq/nango-orchestrator'; import type { RecordCount } from '@nangohq/records'; -import type { - AsyncActionResponse, - ConnectionInternal, - ConnectionJobs, - DBConnection, - DBConnectionDecrypted, - DBEnvironment, - DBSyncConfig, - DBTeam -} from '@nangohq/types'; +import type { AsyncActionResponse, ConnectionInternal, ConnectionJobs, DBConnection, DBConnectionDecrypted, DBSyncConfig } from '@nangohq/types'; import type { Result } from '@nangohq/utils'; import type { JsonValue } from 'type-fest'; @@ -234,25 +224,19 @@ export class Orchestrator { } async triggerWebhook({ - account, - environment, - integration, connection, webhookName, syncConfig, input, maxConcurrency, - logContextGetter + logCtx }: { - account: DBTeam; - environment: DBEnvironment; - integration: ProviderConfig; connection: ConnectionJobs; webhookName: string; syncConfig: DBSyncConfig; input: object; maxConcurrency: number; - logContextGetter: LogContextGetter; + logCtx: LogContext; }): Promise> { const activeSpan = tracer.scope().active(); const spanTags = { @@ -267,17 +251,6 @@ export class Orchestrator { tags: spanTags, ...(activeSpan ? { childOf: activeSpan } : {}) }); - const logCtx = await logContextGetter.create( - { operation: { type: 'webhook', action: 'incoming' }, expiresAt: new Date(Date.now() + 15 * 60 * 1000).toISOString() }, - { - account, - environment, - integration: { id: integration.id!, name: integration.unique_key, provider: integration.provider }, - connection: { id: connection.id, name: connection.connection_id }, - syncConfig: { id: syncConfig.id, name: syncConfig.sync_name } - } - ); - logCtx.attachSpan(new OtlpSpan(logCtx.operation)); try { let parsedInput = null; diff --git a/packages/types/lib/index.ts b/packages/types/lib/index.ts index 1230e09a97..e51dfb73e5 100644 --- a/packages/types/lib/index.ts +++ b/packages/types/lib/index.ts @@ -80,6 +80,7 @@ export type * from './environment/api/otlp.js'; export type * from './environment/variable/index.js'; export type * from './environment/variable/api.js'; export type * from './webhooks/api.js'; +export type * from './webhooks/dispatch.js'; export type * from './webhooks/http.api.js'; export type * from './flow/http.api.js'; export type * from './flow/index.js'; diff --git a/packages/types/lib/webhooks/dispatch.ts b/packages/types/lib/webhooks/dispatch.ts new file mode 100644 index 0000000000..5387f571ca --- /dev/null +++ b/packages/types/lib/webhooks/dispatch.ts @@ -0,0 +1,35 @@ +import type { JsonValue } from 'type-fest'; + +/** + * SQS message envelope for the webhook task-dispatch queue. + * + * The server produces one of these per (syncConfig × webhook subscription × connection) + * triple resulting from an inbound provider webhook. The jobs consumer parses these and + * calls the orchestrator to schedule the actual webhook task using `taskName` as the + * idempotency key. + */ +export interface WebhookDispatchMessage { + version: 1; + kind: 'webhook'; + /** + * Deterministic scheduler task name; used by the orchestrator as the + * idempotency key so duplicate SQS deliveries resolve to the same task. + */ + taskName: string; + createdAt: string; + accountId: number; + integrationId: number; + provider: string; + parentSyncName: string; + /** Webhook subscription name matched on the inbound payload; passed to executeWebhook as args.webhookName. */ + webhookName: string; + /** Activity log id created before enqueue; reused so redelivery of this published message keeps the same taskName. */ + activityLogId: string; + connection: { + id: number; + connection_id: string; + provider_config_key: string; + environment_id: number; + }; + payload: JsonValue; +} diff --git a/packages/utils/lib/environment/parse.ts b/packages/utils/lib/environment/parse.ts index 9580b2050e..01d495d81c 100644 --- a/packages/utils/lib/environment/parse.ts +++ b/packages/utils/lib/environment/parse.ts @@ -537,7 +537,7 @@ export const ENVS = z.object({ NANGO_WEBHOOK_CIRCUIT_BREAKER_AUTO_RESET_SECS: z.coerce.number().optional().default(3600), // WEBHOOK INGRESS - WEBHOOK_INGRESS_USE_DISPATCH_QUEUE: z.stringbool().optional().default(false), + WEBHOOK_INGRESS_USE_DISPATCH_QUEUE: z.stringbool().optional().default(true), NANGO_WEBHOOK_INGRESS_RATE_LIMIT_PER_MIN: z.coerce.number().min(0).optional().default(4000), NANGO_WEBHOOK_INGRESS_RATE_LIMIT_ENFORCE: z.stringbool().optional().default(false), @@ -547,9 +547,10 @@ export const ENVS = z.object({ NANGO_TASK_DISPATCH_MAX_MESSAGES: z.coerce.number().min(1).max(10).optional().default(10), NANGO_TASK_DISPATCH_WAIT_TIME_SECONDS: z.coerce.number().min(0).max(20).optional().default(20), NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS: z.coerce.number().min(0).max(43200).optional().default(30), - NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY: z.coerce.number().min(1).optional().default(50), + NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY: z.coerce.number().min(1).optional().default(1), NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: z.coerce.number().min(1).max(10).optional().default(10), - NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: z.coerce.number().min(1).optional().default(5), + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: z.coerce.number().min(1).optional().default(10), + NANGO_TASK_DISPATCH_MAX_AGE_SECONDS: z.coerce.number().min(0).optional().default(7200), // E2B sandboxes E2B_API_KEY: z.string().optional(), diff --git a/packages/utils/lib/environment/parse.unit.test.ts b/packages/utils/lib/environment/parse.unit.test.ts index 6fc9a63fd9..e13c354bfc 100644 --- a/packages/utils/lib/environment/parse.unit.test.ts +++ b/packages/utils/lib/environment/parse.unit.test.ts @@ -200,7 +200,7 @@ describe('parse', () => { NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS: 30, NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY: 50, NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: 10, - NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 5 + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 10 }); expect(res.NANGO_TASK_DISPATCH_QUEUE_URL).toBeUndefined(); expect(res.NANGO_TASK_DISPATCH_DLQ_URL).toBeUndefined(); diff --git a/packages/utils/lib/telemetry/metrics.ts b/packages/utils/lib/telemetry/metrics.ts index cc16aba214..2416b1e7dd 100644 --- a/packages/utils/lib/telemetry/metrics.ts +++ b/packages/utils/lib/telemetry/metrics.ts @@ -68,6 +68,16 @@ export enum Types { WEBHOOK_ASYNC_ACTION_FAILED = 'nango.webhook.async_action.failed', WEBHOOK_INCOMING_PAYLOAD_SIZE_BYTES = 'nango.webhook.incoming.payloadSizeBytes', + WEBHOOK_DISPATCH_PUBLISH_SUCCESS = 'nango.webhook.dispatch_queue.publish.success', + WEBHOOK_DISPATCH_PUBLISH_FAILURE = 'nango.webhook.dispatch_queue.publish.failure', + WEBHOOK_DISPATCH_BYPASS_OVERSIZE = 'nango.webhook.dispatch_queue.bypass_oversize', + WEBHOOK_DISPATCH_LARGE_FANOUT = 'nango.webhook.dispatch_queue.large_fanout', + WEBHOOK_DISPATCH_CONSUME_SUCCESS = 'nango.webhook.dispatch_queue.consume.success', + WEBHOOK_DISPATCH_CONSUME_FAILURE = 'nango.webhook.dispatch_queue.consume.failure', + WEBHOOK_DISPATCH_POISON_PILL = 'nango.webhook.dispatch_queue.poison_pill', + WEBHOOK_DISPATCH_STALE = 'nango.webhook.dispatch_queue.stale', + WEBHOOK_DISPATCH_DWELL_MS = 'nango.webhook.dispatch_queue.dwell_ms', + ORCH_TASKS_CREATED = 'nango.orch.tasks.created', ORCH_TASKS_DROPPED = 'nango.orch.tasks.dropped', ORCH_TASKS_STARTED = 'nango.orch.tasks.started',