diff --git a/app/workers/ingest.py b/app/workers/ingest.py index 02af8b5..6cf4ee2 100644 --- a/app/workers/ingest.py +++ b/app/workers/ingest.py @@ -1,3 +1,4 @@ +import asyncio """Ingestion worker task — processes a Source into chunks and vectors.""" from __future__ import annotations @@ -158,13 +159,30 @@ async def ingest_source(ctx: dict, source_id: str, tenant_id: str) -> dict: except Exception as exc: logger.exception("Ingestion failed for source %s", source_id) await session.rollback() - # Try to mark the source as errored - try: + urls = config.get("urls", []) + if not urls: + # Fallback to single URL for backward compatibility + url = config.get("url", "") + urls = [url] if url else [] + if not urls: + + async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: + tasks = [client.get(url, headers={"User-Agent": "MiniRAG/1.0"}) for url in urls] + responses = await asyncio.gather(*tasks, return_exceptions=True) + + content_parts = [] + for url, resp in zip(urls, responses): + if isinstance(resp, Exception): + logger.warning("Failed to fetch URL %s: %s", url, resp) + continue + try: + resp.raise_for_status() + content_parts.append(html_to_text(resp.text)) + except Exception as e: + logger.warning("Failed to process URL %s: %s", url, e) + + return "\n\n".join(content_parts) source = await _get_source(session, source_id, tenant_id) - if source: - await _mark_error(session, source, str(exc)[:2000]) - except Exception: - logger.exception("Failed to mark source %s as errored", source_id) # Dispatch webhook for failure (fire-and-forget) try: