Agus/test webhook queue#6003
Conversation
Introduces a DispatchQueuePublisher abstraction for publishing webhook execution messages to SQS. Uses activity log ID as the message identifier. Adds tracing and a shared webhook concurrency helper.
Adds a DispatchQueueConsumer in the jobs service that pulls webhook execution messages from SQS and dispatches them via orchestratorClient.executeWebhook(). Handles duplicate task name errors, aligns on activity log IDs, and removes redundant metrics.
…NAN-5340) Routes webhook execution through the SQS dispatch queue instead of calling the orchestrator directly. Isolates queue log context failures so they don't block execution, fixes metric dimensions, and reuses the shared webhook concurrency helper.
There was a problem hiding this comment.
2 issues found across 27 files
Confidence score: 2/5
- There is a high-confidence functional risk in
packages/server/lib/webhook/runWithConcurrencyLimit.ts: whenconcurrency <= 0, workers never start and results can be returned unprocessed, which can directly break webhook handling behavior. packages/jobs/lib/webhook/dispatch-queue/consumer.tsalso has a user-impacting reliability gap: ifmsg.Bodyis missing butReceiptHandleexists, the message may be silently retried until DLQ without logs or metrics, making incidents hard to detect and triage.- Given one severe, concrete execution bug plus another observable processing/operability issue, this is higher merge risk until guarded and instrumented.
- Pay close attention to
packages/server/lib/webhook/runWithConcurrencyLimit.ts,packages/jobs/lib/webhook/dispatch-queue/consumer.ts- concurrency validation and silent message-drop/redelivery behavior need hardening.
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="packages/jobs/lib/webhook/dispatch-queue/consumer.ts">
<violation number="1" location="packages/jobs/lib/webhook/dispatch-queue/consumer.ts:130">
P2: This early return silently drops the message without logging, metrics, or deletion. If `msg.Body` is undefined but `ReceiptHandle` exists, the message will be redelivered until the DLQ with no visibility. Consider treating this like the parse failure path below — log it, increment the poison pill metric, and delete the message.
(Based on your team's feedback about adding observability for failure paths and dropped work.) [FEEDBACK_USED]</violation>
</file>
<file name="packages/server/lib/webhook/runWithConcurrencyLimit.ts">
<violation number="1" location="packages/server/lib/webhook/runWithConcurrencyLimit.ts:3">
P1: Guard invalid concurrency values. With `concurrency <= 0`, no workers run and the function returns unprocessed results.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review, or fix all with cubic.
| @@ -0,0 +1,20 @@ | |||
| export async function runWithConcurrencyLimit<T, R>(items: T[], concurrency: number, worker: (item: T, index: number) => Promise<R>): Promise<R[]> { | |||
| const results: R[] = new Array(items.length); | |||
| const workerCount = Math.min(concurrency, items.length); | |||
There was a problem hiding this comment.
P1: Guard invalid concurrency values. With concurrency <= 0, no workers run and the function returns unprocessed results.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/server/lib/webhook/runWithConcurrencyLimit.ts, line 3:
<comment>Guard invalid concurrency values. With `concurrency <= 0`, no workers run and the function returns unprocessed results.</comment>
<file context>
@@ -0,0 +1,20 @@
+export async function runWithConcurrencyLimit<T, R>(items: T[], concurrency: number, worker: (item: T, index: number) => Promise<R>): Promise<R[]> {
+ const results: R[] = new Array(items.length);
+ const workerCount = Math.min(concurrency, items.length);
+ let nextIndex = 0;
+
</file context>
|
|
||
| return await tracer.scope().activate(span, async () => { | ||
| try { | ||
| if (msg.Body === undefined || !msg.ReceiptHandle) { |
There was a problem hiding this comment.
P2: This early return silently drops the message without logging, metrics, or deletion. If msg.Body is undefined but ReceiptHandle exists, the message will be redelivered until the DLQ with no visibility. Consider treating this like the parse failure path below — log it, increment the poison pill metric, and delete the message.
(Based on your team's feedback about adding observability for failure paths and dropped work.)
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/jobs/lib/webhook/dispatch-queue/consumer.ts, line 130:
<comment>This early return silently drops the message without logging, metrics, or deletion. If `msg.Body` is undefined but `ReceiptHandle` exists, the message will be redelivered until the DLQ with no visibility. Consider treating this like the parse failure path below — log it, increment the poison pill metric, and delete the message.
(Based on your team's feedback about adding observability for failure paths and dropped work.) </comment>
<file context>
@@ -0,0 +1,242 @@
+
+ return await tracer.scope().activate(span, async () => {
+ try {
+ if (msg.Body === undefined || !msg.ReceiptHandle) {
+ return;
+ }
</file context>
Falls back to direct orchestrator dispatch when a webhook payload exceeds SQS's 1MB limit. Extracts dispatchExecutionsViaOrchestrator from InternalNango into a dedicated method to support both code paths cleanly.
…416) Drops SQS messages older than a configurable threshold (default: 2 hours) instead of executing them, preventing stale webhook deliveries after queue backup or deployment gaps. Adds a log line for discarded messages.
…cy to 1 for dev testing
90e243a to
41cad26
Compare
No description provided.