Skip to content

Commit 185ac08

Browse files
author
marcus
committed
fix
1 parent f1f3366 commit 185ac08

3 files changed

Lines changed: 34 additions & 14 deletions

File tree

src/app/api/sync/events/route.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { NextResponse } from "next/server";
22
import { requireAppUser } from "@/server/auth/user";
3+
import { getQueueBackend } from "@/server/queue/queue";
34
import { createSyncEventSubscriber, getSyncEventsChannelForUser } from "@/server/queue/sync-events";
45

56
export const runtime = "nodejs";
@@ -14,14 +15,27 @@ function encodeSse(data: unknown, event?: string): Uint8Array {
1415
export async function GET(request: Request) {
1516
const { appUser } = await requireAppUser();
1617
const channel = getSyncEventsChannelForUser(appUser.id);
18+
const backend = getQueueBackend();
1719

1820
let cleanupRef: (() => Promise<void>) | null = null;
1921

2022
const stream = new ReadableStream<Uint8Array>({
2123
async start(controller) {
22-
const subscriber = createSyncEventSubscriber();
2324
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
2425

26+
if (backend === "supabase") {
27+
controller.enqueue(encodeSse({ ok: true, connected: true, mode: "supabase", timestamp: new Date().toISOString() }, "connected"));
28+
heartbeatTimer = setInterval(() => {
29+
controller.enqueue(encodeSse({ t: Date.now() }, "ping"));
30+
}, 20000);
31+
cleanupRef = async () => {
32+
if (heartbeatTimer) clearInterval(heartbeatTimer);
33+
};
34+
return;
35+
}
36+
37+
const subscriber = createSyncEventSubscriber();
38+
2539
const onMessage = (incomingChannel: string, message: string) => {
2640
if (incomingChannel !== channel) return;
2741
controller.enqueue(encodeSse(message));

src/server/queue/queue.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ export type SyncProvider = "GITLAB" | "AZURE_DEVOPS" | "GITHUB";
77
export type QueueBackend = "bull" | "supabase";
88

99
const backend: QueueBackend = process.env.SYNC_QUEUE_BACKEND === "supabase" ? "supabase" : "bull";
10-
11-
const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
12-
maxRetriesPerRequest: null,
13-
enableReadyCheck: false,
14-
});
15-
16-
export const syncQueue = new Queue("contribution-sync", { connection });
10+
let queueInstance: Queue | null = null;
11+
function getSyncQueue(): Queue {
12+
if (queueInstance) return queueInstance;
13+
const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
14+
maxRetriesPerRequest: null,
15+
enableReadyCheck: false,
16+
});
17+
queueInstance = new Queue("contribution-sync", { connection });
18+
return queueInstance;
19+
}
1720

1821
export type SyncJobOptions = {
1922
from?: string;
@@ -60,7 +63,7 @@ export async function enqueueUserSync(userId: string, options?: SyncJobOptions)
6063
return;
6164
}
6265

63-
await syncQueue.add("sync-user", { userId, options }, { attempts: 3, backoff: { type: "exponential", delay: 1000 } });
66+
await getSyncQueue().add("sync-user", { userId, options }, { attempts: 3, backoff: { type: "exponential", delay: 1000 } });
6467
}
6568

6669
export async function scheduleNightlySync() {
@@ -69,7 +72,7 @@ export async function scheduleNightlySync() {
6972
return;
7073
}
7174

72-
await syncQueue.add(
75+
await getSyncQueue().add(
7376
"nightly-sync",
7477
{},
7578
{
@@ -99,7 +102,7 @@ export async function listBackfillJobsForUser(userId: string, limit = 8): Promis
99102
}
100103

101104
try {
102-
const jobs = await syncQueue.getJobs(["waiting", "active", "completed", "failed", "delayed"], 0, 200);
105+
const jobs = await getSyncQueue().getJobs(["waiting", "active", "completed", "failed", "delayed"], 0, 200);
103106
const collected: BackfillJobView[] = [];
104107

105108
for (const job of jobs) {
@@ -145,7 +148,7 @@ export async function retryBackfillJobForUser(userId: string, jobId: string): Pr
145148
}
146149

147150
try {
148-
const job = await syncQueue.getJob(jobId);
151+
const job = await getSyncQueue().getJob(jobId);
149152
if (!job || job.name !== "sync-user" || !isBackfillJob(job.data) || job.data.userId !== userId) {
150153
return { ok: false };
151154
}
@@ -177,7 +180,7 @@ export async function removeBackfillJobForUser(userId: string, jobId: string): P
177180
}
178181

179182
try {
180-
const job = await syncQueue.getJob(jobId);
183+
const job = await getSyncQueue().getJob(jobId);
181184
if (!job || job.name !== "sync-user" || !isBackfillJob(job.data) || job.data.userId !== userId) {
182185
return { ok: false };
183186
}
@@ -198,7 +201,7 @@ export async function removeCompletedBackfillJobsForUser(userId: string): Promis
198201
}
199202

200203
try {
201-
const jobs = await syncQueue.getJobs(["completed"], 0, 300);
204+
const jobs = await getSyncQueue().getJobs(["completed"], 0, 300);
202205
let removed = 0;
203206

204207
for (const job of jobs) {

src/server/queue/sync-events.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import IORedis from "ioredis";
22
import { safeLog } from "@/server/crypto/logging";
3+
import { getQueueBackend } from "@/server/queue/queue";
34

45
const REDIS_URL = process.env.REDIS_URL ?? "redis://localhost:6379";
56
const CHANNEL_PREFIX = "pow:sync-events:user:";
@@ -20,6 +21,8 @@ function getChannel(userId: string): string {
2021
}
2122

2223
export async function publishSyncEvent(payload: Omit<SyncEventPayload, "timestamp">): Promise<void> {
24+
if (getQueueBackend() === "supabase") return;
25+
2326
const publisher = new IORedis(REDIS_URL, {
2427
maxRetriesPerRequest: null,
2528
enableReadyCheck: false,

0 commit comments

Comments
 (0)