Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ def get_env_variable(
# PDF_EXTRACT_IMAGES: opt-in flag; only the literal string "true"
# (case-insensitive) enables image extraction from PDFs.
env_value = get_env_variable("PDF_EXTRACT_IMAGES", "False").lower()
# Direct comparison replaces the redundant `True if ... else False` ternary.
PDF_EXTRACT_IMAGES = env_value == "true"

# Optional pre-extraction webhook (e.g. an OCR sidecar). When the primary text
# extraction yields pages averaging fewer than PRE_EXTRACTION_WEBHOOK_MIN_CHARS
# characters, the original file is POSTed to PRE_EXTRACTION_WEBHOOK_URL, which
# is expected to answer with `{"text": "...", "provider": "..."}`; that text
# then replaces the extraction. An empty URL (the default) disables the hook.
PRE_EXTRACTION_WEBHOOK_URL = get_env_variable("PRE_EXTRACTION_WEBHOOK_URL", "")
_webhook_min_chars_raw = get_env_variable("PRE_EXTRACTION_WEBHOOK_MIN_CHARS", "100")
PRE_EXTRACTION_WEBHOOK_MIN_CHARS = int(_webhook_min_chars_raw)
_webhook_timeout_raw = get_env_variable("PRE_EXTRACTION_WEBHOOK_TIMEOUT", "60")
PRE_EXTRACTION_WEBHOOK_TIMEOUT = int(_webhook_timeout_raw)

if POSTGRES_USE_UNIX_SOCKET:
connection_suffix = f"{urllib.parse.quote_plus(POSTGRES_USER)}:{urllib.parse.quote_plus(POSTGRES_PASSWORD)}@/{urllib.parse.quote_plus(POSTGRES_DB)}?host={urllib.parse.quote_plus(DB_HOST)}"
else:
Expand Down
12 changes: 12 additions & 0 deletions app/routes/document_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
clean_text,
process_documents,
cleanup_temp_encoding_file,
maybe_enrich_with_webhook,
)
from app.utils.health import is_health_ok

