Commit c2cfd8a

fix: handle errors in download tasks and CDX stream gracefully
- Wrap the downloadLimit callback in try/catch to prevent unhandled rejections from silently crashing the process (a sketch of the pattern follows below)
- Wrap the CDX stream loop to catch and log stream errors
- Errors are printed above the progress bar so they remain visible (see the sketch after the diff)
1 parent: 7b89770
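
Note on the first change: with a p-limit-style limiter, a task whose callback throws settles as a rejected promise, and if nothing that awaits it attaches a rejection handler, Node.js (v15+) treats it as an unhandled rejection and kills the process. Catching inside the callback means every task promise resolves, so Promise.race / Promise.all over the rolling task set can never reject unexpectedly. A minimal sketch of the pattern, not the scraper's actual code; download here is a hypothetical stand-in for processRecord:

import pLimit from "p-limit";

const downloadLimit = pLimit(4); // assumed concurrency limit
const tasks = new Set<Promise<void>>();

// Hypothetical worker standing in for processRecord; it can throw.
async function download(url: string): Promise<void> {
  const res = await fetch(url);
  if (!res.ok) throw new Error(`HTTP ${res.status} for ${url}`);
}

function enqueue(url: string): void {
  const task = downloadLimit(async () => {
    try {
      await download(url);
    } catch (err) {
      // Absorb the failure: the task promise still resolves, so awaiting
      // Promise.race(tasks) elsewhere never sees a surprise rejection.
      console.error(`Error processing ${url}:`, err instanceof Error ? err.message : String(err));
    }
  }).finally(() => tasks.delete(task));
  tasks.add(task);
}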

1 file changed: packages/scraper/scraper.ts (54 additions, 41 deletions)
packages/scraper/scraper.ts

@@ -286,52 +286,65 @@ export async function scrape(options: ScrapeOptions) {
     }
   };
 
-  for await (const record of streamCdxFromR2(config, crawlId)) {
-    if (stats.saved >= batchSize) break;
-
-    stats.discovered++;
-
-    // Fast skip: URL already processed (outside downloadLimit for instant throughput)
-    if (!force && processedUrls.has(record.url)) {
-      stats.skipped++;
-      if (!crawlUrls.has(record.url)) {
-        const crawlScopedHash = await computeHash(new TextEncoder().encode(record.url + crawlId));
-        pendingDups.push({
-          id: `dup-${crawlScopedHash}`,
-          url: record.url,
-          filename: extractFilename(record.url),
-        });
-        crawlUrls.add(record.url);
-        if (pendingDups.length >= DUP_BATCH_SIZE) await flushDups();
+  try {
+    for await (const record of streamCdxFromR2(config, crawlId)) {
+      if (stats.saved >= batchSize) break;
+
+      stats.discovered++;
+
+      // Fast skip: URL already processed (outside downloadLimit for instant throughput)
+      if (!force && processedUrls.has(record.url)) {
+        stats.skipped++;
+        if (!crawlUrls.has(record.url)) {
+          const crawlScopedHash = await computeHash(new TextEncoder().encode(record.url + crawlId));
+          pendingDups.push({
+            id: `dup-${crawlScopedHash}`,
+            url: record.url,
+            filename: extractFilename(record.url),
+          });
+          crawlUrls.add(record.url);
+          if (pendingDups.length >= DUP_BATCH_SIZE) await flushDups();
+        }
+        updateProgress();
+        continue;
       }
-      updateProgress();
-      continue;
-    }
 
-    updateProgress();
-
-    const task = downloadLimit(async () => {
-      await rateLimiter.acquire();
-      await processRecord(record, {
-        db,
-        storage,
-        config,
-        crawlId,
-        stats,
-        rateLimiter,
-        processedUrls,
-        force,
-        onError,
-      });
-      crawlUrls.add(record.url);
       updateProgress();
-    }).finally(() => tasks.delete(task));
 
-    tasks.add(task);
-
-    if (tasks.size >= config.crawl.concurrency * 2) {
-      await Promise.race(tasks);
+      const task = downloadLimit(async () => {
+        try {
+          await rateLimiter.acquire();
+          await processRecord(record, {
+            db,
+            storage,
+            config,
+            crawlId,
+            stats,
+            rateLimiter,
+            processedUrls,
+            force,
+            onError,
+          });
+          crawlUrls.add(record.url);
+          updateProgress();
+        } catch (err) {
+          stats.failed++;
+          clearLines(prevLineCount);
+          logError(`Error processing ${record.url}: ${err instanceof Error ? err.message : String(err)}`);
+          prevLineCount = 0;
+        }
+      }).finally(() => tasks.delete(task));
+
+      tasks.add(task);
+
+      if (tasks.size >= config.crawl.concurrency * 2) {
+        await Promise.race(tasks);
+      }
     }
+  } catch (err) {
+    clearLines(prevLineCount);
+    logError(`CDX stream error: ${err instanceof Error ? err.message : String(err)}`);
+    prevLineCount = 0;
   }
 
   await Promise.all(tasks);
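
Note on the progress-bar behavior: the diff relies on clearLines, logError, and prevLineCount, which are defined elsewhere in scraper.ts. A sketch of one plausible implementation, assumed rather than taken from the repo: track how many lines the last progress frame occupied, erase them with ANSI escapes before logging, and reset the counter so the next frame redraws below the error, leaving the message visible in scrollback.

let prevLineCount = 0; // lines occupied by the last progress frame

// Erase `count` lines above the cursor: ESC[1A moves the cursor up one
// line, ESC[2K erases that entire line.
function clearLines(count: number): void {
  for (let i = 0; i < count; i++) {
    process.stdout.write("\x1b[1A\x1b[2K");
  }
}

function logError(message: string): void {
  process.stderr.write(`${message}\n`);
}

// Redraw the progress frame and remember its height. Callers that log an
// error first call clearLines(prevLineCount) and reset prevLineCount = 0,
// as the diff does, so the error scrolls up as normal terminal output.
function updateProgress(saved: number, failed: number, skipped: number): void {
  clearLines(prevLineCount);
  process.stdout.write(`saved ${saved} | failed ${failed} | skipped ${skipped}\n`);
  prevLineCount = 1;
}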
