diff --git a/app/config.py b/app/config.py
index 8031763a..98b655a7 100644
--- a/app/config.py
+++ b/app/config.py
@@ -90,6 +90,19 @@ def get_env_variable(
 env_value = get_env_variable("PDF_EXTRACT_IMAGES", "False").lower()
 PDF_EXTRACT_IMAGES = True if env_value == "true" else False
 
+# Optional pre-extraction webhook: when text extraction returns pages averaging
+# fewer than PRE_EXTRACTION_WEBHOOK_MIN_CHARS characters, the file is POSTed to
+# PRE_EXTRACTION_WEBHOOK_URL. The webhook is expected to respond with
+# `{"text": "...", "provider": "..."}` and that text replaces the original
+# extraction. Disabled when the URL is empty (the default).
+PRE_EXTRACTION_WEBHOOK_URL = get_env_variable("PRE_EXTRACTION_WEBHOOK_URL", "")
+PRE_EXTRACTION_WEBHOOK_MIN_CHARS = int(
+    get_env_variable("PRE_EXTRACTION_WEBHOOK_MIN_CHARS", "100")
+)
+PRE_EXTRACTION_WEBHOOK_TIMEOUT = int(
+    get_env_variable("PRE_EXTRACTION_WEBHOOK_TIMEOUT", "60")
+)
+
 if POSTGRES_USE_UNIX_SOCKET:
     connection_suffix = f"{urllib.parse.quote_plus(POSTGRES_USER)}:{urllib.parse.quote_plus(POSTGRES_PASSWORD)}@/{urllib.parse.quote_plus(POSTGRES_DB)}?host={urllib.parse.quote_plus(DB_HOST)}"
 else:
diff --git a/app/routes/document_routes.py b/app/routes/document_routes.py
index c5ebd3f5..45e8bfcc 100644
--- a/app/routes/document_routes.py
+++ b/app/routes/document_routes.py
@@ -52,6 +52,7 @@
     clean_text,
     process_documents,
     cleanup_temp_encoding_file,
+    maybe_enrich_with_webhook,
 )
 
 from app.utils.health import is_health_ok
