Skip to content

Commit 503982a

Browse files
committed
feat: progress bar and batch flag
1 parent c499845 commit 503982a

3 files changed

Lines changed: 57 additions & 15 deletions

File tree

apps/cli/commands/extract.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ function parseFlags(args: string[]): ParsedFlags {
2222
const next = args[i + 1];
2323

2424
switch (arg) {
25-
case "--batch-size":
25+
case "--batch":
2626
case "-b":
2727
flags.batchSize = parseInt(next || "", 10);
2828
i++;
@@ -55,7 +55,7 @@ Storage is auto-selected based on environment:
5555
Already-extracted files are automatically skipped (tracked in index.jsonl).
5656
5757
Options
58-
--batch-size, -b <n> Number of files per batch (default: from EXTRACT_BATCH_SIZE or 100)
58+
--batch, -b <n> Limit to n documents (default: all)
5959
--workers, -w <n> Number of parallel workers (default: from EXTRACT_WORKERS or 4)
6060
--verbose, -v Show detailed progress
6161
--help, -h Show this help
@@ -68,11 +68,11 @@ Environment Variables
6868
R2_BUCKET_NAME R2 bucket (default: docx-corpus)
6969
EXTRACT_INPUT_PREFIX Input prefix (default: documents)
7070
EXTRACT_OUTPUT_PREFIX Output prefix (default: extracted)
71-
EXTRACT_BATCH_SIZE Batch size (default: 100)
7271
EXTRACT_WORKERS Worker count (default: 4)
7372
7473
Examples
75-
corpus extract # Extract up to batch size
74+
corpus extract # Extract all documents
75+
corpus extract -b 100 # Limit to 100 documents
7676
corpus extract -v # With verbose output
7777
corpus extract -b 50 -w 8 # Custom batch/workers
7878
`;
@@ -101,7 +101,7 @@ export async function runExtract(args: string[]) {
101101
storage,
102102
inputPrefix: envConfig.extract.inputPrefix,
103103
outputPrefix: envConfig.extract.outputPrefix,
104-
batchSize: flags.batchSize ?? envConfig.extract.batchSize,
104+
batchSize: flags.batchSize ?? Infinity,
105105
workers: flags.workers ?? envConfig.extract.workers,
106106
};
107107

@@ -113,7 +113,7 @@ export async function runExtract(args: string[]) {
113113
console.log(`Input: ${config.inputPrefix}/`);
114114
console.log(`Output: ${config.outputPrefix}/`);
115115
console.log(`Workers: ${config.workers}`);
116-
console.log(`Batch: ${config.batchSize}`);
116+
console.log(`Batch: ${config.batchSize === Infinity ? "all" : config.batchSize}`);
117117
console.log("");
118118

119119
try {

packages/extractor/processor.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { mkdir, rm } from "node:fs/promises";
22
import { join, basename, dirname } from "node:path";
33
import { tmpdir } from "node:os";
44
import type { ExtractConfig, ExtractedDocument } from "./types";
5+
import { formatProgress, writeMultiLineProgress } from "@docx-corpus/shared";
56

67
const PYTHON_DIR = join(dirname(import.meta.path), "python");
78
const PYTHON_PATH = join(PYTHON_DIR, ".venv", "bin", "python");
@@ -34,7 +35,7 @@ export async function processDirectory(
3435
const id = extractIdFromKey(key);
3536
if (!indexState.successIds.has(id)) {
3637
files.push(key);
37-
if (files.length >= batchSize) {
38+
if (batchSize !== Infinity && files.length >= batchSize) {
3839
console.log(`Listed ${files.length} files to process (batch limit)`);
3940
break;
4041
}
@@ -122,6 +123,35 @@ async function processBatch(
122123
let errorCount = 0;
123124
const queue = [...keys];
124125

126+
// Progress tracking (only used when not verbose)
127+
const startTime = Date.now();
128+
let lastThroughputUpdate = startTime;
129+
let docsAtLastUpdate = 0;
130+
let currentDocsPerSec = 0;
131+
let prevLineCount = 0;
132+
133+
const updateProgress = () => {
134+
const now = Date.now();
135+
const elapsed = (now - lastThroughputUpdate) / 1000;
136+
if (elapsed >= 1) {
137+
currentDocsPerSec = (successCount + errorCount - docsAtLastUpdate) / elapsed;
138+
lastThroughputUpdate = now;
139+
docsAtLastUpdate = successCount + errorCount;
140+
}
141+
142+
const lines = formatProgress({
143+
saved: successCount + errorCount,
144+
total: keys.length,
145+
docsPerSec: currentDocsPerSec,
146+
failed: errorCount > 0 ? errorCount : undefined,
147+
elapsedMs: now - startTime,
148+
});
149+
150+
prevLineCount = writeMultiLineProgress(lines, prevLineCount);
151+
};
152+
153+
const progressInterval = !verbose ? setInterval(updateProgress, 100) : null;
154+
125155
const processFile = async (): Promise<void> => {
126156
while (queue.length > 0) {
127157
const sourceKey = queue.shift();
@@ -175,6 +205,14 @@ async function processBatch(
175205
.map(() => processFile());
176206

177207
await Promise.all(workerPromises);
208+
209+
// Clean up progress display
210+
if (progressInterval) {
211+
clearInterval(progressInterval);
212+
updateProgress(); // Final update
213+
console.log(); // Move to next line
214+
}
215+
178216
return { successCount, errorCount };
179217
}
180218

packages/shared/ui.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ export interface ProgressStats {
5353
saved: number;
5454
total: number;
5555
docsPerSec: number;
56-
currentRps: number;
57-
skipped: number;
58-
failed: number;
59-
retried: number;
56+
currentRps?: number;
57+
skipped?: number;
58+
failed?: number;
59+
retried?: number;
6060
elapsedMs: number;
6161
}
6262

@@ -76,10 +76,14 @@ export function formatProgress(stats: ProgressStats): string[] {
7676

7777
// Line 2: Metrics
7878
const metrics: string[] = [];
79-
metrics.push(`${docsPerSec.toFixed(1)}/s @ ${currentRps} RPS`);
80-
if (skipped > 0) metrics.push(`${skipped} dup`);
81-
if (failed > 0) metrics.push(`${failed} fail`);
82-
if (retried > 0) metrics.push(`${retried} retried`);
79+
if (currentRps !== undefined && currentRps > 0) {
80+
metrics.push(`${docsPerSec.toFixed(1)}/s @ ${currentRps} RPS`);
81+
} else {
82+
metrics.push(`${docsPerSec.toFixed(1)}/s`);
83+
}
84+
if (skipped !== undefined && skipped > 0) metrics.push(`${skipped} dup`);
85+
if (failed !== undefined && failed > 0) metrics.push(`${failed} fail`);
86+
if (retried !== undefined && retried > 0) metrics.push(`${retried} retried`);
8387
metrics.push(formatDuration(elapsedMs));
8488

8589
lines.push(metrics.join(" · "));

0 commit comments

Comments
 (0)