Skip to content

Commit 1de82ca

Browse files
authored
fix: batch limits and failed task reque limit (#2484)
1 parent 8f7742c commit 1de82ca

7 files changed

Lines changed: 449 additions & 7 deletions

File tree

application/api/user/idempotency.py

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import functools
6+
import inspect
67
import logging
78
import threading
89
import uuid
@@ -26,21 +27,35 @@
2627
LEASE_RETRY_MAX = 10
2728

2829

29-
def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
30+
def with_idempotency(
31+
task_name: str,
32+
*,
33+
on_poison: Optional[Callable[[str, dict], None]] = None,
34+
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
3035
"""Short-circuit on completed key; gate concurrent runs via a lease.
3136
37+
The guard key is the caller's ``idempotency_key``, or one synthesized
38+
from ``source_id`` so a keyless dispatch is still poison-guarded.
39+
3240
Entry short-circuits:
3341
- completed row → return cached result
3442
- live lease held → retry(countdown=LEASE_TTL_SECONDS)
35-
- attempt_count > MAX_TASK_ATTEMPTS → poison-loop alert
43+
- attempt_count > MAX_TASK_ATTEMPTS → poison alert; ``on_poison`` fires
3644
Success writes ``completed``; exceptions leave ``pending`` for
3745
autoretry until the poison-loop guard trips.
3846
"""
3947

4048
def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
4149
@functools.wraps(fn)
4250
def wrapper(self, *args: Any, idempotency_key: Any = None, **kwargs: Any) -> Any:
43-
key = idempotency_key if isinstance(idempotency_key, str) and idempotency_key else None
51+
explicit_key = (
52+
idempotency_key
53+
if isinstance(idempotency_key, str) and idempotency_key
54+
else None
55+
)
56+
# A keyless dispatch still gets the guard via a synthesized key;
57+
# None means no anchor exists — run unguarded, as before.
58+
key = explicit_key or _synthesize_guard_key(task_name, kwargs)
4459
if key is None:
4560
return fn(self, *args, idempotency_key=idempotency_key, **kwargs)
4661

@@ -88,6 +103,9 @@ def wrapper(self, *args: Any, idempotency_key: Any = None, **kwargs: Any) -> Any
88103
"attempts": attempt,
89104
}
90105
_finalize(key, poisoned, status="failed")
106+
_run_poison_hook(
107+
on_poison, task_name, fn, self, args, kwargs, idempotency_key,
108+
)
91109
return poisoned
92110

93111
heartbeat_thread, heartbeat_stop = _start_lease_heartbeat(
@@ -109,6 +127,45 @@ def wrapper(self, *args: Any, idempotency_key: Any = None, **kwargs: Any) -> Any
109127
return decorator
110128

111129

130+
def _synthesize_guard_key(task_name: str, kwargs: dict) -> Optional[str]:
131+
"""Derive a deterministic guard key from ``source_id`` for a keyless dispatch.
132+
133+
``source_id`` is stable across broker redeliveries and unique per
134+
upload, so the poison-loop counter survives an OOM SIGKILL. Returns
135+
``None`` when absent — the dispatch then runs unguarded as before.
136+
"""
137+
source_id = kwargs.get("source_id")
138+
if source_id:
139+
return f"auto:{task_name}:{source_id}"
140+
return None
141+
142+
143+
def _run_poison_hook(
144+
on_poison: Optional[Callable[[str, dict], None]],
145+
task_name: str,
146+
fn: Callable[..., Any],
147+
task_self: Any,
148+
args: tuple,
149+
kwargs: dict,
150+
idempotency_key: Any,
151+
) -> None:
152+
"""Invoke a task's poison-path hook with named call args; swallow failures.
153+
154+
A hook failure must never change the poison-guard outcome.
155+
"""
156+
if on_poison is None:
157+
return
158+
try:
159+
bound = inspect.signature(fn).bind_partial(
160+
task_self, *args, idempotency_key=idempotency_key, **kwargs,
161+
)
162+
on_poison(task_name, dict(bound.arguments))
163+
except Exception:
164+
logger.exception(
165+
"idempotency: poison hook failed for task=%s", task_name,
166+
)
167+
168+
112169
def _lookup_completed(key: str) -> Any:
113170
"""Return cached ``result_json`` if a completed row exists for ``key``, else None."""
114171
with db_readonly() as conn:

application/api/user/tasks.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,42 @@
2727
)
2828

2929

30+
# operation tag for the poison-path source.ingest.failed event, per task.
31+
_INGEST_POISON_OPERATION = {
32+
"ingest": "upload",
33+
"ingest_remote": "upload",
34+
"ingest_connector_task": "upload",
35+
"reingest_source_task": "reingest",
36+
}
37+
38+
39+
def _emit_ingest_poison_event(task_name, bound):
40+
"""Publish a terminal ``source.ingest.failed`` when the poison-guard trips.
41+
42+
The guard returns before the worker runs, so the worker's own failed
43+
event never fires — without this the upload toast spins on "training".
44+
"""
45+
user = bound.get("user")
46+
source_id = bound.get("source_id")
47+
if not user or not source_id:
48+
return
49+
from application.events.publisher import publish_user_event
50+
51+
publish_user_event(
52+
user,
53+
"source.ingest.failed",
54+
{
55+
"source_id": str(source_id),
56+
"filename": bound.get("filename") or "",
57+
"operation": _INGEST_POISON_OPERATION.get(task_name, "upload"),
58+
"error": "Ingestion stopped after repeated failures.",
59+
},
60+
scope={"kind": "source", "id": str(source_id)},
61+
)
62+
63+
3064
@celery.task(**DURABLE_TASK)
31-
@with_idempotency(task_name="ingest")
65+
@with_idempotency(task_name="ingest", on_poison=_emit_ingest_poison_event)
3266
def ingest(
3367
self,
3468
directory,
@@ -57,7 +91,7 @@ def ingest(
5791

5892

5993
@celery.task(**DURABLE_TASK)
60-
@with_idempotency(task_name="ingest_remote")
94+
@with_idempotency(task_name="ingest_remote", on_poison=_emit_ingest_poison_event)
6195
def ingest_remote(
6296
self, source_data, job_name, user, loader,
6397
idempotency_key=None, source_id=None,
@@ -71,7 +105,9 @@ def ingest_remote(
71105

72106

73107
@celery.task(**DURABLE_TASK)
74-
@with_idempotency(task_name="reingest_source_task")
108+
@with_idempotency(
109+
task_name="reingest_source_task", on_poison=_emit_ingest_poison_event,
110+
)
75111
def reingest_source_task(self, source_id, user, idempotency_key=None):
76112
from application.worker import reingest_source_worker
77113

@@ -128,7 +164,9 @@ def process_agent_webhook(self, agent_id, payload, idempotency_key=None):
128164

129165

130166
@celery.task(**DURABLE_TASK)
131-
@with_idempotency(task_name="ingest_connector_task")
167+
@with_idempotency(
168+
task_name="ingest_connector_task", on_poison=_emit_ingest_poison_event,
169+
)
132170
def ingest_connector_task(
133171
self,
134172
job_name,

application/core/settings.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ class Settings(BaseSettings):
6666
PARSE_IMAGE_REMOTE: bool = False
6767
DOCLING_OCR_ENABLED: bool = False # Enable OCR for docling parsers (PDF, images)
6868
DOCLING_OCR_ATTACHMENTS_ENABLED: bool = False # Enable OCR for docling when parsing attachments
69+
# Pages docling's threaded pipeline buffers in flight; the library
70+
# default (100) drives worker RSS to ~3 GB on a mid-size PDF.
71+
DOCLING_PIPELINE_QUEUE_MAX_SIZE: int = 2
6972
VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb" or "pgvector"
7073
RETRIEVERS_ENABLED: list = ["classic_rag"]
7174
AGENT_NAME: str = "classic"

application/parser/file/docling_parser.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,29 @@
1616
logger = logging.getLogger(__name__)
1717

1818

19+
# Per-stage batch size for docling's threaded pipeline; 1 holds the
20+
# concurrent working set to a single page (see _apply_pipeline_caps).
21+
_PIPELINE_BATCH_SIZE = 1
22+
23+
24+
def _apply_pipeline_caps(pipeline_options) -> None:
25+
"""Cap docling's threaded-pipeline queue depth and batch sizes in place.
26+
27+
hasattr-guarded so docling builds without these knobs are unaffected.
28+
"""
29+
from application.core.settings import settings
30+
31+
caps = {
32+
"queue_max_size": max(1, settings.DOCLING_PIPELINE_QUEUE_MAX_SIZE),
33+
"layout_batch_size": _PIPELINE_BATCH_SIZE,
34+
"table_batch_size": _PIPELINE_BATCH_SIZE,
35+
"ocr_batch_size": _PIPELINE_BATCH_SIZE,
36+
}
37+
for name, value in caps.items():
38+
if hasattr(pipeline_options, name):
39+
setattr(pipeline_options, name, value)
40+
41+
1942
class DoclingParser(BaseParser):
2043
"""Parser using docling for advanced document processing.
2144
@@ -86,6 +109,7 @@ def _create_converter(self):
86109
do_ocr=self.ocr_enabled,
87110
do_table_structure=self.table_structure,
88111
)
112+
_apply_pipeline_caps(pipeline_options)
89113

90114
if self.ocr_enabled:
91115
ocr_options = self._get_ocr_options()

0 commit comments

Comments
 (0)