Skip to content

Commit d0e9680

Browse files
author
marcus
committed
Add Supabase-backed sync queue backend with cron processor while keeping BullMQ
1 parent c62353b commit d0e9680

19 files changed

Lines changed: 403 additions & 43 deletions

File tree

.env.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,10 @@ SUPABASE_SERVICE_ROLE_KEY="your-service-role-key"
88
MASTER_KEY="base64-32-byte-key"
99
REDIS_URL="redis://localhost:6379"
1010

11+
# Queue backend: bull (default) or supabase
12+
SYNC_QUEUE_BACKEND="bull"
13+
14+
# Required only for Supabase queue cron processor endpoint
15+
CRON_SECRET="change-me"
16+
1117
APP_URL="http://localhost:3000"

README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ Copy `.env.example` to `.env.local` and set:
288288
- `SUPABASE_SERVICE_ROLE_KEY`
289289
- `MASTER_KEY` (base64-encoded 32-byte key)
290290
- `REDIS_URL`
291+
- `SYNC_QUEUE_BACKEND` (`bull` or `supabase`)
292+
- `CRON_SECRET` (required when `SYNC_QUEUE_BACKEND=supabase`)
291293
- `APP_URL`
292294
- `NEXT_PUBLIC_APP_NAME` (optional, default `ContributionPulse`)
293295
- `NEXT_PUBLIC_APP_SLUG` (optional, default derived from app name)
@@ -313,6 +315,50 @@ npm run worker:nightly
313315

314316
`worker:nightly` registers the repeatable nightly sync schedule. Run it once per environment.
315317

318+
### Supabase queue mode (no Redis worker required)
319+
If you want to run without BullMQ/Redis for low-cost environments:
320+
321+
1. Set:
322+
```bash
323+
SYNC_QUEUE_BACKEND=supabase
324+
CRON_SECRET=<strong-random-secret>
325+
```
326+
327+
2. Do not run `npm run worker` for queue processing.
328+
329+
3. Trigger queue processing by calling:
330+
```bash
331+
POST /api/internal/sync/process
332+
Authorization: Bearer <CRON_SECRET>
333+
```
334+
335+
You can process manually:
336+
```bash
337+
npm run queue:process
338+
```
339+
340+
4. Schedule this endpoint with Supabase Cron (example every minute):
341+
```sql
342+
select
343+
cron.schedule(
344+
'process-contribution-sync-queue',
345+
'* * * * *',
346+
$$
347+
select
348+
net.http_post(
349+
url := 'https://YOUR_APP_DOMAIN/api/internal/sync/process?limit=10',
350+
headers := jsonb_build_object(
351+
'Content-Type', 'application/json',
352+
'Authorization', 'Bearer YOUR_CRON_SECRET'
353+
),
354+
body := '{}'::jsonb
355+
);
356+
$$
357+
);
358+
```
359+
360+
This keeps BullMQ code in the project for future scale, while allowing Redis-free operation today.
361+
316362
---
317363

