Skip to content

Commit df65ceb

Browse files
committed
fix: filter batch error results instead of passing them as Documents
Kreuzberg's batch APIs return error results as ExtractionResult with empty metadata and None quality_score instead of raising exceptions. Previously these were silently passed through as valid Documents. - Add _is_batch_error() to utils.py to detect error results - Add _collect_batch_results() static method to filter errors with structured warning logs (classify_error + error_code_name) - Fix LogRecord conflict: rename 'name' kwarg to 'code_name' in both batch and sequential error logging paths - Add 3 unit tests for _is_batch_error detection logic - Add 3 integration tests for corrupt file/bytestream filtering
1 parent 419be44 commit df65ceb

4 files changed

Lines changed: 117 additions & 9 deletions

File tree

integrations/kreuzberg/src/haystack_integrations/components/converters/kreuzberg/converter.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
LanguageDetectionConfig,
2020
batch_extract_bytes_sync,
2121
batch_extract_files_sync,
22+
classify_error,
2223
config_to_json,
2324
detect_mime_type,
2425
error_code_name,
@@ -33,6 +34,7 @@
3334
_config_from_json_str,
3435
_copy_config,
3536
_get_table_markdown,
37+
_is_batch_error,
3638
_serialize_annotations,
3739
_serialize_images,
3840
_serialize_warnings,
@@ -218,20 +220,43 @@ def _extract_batch(
218220

219221
# Batch-extract file paths
220222
if file_paths:
221-
file_results = batch_extract_files_sync(file_paths, config=config, easyocr_kwargs=self.easyocr_kwargs)
222-
for idx, result in zip(file_indices, file_results, strict=True):
223-
results[idx] = result
223+
self._collect_batch_results(
224+
file_indices, batch_extract_files_sync(file_paths, config=config, easyocr_kwargs=self.easyocr_kwargs),
225+
sources, results,
226+
)
224227

225228
# Batch-extract byte streams
226229
if bytes_data:
227-
bytes_results = batch_extract_bytes_sync(
228-
bytes_data, bytes_mimes, config=config, easyocr_kwargs=self.easyocr_kwargs
230+
self._collect_batch_results(
231+
bytes_indices,
232+
batch_extract_bytes_sync(bytes_data, bytes_mimes, config=config, easyocr_kwargs=self.easyocr_kwargs),
233+
sources, results,
229234
)
230-
for idx, result in zip(bytes_indices, bytes_results, strict=True):
231-
results[idx] = result
232235

233236
return results
234237

238+
@staticmethod
239+
def _collect_batch_results(
240+
indices: list[int],
241+
batch_results: list[ExtractionResult],
242+
sources: list[str | Path | ByteStream],
243+
results: list[ExtractionResult | None],
244+
) -> None:
245+
"""Filter batch results, logging and skipping error entries."""
246+
for idx, result in zip(indices, batch_results, strict=True):
247+
if _is_batch_error(result):
248+
err_code = classify_error(result.content)
249+
logger.warning(
250+
"Could not convert {source} to Document. Error code: {code} ({code_name}). "
251+
"Details: {details}. Skipping it.",
252+
source=sources[idx],
253+
code=err_code,
254+
code_name=error_code_name(err_code),
255+
details=result.content,
256+
)
257+
continue
258+
results[idx] = result
259+
235260
@staticmethod
236261
def _build_extraction_metadata(result: ExtractionResult) -> dict[str, Any]:
237262
"""
@@ -501,11 +526,11 @@ def _extract_sequential(
501526
details = get_error_details()
502527
code_name = error_code_name(err_code) if err_code is not None else "UNKNOWN"
503528
logger.warning(
504-
"Could not convert {source} to Document. Error code: {code} ({name}). "
529+
"Could not convert {source} to Document. Error code: {code} ({code_name}). "
505530
"Details: {details}. Skipping it.",
506531
source=source,
507532
code=err_code,
508-
name=code_name,
533+
code_name=code_name,
509534
details=details.get("message", str(e)),
510535
)
511536
except Exception as diag_err:

integrations/kreuzberg/src/haystack_integrations/components/converters/kreuzberg/utils.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,21 @@
99
ExtractedImage,
1010
ExtractedTable,
1111
ExtractionConfig,
12+
ExtractionResult,
1213
config_to_json,
1314
)
1415

1516

17+
def _is_batch_error(result: ExtractionResult) -> bool:
18+
"""Detect error results returned by kreuzberg's batch APIs.
19+
20+
Batch APIs return ``ExtractionResult(content="Error: ...", metadata={},
21+
quality_score=None)`` instead of raising exceptions. Valid results always
22+
have populated metadata (at minimum ``output_format``).
23+
"""
24+
return result.metadata == {} and result.quality_score is None
25+
26+
1627
def _get_table_markdown(table: ExtractedTable | dict[str, Any]) -> str | None:
1728
"""Get markdown string from a table (`ExtractedTable` object or dict)."""
1829
if isinstance(table, dict):

integrations/kreuzberg/tests/test_converter.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from haystack_integrations.components.converters.kreuzberg import KreuzbergConverter
2222
from haystack_integrations.components.converters.kreuzberg.utils import (
23+
_is_batch_error,
2324
_serialize_warnings,
2425
)
2526

@@ -693,3 +694,33 @@ def test_run_batch_success(mock_get_bs: MagicMock, converter: KreuzbergConverter
693694
result = converter.run(sources=[Path("a.pdf"), Path("b.pdf")])
694695

695696
assert len(result["documents"]) == 2
697+
698+
699+
def test_is_batch_error_detects_error_result() -> None:
700+
"""Batch error results have empty metadata and None quality_score."""
701+
result = MagicMock(spec=ExtractionResult)
702+
result.metadata = {}
703+
result.quality_score = None
704+
result.content = "Error: could not parse file"
705+
706+
assert _is_batch_error(result) is True
707+
708+
709+
def test_is_batch_error_passes_valid_result() -> None:
710+
"""Valid results have populated metadata and are not flagged."""
711+
result = MagicMock(spec=ExtractionResult)
712+
result.metadata = {"output_format": "plain", "quality_score": 0.85}
713+
result.quality_score = 0.85
714+
result.content = "Hello world"
715+
716+
assert _is_batch_error(result) is False
717+
718+
719+
def test_is_batch_error_no_false_positive_on_error_content() -> None:
720+
"""Content starting with 'Error:' but valid metadata is NOT a batch error."""
721+
result = MagicMock(spec=ExtractionResult)
722+
result.metadata = {"output_format": "plain"}
723+
result.quality_score = 0.5
724+
result.content = "Error: this is actual document content that starts with Error:"
725+
726+
assert _is_batch_error(result) is False

integrations/kreuzberg/tests/test_converter_integration.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,3 +514,44 @@ def test_edge_config_not_mutated_across_runs() -> None:
514514
converter.run(sources=[FIXTURES_DIR / "sample.txt"])
515515
# Original config should not be mutated
516516
assert config.output_format == "html"
517+
518+
519+
@pytest.mark.integration
520+
def test_batch_filters_corrupt_files(tmp_path: Path) -> None:
521+
"""Batch mode filters corrupt files and keeps valid ones."""
522+
corrupt = tmp_path / "corrupt.pdf"
523+
corrupt.write_bytes(b"not a valid pdf")
524+
525+
converter = KreuzbergConverter(batch=True)
526+
result = converter.run(sources=[FIXTURES_DIR / "sample.txt", corrupt])
527+
docs = _docs(result)
528+
529+
assert len(docs) == 1
530+
assert "sample text document" in docs[0].content
531+
532+
533+
@pytest.mark.integration
534+
def test_batch_filters_corrupt_bytestream(tmp_path: Path) -> None:
535+
"""Batch mode filters corrupt ByteStream and keeps valid file."""
536+
corrupt_bs = ByteStream(data=b"not a valid pdf", mime_type="application/pdf")
537+
538+
converter = KreuzbergConverter(batch=True)
539+
result = converter.run(sources=[FIXTURES_DIR / "sample.txt", corrupt_bs])
540+
docs = _docs(result)
541+
542+
assert len(docs) == 1
543+
assert "sample text document" in docs[0].content
544+
545+
546+
@pytest.mark.integration
547+
def test_batch_all_corrupt(tmp_path: Path) -> None:
548+
"""Batch mode with all corrupt sources returns empty documents list."""
549+
corrupt1 = tmp_path / "corrupt1.pdf"
550+
corrupt1.write_bytes(b"not a valid pdf")
551+
corrupt2 = tmp_path / "corrupt2.pdf"
552+
corrupt2.write_bytes(b"also not valid")
553+
554+
converter = KreuzbergConverter(batch=True)
555+
result = converter.run(sources=[corrupt1, corrupt2])
556+
557+
assert result["documents"] == []

0 commit comments

Comments
 (0)