Commit dbdcd06

earayu and claude committed
feat(celery Wave 5 P4): production robustness 3-pack — cleanup transient/intentional split + parse_version short-circuit + reconciler stuck-parse re-enqueue
Wave 5 Phase 4 (PR-C / task #29): closes 3 latent issues surfaced through the Wave 4 ratify trail.

* **P4-1 cleanup transient-vs-intentional split** (per architect msg=6aa8ca88 T2 ratify minor obs A): Pre-Wave-5 ``_resolve_cleanup_worker`` collapsed any factory exception into "drop the DB row". Transient infra errors (Qdrant blip, ES network glitch) lost the retry signal because the DB row was deleted before the next cycle could retry. Wave 5 P4 returns a new ``CleanupWorkerResolution(worker, transient)`` distinguishing the two: ``WorkerFactoryError`` is a by-design gate (drop the row to bound index growth), any other Exception is transient (skip the DB row drop so the next cycle retries when the backend recovers). Counts surface ``transient_deferred`` so operators can track recovery rate.

* **P4-2 parse_version short-circuit** (per huangheng T3 chunk 2 obs B + Wave 5 backlog item): Pre-Wave-5 ``parse_document`` always re-ran DocParser even when the resulting artifact directory is byte-identical (parse_version is content-derived). Wave 5 P4 adds ``short_circuit_if_artifacts_exist=True`` as the default: if all three derived artifacts (markdown.md / outline.json / chunks.jsonl) already exist under the canonical ``derived/parse_<version>/`` path, skip DocParser + writes entirely. Eliminates the ~30s OCR / Word rerun cost on rebuild of unchanged content. Tests can pass ``False`` to force re-parse.

* **P4-3 reconciler stuck-document parse re-enqueue** (per architect Wave 4 T3 chunk 2 obs A, production gap close): Pre-Wave-5 parse failures (DocParser raise / source missing) silently dropped the document_id in the parse worker; the operator saw ``document.status == PENDING`` forever with no signal to re-trigger. Wave 5 P4 adds ``reconcile_stuck_documents_for_parse_reenqueue`` to the reconciler loop. It detects documents with ``Document.gmt_created < now - cooldown_seconds`` AND zero ``document_index`` rows AND ``Document.gmt_updated < now - cooldown_seconds`` (the cooldown filter prevents a 30-s tick storm), then pushes a fresh ``ParseDispatchPayload`` matching the upload handler's contract. ``gmt_updated`` bumps after each push so the cooldown predicate rate-limits re-enqueue.

Three-class tag (Wave 3 production-readiness invariant):

* must-be-real: cleanup transient/intentional split prevents silent retry-signal loss; parse_version short-circuit eliminates OCR rerun waste; stuck-parse reconciler closes the document.status PENDING-forever gap
* may-be-gated: ``short_circuit_if_artifacts_exist=False`` for tests pinning DocParser invocation count
* fully-resolves: 3 of Wave 5 P1 backlog items (per architect ratify trail msg=6aa8ca88 + huangheng obs trail)

Deltas:

* ``aperag/indexing/cleanup.py``: ``CleanupWorkerResolution`` dataclass + WorkerFactoryError-vs-Exception split in ``_resolve_cleanup_worker``; both ``cleanup_orphan_parse_versions`` + ``cleanup_for_deleted_documents`` honor ``resolution.transient`` to skip DB row drop on transient infra failures; collection cascade aggregates ``transient_deferred``.
* ``aperag/indexing/parser.py``: ``_all_artifacts_present`` predicate + ``short_circuit_if_artifacts_exist`` parameter + early-return ParseResult when all 3 artifacts present.
* ``aperag/indexing/reconciler.py``: ``STUCK_PARSE_COOLDOWN_SECONDS`` + ``reconcile_stuck_documents_for_parse_reenqueue`` async scan + ``_select_stuck_documents_for_reenqueue`` SQL query + ``_build_parse_payload_for_document`` payload reconstruction + ``_resolve_collection_parser_config`` / ``_resolve_collection_modalities`` (mirror ``document_service`` shape) + ``_mark_stuck_documents_reenqueued`` bumps gmt_updated; ``run_reconcile_loop`` calls the new scan.
* ``aperag/indexing/__init__.py``: re-export new symbols.
* ``tests/integration/test_p4_robustness_3pack.py`` (new): 13 tests across 3 layers (cleanup transient/intentional split / parse short-circuit / reconciler re-enqueue with cooldown + payload shape + skip-when-indexed + skip-when-no-object-path).
* ``tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py``: added ``transient_deferred: 0`` to expected empty-counts dict.

Local gates:

* ``pytest tests/unit_test/indexing/ tests/integration/``: 232 passed / 48 skipped (incl. 13 new ``test_p4_robustness_3pack``).
* ``ruff check aperag/ tests/integration/test_p4_robustness_3pack.py``: clean.
* ``ruff format``: applied.

Branch: local ``chenyexuan/celery-wave5-p4`` based on main ``19d3d70`` (Wave 4 PR #1731 squash); will rebase onto ``bryce/celery-wave5`` once Bryce opens the Wave 5 draft PR.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
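The P4-3 cooldown predicate described in the message can be illustrated with a small in-memory sketch. This is not the code in ``aperag/indexing/reconciler.py`` (that file is not part of the diff shown here); ``Doc``, ``select_stuck``, ``mark_reenqueued`` and the 600-second cooldown are hypothetical stand-ins, and the real implementation is described as a SQL query.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass
class Doc:
    """Hypothetical stand-in for a Document row plus its document_index row count."""

    id: str
    gmt_created: datetime
    gmt_updated: datetime
    index_rows: int


def select_stuck(docs: List[Doc], now: datetime, cooldown_seconds: int) -> List[Doc]:
    """Apply the three conditions from the commit message to in-memory records."""
    cutoff = now - timedelta(seconds=cooldown_seconds)
    return [
        d
        for d in docs
        if d.index_rows == 0 and d.gmt_created < cutoff and d.gmt_updated < cutoff
    ]


def mark_reenqueued(docs: List[Doc], now: datetime) -> None:
    """Bump gmt_updated so the cooldown predicate rate-limits the next push."""
    for d in docs:
        d.gmt_updated = now


COOLDOWN = 600  # assumed value; the commit message does not state the constant
now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
old = now - timedelta(hours=1)
recent = now - timedelta(seconds=10)
docs = [
    Doc("stuck", old, old, 0),       # old, never indexed: eligible
    Doc("indexed", old, old, 3),     # has index rows: skipped
    Doc("fresh", recent, recent, 0), # still inside the cooldown: skipped
]

first = select_stuck(docs, now, COOLDOWN)
first_ids = [d.id for d in first]
mark_reenqueued(first, now)

# The next 30-second tick sees nothing, because gmt_updated was just bumped.
second_ids = [d.id for d in select_stuck(docs, now + timedelta(seconds=30), COOLDOWN)]

# Once the cooldown has elapsed, unindexed documents become eligible again.
later_ids = [d.id for d in select_stuck(docs, now + timedelta(seconds=601), COOLDOWN)]
print(first_ids, second_ids, later_ids)
```

The point of the sketch is the rate limit: without the ``gmt_updated`` bump, every tick would re-select the same document.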
1 parent e070716 commit dbdcd06

6 files changed

Lines changed: 1014 additions & 49 deletions


aperag/indexing/__init__.py

Lines changed: 5 additions & 1 deletion
@@ -158,9 +158,11 @@
     HEARTBEAT_STALE_SECONDS,
     RECONCILE_BATCH_SIZE,
     RECONCILE_INTERVAL_SECONDS,
+    STUCK_PARSE_COOLDOWN_SECONDS,
     reconcile_failed_retry,
     reconcile_pending_dispatch,
     reconcile_running_reclaim,
+    reconcile_stuck_documents_for_parse_reenqueue,
     run_reconcile_loop,
 )
 from aperag.indexing.summary import (
@@ -276,13 +278,15 @@
     "process_one_parse_task",
     "run_parse_worker",
     "run_parse_worker_loop",
-    # Reconciler (T2.1)
+    # Reconciler (T2.1 + Wave 5 P4 stuck-parse re-enqueue)
     "RECONCILE_INTERVAL_SECONDS",
     "RECONCILE_BATCH_SIZE",
     "HEARTBEAT_STALE_SECONDS",
+    "STUCK_PARSE_COOLDOWN_SECONDS",
     "reconcile_pending_dispatch",
     "reconcile_failed_retry",
     "reconcile_running_reclaim",
+    "reconcile_stuck_documents_for_parse_reenqueue",
     "run_reconcile_loop",
     # Cleanup (T2.1 + T3.1 path C)
     "CLEANUP_INTERVAL_SECONDS",

aperag/indexing/cleanup.py

Lines changed: 94 additions & 22 deletions
@@ -84,6 +84,7 @@

 import asyncio
 import logging
+from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from typing import Any, Awaitable, Callable, Mapping, Optional

@@ -92,6 +93,7 @@

 from aperag.indexing.base import ModalityWorker
 from aperag.indexing.models import DocumentIndex, Modality
+from aperag.indexing.worker_factory import WorkerFactoryError

 logger = logging.getLogger(__name__)

@@ -164,45 +166,92 @@ def _is_graph_worker(worker: ModalityWorker) -> bool:
     return getattr(worker, "modality", None) is Modality.GRAPH


+@dataclass(frozen=True)
+class CleanupWorkerResolution:
+    """Wave 5 P4 T2 — outcome of :func:`_resolve_cleanup_worker`.
+
+    Distinguishes the two failure modes that pre-Wave-5 collapsed into
+    a single ``None`` return:
+
+    * **intentional gate** (``worker is None`` AND ``transient is False``)
+      — :class:`WorkerFactoryError` raised because the modality is
+      Wave-N-gated by design (graph extractor not wired / vision
+      multimodal not configured), the operator deliberately disabled
+      the modality via ``collection.config``, or the row's modality
+      string is unknown. The cleanup loop should drop the DB row so
+      the index does not grow unboundedly while the gate is active.
+
+    * **transient infrastructure error** (``worker is None`` AND
+      ``transient is True``) — DB connection blip / Qdrant
+      unreachable / ES unhealthy / Redis network glitch. The cleanup
+      loop must NOT drop the DB row — the next cycle (5 min later)
+      retries automatically once the backend recovers. Pre-Wave-5
+      this collapsed into the gate path and silently lost the retry
+      signal.
+
+    * **resolved worker** (``worker is not None``, ``transient ignored``)
+      — happy path; caller proceeds with backend cleanup.
+    """
+
+    worker: Optional[ModalityWorker]
+    transient: bool
+
+
 async def _resolve_cleanup_worker(
     *,
     workers: Optional[Mapping[Modality, ModalityWorker]],
     worker_factory: Optional[WorkerFactoryForRow],
     row: DocumentIndex,
-) -> Optional[ModalityWorker]:
+) -> CleanupWorkerResolution:
     """Resolve the cleanup worker for a row from factory or static map.

-    Wave 4 T2: production wires the factory (per-(collection, modality)
-    lazy materialisation); tests typically pass a pre-built ``workers``
-    mapping. When both are provided the factory wins — production
-    deployments override the legacy mapping with the per-row factory.
-
-    Returns ``None`` when neither source can supply a worker; the
-    caller logs + counts the row as ``backend_skipped``. A factory
-    that raises :class:`WorkerFactoryError` (Wave 4 vision multimodal
-    gate / partial config) also returns ``None`` so the cleanup cycle
-    drops the DB row even when the backend is unreachable, matching
-    the existing "skip backend, drop row" semantics.
+    Wave 4 T2 + Wave 5 P4 (transient-vs-intentional split): production
+    wires the factory (per-(collection, modality) lazy materialisation);
+    tests typically pass a pre-built ``workers`` mapping. When both
+    are provided the factory wins — production deployments override
+    the legacy mapping with the per-row factory.
+
+    Returns a :class:`CleanupWorkerResolution`:
+
+    * ``worker is not None``: backend cleanup proceeds.
+    * ``worker is None, transient=False``: intentional gate / unknown
+      modality / no source — caller drops DB row.
+    * ``worker is None, transient=True``: transient infrastructure
+      error — caller skips DB row drop so next cycle retries.
+
+    Pre-Wave-5 this returned ``None`` for both failure modes,
+    causing transient errors to silently lose their retry signal.
     """
     if worker_factory is not None:
         try:
-            return await worker_factory(row)
-        except Exception as exc:  # noqa: BLE001 — surface via log, count as skip
+            return CleanupWorkerResolution(worker=await worker_factory(row), transient=False)
+        except WorkerFactoryError as exc:
             logger.warning(
-                "cleanup worker_factory failed modality=%s row id=%d collection=%s: %s — counting as backend_skipped",
+                "cleanup worker_factory gate raised modality=%s row id=%d collection=%s: %s — "
+                "counting as backend_skipped (intentional gate, dropping DB row)",
                 row.modality,
                 row.id,
                 row.collection_id,
                 exc,
             )
-            return None
+            return CleanupWorkerResolution(worker=None, transient=False)
+        except Exception as exc:  # noqa: BLE001 — transient infra error, retry next cycle
+            logger.warning(
+                "cleanup worker_factory transient failure modality=%s row id=%d collection=%s: %s — "
+                "skipping DB row drop, will retry next cycle",
+                row.modality,
+                row.id,
+                row.collection_id,
+                exc,
+            )
+            return CleanupWorkerResolution(worker=None, transient=True)
     if workers is None:
-        return None
+        return CleanupWorkerResolution(worker=None, transient=False)
     try:
         modality = Modality(row.modality)
     except ValueError:
-        return None
-    return workers.get(modality)
+        return CleanupWorkerResolution(worker=None, transient=False)
+    return CleanupWorkerResolution(worker=workers.get(modality), transient=False)


 # ---------------------------------------------------------------------
@@ -301,6 +350,7 @@ async def cleanup_orphan_parse_versions(
         "rows_deleted": 0,
         "graph_noop": 0,
         "backend_skipped": 0,
+        "transient_deferred": 0,
     }

     delete_ids: list[int] = []
@@ -315,11 +365,18 @@ async def cleanup_orphan_parse_versions(
             )
             continue

-        worker = await _resolve_cleanup_worker(
+        resolution = await _resolve_cleanup_worker(
             workers=workers,
             worker_factory=worker_factory,
             row=row,
         )
+        if resolution.transient:
+            # Wave 5 P4: transient infra error — skip both backend
+            # delete AND DB row drop so the next cleanup cycle (5 min
+            # later) retries automatically once the backend recovers.
+            counts["transient_deferred"] += 1
+            continue
+        worker = resolution.worker
         if worker is None:
             logger.warning(
                 "cleanup no worker registered for modality=%s row id=%d — skipping backend delete",
@@ -424,6 +481,7 @@ async def cleanup_for_deleted_documents(
         "graph_lineage_cleaned": 0,
         "rows_deleted": 0,
         "backend_skipped": 0,
+        "transient_deferred": 0,
     }

     # Per-document, per-modality dedup so graph lineage cleanup runs
@@ -444,11 +502,18 @@ async def cleanup_for_deleted_documents(
             )
             continue

-        worker = await _resolve_cleanup_worker(
+        resolution = await _resolve_cleanup_worker(
             workers=workers,
             worker_factory=worker_factory,
             row=row,
         )
+        if resolution.transient:
+            # Wave 5 P4: transient infra error — skip backend delete
+            # AND DB row drop so the caller can retry on a later cycle
+            # / re-invocation once the backend recovers.
+            counts["transient_deferred"] += 1
+            continue
+        worker = resolution.worker
         if worker is None:
             logger.warning(
                 "cleanup no worker for modality=%s row id=%d document=%s — skipping backend",
@@ -623,6 +688,7 @@ async def cleanup_for_deleted_collections(
         "graph_lineage_cleaned": 0,
         "rows_deleted": 0,
         "backend_skipped": 0,
+        "transient_deferred": 0,
         "collections_cleaned": 0,
     }
     if not collection_ids:
@@ -641,7 +707,13 @@ async def cleanup_for_deleted_collections(
             worker_factory=worker_factory,
             document_ids=document_ids,
         )
-        for key in ("backend_deleted", "graph_lineage_cleaned", "rows_deleted", "backend_skipped"):
+        for key in (
+            "backend_deleted",
+            "graph_lineage_cleaned",
+            "rows_deleted",
+            "backend_skipped",
+            "transient_deferred",
+        ):
             counts[key] += sub_counts[key]

         # Sweep any rows that path B did not catch (no document_id match
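
The transient-vs-intentional split in the hunks above can be exercised in isolation with the following sketch. It is a simplified analogue, not the project's code: ``Resolution``, ``resolve``, ``run_cycle`` and ``demo_factory`` are hypothetical names, the worker is a plain string, the local ``WorkerFactoryError`` only mirrors the imported class by name, and counting a gated row as ``rows_deleted`` is an assumption based on the "drop the row" wording in the commit message.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Iterable, Optional


class WorkerFactoryError(Exception):
    """Local stand-in for the by-design gate error named in the commit."""


@dataclass(frozen=True)
class Resolution:
    """Simplified analogue of CleanupWorkerResolution (worker is a str here)."""

    worker: Optional[str]
    transient: bool


async def resolve(factory: Callable[[int], Awaitable[str]], row_id: int) -> Resolution:
    try:
        return Resolution(worker=await factory(row_id), transient=False)
    except WorkerFactoryError:
        # Intentional gate: no worker, but the row may still be dropped.
        return Resolution(worker=None, transient=False)
    except Exception:
        # Anything else is treated as transient: keep the row for the next cycle.
        return Resolution(worker=None, transient=True)


async def run_cycle(
    factory: Callable[[int], Awaitable[str]], row_ids: Iterable[int]
) -> Dict[str, int]:
    counts = {
        "backend_deleted": 0,
        "backend_skipped": 0,
        "transient_deferred": 0,
        "rows_deleted": 0,
    }
    for row_id in row_ids:
        resolution = await resolve(factory, row_id)
        if resolution.transient:
            counts["transient_deferred"] += 1
            continue  # neither backend delete nor row drop
        if resolution.worker is None:
            counts["backend_skipped"] += 1
        else:
            counts["backend_deleted"] += 1
        counts["rows_deleted"] += 1
    return counts


async def demo_factory(row_id: int) -> str:
    if row_id == 2:
        raise WorkerFactoryError("modality gated by config")
    if row_id == 3:
        raise ConnectionError("vector store unreachable")
    return f"worker-{row_id}"


counts = asyncio.run(run_cycle(demo_factory, [1, 2, 3]))
print(counts)
```

Ordering the ``except`` clauses from specific to general is what makes the split work: the gate error is matched first, and only the remaining exceptions fall through to the transient branch.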

aperag/indexing/parser.py

Lines changed: 79 additions & 19 deletions
@@ -329,6 +329,29 @@ def _document_md5(source_bytes: bytes) -> str:
     return hashlib.md5(source_bytes).hexdigest()


+def _all_artifacts_present(
+    *,
+    store: _SyncObjectStore,
+    markdown_path: str,
+    outline_path: str,
+    chunks_path: str,
+) -> bool:
+    """Wave 5 P4 short-circuit predicate: all three canonical
+    derived artifacts must exist for the cached parse to be valid.
+
+    Uses ``ObjectStore.obj_exists`` (cheap metadata check) rather
+    than ``read_or_none`` so the predicate stays cost-bounded for
+    every parse call. ``chunks.jsonl`` is checked last because it is
+    the only artifact downstream modality workers actually read; if
+    it is missing the previous parse was interrupted mid-write and
+    re-parsing is required regardless.
+    """
+    try:
+        return store.obj_exists(markdown_path) and store.obj_exists(outline_path) and store.obj_exists(chunks_path)
+    except Exception:  # noqa: BLE001 — predicate fails closed (re-parse)
+        return False
+
+
 def _docparser_extract_markdown(
     *,
     source_bytes: bytes,
@@ -398,6 +421,7 @@ def parse_document(
     source_filename: str | None = None,
     parser_config: dict[str, Any] | None = None,
     config: ParseConfig | None = None,
+    short_circuit_if_artifacts_exist: bool = True,
 ) -> ParseResult:
     """Parse the source bytes and persist the three shared artifacts.

@@ -406,6 +430,16 @@ def parse_document(
     them atomically). The artifact paths are the canonical
     ``derived/parse_<version>/`` layout per design pack §C.1.

+    Wave 5 P4 short-circuit: when ``short_circuit_if_artifacts_exist``
+    is True (default) and all three derived artifacts (``markdown.md``
+    / ``outline.json`` / ``chunks.jsonl``) already exist in the object
+    store under the canonical ``derived/parse_<version>/`` path, the
+    parser **skips DocParser + writes entirely** and returns the
+    existing :class:`ParseResult`. This eliminates the ~30s OCR / Word
+    rerun cost when a document is re-uploaded with identical content
+    or a rebuild is dispatched against an already-parsed version
+    (per huangheng T3 chunk 2 obs B + architect Wave 5 P4 lock).
+
     Dispatch (Wave 4 T3 chunk 1):
     - ``source_filename`` ends in a known text extension (``.md`` /
       ``.markdown`` / ``.txt`` / ``.text``) **or** is ``None`` →
@@ -433,6 +467,12 @@ def parse_document(
             to the real parser chain. Ignored on the simulator path.
         config: Parsing knobs that influence the parse_version. Pass
             ``None`` to use simulator defaults.
+        short_circuit_if_artifacts_exist: When True (default), reuse
+            existing canonical artifacts if all three are already in
+            the object store under the resolved
+            ``derived/parse_<version>/`` path. Pass ``False`` to force
+            a re-parse + re-write (used by tests pinning DocParser
+            invocation count).
     """
     from aperag.mcp.tools.parse_version import compute_parse_version

@@ -446,6 +486,45 @@ def parse_document(
         chunking_config=cfg.chunking.serialize(),
     )

+    markdown_path = derived_artifact(
+        collection_id=collection_id,
+        document_id=document_id,
+        parse_version=parse_version,
+        filename="markdown.md",
+    )
+    outline_path = derived_artifact(
+        collection_id=collection_id,
+        document_id=document_id,
+        parse_version=parse_version,
+        filename="outline.json",
+    )
+    chunks_path = derived_artifact(
+        collection_id=collection_id,
+        document_id=document_id,
+        parse_version=parse_version,
+        filename="chunks.jsonl",
+    )
+
+    if short_circuit_if_artifacts_exist and _all_artifacts_present(
+        store=store,
+        markdown_path=markdown_path,
+        outline_path=outline_path,
+        chunks_path=chunks_path,
+    ):
+        logger.info(
+            "indexing parser short-circuit collection=%s document=%s parse_version=%s "
+            "(all derived artifacts already present; skipping DocParser + writes)",
+            collection_id,
+            document_id,
+            parse_version,
+        )
+        return ParseResult(
+            parse_version=parse_version,
+            markdown_path=markdown_path,
+            outline_path=outline_path,
+            chunks_path=chunks_path,
+        )
+
     if extension is None or extension in _SIMULATOR_EXTENSIONS:
         try:
             markdown = source_bytes.decode("utf-8")
@@ -470,25 +549,6 @@ def parse_document(
     outline = _build_outline(markdown)
     chunks = _split_chunks(markdown, cfg.chunking)

-    markdown_path = derived_artifact(
-        collection_id=collection_id,
-        document_id=document_id,
-        parse_version=parse_version,
-        filename="markdown.md",
-    )
-    outline_path = derived_artifact(
-        collection_id=collection_id,
-        document_id=document_id,
-        parse_version=parse_version,
-        filename="outline.json",
-    )
-    chunks_path = derived_artifact(
-        collection_id=collection_id,
-        document_id=document_id,
-        parse_version=parse_version,
-        filename="chunks.jsonl",
-    )
-
     write_atomic(store, markdown_path, markdown.encode("utf-8"))
     write_atomic(
         store,
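
The short-circuit behaviour added to ``parse_document`` can be sketched against an in-memory store as follows. Only the three artifact filenames and the ``obj_exists`` method name come from the diff; ``InMemoryStore``, ``BrokenStore``, ``parse``, ``fake_docparser`` and the ``derived/parse_v1`` prefix are hypothetical, and the real function computes its paths via ``derived_artifact`` and writes with ``write_atomic``.

```python
from typing import Callable, Dict, List, Sequence, Tuple

ARTIFACT_NAMES = ("markdown.md", "outline.json", "chunks.jsonl")


class InMemoryStore:
    """Hypothetical object store exposing only what the sketch needs."""

    def __init__(self) -> None:
        self._objects: Dict[str, bytes] = {}

    def obj_exists(self, path: str) -> bool:
        return path in self._objects

    def put(self, path: str, data: bytes) -> None:
        self._objects[path] = data


class BrokenStore(InMemoryStore):
    """Simulates a backend whose existence check raises."""

    def obj_exists(self, path: str) -> bool:
        raise ConnectionError("object store unreachable")


def all_artifacts_present(store: InMemoryStore, paths: Sequence[str]) -> bool:
    # Fails closed: any error during the check means "re-parse".
    try:
        return all(store.obj_exists(p) for p in paths)
    except Exception:
        return False


def parse(
    store: InMemoryStore,
    prefix: str,
    expensive_parse: Callable[[], Tuple[bytes, bytes, bytes]],
    short_circuit: bool = True,
) -> List[str]:
    paths = [f"{prefix}/{name}" for name in ARTIFACT_NAMES]
    if short_circuit and all_artifacts_present(store, paths):
        return paths  # reuse existing artifacts, skip the expensive step
    for path, data in zip(paths, expensive_parse()):
        store.put(path, data)
    return paths


calls: List[int] = []


def fake_docparser() -> Tuple[bytes, bytes, bytes]:
    calls.append(1)  # count invocations, as a test pinning the call count would
    return (b"# title", b"[]", b"{}\n")


store = InMemoryStore()
parse(store, "derived/parse_v1", fake_docparser)  # cold: parser runs
parse(store, "derived/parse_v1", fake_docparser)  # warm: short-circuits
calls_after_warm = len(calls)
parse(store, "derived/parse_v1", fake_docparser, short_circuit=False)  # forced
calls_after_forced = len(calls)
fails_closed = all_artifacts_present(BrokenStore(), ["a", "b"]) is False
print(calls_after_warm, calls_after_forced, fails_closed)
```

Computing the artifact paths before any parsing work, as the diff does by moving the ``derived_artifact`` calls up, is what allows the existence check to run ahead of DocParser.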
