
feat(webhook): Vercel receiver + MongoDB queue (changeStream consumer)#205

Open
snomiao wants to merge 5 commits into main from sno-bot

Conversation


snomiao commented Apr 28, 2026

Summary

Move the Slack webhook ingest off the local VM bot and onto a serverless Vercel Function backed by a MongoDB queue. The bot tails the queue via changeStream, so Slack retries no longer get dropped while the bot is restarting/down.

What's new

bot/webhook-queue.ts — MongoDB-backed queue

  • enqueueWebhook() inserts a doc into webhook_queue (Atlas, replica set ⇒ changeStream supported)
  • startWebhookConsumer() drains any unprocessed backlog at boot, then tails the changeStream filtered by source
  • markWebhookProcessed() flips processed=true (and stashes truncated error stack on failure) so resume after restart doesn't reprocess
  • TTL index (expireAfterSeconds: 86400) on createdAt gives a 24h time-cap. Capped collections cap on bytes and don't allow per-doc updates, so we use a regular collection + TTL index instead.
  • Edge dedup collection (webhook_edge_dedup) with 1h TTL — used by the Vercel route to suppress retry storms before they reach the queue.
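For reference, a minimal sketch of the write-side pieces described above. The doc shape and helper signatures are inferred from this summary rather than copied from bot/webhook-queue.ts, so treat field names as illustrative:

import { MongoClient, type ObjectId } from "mongodb";

interface WebhookQueueDoc {
  _id?: ObjectId;
  source: string;        // consumers filter the changeStream on this
  eventId?: string;
  payload: unknown;
  processed: boolean;
  error?: string;        // truncated stack, stashed on consume failure
  createdAt: Date;       // TTL index expires docs 24h after this
}

const client = new MongoClient(process.env.MONGODB_URI!);
const col = client.db().collection<WebhookQueueDoc>("webhook_queue");

// Regular collection + TTL index gives a 24h time-cap, unlike a capped
// collection (byte-capped, no per-doc updates).
await col.createIndex({ createdAt: 1 }, { expireAfterSeconds: 86400 });

export async function enqueueWebhook(
  doc: Omit<WebhookQueueDoc, "processed" | "createdAt">,
) {
  await col.insertOne({ ...doc, processed: false, createdAt: new Date() });
}

export async function markWebhookProcessed(id: ObjectId, err?: Error) {
  await col.updateOne(
    { _id: id },
    {
      $set: {
        processed: true,
        ...(err ? { error: (err.stack ?? String(err)).slice(0, 2000) } : {}),
      },
    },
  );
}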

bot/slack-bot.ts — wire bot consumer

  • /slack/events HTTP handler now just verifies signature, dedups, and enqueueWebhook() — no inline dispatch.
  • startSlackBot() calls startWebhookConsumer({ sources: ["slack"] }) so the existing handleSlackEvent runs against queue docs instead of inbound HTTP.

app/api/webhook/slack/route.ts — Vercel receiver

  • Slack HMAC-SHA256 sig verification (5-min replay window)
  • URL verification challenge passthrough
  • event_callback → edge dedup → enqueueWebhook({ source: "slack" }) → 200
  • runtime: "nodejs" so the mongodb driver works (Edge runtime can't open Atlas connections)
  • Deployed at https://comfy-pr.vercel.app/api/webhook/slack
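The verify and handshake steps look roughly like this. The Slack signing scheme (v0 base string, x-slack-signature header, 5-minute window) is Slack's documented contract; the surrounding route shape is a sketch, with the dedup/enqueue calls elided:

import crypto from "node:crypto";
import { NextResponse } from "next/server";

export const runtime = "nodejs"; // Edge runtime can't open Atlas connections

function verifySlackSignature(rawBody: string, ts: string, sig: string) {
  // Reject replays outside the 5-minute window
  if (Math.abs(Date.now() / 1000 - Number(ts)) > 300) return false;
  const expected =
    "v0=" +
    crypto
      .createHmac("sha256", process.env.SLACK_SIGNING_SECRET!)
      .update(`v0:${ts}:${rawBody}`)
      .digest("hex");
  // timingSafeEqual throws on length mismatch, so guard first
  return (
    expected.length === sig.length &&
    crypto.timingSafeEqual(Buffer.from(expected), Buffer.from(sig))
  );
}

export async function POST(req: Request) {
  const rawBody = await req.text();
  const ts = req.headers.get("x-slack-request-timestamp") ?? "";
  const sig = req.headers.get("x-slack-signature") ?? "";
  if (!verifySlackSignature(rawBody, ts, sig)) {
    return new NextResponse("bad signature", { status: 401 });
  }
  const payload = JSON.parse(rawBody);
  if (payload.type === "url_verification") {
    // Slack's one-time URL verification handshake
    return NextResponse.json({ challenge: payload.challenge });
  }
  // event_callback → edge dedup → enqueueWebhook({ source: "slack" }) → 200
  return new NextResponse("", { status: 200 });
}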

docs/cloudrun-vs-gke-vs-hybrid.md

Background analysis comparing GKE vs Cloud Run vs Hybrid deployment options that led to this lighter-weight queue design.

Cutover

Slack app A078498JA5T Event Subscriptions Request URL has been moved:

prbot.stukivx.xyz/slack/events  →  comfy-pr.vercel.app/api/webhook/slack

Slack verified the new URL successfully. The local /slack/events endpoint remains as a fallback during the cutover; safe to remove after a few days of clean Vercel traffic.

Test plan

  • bunx tsc --noEmit
  • bun test bot/taskInputFlow.spec.ts (4/4 pass)
  • Local: signed event_callback → bot HTTP /slack/events → webhook_queue insert → consumer dispatch → processed=true (~70ms)
  • Vercel: signed URL verification challenge → 200 {challenge:...} after SLACK_SIGNING_SECRET env var set (a signing sketch follows this list)
  • Live Slack DM "vercel webhook routing test" → comfy-pr.vercel.app → webhook_queue (source=slack) → VM consumer → processed=true within ~26s
  • Slack app config: Request URL updated to Vercel, Verified ✓
  • Run for a few days and confirm no dropped events / no auth issues / retry-storm dedup behaves
  • Then remove the local /slack/events HTTP handler and stop redirecting prbot.stukivx.xyz to the VM
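For the signed-challenge step above, a throwaway script like this reproduces Slack's signing locally (the script itself is hypothetical, not part of the PR):

import crypto from "node:crypto";

const url = "https://comfy-pr.vercel.app/api/webhook/slack";
const body = JSON.stringify({ type: "url_verification", challenge: "test-123" });
const ts = Math.floor(Date.now() / 1000).toString();
const sig =
  "v0=" +
  crypto
    .createHmac("sha256", process.env.SLACK_SIGNING_SECRET!)
    .update(`v0:${ts}:${body}`)
    .digest("hex");

const res = await fetch(url, {
  method: "POST",
  headers: {
    "content-type": "application/json",
    "x-slack-request-timestamp": ts,
    "x-slack-signature": sig,
  },
  body,
});
console.log(res.status, await res.text()); // expect 200 {"challenge":"test-123"}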

🤖 Generated with Claude Code

snomiao and others added 2 commits April 28, 2026 22:21
The Slack webhook handler used to dispatch events synchronously inside
the http handler. A bot crash, restart, or even a slow tick on the local
VM would silently drop the event — we return 200 to Slack's retry, but
our state records nothing.

Replace the inline dispatch with a queue:

- bot/webhook-queue.ts: thin abstraction over a `webhook_queue`
  collection in MongoDB. Inserts are durable; a TTL index on createdAt
  expires docs at 24h (capped feature can't TTL by time, hence index +
  regular collection). Tail with a changeStream filtered by source so
  multiple bot processes can split work later by source.
- /slack/events handler now: verify sig → dedup (existing 1h
  webhook-event-* TTL state) → enqueue → 200. The actual handleSlackEvent
  call moves into the consumer.
- startSlackBot wires startWebhookConsumer({ sources: ["slack"], ... })
  with drainBacklog=true so any docs still marked unprocessed from a
  previous crash get replayed in createdAt order on boot.
- markWebhookProcessed flips `processed=true` (and stashes the truncated
  error stack on failure) so changeStream resume after restart doesn't
  reprocess what already finished.

Atlas (mongodb+srv://) provides replica-set automatically so changeStream
just works; if we ever switch to a stand-alone instance we'd need a
polling fallback.
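If that day comes, the polling fallback would be straightforward. A rough sketch, assuming mongodb driver v6 semantics (findOneAndUpdate returns the matched doc or null); the interval, claim field, and names are all hypothetical:

import { type Collection, type ObjectId } from "mongodb";

interface QueueDoc {
  _id: ObjectId;
  source: string;
  processed: boolean;
  error?: string;
  createdAt: Date;
  claimedAt?: Date;
}

async function pollWebhookQueue(
  col: Collection<QueueDoc>,
  consume: (doc: QueueDoc) => Promise<void>,
  intervalMs = 1000,
) {
  for (;;) {
    // Atomically claim the oldest unprocessed doc so two pollers can't
    // double-consume; mark processed (or record the error) afterwards.
    const doc = await col.findOneAndUpdate(
      { processed: false, claimedAt: { $exists: false } },
      { $set: { claimedAt: new Date() } },
      { sort: { createdAt: 1 } },
    );
    if (!doc) {
      await new Promise((r) => setTimeout(r, intervalMs));
      continue;
    }
    try {
      await consume(doc);
      await col.updateOne({ _id: doc._id }, { $set: { processed: true } });
    } catch (err) {
      await col.updateOne(
        { _id: doc._id },
        {
          $set: {
            processed: true,
            error: String((err as Error).stack ?? err).slice(0, 2000),
          },
        },
      );
    }
  }
}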

Verified end-to-end: signed test webhook → insert in webhook_queue →
consumer fires within ~70ms → processed=true.

Also adds docs/cloudrun-vs-gke-vs-hybrid.md (the deployment options
analysis that prompted this lighter-weight design choice).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Move the Slack webhook ingest off the local VM bot and onto the
existing comfy-pr Vercel project so the receiver stays up while the
bot is restarting/down. The bot tails MongoDB's webhook_queue via
changeStream, so the failure mode "Slack retries 3× while bot is
restarting → 200 + drop" goes away.

- app/api/webhook/slack/route.ts: HMAC-SHA256 signature verification
  (5-min replay window), URL verification challenge passthrough, and
  enqueueWebhook() into source="slack". `nodejs` runtime so the
  mongodb driver works.
- Edge-level dedup via `webhook_edge_dedup` collection: insert
  `_id="slack:${eventId}"` with 1h TTL so retry storms don't pile up
  duplicate queue docs. The actual queue stays content-deduped by the
  bot consumer.
- bot/webhook-queue.ts: add the matching TTL index on
  webhook_edge_dedup so the dedup keys auto-expire.
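The index itself is one line against the driver; a sketch of the setup described above:

import { MongoClient } from "mongodb";

const db = new MongoClient(process.env.MONGODB_URI!).db();
// Dedup keys auto-expire 1h after insert; _id carries `slack:${eventId}`.
await db
  .collection<{ _id: string; createdAt: Date }>("webhook_edge_dedup")
  .createIndex({ createdAt: 1 }, { expireAfterSeconds: 3600 });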

Slack app A078498JA5T Event Subscriptions Request URL has been moved
from prbot.stukivx.xyz/slack/events (VM caddy reverse-proxy) to
https://comfy-pr.vercel.app/api/webhook/slack. Verified end-to-end
with a real DM: Slack → Vercel Function (200) → Mongo insert →
changeStream → VM consumer → processed=true within ~26s.

The local bot's /slack/events endpoint stays around as a fallback
during the cutover; we can remove it once a few days of Vercel
traffic passes without issues.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 28, 2026 22:22

vercel Bot commented Apr 28, 2026

The latest updates on your projects.

Project: comfy-pr | Deployment: Ready | Actions: Preview, Comment | Updated (UTC): Apr 30, 2026 3:34pm


Copilot AI left a comment


Pull request overview

Moves Slack webhook ingestion from the VM bot HTTP handler to a serverless Vercel endpoint backed by a MongoDB queue, with the VM bot consuming queued webhooks via MongoDB change streams to survive restarts and reduce dropped events.

Changes:

  • Added a MongoDB-backed webhook queue with TTL + changeStream consumer.
  • Updated the VM bot’s /slack/events handler to enqueue Slack events instead of dispatching inline, and started a queue consumer at bot startup.
  • Added a Vercel Next.js route to verify Slack signatures, dedup at the edge, and enqueue events; plus deployment/architecture documentation.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.

  • docs/cloudrun-vs-gke-vs-hybrid.md: Adds deployment option analysis and rationale for the queue-based approach.
  • bot/webhook-queue.ts: Introduces MongoDB webhook_queue + TTL indexes + changeStream consumer/drain logic.
  • bot/slack-bot.ts: Changes Slack Events ingestion to enqueue into Mongo and starts a queue consumer to dispatch events.
  • app/api/webhook/slack/route.ts: Adds Vercel Slack receiver with signature verification, edge dedup, enqueue, and a health/status GET.


Comment on lines +117 to +137
export async function GET() {
  try {
    await db.admin().ping();
    const col = db.collection("webhook_queue");
    const total = await col.countDocuments({ source: "slack" });
    const pending = await col.countDocuments({ source: "slack", processed: false });
    return NextResponse.json({
      status: "ok",
      collection: "webhook_queue",
      slack: { total, pending },
    });
  } catch (error) {
    return NextResponse.json(
      {
        status: "error",
        message: error instanceof Error ? error.message : String(error),
      },
      { status: 500 },
    );
  }
}

Copilot AI Apr 28, 2026


GET /api/webhook/slack exposes database connectivity and queue counts without any authentication. Since this route is publicly reachable on Vercel, it can leak operational details (and can be used for low-effort DB probing). Consider restricting it (e.g., require an admin token header, limit to Vercel cron/health checks via a secret, or remove the endpoint in production).
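One possible shape for the suggested restriction (the X-Admin-Token header and WEBHOOK_STATUS_TOKEN env var are hypothetical names, not part of this PR):

export async function GET(req: Request) {
  const token = req.headers.get("x-admin-token");
  if (!process.env.WEBHOOK_STATUS_TOKEN || token !== process.env.WEBHOOK_STATUS_TOKEN) {
    // 404 rather than 401 so a probe doesn't learn the endpoint exists
    return new Response("not found", { status: 404 });
  }
  // ...existing ping + countDocuments logic...
  return Response.json({ status: "ok" });
}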

Comment thread bot/slack-bot.ts
Comment on lines 295 to 323
  const alreadySeen = await SlackBotState.get(`webhook-event-${eventId}`);
  if (alreadySeen) {
    logger.info(
      `Ignoring duplicate webhook event ${eventId} (retry=${retryNum ?? "0"}, reason=${retryReason ?? "-"}, firstSeenAt=${new Date(alreadySeen.receivedAt).toISOString()})`,
    );
    return new Response("", { status: 200 });
  }

-  // TTL 1h — Slack retries up to ~30min, so 1h covers worst case
+  // TTL 1h — Slack retries up to ~30min, so 1h covers worst case.
+  // This is independent of the queue's 24h TTL: the dedup key only
+  // lives on the http-edge to suppress retry storms; queue docs
+  // are authoritative for replay.
  await SlackBotState.set(
    `webhook-event-${eventId}`,
    { receivedAt: Date.now(), retryNum, retryReason },
    60 * 60 * 1000,
  );

  // Slack Events API puts the workspace id on the *envelope*
  // (`payload.team_id`), not on the inner event in some shapes.
  // Forward it onto the event so downstream zod schemas that
  // require `team` (zAppMentionEvent, zSlackMessage filter) don't
  // silently reject webhook-delivered mentions/DMs.
  const event = {
    ...payload.event,
    team: payload.event?.team || payload.team_id,
  };
-  handleSlackEvent(event).catch((err) =>
-    logger.error("Webhook event handler error", { err, eventId }),
-  );
+  await enqueueWebhook({
+    source: "slack",
+    eventId,
+    payload,
+    meta: {
+      retryNum: retryNum ?? null,
+      retryReason: retryReason ?? null,
+      receivedAt: Date.now(),
+    },
+  }).catch((err) => logger.error("Webhook enqueue failed", { err, eventId }));
}

Copilot AI Apr 28, 2026


Slack webhook dedup key (webhook-event-${eventId}) is written before the queue insert, and enqueue failures are swallowed. If Mongo insert fails, the handler still returns 200 and subsequent Slack retries will be ignored due to the dedup key, causing permanent event loss. Consider only setting the dedup key after a successful enqueue, and on enqueue failure return a non-2xx (and avoid marking the event as seen) so Slack retries can recover.
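A sketch of the suggested reordering, reusing names from the snippet above (not a drop-in patch):

try {
  await enqueueWebhook({
    source: "slack",
    eventId,
    payload,
    meta: { retryNum: retryNum ?? null, retryReason: retryReason ?? null, receivedAt: Date.now() },
  });
} catch (err) {
  logger.error("Webhook enqueue failed", { err, eventId });
  // Non-2xx and no dedup key, so Slack's retry can recover the event
  return new Response("enqueue failed", { status: 500 });
}
// Mark the event as seen only once it is durably queued
await SlackBotState.set(
  `webhook-event-${eventId}`,
  { receivedAt: Date.now(), retryNum, retryReason },
  60 * 60 * 1000,
);
return new Response("", { status: 200 });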

Comment thread bot/slack-bot.ts Outdated
Comment on lines +418 to +419
};
await handleSlackEvent(event);

Copilot AI Apr 28, 2026


The queue consumer dispatches Slack events without any dedup keyed on doc.eventId. If the same Slack event ends up enqueued multiple times (e.g., edge-dedup failure, manual replay, multiple ingesters), handleSlackEvent can run more than once. Consider adding a consumer-level idempotency guard (e.g., reuse the existing webhook-event-${eventId} state key, or a unique index/upsert pattern) before calling handleSlackEvent.

Suggested change
-};
-await handleSlackEvent(event);
+};
+const eventId = doc.eventId;
+const processedStateKey = eventId ? `webhook-event-${eventId}` : null;
+if (processedStateKey && (await SlackBotState.get(processedStateKey))) {
+  logger.info("Skipping duplicate Slack webhook event", { eventId });
+  return;
+}
+await handleSlackEvent(event);
+if (processedStateKey) {
+  await SlackBotState.set(processedStateKey, {
+    processedAt: new Date().toISOString(),
+  });
+}

Comment thread bot/webhook-queue.ts
Comment on lines +142 to +168
  if (drainBacklog) {
    const filter = {
      processed: false,
      ...(sources ? { source: { $in: sources } } : {}),
    };
    const backlog = await col.find(filter).sort({ createdAt: 1 }).toArray();
    if (backlog.length) {
      log.info(`webhook-queue: draining ${backlog.length} unprocessed docs`);
    }
    for (const doc of backlog) {
      try {
        await opts.consume(doc);
        await markWebhookProcessed(doc._id!);
      } catch (err) {
        log.error(`webhook-queue: backlog consume failed for ${doc._id}`, { err });
        await markWebhookProcessed(doc._id!, err as Error);
      }
    }
  }

  const pipeline = sources
    ? [{ $match: { operationType: "insert", "fullDocument.source": { $in: sources } } }]
    : [{ $match: { operationType: "insert" } }];

  let stream: ChangeStream<WebhookQueueDoc> | null = col.watch(pipeline, {
    fullDocument: "updateLookup",
  });

Copilot AI Apr 28, 2026


startWebhookConsumer() drains the backlog first and only then opens the changeStream. Any docs inserted after the backlog find() runs but before col.watch() starts can be missed by both (not in the backlog snapshot, and changeStream starts after the insert), leaving them processed=false until the next restart. To avoid this race, open the changeStream first (capturing a resume token/operationTime) and then drain up to that point, or use a resume token persisted in state so you can safely resume without gaps.
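Sketched out, the suggested ordering looks like this (names follow the snippet above; the buffer handoff is simplified):

const stream = col.watch(pipeline, { fullDocument: "updateLookup" });

// Buffer inserts that arrive while the backlog drains, so nothing falls
// into the gap between find() and watch().
const buffered: WebhookQueueDoc[] = [];
let draining = true;
stream.on("change", (change) => {
  if (change.operationType !== "insert" || !change.fullDocument) return;
  if (draining) buffered.push(change.fullDocument);
  else void dispatch(change.fullDocument);
});

async function dispatch(doc: WebhookQueueDoc) {
  try {
    await opts.consume(doc);
    await markWebhookProcessed(doc._id!);
  } catch (err) {
    await markWebhookProcessed(doc._id!, err as Error);
  }
}

// The backlog query runs *after* watch() is open, so a doc can appear in
// both places; dedupe by _id.
const backlog = await col.find(filter).sort({ createdAt: 1 }).toArray();
const seen = new Set(backlog.map((d) => String(d._id)));
for (const doc of backlog) await dispatch(doc);
draining = false; // new inserts now dispatch directly
for (const doc of buffered) {
  if (!seen.has(String(doc._id))) await dispatch(doc);
}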

Comment thread bot/webhook-queue.ts
Comment on lines +17 to +19
* - Resume-from-_id on the changeStream means a clean restart picks up
* exactly where it left off with no duplicates beyond Mongo's
* at-least-once semantics.

Copilot AI Apr 28, 2026


The header comment mentions “Resume-from-_id on the changeStream”, but the implementation doesn’t store or pass any resume token (resumeAfter/startAfter/startAtOperationTime) when calling col.watch(). Either implement resume-token persistence (e.g., store the last processed resumeToken in Mongo/Keyv and resume from it), or update the comment to match the current backlog-drain approach.

Suggested change
-* - Resume-from-_id on the changeStream means a clean restart picks up
-*   exactly where it left off with no duplicates beyond Mongo's
-*   at-least-once semantics.
+* - Restarts recover by draining any queued documents that still have
+*   `processed=false`; the changeStream handles newly inserted docs after
+*   startup, with duplicates still possible under Mongo's at-least-once semantics.

Comment on lines +81 to +110
  // Edge-level dedup so a Slack retry storm doesn't insert N copies.
  // The actual queue is content-deduped by the bot consumer.
  try {
    const dedupCol = db.collection("webhook_edge_dedup");
    // _id is a string; the unique constraint is built-in, so a duplicate
    // throws code 11000.
    await dedupCol.insertOne({
      _id: `slack:${eventId}` as unknown as never,
      createdAt: new Date(),
    });
  } catch (err) {
    const code = (err as { code?: number }).code;
    if (code === 11000) {
      console.log(`slack webhook dedup hit for ${eventId} (retry=${retryNum})`);
      return new NextResponse("", { status: 200 });
    }
    console.warn("dedup insert failed, falling through", err);
  }

  await enqueueWebhook({
    source: "slack",
    eventId,
    payload,
    meta: {
      retryNum: retryNum ?? null,
      retryReason: retryReason ?? null,
      receivedAt: Date.now(),
    },
  });
}

Copilot AI Apr 28, 2026


Edge dedup is recorded before enqueuing the webhook. If enqueueWebhook() fails (e.g., transient Mongo outage), the request will 500 and Slack will retry, but the retry will hit the dedup record and return 200 without enqueuing — permanently dropping the event. Consider only inserting the dedup marker after a successful enqueue, or delete/rollback the dedup marker when enqueue fails so retries can proceed.

Comment on lines +84 to +88
    const dedupCol = db.collection("webhook_edge_dedup");
    // _id is a string; the unique constraint is built-in, so a duplicate
    // throws code 11000.
    await dedupCol.insertOne({
      _id: `slack:${eventId}` as unknown as never,

Copilot AI Apr 28, 2026


The `as unknown as never` cast on `_id` is unnecessary and obscures the schema for `webhook_edge_dedup` docs. Prefer giving the collection a simple type (e.g., `{ _id: string; createdAt: Date }`) and inserting `_id` as a plain string so type checking stays meaningful.

Suggested change
-    const dedupCol = db.collection("webhook_edge_dedup");
+    const dedupCol = db.collection<{ _id: string; createdAt: Date }>("webhook_edge_dedup");
     // _id is a string; the unique constraint is built-in, so a duplicate
     // throws code 11000.
     await dedupCol.insertOne({
-      _id: `slack:${eventId}` as unknown as never,
+      _id: `slack:${eventId}`,
Round 2 of the webhook-queue work — finish the multi-source story so
github and notion deliveries get the same restart-resilient pipeline
slack just got.

- app/api/webhook/github/route.ts: keep the legacy
  GithubWebhookEvents collection (for back-compat with existing
  setup-indexes.ts / webhook-events.ts helpers) and *also* enqueue
  into the unified webhook_queue keyed on x-github-delivery, so the
  VM bot consumer can react to github events alongside slack. Failure
  to enqueue is logged but does not fail the request — the legacy
  insert still succeeds.

- app/api/webhook/notion/route.ts: new endpoint. Two phases:
  1. Verification handshake — Notion POSTs `{verification_token}`
     once when you set the URL. Persist the token to
     webhook_notion_verification so the operator can register it as
     NOTION_WEBHOOK_VERIFICATION_TOKEN without scrubbing logs.
  2. Steady-state — verify X-Notion-Signature HMAC over the raw body,
     dedup by event id, enqueue source="notion" (verification sketched
     after this list).

- bot/slack-bot.ts: extend the queue consumer to subscribe to slack +
  github + notion. Slack still dispatches to handleSlackEvent;
  github / notion currently log-and-mark-processed since no
  downstream handler is wired yet. This keeps the queue tidy (docs
  don't accumulate as `processed: false` forever) and lets us see
  via logs that webhooks are landing.
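For the steady-state check in phase 2 of the notion route, the verification presumably looks something like this, assuming Notion's documented scheme of an X-Notion-Signature header carrying "sha256=" plus an HMAC-SHA256 of the raw body keyed by the stored verification token:

import crypto from "node:crypto";

function verifyNotionSignature(rawBody: string, header: string): boolean {
  const expected =
    "sha256=" +
    crypto
      .createHmac("sha256", process.env.NOTION_WEBHOOK_VERIFICATION_TOKEN!)
      .update(rawBody)
      .digest("hex");
  // timingSafeEqual throws on length mismatch, so guard first
  return (
    expected.length === header.length &&
    crypto.timingSafeEqual(Buffer.from(expected), Buffer.from(header))
  );
}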

Verified end-to-end against the deployed comfy-pr.vercel.app:
- POST /api/webhook/notion ping → 200, GET shows {total:0,pending:0}
- POST /api/webhook/github ping with x-github-delivery → 200,
  webhook_queue doc created, bot consumer logged "github event ...
  (ping) — no handler wired yet", processed=true within seconds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two real Slack DMs to @comfy-pr-bot today both failed silently with
"An error occurred while processing this request, I will try it again
later". The pm2 log showed only:

    [error] Agent SDK error:
    { "err": {} }
    [error] claude-yes process for task ... exited with code 1

The empty err object was winston's serialization of an Error instance
hiding the real message. Running the same spawn manually surfaced it:

    Configuration error in /tmp/task-…/.claude.json:
    JSON Parse error: Unexpected EOF

The Claude Agent SDK CLI (v2.1.114) bails out at startup when it finds
a `~/.claude.json` that exists but isn't valid JSON. The CLI itself
truncates the file mid-write on aborted runs, leaving a 0-byte file
that poisons every subsequent task that reuses the same task user
(useradd is idempotent, so the home dir persists across tasks).

Two fixes:

- bot/task-user.ts: ensureClaudeConfig() — write `{}` whenever the
  file is missing, empty, or fails JSON.parse, before chowning to the
  task user. Runs on both create-fresh and resume-existing paths
  (sketched after this list).
- bot/slack-bot.ts: pull message/code/stack out of caught Error before
  passing to winston so the next regression isn't invisible.
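The repair in bot/task-user.ts reduces to something like this sketch (exact path handling and the chown step are omitted):

import fs from "node:fs";

export function ensureClaudeConfig(configPath: string) {
  try {
    JSON.parse(fs.readFileSync(configPath, "utf8"));
    return; // valid JSON, leave it alone
  } catch {
    // Missing, 0-byte, or truncated mid-write all land here: reset to a
    // valid empty object before chowning to the task user.
    fs.writeFileSync(configPath, "{}\n");
  }
}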

Verified: with the fix applied to the existing 0-byte file, the
SDK CLI runs `--version` cleanly under sudo as the task user.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
End-to-end debugging of "An error occurred while processing this
request" on Slack DMs (2026-04-30) found three bugs stacked on top of
each other. Each one masked the next, and the only error visible in
pm2 logs was an empty `Agent SDK error: { err: {} }`; until the final
root cause surfaced, every fix just uncovered the next layer.

1. SDK picks the musl binary on a glibc host
   The Claude Agent SDK v0.2.114 enumerates platform binaries in the
   order [linux-x64-musl, linux-x64] and resolves the first that is
   `require.resolve()`-able. Both are installed via the optionalDeps,
   so it always picks musl, which then fails on Debian glibc with
   "sudo: unable to execute …-musl/claude: No such file or directory".
   Fix: pass `pathToClaudeCodeExecutable` explicitly pointing at the
   `…linux-x64/claude` glibc binary.

2. sudo --preserve-env (no list) silently drops every env var
   --preserve-env without an explicit list only keeps env_keep
   defaults (HOME, PATH, TERM). ANTHROPIC_API_KEY, GH_TOKEN,
   MONGODB_URI, etc. were stripped before reaching the agent
   subprocess, which then died at startup. Fix: build the
   --preserve-env=KEY1,KEY2,... list from the resolved childEnv keys
   (see the sketch after this list).

3. Mirror agent stderr so the next regression isn't invisible
   The SDK's `stderr` callback only fires for SDK-format messages.
   Plain stderr from the agent's startup path (e.g. "Credit balance
   is too low", "JSON Parse error" on a corrupt config) was getting
   swallowed by our SpawnedProcess wrapper, which doesn't expose
   stderr. Mirror it unconditionally to bot stderr so pm2 catches it.
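The fix for (2), sketched; childEnv and taskUser here are stand-ins for the real resolved values:

const childEnv: Record<string, string> = {
  ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY ?? "",
  GH_TOKEN: process.env.GH_TOKEN ?? "",
  MONGODB_URI: process.env.MONGODB_URI ?? "",
};
const taskUser = "task-user"; // hypothetical

// Bare --preserve-env only keeps sudo's env_keep defaults (HOME, PATH,
// TERM); an explicit key list forwards everything the agent needs.
const preserveEnv = `--preserve-env=${Object.keys(childEnv).join(",")}`;
const argv = ["sudo", preserveEnv, "-u", taskUser, "claude", "--version"];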

Plus a small DEBUG_SPAWN dump (off by default) for spawn-args/cwd/env-key
sniffing if anything regresses again.

After all three fixes the spawn now actually reaches the Anthropic
backend cleanly. Today's outage is now a billing problem (low credit
balance), not a code problem.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
