Skip to content

Commit 8b60a6d

Browse files
committed
perf: optimize scraper DB queries
- Transfer md5 hashes instead of full URLs for processedUrls loading (~6s vs ~14s, 70% less data over the wire) - Batch cross-crawl dup inserts (100 per round trip vs 1) - Add covering indexes on (status, source_url) and (crawl_id, source_url) for index-only scans - Add upsertDuplicateBatch to DbClient for bulk dup record creation
1 parent b8b90ed commit 8b60a6d

4 files changed

Lines changed: 79 additions & 20 deletions

File tree

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- Covering indexes for common scraper queries
2+
-- Allows index-only scans (no table lookups) for URL loading
3+
4+
-- Covers: SELECT source_url FROM documents WHERE status IN ('uploaded', 'duplicate')
5+
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_status_url
6+
ON documents(status, source_url);
7+
8+
-- Covers: SELECT source_url FROM documents WHERE crawl_id = $1
9+
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_crawl_url
10+
ON documents(crawl_id, source_url);

db/schema.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ CREATE INDEX IF NOT EXISTS idx_documents_status ON documents(status);
5151
CREATE INDEX IF NOT EXISTS idx_documents_crawl_id ON documents(crawl_id);
5252
CREATE INDEX IF NOT EXISTS idx_documents_source_url ON documents(source_url);
5353

54+
-- Covering indexes for scraper URL loading (index-only scans)
55+
CREATE INDEX IF NOT EXISTS idx_documents_status_url ON documents(status, source_url);
56+
CREATE INDEX IF NOT EXISTS idx_documents_crawl_url ON documents(crawl_id, source_url);
57+
5458
-- Indexes for extraction/embedding/classification queries
5559
CREATE INDEX IF NOT EXISTS idx_documents_extracted ON documents(extracted_at) WHERE extracted_at IS NOT NULL;
5660
CREATE INDEX IF NOT EXISTS idx_documents_embedded ON documents(embedded_at) WHERE embedded_at IS NOT NULL;

packages/scraper/scraper.ts

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,26 @@ import {
2222
} from "@docx-corpus/shared";
2323
import { computeHash, extractFilename, validateDocx } from "./validation";
2424

25+
function md5Url(url: string): string {
26+
const hasher = new Bun.CryptoHasher("md5");
27+
hasher.update(url);
28+
return hasher.digest("hex");
29+
}
30+
2531
interface ProcessContext {
2632
db: Awaited<ReturnType<typeof createDb>>;
2733
storage: Storage;
2834
config: Config;
2935
crawlId: string;
3036
stats: { saved: number; skipped: number; failed: number };
3137
rateLimiter: RateLimiter;
32-
processedUrls: Set<string>;
38+
processedHashes: Set<string>;
3339
force?: boolean;
3440
onError?: (status: number, url: string, message: string) => void;
3541
}
3642

