Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .mcp.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"svelte": {
"type": "http",
"url": "https://mcp.svelte.dev/mcp"
},
"cloudflare": {
"type": "http",
"url": "https://mcp.cloudflare.com/mcp"
}
}
}
4 changes: 4 additions & 0 deletions opencode.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
"svelte": {
"type": "remote",
"url": "https://mcp.svelte.dev/mcp"
},
"cloudflare": {
"type": "remote",
"url": "https://mcp.cloudflare.com/mcp"
}
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"scripts": {
"dev": "vite dev",
"build": "vite build",
"preview": "wrangler dev .svelte-kit/cloudflare/_worker.js --port 4173",
"preview": "wrangler dev --port 4173",
"prepare": "svelte-kit sync || echo ''",
"check": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json",
"check:watch": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json --watch",
Expand Down
77 changes: 25 additions & 52 deletions src/lib/server/extraction-jobs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { error, type RequestEvent } from '@sveltejs/kit';
import { error } from '@sveltejs/kit';

const GMAIL_SCOPE_PROFILE = 'https://gmail.googleapis.com/gmail/v1/users/me/profile';
const GMAIL_SCOPE_MESSAGES = 'https://gmail.googleapis.com/gmail/v1/users/me/messages';
Expand All @@ -7,7 +7,7 @@ const GMAIL_BATCH_ENDPOINT = 'https://www.googleapis.com/batch/gmail/v1';
const EMAIL_BATCH_SIZE = 50;
const GMAIL_BATCH_API_SIZE = 100;
const MESSAGES_PER_PAGE = 500;
const PAGES_PER_STEP = 1;
const PAGES_PER_INVOCATION = 4;

const RETRY_BASE_DELAY_MS = 1000;
const RETRY_MAX_DELAY_MS = 30_000;
Expand Down Expand Up @@ -179,7 +179,7 @@ export async function getExtractionStatus(db: D1Database, jobId: string, jobKey:
};
}

