From ec71242da9058c0107fe053e052e3a4f438b304d Mon Sep 17 00:00:00 2001 From: agusayerza Date: Thu, 30 Apr 2026 10:06:21 -0300 Subject: [PATCH 1/7] feat(webhooks): add dispatch queue publisher abstraction (NAN-5339) Introduces a DispatchQueuePublisher abstraction for publishing webhook execution messages to SQS. Uses activity log ID as the message identifier. Adds tracing and a shared webhook concurrency helper. --- package-lock.json | 1 + .../lib/webhook/dispatch-queue/publisher.ts | 173 ++++++++++++ .../dispatch-queue/publisher.unit.test.ts | 263 ++++++++++++++++++ .../lib/webhook/runWithConcurrencyLimit.ts | 20 ++ packages/server/package.json | 1 + packages/types/lib/index.ts | 1 + packages/types/lib/webhooks/dispatch.ts | 33 +++ packages/utils/lib/environment/parse.ts | 2 +- .../utils/lib/environment/parse.unit.test.ts | 2 +- packages/utils/lib/telemetry/metrics.ts | 10 + 10 files changed, 504 insertions(+), 2 deletions(-) create mode 100644 packages/server/lib/webhook/dispatch-queue/publisher.ts create mode 100644 packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts create mode 100644 packages/server/lib/webhook/runWithConcurrencyLimit.ts create mode 100644 packages/types/lib/webhooks/dispatch.ts diff --git a/package-lock.json b/package-lock.json index c1dd6c6302..512eb94f8e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36496,6 +36496,7 @@ "version": "1.0.0", "license": "SEE LICENSE IN LICENSE FILE IN GIT REPOSITORY", "dependencies": { + "@aws-sdk/client-sqs": "3.993.0", "@modelcontextprotocol/sdk": "1.26.0", "@nangohq/authz": "file:../authz", "@nangohq/billing": "file:../billing", diff --git a/packages/server/lib/webhook/dispatch-queue/publisher.ts b/packages/server/lib/webhook/dispatch-queue/publisher.ts new file mode 100644 index 0000000000..e70b28eb97 --- /dev/null +++ b/packages/server/lib/webhook/dispatch-queue/publisher.ts @@ -0,0 +1,173 @@ +import { SendMessageBatchCommand } from '@aws-sdk/client-sqs'; +import tracer from 'dd-trace'; + +import { metrics } from '@nangohq/utils'; + +import { runWithConcurrencyLimit } from '../runWithConcurrencyLimit.js'; + +import type { SQSClient, SendMessageBatchCommandOutput, SendMessageBatchRequestEntry } from '@aws-sdk/client-sqs'; +import type { WebhookDispatchMessage } from '@nangohq/types'; + +const SQS_BATCH_MAX_ENTRIES = 10; +const DEFAULT_PUBLISH_CONCURRENCY = 10; + +export interface DispatchQueuePublisherProps { + sqs: SQSClient; + queueUrl: string; + /** Max entries per SQS SendMessageBatch request. AWS caps this at 10. */ + batchSize?: number; + /** Max concurrent SQS SendMessageBatch requests. Defaults to 10. */ + publishConcurrency?: number; +} + +export interface PublishResult { + enqueued: number; + failed: number; +} + +interface BatchPublishResult extends PublishResult { + retriedEntries: number; +} + +export class DispatchQueuePublisher { + private readonly sqs: SQSClient; + private readonly queueUrl: string; + private readonly batchSize: number; + private readonly publishConcurrency: number; + + constructor(props: DispatchQueuePublisherProps) { + this.sqs = props.sqs; + this.queueUrl = props.queueUrl; + const configuredBatchSize = props.batchSize ?? SQS_BATCH_MAX_ENTRIES; + if (configuredBatchSize < 1) { + throw new RangeError(`batchSize must be > 0`); + } + this.batchSize = Math.min(configuredBatchSize, SQS_BATCH_MAX_ENTRIES); + + const configuredPublishConcurrency = props.publishConcurrency ?? DEFAULT_PUBLISH_CONCURRENCY; + if (configuredPublishConcurrency < 1) { + throw new RangeError(`publishConcurrency must be > 0`); + } + this.publishConcurrency = configuredPublishConcurrency; + } + + /** + * Publish a list of dispatch messages in batches. Batches are fired in parallel up to + * `publishConcurrency`; failed entries within a batch are retried once inline. Any + * entries still failing after the retry are counted as `failed`. Regular SQS send + * failures and partial failures are returned in the counts instead of throwing so the + * caller can treat them as a metric/trace concern, not an HTTP 500 (provider retries + * are worse). + */ + async publish(messages: WebhookDispatchMessage[], messageGroupId: string): Promise { + if (messages.length === 0) { + return { enqueued: 0, failed: 0 }; + } + + const activeSpan = tracer.scope().active(); + const firstMessage = messages[0]!; + const batches = chunk(messages, this.batchSize); + const span = tracer.startSpan('webhook.dispatch.publish', { + ...(activeSpan ? { childOf: activeSpan } : {}), + tags: { + 'nango.accountId': firstMessage.accountId, + 'nango.environmentId': firstMessage.connection.environment_id, + 'nango.integrationId': firstMessage.integrationId, + 'nango.provider': firstMessage.provider, + 'nango.providerConfigKey': firstMessage.connection.provider_config_key, + 'nango.messageCount': messages.length, + 'nango.batchCount': batches.length + } + }); + + return await tracer.scope().activate(span, async () => { + try { + const results = await runWithConcurrencyLimit(batches, this.publishConcurrency, async (batch) => { + return await this.sendBatch(batch, messageGroupId); + }); + + const enqueued = results.reduce((sum, r) => sum + r.enqueued, 0); + const failed = results.reduce((sum, r) => sum + r.failed, 0); + const retriedEntries = results.reduce((sum, r) => sum + r.retriedEntries, 0); + const retriedBatches = results.filter((r) => r.retriedEntries > 0).length; + + span.setTag('nango.enqueued', enqueued); + span.setTag('nango.failed', failed); + span.setTag('nango.retriedEntries', retriedEntries); + span.setTag('nango.retriedBatches', retriedBatches); + + const provider = firstMessage.provider; + + if (enqueued > 0) { + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_PUBLISH_SUCCESS, enqueued, { provider }); + } + if (failed > 0) { + const error = new Error(`Failed to enqueue ${failed} webhook dispatch message${failed === 1 ? '' : 's'}`); + span.setTag('error', error); + span.setTag('nango.partialFailure', true); + + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_PUBLISH_FAILURE, failed, { provider }); + } + + return { enqueued, failed }; + } catch (err) { + span.setTag('error', err); + throw err; + } finally { + span.finish(); + } + }); + } + + private async sendBatch(batch: WebhookDispatchMessage[], messageGroupId: string): Promise { + const entries = batch.map((message, idx) => toEntry(message, idx, messageGroupId)); + + const first = await this.trySend(entries); + const failedIndices = first.failedIds.map((id) => entryIdToIndex(id)); + if (failedIndices.length === 0) { + return { enqueued: batch.length, failed: 0, retriedEntries: 0 }; + } + + const retryEntries = failedIndices.map((i) => entries[i]).filter((e): e is SendMessageBatchRequestEntry => e !== undefined); + const second = await this.trySend(retryEntries); + + const enqueued = batch.length - second.failedIds.length; + return { enqueued, failed: second.failedIds.length, retriedEntries: failedIndices.length }; + } + + private async trySend(entries: SendMessageBatchRequestEntry[]): Promise<{ failedIds: string[] }> { + if (entries.length === 0) return { failedIds: [] }; + try { + const response: SendMessageBatchCommandOutput = await this.sqs.send(new SendMessageBatchCommand({ QueueUrl: this.queueUrl, Entries: entries })); + const failedIds = (response.Failed ?? []).map((f) => f.Id).filter((id): id is string => typeof id === 'string'); + return { failedIds }; + } catch (err) { + tracer.scope().active()?.setTag('sqs.send.error', err); + return { failedIds: entries.map((e) => e.Id!).filter((id): id is string => typeof id === 'string') }; + } + } +} + +function toEntry(message: WebhookDispatchMessage, index: number, messageGroupId: string): SendMessageBatchRequestEntry { + return { + Id: indexToEntryId(index), + MessageBody: JSON.stringify(message), + MessageGroupId: messageGroupId + }; +} + +function indexToEntryId(index: number): string { + return `m${index}`; +} + +function entryIdToIndex(id: string): number { + return Number.parseInt(id.slice(1), 10); +} + +function chunk(items: T[], size: number): T[][] { + const chunks: T[][] = []; + for (let i = 0; i < items.length; i += size) { + chunks.push(items.slice(i, i + size)); + } + return chunks; +} diff --git a/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts b/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts new file mode 100644 index 0000000000..1c16392137 --- /dev/null +++ b/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts @@ -0,0 +1,263 @@ +import { SendMessageBatchCommand } from '@aws-sdk/client-sqs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const tracerMocks = vi.hoisted(() => { + const span = { + setTag: vi.fn().mockReturnThis(), + finish: vi.fn() + }; + const activeSpan = { + traceId: 'active-trace', + setTag: vi.fn().mockReturnThis() + }; + + return { + activeSpan, + active: vi.fn(() => activeSpan), + activate: vi.fn(async (_span, callback: () => Promise) => await callback()), + startSpan: vi.fn(() => span), + span, + dogstatsd: { + increment: vi.fn(), + decrement: vi.fn(), + gauge: vi.fn(), + histogram: vi.fn(), + distribution: vi.fn() + } + }; +}); + +vi.mock('dd-trace', () => { + return { + default: { + scope: () => ({ active: tracerMocks.active, activate: tracerMocks.activate }), + startSpan: tracerMocks.startSpan, + dogstatsd: tracerMocks.dogstatsd + } + }; +}); + +import { DispatchQueuePublisher } from './publisher.js'; + +import type { SQSClient, SendMessageBatchCommandOutput } from '@aws-sdk/client-sqs'; +import type { WebhookDispatchMessage } from '@nangohq/types'; + +function buildMessage(overrides: Partial = {}): WebhookDispatchMessage { + return { + version: 1, + kind: 'webhook', + taskName: 'webhook:abc123', + createdAt: '2026-04-23T00:00:00.000Z', + accountId: 1, + integrationId: 3, + provider: 'github', + parentSyncName: 'sync-1', + activityLogId: 'log-1', + webhookName: 'sync-1', + connection: { id: 42, connection_id: 'conn-1', provider_config_key: 'github-dev', environment_id: 2 }, + payload: { hello: 'world' }, + ...overrides + }; +} + +function makeSqsMock( + responder: (command: SendMessageBatchCommand, callIndex: number) => SendMessageBatchCommandOutput | Promise +): { sqs: SQSClient; send: ReturnType } { + const send = vi.fn(); + let callIndex = 0; + send.mockImplementation(async (command: unknown) => { + if (!(command instanceof SendMessageBatchCommand)) { + throw new Error(`Unexpected command: ${String(command)}`); + } + return await responder(command, callIndex++); + }); + return { sqs: { send } as unknown as SQSClient, send }; +} + +function successfulBatchResponse(command: SendMessageBatchCommand): SendMessageBatchCommandOutput { + return { + $metadata: {}, + Successful: (command.input.Entries ?? []).map((e) => ({ Id: e.Id!, MessageId: 'x', MD5OfMessageBody: 'x' })), + Failed: [] + }; +} + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +describe('DispatchQueuePublisher', () => { + beforeEach(() => { + vi.restoreAllMocks(); + tracerMocks.active.mockClear(); + tracerMocks.activate.mockClear(); + tracerMocks.startSpan.mockClear(); + tracerMocks.span.setTag.mockClear(); + tracerMocks.span.finish.mockClear(); + tracerMocks.activeSpan.setTag.mockClear(); + tracerMocks.dogstatsd.increment.mockClear(); + tracerMocks.dogstatsd.decrement.mockClear(); + tracerMocks.dogstatsd.gauge.mockClear(); + tracerMocks.dogstatsd.histogram.mockClear(); + tracerMocks.dogstatsd.distribution.mockClear(); + }); + + it('throws on invalid batchSize', () => { + const { sqs } = makeSqsMock(() => ({ $metadata: {}, Successful: [], Failed: [] })); + expect(() => new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', batchSize: 0 })).toThrow('batchSize must be > 0'); + expect(() => new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', batchSize: -1 })).toThrow('batchSize must be > 0'); + }); + + it('throws on invalid publishConcurrency', () => { + const { sqs } = makeSqsMock(() => ({ $metadata: {}, Successful: [], Failed: [] })); + expect(() => new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', publishConcurrency: 0 })).toThrow('publishConcurrency must be > 0'); + }); + + it('returns {enqueued:0,failed:0} for empty input', async () => { + const { sqs, send } = makeSqsMock(() => ({ $metadata: {}, Successful: [], Failed: [] })); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const res = await publisher.publish([], 'account:1:env:2'); + expect(res).toEqual({ enqueued: 0, failed: 0 }); + expect(send.mock.calls).toHaveLength(0); + }); + + it('chunks into batches of <=10 for a 25-message input', async () => { + const { sqs, send } = makeSqsMock((cmd) => successfulBatchResponse(cmd)); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const messages = Array.from({ length: 25 }, () => buildMessage()); + const res = await publisher.publish(messages, 'account:1:env:2'); + expect(res).toEqual({ enqueued: 25, failed: 0 }); + expect(send.mock.calls).toHaveLength(3); + const entryCounts = send.mock.calls.map((call) => (call[0] as SendMessageBatchCommand).input.Entries?.length); + expect(entryCounts).toEqual([10, 10, 5]); + expect(tracerMocks.startSpan).toHaveBeenCalledWith('webhook.dispatch.publish', { + childOf: expect.objectContaining({ traceId: 'active-trace' }), + tags: expect.objectContaining({ + 'nango.accountId': 1, + 'nango.environmentId': 2, + 'nango.integrationId': 3, + 'nango.provider': 'github', + 'nango.providerConfigKey': 'github-dev', + 'nango.messageCount': 25, + 'nango.batchCount': 3 + }) + }); + expect(tracerMocks.activate).toHaveBeenCalledWith(tracerMocks.span, expect.any(Function)); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.enqueued', 25); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.failed', 0); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedEntries', 0); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedBatches', 0); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledTimes(1); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledWith('nango.webhook.dispatch_queue.publish.success', 25, { provider: 'github' }); + expect(tracerMocks.span.finish).toHaveBeenCalledTimes(1); + }); + + it('sets MessageGroupId on every entry', async () => { + const groupId = 'account:7:env:9'; + const seen: string[] = []; + const { sqs } = makeSqsMock((cmd) => { + for (const entry of cmd.input.Entries ?? []) { + if (entry.MessageGroupId) seen.push(entry.MessageGroupId); + } + return successfulBatchResponse(cmd); + }); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + await publisher.publish([buildMessage(), buildMessage()], groupId); + expect(seen).toEqual([groupId, groupId]); + }); + + it('limits concurrent batch publishes to publishConcurrency', async () => { + const responses = Array.from({ length: 4 }, () => deferred()); + let inFlight = 0; + let maxInFlight = 0; + + const { sqs, send } = makeSqsMock((_, call) => { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + return responses[call]!.promise.finally(() => { + inFlight -= 1; + }); + }); + + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', batchSize: 1, publishConcurrency: 2 }); + const publishPromise = publisher.publish( + Array.from({ length: 4 }, () => buildMessage()), + 'account:1:env:2' + ); + + await vi.waitFor(() => { + expect(send.mock.calls).toHaveLength(2); + }); + expect(maxInFlight).toBe(2); + + responses[0]!.resolve(successfulBatchResponse(send.mock.calls[0]![0] as SendMessageBatchCommand)); + responses[1]!.resolve(successfulBatchResponse(send.mock.calls[1]![0] as SendMessageBatchCommand)); + + await vi.waitFor(() => { + expect(send.mock.calls).toHaveLength(4); + }); + expect(maxInFlight).toBe(2); + + responses[2]!.resolve(successfulBatchResponse(send.mock.calls[2]![0] as SendMessageBatchCommand)); + responses[3]!.resolve(successfulBatchResponse(send.mock.calls[3]![0] as SendMessageBatchCommand)); + + await expect(publishPromise).resolves.toEqual({ enqueued: 4, failed: 0 }); + }); + + it('retries failed entries once and reports remaining failures', async () => { + const responses: SendMessageBatchCommandOutput[] = [ + { + $metadata: {}, + Successful: [{ Id: 'm0', MessageId: 'ok', MD5OfMessageBody: 'x' }], + Failed: [ + { Id: 'm1', SenderFault: false, Code: 'InternalError', Message: 'boom' }, + { Id: 'm2', SenderFault: false, Code: 'InternalError', Message: 'boom' } + ] + }, + // second call is the retry of m1, m2 — only m1 recovers + { + $metadata: {}, + Successful: [{ Id: 'm1', MessageId: 'ok', MD5OfMessageBody: 'x' }], + Failed: [{ Id: 'm2', SenderFault: false, Code: 'InternalError', Message: 'still broken' }] + } + ]; + const { sqs, send } = makeSqsMock((_n, call) => responses[call]!); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const res = await publisher.publish([buildMessage(), buildMessage(), buildMessage()], 'account:1:env:2'); + expect(res).toEqual({ enqueued: 2, failed: 1 }); + expect(send.mock.calls).toHaveLength(2); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.enqueued', 2); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.failed', 1); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedEntries', 2); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedBatches', 1); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.partialFailure', true); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('error', expect.any(Error)); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledTimes(2); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledWith('nango.webhook.dispatch_queue.publish.success', 2, { provider: 'github' }); + expect(tracerMocks.dogstatsd.increment).toHaveBeenCalledWith('nango.webhook.dispatch_queue.publish.failure', 1, { provider: 'github' }); + expect(tracerMocks.span.finish).toHaveBeenCalledTimes(1); + }); + + it('treats an SDK throw as all-entries-failed and still retries', async () => { + let call = 0; + const { sqs, send } = makeSqsMock(() => { + call++; + if (call === 1) throw new Error('network'); + return { $metadata: {}, Successful: [{ Id: 'm0', MessageId: 'ok', MD5OfMessageBody: 'x' }], Failed: [] }; + }); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + const res = await publisher.publish([buildMessage()], 'account:1:env:2'); + expect(res).toEqual({ enqueued: 1, failed: 0 }); + expect(send.mock.calls).toHaveLength(2); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedEntries', 1); + expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedBatches', 1); + expect(tracerMocks.span.setTag.mock.calls.filter(([tag]) => tag === 'error')).toHaveLength(0); + expect(tracerMocks.activeSpan.setTag).toHaveBeenCalledWith('sqs.send.error', expect.any(Error)); + }); +}); diff --git a/packages/server/lib/webhook/runWithConcurrencyLimit.ts b/packages/server/lib/webhook/runWithConcurrencyLimit.ts new file mode 100644 index 0000000000..b4cb645898 --- /dev/null +++ b/packages/server/lib/webhook/runWithConcurrencyLimit.ts @@ -0,0 +1,20 @@ +export async function runWithConcurrencyLimit(items: T[], concurrency: number, worker: (item: T, index: number) => Promise): Promise { + const results: R[] = new Array(items.length); + const workerCount = Math.min(concurrency, items.length); + let nextIndex = 0; + + await Promise.all( + Array.from({ length: workerCount }, async () => { + while (true) { + const index = nextIndex++; + if (index >= items.length) { + return; + } + + results[index] = await worker(items[index]!, index); + } + }) + ); + + return results; +} diff --git a/packages/server/package.json b/packages/server/package.json index a8fca8b0d1..43ad1a7c2f 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -19,6 +19,7 @@ }, "license": "SEE LICENSE IN LICENSE FILE IN GIT REPOSITORY", "dependencies": { + "@aws-sdk/client-sqs": "3.993.0", "@modelcontextprotocol/sdk": "1.26.0", "@nangohq/authz": "file:../authz", "@nangohq/usage": "file:../usage", diff --git a/packages/types/lib/index.ts b/packages/types/lib/index.ts index 1230e09a97..e51dfb73e5 100644 --- a/packages/types/lib/index.ts +++ b/packages/types/lib/index.ts @@ -80,6 +80,7 @@ export type * from './environment/api/otlp.js'; export type * from './environment/variable/index.js'; export type * from './environment/variable/api.js'; export type * from './webhooks/api.js'; +export type * from './webhooks/dispatch.js'; export type * from './webhooks/http.api.js'; export type * from './flow/http.api.js'; export type * from './flow/index.js'; diff --git a/packages/types/lib/webhooks/dispatch.ts b/packages/types/lib/webhooks/dispatch.ts new file mode 100644 index 0000000000..b4dc9cfa50 --- /dev/null +++ b/packages/types/lib/webhooks/dispatch.ts @@ -0,0 +1,33 @@ +/** + * SQS message envelope for the webhook task-dispatch queue. + * + * The server produces one of these per (syncConfig × webhook subscription × connection) + * triple resulting from an inbound provider webhook. The jobs consumer parses these and + * calls the orchestrator to schedule the actual webhook task using `taskName` as the + * idempotency key. + */ +export interface WebhookDispatchMessage { + version: 1; + kind: 'webhook'; + /** + * Deterministic scheduler task name; used by the orchestrator as the + * idempotency key so duplicate SQS deliveries resolve to the same task. + */ + taskName: string; + createdAt: string; + accountId: number; + integrationId: number; + provider: string; + parentSyncName: string; + /** Webhook subscription name matched on the inbound payload; passed to executeWebhook as args.webhookName. */ + webhookName: string; + /** Activity log id created before enqueue; also reused as the taskName dedupe seed. */ + activityLogId: string; + connection: { + id: number; + connection_id: string; + provider_config_key: string; + environment_id: number; + }; + payload: unknown; +} diff --git a/packages/utils/lib/environment/parse.ts b/packages/utils/lib/environment/parse.ts index 9580b2050e..413a5d853f 100644 --- a/packages/utils/lib/environment/parse.ts +++ b/packages/utils/lib/environment/parse.ts @@ -549,7 +549,7 @@ export const ENVS = z.object({ NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS: z.coerce.number().min(0).max(43200).optional().default(30), NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY: z.coerce.number().min(1).optional().default(50), NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: z.coerce.number().min(1).max(10).optional().default(10), - NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: z.coerce.number().min(1).optional().default(5), + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: z.coerce.number().min(1).optional().default(10), // E2B sandboxes E2B_API_KEY: z.string().optional(), diff --git a/packages/utils/lib/environment/parse.unit.test.ts b/packages/utils/lib/environment/parse.unit.test.ts index 6fc9a63fd9..e13c354bfc 100644 --- a/packages/utils/lib/environment/parse.unit.test.ts +++ b/packages/utils/lib/environment/parse.unit.test.ts @@ -200,7 +200,7 @@ describe('parse', () => { NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS: 30, NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY: 50, NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: 10, - NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 5 + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 10 }); expect(res.NANGO_TASK_DISPATCH_QUEUE_URL).toBeUndefined(); expect(res.NANGO_TASK_DISPATCH_DLQ_URL).toBeUndefined(); diff --git a/packages/utils/lib/telemetry/metrics.ts b/packages/utils/lib/telemetry/metrics.ts index cc16aba214..1bdf7adc02 100644 --- a/packages/utils/lib/telemetry/metrics.ts +++ b/packages/utils/lib/telemetry/metrics.ts @@ -68,6 +68,16 @@ export enum Types { WEBHOOK_ASYNC_ACTION_FAILED = 'nango.webhook.async_action.failed', WEBHOOK_INCOMING_PAYLOAD_SIZE_BYTES = 'nango.webhook.incoming.payloadSizeBytes', + WEBHOOK_DISPATCH_PUBLISH_SUCCESS = 'nango.webhook.dispatch_queue.publish.success', + WEBHOOK_DISPATCH_PUBLISH_FAILURE = 'nango.webhook.dispatch_queue.publish.failure', + WEBHOOK_DISPATCH_LARGE_FANOUT = 'nango.webhook.dispatch_queue.large_fanout', + WEBHOOK_DISPATCH_CONSUME_SUCCESS = 'nango.webhook.dispatch_queue.consume.success', + WEBHOOK_DISPATCH_CONSUME_FAILURE = 'nango.webhook.dispatch_queue.consume.failure', + WEBHOOK_DISPATCH_SCHEDULE_SUCCESS = 'nango.webhook.dispatch_queue.schedule.success', + WEBHOOK_DISPATCH_SCHEDULE_FAILURE = 'nango.webhook.dispatch_queue.schedule.failure', + WEBHOOK_DISPATCH_POISON_PILL = 'nango.webhook.dispatch_queue.poison_pill', + WEBHOOK_DISPATCH_DWELL_MS = 'nango.webhook.dispatch_queue.dwell_ms', + ORCH_TASKS_CREATED = 'nango.orch.tasks.created', ORCH_TASKS_DROPPED = 'nango.orch.tasks.dropped', ORCH_TASKS_STARTED = 'nango.orch.tasks.started', From 246175862ca7dcb34868b979488248079602e6a6 Mon Sep 17 00:00:00 2001 From: agusayerza Date: Thu, 30 Apr 2026 10:19:04 -0300 Subject: [PATCH 2/7] feat(webhooks): add jobs dispatch queue consumer (NAN-5341) Adds a DispatchQueueConsumer in the jobs service that pulls webhook execution messages from SQS and dispatches them via orchestratorClient.executeWebhook(). Handles duplicate task name errors, aligns on activity log IDs, and removes redundant metrics. --- packages/jobs/lib/app.ts | 22 ++ .../lib/webhook/dispatch-queue/consumer.ts | 228 +++++++++++++ .../dispatch-queue/consumer.unit.test.ts | 302 ++++++++++++++++++ .../lib/clients/client.integration.test.ts | 35 ++ packages/orchestrator/lib/clients/client.ts | 39 ++- .../lib/clients/client.unit.test.ts | 100 ++++++ .../lib/routes/v1/postImmediate.ts | 13 +- packages/scheduler/lib/errors.ts | 10 + packages/scheduler/lib/index.ts | 1 + .../lib/models/tasks.integration.test.ts | 12 + packages/scheduler/lib/models/tasks.ts | 19 +- .../lib/scheduler.integration.test.ts | 27 ++ packages/types/lib/webhooks/dispatch.ts | 6 +- packages/utils/lib/telemetry/metrics.ts | 2 - 14 files changed, 807 insertions(+), 9 deletions(-) create mode 100644 packages/jobs/lib/webhook/dispatch-queue/consumer.ts create mode 100644 packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts create mode 100644 packages/orchestrator/lib/clients/client.unit.test.ts create mode 100644 packages/scheduler/lib/errors.ts diff --git a/packages/jobs/lib/app.ts b/packages/jobs/lib/app.ts index 3603348765..1066f0d6d7 100644 --- a/packages/jobs/lib/app.ts +++ b/packages/jobs/lib/app.ts @@ -7,12 +7,14 @@ import { destroy as destroyLogs, otlp } from '@nangohq/logs'; import { getOtlpRoutes } from '@nangohq/shared'; import { getLogger, initSentry, once, report, stringifyError } from '@nangohq/utils'; +import { orchestratorClient } from './clients.js'; import { envs } from './env.js'; import { LambdaInvocationsProcessor } from './invocations/lambda.processor.js'; import { Processor } from './processor/processor.js'; import { getDefaultFleet, startFleets, stopFleets } from './runtime/runtimes.js'; import { server } from './server.js'; import { pubsub } from './utils/pubsub.js'; +import { DispatchQueueConsumer } from './webhook/dispatch-queue/consumer.js'; const logger = getLogger('Jobs'); @@ -38,6 +40,18 @@ try { const processor = new Processor(orchestratorUrl); const invocationsProcessor = new LambdaInvocationsProcessor(); + const webhookDispatchConsumer = envs.NANGO_TASK_DISPATCH_QUEUE_URL + ? new DispatchQueueConsumer({ + queueUrl: envs.NANGO_TASK_DISPATCH_QUEUE_URL, + orchestratorClient, + webhookMaxConcurrency: envs.WEBHOOK_ENVIRONMENT_MAX_CONCURRENCY, + consumerConcurrency: envs.NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY, + maxMessages: envs.NANGO_TASK_DISPATCH_MAX_MESSAGES, + waitTimeSeconds: envs.NANGO_TASK_DISPATCH_WAIT_TIME_SECONDS, + visibilityTimeoutSeconds: envs.NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS + }) + : undefined; + // We are using a setTimeout because we don't want overlapping setInterval if the DB is down let healthCheck: NodeJS.Timeout | undefined; let healthCheckFailures = 0; @@ -79,6 +93,9 @@ try { await db.readOnly.destroy(); await destroyKvstore(); await invocationsProcessor.stop(); + if (webhookDispatchConsumer) { + await webhookDispatchConsumer.stop(); + } console.info('Closed'); @@ -109,6 +126,11 @@ try { invocationsProcessor.start(); + if (webhookDispatchConsumer) { + webhookDispatchConsumer.start(); + logger.info('webhook dispatch queue consumer started'); + } + void otlp.register(getOtlpRoutes); } catch (err) { logger.error(stringifyError(err)); diff --git a/packages/jobs/lib/webhook/dispatch-queue/consumer.ts b/packages/jobs/lib/webhook/dispatch-queue/consumer.ts new file mode 100644 index 0000000000..ab0b6f2a48 --- /dev/null +++ b/packages/jobs/lib/webhook/dispatch-queue/consumer.ts @@ -0,0 +1,228 @@ +import { DeleteMessageCommand, ReceiveMessageCommand, SQSClient } from '@aws-sdk/client-sqs'; +import tracer from 'dd-trace'; +import * as z from 'zod'; + +import { isDuplicateTaskNameClientError, jsonSchema } from '@nangohq/nango-orchestrator'; +import { Err, Ok, getLogger, metrics, report } from '@nangohq/utils'; + +import { envs } from '../../env.js'; + +import type { Message } from '@aws-sdk/client-sqs'; +import type { OrchestratorClient } from '@nangohq/nango-orchestrator'; +import type { WebhookDispatchMessage } from '@nangohq/types'; +import type { Result } from '@nangohq/utils'; + +const logger = getLogger('jobs.webhook.dispatch-queue.consumer'); + +const messageSchema: z.ZodType = z.object({ + version: z.literal(1), + kind: z.literal('webhook'), + taskName: z.string().min(1), + createdAt: z.string().min(1), + accountId: z.number(), + integrationId: z.number(), + provider: z.string(), + parentSyncName: z.string(), + activityLogId: z.string(), + webhookName: z.string(), + connection: z.object({ + id: z.number(), + connection_id: z.string(), + provider_config_key: z.string(), + environment_id: z.number() + }), + payload: jsonSchema +}); + +export interface DispatchQueueConsumerProps { + queueUrl: string; + orchestratorClient: OrchestratorClient; + webhookMaxConcurrency: number; + consumerConcurrency: number; + maxMessages: number; + waitTimeSeconds: number; + visibilityTimeoutSeconds: number; + sqs?: SQSClient; +} + +export class DispatchQueueConsumer { + private readonly sqs: SQSClient; + private readonly queueUrl: string; + private readonly orchestratorClient: OrchestratorClient; + private readonly webhookMaxConcurrency: number; + private readonly consumerConcurrency: number; + private readonly maxMessages: number; + private readonly waitTimeSeconds: number; + private readonly visibilityTimeoutSeconds: number; + private readonly abortController = new AbortController(); + private loopPromises: Promise[] = []; + + constructor(props: DispatchQueueConsumerProps) { + this.queueUrl = props.queueUrl; + this.orchestratorClient = props.orchestratorClient; + this.webhookMaxConcurrency = props.webhookMaxConcurrency; + this.consumerConcurrency = props.consumerConcurrency; + this.maxMessages = props.maxMessages; + this.waitTimeSeconds = props.waitTimeSeconds; + this.visibilityTimeoutSeconds = props.visibilityTimeoutSeconds; + this.sqs = props.sqs ?? new SQSClient(envs.AWS_REGION ? { region: envs.AWS_REGION } : {}); + } + + start(): void { + if (this.loopPromises.length > 0) { + return; + } + logger.info(`webhook dispatch consumer subscribing to ${this.queueUrl}`, { consumerConcurrency: this.consumerConcurrency }); + this.loopPromises = Array.from({ length: this.consumerConcurrency }, () => this.pollLoop()); + } + + async stop(): Promise { + this.abortController.abort(); + if (this.loopPromises.length > 0) { + await Promise.allSettled(this.loopPromises); + this.loopPromises = []; + } + this.sqs.destroy(); + } + + private async pollLoop(): Promise { + const signal = this.abortController.signal; + while (!signal.aborted) { + try { + const result = await this.sqs.send( + new ReceiveMessageCommand({ + QueueUrl: this.queueUrl, + MaxNumberOfMessages: this.maxMessages, + WaitTimeSeconds: this.waitTimeSeconds, + VisibilityTimeout: this.visibilityTimeoutSeconds, + MessageAttributeNames: ['All'], + MessageSystemAttributeNames: ['SentTimestamp', 'ApproximateReceiveCount'] + }), + { abortSignal: signal } + ); + + const messages = result.Messages ?? []; + if (messages.length === 0) continue; + + // Process all messages in a batch concurrently. Each SQS receive is already + // capped at maxMessages (<=10) so no extra semaphore is needed. + await Promise.all(messages.map((msg) => this.processMessage(msg))); + } catch (err) { + if (err instanceof Error && err.name === 'AbortError') break; + report(new Error('webhook dispatch consumer receive failed', { cause: err })); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } + } + + private async processMessage(msg: Message): Promise { + const active = tracer.scope().active(); + const span = tracer.startSpan('jobs.webhook.dispatch_queue.process', { + ...(active ? { childOf: active } : {}) + }); + + return await tracer.scope().activate(span, async () => { + try { + if (msg.Body === undefined || !msg.ReceiptHandle) { + return; + } + + const parsed = this.parseMessage(msg.Body); + if (parsed.isErr()) { + span.setTag('poison_pill', true); + span.setTag('poison_pill_reason', parsed.error.message); + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_POISON_PILL); + // Parse failures are poison pills — redelivering won't help. Delete and move on. + await this.tryDeleteMessage(msg.ReceiptHandle); + return; + } + + const message = parsed.value; + const environmentId = message.connection.environment_id; + span.setTag('taskName', message.taskName); + span.setTag('provider', message.provider); + span.setTag('environmentId', environmentId); + + const sentTimestampMs = Number(msg.Attributes?.['SentTimestamp'] ?? '0'); + if (sentTimestampMs > 0) { + metrics.duration(metrics.Types.WEBHOOK_DISPATCH_DWELL_MS, Date.now() - sentTimestampMs, { provider: message.provider }); + } + + const scheduleRes = await this.orchestratorClient.executeWebhook({ + name: message.taskName, + group: { key: `webhook:environment:${environmentId}`, maxConcurrency: this.webhookMaxConcurrency }, + args: { + webhookName: message.webhookName, + parentSyncName: message.parentSyncName, + connection: message.connection, + activityLogId: message.activityLogId, + input: message.payload + } + }); + + if (scheduleRes.isErr()) { + if (isDuplicateTaskNameClientError(scheduleRes.error)) { + span.setTag('duplicate_task_name', true); + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_CONSUME_SUCCESS, 1, { provider: message.provider }); + await this.tryDeleteMessage(msg.ReceiptHandle); + return; + } + + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_CONSUME_FAILURE, 1, { provider: message.provider }); + span.setTag('error', true); + span.setTag('error.type', scheduleRes.error.name); + span.setTag('error.message', scheduleRes.error.message); + + const responsePayload = getClientErrorResponsePayload(scheduleRes.error); + if (responsePayload) { + span.setTag('error.details', responsePayload); + } + + // Don't delete — let SQS redeliver and eventually DLQ. + return; + } + + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_CONSUME_SUCCESS, 1, { provider: message.provider }); + + await this.tryDeleteMessage(msg.ReceiptHandle); + } finally { + span.finish(); + } + }); + } + + private parseMessage(body: string): Result { + try { + const json = JSON.parse(body); + const result = messageSchema.safeParse(json); + if (!result.success) { + return Err('invalid_schema'); + } + return Ok(result.data); + } catch (_err) { + return Err('json_parse'); + } + } + + private async tryDeleteMessage(receiptHandle: string): Promise { + try { + await this.sqs.send(new DeleteMessageCommand({ QueueUrl: this.queueUrl, ReceiptHandle: receiptHandle })); + } catch (err) { + report(new Error('webhook dispatch consumer delete failed', { cause: err })); + } + } +} + +function getClientErrorResponsePayload(err: { payload?: unknown }): string | null { + const payload = err.payload; + if (!payload || typeof payload !== 'object' || !('response' in payload)) { + return null; + } + + const responsePayload = payload.response; + if (responsePayload === undefined) { + return null; + } + + return JSON.stringify(responsePayload); +} diff --git a/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts b/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts new file mode 100644 index 0000000000..a168efe03b --- /dev/null +++ b/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts @@ -0,0 +1,302 @@ +import { DeleteMessageCommand, ReceiveMessageCommand } from '@aws-sdk/client-sqs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { Err, Ok } from '@nangohq/utils'; + +vi.mock('../../env.js', () => ({ + envs: { + AWS_REGION: undefined + } +})); + +import { DispatchQueueConsumer } from './consumer.js'; + +import type { SQSClient } from '@aws-sdk/client-sqs'; +import type { OrchestratorClient } from '@nangohq/nango-orchestrator'; +import type { WebhookDispatchMessage } from '@nangohq/types'; + +function buildMessage(overrides: Partial = {}): WebhookDispatchMessage { + return { + version: 1, + kind: 'webhook', + taskName: 'webhook:abc123', + createdAt: '2026-04-23T00:00:00.000Z', + accountId: 1, + integrationId: 3, + provider: 'github', + parentSyncName: 'sync-1', + activityLogId: 'log-1', + webhookName: 'push', + connection: { id: 42, connection_id: 'conn-1', provider_config_key: 'github-dev', environment_id: 2 }, + payload: { hello: 'world' }, + ...overrides + }; +} + +function abortError(): Error { + const error = new Error('aborted'); + error.name = 'AbortError'; + return error; +} + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +interface Harness { + consumer: DispatchQueueConsumer; + sqsSend: ReturnType; + sqsDestroy: ReturnType; + orchestratorExecuteWebhook: ReturnType; +} + +function makeHarness( + opts: { + messages?: WebhookDispatchMessage[]; + badBody?: string; + consumerConcurrency?: number; + sqsSend?: ReturnType; + } = {} +): Harness { + const messages = opts.messages ?? []; + const bodyQueue: { Body: string; ReceiptHandle: string; Attributes: Record }[] = []; + for (let i = 0; i < messages.length; i++) { + bodyQueue.push({ Body: JSON.stringify(messages[i]), ReceiptHandle: `rh-${i}`, Attributes: { SentTimestamp: String(Date.now() - 500) } }); + } + if (opts.badBody !== undefined) { + bodyQueue.push({ Body: opts.badBody, ReceiptHandle: `rh-bad`, Attributes: { SentTimestamp: String(Date.now()) } }); + } + + const sqsSend = + opts.sqsSend ?? + vi.fn(async (command: unknown) => { + await new Promise((resolve) => setImmediate(resolve)); + if (command instanceof ReceiveMessageCommand) { + const messages = bodyQueue.splice(0, bodyQueue.length); + return { Messages: messages }; + } + if (command instanceof DeleteMessageCommand) { + return {}; + } + throw new Error(`unexpected command ${String(command)}`); + }); + + const sqsDestroy = vi.fn(); + const sqs = { send: sqsSend, destroy: sqsDestroy } as unknown as SQSClient; + + const orchestratorExecuteWebhook = vi.fn(); + orchestratorExecuteWebhook.mockResolvedValue(Ok({ taskId: 'task-1', retryKey: 'rk-1' })); + const orchestratorClient = { executeWebhook: orchestratorExecuteWebhook } as unknown as OrchestratorClient; + + const consumer = new DispatchQueueConsumer({ + sqs, + queueUrl: 'http://queue', + orchestratorClient, + webhookMaxConcurrency: 500, + consumerConcurrency: opts.consumerConcurrency ?? 1, + maxMessages: 10, + waitTimeSeconds: 0, + visibilityTimeoutSeconds: 30 + }); + + return { consumer, sqsSend, sqsDestroy, orchestratorExecuteWebhook }; +} + +function getDeleteCalls(h: Harness) { + return h.sqsSend.mock.calls.filter((c) => c[0] instanceof DeleteMessageCommand); +} + +async function runOnce(h: Harness, waitFor: () => void | Promise): Promise { + h.consumer.start(); + await vi.waitFor(waitFor); + await h.consumer.stop(); +} + +describe('DispatchQueueConsumer', () => { + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it('schedules a valid message and deletes it on success', async () => { + const msg = buildMessage(); + const h = makeHarness({ messages: [msg] }); + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledTimes(1); + const call = h.orchestratorExecuteWebhook.mock.calls[0]?.[0]; + expect(call).toMatchObject({ + name: msg.taskName, + group: { key: 'webhook:environment:2', maxConcurrency: 500 }, + args: { + webhookName: msg.webhookName, + parentSyncName: msg.parentSyncName, + connection: msg.connection, + activityLogId: msg.activityLogId, + input: msg.payload + } + }); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + expect(h.sqsDestroy).toHaveBeenCalledOnce(); + }); + + it('treats duplicate task-name scheduling errors as already processed and deletes the message', async () => { + const h = makeHarness({ messages: [buildMessage()] }); + h.orchestratorExecuteWebhook.mockResolvedValueOnce( + Err({ + name: 'duplicate_task_name', + message: 'Task with name already exists', + payload: { taskName: 'webhook:abc123' } + }) + ); + + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledTimes(1); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + }); + + it('does not delete when orchestrator returns a non-duplicate error', async () => { + const h = makeHarness({ messages: [buildMessage()] }); + h.orchestratorExecuteWebhook.mockResolvedValueOnce(Err({ name: 'boom', message: 'boom', payload: null })); + + await runOnce(h, () => { + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledTimes(1); + }); + + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledTimes(1); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(0); + }); + + it('deletes a poison-pill message without calling orchestrator', async () => { + const h = makeHarness({ badBody: 'not-json' }); + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).not.toHaveBeenCalled(); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + }); + + it('treats an empty-body message as a poison pill and deletes it', async () => { + const h = makeHarness({ badBody: '' }); + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).not.toHaveBeenCalled(); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + }); + + it('rejects a schema-invalid message as poison and deletes it', async () => { + const invalid = { ...buildMessage(), kind: 'wrong' }; + const h = makeHarness({ badBody: JSON.stringify(invalid) }); + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).not.toHaveBeenCalled(); + const deleteCalls = getDeleteCalls(h); + expect(deleteCalls).toHaveLength(1); + }); + + it('still deletes successfully scheduled messages during graceful shutdown', async () => { + const message = buildMessage(); + const scheduled = deferred>(); + let received = false; + const sqsSend = vi.fn(async (command: unknown, options?: { abortSignal?: AbortSignal }) => { + await new Promise((resolve) => setImmediate(resolve)); + if (command instanceof ReceiveMessageCommand) { + if (!received) { + received = true; + return { + Messages: [{ Body: JSON.stringify(message), ReceiptHandle: 'rh-0', Attributes: { SentTimestamp: String(Date.now() - 500) } }] + }; + } + + if (options?.abortSignal?.aborted) { + throw abortError(); + } + return { Messages: [] }; + } + + if (command instanceof DeleteMessageCommand) { + if (options?.abortSignal?.aborted) { + throw abortError(); + } + return {}; + } + + throw new Error(`unexpected command ${String(command)}`); + }); + + const h = makeHarness({ messages: [message], sqsSend }); + h.orchestratorExecuteWebhook.mockReturnValueOnce(scheduled.promise); + + h.consumer.start(); + await vi.waitFor(() => { + expect(h.orchestratorExecuteWebhook).toHaveBeenCalledOnce(); + }); + + const stopPromise = h.consumer.stop(); + scheduled.resolve(Ok({ taskId: 'task-1', retryKey: 'rk-1' })); + await stopPromise; + + const deleteCalls = h.sqsSend.mock.calls.filter((c) => c[0] instanceof DeleteMessageCommand); + expect(deleteCalls).toHaveLength(1); + expect(deleteCalls[0]?.[1]).toBeUndefined(); + }); + + it('starts one poll loop per configured consumerConcurrency', async () => { + let receiveCalls = 0; + const firstReceives = [deferred(), deferred()]; + const sqsSend = vi.fn(async (command: unknown, options?: { abortSignal?: AbortSignal }) => { + if (command instanceof ReceiveMessageCommand) { + const index = receiveCalls++; + const pending = firstReceives[index]; + if (pending) { + return await new Promise((resolve, reject) => { + const onAbort = () => reject(abortError()); + options?.abortSignal?.addEventListener('abort', onAbort, { once: true }); + pending.promise.then(() => resolve({ Messages: [] }), reject); + }); + } + + if (options?.abortSignal?.aborted) { + throw abortError(); + } + return { Messages: [] }; + } + + if (command instanceof DeleteMessageCommand) { + return {}; + } + + throw new Error(`unexpected command ${String(command)}`); + }); + + const h = makeHarness({ consumerConcurrency: 2, sqsSend }); + h.consumer.start(); + + await vi.waitFor(() => { + expect(receiveCalls).toBe(2); + }); + + await h.consumer.stop(); + expect(h.sqsDestroy).toHaveBeenCalledOnce(); + }); +}); diff --git a/packages/orchestrator/lib/clients/client.integration.test.ts b/packages/orchestrator/lib/clients/client.integration.test.ts index 8fc27b0928..174d4eba18 100644 --- a/packages/orchestrator/lib/clients/client.integration.test.ts +++ b/packages/orchestrator/lib/clients/client.integration.test.ts @@ -163,6 +163,41 @@ describe('OrchestratorClient', async () => { }); }); + describe('immediate', () => { + it('should return a structured duplicate-name error when task name already exists', async () => { + const name = nanoid(); + const groupKey = nanoid(); + const request = { + name, + group: { key: groupKey, maxConcurrency: 0 }, + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'action' as const, + actionName: nanoid(), + connection: { + id: 123, + connection_id: 'C', + provider_config_key: 'P', + environment_id: 456 + }, + activityLogId: '789', + input: { foo: 'bar' } + } + }; + + const first = await client.immediate(request); + expect(first.isOk()).toBe(true); + + const duplicate = await client.immediate(request); + expect(duplicate.isErr()).toBe(true); + if (duplicate.isErr()) { + expect(duplicate.error.name).toBe('duplicate_task_name'); + expect(duplicate.error.payload).toEqual({}); + } + }); + }); + describe('executeAction', () => { it('should be successful when action task succeed', async () => { const groupKey = nanoid(); diff --git a/packages/orchestrator/lib/clients/client.ts b/packages/orchestrator/lib/clients/client.ts index 7ba34d5fb0..7104a8cc8c 100644 --- a/packages/orchestrator/lib/clients/client.ts +++ b/packages/orchestrator/lib/clients/client.ts @@ -65,6 +65,15 @@ export class OrchestratorClient { public async immediate(props: ImmediateProps): Promise> { const res = await this.routeFetch(postImmediateRoute)({ body: props }); if ('error' in res) { + const duplicateMessage = getDuplicateTaskNameMessage(res.error.payload); + if (duplicateMessage !== null) { + return Err({ + name: 'duplicate_task_name', + message: duplicateMessage || 'Task with this name already exists', + payload: {} + }); + } + return Err({ name: res.error.code, message: res.error.message || `Error scheduling immediate task`, @@ -255,7 +264,7 @@ export class OrchestratorClient { public async executeWebhook(props: ExecuteWebhookProps): Promise { const { args, ...rest } = props; - const schedulingProps = { + const schedulingProps: ImmediateProps = { ...rest, retry: { count: 0, max: 0 }, timeoutSettingsInSecs: { @@ -517,3 +526,31 @@ export class OrchestratorClient { } } } + +function getDuplicateTaskNameMessage(payload: unknown): string | null { + if (!payload || typeof payload !== 'object' || !('error' in payload)) { + return null; + } + + const response = payload as { + error?: { + code?: string; + message?: string; + }; + }; + + if (response.error?.code !== 'duplicate_task_name') { + return null; + } + + return response.error.message || ''; +} + +export function isDuplicateTaskNameClientError(err: unknown): boolean { + if (!err || typeof err !== 'object') { + return false; + } + + const error = err as { name?: string }; + return error.name === 'duplicate_task_name'; +} diff --git a/packages/orchestrator/lib/clients/client.unit.test.ts b/packages/orchestrator/lib/clients/client.unit.test.ts new file mode 100644 index 0000000000..67522d8252 --- /dev/null +++ b/packages/orchestrator/lib/clients/client.unit.test.ts @@ -0,0 +1,100 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { OrchestratorClient } from './client.js'; + +import type { ImmediateProps } from './types.js'; + +function buildImmediateRequest(): ImmediateProps { + return { + name: 'task-1', + group: { key: 'group-1', maxConcurrency: 0 }, + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'action', + actionName: 'action-1', + connection: { + id: 123, + connection_id: 'connection-1', + provider_config_key: 'provider-config-key-1', + environment_id: 456 + }, + activityLogId: 'activity-log-1', + input: { foo: 'bar' }, + async: false + } + }; +} + +describe('OrchestratorClient immediate', () => { + afterEach(() => { + vi.restoreAllMocks(); + vi.unstubAllGlobals(); + }); + + it('maps duplicate-name conflicts from the API while preserving existing retries', async () => { + const fetchMock = vi.fn().mockImplementation(() => { + return new Response( + JSON.stringify({ + error: { + code: 'duplicate_task_name', + message: 'task already exists' + } + }), + { status: 409, headers: { 'content-type': 'application/json' } } + ); + }); + vi.stubGlobal('fetch', fetchMock); + + const client = new OrchestratorClient({ baseUrl: 'http://orchestrator.test' }); + const res = await client.immediate(buildImmediateRequest()); + + expect(res.isErr()).toBe(true); + if (res.isErr()) { + expect(res.error.name).toBe('duplicate_task_name'); + expect(res.error.payload).toEqual({}); + } + expect(fetchMock).toHaveBeenCalledTimes(3); + }); + + it('retries transient 5xx responses', async () => { + const fetchMock = vi + .fn() + .mockResolvedValueOnce( + new Response(JSON.stringify({ error: { code: 'server_error', message: 'temporary failure' } }), { + status: 500, + headers: { 'content-type': 'application/json' } + }) + ) + .mockResolvedValueOnce( + new Response(JSON.stringify({ error: { code: 'server_error', message: 'temporary failure' } }), { + status: 500, + headers: { 'content-type': 'application/json' } + }) + ) + .mockResolvedValueOnce(new Response(JSON.stringify({ taskId: 'task-1', retryKey: 'retry-key-1' }), { status: 200 })); + vi.stubGlobal('fetch', fetchMock); + + const client = new OrchestratorClient({ baseUrl: 'http://orchestrator.test' }); + const res = await client.immediate(buildImmediateRequest()); + + expect(res.isOk()).toBe(true); + expect(fetchMock).toHaveBeenCalledTimes(3); + }); + + it('preserves existing retries on non-immediate route errors', async () => { + const fetchMock = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ error: { code: 'schedule_not_found', message: 'missing schedule' } }), { + status: 404, + headers: { 'content-type': 'application/json' } + }) + ); + vi.stubGlobal('fetch', fetchMock); + + const client = new OrchestratorClient({ baseUrl: 'http://orchestrator.test' }); + const res = await client.pauseSync({ scheduleName: 'schedule-1' }); + + expect(res.isErr()).toBe(true); + expect(fetchMock).toHaveBeenCalledTimes(3); + }); +}); diff --git a/packages/orchestrator/lib/routes/v1/postImmediate.ts b/packages/orchestrator/lib/routes/v1/postImmediate.ts index 491ed21aea..e79e4700e0 100644 --- a/packages/orchestrator/lib/routes/v1/postImmediate.ts +++ b/packages/orchestrator/lib/routes/v1/postImmediate.ts @@ -1,5 +1,6 @@ import * as z from 'zod'; +import { isDuplicateTaskNameError } from '@nangohq/scheduler'; import { validateRequest } from '@nangohq/utils'; import { actionArgsSchema, onEventArgsSchema, syncAbortArgsSchema, syncArgsSchema, webhookArgsSchema } from '../../clients/validate.js'; @@ -34,7 +35,7 @@ export type PostImmediate = Endpoint<{ }; args: JsonValue & { type: TaskType }; }; - Error: ApiError<'immediate_failed'>; + Error: ApiError<'immediate_failed' | 'duplicate_task_name'>; Success: { taskId: string; retryKey: string }; }>; @@ -111,6 +112,16 @@ const handler = (scheduler: Scheduler) => { heartbeatTimeoutSecs: res.locals.parsedBody.timeoutSettingsInSecs.heartbeat }); if (task.isErr()) { + if (isDuplicateTaskNameError(task.error)) { + res.status(409).json({ + error: { + code: 'duplicate_task_name', + message: task.error.message + } + }); + return; + } + res.status(500).json({ error: { code: 'immediate_failed', message: task.error.message } }); return; } diff --git a/packages/scheduler/lib/errors.ts b/packages/scheduler/lib/errors.ts new file mode 100644 index 0000000000..a37033b51f --- /dev/null +++ b/packages/scheduler/lib/errors.ts @@ -0,0 +1,10 @@ +export class DuplicateTaskNameError extends Error { + constructor() { + super('Task with this name already exists'); + this.name = 'DuplicateTaskNameError'; + } +} + +export function isDuplicateTaskNameError(err: unknown): boolean { + return err instanceof DuplicateTaskNameError; +} diff --git a/packages/scheduler/lib/index.ts b/packages/scheduler/lib/index.ts index f9a9a6b45f..fb61b8e07e 100644 --- a/packages/scheduler/lib/index.ts +++ b/packages/scheduler/lib/index.ts @@ -1,5 +1,6 @@ export * from './scheduler.js'; export * from './types.js'; +export * from './errors.js'; export * from './db/helpers.test.js'; export * from './db/client.js'; export * from './utils/format.js'; diff --git a/packages/scheduler/lib/models/tasks.integration.test.ts b/packages/scheduler/lib/models/tasks.integration.test.ts index bd23c60da5..08d3457022 100644 --- a/packages/scheduler/lib/models/tasks.integration.test.ts +++ b/packages/scheduler/lib/models/tasks.integration.test.ts @@ -4,6 +4,7 @@ import { nanoid } from '@nangohq/utils'; import * as tasks from './tasks.js'; import { getTestDbClient } from '../db/helpers.test.js'; +import { isDuplicateTaskNameError } from '../errors.js'; import { taskStates } from '../types.js'; import type { Task, TaskState } from '../types.js'; @@ -84,6 +85,17 @@ describe('Task', () => { expect(res.tasks[1]?.name).toBe('Also not capped'); expect(res.cappedGroupKeys).toEqual([props.groupKey]); }); + it('should error on unique-name collision', async () => { + const name = `dup-${nanoid()}`; + const first = await tasks.create(db, [{ ...props, name }]); + expect(first.isOk()).toBe(true); + + const second = await tasks.create(db, [{ ...props, name }]); + expect(second.isErr()).toBe(true); + if (second.isErr()) { + expect(isDuplicateTaskNameError(second.error)).toBe(true); + } + }); it('should have their heartbeat updated', async () => { const t = await startTask(db); await new Promise((resolve) => void setTimeout(resolve, 20)); diff --git a/packages/scheduler/lib/models/tasks.ts b/packages/scheduler/lib/models/tasks.ts index 0c2f995cb7..bcf52dd2d9 100644 --- a/packages/scheduler/lib/models/tasks.ts +++ b/packages/scheduler/lib/models/tasks.ts @@ -2,6 +2,7 @@ import { uuidv4, uuidv7 } from 'uuidv7'; import { Err, Ok, metrics, stringToHash, stringifyError } from '@nangohq/utils'; +import { DuplicateTaskNameError } from '../errors.js'; import { taskStates } from '../types.js'; import { SCHEDULES_TABLE } from './schedules.js'; import { envs } from '../env.js'; @@ -172,21 +173,33 @@ export async function create( metrics.increment(metrics.Types.ORCH_TASKS_DROPPED, droppedCount, { primitive, reason: 'task_cap' }); } const toInsert = Array.from(toInsertPerGroup.values()).flat(); - const inserted: Task[] = []; + const tasks: Task[] = []; while (toInsert.length) { const chunk = toInsert.splice(0, TASKS_INSERT_BATCH_SIZE); const batch = await db.from(TASKS_TABLE).insert(chunk.map(DbTask.to)).returning('*'); - inserted.push(...batch.map(DbTask.from)); + tasks.push(...batch.map(DbTask.from)); } return Ok({ - tasks: inserted, + tasks, cappedGroupKeys: Array.from(cappedGroupCounts.keys()) }); } catch (err) { + if (isTasksUniqueNameViolation(err)) { + return Err(new DuplicateTaskNameError()); + } return Err(new Error(`Error creating tasks: ${stringifyError(err)}`)); } } +function isTasksUniqueNameViolation(err: unknown): boolean { + if (!err || typeof err !== 'object') { + return false; + } + + const error = err as { code?: string; constraint?: string; message?: string }; + return error.code === '23505' && error.constraint === 'tasks_unique_name'; +} + // Coalesce concurrent queueSizes queries for the same group keys. // When multiple immediate() calls target the same group key concurrently, // they share a single DB query instead of each running their own count. diff --git a/packages/scheduler/lib/scheduler.integration.test.ts b/packages/scheduler/lib/scheduler.integration.test.ts index d3d3783a89..9858e64e92 100644 --- a/packages/scheduler/lib/scheduler.integration.test.ts +++ b/packages/scheduler/lib/scheduler.integration.test.ts @@ -4,6 +4,7 @@ import { nanoid } from '@nangohq/utils'; import { getTestDbClient } from './db/helpers.test.js'; import { envs } from './env.js'; +import { isDuplicateTaskNameError } from './errors.js'; import { Scheduler } from './scheduler.js'; import type { TaskProps } from './models/tasks.js'; @@ -70,6 +71,32 @@ describe('Scheduler', () => { await immediate(scheduler); expect(callbacks.CREATED).toHaveBeenCalledOnce(); }); + it('should return a duplicate-name error when an immediate task already exists', async () => { + const name = `dup-${nanoid()}`; + const groupKey = nanoid(); + + await immediate(scheduler, { taskProps: { name, groupKey } }); + + const duplicate = await scheduler.immediate({ + name, + payload: {}, + groupKey, + groupMaxConcurrency: 0, + retryMax: 1, + retryCount: 0, + createdToStartedTimeoutSecs: 3600, + startedToCompletedTimeoutSecs: 3600, + heartbeatTimeoutSecs: 600, + ownerKey: null, + retryKey: null + }); + + expect(duplicate.isErr()).toBe(true); + if (duplicate.isErr()) { + expect(isDuplicateTaskNameError(duplicate.error)).toBe(true); + } + expect(callbacks.CREATED).toHaveBeenCalledOnce(); + }); it('should call callback when task is started', async () => { const task = await immediate(scheduler); (await scheduler.dequeue({ groupKeyPattern: task.groupKey, limit: 1 })).unwrap(); diff --git a/packages/types/lib/webhooks/dispatch.ts b/packages/types/lib/webhooks/dispatch.ts index b4dc9cfa50..5387f571ca 100644 --- a/packages/types/lib/webhooks/dispatch.ts +++ b/packages/types/lib/webhooks/dispatch.ts @@ -1,3 +1,5 @@ +import type { JsonValue } from 'type-fest'; + /** * SQS message envelope for the webhook task-dispatch queue. * @@ -21,7 +23,7 @@ export interface WebhookDispatchMessage { parentSyncName: string; /** Webhook subscription name matched on the inbound payload; passed to executeWebhook as args.webhookName. */ webhookName: string; - /** Activity log id created before enqueue; also reused as the taskName dedupe seed. */ + /** Activity log id created before enqueue; reused so redelivery of this published message keeps the same taskName. */ activityLogId: string; connection: { id: number; @@ -29,5 +31,5 @@ export interface WebhookDispatchMessage { provider_config_key: string; environment_id: number; }; - payload: unknown; + payload: JsonValue; } diff --git a/packages/utils/lib/telemetry/metrics.ts b/packages/utils/lib/telemetry/metrics.ts index 1bdf7adc02..033d17a3c2 100644 --- a/packages/utils/lib/telemetry/metrics.ts +++ b/packages/utils/lib/telemetry/metrics.ts @@ -73,8 +73,6 @@ export enum Types { WEBHOOK_DISPATCH_LARGE_FANOUT = 'nango.webhook.dispatch_queue.large_fanout', WEBHOOK_DISPATCH_CONSUME_SUCCESS = 'nango.webhook.dispatch_queue.consume.success', WEBHOOK_DISPATCH_CONSUME_FAILURE = 'nango.webhook.dispatch_queue.consume.failure', - WEBHOOK_DISPATCH_SCHEDULE_SUCCESS = 'nango.webhook.dispatch_queue.schedule.success', - WEBHOOK_DISPATCH_SCHEDULE_FAILURE = 'nango.webhook.dispatch_queue.schedule.failure', WEBHOOK_DISPATCH_POISON_PILL = 'nango.webhook.dispatch_queue.poison_pill', WEBHOOK_DISPATCH_DWELL_MS = 'nango.webhook.dispatch_queue.dwell_ms', From 340d53fc9dc09573b8c57e3d81fca219276a7f3b Mon Sep 17 00:00:00 2001 From: agusayerza Date: Thu, 30 Apr 2026 10:19:30 -0300 Subject: [PATCH 3/7] feat(webhooks): switch internal-nango to publish via dispatch queue (NAN-5340) Routes webhook execution through the SQS dispatch queue instead of calling the orchestrator directly. Isolates queue log context failures so they don't block execution, fixes metric dimensions, and reuses the shared webhook concurrency helper. --- .../lib/webhook/dispatch-queue/client.ts | 26 ++ .../dispatch-queue/client.unit.test.ts | 52 +++ .../lib/webhook/dispatch-queue/publisher.ts | 53 ++- .../dispatch-queue/publisher.unit.test.ts | 38 ++- packages/server/lib/webhook/internal-nango.ts | 317 +++++++++++++++++- .../lib/webhook/internal-nango.unit.test.ts | 300 +++++++++++++++++ 6 files changed, 762 insertions(+), 24 deletions(-) create mode 100644 packages/server/lib/webhook/dispatch-queue/client.ts create mode 100644 packages/server/lib/webhook/dispatch-queue/client.unit.test.ts create mode 100644 packages/server/lib/webhook/internal-nango.unit.test.ts diff --git a/packages/server/lib/webhook/dispatch-queue/client.ts b/packages/server/lib/webhook/dispatch-queue/client.ts new file mode 100644 index 0000000000..e63336c519 --- /dev/null +++ b/packages/server/lib/webhook/dispatch-queue/client.ts @@ -0,0 +1,26 @@ +import { SQSClient } from '@aws-sdk/client-sqs'; + +import { DispatchQueuePublisher } from './publisher.js'; +import { envs } from '../../env.js'; + +function buildPublisher(): DispatchQueuePublisher | null { + const url = envs.NANGO_TASK_DISPATCH_QUEUE_URL; + if (!url) { + return null; + } + if (new URL(url).pathname.replace(/\/$/, '').endsWith('.fifo')) { + throw new Error('Webhook dispatch queue must be a Standard SQS queue; FIFO queues would serialize environment traffic.'); + } + return new DispatchQueuePublisher({ + sqs: new SQSClient(envs.AWS_REGION ? { region: envs.AWS_REGION } : {}), + queueUrl: url, + batchSize: envs.NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE, + publishConcurrency: envs.NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY + }); +} + +/** + * Singleton DispatchQueuePublisher, or null when `NANGO_TASK_DISPATCH_QUEUE_URL` + * is unset (self-hosted / local dev). Callers must check for null before use. + */ +export const dispatchQueuePublisher: DispatchQueuePublisher | null = buildPublisher(); diff --git a/packages/server/lib/webhook/dispatch-queue/client.unit.test.ts b/packages/server/lib/webhook/dispatch-queue/client.unit.test.ts new file mode 100644 index 0000000000..79bfeb5f1d --- /dev/null +++ b/packages/server/lib/webhook/dispatch-queue/client.unit.test.ts @@ -0,0 +1,52 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +describe('dispatchQueuePublisher', () => { + afterEach(() => { + vi.resetModules(); + vi.unmock('../../env.js'); + }); + + it('wires batch size and publish concurrency from envs', async () => { + vi.doMock('../../env.js', () => ({ + envs: { + AWS_REGION: 'eu-west-1', + NANGO_TASK_DISPATCH_QUEUE_URL: 'https://sqs.us-west-2.amazonaws.com/123456789012/nango-task-dispatch-development', + NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: 8, + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 3 + } + })); + + const { dispatchQueuePublisher } = await import('./client.js'); + const publisher = dispatchQueuePublisher as any; + + expect(publisher).not.toBeNull(); + expect(publisher.batchSize).toBe(8); + expect(publisher.publishConcurrency).toBe(3); + }); + + it('rejects fifo queue urls', async () => { + vi.doMock('../../env.js', () => ({ + envs: { + AWS_REGION: 'eu-west-1', + NANGO_TASK_DISPATCH_QUEUE_URL: 'https://sqs.us-west-2.amazonaws.com/123456789012/nango-task-dispatch-development.fifo', + NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: 10, + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 2 + } + })); + + await expect(import('./client.js')).rejects.toThrow('Webhook dispatch queue must be a Standard SQS queue'); + }); + + it('rejects fifo queue urls with trailing slash', async () => { + vi.doMock('../../env.js', () => ({ + envs: { + AWS_REGION: 'eu-west-1', + NANGO_TASK_DISPATCH_QUEUE_URL: 'https://sqs.us-west-2.amazonaws.com/123456789012/nango-task-dispatch-development.fifo/', + NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: 10, + NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: 2 + } + })); + + await expect(import('./client.js')).rejects.toThrow('Webhook dispatch queue must be a Standard SQS queue'); + }); +}); diff --git a/packages/server/lib/webhook/dispatch-queue/publisher.ts b/packages/server/lib/webhook/dispatch-queue/publisher.ts index e70b28eb97..bce8b0ca3b 100644 --- a/packages/server/lib/webhook/dispatch-queue/publisher.ts +++ b/packages/server/lib/webhook/dispatch-queue/publisher.ts @@ -1,7 +1,7 @@ import { SendMessageBatchCommand } from '@aws-sdk/client-sqs'; import tracer from 'dd-trace'; -import { metrics } from '@nangohq/utils'; +import { metrics, report } from '@nangohq/utils'; import { runWithConcurrencyLimit } from '../runWithConcurrencyLimit.js'; @@ -23,6 +23,7 @@ export interface DispatchQueuePublisherProps { export interface PublishResult { enqueued: number; failed: number; + failedActivityLogIds: string[]; } interface BatchPublishResult extends PublishResult { @@ -61,7 +62,7 @@ export class DispatchQueuePublisher { */ async publish(messages: WebhookDispatchMessage[], messageGroupId: string): Promise { if (messages.length === 0) { - return { enqueued: 0, failed: 0 }; + return { enqueued: 0, failed: 0, failedActivityLogIds: [] }; } const activeSpan = tracer.scope().active(); @@ -88,6 +89,7 @@ export class DispatchQueuePublisher { const enqueued = results.reduce((sum, r) => sum + r.enqueued, 0); const failed = results.reduce((sum, r) => sum + r.failed, 0); + const failedActivityLogIds = results.flatMap((r) => r.failedActivityLogIds); const retriedEntries = results.reduce((sum, r) => sum + r.retriedEntries, 0); const retriedBatches = results.filter((r) => r.retriedEntries > 0).length; @@ -109,7 +111,7 @@ export class DispatchQueuePublisher { metrics.increment(metrics.Types.WEBHOOK_DISPATCH_PUBLISH_FAILURE, failed, { provider }); } - return { enqueued, failed }; + return { enqueued, failed, failedActivityLogIds }; } catch (err) { span.setTag('error', err); throw err; @@ -123,16 +125,44 @@ export class DispatchQueuePublisher { const entries = batch.map((message, idx) => toEntry(message, idx, messageGroupId)); const first = await this.trySend(entries); - const failedIndices = first.failedIds.map((id) => entryIdToIndex(id)); + const failedIndices = first.failedIds.flatMap((id) => { + const index = entryIdToIndex(id); + if (index === null) { + report(new Error('webhook_dispatch_invalid_failed_entry_id'), { entryId: id }); + return []; + } + + return [index]; + }); + const invalidFailedCount = first.failedIds.length - failedIndices.length; if (failedIndices.length === 0) { - return { enqueued: batch.length, failed: 0, retriedEntries: 0 }; + return { + enqueued: batch.length - invalidFailedCount, + failed: invalidFailedCount, + failedActivityLogIds: [], + retriedEntries: 0 + }; } const retryEntries = failedIndices.map((i) => entries[i]).filter((e): e is SendMessageBatchRequestEntry => e !== undefined); const second = await this.trySend(retryEntries); - const enqueued = batch.length - second.failedIds.length; - return { enqueued, failed: second.failedIds.length, retriedEntries: failedIndices.length }; + const enqueued = batch.length - invalidFailedCount - second.failedIds.length; + return { + enqueued, + failed: invalidFailedCount + second.failedIds.length, + failedActivityLogIds: second.failedIds.flatMap((id) => { + const index = entryIdToIndex(id); + if (index === null) { + report(new Error('webhook_dispatch_invalid_failed_entry_id'), { entryId: id }); + return []; + } + + const activityLogId = batch[index]?.activityLogId; + return activityLogId ? [activityLogId] : []; + }), + retriedEntries: failedIndices.length + }; } private async trySend(entries: SendMessageBatchRequestEntry[]): Promise<{ failedIds: string[] }> { @@ -160,8 +190,13 @@ function indexToEntryId(index: number): string { return `m${index}`; } -function entryIdToIndex(id: string): number { - return Number.parseInt(id.slice(1), 10); +function entryIdToIndex(id: string): number | null { + if (!id.startsWith('m')) { + return null; + } + + const index = Number.parseInt(id.slice(1), 10); + return Number.isNaN(index) ? null : index; } function chunk(items: T[], size: number): T[][] { diff --git a/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts b/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts index 1c16392137..3a0bcebe14 100644 --- a/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts +++ b/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts @@ -123,7 +123,7 @@ describe('DispatchQueuePublisher', () => { const { sqs, send } = makeSqsMock(() => ({ $metadata: {}, Successful: [], Failed: [] })); const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); const res = await publisher.publish([], 'account:1:env:2'); - expect(res).toEqual({ enqueued: 0, failed: 0 }); + expect(res).toEqual({ enqueued: 0, failed: 0, failedActivityLogIds: [] }); expect(send.mock.calls).toHaveLength(0); }); @@ -132,7 +132,7 @@ describe('DispatchQueuePublisher', () => { const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); const messages = Array.from({ length: 25 }, () => buildMessage()); const res = await publisher.publish(messages, 'account:1:env:2'); - expect(res).toEqual({ enqueued: 25, failed: 0 }); + expect(res).toEqual({ enqueued: 25, failed: 0, failedActivityLogIds: [] }); expect(send.mock.calls).toHaveLength(3); const entryCounts = send.mock.calls.map((call) => (call[0] as SendMessageBatchCommand).input.Entries?.length); expect(entryCounts).toEqual([10, 10, 5]); @@ -207,7 +207,7 @@ describe('DispatchQueuePublisher', () => { responses[2]!.resolve(successfulBatchResponse(send.mock.calls[2]![0] as SendMessageBatchCommand)); responses[3]!.resolve(successfulBatchResponse(send.mock.calls[3]![0] as SendMessageBatchCommand)); - await expect(publishPromise).resolves.toEqual({ enqueued: 4, failed: 0 }); + await expect(publishPromise).resolves.toEqual({ enqueued: 4, failed: 0, failedActivityLogIds: [] }); }); it('retries failed entries once and reports remaining failures', async () => { @@ -229,8 +229,11 @@ describe('DispatchQueuePublisher', () => { ]; const { sqs, send } = makeSqsMock((_n, call) => responses[call]!); const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); - const res = await publisher.publish([buildMessage(), buildMessage(), buildMessage()], 'account:1:env:2'); - expect(res).toEqual({ enqueued: 2, failed: 1 }); + const res = await publisher.publish( + [buildMessage(), buildMessage({ activityLogId: 'log-2' }), buildMessage({ activityLogId: 'log-3' })], + 'account:1:env:2' + ); + expect(res).toEqual({ enqueued: 2, failed: 1, failedActivityLogIds: ['log-3'] }); expect(send.mock.calls).toHaveLength(2); expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.enqueued', 2); expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.failed', 1); @@ -253,11 +256,34 @@ describe('DispatchQueuePublisher', () => { }); const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); const res = await publisher.publish([buildMessage()], 'account:1:env:2'); - expect(res).toEqual({ enqueued: 1, failed: 0 }); + expect(res).toEqual({ enqueued: 1, failed: 0, failedActivityLogIds: [] }); expect(send.mock.calls).toHaveLength(2); expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedEntries', 1); expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedBatches', 1); expect(tracerMocks.span.setTag.mock.calls.filter(([tag]) => tag === 'error')).toHaveLength(0); expect(tracerMocks.activeSpan.setTag).toHaveBeenCalledWith('sqs.send.error', expect.any(Error)); }); + + it('reports malformed failed entry ids without crashing retries', async () => { + const { sqs } = makeSqsMock((_n, call) => { + if (call === 0) { + return { + $metadata: {}, + Successful: [], + Failed: [{ Id: 'bogus', SenderFault: false, Code: 'InternalError', Message: 'boom' }] + }; + } + + return { + $metadata: {}, + Successful: [], + Failed: [{ Id: 'bogus', SenderFault: false, Code: 'InternalError', Message: 'still boom' }] + }; + }); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + + const res = await publisher.publish([buildMessage()], 'account:1:env:2'); + + expect(res).toEqual({ enqueued: 0, failed: 1, failedActivityLogIds: [] }); + }); }); diff --git a/packages/server/lib/webhook/internal-nango.ts b/packages/server/lib/webhook/internal-nango.ts index f002c8f039..4223fad69f 100644 --- a/packages/server/lib/webhook/internal-nango.ts +++ b/packages/server/lib/webhook/internal-nango.ts @@ -1,13 +1,75 @@ +import { createHash } from 'node:crypto'; + import get from 'lodash-es/get.js'; -import { connectionService, getSyncConfigsByConfigIdForWebhook } from '@nangohq/shared'; +import { OtlpSpan } from '@nangohq/logs'; +import { NangoError, connectionService, getSyncConfigsByConfigIdForWebhook } from '@nangohq/shared'; +import { errorToObject, metrics, report } from '@nangohq/utils'; import { envs } from '../env.js'; +import { runWithConcurrencyLimit } from './runWithConcurrencyLimit.js'; import { getOrchestrator } from '../utils/utils.js'; +import { dispatchQueuePublisher } from './dispatch-queue/client.js'; +import type { DispatchQueuePublisher } from './dispatch-queue/publisher.js'; import type { LogContextGetter } from '@nangohq/logs'; import type { Config } from '@nangohq/shared'; -import type { ConnectionInternal, DBConnectionDecrypted, DBEnvironment, DBIntegrationDecrypted, DBPlan, DBTeam, Metadata } from '@nangohq/types'; +import type { ConnectionInternal, DBConnectionDecrypted, DBEnvironment, DBIntegrationDecrypted, DBPlan, DBSyncConfig, DBTeam, Metadata } from '@nangohq/types'; + +const LARGE_FANOUT_THRESHOLD = 200; +const LOG_CONTEXT_CREATE_CONCURRENCY = 25; + +interface MatchedExecution { + syncConfig: DBSyncConfig; + webhook: string; + connection: DBConnectionDecrypted | ConnectionInternal; +} + +interface QueuedExecution { + kind: 'queued'; + logCtx: Awaited>; + message: { + version: 1; + kind: 'webhook'; + taskName: string; + createdAt: string; + accountId: number; + integrationId: number; + provider: string; + parentSyncName: string; + activityLogId: string; + webhookName: string; + connection: { + id: number; + connection_id: string; + provider_config_key: string; + environment_id: number; + }; + payload: Record; + }; +} + +interface FailedQueuedExecution extends MatchedExecution { + kind: 'failed'; + error: unknown; +} + +function computeTaskName({ + environmentId, + providerConfigKey, + parentSyncName, + connectionId, + activityLogId +}: { + environmentId: number; + providerConfigKey: string; + parentSyncName: string; + connectionId: number; + activityLogId: string; +}): string { + const hash = createHash('sha256').update(`${environmentId}:${providerConfigKey}:${parentSyncName}:${connectionId}:${activityLogId}`).digest('hex'); + return `webhook:env:${environmentId}:connection:${connectionId}:${hash.slice(0, 32)}`; +} export class InternalNango { readonly team: DBTeam; @@ -108,11 +170,46 @@ export class InternalNango { // use webhookTypeValue if provided (direct value from headers), otherwise extract from body const type = webhookTypeValue || (webhookType ? get(body, webhookType) : undefined); + const publisher = envs.WEBHOOK_INGRESS_USE_DISPATCH_QUEUE ? dispatchQueuePublisher : null; + + if (publisher) { + await this.dispatchViaQueue({ + publisher, + connections, + syncConfigsWithWebhooks, + body, + type, + webhookHeaderValue + }); + } else { + await this.dispatchViaOrchestrator({ connections, syncConfigsWithWebhooks, body, type, webhookHeaderValue }); + } + + const connectionMetadata = connections.reduce>((acc, connection) => { + acc[connection.connection_id] = 'metadata' in connection ? connection.metadata : null; + return acc; + }, {}); + + return { connectionIds: connections.map((connection) => connection.connection_id), connectionMetadata }; + } + + private async dispatchViaOrchestrator({ + connections, + syncConfigsWithWebhooks, + body, + type, + webhookHeaderValue + }: { + connections: (DBConnectionDecrypted | ConnectionInternal)[]; + syncConfigsWithWebhooks: DBSyncConfig[]; + body: Record; + type: string | undefined; + webhookHeaderValue: string | undefined; + }): Promise { const orchestrator = getOrchestrator(); for (const syncConfig of syncConfigsWithWebhooks) { const { webhook_subscriptions } = syncConfig; - if (!webhook_subscriptions) { continue; } @@ -140,7 +237,6 @@ export class InternalNango { } triggered = true; - if (webhook === '*') { // Only trigger once since it will match all webhooks break; @@ -148,12 +244,215 @@ export class InternalNango { } } } + } - const connectionMetadata = connections.reduce>((acc, connection) => { - acc[connection.connection_id] = 'metadata' in connection ? connection.metadata : null; - return acc; - }, {}); + private async dispatchViaQueue({ + publisher, + connections, + syncConfigsWithWebhooks, + body, + type, + webhookHeaderValue + }: { + publisher: DispatchQueuePublisher; + connections: (DBConnectionDecrypted | ConnectionInternal)[]; + syncConfigsWithWebhooks: DBSyncConfig[]; + body: Record; + type: string | undefined; + webhookHeaderValue: string | undefined; + }): Promise { + const matchedExecutions: MatchedExecution[] = []; - return { connectionIds: connections.map((connection) => connection.connection_id), connectionMetadata }; + for (const syncConfig of syncConfigsWithWebhooks) { + const { webhook_subscriptions } = syncConfig; + if (!webhook_subscriptions) continue; + + let matched = false; + for (const webhook of webhook_subscriptions) { + if (matched) break; + + if (type === webhook || webhookHeaderValue === webhook || webhook === '*') { + for (const connection of connections) { + matchedExecutions.push({ syncConfig, webhook, connection }); + } + + matched = true; + } + } + } + + if (matchedExecutions.length === 0) return; + + const queuePreparationResults = await runWithConcurrencyLimit( + matchedExecutions, + LOG_CONTEXT_CREATE_CONCURRENCY, + async ({ syncConfig, webhook, connection }) => { + let logCtx: Awaited> | null = null; + + try { + // Create a log context per (syncConfig × connection × webhook) triple before publishing + // so the runner picks up the same activityLogId it would have in the direct-schedule path. + logCtx = await this.logContextGetter.create( + { operation: { type: 'webhook', action: 'incoming' }, expiresAt: new Date(Date.now() + 15 * 60 * 1000).toISOString() }, + { + account: this.team, + environment: this.environment, + integration: { id: this.integration.id!, name: this.integration.unique_key, provider: this.integration.provider }, + connection: { id: connection.id, name: connection.connection_id }, + syncConfig: { id: syncConfig.id, name: syncConfig.sync_name } + } + ); + logCtx.attachSpan(new OtlpSpan(logCtx.operation)); + + return { + kind: 'queued' as const, + logCtx, + message: { + version: 1 as const, + kind: 'webhook' as const, + taskName: computeTaskName({ + environmentId: this.environment.id, + providerConfigKey: this.integration.unique_key, + parentSyncName: syncConfig.sync_name, + connectionId: connection.id, + activityLogId: logCtx.id + }), + createdAt: new Date().toISOString(), + accountId: this.team.id, + integrationId: this.integration.id!, + provider: this.integration.provider, + parentSyncName: syncConfig.sync_name, + activityLogId: logCtx.id, + webhookName: webhook, + connection: { + id: connection.id, + connection_id: connection.connection_id, + provider_config_key: connection.provider_config_key, + environment_id: connection.environment_id + }, + payload: body + } + }; + } catch (err) { + if (logCtx) { + const failedLogCtx = logCtx; + const formattedError = err instanceof NangoError ? err : new NangoError('webhook_failure', { error: errorToObject(err) }); + + for (const operation of [ + async () => { + await failedLogCtx.error('The webhook failed during queue preparation', { + error: err, + webhook, + connection: connection.connection_id, + integration: connection.provider_config_key + }); + }, + async () => { + await failedLogCtx.enrichOperation({ error: formattedError }); + }, + async () => { + await failedLogCtx.failed(); + } + ]) { + try { + await operation(); + } catch (logCtxErr) { + report(logCtxErr, { + error: 'The webhook queue preparation failure could not be written to the log context', + activityLogId: failedLogCtx.id, + provider: this.integration.provider, + accountId: this.team.id, + environmentId: this.environment.id, + syncConfigId: syncConfig.id, + syncName: syncConfig.sync_name, + webhook, + connectionId: connection.id, + connection: connection.connection_id, + integration: connection.provider_config_key + }); + } + } + } + + return { + kind: 'failed' as const, + error: err, + syncConfig, + webhook, + connection + }; + } + } + ); + + const queuedExecutions = queuePreparationResults.filter((result): result is QueuedExecution => result.kind === 'queued'); + const failedQueuedExecutions = queuePreparationResults.filter((result): result is FailedQueuedExecution => result.kind === 'failed'); + + for (const { error, syncConfig, webhook, connection } of failedQueuedExecutions) { + report(error, { + error: 'The webhook could not be prepared for queue dispatch', + provider: this.integration.provider, + accountId: this.team.id, + environmentId: this.environment.id, + syncConfigId: syncConfig.id, + syncName: syncConfig.sync_name, + webhook, + connectionId: connection.id, + connection: connection.connection_id, + integration: connection.provider_config_key + }); + } + + if (queuedExecutions.length === 0) { + return; + } + + const messages = queuedExecutions.map(({ message }) => message); + + if (matchedExecutions.length > LARGE_FANOUT_THRESHOLD) { + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_LARGE_FANOUT, 1, { + provider: this.integration.provider, + accountId: this.team.id, + environmentId: this.environment.id + }); + } + + const messageGroupId = `account:${this.team.id}:env:${this.environment.id}`; + const publishResult = await publisher.publish(messages, messageGroupId); + const failedActivityLogIds = new Set(publishResult.failedActivityLogIds); + const unmappedFailureCount = publishResult.failed - failedActivityLogIds.size; + + for (const { message, logCtx } of queuedExecutions) { + if (!failedActivityLogIds.has(message.activityLogId) && unmappedFailureCount === 0) { + void logCtx.info('The webhook was successfully queued for execution', { + action: message.webhookName, + connection: message.connection.connection_id, + integration: message.connection.provider_config_key + }); + continue; + } + + const error = new NangoError('webhook_failure', { + error: 'The webhook could not be queued for execution', + taskName: message.taskName + }); + + await logCtx.error('The webhook failed to queue for execution', { + error, + webhook: message.webhookName, + connection: message.connection.connection_id, + integration: message.connection.provider_config_key + }); + await logCtx.enrichOperation({ error }); + await logCtx.failed(); + } + + if (unmappedFailureCount > 0) { + report(new Error('webhook_dispatch_fanout_unmapped_failures'), { + unmappedFailureCount, + accountId: this.team.id, + environmentId: this.environment.id + }); + } } } diff --git a/packages/server/lib/webhook/internal-nango.unit.test.ts b/packages/server/lib/webhook/internal-nango.unit.test.ts new file mode 100644 index 0000000000..1ec4460dde --- /dev/null +++ b/packages/server/lib/webhook/internal-nango.unit.test.ts @@ -0,0 +1,300 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const mocks = vi.hoisted(() => { + return { + envs: { + WEBHOOK_INGRESS_USE_DISPATCH_QUEUE: true, + WEBHOOK_ENVIRONMENT_MAX_CONCURRENCY: 7 + }, + dispatchQueueClient: { dispatchQueuePublisher: null as any }, + triggerWebhook: vi.fn(), + report: vi.fn(), + getConnectionsByEnvironmentAndConfig: vi.fn(), + getSyncConfigsByConfigIdForWebhook: vi.fn() + }; +}); + +vi.mock('../env.js', () => ({ envs: mocks.envs })); +vi.mock('./dispatch-queue/client.js', () => mocks.dispatchQueueClient); +vi.mock('../utils/utils.js', () => ({ getOrchestrator: () => ({ triggerWebhook: mocks.triggerWebhook }) })); +vi.mock('@nangohq/utils', async (importOriginal) => { + const actual = await importOriginal(); + + return { + ...(actual as object), + report: mocks.report + }; +}); +vi.mock('@nangohq/shared', () => { + class NangoError extends Error { + public payload: Record; + + constructor(type: string, payload: Record) { + super(type); + this.name = type; + this.payload = payload; + } + } + + return { + NangoError, + connectionService: { + getConnectionsByEnvironmentAndConfig: mocks.getConnectionsByEnvironmentAndConfig + }, + getSyncConfigsByConfigIdForWebhook: mocks.getSyncConfigsByConfigIdForWebhook + }; +}); + +import { InternalNango } from './internal-nango.js'; + +function createLogCtx(id: string) { + return { + id, + operation: { id }, + attachSpan: vi.fn(), + info: vi.fn().mockResolvedValue(true), + error: vi.fn().mockResolvedValue(true), + enrichOperation: vi.fn().mockResolvedValue(undefined), + failed: vi.fn().mockResolvedValue(undefined) + }; +} + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +function makeInternalNango(logContexts: ReturnType[], logContextGetterOverride?: any) { + const logContextGetter = + logContextGetterOverride ?? + ({ + create: vi.fn().mockImplementation(() => { + const next = logContexts.shift(); + if (!next) { + throw new Error('Missing log context'); + } + return next; + }) + } as any); + + return { + nango: new InternalNango({ + team: { id: 1 } as any, + environment: { id: 2 } as any, + plan: undefined, + integration: { id: 3, unique_key: 'github-dev', provider: 'github' } as any, + logContextGetter + }), + logContextGetter + }; +} + +describe('InternalNango queue dispatch', () => { + beforeEach(() => { + vi.clearAllMocks(); + mocks.envs.WEBHOOK_INGRESS_USE_DISPATCH_QUEUE = true; + mocks.dispatchQueueClient.dispatchQueuePublisher = null; + mocks.getConnectionsByEnvironmentAndConfig.mockResolvedValue([ + { id: 11, connection_id: 'conn-1', provider_config_key: 'github-dev', environment_id: 2, metadata: null }, + { id: 12, connection_id: 'conn-2', provider_config_key: 'github-dev', environment_id: 2, metadata: null } + ]); + mocks.getSyncConfigsByConfigIdForWebhook.mockResolvedValue([{ id: 21, sync_name: 'sync-1', webhook_subscriptions: ['push'] }]); + mocks.triggerWebhook.mockResolvedValue(undefined); + }); + + it('logs queued successes and marks failed publishes as failed operations', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 1, failed: 1, failedActivityLogIds: ['log-2'] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx1 = createLogCtx('log-1'); + const logCtx2 = createLogCtx('log-2'); + const { nango } = makeInternalNango([logCtx1, logCtx2]); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).toHaveBeenCalledTimes(1); + expect(logCtx1.info).toHaveBeenCalledWith('The webhook was successfully queued for execution', { + action: 'push', + connection: 'conn-1', + integration: 'github-dev' + }); + expect(logCtx2.error).toHaveBeenCalledWith('The webhook failed to queue for execution', { + error: expect.any(Error), + webhook: 'push', + connection: 'conn-2', + integration: 'github-dev' + }); + expect(logCtx2.enrichOperation).toHaveBeenCalledWith({ error: expect.any(Error) }); + expect(logCtx2.failed).toHaveBeenCalledOnce(); + }); + + it('falls back to direct orchestrator dispatch when the feature flag is off', async () => { + mocks.envs.WEBHOOK_INGRESS_USE_DISPATCH_QUEUE = false; + const publisher = { publish: vi.fn() }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const { nango, logContextGetter } = makeInternalNango([]); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(mocks.triggerWebhook).toHaveBeenCalledTimes(2); + expect(logContextGetter.create).not.toHaveBeenCalled(); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('creates log contexts concurrently before publishing queue messages', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 2, failed: 0, failedActivityLogIds: [] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const blockers = [deferred(), deferred()]; + const contexts = [createLogCtx('log-1'), createLogCtx('log-2')]; + let createIndex = 0; + let inFlight = 0; + let maxInFlight = 0; + const logContextGetter = { + create: vi.fn().mockImplementation(async () => { + const index = createIndex++; + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + await blockers[index]!.promise; + inFlight -= 1; + return contexts[index]!; + }) + } as any; + const { nango } = makeInternalNango([], logContextGetter); + + const execution = nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + await vi.waitFor(() => { + expect(logContextGetter.create).toHaveBeenCalledTimes(2); + }); + expect(maxInFlight).toBe(2); + + blockers[0]!.resolve(); + blockers[1]!.resolve(); + + const result = await execution; + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).toHaveBeenCalledOnce(); + }); + + it('publishes successful executions even when one log context creation fails', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 1, failed: 0, failedActivityLogIds: [] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx = createLogCtx('log-2'); + const logContextGetter = { + create: vi.fn().mockRejectedValueOnce(new Error('transient db failure')).mockResolvedValueOnce(logCtx) + } as any; + const { nango } = makeInternalNango([], logContextGetter); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).toHaveBeenCalledTimes(1); + expect(publisher.publish).toHaveBeenCalledWith( + [ + expect.objectContaining({ + activityLogId: 'log-2', + webhookName: 'push', + connection: expect.objectContaining({ connection_id: 'conn-2' }) + }) + ], + 'account:1:env:2' + ); + expect(logCtx.info).toHaveBeenCalledWith('The webhook was successfully queued for execution', { + action: 'push', + connection: 'conn-2', + integration: 'github-dev' + }); + expect(mocks.report).toHaveBeenCalledWith(expect.any(Error), { + error: 'The webhook could not be prepared for queue dispatch', + provider: 'github', + accountId: 1, + environmentId: 2, + syncConfigId: 21, + syncName: 'sync-1', + webhook: 'push', + connectionId: 11, + connection: 'conn-1', + integration: 'github-dev' + }); + }); + + it('fails the log context when queue preparation fails after creation', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 1, failed: 0, failedActivityLogIds: [] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const failedLogCtx = createLogCtx('log-1'); + failedLogCtx.attachSpan.mockImplementation(() => { + throw new Error('attach span failed'); + }); + const successfulLogCtx = createLogCtx('log-2'); + const { nango } = makeInternalNango([failedLogCtx, successfulLogCtx]); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).toHaveBeenCalledWith( + [ + expect.objectContaining({ + activityLogId: 'log-2', + connection: expect.objectContaining({ connection_id: 'conn-2' }) + }) + ], + 'account:1:env:2' + ); + expect(failedLogCtx.error).toHaveBeenCalledWith('The webhook failed during queue preparation', { + error: expect.any(Error), + webhook: 'push', + connection: 'conn-1', + integration: 'github-dev' + }); + expect(failedLogCtx.enrichOperation).toHaveBeenCalledWith({ error: expect.any(Error) }); + expect(failedLogCtx.failed).toHaveBeenCalledOnce(); + expect(successfulLogCtx.info).toHaveBeenCalledWith('The webhook was successfully queued for execution', { + action: 'push', + connection: 'conn-2', + integration: 'github-dev' + }); + }); + + it('marks all executions as failed and reports when publish has unmapped failures', async () => { + const publisher = { + publish: vi.fn().mockResolvedValue({ enqueued: 0, failed: 2, failedActivityLogIds: [] }) + }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx1 = createLogCtx('log-1'); + const logCtx2 = createLogCtx('log-2'); + const { nango } = makeInternalNango([logCtx1, logCtx2]); + + await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(logCtx1.info).not.toHaveBeenCalled(); + expect(logCtx2.info).not.toHaveBeenCalled(); + expect(logCtx1.failed).toHaveBeenCalledOnce(); + expect(logCtx2.failed).toHaveBeenCalledOnce(); + expect(mocks.report).toHaveBeenCalledWith(expect.any(Error), { + unmappedFailureCount: 2, + accountId: 1, + environmentId: 2 + }); + }); +}); From 65d368468bda8c4888d0c95158df72cd62e079dc Mon Sep 17 00:00:00 2001 From: agusayerza Date: Thu, 30 Apr 2026 11:43:14 -0300 Subject: [PATCH 4/7] Remove awaiting operation array --- packages/server/lib/webhook/internal-nango.ts | 43 ++++--------------- 1 file changed, 8 insertions(+), 35 deletions(-) diff --git a/packages/server/lib/webhook/internal-nango.ts b/packages/server/lib/webhook/internal-nango.ts index 4223fad69f..1e23e1373d 100644 --- a/packages/server/lib/webhook/internal-nango.ts +++ b/packages/server/lib/webhook/internal-nango.ts @@ -335,43 +335,16 @@ export class InternalNango { }; } catch (err) { if (logCtx) { - const failedLogCtx = logCtx; const formattedError = err instanceof NangoError ? err : new NangoError('webhook_failure', { error: errorToObject(err) }); - for (const operation of [ - async () => { - await failedLogCtx.error('The webhook failed during queue preparation', { - error: err, - webhook, - connection: connection.connection_id, - integration: connection.provider_config_key - }); - }, - async () => { - await failedLogCtx.enrichOperation({ error: formattedError }); - }, - async () => { - await failedLogCtx.failed(); - } - ]) { - try { - await operation(); - } catch (logCtxErr) { - report(logCtxErr, { - error: 'The webhook queue preparation failure could not be written to the log context', - activityLogId: failedLogCtx.id, - provider: this.integration.provider, - accountId: this.team.id, - environmentId: this.environment.id, - syncConfigId: syncConfig.id, - syncName: syncConfig.sync_name, - webhook, - connectionId: connection.id, - connection: connection.connection_id, - integration: connection.provider_config_key - }); - } - } + await logCtx.error('The webhook failed during queue preparation', { + error: err, + webhook, + connection: connection.connection_id, + integration: connection.provider_config_key + }); + await logCtx.enrichOperation({ error: formattedError }); + await logCtx.failed(); } return { From 76ec8f47ede485b3de3159b84072e44f39486f46 Mon Sep 17 00:00:00 2001 From: agusayerza Date: Thu, 30 Apr 2026 10:19:51 -0300 Subject: [PATCH 5/7] fix(webhooks): bypass SQS for oversize dispatch payloads (NAN-5403) Falls back to direct orchestrator dispatch when a webhook payload exceeds SQS's 1MB limit. Extracts dispatchExecutionsViaOrchestrator from InternalNango into a dedicated method to support both code paths cleanly. --- .../lib/webhook/dispatch-queue/publisher.ts | 44 +++- .../dispatch-queue/publisher.unit.test.ts | 78 ++++++- packages/server/lib/webhook/internal-nango.ts | 214 +++++++++++------- .../lib/webhook/internal-nango.unit.test.ts | 81 ++++++- packages/shared/lib/clients/orchestrator.ts | 33 +-- packages/utils/lib/telemetry/metrics.ts | 1 + 6 files changed, 315 insertions(+), 136 deletions(-) diff --git a/packages/server/lib/webhook/dispatch-queue/publisher.ts b/packages/server/lib/webhook/dispatch-queue/publisher.ts index bce8b0ca3b..ccc8c5b339 100644 --- a/packages/server/lib/webhook/dispatch-queue/publisher.ts +++ b/packages/server/lib/webhook/dispatch-queue/publisher.ts @@ -10,6 +10,7 @@ import type { WebhookDispatchMessage } from '@nangohq/types'; const SQS_BATCH_MAX_ENTRIES = 10; const DEFAULT_PUBLISH_CONCURRENCY = 10; +const SQS_BATCH_MAX_BYTES = 1_048_576; export interface DispatchQueuePublisherProps { sqs: SQSClient; @@ -26,6 +27,11 @@ export interface PublishResult { failedActivityLogIds: string[]; } +export interface PreparedDispatchMessage { + message: WebhookDispatchMessage; + byteSize: number; +} + interface BatchPublishResult extends PublishResult { retriedEntries: number; } @@ -60,13 +66,13 @@ export class DispatchQueuePublisher { * caller can treat them as a metric/trace concern, not an HTTP 500 (provider retries * are worse). */ - async publish(messages: WebhookDispatchMessage[], messageGroupId: string): Promise { + async publish(messages: PreparedDispatchMessage[], messageGroupId: string): Promise { if (messages.length === 0) { return { enqueued: 0, failed: 0, failedActivityLogIds: [] }; } const activeSpan = tracer.scope().active(); - const firstMessage = messages[0]!; + const firstMessage = messages[0]!.message; const batches = chunk(messages, this.batchSize); const span = tracer.startSpan('webhook.dispatch.publish', { ...(activeSpan ? { childOf: activeSpan } : {}), @@ -121,7 +127,7 @@ export class DispatchQueuePublisher { }); } - private async sendBatch(batch: WebhookDispatchMessage[], messageGroupId: string): Promise { + private async sendBatch(batch: PreparedDispatchMessage[], messageGroupId: string): Promise { const entries = batch.map((message, idx) => toEntry(message, idx, messageGroupId)); const first = await this.trySend(entries); @@ -158,7 +164,7 @@ export class DispatchQueuePublisher { return []; } - const activityLogId = batch[index]?.activityLogId; + const activityLogId = batch[index]?.message.activityLogId; return activityLogId ? [activityLogId] : []; }), retriedEntries: failedIndices.length @@ -178,10 +184,10 @@ export class DispatchQueuePublisher { } } -function toEntry(message: WebhookDispatchMessage, index: number, messageGroupId: string): SendMessageBatchRequestEntry { +function toEntry(message: PreparedDispatchMessage, index: number, messageGroupId: string): SendMessageBatchRequestEntry { return { Id: indexToEntryId(index), - MessageBody: JSON.stringify(message), + MessageBody: JSON.stringify(message.message), MessageGroupId: messageGroupId }; } @@ -199,10 +205,28 @@ function entryIdToIndex(id: string): number | null { return Number.isNaN(index) ? null : index; } -function chunk(items: T[], size: number): T[][] { - const chunks: T[][] = []; - for (let i = 0; i < items.length; i += size) { - chunks.push(items.slice(i, i + size)); +function chunk(items: PreparedDispatchMessage[], size: number): PreparedDispatchMessage[][] { + const chunks: PreparedDispatchMessage[][] = []; + let currentChunk: PreparedDispatchMessage[] = []; + let currentChunkBytes = 0; + + for (const item of items) { + const exceedsBatchSize = currentChunk.length >= size; + const exceedsBatchBytes = currentChunkBytes + item.byteSize > SQS_BATCH_MAX_BYTES; + + if (currentChunk.length > 0 && (exceedsBatchSize || exceedsBatchBytes)) { + chunks.push(currentChunk); + currentChunk = []; + currentChunkBytes = 0; + } + + currentChunk.push(item); + currentChunkBytes += item.byteSize; + } + + if (currentChunk.length > 0) { + chunks.push(currentChunk); } + return chunks; } diff --git a/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts b/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts index 3a0bcebe14..e2fd759543 100644 --- a/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts +++ b/packages/server/lib/webhook/dispatch-queue/publisher.unit.test.ts @@ -27,6 +27,10 @@ const tracerMocks = vi.hoisted(() => { }; }); +const utilsMocks = vi.hoisted(() => ({ + report: vi.fn() +})); + vi.mock('dd-trace', () => { return { default: { @@ -37,8 +41,29 @@ vi.mock('dd-trace', () => { }; }); +vi.mock('@nangohq/utils', async (importOriginal) => { + const actual = await importOriginal(); + + if (!actual || typeof actual !== 'object') { + throw new Error('Invalid @nangohq/utils mock'); + } + + return { + ...actual, + report: utilsMocks.report, + metrics: { + Types: { + WEBHOOK_DISPATCH_PUBLISH_SUCCESS: 'nango.webhook.dispatch_queue.publish.success', + WEBHOOK_DISPATCH_PUBLISH_FAILURE: 'nango.webhook.dispatch_queue.publish.failure' + }, + increment: tracerMocks.dogstatsd.increment + } + }; +}); + import { DispatchQueuePublisher } from './publisher.js'; +import type { PreparedDispatchMessage } from './publisher.js'; import type { SQSClient, SendMessageBatchCommandOutput } from '@aws-sdk/client-sqs'; import type { WebhookDispatchMessage } from '@nangohq/types'; @@ -60,6 +85,21 @@ function buildMessage(overrides: Partial = {}): WebhookD }; } +function buildPreparedMessage({ + messageOverrides, + byteSize +}: { + messageOverrides?: Partial; + byteSize?: number; +} = {}): PreparedDispatchMessage { + const message = buildMessage(messageOverrides); + + return { + message, + byteSize: byteSize ?? Buffer.byteLength(JSON.stringify(message), 'utf8') + }; +} + function makeSqsMock( responder: (command: SendMessageBatchCommand, callIndex: number) => SendMessageBatchCommandOutput | Promise ): { sqs: SQSClient; send: ReturnType } { @@ -95,6 +135,7 @@ function deferred() { describe('DispatchQueuePublisher', () => { beforeEach(() => { vi.restoreAllMocks(); + utilsMocks.report.mockClear(); tracerMocks.active.mockClear(); tracerMocks.activate.mockClear(); tracerMocks.startSpan.mockClear(); @@ -130,7 +171,7 @@ describe('DispatchQueuePublisher', () => { it('chunks into batches of <=10 for a 25-message input', async () => { const { sqs, send } = makeSqsMock((cmd) => successfulBatchResponse(cmd)); const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); - const messages = Array.from({ length: 25 }, () => buildMessage()); + const messages = Array.from({ length: 25 }, () => buildPreparedMessage()); const res = await publisher.publish(messages, 'account:1:env:2'); expect(res).toEqual({ enqueued: 25, failed: 0, failedActivityLogIds: [] }); expect(send.mock.calls).toHaveLength(3); @@ -161,15 +202,36 @@ describe('DispatchQueuePublisher', () => { it('sets MessageGroupId on every entry', async () => { const groupId = 'account:7:env:9'; const seen: string[] = []; + const bodies: string[] = []; const { sqs } = makeSqsMock((cmd) => { for (const entry of cmd.input.Entries ?? []) { if (entry.MessageGroupId) seen.push(entry.MessageGroupId); + if (entry.MessageBody) bodies.push(entry.MessageBody); } return successfulBatchResponse(cmd); }); const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); - await publisher.publish([buildMessage(), buildMessage()], groupId); + const first = buildPreparedMessage(); + const second = buildPreparedMessage({ messageOverrides: { activityLogId: 'log-2' } }); + + await publisher.publish([first, second], groupId); expect(seen).toEqual([groupId, groupId]); + expect(bodies).toEqual([JSON.stringify(first.message), JSON.stringify(second.message)]); + }); + + it('splits batches when cumulative bytes exceed the SQS request limit', async () => { + const { sqs, send } = makeSqsMock((cmd) => successfulBatchResponse(cmd)); + const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); + + const res = await publisher.publish( + [buildPreparedMessage({ byteSize: 600_000 }), buildPreparedMessage({ byteSize: 300_000 }), buildPreparedMessage({ byteSize: 300_000 })], + 'account:1:env:2' + ); + + expect(res).toEqual({ enqueued: 3, failed: 0, failedActivityLogIds: [] }); + expect(send.mock.calls).toHaveLength(2); + const entryCounts = send.mock.calls.map((call) => (call[0] as SendMessageBatchCommand).input.Entries?.length); + expect(entryCounts).toEqual([2, 1]); }); it('limits concurrent batch publishes to publishConcurrency', async () => { @@ -187,7 +249,7 @@ describe('DispatchQueuePublisher', () => { const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q', batchSize: 1, publishConcurrency: 2 }); const publishPromise = publisher.publish( - Array.from({ length: 4 }, () => buildMessage()), + Array.from({ length: 4 }, () => buildPreparedMessage()), 'account:1:env:2' ); @@ -230,7 +292,11 @@ describe('DispatchQueuePublisher', () => { const { sqs, send } = makeSqsMock((_n, call) => responses[call]!); const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); const res = await publisher.publish( - [buildMessage(), buildMessage({ activityLogId: 'log-2' }), buildMessage({ activityLogId: 'log-3' })], + [ + buildPreparedMessage(), + buildPreparedMessage({ messageOverrides: { activityLogId: 'log-2' } }), + buildPreparedMessage({ messageOverrides: { activityLogId: 'log-3' } }) + ], 'account:1:env:2' ); expect(res).toEqual({ enqueued: 2, failed: 1, failedActivityLogIds: ['log-3'] }); @@ -255,7 +321,7 @@ describe('DispatchQueuePublisher', () => { return { $metadata: {}, Successful: [{ Id: 'm0', MessageId: 'ok', MD5OfMessageBody: 'x' }], Failed: [] }; }); const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); - const res = await publisher.publish([buildMessage()], 'account:1:env:2'); + const res = await publisher.publish([buildPreparedMessage()], 'account:1:env:2'); expect(res).toEqual({ enqueued: 1, failed: 0, failedActivityLogIds: [] }); expect(send.mock.calls).toHaveLength(2); expect(tracerMocks.span.setTag).toHaveBeenCalledWith('nango.retriedEntries', 1); @@ -282,7 +348,7 @@ describe('DispatchQueuePublisher', () => { }); const publisher = new DispatchQueuePublisher({ sqs, queueUrl: 'http://q' }); - const res = await publisher.publish([buildMessage()], 'account:1:env:2'); + const res = await publisher.publish([buildPreparedMessage()], 'account:1:env:2'); expect(res).toEqual({ enqueued: 0, failed: 1, failedActivityLogIds: [] }); }); diff --git a/packages/server/lib/webhook/internal-nango.ts b/packages/server/lib/webhook/internal-nango.ts index 1e23e1373d..375104f8c9 100644 --- a/packages/server/lib/webhook/internal-nango.ts +++ b/packages/server/lib/webhook/internal-nango.ts @@ -11,13 +11,23 @@ import { runWithConcurrencyLimit } from './runWithConcurrencyLimit.js'; import { getOrchestrator } from '../utils/utils.js'; import { dispatchQueuePublisher } from './dispatch-queue/client.js'; -import type { DispatchQueuePublisher } from './dispatch-queue/publisher.js'; -import type { LogContextGetter } from '@nangohq/logs'; -import type { Config } from '@nangohq/shared'; -import type { ConnectionInternal, DBConnectionDecrypted, DBEnvironment, DBIntegrationDecrypted, DBPlan, DBSyncConfig, DBTeam, Metadata } from '@nangohq/types'; +import type { DispatchQueuePublisher, PreparedDispatchMessage } from './dispatch-queue/publisher.js'; +import type { LogContext, LogContextGetter } from '@nangohq/logs'; +import type { + ConnectionInternal, + DBConnectionDecrypted, + DBEnvironment, + DBIntegrationDecrypted, + DBPlan, + DBSyncConfig, + DBTeam, + Metadata, + WebhookDispatchMessage +} from '@nangohq/types'; const LARGE_FANOUT_THRESHOLD = 200; const LOG_CONTEXT_CREATE_CONCURRENCY = 25; +const SQS_MAX_MESSAGE_BODY_BYTES = 1_048_576; interface MatchedExecution { syncConfig: DBSyncConfig; @@ -26,27 +36,12 @@ interface MatchedExecution { } interface QueuedExecution { + syncConfig: DBSyncConfig; + webhook: string; + connection: DBConnectionDecrypted | ConnectionInternal; kind: 'queued'; logCtx: Awaited>; - message: { - version: 1; - kind: 'webhook'; - taskName: string; - createdAt: string; - accountId: number; - integrationId: number; - provider: string; - parentSyncName: string; - activityLogId: string; - webhookName: string; - connection: { - id: number; - connection_id: string; - provider_config_key: string; - environment_id: number; - }; - payload: Record; - }; + preparedMessage: PreparedDispatchMessage; } interface FailedQueuedExecution extends MatchedExecution { @@ -206,7 +201,7 @@ export class InternalNango { type: string | undefined; webhookHeaderValue: string | undefined; }): Promise { - const orchestrator = getOrchestrator(); + const executions: { syncConfig: DBSyncConfig; webhook: string; connection: DBConnectionDecrypted | ConnectionInternal; logCtx: LogContext }[] = []; for (const syncConfig of syncConfigsWithWebhooks) { const { webhook_subscriptions } = syncConfig; @@ -223,17 +218,22 @@ export class InternalNango { if (type === webhook || webhookHeaderValue === webhook || webhook === '*') { for (const connection of connections) { - await orchestrator.triggerWebhook({ - account: this.team, - environment: this.environment, - integration: this.integration as Config, - connection, - webhookName: webhook, - syncConfig, - input: body, - maxConcurrency: envs.WEBHOOK_ENVIRONMENT_MAX_CONCURRENCY, - logContextGetter: this.logContextGetter - }); + try { + const logCtx = await this.logContextGetter.create( + { operation: { type: 'webhook', action: 'incoming' }, expiresAt: new Date(Date.now() + 15 * 60 * 1000).toISOString() }, + { + account: this.team, + environment: this.environment, + integration: { id: this.integration.id!, name: this.integration.unique_key, provider: this.integration.provider }, + connection: { id: connection.id, name: connection.connection_id }, + syncConfig: { id: syncConfig.id, name: syncConfig.sync_name } + } + ); + logCtx.attachSpan(new OtlpSpan(logCtx.operation)); + executions.push({ syncConfig, webhook, connection, logCtx }); + } catch (err) { + report(err, { context: 'dispatchViaOrchestrator log context creation failed' }); + } } triggered = true; @@ -244,6 +244,25 @@ export class InternalNango { } } } + + await this.dispatchExecutionsViaOrchestrator(executions, body); + } + + private async dispatchExecutionsViaOrchestrator( + executions: { syncConfig: DBSyncConfig; webhook: string; connection: DBConnectionDecrypted | ConnectionInternal; logCtx: LogContext }[], + body: Record + ): Promise { + const orchestrator = getOrchestrator(); + for (const { syncConfig, webhook, connection, logCtx } of executions) { + await orchestrator.triggerWebhook({ + connection, + webhookName: webhook, + syncConfig, + input: body, + maxConcurrency: envs.WEBHOOK_ENVIRONMENT_MAX_CONCURRENCY, + logCtx + }); + } } private async dispatchViaQueue({ @@ -304,34 +323,44 @@ export class InternalNango { ); logCtx.attachSpan(new OtlpSpan(logCtx.operation)); + const message: WebhookDispatchMessage = { + version: 1, + kind: 'webhook', + taskName: computeTaskName({ + environmentId: this.environment.id, + providerConfigKey: this.integration.unique_key, + parentSyncName: syncConfig.sync_name, + connectionId: connection.id, + activityLogId: logCtx.id + }), + createdAt: new Date().toISOString(), + accountId: this.team.id, + integrationId: this.integration.id!, + provider: this.integration.provider, + parentSyncName: syncConfig.sync_name, + activityLogId: logCtx.id, + webhookName: webhook, + connection: { + id: connection.id, + connection_id: connection.connection_id, + provider_config_key: connection.provider_config_key, + environment_id: connection.environment_id + }, + payload: body + }; + const serializedBody = JSON.stringify(message); + const preparedMessage: PreparedDispatchMessage = { + message, + byteSize: Buffer.byteLength(serializedBody, 'utf8') + }; + return { kind: 'queued' as const, logCtx, - message: { - version: 1 as const, - kind: 'webhook' as const, - taskName: computeTaskName({ - environmentId: this.environment.id, - providerConfigKey: this.integration.unique_key, - parentSyncName: syncConfig.sync_name, - connectionId: connection.id, - activityLogId: logCtx.id - }), - createdAt: new Date().toISOString(), - accountId: this.team.id, - integrationId: this.integration.id!, - provider: this.integration.provider, - parentSyncName: syncConfig.sync_name, - activityLogId: logCtx.id, - webhookName: webhook, - connection: { - id: connection.id, - connection_id: connection.connection_id, - provider_config_key: connection.provider_config_key, - environment_id: connection.environment_id - }, - payload: body - } + syncConfig, + webhook, + connection, + preparedMessage }; } catch (err) { if (logCtx) { @@ -380,8 +409,6 @@ export class InternalNango { return; } - const messages = queuedExecutions.map(({ message }) => message); - if (matchedExecutions.length > LARGE_FANOUT_THRESHOLD) { metrics.increment(metrics.Types.WEBHOOK_DISPATCH_LARGE_FANOUT, 1, { provider: this.integration.provider, @@ -390,34 +417,57 @@ export class InternalNango { }); } - const messageGroupId = `account:${this.team.id}:env:${this.environment.id}`; - const publishResult = await publisher.publish(messages, messageGroupId); - const failedActivityLogIds = new Set(publishResult.failedActivityLogIds); - const unmappedFailureCount = publishResult.failed - failedActivityLogIds.size; + const queueEligibleExecutions = queuedExecutions.filter(({ preparedMessage }) => preparedMessage.byteSize <= SQS_MAX_MESSAGE_BODY_BYTES); + const oversizedExecutions = queuedExecutions.filter(({ preparedMessage }) => preparedMessage.byteSize > SQS_MAX_MESSAGE_BODY_BYTES); + + let unmappedFailureCount = 0; + + if (queueEligibleExecutions.length > 0) { + const messageGroupId = `account:${this.team.id}:env:${this.environment.id}`; + const publishResult = await publisher.publish( + queueEligibleExecutions.map(({ preparedMessage }) => preparedMessage), + messageGroupId + ); + const failedActivityLogIds = new Set(publishResult.failedActivityLogIds); + unmappedFailureCount = publishResult.failed - failedActivityLogIds.size; + + for (const { + preparedMessage: { message }, + logCtx + } of queueEligibleExecutions) { + if (!failedActivityLogIds.has(message.activityLogId) && unmappedFailureCount === 0) { + void logCtx.info('The webhook was successfully queued for execution', { + action: message.webhookName, + connection: message.connection.connection_id, + integration: message.connection.provider_config_key + }); + continue; + } - for (const { message, logCtx } of queuedExecutions) { - if (!failedActivityLogIds.has(message.activityLogId) && unmappedFailureCount === 0) { - void logCtx.info('The webhook was successfully queued for execution', { - action: message.webhookName, + const error = new NangoError('webhook_failure', { + error: 'The webhook could not be queued for execution', + taskName: message.taskName + }); + + await logCtx.error('The webhook failed to queue for execution', { + error, + webhook: message.webhookName, connection: message.connection.connection_id, integration: message.connection.provider_config_key }); - continue; + await logCtx.enrichOperation({ error }); + await logCtx.failed(); } + } - const error = new NangoError('webhook_failure', { - error: 'The webhook could not be queued for execution', - taskName: message.taskName + if (oversizedExecutions.length > 0) { + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_BYPASS_OVERSIZE, oversizedExecutions.length, { + provider: this.integration.provider, + accountId: this.team.id, + environmentId: this.environment.id }); - await logCtx.error('The webhook failed to queue for execution', { - error, - webhook: message.webhookName, - connection: message.connection.connection_id, - integration: message.connection.provider_config_key - }); - await logCtx.enrichOperation({ error }); - await logCtx.failed(); + await this.dispatchExecutionsViaOrchestrator(oversizedExecutions, body); } if (unmappedFailureCount > 0) { diff --git a/packages/server/lib/webhook/internal-nango.unit.test.ts b/packages/server/lib/webhook/internal-nango.unit.test.ts index 1ec4460dde..ca74ad4465 100644 --- a/packages/server/lib/webhook/internal-nango.unit.test.ts +++ b/packages/server/lib/webhook/internal-nango.unit.test.ts @@ -9,6 +9,8 @@ const mocks = vi.hoisted(() => { dispatchQueueClient: { dispatchQueuePublisher: null as any }, triggerWebhook: vi.fn(), report: vi.fn(), + triggerWebhookWithLogContext: vi.fn(), + metricsIncrement: vi.fn(), getConnectionsByEnvironmentAndConfig: vi.fn(), getSyncConfigsByConfigIdForWebhook: vi.fn() }; @@ -20,9 +22,26 @@ vi.mock('../utils/utils.js', () => ({ getOrchestrator: () => ({ triggerWebhook: vi.mock('@nangohq/utils', async (importOriginal) => { const actual = await importOriginal(); + if (!actual || typeof actual !== 'object' || !('metrics' in actual)) { + throw new Error('Invalid @nangohq/utils mock: missing metrics export'); + } + + const { metrics } = actual; + if (!metrics || typeof metrics !== 'object' || !('Types' in metrics) || !metrics.Types || typeof metrics.Types !== 'object') { + throw new Error('Invalid @nangohq/utils mock: missing metrics.Types export'); + } + return { ...(actual as object), - report: mocks.report + report: mocks.report, + metrics: { + ...metrics, + Types: { + ...metrics.Types, + WEBHOOK_DISPATCH_BYPASS_OVERSIZE: 'nango.webhook.dispatch_queue.bypass_oversize' + }, + increment: mocks.metricsIncrement + } }; }); vi.mock('@nangohq/shared', () => { @@ -105,6 +124,7 @@ describe('InternalNango queue dispatch', () => { ]); mocks.getSyncConfigsByConfigIdForWebhook.mockResolvedValue([{ id: 21, sync_name: 'sync-1', webhook_subscriptions: ['push'] }]); mocks.triggerWebhook.mockResolvedValue(undefined); + mocks.triggerWebhookWithLogContext.mockResolvedValue(undefined); }); it('logs queued successes and marks failed publishes as failed operations', async () => { @@ -141,13 +161,34 @@ describe('InternalNango queue dispatch', () => { const publisher = { publish: vi.fn() }; mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; - const { nango, logContextGetter } = makeInternalNango([]); + const logCtx1 = createLogCtx('log-1'); + const logCtx2 = createLogCtx('log-2'); + const { nango, logContextGetter } = makeInternalNango([logCtx1, logCtx2]); const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); expect(mocks.triggerWebhook).toHaveBeenCalledTimes(2); - expect(logContextGetter.create).not.toHaveBeenCalled(); + expect(logContextGetter.create).toHaveBeenCalledTimes(2); + expect(publisher.publish).not.toHaveBeenCalled(); + }); + + it('dispatches successful orchestrator executions even when one log context creation fails', async () => { + mocks.envs.WEBHOOK_INGRESS_USE_DISPATCH_QUEUE = false; + const publisher = { publish: vi.fn() }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx2 = createLogCtx('log-2'); + const logContextGetter = { + create: vi.fn().mockRejectedValueOnce(new Error('transient db failure')).mockResolvedValueOnce(logCtx2) + } as any; + const { nango } = makeInternalNango([], logContextGetter); + + const result = await nango.executeScriptForWebhooks({ body: { event: 'x' }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(mocks.triggerWebhook).toHaveBeenCalledTimes(1); + expect(mocks.triggerWebhook).toHaveBeenCalledWith(expect.objectContaining({ logCtx: logCtx2 })); expect(publisher.publish).not.toHaveBeenCalled(); }); @@ -209,9 +250,11 @@ describe('InternalNango queue dispatch', () => { expect(publisher.publish).toHaveBeenCalledWith( [ expect.objectContaining({ - activityLogId: 'log-2', - webhookName: 'push', - connection: expect.objectContaining({ connection_id: 'conn-2' }) + message: expect.objectContaining({ + activityLogId: 'log-2', + webhookName: 'push', + connection: expect.objectContaining({ connection_id: 'conn-2' }) + }) }) ], 'account:1:env:2' @@ -254,8 +297,10 @@ describe('InternalNango queue dispatch', () => { expect(publisher.publish).toHaveBeenCalledWith( [ expect.objectContaining({ - activityLogId: 'log-2', - connection: expect.objectContaining({ connection_id: 'conn-2' }) + message: expect.objectContaining({ + activityLogId: 'log-2', + connection: expect.objectContaining({ connection_id: 'conn-2' }) + }) }) ], 'account:1:env:2' @@ -297,4 +342,24 @@ describe('InternalNango queue dispatch', () => { environmentId: 2 }); }); + + it('dispatches oversized messages directly to the orchestrator and emits a metric', async () => { + const publisher = { publish: vi.fn() }; + mocks.dispatchQueueClient.dispatchQueuePublisher = publisher; + + const logCtx1 = createLogCtx('log-1'); + const logCtx2 = createLogCtx('log-2'); + const { nango } = makeInternalNango([logCtx1, logCtx2]); + + const result = await nango.executeScriptForWebhooks({ body: { payload: 'x'.repeat(1_100_000) }, webhookTypeValue: 'push' }); + + expect(result.connectionIds).toEqual(['conn-1', 'conn-2']); + expect(publisher.publish).not.toHaveBeenCalled(); + expect(mocks.triggerWebhook).toHaveBeenCalledTimes(2); + expect(mocks.metricsIncrement).toHaveBeenCalledWith('nango.webhook.dispatch_queue.bypass_oversize', 2, { + provider: 'github', + accountId: 1, + environmentId: 2 + }); + }); }); diff --git a/packages/shared/lib/clients/orchestrator.ts b/packages/shared/lib/clients/orchestrator.ts index 75d244dd8d..379f56d968 100644 --- a/packages/shared/lib/clients/orchestrator.ts +++ b/packages/shared/lib/clients/orchestrator.ts @@ -3,7 +3,6 @@ import ms from 'ms'; import { v4 as uuid } from 'uuid'; import db from '@nangohq/database'; -import { OtlpSpan } from '@nangohq/logs'; import { Err, Ok, errorToObject, getCheckpointKey, getFrequencyMs, stringifyError } from '@nangohq/utils'; import { hardDeleteCheckpoints } from '../index.js'; @@ -33,16 +32,7 @@ import type { VoidReturn } from '@nangohq/nango-orchestrator'; import type { RecordCount } from '@nangohq/records'; -import type { - AsyncActionResponse, - ConnectionInternal, - ConnectionJobs, - DBConnection, - DBConnectionDecrypted, - DBEnvironment, - DBSyncConfig, - DBTeam -} from '@nangohq/types'; +import type { AsyncActionResponse, ConnectionInternal, ConnectionJobs, DBConnection, DBConnectionDecrypted, DBSyncConfig } from '@nangohq/types'; import type { Result } from '@nangohq/utils'; import type { JsonValue } from 'type-fest'; @@ -234,25 +224,19 @@ export class Orchestrator { } async triggerWebhook({ - account, - environment, - integration, connection, webhookName, syncConfig, input, maxConcurrency, - logContextGetter + logCtx }: { - account: DBTeam; - environment: DBEnvironment; - integration: ProviderConfig; connection: ConnectionJobs; webhookName: string; syncConfig: DBSyncConfig; input: object; maxConcurrency: number; - logContextGetter: LogContextGetter; + logCtx: LogContext; }): Promise> { const activeSpan = tracer.scope().active(); const spanTags = { @@ -267,17 +251,6 @@ export class Orchestrator { tags: spanTags, ...(activeSpan ? { childOf: activeSpan } : {}) }); - const logCtx = await logContextGetter.create( - { operation: { type: 'webhook', action: 'incoming' }, expiresAt: new Date(Date.now() + 15 * 60 * 1000).toISOString() }, - { - account, - environment, - integration: { id: integration.id!, name: integration.unique_key, provider: integration.provider }, - connection: { id: connection.id, name: connection.connection_id }, - syncConfig: { id: syncConfig.id, name: syncConfig.sync_name } - } - ); - logCtx.attachSpan(new OtlpSpan(logCtx.operation)); try { let parsedInput = null; diff --git a/packages/utils/lib/telemetry/metrics.ts b/packages/utils/lib/telemetry/metrics.ts index 033d17a3c2..2a5a152b87 100644 --- a/packages/utils/lib/telemetry/metrics.ts +++ b/packages/utils/lib/telemetry/metrics.ts @@ -70,6 +70,7 @@ export enum Types { WEBHOOK_DISPATCH_PUBLISH_SUCCESS = 'nango.webhook.dispatch_queue.publish.success', WEBHOOK_DISPATCH_PUBLISH_FAILURE = 'nango.webhook.dispatch_queue.publish.failure', + WEBHOOK_DISPATCH_BYPASS_OVERSIZE = 'nango.webhook.dispatch_queue.bypass_oversize', WEBHOOK_DISPATCH_LARGE_FANOUT = 'nango.webhook.dispatch_queue.large_fanout', WEBHOOK_DISPATCH_CONSUME_SUCCESS = 'nango.webhook.dispatch_queue.consume.success', WEBHOOK_DISPATCH_CONSUME_FAILURE = 'nango.webhook.dispatch_queue.consume.failure', From 3f762bd268a7ef3da9a066ac548871db82d3d0ac Mon Sep 17 00:00:00 2001 From: agusayerza Date: Thu, 30 Apr 2026 10:20:17 -0300 Subject: [PATCH 6/7] feat(webhooks): discard stale messages from the dispatch queue (NAN-5416) Drops SQS messages older than a configurable threshold (default: 2 hours) instead of executing them, preventing stale webhook deliveries after queue backup or deployment gaps. Adds a log line for discarded messages. --- packages/jobs/lib/app.ts | 3 ++- .../jobs/lib/webhook/dispatch-queue/consumer.ts | 16 +++++++++++++++- .../webhook/dispatch-queue/consumer.unit.test.ts | 15 ++++++++++++++- .../lib/webhook/internal-nango.unit.test.ts | 2 -- packages/utils/lib/environment/parse.ts | 1 + packages/utils/lib/telemetry/metrics.ts | 1 + 6 files changed, 33 insertions(+), 5 deletions(-) diff --git a/packages/jobs/lib/app.ts b/packages/jobs/lib/app.ts index 1066f0d6d7..ecf962d67f 100644 --- a/packages/jobs/lib/app.ts +++ b/packages/jobs/lib/app.ts @@ -48,7 +48,8 @@ try { consumerConcurrency: envs.NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY, maxMessages: envs.NANGO_TASK_DISPATCH_MAX_MESSAGES, waitTimeSeconds: envs.NANGO_TASK_DISPATCH_WAIT_TIME_SECONDS, - visibilityTimeoutSeconds: envs.NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS + visibilityTimeoutSeconds: envs.NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS, + maxAgeMs: envs.NANGO_TASK_DISPATCH_MAX_AGE_SECONDS * 1000 }) : undefined; diff --git a/packages/jobs/lib/webhook/dispatch-queue/consumer.ts b/packages/jobs/lib/webhook/dispatch-queue/consumer.ts index ab0b6f2a48..93036ef4c2 100644 --- a/packages/jobs/lib/webhook/dispatch-queue/consumer.ts +++ b/packages/jobs/lib/webhook/dispatch-queue/consumer.ts @@ -2,6 +2,7 @@ import { DeleteMessageCommand, ReceiveMessageCommand, SQSClient } from '@aws-sdk import tracer from 'dd-trace'; import * as z from 'zod'; +import { logContextGetter } from '@nangohq/logs'; import { isDuplicateTaskNameClientError, jsonSchema } from '@nangohq/nango-orchestrator'; import { Err, Ok, getLogger, metrics, report } from '@nangohq/utils'; @@ -42,6 +43,7 @@ export interface DispatchQueueConsumerProps { maxMessages: number; waitTimeSeconds: number; visibilityTimeoutSeconds: number; + maxAgeMs: number; sqs?: SQSClient; } @@ -54,6 +56,7 @@ export class DispatchQueueConsumer { private readonly maxMessages: number; private readonly waitTimeSeconds: number; private readonly visibilityTimeoutSeconds: number; + private readonly maxAgeMs: number; private readonly abortController = new AbortController(); private loopPromises: Promise[] = []; @@ -65,6 +68,7 @@ export class DispatchQueueConsumer { this.maxMessages = props.maxMessages; this.waitTimeSeconds = props.waitTimeSeconds; this.visibilityTimeoutSeconds = props.visibilityTimeoutSeconds; + this.maxAgeMs = props.maxAgeMs; this.sqs = props.sqs ?? new SQSClient(envs.AWS_REGION ? { region: envs.AWS_REGION } : {}); } @@ -145,7 +149,17 @@ export class DispatchQueueConsumer { const sentTimestampMs = Number(msg.Attributes?.['SentTimestamp'] ?? '0'); if (sentTimestampMs > 0) { - metrics.duration(metrics.Types.WEBHOOK_DISPATCH_DWELL_MS, Date.now() - sentTimestampMs, { provider: message.provider }); + const dwellMs = Date.now() - sentTimestampMs; + metrics.duration(metrics.Types.WEBHOOK_DISPATCH_DWELL_MS, dwellMs, { provider: message.provider }); + + if (this.maxAgeMs > 0 && dwellMs > this.maxAgeMs) { + span.setTag('stale', true); + metrics.increment(metrics.Types.WEBHOOK_DISPATCH_STALE, 1, { accountId: message.accountId }); + const logCtx = logContextGetter.get({ id: message.activityLogId, accountId: message.accountId }); + await logCtx.warn('Webhook was discarded: it spent too long in the queue and was not processed.', { dwell_ms: dwellMs }); + await this.tryDeleteMessage(msg.ReceiptHandle); + return; + } } const scheduleRes = await this.orchestratorClient.executeWebhook({ diff --git a/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts b/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts index a168efe03b..001251179d 100644 --- a/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts +++ b/packages/jobs/lib/webhook/dispatch-queue/consumer.unit.test.ts @@ -61,6 +61,7 @@ function makeHarness( messages?: WebhookDispatchMessage[]; badBody?: string; consumerConcurrency?: number; + maxAgeMs?: number; sqsSend?: ReturnType; } = {} ): Harness { @@ -102,7 +103,8 @@ function makeHarness( consumerConcurrency: opts.consumerConcurrency ?? 1, maxMessages: 10, waitTimeSeconds: 0, - visibilityTimeoutSeconds: 30 + visibilityTimeoutSeconds: 30, + maxAgeMs: opts.maxAgeMs ?? 0 }); return { consumer, sqsSend, sqsDestroy, orchestratorExecuteWebhook }; @@ -261,6 +263,17 @@ describe('DispatchQueueConsumer', () => { expect(deleteCalls[0]?.[1]).toBeUndefined(); }); + it('deletes a stale message without calling orchestrator', async () => { + const h = makeHarness({ messages: [buildMessage()], maxAgeMs: 100 }); + // SentTimestamp in makeHarness is Date.now() - 500, which exceeds maxAgeMs of 100ms + await runOnce(h, () => { + expect(getDeleteCalls(h)).toHaveLength(1); + }); + + expect(h.orchestratorExecuteWebhook).not.toHaveBeenCalled(); + expect(getDeleteCalls(h)).toHaveLength(1); + }); + it('starts one poll loop per configured consumerConcurrency', async () => { let receiveCalls = 0; const firstReceives = [deferred(), deferred()]; diff --git a/packages/server/lib/webhook/internal-nango.unit.test.ts b/packages/server/lib/webhook/internal-nango.unit.test.ts index ca74ad4465..d258fe8813 100644 --- a/packages/server/lib/webhook/internal-nango.unit.test.ts +++ b/packages/server/lib/webhook/internal-nango.unit.test.ts @@ -9,7 +9,6 @@ const mocks = vi.hoisted(() => { dispatchQueueClient: { dispatchQueuePublisher: null as any }, triggerWebhook: vi.fn(), report: vi.fn(), - triggerWebhookWithLogContext: vi.fn(), metricsIncrement: vi.fn(), getConnectionsByEnvironmentAndConfig: vi.fn(), getSyncConfigsByConfigIdForWebhook: vi.fn() @@ -124,7 +123,6 @@ describe('InternalNango queue dispatch', () => { ]); mocks.getSyncConfigsByConfigIdForWebhook.mockResolvedValue([{ id: 21, sync_name: 'sync-1', webhook_subscriptions: ['push'] }]); mocks.triggerWebhook.mockResolvedValue(undefined); - mocks.triggerWebhookWithLogContext.mockResolvedValue(undefined); }); it('logs queued successes and marks failed publishes as failed operations', async () => { diff --git a/packages/utils/lib/environment/parse.ts b/packages/utils/lib/environment/parse.ts index 413a5d853f..63eabb6812 100644 --- a/packages/utils/lib/environment/parse.ts +++ b/packages/utils/lib/environment/parse.ts @@ -550,6 +550,7 @@ export const ENVS = z.object({ NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY: z.coerce.number().min(1).optional().default(50), NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: z.coerce.number().min(1).max(10).optional().default(10), NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: z.coerce.number().min(1).optional().default(10), + NANGO_TASK_DISPATCH_MAX_AGE_SECONDS: z.coerce.number().min(0).optional().default(7200), // E2B sandboxes E2B_API_KEY: z.string().optional(), diff --git a/packages/utils/lib/telemetry/metrics.ts b/packages/utils/lib/telemetry/metrics.ts index 2a5a152b87..2416b1e7dd 100644 --- a/packages/utils/lib/telemetry/metrics.ts +++ b/packages/utils/lib/telemetry/metrics.ts @@ -75,6 +75,7 @@ export enum Types { WEBHOOK_DISPATCH_CONSUME_SUCCESS = 'nango.webhook.dispatch_queue.consume.success', WEBHOOK_DISPATCH_CONSUME_FAILURE = 'nango.webhook.dispatch_queue.consume.failure', WEBHOOK_DISPATCH_POISON_PILL = 'nango.webhook.dispatch_queue.poison_pill', + WEBHOOK_DISPATCH_STALE = 'nango.webhook.dispatch_queue.stale', WEBHOOK_DISPATCH_DWELL_MS = 'nango.webhook.dispatch_queue.dwell_ms', ORCH_TASKS_CREATED = 'nango.orch.tasks.created', From 41cad2689204612df46f1e9ec77f3d62859c674a Mon Sep 17 00:00:00 2001 From: agusayerza Date: Thu, 30 Apr 2026 10:41:53 -0300 Subject: [PATCH 7/7] [DO-NOT-MERGE] force-enable dispatch queue and set consumer concurrency to 1 for dev testing --- packages/utils/lib/environment/parse.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/utils/lib/environment/parse.ts b/packages/utils/lib/environment/parse.ts index 63eabb6812..01d495d81c 100644 --- a/packages/utils/lib/environment/parse.ts +++ b/packages/utils/lib/environment/parse.ts @@ -537,7 +537,7 @@ export const ENVS = z.object({ NANGO_WEBHOOK_CIRCUIT_BREAKER_AUTO_RESET_SECS: z.coerce.number().optional().default(3600), // WEBHOOK INGRESS - WEBHOOK_INGRESS_USE_DISPATCH_QUEUE: z.stringbool().optional().default(false), + WEBHOOK_INGRESS_USE_DISPATCH_QUEUE: z.stringbool().optional().default(true), NANGO_WEBHOOK_INGRESS_RATE_LIMIT_PER_MIN: z.coerce.number().min(0).optional().default(4000), NANGO_WEBHOOK_INGRESS_RATE_LIMIT_ENFORCE: z.stringbool().optional().default(false), @@ -547,7 +547,7 @@ export const ENVS = z.object({ NANGO_TASK_DISPATCH_MAX_MESSAGES: z.coerce.number().min(1).max(10).optional().default(10), NANGO_TASK_DISPATCH_WAIT_TIME_SECONDS: z.coerce.number().min(0).max(20).optional().default(20), NANGO_TASK_DISPATCH_VISIBILITY_TIMEOUT_SECONDS: z.coerce.number().min(0).max(43200).optional().default(30), - NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY: z.coerce.number().min(1).optional().default(50), + NANGO_TASK_DISPATCH_CONSUMER_CONCURRENCY: z.coerce.number().min(1).optional().default(1), NANGO_TASK_DISPATCH_PUBLISH_BATCH_SIZE: z.coerce.number().min(1).max(10).optional().default(10), NANGO_TASK_DISPATCH_PUBLISH_CONCURRENCY: z.coerce.number().min(1).optional().default(10), NANGO_TASK_DISPATCH_MAX_AGE_SECONDS: z.coerce.number().min(0).optional().default(7200),