Commit bcc9ea3

feat: scraper dedup improvements and failed URL retry

- Add cross-crawl duplicate detection (creates dup records when a URL exists under a different crawl)
- Content-dedup now matches any existing hash, not just `uploaded` status
- Clean up stale `failed-*` records when a WARC retry succeeds
- Track processedUrls after a successful download to prevent same-URL CDX duplicates
- Failed URLs are no longer skipped on re-runs (a different WARC capture might succeed)
- Add deleteDocument and getUrlsForCrawl to DbClient
- Document the dedup model in CLAUDE.md
1 parent 2638a65 · commit bcc9ea3

3 files changed

Lines changed: 92 additions & 19 deletions

CLAUDE.md

Lines changed: 41 additions & 0 deletions
```diff
@@ -29,6 +29,47 @@ Each stage writes to the same PostgreSQL database (`documents` table):
 3. **Embed** (TS) — Google API → pgvector (`embedding`, `embedded_at`)
 4. **Classify** (Python) — ModernBERT → labels (`document_type`, `document_topic`)
 
+## Scraper deduplication
+
+The scraper maintains **exact parity** between CDX URLs and database records: every URL in a crawl's CDX files has exactly one record in the `documents` table under that `crawl_id`.
+
+### Document statuses
+
+- `uploaded` — valid .docx saved to R2; ID is `{contentHash}`
+- `failed` — WARC download failed or content is not valid .docx; ID is `failed-{urlHash}` (download error) or `{contentHash}` (validation error)
+- `duplicate` — same content already exists under a different URL; ID is `dup-{urlHash}`
+
+### ID scheme
+
+IDs are content-addressed for storage mapping (`documents/{id}.docx`):
+
+| Scenario | ID | Reason |
+|---|---|---|
+| Uploaded | `{sha256(content)}` | Maps to R2 storage key |
+| Download failed | `failed-{sha256(url)}` | No content available, so use URL hash |
+| Validation failed | `{sha256(content)}` | Content exists but isn't valid .docx |
+| Content duplicate | `dup-{sha256(url)}` | Content hash would collide with the original |
+
+### Dedup paths
+
+The scraper handles three dedup scenarios, in order:
+
+1. **URL-dedup** (instant, no download) — the URL is already in the `processedUrls` Set (loaded from all crawls at startup). If the URL exists under a different `crawl_id`, the scraper creates a cross-crawl `duplicate` record under the current crawl; if it already exists under the current crawl, it silently skips.
+2. **Content-dedup** (requires WARC download) — the URL is new but its content hash matches an existing document. The scraper creates a `duplicate` record pointing to the original.
+3. **Same-URL retry** (within the same crawl) — the same URL appears multiple times in the CDX files (different WARC captures). After a successful WARC download, the URL is added to `processedUrls` so subsequent entries are skipped. Failed downloads do NOT add to `processedUrls`, allowing a retry from a different WARC capture.
+
+### Stale record cleanup
+
+When a WARC download succeeds, the scraper deletes any previous `failed-{urlHash}` record for that URL. This prevents duplicate records when a URL fails on one attempt but succeeds on a later retry (the failed and successful records have different IDs).
+
+### Re-run safety
+
+Running the scraper on the same crawl again is safe:
+
+- With `--force`: everything is re-downloaded, but `source_url` is checked before creating dup records, so existing records aren't duplicated
+- Without `--force`: all existing URLs are in `processedUrls` and skipped instantly
+
 ## Database
 
 Single `documents` table in PostgreSQL (NeonDB) with pgvector. All pipeline stages write to this table.
```
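
For concreteness, a minimal sketch of how the three ID forms could be derived. It assumes `computeHash` is a SHA-256 hex digest over raw bytes — consistent with how it is called in `scraper.ts` — though the helper itself is not part of this commit:

```ts
// Sketch only — treating computeHash as SHA-256 → hex is an assumption;
// the real helper lives elsewhere in the scraper package.
async function computeHash(data: Uint8Array): Promise<string> {
  const digest = await crypto.subtle.digest("SHA-256", data);
  return Array.from(new Uint8Array(digest), (b) =>
    b.toString(16).padStart(2, "0"),
  ).join("");
}

// Uploaded / validation-failed: content-addressed, doubles as the R2 key
// documents/{id}.docx.
const contentId = (content: Uint8Array) => computeHash(content);

// Download failed: no content to hash, so the URL is hashed instead.
const failedId = async (url: string) =>
  `failed-${await computeHash(new TextEncoder().encode(url))}`;

// Content duplicate: the content hash would collide with the original
// record, so the ID is derived from the URL with a dup- prefix.
const dupId = async (url: string) =>
  `dup-${await computeHash(new TextEncoder().encode(url))}`;
```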

packages/scraper/scraper.ts

Lines changed: 38 additions & 19 deletions
```diff
@@ -29,13 +29,13 @@ interface ProcessContext {
   crawlId: string;
   stats: { saved: number; skipped: number; failed: number };
   rateLimiter: RateLimiter;
-  uploadedUrls: Set<string>;
+  processedUrls: Set<string>;
   force?: boolean;
   onError?: (status: number, url: string, message: string) => void;
 }
 
 async function processRecord(record: CdxRecord, ctx: ProcessContext) {
-  const { db, storage, config, crawlId, stats, rateLimiter, onError } = ctx;
+  const { db, storage, config, crawlId, stats, rateLimiter, processedUrls, onError } = ctx;
 
   // Download from WARC
   let result: WarcResult;
@@ -59,6 +59,13 @@ async function processRecord(record: CdxRecord, ctx: ProcessContext) {
     return;
   }
 
+  // Clean up any previous failed-* record for this URL (from a prior failed attempt)
+  const urlHash = await computeHash(new TextEncoder().encode(record.url));
+  await db.deleteDocument(`failed-${urlHash}`);
+
+  // Mark URL as processed so duplicate CDX entries are skipped
+  processedUrls.add(record.url);
+
   // Validate
   const validation = validateDocx(result.content);
   if (!validation.isValid) {
@@ -82,20 +89,16 @@ async function processRecord(record: CdxRecord, ctx: ProcessContext) {
 
   // Check if already exists by hash (different URL, same file content)
   const existingByHash = await db.getDocument(hash);
-  if (existingByHash && existingByHash.status === "uploaded") {
+  if (existingByHash && existingByHash.source_url !== record.url) {
     stats.skipped++;
-    // Only create duplicate record if it's actually a different URL
-    if (existingByHash.source_url !== record.url) {
-      const urlHash = await computeHash(new TextEncoder().encode(record.url));
-      await db.upsertDocument({
-        id: `dup-${urlHash}`,
-        source_url: record.url,
-        crawl_id: crawlId,
-        original_filename: extractFilename(record.url),
-        status: "duplicate",
-        error_message: `duplicate content of ${hash}`,
-      });
-    }
+    await db.upsertDocument({
+      id: `dup-${urlHash}`,
+      source_url: record.url,
+      crawl_id: crawlId,
+      original_filename: extractFilename(record.url),
+      status: "duplicate",
+      error_message: `duplicate content of ${hash}`,
+    });
     return;
   }
 
@@ -176,17 +179,20 @@ export async function scrape(options: ScrapeOptions) {
   // Initialize database
   const db = await createDb(config.database.url);
 
-  // Pre-load processed URLs for fast duplicate checking (includes uploaded, failed, and duplicate)
+  // Pre-load processed URLs for fast duplicate checking
+  // Failed URLs are excluded so they get retried (different WARC capture might succeed)
   const uploadedUrls = force ? new Set<string>() : await db.getUploadedUrls();
-  const failedUrls = force ? new Set<string>() : await db.getFailedUrls();
   const duplicateUrls = force ? new Set<string>() : await db.getDuplicateUrls();
-  const processedUrls = new Set([...uploadedUrls, ...failedUrls, ...duplicateUrls]);
+  const processedUrls = new Set([...uploadedUrls, ...duplicateUrls]);
 
   // Aggregate stats across all crawls
   const totalStats = { saved: 0, skipped: 0, failed: 0 };
 
   // Process each crawl
   for (const crawlId of resolvedCrawlIds) {
+    // Pre-load URLs already tracked under this crawl for cross-crawl dedup
+    const crawlUrls = force ? new Set<string>() : await db.getUrlsForCrawl(crawlId);
+
     const crawlStartTime = Date.now();
 
     // Per-crawl stats
@@ -264,6 +270,19 @@
       // Check duplicates BEFORE rate limiting (instant skip)
       if (!force && processedUrls.has(record.url)) {
         stats.skipped++;
+        // Create cross-crawl dup record if URL exists in DB but not under this crawl
+        if (!crawlUrls.has(record.url)) {
+          const urlHash = await computeHash(new TextEncoder().encode(record.url));
+          await db.upsertDocument({
+            id: `dup-${urlHash}`,
+            source_url: record.url,
+            crawl_id: crawlId,
+            original_filename: extractFilename(record.url),
+            status: "duplicate",
+            error_message: "cross-crawl duplicate",
+          });
+          crawlUrls.add(record.url);
+        }
         updateProgress();
         return;
       }
@@ -276,7 +295,7 @@
         crawlId,
         stats,
         rateLimiter,
-        uploadedUrls,
+        processedUrls,
         force,
         onError,
       });
```
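
The subtle interaction here: a failed download writes a `failed-{urlHash}` record but deliberately does not add the URL to `processedUrls`, so a later CDX entry for the same URL (a different WARC capture) gets another attempt. On success, the stale failure record must be deleted explicitly, because the successful record lives under a different, content-addressed ID. A simplified sketch of that lifecycle, reusing the assumed `computeHash` helper from above (the URL, crawl id, env var, and bytes are placeholders):

```ts
import { createDb } from "../shared/db"; // path assumed from the repo layout

const db = await createDb(process.env.DATABASE_URL!); // placeholder env var
const url = "https://example.org/minutes.docx"; // placeholder URL
const crawlId = "crawl-2024-05"; // placeholder crawl id
const urlHash = await computeHash(new TextEncoder().encode(url));

// Attempt 1: the WARC capture is broken → record the failure, but do NOT
// add the URL to processedUrls, so a later capture can retry it.
await db.upsertDocument({
  id: `failed-${urlHash}`,
  source_url: url,
  crawl_id: crawlId,
  status: "failed",
  error_message: "WARC download failed",
});

// Attempt 2, from a different WARC capture, succeeds: delete the stale
// failure record, then store under the content-addressed ID.
await db.deleteDocument(`failed-${urlHash}`);
const docxBytes = new Uint8Array(); // placeholder for the downloaded content
await db.upsertDocument({
  id: await computeHash(docxBytes),
  source_url: url,
  crawl_id: crawlId,
  status: "uploaded",
});
```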

packages/shared/db.ts

Lines changed: 13 additions & 0 deletions
```diff
@@ -79,11 +79,13 @@ export interface LLMClassificationData {
 export interface DbClient {
   // Scraping methods (existing)
   upsertDocument(doc: Partial<DocumentRecord> & { id: string }): Promise<void>;
+  deleteDocument(id: string): Promise<void>;
   getDocument(id: string): Promise<DocumentRecord | null>;
   getDocumentByUrl(url: string): Promise<DocumentRecord | null>;
   getUploadedUrls(): Promise<Set<string>>;
   getFailedUrls(): Promise<Set<string>>;
   getDuplicateUrls(): Promise<Set<string>>;
+  getUrlsForCrawl(crawlId: string): Promise<Set<string>>;
   getDocumentsByStatus(status: DocumentStatus, limit?: number): Promise<DocumentRecord[]>;
   getStats(): Promise<{ status: string; count: number }[]>;
   getAllDocuments(limit?: number): Promise<DocumentRecord[]>;
@@ -161,6 +163,10 @@ export async function createDb(databaseUrl: string): Promise<DbClient> {
       }
     },
 
+    async deleteDocument(id: string) {
+      await sql`DELETE FROM documents WHERE id = ${id}`;
+    },
+
     async getDocument(id: string) {
       const rows = await sql<DocumentRecord[]>`
         SELECT * FROM documents WHERE id = ${id}
@@ -196,6 +202,13 @@
       return new Set(rows.map((r) => r.source_url));
     },
 
+    async getUrlsForCrawl(crawlId: string) {
+      const rows = await sql<{ source_url: string }[]>`
+        SELECT source_url FROM documents WHERE crawl_id = ${crawlId}
+      `;
+      return new Set(rows.map((r) => r.source_url));
+    },
+
     async getDocumentsByStatus(status: DocumentStatus, limit = 100) {
       return sql<DocumentRecord[]>`
         SELECT * FROM documents WHERE status = ${status} LIMIT ${limit}
```
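
A short usage sketch for the two new `DbClient` methods, assuming a reachable database (the connection string, crawl id, and record id below are placeholders):

```ts
import { createDb } from "./db";

const db = await createDb(process.env.DATABASE_URL!); // placeholder env var

// URLs already recorded under a given crawl — the scraper uses this set to
// decide whether a known URL still needs a cross-crawl dup- record here.
const crawlUrls = await db.getUrlsForCrawl("crawl-2024-05"); // placeholder id
console.log(`crawl already tracks ${crawlUrls.size} URLs`);

// Delete a record by ID, as the scraper does for stale failed-* records
// once a retry succeeds.
await db.deleteDocument("failed-0123abcd"); // placeholder id
```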
