Skip to content

Commit f04b18c

Browse files
author
marcus
committed
fix running queue
1 parent 185ac08 commit f04b18c

2 files changed

Lines changed: 25 additions & 2 deletions

File tree

src/app/api/internal/sync/process/route.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { processSupabaseSyncQueue } from "@/server/queue/supabase-processor";
44

55
export const runtime = "nodejs";
66
export const dynamic = "force-dynamic";
7+
export const maxDuration = 60;
78

89
function isAuthorized(request: Request): boolean {
910
const secret = process.env.CRON_SECRET;
@@ -24,8 +25,9 @@ export async function POST(request: Request) {
2425
}
2526

2627
const url = new URL(request.url);
27-
const limit = Number(url.searchParams.get("limit") ?? "5");
28-
const result = await processSupabaseSyncQueue(Number.isFinite(limit) ? limit : 5);
28+
const limit = Number(url.searchParams.get("limit") ?? "1");
29+
const boundedLimit = Number.isFinite(limit) ? Math.max(1, Math.min(3, limit)) : 1;
30+
const result = await processSupabaseSyncQueue(boundedLimit);
2931

3032
return NextResponse.json({ ok: true, ...result });
3133
}

src/server/queue/supabase-processor.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { publishSyncEvent } from "@/server/queue/sync-events";
44
import { syncUserContributions } from "@/server/sync/syncService";
55

66
const MAX_BATCH_SIZE = 20;
7+
const STALE_RUNNING_MS = 10 * 60 * 1000;
78
const SyncJobStatus = {
89
QUEUED: "QUEUED",
910
RUNNING: "RUNNING",
@@ -18,6 +19,26 @@ function toBackoffMs(attemptCount: number): number {
1819
export async function processSupabaseSyncQueue(limit = 5): Promise<{ picked: number; completed: number; failed: number }> {
1920
const batchSize = Math.max(1, Math.min(MAX_BATCH_SIZE, limit));
2021
const now = new Date();
22+
23+
// Recover jobs left RUNNING when a serverless execution is interrupted (timeout/redeploy/crash).
24+
const staleBefore = new Date(Date.now() - STALE_RUNNING_MS);
25+
const recovered = await (prisma as any).syncJob.updateMany({
26+
where: {
27+
status: SyncJobStatus.RUNNING,
28+
OR: [{ lockedAt: { lte: staleBefore } }, { lockedAt: null, startedAt: { lte: staleBefore } }],
29+
},
30+
data: {
31+
status: SyncJobStatus.QUEUED,
32+
availableAt: now,
33+
lockedAt: null,
34+
startedAt: null,
35+
errorMessage: "Recovered stale RUNNING lock after processor interruption.",
36+
},
37+
});
38+
if (recovered.count > 0) {
39+
safeLog("warn", "Recovered stale supabase sync jobs", { count: recovered.count });
40+
}
41+
2142
const candidates = await (prisma as any).syncJob.findMany({
2243
where: {
2344
status: SyncJobStatus.QUEUED,

0 commit comments

Comments
 (0)