@@ -147,6 +148,11 @@ async def load_file_content(
         loader, known_type, file_ext = get_loader(filename, content_type, file_path)
         loop = asyncio.get_running_loop()
         data = await loop.run_in_executor(executor, lambda: list(loader.lazy_load()))
+        # Optional pre-extraction webhook (e.g. OCR sidecar) — no-op when
+        # PRE_EXTRACTION_WEBHOOK_URL is unset.
+        data = await loop.run_in_executor(
+            executor, maybe_enrich_with_webhook, file_path, data
+        )
         return data, known_type, file_ext
     finally:
         # Clean up temporary UTF-8 file if it was created for encoding conversion
@@ -756,6 +762,12 @@ async def embed_local_file(
         data = await loop.run_in_executor(
             request.app.state.thread_pool, lambda: list(loader.lazy_load())
         )
+        data = await loop.run_in_executor(
+            request.app.state.thread_pool,
+            maybe_enrich_with_webhook,
+            file_path,
+            data,
+        )
 
         result = await store_data_in_vector_db(
             data,
diff --git a/app/utils/document_loader.py b/app/utils/document_loader.py
index 900b1561..e589c5b1 100644
--- a/app/utils/document_loader.py
+++ b/app/utils/document_loader.py
@@ -9,7 +9,15 @@
 
 from langchain_core.documents import Document
 
-from app.config import known_source_ext, PDF_EXTRACT_IMAGES, CHUNK_OVERLAP, logger
+from app.config import (
+    known_source_ext,
+    PDF_EXTRACT_IMAGES,
+    CHUNK_OVERLAP,
+    PRE_EXTRACTION_WEBHOOK_URL,
+    PRE_EXTRACTION_WEBHOOK_MIN_CHARS,
+    PRE_EXTRACTION_WEBHOOK_TIMEOUT,
+    logger,
+)
 from langchain_community.document_loaders import (
     TextLoader,
     PyPDFLoader,
@@ -188,6 +196,83 @@ def remove_non_utf8(text: str) -> str:
     return text
 
 
+def maybe_enrich_with_webhook(
+    file_path: str, documents: List[Document]
+) -> List[Document]:
+    """
+    Optional hook: when PRE_EXTRACTION_WEBHOOK_URL is set and the current
+    extraction returned pages averaging fewer than
+    PRE_EXTRACTION_WEBHOOK_MIN_CHARS characters, POST the original file to the
+    webhook and substitute its returned text. On any failure the original
+    documents are returned unchanged (soft-fail by design).
+    """
+    url = PRE_EXTRACTION_WEBHOOK_URL
+    if not url:
+        return documents
+
+    # Compute average characters per extracted page; empty list counts as 0.
+    if documents:
+        avg_chars = sum(
+            len((doc.page_content or "").strip()) for doc in documents
+        ) / len(documents)
+    else:
+        avg_chars = 0
+
+    if avg_chars >= PRE_EXTRACTION_WEBHOOK_MIN_CHARS:
+        return documents
+
+    # Import lazily so module import works in environments without `requests`
+    # (e.g. test collection phase when the feature is disabled).
+    import requests
+
+    try:
+        with open(file_path, "rb") as f:
+            response = requests.post(
+                url,
+                files={"file": (os.path.basename(file_path), f)},
+                timeout=PRE_EXTRACTION_WEBHOOK_TIMEOUT,
+            )
+        response.raise_for_status()
+        payload = response.json()
+    except Exception as exc:  # broad by intent — never break ingest
+        logger.warning(
+            "pre-extraction webhook failed, falling back to original text: %s", exc
+        )
+        return documents
+
+    text = (payload.get("text") or "").strip()
+    if not text:
+        logger.warning(
+            "pre-extraction webhook returned empty text, keeping original extraction"
+        )
+        return documents
+
+    provider = payload.get("provider", "unknown")
+    source = (
+        documents[0].metadata.get("source")
+        if documents and isinstance(documents[0].metadata, dict)
+        else file_path
+    )
+
+    logger.info(
+        "pre-extraction webhook enriched %s with %d chars from provider %s",
+        file_path,
+        len(text),
+        provider,
+    )
+
+    return [
+        Document(
+            page_content=text,
+            metadata={
+                "source": source,
+                "ocr_used": True,
+                "ocr_provider": provider,
+            },
+        )
+    ]
+
+
 def process_documents(documents: List[Document]) -> str:
     processed_text = ""
     last_page: Optional[int] = None
diff --git a/requirements.lite.txt b/requirements.lite.txt
index b3454c06..0158a7f1 100644
--- a/requirements.lite.txt
+++ b/requirements.lite.txt
@@ -35,3 +35,4 @@ chardet==5.2.0
 langchain-ollama==1.0.1
 tenacity>=9.0.0
 msoffcrypto-tool>=6.0.0,<7
+requests>=2.31.0,<3
diff --git a/requirements.txt b/requirements.txt
index 18cfaf06..3f8015b2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -39,3 +39,4 @@ pydantic>=2.10.6,<3
 chardet==5.2.0
 tenacity>=9.0.0
 msoffcrypto-tool>=6.0.0,<7
+requests>=2.31.0,<3
diff --git a/test_requirements.txt b/test_requirements.txt
index dcf35571..906cfe52 100644
--- a/test_requirements.txt
+++ b/test_requirements.txt
@@ -2,4 +2,6 @@ pytest==8.3.4
 pytest-asyncio==0.26.0
 pytest-postgresql==7.0.1
 mongomock==4.3.0
-httpx==0.27.0
\ No newline at end of file
+httpx==0.27.0
+responses==0.25.3
+requests>=2.31.0,<3
diff --git a/tests/utils/test_pre_extraction_webhook.py b/tests/utils/test_pre_extraction_webhook.py
new file mode 100644
index 00000000..1d890912
--- /dev/null
+++ b/tests/utils/test_pre_extraction_webhook.py
@@ -0,0 +1,158 @@
+"""Tests for the optional pre-extraction webhook in document_loader.
+
+The webhook is enabled only when PRE_EXTRACTION_WEBHOOK_URL is set. These tests
+exercise the helper directly so we can cover the branching logic without
+standing up FastAPI.
+"""
+
+import importlib
+
+import pytest
+import responses
+from langchain_core.documents import Document
+
+
+WEBHOOK_URL = "http://ocr-sidecar.test/extract"
+
+
+@pytest.fixture
+def enabled_webhook(monkeypatch):
+    """Reload config + document_loader so the module-level constants pick up
+    the new environment variables."""
+    monkeypatch.setenv("PRE_EXTRACTION_WEBHOOK_URL", WEBHOOK_URL)
+    monkeypatch.setenv("PRE_EXTRACTION_WEBHOOK_MIN_CHARS", "100")
+    monkeypatch.setenv("PRE_EXTRACTION_WEBHOOK_TIMEOUT", "5")
+
+    import app.config as config_module
+    import app.utils.document_loader as loader_module
+
+    importlib.reload(config_module)
+    importlib.reload(loader_module)
+    yield loader_module
+
+    # Reset after the test so other tests see the default (disabled) state.
+    monkeypatch.delenv("PRE_EXTRACTION_WEBHOOK_URL", raising=False)
+    importlib.reload(config_module)
+    importlib.reload(loader_module)
+
+
+@pytest.fixture
+def disabled_webhook(monkeypatch):
+    monkeypatch.delenv("PRE_EXTRACTION_WEBHOOK_URL", raising=False)
+    import app.config as config_module
+    import app.utils.document_loader as loader_module
+
+    importlib.reload(config_module)
+    importlib.reload(loader_module)
+    yield loader_module
+
+
+def _make_pdf(tmp_path, name="input.pdf"):
+    p = tmp_path / name
+    p.write_bytes(b"%PDF-1.4\n%fake\n")
+    return str(p)
+
+
+def test_webhook_disabled_is_noop(disabled_webhook, tmp_path):
+    """When the feature flag is empty, the helper must return the input
+    unchanged without issuing any HTTP requests."""
+    docs = [Document(page_content="", metadata={"source": "a.pdf"})]
+    file_path = _make_pdf(tmp_path)
+    out = disabled_webhook.maybe_enrich_with_webhook(file_path, docs)
+    assert out is docs
+
+
+def test_above_threshold_skips_webhook(enabled_webhook, tmp_path):
+    """If average chars per page already exceeds the threshold we trust the
+    existing extraction and do not call the webhook."""
+    docs = [
+        Document(page_content="a" * 150, metadata={"source": "a.pdf"}),
+        Document(page_content="b" * 150, metadata={"source": "a.pdf"}),
+    ]
+    file_path = _make_pdf(tmp_path)
+
+    # responses active but no registered endpoints — an unexpected call would
+    # raise ConnectionError, which is exactly what we want to assert.
+    with responses.RequestsMock() as rsps:
+        out = enabled_webhook.maybe_enrich_with_webhook(file_path, docs)
+        assert out == docs
+        assert len(rsps.calls) == 0
+
+
+def test_below_threshold_triggers_webhook_and_replaces_documents(
+    enabled_webhook, tmp_path
+):
+    """Pages with effectively no text should be sent to the webhook and the
+    result should replace the documents, annotated with ocr metadata."""
+    docs = [
+        Document(page_content="", metadata={"source": "scanned.pdf"}),
+        Document(page_content=" ", metadata={"source": "scanned.pdf"}),
+    ]
+    file_path = _make_pdf(tmp_path, name="scanned.pdf")
+
+    with responses.RequestsMock() as rsps:
+        rsps.add(
+            responses.POST,
+            WEBHOOK_URL,
+            json={
+                "text": "OCRed contract content",
+                "provider": "azure-di",
+                "pages_processed": 2,
+            },
+            status=200,
+        )
+        out = enabled_webhook.maybe_enrich_with_webhook(file_path, docs)
+
+    assert len(out) == 1
+    assert out[0].page_content == "OCRed contract content"
+    assert out[0].metadata["ocr_used"] is True
+    assert out[0].metadata["ocr_provider"] == "azure-di"
+    assert out[0].metadata["source"] == "scanned.pdf"
+
+
+def test_webhook_failure_falls_back_to_original(enabled_webhook, tmp_path):
+    """HTTP errors must never break ingest; we keep the original documents."""
+    docs = [Document(page_content="", metadata={"source": "scanned.pdf"})]
+    file_path = _make_pdf(tmp_path, name="scanned.pdf")
+
+    with responses.RequestsMock() as rsps:
+        rsps.add(responses.POST, WEBHOOK_URL, status=500)
+        out = enabled_webhook.maybe_enrich_with_webhook(file_path, docs)
+
+    assert out == docs
+
+
+def test_webhook_empty_text_falls_back(enabled_webhook, tmp_path):
+    """A 200 response with empty text means OCR had nothing to offer; keep
+    whatever PyPDF returned rather than destroy the extraction."""
+    docs = [Document(page_content="", metadata={"source": "scanned.pdf"})]
+    file_path = _make_pdf(tmp_path, name="scanned.pdf")
+
+    with responses.RequestsMock() as rsps:
+        rsps.add(
+            responses.POST,
+            WEBHOOK_URL,
+            json={"text": "", "provider": "azure-di"},
+            status=200,
+        )
+        out = enabled_webhook.maybe_enrich_with_webhook(file_path, docs)
+
+    assert out == docs
+
+
+def test_webhook_handles_empty_document_list(enabled_webhook, tmp_path):
+    """Empty list: avg_chars is 0 → webhook is called."""
+    file_path = _make_pdf(tmp_path, name="scanned.pdf")
+
+    with responses.RequestsMock() as rsps:
+        rsps.add(
+            responses.POST,
+            WEBHOOK_URL,
+            json={"text": "rescued text", "provider": "azure-di"},
+            status=200,
+        )
+        out = enabled_webhook.maybe_enrich_with_webhook(file_path, [])
+
+    assert len(out) == 1
+    assert out[0].page_content == "rescued text"
+    assert out[0].metadata["ocr_used"] is True