Skip to content

Commit ee3c98e

Browse files
committed
fix: move skip logic out of download limiter and add progress feedback
- URL-dedup skips now happen outside downloadLimit for instant throughput - Cross-crawl dup inserts are batched (100 at a time) instead of one-by-one - Add Loading section showing DB query progress (uploaded, duplicate, total) - Show discovered/scanned count in progress bar during processing
1 parent 70a7f2d commit ee3c98e

2 files changed

Lines changed: 48 additions & 22 deletions

File tree

packages/scraper/scraper.ts

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,13 @@ export async function scrape(options: ScrapeOptions) {
182182

183183
// Pre-load processed URLs for fast duplicate checking
184184
// Failed URLs are excluded so they get retried (different WARC capture might succeed)
185+
section("Loading");
185186
const uploadedUrls = force ? new Set<string>() : await db.getUploadedUrls();
187+
keyValue("Uploaded", `${uploadedUrls.size} URLs`);
186188
const duplicateUrls = force ? new Set<string>() : await db.getDuplicateUrls();
189+
keyValue("Duplicate", `${duplicateUrls.size} URLs`);
187190
const processedUrls = new Set([...uploadedUrls, ...duplicateUrls]);
191+
keyValue("Total", `${processedUrls.size} known URLs`);
188192

189193
// Aggregate stats across all crawls
190194
const totalStats = { saved: 0, skipped: 0, failed: 0 };
@@ -193,6 +197,7 @@ export async function scrape(options: ScrapeOptions) {
193197
for (const crawlId of resolvedCrawlIds) {
194198
// Pre-load URLs already tracked under this crawl for cross-crawl dedup
195199
const crawlUrls = force ? new Set<string>() : await db.getUrlsForCrawl(crawlId);
200+
keyValue("Crawl URLs", `${crawlUrls.size} tracked in ${crawlId}`);
196201

197202
const crawlStartTime = Date.now();
198203

@@ -246,6 +251,7 @@ export async function scrape(options: ScrapeOptions) {
246251
total: batchSize,
247252
docsPerSec: currentDocsPerSec,
248253
currentRps: rateLimiter.getCurrentRps(),
254+
discovered: stats.discovered,
249255
skipped: stats.skipped,
250256
failed: stats.failed,
251257
retried: errorCount,
@@ -261,33 +267,50 @@ export async function scrape(options: ScrapeOptions) {
261267

262268
const tasks: Set<Promise<void>> = new Set();
263269

270+
// Batch cross-crawl dup inserts for performance
271+
const pendingDups: { id: string; url: string; filename: string }[] = [];
272+
const DUP_BATCH_SIZE = 100;
273+
274+
const flushDups = async () => {
275+
if (pendingDups.length === 0) return;
276+
const batch = pendingDups.splice(0);
277+
for (const dup of batch) {
278+
await db.upsertDocument({
279+
id: dup.id,
280+
source_url: dup.url,
281+
crawl_id: crawlId,
282+
original_filename: dup.filename,
283+
status: "duplicate",
284+
error_message: "cross-crawl duplicate",
285+
});
286+
}
287+
};
288+
264289
for await (const record of streamCdxFromR2(config, crawlId)) {
265290
if (stats.saved >= batchSize) break;
266291

267292
stats.discovered++;
268-
updateProgress();
269293

270-
const task = downloadLimit(async () => {
271-
// Check duplicates BEFORE rate limiting (instant skip)
272-
if (!force && processedUrls.has(record.url)) {
273-
stats.skipped++;
274-
// Create cross-crawl dup record if URL exists in DB but not under this crawl
275-
if (!crawlUrls.has(record.url)) {
276-
const crawlScopedHash = await computeHash(new TextEncoder().encode(record.url + crawlId));
277-
await db.upsertDocument({
278-
id: `dup-${crawlScopedHash}`,
279-
source_url: record.url,
280-
crawl_id: crawlId,
281-
original_filename: extractFilename(record.url),
282-
status: "duplicate",
283-
error_message: "cross-crawl duplicate",
284-
});
285-
crawlUrls.add(record.url);
286-
}
287-
updateProgress();
288-
return;
294+
// Fast skip: URL already processed (outside downloadLimit for instant throughput)
295+
if (!force && processedUrls.has(record.url)) {
296+
stats.skipped++;
297+
if (!crawlUrls.has(record.url)) {
298+
const crawlScopedHash = await computeHash(new TextEncoder().encode(record.url + crawlId));
299+
pendingDups.push({
300+
id: `dup-${crawlScopedHash}`,
301+
url: record.url,
302+
filename: extractFilename(record.url),
303+
});
304+
crawlUrls.add(record.url);
305+
if (pendingDups.length >= DUP_BATCH_SIZE) await flushDups();
289306
}
307+
updateProgress();
308+
continue;
309+
}
290310

311+
updateProgress();
312+
313+
const task = downloadLimit(async () => {
291314
await rateLimiter.acquire();
292315
await processRecord(record, {
293316
db,
@@ -312,6 +335,7 @@ export async function scrape(options: ScrapeOptions) {
312335
}
313336

314337
await Promise.all(tasks);
338+
await flushDups();
315339
clearLines(prevLineCount);
316340

317341
// Accumulate totals

packages/shared/ui.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,22 @@ export interface ProgressStats {
5454
total: number;
5555
docsPerSec: number;
5656
currentRps?: number;
57+
discovered?: number;
5758
skipped?: number;
5859
failed?: number;
5960
retried?: number;
6061
elapsedMs: number;
6162
}
6263

6364
export function formatProgress(stats: ProgressStats): string[] {
64-
const { saved, total, docsPerSec, currentRps, skipped, failed, retried, elapsedMs } = stats;
65+
const { saved, total, docsPerSec, currentRps, discovered, skipped, failed, retried, elapsedMs } = stats;
6566

6667
const lines: string[] = [];
6768

6869
// Line 1: Progress bar with count and percentage
70+
const processed = saved + (skipped || 0) + (failed || 0);
6971
if (total === Infinity) {
70-
lines.push(`━━━━━━━━━━━━━━━━━━━━ ${saved} saved`);
72+
lines.push(`━━━━━━━━━━━━━━━━━━━━ ${saved} saved${discovered ? ` (${discovered} scanned)` : ""}`);
7173
} else {
7274
const bar = progressBar(saved, total);
7375
const pct = total > 0 ? ((saved / total) * 100).toFixed(1) : "0.0";

0 commit comments

Comments
 (0)