Skip to content

Agus/test webhook queue#6003

Draft
agusayerza wants to merge 7 commits into
masterfrom
agus/test-webhook-queue
Draft

Agus/test webhook queue#6003
agusayerza wants to merge 7 commits into
masterfrom
agus/test-webhook-queue

Conversation

@agusayerza
Copy link
Copy Markdown
Contributor

No description provided.

Introduces a DispatchQueuePublisher abstraction for publishing webhook execution messages to SQS. Uses activity log ID as the message identifier. Adds tracing and a shared webhook concurrency helper.
Adds a DispatchQueueConsumer in the jobs service that pulls webhook execution messages from SQS and dispatches them via orchestratorClient.executeWebhook(). Handles duplicate task name errors, aligns on activity log IDs, and removes redundant metrics.
…NAN-5340)

Routes webhook execution through the SQS dispatch queue instead of calling the orchestrator directly. Isolates queue log context failures so they don't block execution, fixes metric dimensions, and reuses the shared webhook concurrency helper.
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 issues found across 27 files

Confidence score: 2/5

  • There is a high-confidence functional risk in packages/server/lib/webhook/runWithConcurrencyLimit.ts: when concurrency <= 0, workers never start and results can be returned unprocessed, which can directly break webhook handling behavior.
  • packages/jobs/lib/webhook/dispatch-queue/consumer.ts also has a user-impacting reliability gap: if msg.Body is missing but ReceiptHandle exists, the message may be silently retried until DLQ without logs or metrics, making incidents hard to detect and triage.
  • Given one severe, concrete execution bug plus another observable processing/operability issue, this is higher merge risk until guarded and instrumented.
  • Pay close attention to packages/server/lib/webhook/runWithConcurrencyLimit.ts, packages/jobs/lib/webhook/dispatch-queue/consumer.ts - concurrency validation and silent message-drop/redelivery behavior need hardening.
Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="packages/jobs/lib/webhook/dispatch-queue/consumer.ts">

<violation number="1" location="packages/jobs/lib/webhook/dispatch-queue/consumer.ts:130">
P2: This early return silently drops the message without logging, metrics, or deletion. If `msg.Body` is undefined but `ReceiptHandle` exists, the message will be redelivered until the DLQ with no visibility. Consider treating this like the parse failure path below — log it, increment the poison pill metric, and delete the message.

(Based on your team's feedback about adding observability for failure paths and dropped work.) [FEEDBACK_USED]</violation>
</file>

<file name="packages/server/lib/webhook/runWithConcurrencyLimit.ts">

<violation number="1" location="packages/server/lib/webhook/runWithConcurrencyLimit.ts:3">
P1: Guard invalid concurrency values. With `concurrency <= 0`, no workers run and the function returns unprocessed results.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review, or fix all with cubic.

@@ -0,0 +1,20 @@
export async function runWithConcurrencyLimit<T, R>(items: T[], concurrency: number, worker: (item: T, index: number) => Promise<R>): Promise<R[]> {
const results: R[] = new Array(items.length);
const workerCount = Math.min(concurrency, items.length);
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai Bot Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Guard invalid concurrency values. With concurrency <= 0, no workers run and the function returns unprocessed results.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/server/lib/webhook/runWithConcurrencyLimit.ts, line 3:

<comment>Guard invalid concurrency values. With `concurrency <= 0`, no workers run and the function returns unprocessed results.</comment>

<file context>
@@ -0,0 +1,20 @@
+export async function runWithConcurrencyLimit<T, R>(items: T[], concurrency: number, worker: (item: T, index: number) => Promise<R>): Promise<R[]> {
+    const results: R[] = new Array(items.length);
+    const workerCount = Math.min(concurrency, items.length);
+    let nextIndex = 0;
+
</file context>
Fix with Cubic


return await tracer.scope().activate(span, async () => {
try {
if (msg.Body === undefined || !msg.ReceiptHandle) {
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai Bot Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: This early return silently drops the message without logging, metrics, or deletion. If msg.Body is undefined but ReceiptHandle exists, the message will be redelivered until the DLQ with no visibility. Consider treating this like the parse failure path below — log it, increment the poison pill metric, and delete the message.

(Based on your team's feedback about adding observability for failure paths and dropped work.)

View Feedback

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/jobs/lib/webhook/dispatch-queue/consumer.ts, line 130:

<comment>This early return silently drops the message without logging, metrics, or deletion. If `msg.Body` is undefined but `ReceiptHandle` exists, the message will be redelivered until the DLQ with no visibility. Consider treating this like the parse failure path below — log it, increment the poison pill metric, and delete the message.

(Based on your team's feedback about adding observability for failure paths and dropped work.) </comment>

<file context>
@@ -0,0 +1,242 @@
+
+        return await tracer.scope().activate(span, async () => {
+            try {
+                if (msg.Body === undefined || !msg.ReceiptHandle) {
+                    return;
+                }
</file context>
Fix with Cubic

Falls back to direct orchestrator dispatch when a webhook payload exceeds SQS's 1MB limit. Extracts dispatchExecutionsViaOrchestrator from InternalNango into a dedicated method to support both code paths cleanly.
…416)

Drops SQS messages older than a configurable threshold (default: 2 hours) instead of executing them, preventing stale webhook deliveries after queue backup or deployment gaps. Adds a log line for discarded messages.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant