diff --git a/.gitignore b/.gitignore index 77e793d8d509..4400170f398d 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ test/e2e/**/packages # Local vitest config overrides vitest.config.local.mts +.worktrees/ diff --git a/packages/basic-crawler/src/internals/basic-crawler.ts b/packages/basic-crawler/src/internals/basic-crawler.ts index 3ff8e2c63421..f6c37edc1be5 100644 --- a/packages/basic-crawler/src/internals/basic-crawler.ts +++ b/packages/basic-crawler/src/internals/basic-crawler.ts @@ -39,6 +39,8 @@ import { GotScrapingHttpClient, KeyValueStore, mergeCookies, + Monitor, + type MonitorOptions, NonRetryableError, purgeDefaultStorages, RequestListAdapter, @@ -405,6 +407,25 @@ export interface BasicCrawlerOptions(); private experiments: CrawlerExperiments; @@ -612,6 +635,8 @@ export class BasicCrawler { + // When monitor mode is active, it owns the display — skip the periodic log to avoid + // interleaving plain log lines with ANSI cursor-movement sequences. + if (this.monitorEnabled) return; + const { mode: operationMode, failedDelta } = getOperationMode(); let message: string; @@ -1038,9 +1071,16 @@ export class BasicCrawler this.requestManager?.getTotalCount()) + : null; + + monitor?.start(); + try { await this.autoscaledPool!.run(); } finally { + monitor?.stop(); await this.teardown(); await this.stats.stopCapturing(); diff --git a/packages/core/src/crawlers/index.ts b/packages/core/src/crawlers/index.ts index 77a83511e413..8a3a3c0d673d 100644 --- a/packages/core/src/crawlers/index.ts +++ b/packages/core/src/crawlers/index.ts @@ -4,3 +4,4 @@ export * from './crawler_utils'; export * from './statistics'; export * from './error_tracker'; export * from './error_snapshotter'; +export * from './monitor'; diff --git a/packages/core/src/crawlers/monitor.ts b/packages/core/src/crawlers/monitor.ts new file mode 100644 index 000000000000..423c67f72e59 --- /dev/null +++ b/packages/core/src/crawlers/monitor.ts @@ -0,0 +1,160 @@ +import os from 'node:os'; + +import type { AutoscaledPool } from '../autoscaling/autoscaled_pool'; +import type { Statistics } from './statistics'; + +export interface MonitorOptions { + /** + * How often to refresh the monitor display, in seconds. + * @default 5 + */ + intervalSecs?: number; +} + +const MONITOR_LINE_COUNT = 5; + +function padStart(n: number, width = 2): string { + return String(n).padStart(width, '0'); +} + +function formatDuration(ms: number): string { + const totalSecs = Math.floor(ms / 1000); + const h = Math.floor(totalSecs / 3600); + const m = Math.floor((totalSecs % 3600) / 60); + const s = totalSecs % 60; + return `${padStart(h)}:${padStart(m)}:${padStart(s)}`; +} + +function formatBytes(bytes: number): string { + if (bytes >= 1024 ** 3) return `${(bytes / 1024 ** 3).toFixed(1)} GB`; + if (bytes >= 1024 ** 2) return `${(bytes / 1024 ** 2).toFixed(0)} MB`; + return `${(bytes / 1024).toFixed(0)} KB`; +} + +/** + * Renders a compact real-time status block to `process.stderr` during a crawl. + * + * Enable via the `monitor` option on `BasicCrawler`: + * ```ts + * const crawler = new BasicCrawler({ monitor: true, ... }); + * ``` + * + * In TTY mode the block overwrites itself in-place. In non-TTY mode (CI, pipes) + * it prints plain lines so the output remains readable in logs. + */ +export class Monitor { + private intervalId?: ReturnType; + private readonly intervalMs: number; + private rendered = false; + + constructor( + private readonly stats: Statistics, + private readonly autoscaledPool?: AutoscaledPool, + options: MonitorOptions = {}, + private readonly totalRequests?: () => number | undefined, + ) { + this.intervalMs = (options.intervalSecs ?? 5) * 1000; + } + + /** Starts the periodic display. Renders an initial frame immediately, then repeats on each interval. */ + start(): void { + this.render(); // render immediately so short crawls always show output + this.intervalId = setInterval(() => this.render(), this.intervalMs); + this.intervalId.unref(); // don't prevent process exit if the event loop would otherwise be empty + } + + /** Stops the periodic display and clears the last rendered block from the terminal. */ + stop(): void { + if (this.intervalId !== undefined) { + clearInterval(this.intervalId); + this.intervalId = undefined; + } + if (this.rendered && process.stderr.isTTY) { + // Move up MONITOR_LINE_COUNT lines and clear each one + for (let i = 0; i < MONITOR_LINE_COUNT; i++) { + process.stderr.write('\x1b[1A\x1b[2K'); + } + this.rendered = false; + } + } + + /** Builds and returns the status block as an array of lines. Exposed for testing. */ + buildLines(): string[] { + const { state } = this.stats; + const calculated = this.stats.calculate(); + + const startedAt = state.crawlerStartedAt ? new Date(state.crawlerStartedAt) : new Date(); + const now = new Date(); + const elapsed = now.getTime() - startedAt.getTime(); + + const finished = state.requestsFinished; + const failed = state.requestsFailed; + const total = this.totalRequests?.(); + // getTotalCount() on RequestManagerTandem may be an approximate sum + // of the underlying RequestList + RequestQueue. The plan treats this as a best-effort + // estimate: progress % and ETA are shown when total > 0, hidden when total === 0. + // This matches the existing behaviour in PR #2692 and is acceptable for a "monitor mode" + // display (non-authoritative progress indicator). No special-casing per request-source mode. + const speed = calculated.requestsFinishedPerMinute; + + const progressStr = total != null && total > 0 + ? `${finished}/${total} (${((finished / total) * 100).toFixed(1)}%)` + : total === 0 + ? `${finished}/0 (N/A%)` + : `${finished}/? (?%)`; + + const failedPct = finished + failed > 0 + ? ` | Failed: ${failed} (${((failed / (finished + failed)) * 100).toFixed(1)}%)` + : ''; + + let etaStr = 'N/A'; + if (total != null && total > 0 && speed > 0) { + // Use Math.max to guard against negative remaining (e.g. when total is an approximate count) + const remaining = Math.max(0, total - finished); + const etaMs = (remaining / speed) * 60 * 1000; + etaStr = `~${formatDuration(etaMs)}`; + } + + const memInfo = process.memoryUsage(); + const totalMem = os.totalmem(); + const usedMem = totalMem - os.freemem(); + const cpus = os.cpus(); + const cpuLoad = os.loadavg()[0]; + // os.loadavg() always returns [0,0,0] on Windows — show N/A to avoid misleading output. + const cpuPct = process.platform === 'win32' + ? 'N/A' + : cpus.length > 0 ? Math.min(100, (cpuLoad / cpus.length) * 100).toFixed(0) : '?'; + + const concurrency = this.autoscaledPool + ? `${this.autoscaledPool.currentConcurrency}/${this.autoscaledPool.maxConcurrency} (desired: ${this.autoscaledPool.desiredConcurrency})` + : 'N/A'; + + return [ + `\u23F1 Start: ${startedAt.toLocaleTimeString()} | Running for ${formatDuration(elapsed)}`, + `\uD83D\uDCCA Progress: ${progressStr}${failedPct} | Speed: ${speed} req/min`, + `\u23F3 ETA: ${etaStr}`, + `\uD83D\uDCBB CPU: ${cpuPct}% | Mem: ${formatBytes(memInfo.rss)} process / ${formatBytes(usedMem)} / ${formatBytes(totalMem)} total`, + `\uD83D\uDD00 Concurrency: ${concurrency}`, + ]; + } + + private render(): void { + const lines = this.buildLines(); + + if (process.stderr.isTTY && this.rendered) { + // Move cursor up to overwrite previous block + process.stderr.write(`\x1b[${MONITOR_LINE_COUNT}A`); + } + + for (const line of lines) { + if (process.stderr.isTTY) { + // Clear line then write + process.stderr.write(`\x1b[2K${line}\n`); + } else { + process.stderr.write(`${line}\n`); + } + } + + this.rendered = true; + } +} diff --git a/test/core/crawlers/basic_crawler.test.ts b/test/core/crawlers/basic_crawler.test.ts index c455fbf4f402..0956aeb803ea 100644 --- a/test/core/crawlers/basic_crawler.test.ts +++ b/test/core/crawlers/basic_crawler.test.ts @@ -2039,4 +2039,51 @@ describe('BasicCrawler', () => { expect(crawlerB.requestQueue?.config).toBe(configB); }); }); + + describe('monitor option', () => { + test('crawler runs successfully with monitor: true', async () => { + const handledUrls: string[] = []; + + const crawler = new BasicCrawler({ + monitor: true, + requestHandler: ({ request }) => { + handledUrls.push(request.url); + }, + }); + + await crawler.run([{ url: `http://${HOSTNAME}:${port}` }]); + expect(handledUrls).toHaveLength(1); + }); + + test('crawler runs successfully with monitor: false (default)', async () => { + const handledUrls: string[] = []; + + const crawler = new BasicCrawler({ + requestHandler: ({ request }) => { + handledUrls.push(request.url); + }, + }); + + await crawler.run([{ url: `http://${HOSTNAME}:${port}` }]); + expect(handledUrls).toHaveLength(1); + }); + + test('monitor: true does not suppress request errors — failedRequestHandler still fires', async () => { + let failed = 0; + + const crawler = new BasicCrawler({ + monitor: true, + maxRequestRetries: 0, + requestHandler: () => { + throw new Error('forced failure'); + }, + failedRequestHandler: () => { + failed++; + }, + }); + + await crawler.run([{ url: `http://${HOSTNAME}:${port}` }]); + expect(failed).toBe(1); + }); + }); }); diff --git a/test/core/crawlers/monitor.test.ts b/test/core/crawlers/monitor.test.ts new file mode 100644 index 000000000000..324388360c09 --- /dev/null +++ b/test/core/crawlers/monitor.test.ts @@ -0,0 +1,209 @@ +import os from 'node:os'; + +import { Statistics } from '@crawlee/core'; +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; + +import { Monitor } from '../../../packages/core/src/crawlers/monitor'; +import { MemoryStorageEmulator } from '../../shared/MemoryStorageEmulator'; + +describe('Monitor', () => { + const localStorageEmulator = new MemoryStorageEmulator(); + let originalIsTTY: boolean | undefined; + + beforeEach(async () => { + await localStorageEmulator.init(); + vi.useFakeTimers(); + originalIsTTY = process.stderr.isTTY; + }); + + afterEach(async () => { + await localStorageEmulator.destroy(); + vi.useRealTimers(); + vi.restoreAllMocks(); + // Restore isTTY — Object.defineProperty mutations are not undone by vi.restoreAllMocks + Object.defineProperty(process.stderr, 'isTTY', { value: originalIsTTY, configurable: true }); + }); + + test('constructs without throwing', () => { + const stats = new Statistics(); + expect(() => new Monitor(stats)).not.toThrow(); + }); + + test('start() and stop() do not throw', () => { + const stats = new Statistics(); + const monitor = new Monitor(stats); + expect(() => monitor.start()).not.toThrow(); + expect(() => monitor.stop()).not.toThrow(); + }); + + test('stop() before start() does not throw', () => { + const stats = new Statistics(); + const monitor = new Monitor(stats); + expect(() => monitor.stop()).not.toThrow(); + }); + + test('buildLines() returns 5 lines', () => { + const stats = new Statistics(); + const monitor = new Monitor(stats); + const lines = monitor.buildLines(); + expect(lines).toHaveLength(5); + }); + + test('buildLines() shows finished/total and percentage when total is known', () => { + const stats = new Statistics(); + stats.startJob('r1'); + stats.finishJob('r1', 0); + + const monitor = new Monitor(stats, undefined, {}, () => 10); + const lines = monitor.buildLines(); + + expect(lines[1]).toContain('1/10'); + expect(lines[1]).toContain('10.0%'); + }); + + test('buildLines() shows ? when total is unknown', () => { + const stats = new Statistics(); + const monitor = new Monitor(stats); + const lines = monitor.buildLines(); + + expect(lines[1]).toContain('/?'); + }); + + test('buildLines() shows ETA as N/A when total is unknown', () => { + const stats = new Statistics(); + const monitor = new Monitor(stats); + const lines = monitor.buildLines(); + + expect(lines[2]).toContain('N/A'); + }); + + test('buildLines() shows concurrency info when autoscaledPool is provided', () => { + const stats = new Statistics(); + const fakePool = { + currentConcurrency: 3, + desiredConcurrency: 5, + maxConcurrency: 10, + } as any; + + const monitor = new Monitor(stats, fakePool); + const lines = monitor.buildLines(); + + expect(lines[4]).toContain('3/10'); + expect(lines[4]).toContain('desired: 5'); + }); + + test('buildLines() shows N/A for concurrency when autoscaledPool is not provided', () => { + const stats = new Statistics(); + const monitor = new Monitor(stats); + const lines = monitor.buildLines(); + + expect(lines[4]).toContain('N/A'); + }); + + test('renders to stderr when interval fires', () => { + const writeStub = vi.spyOn(process.stderr, 'write').mockImplementation(() => true); + const stats = new Statistics(); + const monitor = new Monitor(stats, undefined, { intervalSecs: 1 }); + + monitor.start(); + vi.advanceTimersByTime(1000); + monitor.stop(); + + expect(writeStub).toHaveBeenCalled(); + }); + + test('in non-TTY mode, does not write ANSI overwrite codes', () => { + const writes: string[] = []; + vi.spyOn(process.stderr, 'write').mockImplementation((chunk: any) => { + writes.push(String(chunk)); + return true; + }); + Object.defineProperty(process.stderr, 'isTTY', { value: false, configurable: true }); + + const stats = new Statistics(); + const monitor = new Monitor(stats, undefined, { intervalSecs: 1 }); + + monitor.start(); + vi.advanceTimersByTime(1000); + monitor.stop(); + + const combined = writes.join(''); + // Should not contain ANSI cursor-up code + expect(combined).not.toContain('\x1b[5A'); + expect(combined).not.toContain('\x1b[2K'); + }); + + test('in TTY mode, second render writes ANSI cursor-up to overwrite', () => { + const writes: string[] = []; + vi.spyOn(process.stderr, 'write').mockImplementation((chunk: any) => { + writes.push(String(chunk)); + return true; + }); + Object.defineProperty(process.stderr, 'isTTY', { value: true, configurable: true }); + + const stats = new Statistics(); + const monitor = new Monitor(stats, undefined, { intervalSecs: 1 }); + + monitor.start(); + vi.advanceTimersByTime(1000); // first render (from start()) + vi.advanceTimersByTime(1000); // second render via interval — should have cursor-up + monitor.stop(); + + const combined = writes.join(''); + expect(combined).toContain('\x1b[5A'); + }); + + test('ETA is never negative when finished > total (approximate count)', () => { + const stats = new Statistics(); + // Simulate 11 finished but total = 10 (approximate) + for (let i = 0; i < 11; i++) { + stats.startJob(`r${i}`); + stats.finishJob(`r${i}`, 0); + } + + const monitor = new Monitor(stats, undefined, {}, () => 10); + const lines = monitor.buildLines(); + + // ETA should be ~00:00:00 (zero remaining), never a negative duration + expect(lines[2]).not.toMatch(/~-/); + expect(lines[2]).toContain('~00:00:00'); + }); + + test('monitorOptions.intervalSecs controls the refresh interval', () => { + const writes: string[] = []; + vi.spyOn(process.stderr, 'write').mockImplementation((chunk: any) => { + writes.push(String(chunk)); + return true; + }); + Object.defineProperty(process.stderr, 'isTTY', { value: false, configurable: true }); + + const stats = new Statistics(); + const monitor = new Monitor(stats, undefined, { intervalSecs: 3 }); + + monitor.start(); + // At 2 s: only the immediate render from start() — interval has not fired yet + vi.advanceTimersByTime(2000); + const writeCountBefore = writes.length; + + // At 3 s: interval fires once + vi.advanceTimersByTime(1000); + monitor.stop(); + + // After 3 s total, the interval should have fired exactly once, adding 5 more lines + expect(writes.length).toBe(writeCountBefore + 5); + }); + + test('CPU line shows N/A on Windows', () => { + const originalPlatform = process.platform; + Object.defineProperty(process, 'platform', { value: 'win32', configurable: true }); + vi.spyOn(os, 'loadavg').mockReturnValue([0, 0, 0]); + + const stats = new Statistics(); + const monitor = new Monitor(stats); + const lines = monitor.buildLines(); + + Object.defineProperty(process, 'platform', { value: originalPlatform, configurable: true }); + + expect(lines[3]).toContain('CPU: N/A'); + }); +});