318364
## Testing

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
"prisma:generate": "prisma generate",
1313
"prisma:migrate": "prisma migrate dev",
1414
"worker": "tsx scripts/worker.ts",
15-
"worker:nightly": "tsx scripts/nightly.ts"
15+
"worker:nightly": "tsx scripts/nightly.ts",
16+
"queue:process": "tsx scripts/process-supabase-queue.ts"
1617
},
1718
"dependencies": {
1819
"@prisma/client": "6.5.0",
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
CREATE TYPE "SyncJobStatus" AS ENUM ('QUEUED', 'RUNNING', 'COMPLETED', 'FAILED');
2+
3+
CREATE TABLE "SyncJob" (
4+
"id" TEXT NOT NULL,
5+
"userId" TEXT NOT NULL,
6+
"provider" "Provider",
7+
"from" TIMESTAMP(3),
8+
"to" TIMESTAMP(3),
9+
"backfillYear" INTEGER,
10+
"status" "SyncJobStatus" NOT NULL DEFAULT 'QUEUED',
11+
"attemptCount" INTEGER NOT NULL DEFAULT 0,
12+
"maxAttempts" INTEGER NOT NULL DEFAULT 3,
13+
"availableAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
14+
"lockedAt" TIMESTAMP(3),
15+
"startedAt" TIMESTAMP(3),
16+
"finishedAt" TIMESTAMP(3),
17+
"errorMessage" TEXT,
18+
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
19+
"updatedAt" TIMESTAMP(3) NOT NULL,
20+
21+
CONSTRAINT "SyncJob_pkey" PRIMARY KEY ("id")
22+
);
23+
24+
ALTER TABLE "SyncJob"
25+
ADD CONSTRAINT "SyncJob_userId_fkey"
26+
FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
27+
28+
CREATE INDEX "SyncJob_status_availableAt_idx" ON "SyncJob"("status", "availableAt");
29+
CREATE INDEX "SyncJob_userId_createdAt_idx" ON "SyncJob"("userId", "createdAt");
30+
CREATE INDEX "SyncJob_userId_backfillYear_idx" ON "SyncJob"("userId", "backfillYear");

prisma/schema.prisma

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ enum SyncState {
2020
FAILED
2121
}
2222

23+
enum SyncJobStatus {
24+
QUEUED
25+
RUNNING
26+
COMPLETED
27+
FAILED
28+
}
29+
2330
model User {
2431
id String @id @default(cuid())
2532
supabaseUserId String @unique
@@ -30,6 +37,7 @@ model User {
3037
dailyActivities DailyActivity[]
3138
manualHighlights ManualHighlight[]
3239
publicShares PublicShare[]
40+
syncJobs SyncJob[]
3341
3442
@@index([email])
3543
}
@@ -94,3 +102,27 @@ model PublicShare {
94102
95103
@@index([userId])
96104
}
105+
106+
model SyncJob {
107+
id String @id @default(cuid())
108+
userId String
109+
provider Provider?
110+
from DateTime?
111+
to DateTime?
112+
backfillYear Int?
113+
status SyncJobStatus @default(QUEUED)
114+
attemptCount Int @default(0)
115+
maxAttempts Int @default(3)
116+
availableAt DateTime @default(now())
117+
lockedAt DateTime?
118+
startedAt DateTime?
119+
finishedAt DateTime?
120+
errorMessage String?
121+
createdAt DateTime @default(now())
122+
updatedAt DateTime @updatedAt
123+
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
124+
125+
@@index([status, availableAt])
126+
@@index([userId, createdAt])
127+
@@index([userId, backfillYear])
128+
}

scripts/process-supabase-queue.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import "dotenv/config";
2+
import { getQueueBackend } from "../src/server/queue/queue";
3+
import { processSupabaseSyncQueue } from "../src/server/queue/supabase-processor";
4+
5+
async function main() {
6+
if (getQueueBackend() !== "supabase") {
7+
// eslint-disable-next-line no-console
8+
console.log("SYNC_QUEUE_BACKEND is not supabase; nothing to process.");
9+
return;
10+
}
11+
12+
const result = await processSupabaseSyncQueue(10);
13+
// eslint-disable-next-line no-console
14+
console.log("Supabase queue processed", result);
15+
}
16+
17+
main()
18+
.then(() => process.exit(0))
19+
.catch((error) => {
20+
// eslint-disable-next-line no-console
21+
console.error(error);
22+
process.exit(1);
23+
});

src/app/api/integrations/azure-devops/route.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { NextResponse } from "next/server";
2-
import { Provider } from "@prisma/client";
32
import { encryptSecret } from "@/server/crypto/encryption";
43
import { prisma } from "@/server/db/prisma";
54
import { requireAppUser } from "@/server/auth/user";
@@ -19,11 +18,11 @@ export async function POST(request: Request) {
1918

2019
await prisma.integration.upsert({
2120
where: {
22-
userId_provider: { userId: appUser.id, provider: Provider.AZURE_DEVOPS },
21+
userId_provider: { userId: appUser.id, provider: "AZURE_DEVOPS" },
2322
},
2423
create: {
2524
userId: appUser.id,
26-
provider: Provider.AZURE_DEVOPS,
25+
provider: "AZURE_DEVOPS",
2726
encryptedToken: encrypted.ciphertext,
2827
tokenIv: encrypted.iv,
2928
tokenTag: encrypted.tag,

src/app/api/integrations/disconnect/route.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { NextResponse } from "next/server";
2-
import { Provider } from "@prisma/client";
32
import { requireAppUser } from "@/server/auth/user";
43
import { prisma } from "@/server/db/prisma";
54

@@ -15,7 +14,7 @@ export async function POST(request: Request) {
1514
provider = String(form.get("provider") ?? "");
1615
}
1716

18-
if (provider !== Provider.GITLAB && provider !== Provider.AZURE_DEVOPS && provider !== Provider.GITHUB) {
17+
if (provider !== "GITLAB" && provider !== "AZURE_DEVOPS" && provider !== "GITHUB") {
1918
return NextResponse.json({ error: "Invalid provider" }, { status: 400 });
2019
}
2120

src/app/api/integrations/gitlab/route.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { NextResponse } from "next/server";
2-
import { Provider } from "@prisma/client";
32
import { encryptSecret } from "@/server/crypto/encryption";
43
import { prisma } from "@/server/db/prisma";
54
import { requireAppUser } from "@/server/auth/user";
@@ -18,11 +17,11 @@ export async function POST(request: Request) {
1817

1918
await prisma.integration.upsert({
2019
where: {
21-
userId_provider: { userId: appUser.id, provider: Provider.GITLAB },
20+
userId_provider: { userId: appUser.id, provider: "GITLAB" },
2221
},
2322
create: {
2423
userId: appUser.id,
25-
provider: Provider.GITLAB,
24+
provider: "GITLAB",
2625
encryptedToken: encrypted.ciphertext,
2726
tokenIv: encrypted.iv,
2827
tokenTag: encrypted.tag,
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { NextResponse } from "next/server";
2+
import { getQueueBackend } from "@/server/queue/queue";
3+
import { processSupabaseSyncQueue } from "@/server/queue/supabase-processor";
4+
5+
export const runtime = "nodejs";
6+
export const dynamic = "force-dynamic";
7+
8+
function isAuthorized(request: Request): boolean {
9+
const secret = process.env.CRON_SECRET;
10+
if (!secret) return false;
11+
const bearer = request.headers.get("authorization");
12+
if (bearer === `Bearer ${secret}`) return true;
13+
const xSecret = request.headers.get("x-cron-secret");
14+
return xSecret === secret;
15+
}
16+
17+
export async function POST(request: Request) {
18+
if (!isAuthorized(request)) {
19+
return NextResponse.json({ ok: false, error: "unauthorized" }, { status: 401 });
20+
}
21+
22+
if (getQueueBackend() !== "supabase") {
23+
return NextResponse.json({ ok: false, error: "sync_queue_backend_not_supabase" }, { status: 400 });
24+
}
25+
26+
const url = new URL(request.url);
27+
const limit = Number(url.searchParams.get("limit") ?? "5");
28+
const result = await processSupabaseSyncQueue(Number.isFinite(limit) ? limit : 5);
29+
30+
return NextResponse.json({ ok: true, ...result });
31+
}

0 commit comments

Comments
 (0)