export async function processExtractionJobStep(params: {
export async function processExtractionJobChunk(params: {
db: D1Database;
jobId: string;
jobKey: string;
Expand Down Expand Up @@ -208,29 +208,23 @@ export async function processExtractionJobStep(params: {
.bind(jobId)
.first<ExtractionJobRow>();

if (!job || job.job_key !== jobKey) {
return { done: true };
}

if (job.status === 'completed' || job.status === 'failed') {
return { done: true };
}

if (!job || job.job_key !== jobKey) return { done: true };
if (job.status === 'completed' || job.status === 'failed') return { done: true };
if (!job.access_token) {
await markJobFailed(db, jobId, 'Missing access token for extraction job.');
return { done: true };
}

try {
await db
.prepare('UPDATE extraction_jobs SET status = ?, updated_at = ? WHERE id = ?')
.bind('running', new Date().toISOString(), jobId)
.run();
await db
.prepare('UPDATE extraction_jobs SET status = ?, updated_at = ? WHERE id = ?')
.bind('running', new Date().toISOString(), jobId)
.run();

let nextPageToken = job.next_page_token ?? undefined;
let scannedMessages = job.scanned_messages;
let nextPageToken = job.next_page_token ?? undefined;
let scannedMessages = job.scanned_messages;

for (let i = 0; i < PAGES_PER_STEP; i += 1) {
try {
for (let i = 0; i < PAGES_PER_INVOCATION; i += 1) {
const listUrl = new URL(GMAIL_SCOPE_MESSAGES);
listUrl.searchParams.set('maxResults', String(MESSAGES_PER_PAGE));
listUrl.searchParams.set('includeSpamTrash', 'true');
Expand Down Expand Up @@ -259,20 +253,20 @@ export async function processExtractionJobStep(params: {
await db
.prepare(
`
UPDATE extraction_jobs
SET
status = ?,
scanned_messages = ?,
unique_senders = ?,
next_page_token = ?,
access_token = ?,
last_error = NULL,
updated_at = ?
WHERE id = ?
`
UPDATE extraction_jobs
SET
status = ?,
scanned_messages = ?,
unique_senders = ?,
next_page_token = ?,
access_token = ?,
last_error = NULL,
updated_at = ?
WHERE id = ?
`
)
.bind(
done ? 'completed' : 'running',
done ? 'completed' : 'pending',
scannedMessages,
uniqueSenders,
nextPageToken ?? null,
Expand All @@ -289,27 +283,6 @@ export async function processExtractionJobStep(params: {
}
}

export function scheduleExtractionStep(
event: RequestEvent,
payload: { jobId: string; jobKey: string }
): void {
const schedule = async () => {
const url = new URL('/api/extraction/process', event.url);
await fetch(url.toString(), {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(payload)
});
};

if (event.platform?.ctx) {
event.platform.ctx.waitUntil(schedule());
return;
}

void schedule();
}

async function markJobFailed(db: D1Database, jobId: string, message: string): Promise<void> {
await db
.prepare(
Expand Down
49 changes: 49 additions & 0 deletions src/lib/server/extraction-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { ensureExtractionTables, processExtractionJobChunk } from '$lib/server/extraction-jobs';

export type ExtractionQueueMessage = {
jobId: string;
jobKey: string;
};

type ExtractionQueueEnv = {
loop_extract_emails_prod: D1Database;
EXTRACTION_QUEUE: Queue<ExtractionQueueMessage>;
};

export async function enqueueExtractionJob(
queue: Queue<ExtractionQueueMessage>,
message: ExtractionQueueMessage
): Promise<void> {
await queue.send(message, { contentType: 'json' });
}

export async function handleExtractionQueueBatch(
batch: MessageBatch<unknown>,
env: ExtractionQueueEnv
): Promise<void> {
await ensureExtractionTables(env.loop_extract_emails_prod);

for (const message of batch.messages) {
if (!isExtractionQueueMessage(message.body)) {
console.warn('Ignoring malformed extraction queue message');
continue;
}

const result = await processExtractionJobChunk({
db: env.loop_extract_emails_prod,
jobId: message.body.jobId,
jobKey: message.body.jobKey
});

if (!result.done) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think re-enqueuing a batch when the job isn't quite done yet is clean & elegant!

await enqueueExtractionJob(env.EXTRACTION_QUEUE, message.body);
}
}
}

function isExtractionQueueMessage(value: unknown): value is ExtractionQueueMessage {
if (typeof value !== 'object' || value === null) return false;

const record = value as Record<string, unknown>;
return typeof record.jobId === 'string' && typeof record.jobKey === 'string';
}
2 changes: 1 addition & 1 deletion src/routes/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
if (extractor.hasError) {
consenting = false;
awaitingAuth = false;
consentError = 'Something went wrong. Please try signing in again.';
consentError = extractor.status;
stopConsentMonitor();
}
}, 150);
Expand Down
37 changes: 0 additions & 37 deletions src/routes/api/extraction/process/+server.ts

This file was deleted.

9 changes: 5 additions & 4 deletions src/routes/emails.remote.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
import * as v from 'valibot';
import { command, getRequestEvent } from '$app/server';
import { enqueueExtractionJob } from '$lib/server/extraction-queue';
import {
createExtractionJob,
ensureExtractionTables,
getExtractionStatus,
scheduleExtractionStep
getExtractionStatus
} from '$lib/server/extraction-jobs';

export const startEmailExtraction = command(
v.object({ accessToken: v.string() }),
async ({ accessToken }) => {
const event = getRequestEvent();
const db = event.platform?.env.loop_extract_emails_prod;
const queue = event.platform?.env.EXTRACTION_QUEUE;
const salt = event.platform?.env.USER_HASH_SALT;

if (!db) throw new Error('D1 binding not configured');
if (!queue) throw new Error('Queue binding not configured');
if (!salt) throw new Error('USER_HASH_SALT not configured');

await ensureExtractionTables(db);
const { jobId, jobKey } = await createExtractionJob({ db, salt, accessToken });

scheduleExtractionStep(event, { jobId, jobKey });
await enqueueExtractionJob(queue, { jobId, jobKey });

return { jobId, jobKey };
}
Expand Down
2 changes: 1 addition & 1 deletion svelte.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import adapter from '@sveltejs/adapter-cloudflare';
/** @type {import('@sveltejs/kit').Config} */
const config = {
kit: {
adapter: adapter(),
adapter: adapter({ config: 'wrangler.app.jsonc' }),
experimental: {
remoteFunctions: true
}
Expand Down
13 changes: 13 additions & 0 deletions worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import app from './.svelte-kit/cloudflare/_worker.js';
import { handleExtractionQueueBatch } from './src/lib/server/extraction-queue';

const worker = {
fetch(request, env, ctx) {
return app.fetch(request, env, ctx);
},
queue(batch, env) {
return handleExtractionQueueBatch(batch, env);
}
};

export default worker;
32 changes: 32 additions & 0 deletions wrangler.app.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"$schema": "./node_modules/wrangler/config-schema.json",
"name": "loop-waitlist",
"compatibility_date": "2026-02-20",
"compatibility_flags": ["nodejs_als"],
"account_id": "b287c94b13bd9617c8a7d2498ffcf8c2",
"main": ".svelte-kit/cloudflare/_worker.js",
"assets": {
"binding": "ASSETS",
"directory": ".svelte-kit/cloudflare"
},
"workers_dev": true,
"preview_urls": true,
"queues": {
"producers": [
{
"binding": "EXTRACTION_QUEUE",
"queue": "loop-extraction-jobs",
"remote": true
}
],
"consumers": []
},
"d1_databases": [
{
"binding": "loop_extract_emails_prod",
"database_name": "loop-extract-emails-prod",
"database_id": "6f2ed02a-0a28-493c-bc4b-8fb9f57f7a19",
"remote": true
}
]
}
22 changes: 21 additions & 1 deletion wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,33 @@
"compatibility_date": "2026-02-20",
"compatibility_flags": ["nodejs_als"],
"account_id": "b287c94b13bd9617c8a7d2498ffcf8c2",
"main": ".svelte-kit/cloudflare/_worker.js",
"main": "worker.js",
"build": {
"command": "pnpm build"
},
"assets": {
"binding": "ASSETS",
"directory": ".svelte-kit/cloudflare"
},
"workers_dev": true,
"preview_urls": true,
"queues": {
"producers": [
{
"binding": "EXTRACTION_QUEUE",
"queue": "loop-extraction-jobs",
"remote": true
}
],
"consumers": [
{
"queue": "loop-extraction-jobs",
"max_batch_size": 1,
"max_retries": 10,
"max_concurrency": 1
}
]
},
"d1_databases": [
{
"binding": "loop_extract_emails_prod",
Expand Down