Expand Down Expand Up @@ -147,6 +148,11 @@ async def load_file_content(
loader, known_type, file_ext = get_loader(filename, content_type, file_path)
loop = asyncio.get_running_loop()
data = await loop.run_in_executor(executor, lambda: list(loader.lazy_load()))
# Optional pre-extraction webhook (e.g. OCR sidecar) — no-op when
# PRE_EXTRACTION_WEBHOOK_URL is unset.
data = await loop.run_in_executor(
executor, maybe_enrich_with_webhook, file_path, data
)
return data, known_type, file_ext
finally:
# Clean up temporary UTF-8 file if it was created for encoding conversion
Expand Down Expand Up @@ -756,6 +762,12 @@ async def embed_local_file(
data = await loop.run_in_executor(
request.app.state.thread_pool, lambda: list(loader.lazy_load())
)
data = await loop.run_in_executor(
request.app.state.thread_pool,
maybe_enrich_with_webhook,
file_path,
data,
)

result = await store_data_in_vector_db(
data,
Expand Down
87 changes: 86 additions & 1 deletion app/utils/document_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@

from langchain_core.documents import Document

from app.config import known_source_ext, PDF_EXTRACT_IMAGES, CHUNK_OVERLAP, logger
from app.config import (
known_source_ext,
PDF_EXTRACT_IMAGES,
CHUNK_OVERLAP,
PRE_EXTRACTION_WEBHOOK_URL,
PRE_EXTRACTION_WEBHOOK_MIN_CHARS,
PRE_EXTRACTION_WEBHOOK_TIMEOUT,
logger,
)
from langchain_community.document_loaders import (
TextLoader,
PyPDFLoader,
Expand Down Expand Up @@ -188,6 +196,83 @@ def remove_non_utf8(text: str) -> str:
return text


def maybe_enrich_with_webhook(
    file_path: str, documents: List[Document]
) -> List[Document]:
    """
    Optional hook: when PRE_EXTRACTION_WEBHOOK_URL is set and the current
    extraction returned pages averaging fewer than
    PRE_EXTRACTION_WEBHOOK_MIN_CHARS characters, POST the original file to the
    webhook and substitute its returned text. On any failure the original
    documents are returned unchanged (soft-fail by design).

    :param file_path: path of the original file on disk; re-read and POSTed
        to the webhook as multipart form data under the "file" field.
    :param documents: pages produced by the primary extraction.
    :return: either ``documents`` unchanged (feature disabled, extraction good
        enough, or webhook failed) or a single replacement Document annotated
        with ``ocr_used`` / ``ocr_provider`` metadata.
    """
    url = PRE_EXTRACTION_WEBHOOK_URL
    if not url:
        return documents

    # Compute average characters per extracted page; an empty list counts as
    # 0 so a loader that produced nothing still triggers the webhook.
    if documents:
        avg_chars = sum(
            len((doc.page_content or "").strip()) for doc in documents
        ) / len(documents)
    else:
        avg_chars = 0

    if avg_chars >= PRE_EXTRACTION_WEBHOOK_MIN_CHARS:
        return documents

    # Import lazily so module import works in environments without `requests`
    # (e.g. test collection phase when the feature is disabled).
    import requests

    try:
        with open(file_path, "rb") as f:
            response = requests.post(
                url,
                files={"file": (os.path.basename(file_path), f)},
                timeout=PRE_EXTRACTION_WEBHOOK_TIMEOUT,
            )
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:  # broad by intent — never break ingest
        logger.warning(
            "pre-extraction webhook failed, falling back to original text: %s", exc
        )
        return documents

    # Bug fix: validate the payload shape before touching it. Previously a
    # non-dict JSON body (e.g. a top-level list) raised AttributeError on
    # `.get`, and a non-string "text" raised on `.strip()` — both OUTSIDE the
    # try block, breaking ingest despite the soft-fail contract above.
    if not isinstance(payload, dict):
        logger.warning(
            "pre-extraction webhook returned non-object JSON, keeping original extraction"
        )
        return documents

    raw_text = payload.get("text")
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        logger.warning(
            "pre-extraction webhook returned empty text, keeping original extraction"
        )
        return documents

    provider = payload.get("provider", "unknown")
    # Preserve the loader-recorded source path when available so downstream
    # metadata still points at the logical document, not the temp file.
    source = (
        documents[0].metadata.get("source")
        if documents and isinstance(documents[0].metadata, dict)
        else file_path
    )

    logger.info(
        "pre-extraction webhook enriched %s with %d chars from provider %s",
        file_path,
        len(text),
        provider,
    )

    return [
        Document(
            page_content=text,
            metadata={
                "source": source,
                "ocr_used": True,
                "ocr_provider": provider,
            },
        )
    ]


def process_documents(documents: List[Document]) -> str:
processed_text = ""
last_page: Optional[int] = None
Expand Down
1 change: 1 addition & 0 deletions requirements.lite.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ chardet==5.2.0
langchain-ollama==1.0.1
tenacity>=9.0.0
msoffcrypto-tool>=6.0.0,<7
requests>=2.31.0,<3
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ pydantic>=2.10.6,<3
chardet==5.2.0
tenacity>=9.0.0
msoffcrypto-tool>=6.0.0,<7
requests>=2.31.0,<3
4 changes: 3 additions & 1 deletion test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ pytest==8.3.4
pytest-asyncio==0.26.0
pytest-postgresql==7.0.1
mongomock==4.3.0
httpx==0.27.0
httpx==0.27.0
responses==0.25.3
requests>=2.31.0,<3
158 changes: 158 additions & 0 deletions tests/utils/test_pre_extraction_webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Tests for the optional pre-extraction webhook in document_loader.

The webhook is enabled only when PRE_EXTRACTION_WEBHOOK_URL is set. These tests
exercise the helper directly so we can cover the branching logic without
standing up FastAPI.
"""

import importlib

import pytest
import responses
from langchain_core.documents import Document


WEBHOOK_URL = "http://ocr-sidecar.test/extract"


@pytest.fixture
def enabled_webhook(monkeypatch):
    """Point the module-level constants at a fake OCR sidecar by setting the
    environment variables and reloading config + document_loader."""
    env = {
        "PRE_EXTRACTION_WEBHOOK_URL": WEBHOOK_URL,
        "PRE_EXTRACTION_WEBHOOK_MIN_CHARS": "100",
        "PRE_EXTRACTION_WEBHOOK_TIMEOUT": "5",
    }
    for key, value in env.items():
        monkeypatch.setenv(key, value)

    import app.config as config_module
    import app.utils.document_loader as loader_module

    importlib.reload(config_module)
    importlib.reload(loader_module)
    yield loader_module

    # Restore the default (disabled) state for whatever test runs next.
    monkeypatch.delenv("PRE_EXTRACTION_WEBHOOK_URL", raising=False)
    importlib.reload(config_module)
    importlib.reload(loader_module)


@pytest.fixture
def disabled_webhook(monkeypatch):
    """Reload config + document_loader with the feature flag unset, i.e. the
    default disabled state."""
    monkeypatch.delenv("PRE_EXTRACTION_WEBHOOK_URL", raising=False)

    import app.config as config_module
    import app.utils.document_loader as loader_module

    for module in (config_module, loader_module):
        importlib.reload(module)
    yield loader_module


def _make_pdf(tmp_path, name="input.pdf"):
    """Write a minimal fake PDF under *tmp_path* and return its path as str."""
    target = tmp_path / name
    target.write_bytes(b"%PDF-1.4\n%fake\n")
    return str(target)


def test_webhook_disabled_is_noop(disabled_webhook, tmp_path):
    """With no PRE_EXTRACTION_WEBHOOK_URL configured the helper is a pure
    pass-through: the very same list object comes back, no HTTP involved."""
    original = [Document(page_content="", metadata={"source": "a.pdf"})]
    result = disabled_webhook.maybe_enrich_with_webhook(
        _make_pdf(tmp_path), original
    )
    assert result is original


def test_above_threshold_skips_webhook(enabled_webhook, tmp_path):
    """When the average chars-per-page already clears the threshold the
    existing extraction is trusted and the webhook is never contacted."""
    pages = [
        Document(page_content=ch * 150, metadata={"source": "a.pdf"})
        for ch in ("a", "b")
    ]
    pdf_path = _make_pdf(tmp_path)

    # An active RequestsMock with no registered endpoints turns any stray
    # HTTP call into a ConnectionError — exactly the guarantee we want.
    with responses.RequestsMock() as rsps:
        result = enabled_webhook.maybe_enrich_with_webhook(pdf_path, pages)
        assert result == pages
        assert len(rsps.calls) == 0


def test_below_threshold_triggers_webhook_and_replaces_documents(
    enabled_webhook, tmp_path
):
    """Nearly-empty pages must be shipped to the webhook, whose text then
    replaces the documents, annotated with OCR metadata."""
    pages = [
        Document(page_content=blank, metadata={"source": "scanned.pdf"})
        for blank in ("", " ")
    ]
    pdf_path = _make_pdf(tmp_path, name="scanned.pdf")

    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.POST,
            WEBHOOK_URL,
            status=200,
            json={
                "text": "OCRed contract content",
                "provider": "azure-di",
                "pages_processed": 2,
            },
        )
        result = enabled_webhook.maybe_enrich_with_webhook(pdf_path, pages)

    assert len(result) == 1
    doc = result[0]
    assert doc.page_content == "OCRed contract content"
    assert doc.metadata["ocr_used"] is True
    assert doc.metadata["ocr_provider"] == "azure-di"
    assert doc.metadata["source"] == "scanned.pdf"


def test_webhook_failure_falls_back_to_original(enabled_webhook, tmp_path):
    """An HTTP error from the webhook must never break ingest — the original
    documents come back untouched."""
    originals = [Document(page_content="", metadata={"source": "scanned.pdf"})]
    pdf_path = _make_pdf(tmp_path, name="scanned.pdf")

    with responses.RequestsMock() as rsps:
        rsps.add(responses.POST, WEBHOOK_URL, status=500)
        result = enabled_webhook.maybe_enrich_with_webhook(pdf_path, originals)

    assert result == originals


def test_webhook_empty_text_falls_back(enabled_webhook, tmp_path):
    """A 200 response carrying empty text means OCR found nothing; the
    original extraction is kept rather than destroyed."""
    originals = [Document(page_content="", metadata={"source": "scanned.pdf"})]
    pdf_path = _make_pdf(tmp_path, name="scanned.pdf")

    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.POST,
            WEBHOOK_URL,
            status=200,
            json={"text": "", "provider": "azure-di"},
        )
        result = enabled_webhook.maybe_enrich_with_webhook(pdf_path, originals)

    assert result == originals


def test_webhook_handles_empty_document_list(enabled_webhook, tmp_path):
    """An empty document list averages 0 chars, so the webhook fires and its
    text becomes the sole resulting document."""
    pdf_path = _make_pdf(tmp_path, name="scanned.pdf")

    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.POST,
            WEBHOOK_URL,
            status=200,
            json={"text": "rescued text", "provider": "azure-di"},
        )
        result = enabled_webhook.maybe_enrich_with_webhook(pdf_path, [])

    assert len(result) == 1
    assert result[0].page_content == "rescued text"
    assert result[0].metadata["ocr_used"] is True