Skip to content

Commit 6c05c88

Browse files
committed
Fix external DB sync poller stale claim recovery
1 parent 66adb4e commit 6c05c88

1 file changed

Lines changed: 86 additions & 33 deletions

File tree

  • apps/backend/src/app/api/latest/internal/external-db-sync/poller

apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts

Lines changed: 86 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,15 @@ const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
2020
const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT";
2121
const POLLER_CLAIM_LIMIT_ENV = "STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT";
2222
const DEFAULT_POLL_CLAIM_LIMIT = 1000;
23-
const STALE_REQUEST_THRESHOLD_MS = 60 * 1000;
23+
const STALE_CLAIM_INTERVAL_MINUTES = 5;
24+
25+
type ClaimedOutgoingRequest = OutgoingRequest & {
26+
wasStale: boolean,
27+
};
28+
29+
function getClaimedStaleRequestIds(claimedRequests: ClaimedOutgoingRequest[]): string[] {
30+
return claimedRequests.filter((request) => request.wasStale).map((request) => request.id);
31+
}
2432

2533
function parseMaxDurationMs(value: string | undefined): number {
2634
if (!value) return DEFAULT_MAX_DURATION_MS;
@@ -85,32 +93,63 @@ export const GET = createSmartRouteHandler({
8593
const startTime = performance.now();
8694
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
8795
const pollIntervalMs = 50;
88-
const staleClaimIntervalMinutes = 5;
8996
const pollerClaimLimit = getPollerClaimLimit();
9097

9198
span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs);
9299
span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs);
93100
span.setAttribute("stack.external-db-sync.poller-claim-limit", pollerClaimLimit);
94-
span.setAttribute("stack.external-db-sync.stale-claim-minutes", staleClaimIntervalMinutes);
101+
span.setAttribute("stack.external-db-sync.stale-claim-minutes", STALE_CLAIM_INTERVAL_MINUTES);
95102

96103
let totalRequestsProcessed = 0;
97104
let iterationCount = 0;
98105

99-
async function claimPendingRequests(): Promise<OutgoingRequest[]> {
106+
async function claimPendingRequests(): Promise<ClaimedOutgoingRequest[]> {
100107
return await traceSpan("external-db-sync.poller.claimPendingRequests", async (claimSpan) => {
101-
const requests = await globalPrismaClient.$queryRaw<OutgoingRequest[]>`
102-
UPDATE "OutgoingRequest"
103-
SET "startedFulfillingAt" = NOW()
104-
WHERE "id" IN (
105-
SELECT id
106-
FROM "OutgoingRequest"
107-
WHERE "startedFulfillingAt" IS NULL
108-
LIMIT ${pollerClaimLimit}
109-
FOR UPDATE SKIP LOCKED
110-
)
111-
RETURNING *;
112-
`;
108+
const requests = await retryTransaction(globalPrismaClient, async (tx) => {
109+
const claimTime = new Date();
110+
const reclaimBefore = new Date(claimTime.getTime() - STALE_CLAIM_INTERVAL_MINUTES * 60 * 1000);
111+
112+
const candidateRequests = await tx.outgoingRequest.findMany({
113+
where: {
114+
OR: [
115+
{ startedFulfillingAt: null },
116+
{ startedFulfillingAt: { lt: reclaimBefore } },
117+
],
118+
},
119+
orderBy: [
120+
{ createdAt: "asc" },
121+
{ id: "asc" },
122+
],
123+
take: pollerClaimLimit,
124+
});
125+
126+
const claimedRequests: ClaimedOutgoingRequest[] = [];
127+
for (const candidateRequest of candidateRequests) {
128+
const claimResult = await tx.outgoingRequest.updateMany({
129+
where: {
130+
id: candidateRequest.id,
131+
startedFulfillingAt: candidateRequest.startedFulfillingAt,
132+
},
133+
data: {
134+
startedFulfillingAt: claimTime,
135+
},
136+
});
137+
138+
if (claimResult.count !== 1) {
139+
continue;
140+
}
141+
142+
claimedRequests.push({
143+
...candidateRequest,
144+
startedFulfillingAt: claimTime,
145+
wasStale: candidateRequest.startedFulfillingAt != null,
146+
});
147+
}
148+
149+
return claimedRequests;
150+
}, { level: "serializable" });
113151
claimSpan.setAttribute("stack.external-db-sync.claimed-count", requests.length);
152+
claimSpan.setAttribute("stack.external-db-sync.claimed-stale-count", getClaimedStaleRequestIds(requests).length);
114153
return requests;
115154
});
116155
}
@@ -127,6 +166,7 @@ export const GET = createSmartRouteHandler({
127166
await tx.outgoingRequest.deleteMany({ where: { id: { in: ids } } });
128167
});
129168
}
169+
130170
async function processRequest(request: OutgoingRequest): Promise<void> {
131171
// Prisma JsonValue doesn't carry a precise shape for this JSON blob.
132172
const options = request.qstashOptions as any;
@@ -243,25 +283,16 @@ export const GET = createSmartRouteHandler({
243283
return { stopReason: "disabled", processed: 0 };
244284
}
245285

246-
const staleRequests = await globalPrismaClient.$queryRaw<{ id: string, startedFulfillingAt: Date }[]>`
247-
SELECT "id", "startedFulfillingAt"
248-
FROM "OutgoingRequest"
249-
WHERE "startedFulfillingAt" IS NOT NULL
250-
AND "startedFulfillingAt" < NOW() - ${STALE_REQUEST_THRESHOLD_MS} * INTERVAL '1 millisecond'
251-
LIMIT 10
252-
`;
253-
iterationSpan.setAttribute("stack.external-db-sync.stale-count", staleRequests.length);
254-
if (staleRequests.length > 0) {
255-
captureError(
256-
"poller-stale-outgoing-requests",
257-
new StackAssertionError(
258-
`Found ${staleRequests.length} outgoing request(s) with startedFulfillingAt older than ${STALE_REQUEST_THRESHOLD_MS}ms`,
259-
{ staleRequestIds: staleRequests.map(r => r.id) },
260-
),
286+
const pendingRequests = await claimPendingRequests();
287+
const reclaimedStaleRequestIds = getClaimedStaleRequestIds(pendingRequests);
288+
iterationSpan.setAttribute("stack.external-db-sync.reclaimed-count", reclaimedStaleRequestIds.length);
289+
iterationSpan.setAttribute("stack.external-db-sync.stale-count", reclaimedStaleRequestIds.length);
290+
if (reclaimedStaleRequestIds.length > 0) {
291+
console.warn(
292+
`[Poller] Reclaimed ${reclaimedStaleRequestIds.length} stale outgoing request(s) older than ${STALE_CLAIM_INTERVAL_MINUTES} minutes.`,
293+
{ staleRequestIds: reclaimedStaleRequestIds },
261294
);
262295
}
263-
264-
const pendingRequests = await claimPendingRequests();
265296
iterationSpan.setAttribute("stack.external-db-sync.pending-count", pendingRequests.length);
266297

267298
const processed = await processRequests(pendingRequests);
@@ -289,3 +320,25 @@ export const GET = createSmartRouteHandler({
289320
});
290321
},
291322
});
323+
324+
import.meta.vitest?.describe("getClaimedStaleRequestIds(...)", () => {
325+
import.meta.vitest?.test("returns only requests reclaimed from stale claims", ({ expect }) => {
326+
expect(getClaimedStaleRequestIds([
327+
{
328+
id: "fresh-claim",
329+
wasStale: false,
330+
} as ClaimedOutgoingRequest,
331+
{
332+
id: "reclaimed-stale-1",
333+
wasStale: true,
334+
} as ClaimedOutgoingRequest,
335+
{
336+
id: "reclaimed-stale-2",
337+
wasStale: true,
338+
} as ClaimedOutgoingRequest,
339+
])).toEqual([
340+
"reclaimed-stale-1",
341+
"reclaimed-stale-2",
342+
]);
343+
});
344+
});

0 commit comments

Comments
 (0)