Commit a8db73d

Improve batches (#54)
* Include exception when batch entry fails
* Clarify normalization when multiple drivers included in single BatchTask
* Circuit breaker for authentication and quota exceptions
* Parse command use batch mode, remove sync execution
* Always prefix file outputs when using parse command

---------

Co-authored-by: avvertix <5672748+avvertix@users.noreply.github.com>
1 parent 87f0293 commit a8db73d

9 files changed

Lines changed: 542 additions & 233 deletions

docs/howto/batch_processing.md

Lines changed: 34 additions & 0 deletions
@@ -101,6 +101,40 @@ When `stop_on_error=True`:
 - Only completed results (including the failed one) are returned
 
 
+## Circuit Breaker
+
+Batch processing includes a built-in circuit breaker that detects systemic driver failures and short-circuits remaining tasks for the affected driver. This prevents wasting API calls and time when a driver is guaranteed to fail (e.g., invalid API key, exhausted quota).
+
+The circuit breaker trips immediately (after a single failure) for these exception types:
+
+| Exception | Meaning |
+|---|---|
+| `AuthenticationException` | API key or token is invalid |
+| `QuotaExceededException` | Account balance or credits exhausted |
+| `RateLimitException` | Rate limit hit |
+
+Per-file errors like `FileNotFoundException` or `ParsingException` do **not** trip the circuit, since they are specific to individual files and don't indicate a driver-wide problem.
+
+The circuit breaker is **per-driver**: if LlamaParse fails with an authentication error, PyMuPDF tasks continue unaffected. Short-circuited results carry the original tripping exception in `BatchResult.exception` and `BatchResult.error`.
+
+A new circuit breaker is created for each `batch()` / `batch_iter()` call, so previous failures do not carry over between calls.
+
+```python
+results = Parxy.batch(
+    tasks=['doc1.pdf', 'doc2.pdf', 'doc3.pdf'],
+    drivers=['llamaparse', 'pymupdf'],
+)
+
+for result in results:
+    if result.failed:
+        # If llamaparse auth fails on doc1, doc2 and doc3 are
+        # short-circuited immediately, i.e. no additional API calls.
+        print(f'{result.file} ({result.driver}): {result.error}')
+    else:
+        print(f'{result.file} ({result.driver}): OK')
+```
+
+
 ## Advanced: Per-File Configuration with BatchTask
 
 For more control, use `BatchTask` objects to specify per-file configuration:
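
The per-driver short-circuit behavior documented in the Circuit Breaker section can be illustrated with a small standalone sketch. This is not Parxy's implementation: the exception classes below are local stand-ins for the names in the table, and `CircuitBreaker`, `run_batch`, and the `parse` callback are hypothetical names introduced only for illustration. The sketch also runs sequentially, whereas the real batch API uses workers.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple


# Local stand-ins for the exception names in the docs; not imported from Parxy.
class AuthenticationException(Exception):
    pass


class QuotaExceededException(Exception):
    pass


class RateLimitException(Exception):
    pass


class ParsingException(Exception):
    pass


# Systemic failures trip the breaker; per-file errors such as ParsingException do not.
TRIPPING = (AuthenticationException, QuotaExceededException, RateLimitException)


@dataclass
class CircuitBreaker:
    """Remembers the first systemic exception seen for each driver."""

    tripped: Dict[str, Exception] = field(default_factory=dict)

    def record(self, driver: str, exc: Exception) -> None:
        if isinstance(exc, TRIPPING):
            self.tripped.setdefault(driver, exc)

    def tripping_exception(self, driver: str) -> Optional[Exception]:
        return self.tripped.get(driver)


def run_batch(
    files: List[str],
    drivers: List[str],
    parse: Callable[[str, str], None],
) -> List[Tuple[str, str, Optional[Exception]]]:
    """Run every (file, driver) pair, skipping drivers whose circuit is open."""
    breaker = CircuitBreaker()  # new breaker per call, so state never carries over
    results = []
    for file in files:
        for driver in drivers:
            exc = breaker.tripping_exception(driver)
            if exc is None:
                try:
                    parse(file, driver)
                except Exception as e:
                    breaker.record(driver, e)
                    exc = e
            # Short-circuited entries carry the original tripping exception.
            results.append((file, driver, exc))
    return results
```

Keeping the breaker state keyed by driver name is what lets one driver fail fast while the others keep running, and creating it inside `run_batch` mirrors the "new circuit breaker per call" rule.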

docs/tutorials/using_cli.md

Lines changed: 5 additions & 15 deletions
@@ -26,13 +26,13 @@ The `parse` command is a powerful tool for extracting text from documents with e
 
 ### Basic Usage
 
-Parse a single document using the default settings (PyMuPDF driver, markdown output):
+Parse a single document using the default settings (PyMuPDF driver, JSON output):
 
 ```bash
 parxy parse document.pdf
 ```
 
-This creates a `document.md` file in the same directory as the source file.
+This creates a `pymupdf-document.json` file in the same directory as the source file. Parxy always prefixes the output file name with the driver name.
 
 ### Processing Multiple Files and Folders
 
@@ -103,29 +103,19 @@ Specify a driver with the `--driver` (`-d`) option:
 
 ```bash
 parxy parse --driver llamaparse document.pdf
+# output will be saved as llamaparse-document.json
 ```
 
 ### Using Multiple Drivers for Comparison
 
-Parse the same document(s) with multiple drivers by specifying `--driver` multiple times:
+Parse the same document(s) with multiple drivers by specifying `--driver` (or `-d` for short) multiple times:
 
 ```bash
 parxy parse document.pdf -d pymupdf -d llamaparse
 ```
 
-When using multiple drivers, Parxy automatically appends the driver name to the output filenames:
-- `document_pymupdf.md`
-- `document_llamaparse.md`
+When using multiple drivers, Parxy always prepends the driver name to the output filenames, e.g. `pymupdf-document.json`, `llamaparse-document.json`. This is particularly useful for comparing extraction quality across different parsers.
 
-This is particularly useful for comparing extraction quality across different parsers.
-
-### Showing Output in Console
-
-By default, output is only saved to files. To also display content in the console, use the `--show` (`-s`) flag:
-
-```bash
-parxy parse document.pdf --show
-```
 
 ### Progress Tracking
 
src/parxy_cli/commands/parse.py

Lines changed: 44 additions & 166 deletions
@@ -1,14 +1,13 @@
 """Command line interface for Parxy document processing."""
 
-import os
 from datetime import timedelta
 from pathlib import Path
 from typing import Optional, List, Annotated
 
 import typer
 
 from parxy_core.facade import Parxy
-from parxy_core.models import Document, BatchTask, BatchResult
+from parxy_core.models import Document, BatchResult
 
 from parxy_cli.models import Level, OutputMode
 from parxy_cli.console.console import Console
@@ -108,67 +107,6 @@ def get_content(doc: Document, mode: OutputMode) -> str:
     return doc.text()
 
 
-def process_file_with_driver(
-    file_path: Path,
-    driver: str,
-    level: Level,
-    mode: OutputMode,
-    output_dir: Optional[Path],
-    show: bool,
-    use_driver_suffix: bool = False,
-) -> tuple[str, int]:
-    """
-    Process a single file with a single driver.
-
-    Args:
-        file_path: Path to file to process
-        driver: Driver name to use
-        level: Extraction level
-        mode: Output mode
-        output_dir: Optional output directory
-        show: Whether to show content in console
-        use_driver_suffix: Whether to append driver name to output filename
-
-    Returns:
-        Tuple of (output_path, page_count)
-    """
-    # Parse the document
-    doc = Parxy.parse(
-        file=str(file_path),
-        level=level.value,
-        driver_name=driver,
-    )
-
-    # Get content
-    content = get_content(doc, mode)
-
-    # Determine output path
-    if output_dir:
-        output_dir.mkdir(parents=True, exist_ok=True)
-        base_name = file_path.stem
-    else:
-        # Save in same directory as source file
-        output_dir = file_path.parent
-        base_name = file_path.stem
-
-    # If multiple drivers, append driver name to filename
-    if use_driver_suffix and driver:
-        base_name = f'{base_name}-{driver}'
-
-    extension = get_output_extension(mode)
-    output_path = output_dir / f'{base_name}{extension}'
-
-    # Save to file
-    output_path.write_text(content, encoding='utf-8')
-
-    # Show in console if requested
-    if show:
-        console.print(content)
-        console.newline()
-
-    return str(output_path), len(doc.pages)
-
-
 def format_timedelta(td):
     days = td.days
     milliseconds = td.microseconds // 1000
@@ -195,7 +133,7 @@ def save_batch_result(
     mode: OutputMode,
     output_dir: Optional[Path],
     show: bool,
-    use_driver_suffix: bool = False,
+    use_driver_prefix: bool = True,
 ) -> tuple[str, int]:
     """
     Save a BatchResult to file.
@@ -205,7 +143,7 @@ def save_batch_result(
         mode: Output mode
         output_dir: Optional output directory
         show: Whether to show content in console
-        use_driver_suffix: Whether to append driver name to output filename
+        use_driver_prefix: Whether to prepend driver name to output filename
 
     Returns:
         Tuple of (output_path, page_count)
@@ -226,8 +164,8 @@ def save_batch_result(
         base_name = file_path.stem
 
     # If multiple drivers, append driver name to filename
-    if use_driver_suffix and result.driver:
-        base_name = f'{base_name}-{result.driver}'
+    if use_driver_prefix and result.driver:
+        base_name = f'{result.driver}-{base_name}'
 
     extension = get_output_extension(mode)
     output_path = output_dir / f'{base_name}{extension}'
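
The naming rule changed in this hunk (driver name moved from suffix to prefix, and applied by default) can be checked in isolation with a small sketch. `build_output_path` is a hypothetical helper written for illustration, not a function in `parse.py`; it assumes the extension string already includes the leading dot, as the `f'{base_name}{extension}'` expression above suggests.

```python
from pathlib import Path
from typing import Optional


def build_output_path(
    source: Path,
    driver: str,
    extension: str,
    output_dir: Optional[Path] = None,
) -> Path:
    """Compose '<driver>-<stem><extension>' next to the source or in output_dir."""
    # Fall back to the source file's directory when no output directory is given.
    target_dir = output_dir if output_dir is not None else source.parent
    # Prepend the driver name only when one is set, as the diff does.
    base_name = f'{driver}-{source.stem}' if driver else source.stem
    return target_dir / f'{base_name}{extension}'


print(build_output_path(Path('docs/report.pdf'), 'pymupdf', '.json'))
```

Prefixing rather than suffixing means outputs from the same driver sort together in a directory listing, which may be convenient when comparing parsers.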
@@ -314,23 +252,15 @@ def parse(
             help='Stop processing files immediately if an error occurs with any file',
         ),
     ] = False,
-    parallel: Annotated[
-        bool,
-        typer.Option(
-            '--parallel',
-            '-p',
-            help='Process files in parallel using multiple workers',
-        ),
-    ] = False,
     workers: Annotated[
-        Optional[int],
+        int,
         typer.Option(
             '--workers',
             '-w',
-            help='Number of parallel workers to use (only applies with --parallel). Defaults to CPU count.',
+            help='Number of parallel workers to use. Defaults to 2.',
             min=1,
         ),
-    ] = None,
+    ] = 2,
 ):
     """
     Parse documents using one or more drivers.
@@ -361,8 +291,8 @@ def parse(
     # Output as JSON and show in console
     parxy parse document.pdf -m json --show
 
-    # Process files in parallel with 4 workers
-    parxy parse /path/to/folder --parallel --workers 4
+    # Process files with 4 workers
+    parxy parse /path/to/folder --workers 4
     """
     console.action('Parse files', space_after=False)
     # Collect all files
@@ -382,21 +312,8 @@ def parse(
     # Calculate total tasks
     total_tasks = len(files) * len(drivers)
 
-    # Determine if we should use driver suffix (when multiple drivers are used)
-    use_driver_suffix = len(drivers) > 1
-
-    if use_driver_suffix:
-        console.info(
-            'You have specified more than one driver. Driver name will be added as suffix to the file name while saving.'
-        )
-
     error_count = 0
 
-    # Determine number of workers for parallel processing
-    if parallel:
-        max_workers = workers if workers else (os.cpu_count() or 2)
-        console.info(f'Using parallel processing with {max_workers} workers')
-
     # Show info
     with console.shimmer(
         f'Processing {len(files)} file{"s" if len(files) > 1 else ""} with {len(drivers)} driver{"s" if len(drivers) > 1 else ""}...'
@@ -405,83 +322,44 @@ def parse(
         with console.progress('Processing documents') as progress:
             task = progress.add_task('', total=total_tasks)
 
-            if parallel:
-                # Parallel processing using batch_iter
-                batch_tasks = [str(f) for f in files]
-
-                for result in Parxy.batch_iter(
-                    tasks=batch_tasks,
-                    drivers=drivers,
-                    level=level.value,
-                    workers=max_workers,
-                ):
-                    file_name = (
-                        Path(result.file).name
-                        if isinstance(result.file, str)
-                        else 'document'
+            batch_tasks = [str(f) for f in files]
+
+            for result in Parxy.batch_iter(
+                tasks=batch_tasks,
+                drivers=drivers,
+                level=level.value,
+                workers=workers,
+            ):
+                file_name = (
+                    Path(result.file).name
+                    if isinstance(result.file, str)
+                    else 'document'
+                )
+
+                if result.success:
+                    output_file, page_count = save_batch_result(
+                        result=result,
+                        mode=mode,
+                        output_dir=output_path,
+                        show=show,
                     )
+                    console.print(
+                        f'[faint]⎿ [/faint] {file_name} via {result.driver} to [success]{output_file}[/success] [faint]({page_count} pages)[/faint]'
+                    )
+                else:
+                    console.print(
+                        f'[faint]⎿ [/faint] {file_name} via {result.driver} error. [error]{result.error}[/error]'
+                    )
+                    error_count += 1
 
-                    if result.success:
-                        output_file, page_count = save_batch_result(
-                            result=result,
-                            mode=mode,
-                            output_dir=output_path,
-                            show=show,
-                            use_driver_suffix=use_driver_suffix,
-                        )
-                        console.print(
-                            f'[faint]⎿ [/faint] {file_name} via {result.driver} to [success]{output_file}[/success] [faint]({page_count} pages)[/faint]'
+                    if stop_on_failure:
+                        console.newline()
+                        console.info(
+                            'Stopping due to error (--stop-on-failure flag is set)'
                         )
-                    else:
-                        console.print(
-                            f'[faint]⎿ [/faint] {file_name} via {result.driver} error. [error]{result.error}[/error]'
-                        )
-                        error_count += 1
-
-                        if stop_on_failure:
-                            console.newline()
-                            console.info(
-                                'Stopping due to error (--stop-on-failure flag is set)'
-                            )
-                            raise typer.Exit(1)
+                        raise typer.Exit(1)
 
-                    progress.update(task, advance=1)
-            else:
-                # Sequential processing
-                for file_path in files:
-                    for driver in drivers:
-                        try:
-                            output_file, page_count = process_file_with_driver(
-                                file_path=file_path,
-                                driver=driver,
-                                level=level,
-                                mode=mode,
-                                output_dir=output_path,
-                                show=show,
-                                use_driver_suffix=use_driver_suffix,
-                            )
-
-                            # Update progress
-                            console.print(
-                                f'[faint]⎿ [/faint] {file_path.name} via {driver} to [success]{output_file}[/success] [faint]({page_count} pages)[/faint]'
-                            )
-                            progress.update(task, advance=1)
-
-                        except Exception as e:
-                            console.print(
-                                f'[faint]⎿ [/faint] {file_path.name} via {driver} error. [error]{str(e)}[/error]'
-                            )
-                            progress.update(task, advance=1)
-                            error_count += 1
-
-                            if stop_on_failure:
-                                console.newline()
-                                console.info(
-                                    'Stopping due to error (--stop-on-failure flag is set)'
-                                )
-                                raise typer.Exit(1)
-
-                            continue
+                progress.update(task, advance=1)
 
             elapsed_time = format_timedelta(
                 timedelta(seconds=max(0, progress.tasks[0].elapsed))
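
The control flow of the unified loop above (count errors, optionally stop at the first failure) can be sketched without Parxy or Typer. `FakeResult` is a hypothetical stand-in for `BatchResult` carrying only the fields the loop reads, and `consume` is an illustrative name; the real command raises `typer.Exit(1)` where this sketch breaks out of the loop.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple


@dataclass
class FakeResult:
    """Hypothetical stand-in for BatchResult with only the fields used here."""

    file: str
    driver: str
    error: Optional[str] = None

    @property
    def success(self) -> bool:
        return self.error is None


def consume(
    results: Iterable[FakeResult], stop_on_failure: bool = False
) -> Tuple[int, int]:
    """Return (handled, error_count), stopping at the first failure if requested."""
    handled = 0
    error_count = 0
    for result in results:
        handled += 1
        if not result.success:
            error_count += 1
            if stop_on_failure:
                break  # the CLI raises typer.Exit(1) at this point
    return handled, error_count


sample = [
    FakeResult('a.pdf', 'pymupdf'),
    FakeResult('a.pdf', 'llamaparse', error='authentication failed'),
    FakeResult('b.pdf', 'pymupdf'),
]
print(consume(sample))
print(consume(sample, stop_on_failure=True))
```

Because results are consumed from an iterator, stopping early leaves the remaining results unread, which is the same reason the command can exit as soon as `--stop-on-failure` applies.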
