Skip to content

Commit 4c6a89f

Browse files
committed
qstash dedup key
1 parent 5b9cef6 commit 4c6a89f

4 files changed

Lines changed: 37 additions & 4 deletions

File tree

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- Drop the existing full unique constraint on deduplicationKey
2+
ALTER TABLE "OutgoingRequest" DROP CONSTRAINT "OutgoingRequest_deduplicationKey_key";
3+
4+
-- SPLIT_STATEMENT_SENTINEL
5+
-- Create a partial unique index that only enforces uniqueness for rows
6+
-- where startedFulfillingAt IS NULL (i.e. pending/unclaimed requests).
7+
-- This allows duplicate deduplicationKey values for rows that have already
8+
-- been claimed for processing.
9+
CREATE UNIQUE INDEX "OutgoingRequest_deduplicationKey_pending_key"
10+
ON "OutgoingRequest" ("deduplicationKey")
11+
WHERE "startedFulfillingAt" IS NULL;

apps/backend/prisma/schema.prisma

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,8 @@ model OutgoingRequest {
10901090
startedFulfillingAt DateTime?
10911091
deduplicationKey String?
10921092
1093-
@@unique([deduplicationKey])
1093+
// Partial unique index on deduplicationKey WHERE startedFulfillingAt IS NULL
1094+
// is created in a custom migration (not expressible in Prisma schema)
10941095
@@index([startedFulfillingAt, createdAt])
10951096
@@index([startedFulfillingAt, deduplicationKey])
10961097
}

apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
2020
const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT";
2121
const POLLER_CLAIM_LIMIT_ENV = "STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT";
2222
const DEFAULT_POLL_CLAIM_LIMIT = 1000;
23+
const STALE_REQUEST_THRESHOLD_MS = 60 * 1000;
2324

2425
function parseMaxDurationMs(value: string | undefined): number {
2526
if (!value) return DEFAULT_MAX_DURATION_MS;
@@ -90,7 +91,6 @@ export const GET = createSmartRouteHandler({
9091
span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs);
9192
span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs);
9293
span.setAttribute("stack.external-db-sync.poller-claim-limit", pollerClaimLimit);
93-
span.setAttribute("stack.external-db-sync.direct-sync", directSyncEnabled());
9494
span.setAttribute("stack.external-db-sync.stale-claim-minutes", staleClaimIntervalMinutes);
9595

9696
let totalRequestsProcessed = 0;
@@ -172,11 +172,13 @@ export const GET = createSmartRouteHandler({
172172
}
173173

174174
const flowControl = options.flowControl as UpstashRequest["flowControl"];
175+
const deduplicationId = options.deduplicationId as UpstashRequest["deduplicationId"];
175176

176177
return {
177178
url: fullUrl,
178179
body: options.body,
179180
...(flowControl ? { flowControl } : {}),
181+
...(deduplicationId ? { deduplicationId } : {})
180182
};
181183
}
182184

@@ -243,6 +245,24 @@ export const GET = createSmartRouteHandler({
243245
return { stopReason: "disabled", processed: 0 };
244246
}
245247

248+
const staleRequests = await globalPrismaClient.$queryRaw<{ id: string, startedFulfillingAt: Date }[]>`
249+
SELECT "id", "startedFulfillingAt"
250+
FROM "OutgoingRequest"
251+
WHERE "startedFulfillingAt" IS NOT NULL
252+
AND "startedFulfillingAt" < NOW() - ${STALE_REQUEST_THRESHOLD_MS} * INTERVAL '1 millisecond'
253+
LIMIT 10
254+
`;
255+
iterationSpan.setAttribute("stack.external-db-sync.stale-count", staleRequests.length);
256+
if (staleRequests.length > 0) {
257+
captureError(
258+
"poller-stale-outgoing-requests",
259+
new StackAssertionError(
260+
`Found ${staleRequests.length} outgoing request(s) with startedFulfillingAt older than ${STALE_REQUEST_THRESHOLD_MS}ms`,
261+
{ staleRequestIds: staleRequests.map(r => r.id) },
262+
),
263+
);
264+
}
265+
246266
const pendingRequests = await claimPendingRequests();
247267
iterationSpan.setAttribute("stack.external-db-sync.pending-count", pendingRequests.length);
248268

apps/backend/src/lib/external-db-sync-queue.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ export async function enqueueExternalDbSyncBatch(tenancyIds: string[]): Promise<
3333
json_build_object(
3434
'url', '/api/latest/internal/external-db-sync/sync-engine',
3535
'body', json_build_object('tenancyId', t.tenancy_id),
36-
'flowControl', json_build_object('key', 'sentinel-sync-key', 'parallelism', 20)
36+
'flowControl', json_build_object('key', 'sentinel-sync-key', 'parallelism', 20),
37+
'deduplicationId', t.tenancy_id
3738
),
3839
NULL,
3940
'sentinel-sync-key-' || t.tenancy_id
4041
FROM unnest(${tenancyIds}::uuid[]) AS t(tenancy_id)
41-
ON CONFLICT ("deduplicationKey") DO NOTHING
42+
ON CONFLICT ("deduplicationKey") WHERE "startedFulfillingAt" IS NULL DO NOTHING
4243
`;
4344
}

0 commit comments

Comments
 (0)