Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions app/workers/ingest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Ingestion worker task — processes a Source into chunks and vectors."""

from __future__ import annotations

import asyncio
Expand Down Expand Up @@ -158,13 +159,30 @@ async def ingest_source(ctx: dict, source_id: str, tenant_id: str) -> dict:
except Exception as exc:
logger.exception("Ingestion failed for source %s", source_id)
await session.rollback()
# Try to mark the source as errored
try:
urls = config.get("urls", [])
if not urls:
# Fallback to single URL for backward compatibility
url = config.get("url", "")
urls = [url] if url else []
if not urls:

async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
tasks = [client.get(url, headers={"User-Agent": "MiniRAG/1.0"}) for url in urls]
responses = await asyncio.gather(*tasks, return_exceptions=True)

content_parts = []
for url, resp in zip(urls, responses):
if isinstance(resp, Exception):
logger.warning("Failed to fetch URL %s: %s", url, resp)
continue
try:
resp.raise_for_status()
content_parts.append(html_to_text(resp.text))
except Exception as e:
logger.warning("Failed to process URL %s: %s", url, e)

return "\n\n".join(content_parts)
source = await _get_source(session, source_id, tenant_id)
if source:
await _mark_error(session, source, str(exc)[:2000])
except Exception:
logger.exception("Failed to mark source %s as errored", source_id)

# Dispatch webhook for failure (fire-and-forget)
try:
Expand Down