3743
async function processRecord(record: CdxRecord, ctx: ProcessContext) {
38-
const { db, storage, config, crawlId, stats, rateLimiter, processedUrls, onError } = ctx;
44+
const { db, storage, config, crawlId, stats, rateLimiter, processedHashes, onError } = ctx;
3945

4046
// Download from WARC
4147
let result: WarcResult;
@@ -65,7 +71,7 @@ async function processRecord(record: CdxRecord, ctx: ProcessContext) {
6571
await db.deleteDocument(`failed-${urlHash}`, crawlId);
6672

6773
// Mark URL as processed so duplicate CDX entries are skipped
68-
processedUrls.add(record.url);
74+
processedHashes.add(md5Url(record.url));
6975

7076
// Validate
7177
const validation = validateDocx(result.content);
@@ -185,15 +191,12 @@ export async function scrape(options: ScrapeOptions) {
185191
// Initialize database
186192
const db = await createDb(config.database.url);
187193

188-
// Pre-load processed URLs for fast duplicate checking
194+
// Pre-load processed URL hashes for fast duplicate checking
189195
// Failed URLs are excluded so they get retried (different WARC capture might succeed)
196+
// Uses md5 hashes instead of full URLs to reduce network transfer (~6s vs ~14s)
190197
section("Loading");
191-
const uploadedUrls = force ? new Set<string>() : await db.getUploadedUrls();
192-
keyValue("Uploaded", `${uploadedUrls.size} URLs`);
193-
const duplicateUrls = force ? new Set<string>() : await db.getDuplicateUrls();
194-
keyValue("Duplicate", `${duplicateUrls.size} URLs`);
195-
const processedUrls = new Set([...uploadedUrls, ...duplicateUrls]);
196-
keyValue("Total", `${processedUrls.size} known URLs`);
198+
const processedHashes = force ? new Set<string>() : await db.getProcessedUrlHashes();
199+
keyValue("Processed", `${processedHashes.size} hashes (uploaded + duplicate)`);
197200

198201
// Aggregate stats across all crawls
199202
const totalStats = { saved: 0, skipped: 0, failed: 0 };
@@ -272,25 +275,39 @@ export async function scrape(options: ScrapeOptions) {
272275

273276
const tasks: Set<Promise<void>> = new Set();
274277

278+
// Batch cross-crawl dup inserts to avoid per-record DB round trips
279+
type DupRecord = { id: string; sourceUrl: string; crawlId: string; filename: string };
280+
const pendingDups: DupRecord[] = [];
281+
const DUP_BATCH_SIZE = 100;
282+
283+
const flushDups = async () => {
284+
if (pendingDups.length === 0) return;
285+
const batch = pendingDups.splice(0);
286+
await db.upsertDuplicateBatch(batch);
287+
};
288+
275289
try {
276290
for await (const record of streamCdxFromR2(config, crawlId)) {
277291
stats.discovered++;
278292

279293
// Fast skip: URL already processed (outside downloadLimit for instant throughput)
280-
if (!force && processedUrls.has(record.url)) {
294+
if (!force && processedHashes.has(md5Url(record.url))) {
281295
stats.skipped++;
282-
// Create cross-crawl dup record if URL exists in DB but not under this crawl
296+
// Queue cross-crawl dup record if URL exists in DB but not under this crawl
283297
if (!crawlUrls.has(record.url)) {
284-
const crawlScopedHash = await computeHash(new TextEncoder().encode(record.url + crawlId));
285-
await db.upsertDocument({
298+
const hasher = new Bun.CryptoHasher("sha256");
299+
hasher.update(record.url + crawlId);
300+
const crawlScopedHash = hasher.digest("hex");
301+
pendingDups.push({
286302
id: `dup-${crawlScopedHash}`,
287-
source_url: record.url,
288-
crawl_id: crawlId,
289-
original_filename: extractFilename(record.url),
290-
status: "duplicate",
291-
error_message: "cross-crawl duplicate",
303+
sourceUrl: record.url,
304+
crawlId,
305+
filename: extractFilename(record.url),
292306
});
293307
crawlUrls.add(record.url);
308+
if (pendingDups.length >= DUP_BATCH_SIZE) {
309+
await flushDups();
310+
}
294311
}
295312
updateProgress();
296313
continue;
@@ -308,7 +325,7 @@ export async function scrape(options: ScrapeOptions) {
308325
crawlId,
309326
stats,
310327
rateLimiter,
311-
processedUrls,
328+
processedHashes,
312329
force,
313330
onError,
314331
});
@@ -334,6 +351,8 @@ export async function scrape(options: ScrapeOptions) {
334351
prevLineCount = 0;
335352
}
336353

354+
// Flush remaining cross-crawl dup records
355+
await flushDups();
337356
await Promise.all(tasks);
338357
clearLines(prevLineCount);
339358

packages/shared/db.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ export interface DbClient {
8282
deleteDocument(id: string, crawlId?: string): Promise<void>;
8383
getDocument(id: string): Promise<DocumentRecord | null>;
8484
getDocumentByUrl(url: string): Promise<DocumentRecord | null>;
85+
getProcessedUrlHashes(): Promise<Set<string>>;
86+
upsertDuplicateBatch(records: { id: string; sourceUrl: string; crawlId: string; filename: string }[]): Promise<void>;
8587
getUploadedUrls(): Promise<Set<string>>;
8688
getFailedUrls(): Promise<Set<string>>;
8789
getDuplicateUrls(): Promise<Set<string>>;
@@ -185,6 +187,30 @@ export async function createDb(databaseUrl: string): Promise<DbClient> {
185187
return rows[0] || null;
186188
},
187189

190+
async getProcessedUrlHashes() {
191+
const rows = await sql<{ h: string }[]>`
192+
SELECT md5(source_url) as h FROM documents WHERE status IN ('uploaded', 'duplicate')
193+
`;
194+
return new Set(rows.map((r) => r.h));
195+
},
196+
197+
async upsertDuplicateBatch(records) {
198+
if (records.length === 0) return;
199+
const values = records.map((_, i) => {
200+
const b = i * 6;
201+
return `($${b + 1}, $${b + 2}, $${b + 3}, $${b + 4}, $${b + 5}, $${b + 6})`;
202+
}).join(", ");
203+
const params = records.flatMap((r) => [
204+
r.id, r.sourceUrl, r.crawlId, r.filename, "duplicate", "cross-crawl duplicate",
205+
]);
206+
await sql.unsafe(
207+
`INSERT INTO documents (id, source_url, crawl_id, original_filename, status, error_message)
208+
VALUES ${values}
209+
ON CONFLICT (id) DO NOTHING`,
210+
params
211+
);
212+
},
213+
188214
async getUploadedUrls() {
189215
const rows = await sql<{ source_url: string }[]>`
190216
SELECT source_url FROM documents WHERE status = 'uploaded'

0 commit comments

Comments
 (0)