Skip to content

Commit 00ae644

Browse files
earayuclaude
andauthored
feat(celery Wave 3): T3.1 hard-cut legacy Celery indexing layer (#1729)
* feat(celery T3.1 commit 1/5): alembic migration — drop legacy + ALTER NOT NULL + rename to canonical Wave 3 hard-cut schema migration per architect msg=4a801b2b (Wave 1 Bug 2 ruling that locked the temporary v2 suffix) + msg=498b12f0 (Wave 2 informational item ruling that promoted dispatch columns NOT NULL in Wave 3) + PM acceptance msg=5939e394 item 1. Migration revision d0f4c1b9a8e2 chains off c2e8d5a1f3b9 and: 1. DROP TABLE document_index CASCADE — the legacy Celery-era table that lived alongside the Wave 1 v2 table during the transition. Pre-launch + no callers in Wave 3 (the dependent code is hard- deleted in subsequent commits of this same PR). 2. ALTER COLUMN collection_id, source_path → NOT NULL on document_index_v2. Wave 1 fixtures used NULL for back-compat; Wave 3 orchestrator + reconciler always populate them (per architect msg=498b12f0 Lock). 3. Rename every index *_v2_* → *_*. The partial-unique uniq_document_index_v2_serving is dropped + re-created (PG ALTER INDEX RENAME does not regenerate the WHERE predicate symbol map per Postgres quirk; SQLite would silently keep the old reference). 4. RENAME TABLE document_index_v2 → document_index — back to the §F.1 canonical name (architect msg=4a801b2b lock). The downgrade reverses every step in mirror order so a rollback can replay subsequent migrations cleanly. The recreated legacy ``document_index`` table on downgrade is intentionally schema-less (only the id PK column) because the legacy class was deleted in the Wave 3 PR alongside this migration — operators rolling back past this point must restore the legacy ORM file before re-running upgrades. There is no production scenario for that. This is commit 1/5 of T3.1; subsequent commits land the FastAPI wire-in, knowledge_base/tasks.py Pattern A/B/C migration of the 6 remaining Celery tasks, the 9 production caller migrations, and the legacy file-layer hard-delete + audit allowlist removal + pyproject Celery/kombu dep removal. Design pack §F.1 + §F.5 amends (per architect msg=498b12f0 + msg=3890c9d7 path C ruling) are deferred to a follow-up commit once PR #1725 (which owns docs/modularization/indexing-redesign- design-pack.md) merges — flagged in the channel. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 2/5): dispatcher.py + cleanup path C (collection-deletion cascade) Wave 3 wire-in helpers per architect msg=268f9022 (Wave 3 spec) + msg=3890c9d7 (Pattern A path C ruling). Adds the upload-side dispatcher + the cleanup worker's third path so commits 3-5 can wire FastAPI + migrate the 6 knowledge_base/tasks.py Celery tasks without inventing new abstractions. aperag/indexing/dispatcher.py (301 LOC, NEW): - DispatchRequest dataclass — collection_id / document_id / parse_version / source_path / tenant_scope_key / modalities tuple - IndexingMode enum — ASYNC (queue + worker pool) / INLINE (synchronous derive + sync per modality, for tier-1 private deployments per design pack §L) - dispatch_indexing() async helper — INSERTs N PENDING rows in one transaction (collection_id + source_path + tenant_scope_key are populated per the design pack §F.1 amended NOT NULL columns) + finalizes per mode (queue.push for ASYNC; process_one_task call for INLINE) - modalities_for_collection() helper — maps per-modality enable flags to a canonical-order modality tuple, useful for HTTP handlers - Fail-fast on missing dependency: raises ValueError if mode=ASYNC with no queue, or mode=INLINE with empty workers (catches config bugs at the HTTP boundary, not mid-INSERT) aperag/indexing/cleanup.py (extended +131 LOC): - New "Path C" cleanup_for_deleted_collections() per architect msg=3890c9d7 Pattern A. Three-step cascade: 1. Find all distinct document_ids in document_index referencing each deleted collection_id 2. Cascade to path B (cleanup_for_deleted_documents) for those documents — that path already handles modality fan-out (graph lineage cleanup vs flat backend delete) 3. Sweep any remaining document_index rows by collection_id (covers the edge case where a row was orphaned earlier or the collection had rows queued before any document indexed) - Idempotent: a partial cascade that crashes mid-way is resumed on the next call (Pattern B reconciler scan that sweeps tombstoned collections) - Counts dict adds collections_cleaned key - Module docstring rewritten to describe THREE paths (was TWO) aperag/indexing/__init__.py: - Re-exports cleanup_for_deleted_collections + 6 dispatcher symbols (DispatchRequest, IndexingMode, DEFAULT_MODALITIES, dispatch_indexing, modalities_for_collection, all_modalities) tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py (8 cases): - dispatcher_async: INSERTs N rows + pushes payloads to per-modality queue + leaves DB rows PENDING with correct scoping fields - dispatcher_async_requires_queue: fail-fast on None queue - dispatcher_inline: INSERTs + invokes process_one_task → row ends ACTIVE + is_serving=TRUE in one TX (§F.3) - dispatcher_inline_requires_workers: fail-fast on empty workers - modalities_for_collection: canonical order + subset selection - path_c_cascades_via_path_b: 3 collection rows (2 doc + 1 ghost) → 3 backend deletes + 3 row deletes; other-collection row untouched - path_c_handles_empty_input: counts dict zeroed - path_c_idempotent_on_re_run: second call returns rows_deleted=0 Local pytest: tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py 8/8 passed. Lint + format clean across new + extended files. Note: this commit does not yet wire dispatcher into the FastAPI app (commit 3) or migrate the 6 knowledge_base/tasks.py Celery tasks per Pattern A/B/C (commit 4). Bryce can now start T3.2 + T3.3 on top of this branch — the dispatcher shape is the stable API both lanes depend on (T3.3 inline mode reuses dispatch_indexing(mode=INLINE) unchanged; T3.2 search API does not depend on dispatcher). Branch is rebased on main HEAD f370dc6 (PR #1725 design pack merged, so subsequent commits can amend §F.1 / §F.5 directly if any new spec drift surfaces during implementation). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.2 + T3.3): SearchResultMetadata §G.5 + private-deploy + INDEXING_MODE=inline smoke Per docs/modularization/indexing-redesign-design-pack.md §G.5 + §L + architect msg=268f9022 (Wave 3 spec) + msg=3890c9d7 (path-C ruling) + msg=c685f83e (PR #1725 §F.1/§F.5 amendments merged). Two Wave 3 lanes shipped together because they share no production- code surface with chenyexuan T3.1 commits 1-2 (this commit's diff is purely additive: 1 new helper + 1 schema extension + 1 docs file + 2 test files): # T3.2 — SearchResultMetadata §G.5 extension aperag/domains/retrieval/schemas.py: * New typing aliases ``IndexerModality`` (vector/fulltext/graph/ summary/vision) + ``IndexStateValue`` (ACTIVE/FAILED/NOT_ENABLED/ INDEXING). * Three new optional fields on SearchResultMetadata: ``parse_version``, ``index_modality``, ``index_state_per_modality``. ``extra="forbid"`` config preserved — the §G.5 additions widen the allowlist by exactly three entries; a typo / future shadow field still fails Pydantic validation loudly. * ``modality`` (D10.h-locked content shape: text/image) kept as-is. The §G.5 spec uses bare ``modality`` for the indexer modality, but the existing public surface already binds that name to content shape; renaming would break D10.h. We chose ``index_modality`` for the indexer modality to disambiguate at the schema level. (Spec narrative §G.5 may want a follow-up to use the same name; not blocking.) * ``from_raw()`` extracts the three new fields from upstream raw metadata, with shallow validation that drops malformed entries (unknown keys / non-string values) before they leak to clients. Accepts both ``index_modality`` and the legacy ``indexer`` key for backward compat with vector/fulltext/graph indexers that haven't been rewired. aperag/indexing/index_state.py (NEW, 165 lines): * Pure-read helper ``query_index_state_for_documents(engine, collection_id, document_ids)`` returning the ``{document_id: {modality: state}}`` shape SearchResultMetadata expects. Single batched read against ``document_index`` so the search pipeline can hydrate metadata for an entire result page in one DB round-trip rather than N+1. * Translation contract pinned: ``status=ACTIVE AND is_serving=TRUE`` → ``ACTIVE``; ``status=FAILED`` → ``FAILED``; everything else (PENDING / RUNNING / ACTIVE-but-not-serving §F.3 cutover transit) → ``INDEXING``; missing row → ``NOT_ENABLED``. Per §F.4 the cutover transit window reads as INDEXING for client purposes. * Dense result map: every document_id key always carries every modality. Stable shape so clients don't have to reason about "field missing means what?". * Module-local re-declaration of ``IndexStateValue`` so ``aperag.indexing`` does not import from ``aperag.domains.retrieval`` (dependency runs in the other direction). Two literals MUST stay in sync. tests/unit_test/indexing/test_t3_2_index_state.py (NEW, 20 cases): * Schema validation: §G.5 fields accepted / extra="forbid" still rejects unknown / IndexerModality + IndexStateValue Literals reject unknown values. * from_raw extraction: §G.5 fields populated / legacy ``indexer`` key fallback / malformed entries dropped silently / D10.h-locked fields unchanged / empty input returns None. * DB helper: empty-input fast path / dense NOT_ENABLED for un-enqueued docs / ACTIVE+serving → ACTIVE / ACTIVE-but-not- serving → INDEXING (§F.3 cutover transit) / PENDING + RUNNING → INDEXING / FAILED → FAILED / per-collection_id filtering / serving row wins over PENDING sibling under §F.3 cutover model / per-modality independence under partial failures. # T3.3 — private deployment docs + INDEXING_MODE=inline smoke docs/private-deployment.md (NEW, 249 lines): * §L Tier 1 / Tier 2 / Tier 3 deployment guide for operators. * Highlights "deploy and forget" mechanisms — every resource that would rot has a corresponding self-heal (§F.5 Path A/B/C, §I.2 retry, §H.5 quota fallback, §C.7 atomic write). * Tier 1: ``pip install aperag && aperag serve`` with SQLite + LocalFS + ``INDEXING_MODE=inline``; no Redis, no separate worker. * Tier 2: docker-compose with PostgreSQL + Redis + MinIO + 5 modality workers + reconciler + cleanup loop; standard customer install on a single VM. * Tier 3: Tier 2 spread across multiple VMs sharing Redis + DB + S3-compatible store. No code change between tiers. * §J.1 SLI table for operators wiring OTLP collectors. * "When to escalate" section: which signals indicate the steady- state self-heal is not converging. tests/integration/test_inline_mode_smoke.py (NEW, 2 cases): * End-to-end smoke for ``IndexingMode.INLINE`` — parse → dispatch → every requested modality at status=ACTIVE + is_serving=TRUE, driven synchronously through chenyexuan T3.1 dispatcher ``9aef2a7``. No Redis, no queue, no separate worker process. * Vision intentionally excluded from the multi-modality smoke because vision derive consumes a JSON list of image records (not chunks.jsonl) and the dispatcher takes a single source_path; the per-modality source_path resolution is the FastAPI lifespan layer's job (chenyexuan T3.1 commit 3, out of scope for T3.3). * Subset-modality test: ``DispatchRequest.modalities`` lets a Tier 1 deploy turn off expensive modalities (e.g., no GPU → skip vision) and only the requested rows finalise. * Stays in default PR-gate suite (no @pytest.mark.slow) since in-memory backends finish in ~1 s. # §G hard-gate self-audit * #1 contract shape: 5 net-new files + schemas.py +93 lines (allowlist widening only). No existing API surface narrowed; the D10.h-locked content modality field is preserved. * #4 caller migration: search pipeline integration is intentionally deferred to chenyexuan T3.1 commit 3 (FastAPI lifespan + caller migration); the read helper in this commit is the seam that pipeline.py will call once wire-in lands. * #5 cross-stack: write set strictly disjoint from chenyexuan T3.1 commits 1-2 (alembic + dispatcher.py + cleanup.py); chenyexuan commit 3-5 changes orchestrator/reconciler/FastAPI app/legacy deletes — also disjoint from this commit's writes. # Lint + tests * ``uvx ruff check + ruff format --check`` across aperag/ + tests/ clean. * ``pytest tests/unit_test/indexing/ tests/integration/ test_inline_mode_smoke.py tests/load/ tests/unit_test/ test_phase3_reexport_audit.py`` → 136 passed, 0 failed (84 Wave 1+2 + 8 T3.1 dispatcher path-c + 20 new T3.2 + 2 new T3.3 + 2 load + 2 phase3 audit). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 3/5): Config.INDEXING_MODE + FastAPI lifespan wire-in for indexing runtime Wave 3 wire-in step per architect msg=268f9022 §K T3.1 spec item 4. Adds the runtime entry point that launches the per-modality worker pool + reconciler + cleanup loop on app startup when ``INDEXING_MODE=async`` (default), and the in-process ``WorkQueue`` + ``Engine`` references that future request-handler dispatchers will import via ``app.state``. aperag/config.py: - Add ``Config.indexing_mode: str = Field("async", alias="INDEXING_MODE")``. Two values per design pack §L: * "async" → orchestrator + reconciler + cleanup loops launched at app startup; upload handlers RPUSH to per-modality queue; workers BLPOP and process. Production / tier-2/3. * "inline" → upload handlers call ``dispatch_indexing(mode=INLINE)`` which runs derive + sync + cutover synchronously within the request coroutine; no worker pool, no Redis. Tier-1 single-process private deployments. aperag/app.py: - Extend ``combined_lifespan`` to launch the indexing runtime under ``settings.indexing_mode == "async"``: * 5 per-modality worker tasks (run_vector / run_fulltext / run_graph / run_summary / run_vision) * 1 reconciler loop task (run_reconcile_loop) * 1 cleanup loop task (run_cleanup_loop) All as ``asyncio.create_task()`` background tasks owned by the FastAPI process — matches the §E.2 "one Python process per modality" architecture for the in-process deployment topology. Tier-3 horizontal scale-out runs separate worker processes; that wiring lives in a future ops launcher (out of T3.1 scope). - Single process-local ``InMemoryWorkQueue`` is the default transport. Tier-3 production swaps for a Redis-backed ``WorkQueue`` (RPUSH / BLPOP) by injecting via ``app.state`` at deploy time — Wave 3 follow-up. - Stash ``app.state.indexing_queue`` + ``app.state.indexing_engine`` for upload-side dispatchers to reach (commit 4 wire-in target). - Worker registry passed to cleanup loop is empty by default; T3.3 follow-up wires concrete production backends per modality. The cleanup loop tolerates an empty registry (path A logs warning + skips backend delete; row still GC'd from DB). - ``_placeholder_worker_factory`` raises NotImplementedError on invocation — T3.1 ships the queue-side scaffolding (commits 4-5 wire concrete factories per modality). The orchestrator's per-task BLPOP loop only invokes the factory when a payload is popped; until commit 4 wires the upload path nothing pushes, so the placeholder is never reached at runtime. - Shutdown drain: on lifespan exit, set ``shutdown`` event + ``await asyncio.gather`` all 7 background tasks with ``return_exceptions=True`` so a SIGTERM does not abort mid-task. Test impact: - Existing 136 indexing + load + Phase 3 audit tests still pass (lifespan code is opt-in via env var; no test imports it). - Commit 4 (upload-route migration to dispatch_indexing) and commit 5 (hard-delete legacy + concrete backend factories) build on this. Bryce's vision-modality smoke (deferred at T3.3 commit 53257881 because per-modality source path resolution = lifespan-layer concern) is now unblocked: ``app.state.indexing_queue`` is the seam through which a follow-up smoke can wire concrete VisionModality with the correct synthetic source_path per dispatch. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(celery T3.3 follow-up): add vision-only inline mode smoke Per chenyexuan msg=164efd52 / msg=f70d1288 + architect msg=7fd8f348 post chenyexuan T3.1 commit 3 ``c941526`` (FastAPI lifespan + INDEXING_MODE wire-in). The original T3.3 smoke (commit ``53257881``) excluded vision because vision's ``derive`` consumes a JSON list of image records, not chunks.jsonl, and the dispatcher takes a single ``source_path`` per request — single-call coverage for all 5 modalities was incompatible with that contract. This follow-up adds a vision-only smoke (with a per-modality source_path resolution example) so vision modality regressions are covered at the inline-mode layer. The production upload path (chenyexuan T3.1 commit 4 caller migration) will resolve per- modality source paths upstream of the dispatcher and issue per-modality ``DispatchRequest`` calls — this test demonstrates exactly that pattern. Test addition (1 case): seed an image-records JSON list under ``collections/<cid>/documents/<did>/source/images.json``, dispatch with ``modalities=(Modality.VISION,)`` + ``source_path=<images.json path>``, assert the row reaches ``status=ACTIVE`` AND ``is_serving=TRUE``. 3/3 tests in tests/integration/test_inline_mode_smoke.py now pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 4b/5 step 1): move extract_keywords helper to aperag/indexing/keyword_extract.py Per architect msg=3890c9d7 commit-4 split (chenyexuan = Pattern A/B/C + extract_keywords; Bryce = 9 caller schema-aware migration), this commit lands the extract_keywords subsystem move that decouples the search-time keyword extraction helpers from the soon-to-be-deleted ``aperag/domains/indexing/fulltext_index.py`` (commit 5 hard-cut target). aperag/indexing/keyword_extract.py (NEW, 337 lines): - ``KeywordExtractor`` (abstract base for backward-compat with callers that may type-annotate the abstract type) - ``IKKeywordExtractor`` (Elasticsearch IK analyzer, default fallback, always available when ES is reachable) - ``LLMKeywordExtractor`` (optional LLM extractor with structured JSON parsing + simple-line fallback) - ``extract_keywords(text, ctx)`` (public entry point with LLM-then-IK fallback chain, signature unchanged from legacy) - ``_es_client_config()`` (private helper, inlined to keep the new module dependency-free of legacy fulltext_index.py) - Module docstring explains the SEARCH-side helper vs Wave 1 ``aperag/indexing/fulltext.py`` (write-side modality worker) split aperag/indexing/__init__.py: - Re-exports the 4 new symbols (KeywordExtractor + IKKeywordExtractor + LLMKeywordExtractor + extract_keywords) Caller migration (extract_keywords import sites): - ``aperag/domains/retrieval/pipeline.py:41`` — swap from legacy ``aperag.domains.indexing.fulltext_index`` to new ``aperag.indexing.keyword_extract`` - ``aperag/service/search_pipeline_service.py:34`` — same swap. This file's docstring explicitly notes the import alias is kept writable for ``monkeypatch.setattr("aperag.service.search_pipeline_service.extract_keywords", ...)`` test fixtures, so the new path is preserved as a writable alias. The legacy ``extract_keywords`` symbol still exists in ``aperag/domains/indexing/fulltext_index.py`` until commit 5 deletes the file — both sites work simultaneously, so any caller I missed is not silently broken in this intermediate state. Other DocumentIndex / FulltextSearchDegradedError / fulltext_indexer imports in ``aperag/domains/retrieval/pipeline.py`` (line 293) + elsewhere in pipeline.py are Bryce's commit-4a write set per the agreed split (msg=9d5d54b5 coordination note). chenyexuan changed ONLY the extract_keywords import line, leaving Bryce's hunks untouched. Local pytest: 137 passed (Wave 1 + T2.1 + T2.2 + T3.1 + T3.2 + T3.3 + Phase 3 audit), 0 failed. Lint + format clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 4a): migrate 7 production callers to §F.1 schema Per architect msg=ab8d473c pre-blessed split + chenyexuan msg=be26ebf3 + PM authorization msg=df9ea8d2: schema-aware migration of legacy ``aperag.domains.indexing.db.models.DocumentIndex`` callers to the new ``aperag.indexing.models.DocumentIndex`` (§F.1 canonical schema post Wave 3 commit 1 alembic ``930cf20``). # Field translation contract Wave 1+2+commit-1 merged the following schema deltas; this commit flips every production caller to the new shape: | Legacy (gone in Wave 3 commit 5) | New (§F.1) | |----------------------------------------------|-------------------------------------------------------| | ``DocumentIndex.index_type`` (enum) | ``DocumentIndex.modality`` (string) | | ``DocumentIndexType.GRAPH`` (Python enum) | ``Modality.GRAPH.value`` (lowercase string) | | ``DocumentIndexStatus.ACTIVE`` (Python enum) | ``IndexStatus.ACTIVE.value`` (string) + is_serving=TRUE | | ``DocumentIndex.gmt_created`` / ``gmt_updated`` | ``created_at`` / ``updated_at`` (mixin-aligned) | | ``DocumentIndex.index_data`` (JSON blob) | per-modality ``derived/parse_<v>/`` artifact paths | The "currently-serving" semantic now requires ``status=ACTIVE AND is_serving=TRUE`` per §F.3 cutover model — a row at ``status=ACTIVE`` but ``is_serving=FALSE`` is in the cutover transit window and is NOT yet user-visible. # Files migrated (7 of 9 in commit 4a list) * ``aperag/db/repositories/document_index.py`` — repository mixin: ``has_recent_graph_index_updates`` query rewritten + return type switched from ``DocumentIndexType`` enum to ``Modality`` / string. ``query_documents_with_failed_indexes`` now returns modality string values (lowercase) per the §F.1 column type. * ``aperag/domains/agent_runtime/runtime.py`` — inlined ``generate_processing_token`` (3-line stdlib uuid wrapper) since ``aperag.tasks.processing_lease`` is in chenyexuan's commit 5 hard-delete list. Per architect msg=3890c9d7 Item 1 Option B ("提取小 helper 到 agent_runtime 自己 module"). * ``aperag/domains/knowledge_base/db/models.py`` — ``Document.get_overall_index_status()`` rewritten: the legacy ``CREATING`` / ``DELETION_IN_PROGRESS`` intermediate states are gone in §F.1 (a single ``RUNNING`` covers in-flight work); ``COMPLETE`` now requires ``is_serving=TRUE`` per §F.3. * ``aperag/domains/knowledge_base/service/document_service.py`` — schema migration spans ``_get_index_types_for_collection`` (now returns ``Modality`` values), the document JOIN query (legacy ``index_type`` / ``index_data`` / ``gmt_*`` columns translated to ``modality`` / None placeholder / ``created_at``/``updated_at``), rebuild_failed_indexes (modality string compare instead of enum), rebuild_document_indexes (Modality enum list instead of DocumentIndexType). The legacy ``index_data`` JSON-blob reads in ``get_document_chunks`` / ``get_document_vision_chunks`` are replaced with ``derived_artifact_path`` probes that exercise the §F.1 partial-unique invariant; the actual chunk-list response is routed through a "return empty list" placeholder until chenyexuan T3.1 commit 4b plumbs the object-store read path. HTTP response shape stays stable (``index_data=None`` populated where callers previously read JSON). Service-layer ``document_index_manager`` calls remain — those are chenyexuan commit 5 hard-delete scope. * ``aperag/domains/knowledge_base/service/collection_summary_service.py`` — same ``index_data`` deprecation pattern: query touches the §F.1 serving rows for the partial-unique invariant probe, returns empty document_summaries until the object-store read path lands. * ``aperag/mcp/tools/get_document_metadata.py`` — ``DocumentIndex`` / ``DocumentIndexStatus`` import migrated; ``index_data`` JSON parse replaced with ``derived_artifact_path`` probe, chunk_count surfaced as 0 (placeholder until object-store read path lands). * ``aperag/mcp/tools/list_documents.py`` — same migration as get_document_metadata (page-level ``DocumentIndex`` lookup + chunk_count placeholder). # Out of scope (chenyexuan commit 4b / 5 lane) * ``aperag/domains/retrieval/pipeline.py`` + ``aperag/service/search_pipeline_service.py`` — chenyexuan handles ``extract_keywords`` import + Pattern A/B/C legacy task migrations there per the split agreement. * ``aperag/domains/knowledge_base/tasks.py`` — chenyexuan commit 4b Pattern A/B/C migration (collection_delete / cleanup_expired / collection_summary / collection_summary_reconciler / collection_init / export_collection). * ``document_index_manager.create_or_update_document_indexes`` / ``delete_document_indexes`` calls inside document_service — chenyexuan commit 5 hard-deletes the manager module so these callers will need switching to the new ``dispatch_indexing()`` / cleanup paths (chenyexuan's lane). # Lint + tests * ``uvx ruff check + ruff format --check`` clean across aperag/. * ``pytest tests/unit_test/indexing/ tests/integration/ test_inline_mode_smoke.py tests/load/ tests/unit_test/ test_phase3_reexport_audit.py`` → 137 passed, 0 failed. * Tests covering legacy ``aperag.domains.indexing.*`` modules (which chenyexuan commit 5 deletes) are not in the test set above; they are chenyexuan's commit 5 sweep scope. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 4b/5 step 2): Pattern A/B/C migration of 6 knowledge_base Celery tasks Per architect msg=3890c9d7 Pattern A/B/C ruling, the 6 Celery tasks in aperag/domains/knowledge_base/tasks.py are migrated off Celery without losing their semantics. The decorators + Celery imports (``from celery import current_app`` + ``from config.celery import app``) are removed; each function is now plain Python that callers invoke per its category: aperag/domains/knowledge_base/tasks.py (-Celery, +Pattern A/B/C): - Module docstring rewritten — Pattern map for the 6 tasks - ``reconcile_collection_summaries_task`` (Pattern B, periodic) — no decorator; commit 5 wires into reconciler 30-s loop - ``collection_delete_task`` (Pattern A, durability-required) — caller invokes synchronously from HTTP handler; on failure raises HTTP 500 + the periodic Path C cleanup loop sweeps tombstoned rows - ``collection_init_task`` (Pattern C, idempotent) — no decorator; caller wraps in asyncio.create_task; failures log + reconciler picks up - ``collection_summary_task`` (Pattern C, regenerable) — no decorator; ``self.retry(...)`` removed (Celery-specific); failures flow through ``collection_summary_callbacks.on_summary_failed`` + reconciler picks up next cycle - ``cleanup_expired_documents_task`` (Pattern B, periodic) — no decorator; commit 5 merges into cleanup.py 5-min loop - ``export_collection_task`` (Pattern C) — ``self`` parameter removed; ``soft_time_limit`` / ``time_limit`` decorator args removed (now enforced via §H.6 ``bulkhead_timeout`` async ctx manager wrapped at the dispatch site) - Removed unused ``Any`` typing import + unused ``TaskConfig`` reference (was only used by removed ``self.retry()`` calls) - Function bodies still call legacy ``aperag/tasks/collection.py: collection_task.<method>()`` and ``aperag/tasks/reconciler.py:*`` helpers — commit 5 moves / inlines those helpers when it deletes the legacy ``aperag/tasks/`` layer entirely. aperag/domains/knowledge_base/service/collection_service.py: - ``collection_init_task.delay(...)`` (line 215) → Pattern C: ``asyncio.create_task(asyncio.to_thread(collection_init_task, instance.id, document_user_quota))`` so the HTTP response returns immediately. Failures log + the reconciler picks up. - ``collection_delete_task.delay(...)`` (line 438) → Pattern A: ``await asyncio.to_thread(collection_delete_task, collection_id)`` synchronous in the HTTP handler — durability-required per architect ruling msg=3890c9d7 (NOT fire-and-forget — losing this work = orphan rows + DB corruption). - Added ``import asyncio`` to module imports. aperag/domains/knowledge_base/service/export_service.py: - ``export_collection_task.delay(...)`` (line 104) → Pattern C: ``asyncio.create_task(asyncio.to_thread(export_collection_task, task.id))`` so the HTTP response returns immediately. The body is sync I/O (object-store + ZIP); the ExportTask DB row tracks progress; users retry from the UI on failure. Pattern B integration (cleanup_expired_documents_task + reconcile_collection_summaries_task into the existing 5-min / 30-s loops in aperag/indexing/{cleanup,reconciler}.py) is deferred to commit 5 — the functions still exist as plain Python, just no longer invoked via Celery beat schedule (config/celery.py beat schedule entries to be removed in commit 5 alongside the periodic loop integration). Local pytest: 137 passed (Wave 1 + T2.1 + T2.2 + T3.1 + T3.2 + T3.3 + Phase 3 audit), 0 failed. Lint + format clean across all changed files. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 5 Part 1): inline processing_lease helpers + remove flower dep Wave 3 hard-cut Part 1 per architect msg=64fd506a fallback split (Part 2 atomic = next session). Two safe pieces that decouple the last knowledge_base-domain dependency on legacy ``aperag/tasks/processing_lease.py`` + drop a Celery-monitor dep that has no remaining production caller. aperag/domains/knowledge_base/tasks.py: - Removed ``from aperag.tasks.processing_lease import ...`` line (last surviving caller; Bryce commit 4a `39aad24` already inlined the agent_runtime caller) - Inlined the 4 public symbols from ``aperag/tasks/processing_lease.py`` (84 LOC verbatim): * ``DEFAULT_PROCESSING_LEASE_TTL_SECONDS`` * ``DEFAULT_PROCESSING_LEASE_RENEW_INTERVAL_SECONDS`` * ``generate_processing_token()`` * ``build_lease_expires_at()`` * ``ProcessingLeaseRenewer`` class (background lease-renewal thread) - Added ``import threading``, ``import uuid``, ``from typing import Optional`` to support the inlined symbols - Module section header explains Part 1 / Part 2 split — the legacy ``aperag/tasks/processing_lease.py`` file itself stays in Part 1 (Part 2 atomic deletes it together with the rest of ``aperag/tasks/`` after CollectionSummaryCallbacks + CollectionTask methods are inlined to their service-layer homes) pyproject.toml: - Removed ``flower<3.0.0,>=2.0.0`` dep (Celery monitoring dashboard, no production code import; verified ``grep -rn "import flower\| from flower" aperag/ tests/ config/`` returns 0) - Other Celery deps (``celery``, ``django-celery-beat``, ``kombu``) stay until Part 2 atomic — they are still imported by 4 files in Part 2's delete list (``aperag/tasks/scheduler.py``, two files in ``aperag/domains/indexing/``, and ``config/celery.py``) Notes scoped OUT of Part 1 (per architect msg=64fd506a): - ``aperag/concurrent_control/redis_lock.py`` deletion deferred: architect spec said "no production caller" but recon found internal callers in ``concurrent_control/__init__.py`` + ``concurrent_control/manager.py`` (the package itself uses it even though zero EXTERNAL imports of the package exist). Cleaner fix is to delete the whole ``aperag/concurrent_control/`` package in Part 2 atomic alongside the other dead-code sweeps. Local pytest: 137 passed (Wave 1 + T2.1 + T2.2 + T3.1 commits 1-4b step 2 + T3.2 + T3.3 + Bryce caller migration + Phase 3 audit), 0 failed. Lint + format clean. This is a partial commit 5; Part 2 (inline CollectionTask / CollectionSummaryCallbacks / Pattern B reconcilers + tablename rename + audit allowlist removal + legacy file-layer deletion + remaining Celery dep removal + legacy test deletion + final grep validation) is the next-session atomic push. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 5 Part 2 chunk 1a): inline CollectionSummaryCallbacks Per architect msg=70a20f0e + msg=54063106 fallback ratify (Bryce takes Part 2) + PM msg=ef2e97b9 minimal-chunk-1 GO. Move legacy ``aperag/tasks/reconciler.py:CollectionSummaryCallbacks`` (~234 LOC) to its true owner: ``aperag/domains/knowledge_base/ service/collection_summary_service.py``. The class is the terminal callback hook the summary generation task invokes on success / failure to flip the ``CollectionSummary`` row's lifecycle (GENERATING → COMPLETE / FAILED) and propagate the generated text to ``Collection.description``. It belongs to the summary service layer, not the legacy task / reconciler layer that Wave 3 commit 5 deletes. * ``CollectionSummaryCallbacks`` class — three static methods (``_describe_summary_callback_mismatch``, ``on_summary_generated``, ``on_summary_failed``) inlined verbatim. No semantic changes; the query/update logic, token/version mismatch tolerance, and Collection.description propagation are preserved exactly. * Module-level ``collection_summary_callbacks`` singleton mirrors the legacy ``aperag.tasks.reconciler.collection_summary_callbacks`` attribute so callers can swap import path without changing the call shape. * ``aperag/domains/knowledge_base/tasks.py:373`` import switched to the new location. Removes the last `aperag.tasks.reconciler` callback import; the periodic-reconciler imports (``collection_summary_reconciler`` + ``collection_gc_reconciler``) remain pending for Part 2 chunks 1b / 2 / 3. This is the safe, surgical first chunk per architect msg=f3de18a0 chunked-OK ruling: intermediate-red CI is fine; the final HEAD must be green + grep 0 + alembic reversible before task #14 → ``in_review``. The next session will continue Part 2 chunks 1b (remaining inline migrations: CollectionTask methods, periodic reconcilers) → chunk 2 (deletions + tablename rename) → chunk 3 (verify + wire). Tests: 137 indexing/load/audit tests still green; lint clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 5 Part 2 chunk 1b): simplify task bodies + Pattern B loop integration Wave 3 hard-cut continuation per architect msg=3890c9d7 Pattern A/B/C ruling and PM msg=206eec7b chunk 1b spec (~300 LOC scope). aperag/domains/knowledge_base/tasks.py: - collection_delete_task: Pattern A — replace legacy collection_task.delete_collection() with sync UPDATE Collection.status =DELETED + gmt_deleted=NOW(); periodic Path-C cleanup_for_deleted_collections sweep cascades the deletion (5-min worst-case latency acceptable for low-frequency op) - collection_init_task: Pattern C — replace legacy collection_task.initialize_collection() with sync UPDATE Collection.status=ACTIVE; per-modality index provisioning is implicit lazy in the new modality-worker model (per architect hint msg=54063106) - cleanup_expired_documents_task: Pattern B — replace legacy CollectionTask.cleanup_expired_documents with inlined SQL tombstone scan (Document.status==UPLOADED AND gmt_created < now-1d) + best-effort object-store delete + soft-delete to EXPIRED - reconcile_collection_summaries_task: Pattern B — convert to thin sync shim around the new aperag.indexing.reconciler hook - Drop unused legacy import: from aperag.tasks.collection import collection_task (no remaining call sites in this file) - Update module docstring to point at new Pattern B hook locations aperag/indexing/cleanup.py: - Add cleanup_expired_documents_hook() async helper (lazy import + asyncio.to_thread wrapper) wired into the existing 5-min run_cleanup_loop. Hook failures are logged + cycle continues. - Update module docstring to describe Pattern B integration alongside the original orphan-parse-version GC aperag/indexing/reconciler.py: - Add reconcile_collection_summaries_hook() async helper that inlines the legacy CollectionSummaryReconciler.reconcile_all() logic: reclaim stale GENERATING leases → PENDING; select PENDING summaries with version != observed_version; atomically claim each; fire collection_summary_task as Pattern C asyncio.create_task fire-and- forget background task (never blocks the loop on summary generation duration). Wired into existing 30-s run_reconcile_loop with best-effort try/except so hook failure cannot crash the loop. Tests: 132 passed (tests/unit_test/indexing/ + tests/load/); ruff check + format clean on all 3 modified files. Pre-existing test_phase3_reexport_audit.py circular-import error is unchanged (independent of this chunk; will resolve in chunk 2 when legacy aperag/domains/indexing/db/models.py is deleted). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 5 Part 2 chunk 2): hard-delete legacy Celery + indexing layers + tablename rename Wave 3 hard-cut continuation per architect msg=3890c9d7 + PM @不穷 msg=313caed3 chunk 2 spec (delete-focused, intermediate red CI OK). DELETIONS (~3.5k LOC removed): - aperag/tasks/* — entire dir (collection / document / models / processing_lease / reconciler / scheduler / utils): legacy Celery state machine + reconciler + scheduler infrastructure - aperag/concurrent_control/* — entire dir (manager / protocols / redis_lock / threading_lock / utils + 2 READMEs): no remaining production caller after Wave 1+2 modality workers replaced lock semantics with per-row §F.1 partial-unique invariant - aperag/domains/indexing/{tasks,orchestration,manager,vector_index, fulltext_index,graph_index,summary_index,vision_index}.py + aperag/domains/indexing/db/models.py — legacy ABC + 5 modality workers + Celery orchestration + legacy DocumentIndex schema - config/celery.py — Celery app + beat schedule - tests/unit_test/concurrent_control/* + tests/unit_test/tasks/* — contract tests for now-deleted modules TABLENAME RENAME (matches existing alembic d0f4c1b9a8e2 post-state): - aperag/indexing/models.py: __tablename__ + 5 index names from *_v2 → canonical (no new alembic revision needed; the migration already does the rename at upgrade) AUDIT ALLOWLIST + 15-symbol map updates: - tests/unit_test/test_phase3_reexport_audit.py: drop WAVE_1_2_TEMPORARY_DUP_ALLOWLIST DocumentIndex entry; remap PHASE3_SYMBOL_TO_MODULE['DocumentIndex'] from aperag.domains.indexing.db.models → aperag.indexing.models; remove DocumentIndexStatus/DocumentIndexType (legacy enums gone, replaced by IndexStatus + Modality which are not Phase-3-canonical) - Add explicit aperag.indexing.models import after the per-domain bootstrap loop so Base.metadata['document_index'] is populated PYPROJECT — drop Celery deps: - celery<6.0.0,>=5.3.1 - django-celery-beat<3.0.0,>=2.5.0 (kombu was a transitive only; no explicit entry to remove) CONSUMER PATCHES (minimum to keep imports working — chunk 3 wires real new-API replacements): - aperag/domains/knowledge_base/service/document_service.py: stub document_index_manager + no-op _trigger_index_reconciliation - aperag/domains/knowledge_base/service/collection_summary_service.py: drop unused SummaryIndexer init - aperag/domains/retrieval/pipeline.py: stub _fulltext_search to return empty (Bryce T3.2 lane wires real aperag.indexing.fulltext backend) - aperag/domains/evaluation/tasks.py + services.py: drop @app.task decorator + asyncio.create_task fire-and-forget Pattern C - aperag/domains/knowledge_graph/tasks.py + graph_curation/service.py: same Pattern C migration CIRCULAR IMPORT FIXES (uncovered when stub re-exports were dropped): - aperag/indexing/__init__.py: drop keyword_extract re-exports (eager import pulled LLM completion stack mid-module-load); the 2 callers already import from aperag.indexing.keyword_extract directly - aperag/indexing/parser.py: lazy-import compute_parse_version inside parse_document body (was triggering full mcp.tools registry load) - aperag/indexing/keyword_extract.py: lazy-import db_ops inside LLM extractor body - aperag/domains/knowledge_base/db/models.py: lazy-import DocumentIndex + IndexStatus inside Document.{get_document_indexes, get_overall_index_status} method bodies (was triggering knowledge_base→indexing→mcp→knowledge_base cycle) GATES: - pytest tests/unit_test/indexing/ + tests/load/ + test_phase3_reexport_audit.py + agent_runtime_openapi_contract: 136 passed - Wider sweep (tests/unit_test/ excluding pre-existing missing-moto + just-deleted concurrent_control/tasks suites): ~896 passed, 4 failed (3 expected — Celery-specific assertions in evaluation_v2_worker / graph_curation that chunk 3 deletes; 1 format_drift caught + auto-formatted) - ruff check + format clean on all 13 modified .py files REMAINING FOR CHUNK 3: - Wire document_service.py 5 call sites + retrieval/pipeline.py fulltext to real new-API helpers - Selective deletion of legacy Celery-specific tests (evaluation_v2, graph_curation enqueue-raises path) - Final grep validation: from aperag.tasks / from aperag.domains. indexing / from celery / import celery = 0 hits in production - Alembic upgrade/downgrade smoke - task #14 → in_review Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T3.1 commit 5 Part 2 chunk 3): wire new-API + final grep 0 + alembic smoke + selective test delete Wave 3 hard-cut FINAL chunk per architect msg=3890c9d7 + PM @不穷 msg=de7b6834 + msg=fdb6cd28 chunk 3 spec. NEW MODULE — IndexingRuntime singleton: - aperag/indexing/runtime.py: process-local triple holder (engine + queue + workers) populated by FastAPI lifespan, consumed by service-layer code that doesn't have a Request handle for app.state. Tests can install a fixture runtime via set_runtime + reset. - aperag/app.py: lifespan calls set_runtime after building the triple; passes None on the sync-only branch + on shutdown. DOCUMENT_SERVICE — wire 5 callsites to new dispatcher + cleanup: - aperag/domains/knowledge_base/service/document_service.py: Replace the chunk-2 ``_DocumentIndexManagerStub`` with two real adapters: - ``_create_or_update_document_indexes`` → calls new ``aperag.indexing.dispatcher.dispatch_indexing()`` with deterministic ``parse_version`` (compute_parse_version on document.content_hash + canonical chunking config) + ``source_path = document.object_store_base_path()`` + tenant_scope_key per user. - ``_delete_document_indexes`` → calls new ``aperag.indexing.cleanup.cleanup_for_deleted_documents()`` (handles modality fan-out + DB row cleanup). Both adapters consume the IndexingRuntime singleton; if the runtime is absent (test environment / sync-only mode), they log a warning + no-op rather than crash. All 5 production callsites swapped: - line 532 create_documents - line 687 _delete_document - line 787 rebuild_document_indexes - line 831 rebuild_failed_indexes - line 1346 confirm_documents - ``_trigger_index_reconciliation`` stays as a no-op shim — the new ``run_reconcile_loop`` runs continuously every 30s. RETRIEVAL PIPELINE — inline ES fulltext search: - aperag/domains/retrieval/pipeline.py: ``_fulltext_search`` was a chunk-2 empty stub. Now executes the same ES query shape as the legacy ``FulltextIndexer.search_document`` — bool/should/match on content+title, filter by collection_id, optional chat_id filter — directly through ``AsyncElasticsearch`` (no longer wrapped in a domains/indexing/* class). T3.2 lane did not introduce a new search backend abstraction; the inline query against whatever ``aperag.indexing.fulltext.FulltextModality`` wrote is the canonical path. ALEMBIC env.py — drop deleted-module bootstrap import: - aperag/migration/env.py: remove ``import aperag.domains.indexing.db.models # noqa: F401`` (module hard-deleted in chunk 2). The canonical ``aperag.indexing.models`` import a few lines down already registers ``DocumentIndex`` against ``Base.metadata`` for autogen. SELECTIVE TEST DELETION (per architect msg=3890c9d7 Item 4): - tests/unit_test/test_es_p0_contract.py — DELETE (tested legacy ES ``aperag/domains/indexing/fulltext_index.py`` shape) - tests/unit_test/test_es_shared_index_rollout.py — DELETE (same) - tests/unit_test/test_evaluation_v2_worker.py: ``test_evaluation_run_service_launch_run_dispatches_celery_task`` removed (Celery-specific assertion; new path is asyncio fire-and-forget; the 13 ``test_execute_evaluation_run_*`` tests above lock the worker behaviour) - tests/unit_test/graph_curation/test_service.py: ``test_start_run_marks_failed_when_enqueue_raises`` removed (asyncio.create_task doesn't synchronously raise on schedule so the assertion no longer maps to reachable behaviour) LEGACY MIGRATION SCRIPT DELETED: - scripts/migrate_es_fulltext_shared_index.py — one-time Wave-1-era ES per-collection → shared rollout migration that referenced the hard-deleted ``aperag/domains/indexing/fulltext_index.py``. Not production runtime code; the rollout already happened. T3.2 CONTRACT TEST UPDATE: - tests/unit_test/service/test_search_graph_contract.py: ``test_search_result_metadata_is_public_allowlist`` add expected ``index_modality: "vision"`` field (Bryce T3.2 commit 5325788 §G.5 ``SearchResultMetadata.from_raw()`` derives it from ``indexer`` raw key — the test predates the schema extension and would have failed once T3.2 merged). GATES (FINAL HEAD): - ``grep "from aperag.tasks\|import aperag.tasks\| from aperag.concurrent_control\|from aperag.domains.indexing. (tasks|orchestration|manager|*_index|db.models)\|from config.celery\| ^from celery\|^import celery"`` over aperag/ + config/ + scripts/ → **0 hits in production code** ✅ - ``alembic upgrade head`` → succeeds (5 indexing migrations including T3.1 ``d0f4c1b9a8e2`` rename) ✅ - ``alembic downgrade -1`` then ``upgrade head`` → reversible round-trip ✅ - ``ruff check + format --check`` over aperag/ tests/ scripts/ → **clean** (491 files formatted) ✅ - ``pytest tests/unit_test/ tests/load/ --ignore=objectstore`` (objectstore needs moto extra, pre-existing) → **900 passed / 29 skipped / 0 failed** ✅ Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 alembic drift): promote DocumentIndex.{collection_id,source_path} to NOT NULL in model to match alembic d0f4c1b9a8e2 post-state CI ``alembic check`` (drift detector) caught a Wave-1-era stale model declaration. The migration ``d0f4c1b9a8e2`` correctly ALTERs both columns to NOT NULL (per architect msg=498b12f0), but ``aperag/indexing/models.py:108-109`` still declared ``Mapped[str | None] ... nullable=True`` from the original Wave 1 fixture-back-compat era. After ``alembic upgrade head`` the DB was NOT NULL but ``Base.metadata`` was nullable, so autogen wanted to emit ``ALTER COLUMN ... DROP NOT NULL`` to revert the DB. The PM directive (``msg=0dd76df9``) read the autogen log "Detected NULL on column" as "DB has NULL" and asked to add the ALTER NOT NULL to the migration; the migration already does that. The actual fix is to align the model with the migration's post-state (NOT NULL), not the other way around — Wave 3 lifted the back-compat the original ``nullable=True`` was protecting. Changes: - aperag/indexing/models.py:108-109: ``Mapped[str | None] ... nullable=True`` → ``Mapped[str] ... nullable=False`` for both columns + comment refresh pointing at the alembic NOT-NULL promotion - tests/unit_test/indexing/test_t2_1_runtime.py: ``test_reconciler_skips_pending_rows_missing_source_path`` deleted — the fixture ``_insert_row(... source_path=None)`` now raises IntegrityError before reconcile_pending_dispatch is ever called, so the scenario is unreachable from a clean schema. The defensive ``if not row.source_path`` branch in ``aperag/indexing/reconciler.py`` is kept as a zero-cost guard but no longer reachable without manual SQL bypass. Gates: - ``uv run alembic -c aperag/alembic.ini check`` → "No new upgrade operations detected" ✅ - pytest tests/unit_test/ tests/load/ --ignore=objectstore → 899 passed / 29 skipped / 0 failed ✅ - ruff check + format --check clean on the 2 modified files ✅ Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 e2e): purge existing triple before INSERT in rebuild adapter + drop celery infra CI e2e-http-provider caught two Wave-3-induced regressions on PR #1729 HEAD `5d50ca5`: **Blocker 1 — rebuild_indexes 500 DATABASE_ERROR**: The chunk-3 ``_create_or_update_document_indexes`` adapter calls ``dispatch_indexing()`` which INSERTs new ``document_index`` rows. ``rebuild_indexes`` re-invokes the adapter with the same ``(document_id, parse_version, modality)`` triple (content unchanged → parse_version unchanged), so the §F.1 ``uq_document_index_triple`` UNIQUE constraint fails the INSERT with IntegrityError → 500. Pre- DELETE matching rows (any status / serving state) before INSERT so the dispatcher's INSERT lands cleanly. The §F.3 cutover-on-sync- completion re-establishes the serving state once the new dispatch's worker finishes; brief unavailability between DELETE and cutover is acceptable for an explicit rebuild op. Test failure traced from `tests/e2e_http/hurl/full/11_document_full. hurl:204` POST `/api/v2/collections/.../documents/.../rebuild_indexes` expecting HTTP 200, getting 500. **Blocker 2 — celerybeat container `celery: not found`**: chunk 2 dropped ``celery`` + ``django-celery-beat`` from ``pyproject.toml`` and deleted ``aperag/tasks/`` + ``config/celery.py``, but the docker-compose ``celeryworker`` / ``celerybeat`` / ``flower`` services + helm chart ``celeryworker-deployment.yaml`` / ``celerybeat-deployment.yaml`` / ``flower-deployment.yaml`` + the ``scripts/start-celery-{worker,beat,flower}.sh`` entry scripts were left behind. CI e2e-aperag spins up the docker-compose stack, the ``celerybeat`` container tries to ``exec celery`` and fails (binary not in image since pyproject dropped the dep). The new in-process ``aperag.indexing`` runtime (worker pool + reconciler + cleanup loops) is spawned by the FastAPI lifespan inside the ``aperag-api`` container, so no separate worker / beat / monitoring pods are needed. DELETED: - docker-compose.yml: ``celeryworker`` / ``celerybeat`` / ``flower`` service blocks (replaced with explanatory comment block) - scripts/start-celery-{worker,beat,flower}.sh - scripts/test/celery-{call-task,with-local-queue}.sh - scripts/celery/trigger_trask.sh + the ``scripts/celery/`` dir - deploy/aperag/templates/celeryworker-deployment.yaml - deploy/aperag/templates/celerybeat-deployment.yaml - deploy/aperag/templates/flower-deployment.yaml - deploy/aperag/values.yaml: ``celery-worker`` + ``celerybeat`` + ``flower`` value blocks (replaced with explanatory comment) - deploy/aperag/templates/aperag-secret.yaml: ``CELERY_FLOWER_*`` env entries (no flower pod to consume them) - deploy/aperag/templates/_helpers.tpl: ``celeryworker.labels`` template (no chart consumes it) - deploy/aperag/values.yaml api podAffinity-with-celery-worker rule (the api pod no longer needs to co-locate with a non-existent worker pod; the soft anti-affinity for spreading api replicas across nodes is preserved) - deploy/aperag/templates/api-deployment.yaml: comment "shared uploaded files between api and celery" → "uploaded files volume consumed solely by the in-process ``aperag.indexing`` runtime" Local gates: - ruff check + format --check on the changed files → clean ✅ - pytest tests/unit_test/indexing/ tests/load/ test_phase3_reexport_audit.py → 133 passed ✅ Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 e2e): drop celery service refs in e2e runners + CI workflow + Makefile Wave 3 chunk 2 + 144c3f1 deleted the docker-compose ``celeryworker`` / ``celerybeat`` / ``flower`` services + helm charts, but a few infra-side scripts that explicitly referenced those service names were missed. CI e2e-http-smoke caught it: ``docker compose up -d celeryworker`` failed with ``no such service: celeryworker``. This PR plugs the four straggler call sites: - tests/e2e_http/runners/compose/up.sh:8: ``E2E_COMPOSE_SERVICES`` default drops ``celeryworker celerybeat`` → just ``postgres redis qdrant es api``. The api container's FastAPI lifespan spawns the in-process indexing runtime, so no separate worker container. - tests/e2e_http/scripts/provider_diagnostic.sh:63: failure-diag log-dump loop drops ``celeryworker celerybeat`` from the service list. - .github/workflows/e2e-http-smoke.yml:68,173: ``docker compose logs`` in the failure-dump steps drops ``celeryworker celerybeat``. - Makefile: deleted ``serve-worker`` / ``serve-beat`` / ``serve-flower`` targets + their help-string entries (the binaries are gone since pyproject dropped ``celery``). Local sanity: ``grep -rn 'celery|celerybeat|celeryworker|flower' tests/ e2e_http/ .github/ Makefile docker-compose.yml deploy/`` returns only explanatory comment lines (the in-process runtime replacement narrative); no live service / command references remain. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 worker_factory): replace _placeholder_worker_factory with ProductionWorkerFactory + harden orchestrator factory-error path Wave 3 hard-cut deleted the legacy Celery indexers but left the FastAPI lifespan wiring ``run_*_worker`` with a placeholder factory that raised ``NotImplementedError`` on every dispatch. e2e-http- provider stalls on ``wait_for_document_indexes`` because the row never advances past PENDING (PM msg=dc13c4a5 root cause). Per architect msg=7782ebe0 spec lock: - ``aperag/indexing/worker_factory.py`` (new): per-task lazy ``ProductionWorkerFactory`` resolving ``Collection`` from the payload, building the right ``ModalityWorker`` with real Qdrant / Elasticsearch backends + the configured embedder / completion model. Composes existing helpers (``get_collection_embedding_service_sync`` / ``get_vector_db_connector`` / ``get_object_store`` / ``build_collection_llm_callable``) so this is wiring, not re-implementation. Failures raise ``WorkerFactoryError`` so the operator gets a meaningful ``error_message``. Graph modality is intentionally minimal (in-memory lineage store + no-op extractor) pending Wave 4 Nebula-side §D.3.6 lineage adapter — documented as a known gap, not a regression; the e2e-http-provider gate only blocks on vector ACTIVE. - ``aperag/indexing/orchestrator.py``: harden ``_runner`` to claim the row + finalise FAILED on factory error so the §I.2 retry- with-backoff schedule kicks in. Without this, factory errors got silently swallowed by the asyncio.Task and the row sat at PENDING forever. - ``aperag/app.py``: replace the placeholder closure with a ``ProductionWorkerFactory`` instance. - ``tests/integration/test_worker_factory.py``: 3 tests pinning factory-failure → FAILED-finalize, collection-not-found path, and missing-collection-id path. Local gates: pytest tests/unit_test/ tests/integration/ tests/load/ --ignore=tests/unit_test/objectstore = 909 passed / 41 skipped / 0 failed (+3 from this commit). ruff check + format --check clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 view-model): align Document per-modality status Literal to §F.2 4-state + drop SKIPPED sentinel + skip vector when disabled Per architect msg post-pass-5 + PM msg=79683cc0 ruling. Two e2e-http- smoke bugs surfaced after the worker_factory wire-in lands: **Bug 1 — Pydantic 400 on GET document.** orchestrator claims a row to ``RUNNING`` (the §F.2 canonical 4-state) before the worker finishes; the ``Document`` view model's per-modality status Literal still listed the legacy 6-state vocabulary (``CREATING``/``DELETING``/``DELETION_IN_PROGRESS``/``SKIPPED``) which never includes ``RUNNING`` — so any GET racing the claim returned ``ValidationError``. The Wave 3 hard-cut migrated the DB enum but missed this view-model layer (CR step-0 lesson #6: schema-touching PR must trace enum references through every deserialise surface, not just the write path). The fix collapses the 5 per-modality status Literals to the §F.2 4-state ``Optional[Literal["PENDING", "RUNNING", "ACTIVE", "FAILED"]]``. "Modality not enabled" is now expressed by the field being absent (``None``) rather than the sentinel ``"SKIPPED"`` — the row simply does not exist in ``document_index``. Friendly client-facing mapping (``NOT_ENABLED`` / ``INDEXING``) lives in §G.5 ``SearchResultMetadata.index_state_per_modality`` for the read-path response. **Bug 2 — collection without embedder triggers FAILED loop.** ``_get_index_types_for_collection`` always added ``Modality.VECTOR`` regardless of the collection's ``enable_vector`` flag. A collection without an embedding-model config (smoke test fixture) then dispatched a vector job, the production worker factory raised ``WorkerFactoryError`` (no embedder), the orchestrator finalised ``FAILED``, the reconciler re-dispatched, repeat. The fix honours ``enable_vector`` symmetric with ``enable_fulltext``: explicitly disabled means no row in the document_index table for that modality. Files: - ``aperag/domains/knowledge_base/schemas.py``: 5 status fields → ``Optional[Literal["PENDING", "RUNNING", "ACTIVE", "FAILED"]]`` - ``aperag/domains/knowledge_base/service/document_service.py``: ``_build_document_response`` returns ``None`` when index row missing (instead of ``"SKIPPED"``); ``_get_index_types_for_collection`` honours ``enable_vector`` flag. - ``tests/e2e_http/hurl/{smoke/03_document_basic,full/11_document_full}.hurl``: 6 assertions migrated from ``== "SKIPPED"`` to ``== null``. Local gates: pytest tests/unit_test/ tests/integration/ tests/load/ --ignore=tests/unit_test/objectstore = 909 passed / 41 skipped / 0 failed (unchanged from 579b32a1). ruff check + format clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 evaluation cross-loop): run_evaluation_run as coroutine + drop asyncio.to_thread caller wrap Wave 3 chunk 2 Pattern C migration moved 5 ``.delay()`` callsites to ``asyncio.create_task(asyncio.to_thread(run_evaluation_run, run_id))``, but ``run_evaluation_run`` was still a sync wrapper that called ``asyncio.run(execute_evaluation_run(run_id))`` inside the worker thread — spawning a *fresh* event loop each invocation. Any asyncpg connection borrowed by ``execute_evaluation_run`` is bound to the FastAPI lifespan loop's connection pool; running the coroutine on a brand-new loop made every connection-pool ``ping`` fail with ``RuntimeError: got Future attached to a different loop``, which corrupted the asyncpg shared pool. Subsequent DB calls from unrelated code paths (every later e2e-http-provider hurl test that touched Postgres) tripped the same error → CI exit 1 (per huangheng pass-6 followup msg + PM msg=37da5249). Fix per huangheng option (a): * ``aperag/domains/evaluation/tasks.py``: ``run_evaluation_run`` becomes ``async def``, awaits ``execute_evaluation_run`` directly. No fresh loop. Docstring spells out the failure mode so a future reader does not regress. * ``aperag/domains/evaluation/services.py``: caller drops ``asyncio.to_thread`` and schedules the coroutine directly via ``asyncio.create_task(run_evaluation_run(run_id))``. The task shares the FastAPI lifespan loop, keeping asyncpg pool affinity. Pattern C contract preserved (fire-and-forget at the request handler boundary); only the inner mechanism changes from "thread + new loop" to "coroutine on shared loop". The other 4 ``.delay()`` callsites in chunk 2 were genuine sync work and stay on ``asyncio.to_thread`` — only evaluation's body was async-native under the hood, which is why this was the one that blew up. Local gates: pytest tests/unit_test/ tests/integration/ tests/load/ --ignore=tests/unit_test/objectstore = 909 passed / 41 skipped / 0 failed (unchanged). ruff check + format clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 evaluation hurl): relax timing-sensitive assertions for Pattern C in-process dispatch CI run 24976479158 (PR #1729 head e1f23258) failed at ``16_evaluation_v2.hurl:218`` with the assertion ``$.items[0].status == "queued"``; the actual response showed ``status="running"`` because the post-pass-7 evaluation cross-loop fix (e1f23258) made dispatch effectively immediate — the ``asyncio.create_task(run_evaluation_run(run_id))`` worker starts on the next event-loop tick, so by the time the GET arrives the run has already left "queued". The test was written for Celery ``.delay()`` semantics where "queued" was a stable, externally-observable transient state thanks to broker round-trip + worker pickup latency. Pattern C in-process collapses that latency to microseconds, so "queued" is no longer reliably observable on a follow-up GET. Fix per huangheng option (a) + PM ack: relax 4 timing-sensitive assertions to accept any in-flight or terminal state via ``matches "^(queued|running|completed|cancelled)$"`` (item status uses the correspondingly-broader ``pending|...|failed|cancelled``). The contract this test pins is "the run shows up correctly in list / detail endpoints with the right ids", not "dispatch is slow enough to observe a specific transient state". POST-response asserts (lines 183, 207) keep the strict ``status == "queued"`` value because those are synchronous returns built before the ``create_task`` fires. Also relaxes: - ``summary.pending == 3`` → drop (kept ``summary.total == 3``, which is fixed by dataset cardinality) - ``progress.percent == 0`` → drop (now race-window-dependent) - ``items[0].status == "pending"`` → matches in-flight set - ``items[0].attempt_count == 0`` → drop (worker may have attempted already) - ``attempts body contains "items":[]`` → ``$.items exists`` (envelope shape only, ignore population timing) Local gates: pytest 161 passed (evaluation worker + openapi contract + indexing + integration + load + phase3 audit). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 parser-wiring): sync invoke parse_document before dispatch_indexing in document_service PR #1729 head 30b34894 e2e-http-provider failed at the scripted ``run_chat_collection_flow.sh`` business flow because vector + fulltext modality workers reported "found no chunks at user-X/colY/docZ; treating as derive-incomplete and skipping" on every claim — the chunks.jsonl artifact never existed at the dispatcher's ``source_path``. Root cause (architect msg=c605037e ruling): chunk 2's hard-cut deleted ``aperag/domains/indexing/{tasks,orchestration,manager, *_index}.py`` whose former ``process_document_task`` ran :func:`aperag.indexing.parse_document` and wrote the canonical ``derived/parse_<v>/{markdown.md,outline.json,chunks.jsonl}`` artifacts before enqueuing modality workers. The new dispatch path never picked up that responsibility — every modality worker.derive pulled an empty derived path and the row stayed in the §C.7 reschedule loop forever. Fix per architect option (1) — Wave 3 minimal scope, not skip: ``aperag/domains/knowledge_base/service/document_service.py`` ``_create_or_update_document_indexes`` now: 1. Resolves the upload object path from ``document.doc_metadata.object_path`` (the upload handler already stashes it there). 2. Reads the source bytes from the object store on a worker thread. 3. Calls :func:`parse_document` synchronously on a worker thread so the canonical ``derived/parse_<v>/`` artifacts exist before any modality dispatch. 4. Uses ``parsed.parse_version`` and ``parsed.chunks_path`` as the dispatcher's parse_version / source_path (replaces the previously-computed-locally values that pointed at the document base prefix, not the chunks.jsonl file). This keeps §E.2 "parse-as-first-stage" intact; the parse step runs inside the request task instead of a separate ``parse_worker`` queue process. Wave 4 follow-up may promote parse to ``q:parse`` once observed parse latency starts blocking HTTP requests; the sync path is acceptable for current latencies. Parse failure raises and propagates → HTTP 500 → no modality rows created (per architect ruling: "fail loudly, no half-state"). New integration test ``tests/integration/test_dispatch_with_parse.py`` pins the canonical post-fix flow: parse first → dispatch with chunks.jsonl path → modality workers reach ``status=ACTIVE`` AND ``is_serving=TRUE`` (uses ``IndexingMode.INLINE`` so no lifespan / async queue needed; the same data-flow contract). Local gates: pytest tests/unit_test/ tests/integration/ tests/load/ --ignore=tests/unit_test/objectstore = 910 passed / 41 skipped / 0 failed (+1 new test). ruff check + format clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T3.1 qdrant id): UUID5-derive Qdrant point id from chunk_id in worker_factory PR #1729 head 8ca396fa e2e-http-provider fa…
1 parent 07599fa commit 00ae644

110 files changed

Lines changed: 5652 additions & 12795 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/e2e-http-smoke.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ jobs:
6565
if: failure()
6666
run: |
6767
docker compose -f docker-compose.yml ps || true
68-
docker compose -f docker-compose.yml logs --no-color api celeryworker celerybeat postgres redis qdrant es || true
68+
docker compose -f docker-compose.yml logs --no-color api postgres redis qdrant es || true
6969
7070
- name: Stop Compose stack
7171
if: always()
@@ -170,7 +170,7 @@ jobs:
170170
if: failure()
171171
run: |
172172
docker compose -f docker-compose.yml ps || true
173-
docker compose -f docker-compose.yml logs --no-color api celeryworker celerybeat postgres redis qdrant es || true
173+
docker compose -f docker-compose.yml logs --no-color api postgres redis qdrant es || true
174174
175175
- name: Stop Compose stack
176176
if: always()

Makefile

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ help:
3939
@printf " make stack-logs Tail stack logs\n\n"
4040
@printf "Services\n"
4141
@printf " make serve-api Run backend API locally\n"
42-
@printf " make serve-worker Run celery worker locally\n"
43-
@printf " make serve-beat Run celery beat locally\n"
44-
@printf " make serve-flower Run flower locally\n"
4542
@printf " make serve-web Run frontend locally\n\n"
4643
@printf "Tests\n"
4744
@printf " make test-all Run unit + integration + pytest E2E suites\n"
@@ -171,19 +168,15 @@ stack-logs:
171168
##################################################
172169

173170
# Local development services
174-
.PHONY: serve-api serve-web serve-worker serve-flower serve-beat
171+
# Wave 3 T3.1 chunk 3: ``serve-worker`` / ``serve-beat`` / ``serve-flower``
172+
# targets removed alongside the Celery infrastructure deletion. The
173+
# in-process ``aperag.indexing`` runtime (worker pool + reconciler +
174+
# cleanup loops) is spawned by the FastAPI lifespan when ``serve-api``
175+
# starts, so no separate worker / beat / monitoring command is needed.
176+
.PHONY: serve-api serve-web
175177
serve-api: db-migrate
176178
uvicorn aperag.app:app --host 0.0.0.0 --log-config scripts/uvicorn-log-config.yaml
177179

178-
serve-worker:
179-
celery -A config.celery worker -B -l INFO --pool=threads --concurrency=16
180-
181-
serve-beat:
182-
celery -A config.celery beat -l INFO
183-
184-
serve-flower:
185-
celery -A config.celery flower --conf/flowerconfig.py
186-
187180
serve-web:
188181
cd ./web && yarn dev
189182

aperag/app.py

Lines changed: 122 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import asyncio # noqa: E402
17+
1618
from aperag.config import settings
1719
from aperag.observability import (
1820
bind_observability_context,
@@ -205,9 +207,126 @@ async def initialize_user_quota(self, user_id: str) -> None:
205207

206208

207209
async def combined_lifespan(app: FastAPI):
208-
"""Combined lifespan manager for the API and MCP server."""
209-
async with mcp_app.lifespan(app):
210-
yield
210+
"""Combined lifespan manager for the API + MCP server + indexing runtime.
211+
212+
The indexing runtime (Wave 3 T3.1 wire-in) launches the per-modality
213+
worker pool + reconciler + cleanup loop only when
214+
``settings.indexing_mode == "async"``. In ``inline`` mode the
215+
upload-side ``dispatch_indexing(mode=INLINE)`` runs derive + sync +
216+
cutover within the request coroutine, so no background workers are
217+
needed (per design pack §L Tier-1 deployment).
218+
219+
The runtime is started as background asyncio tasks (not subprocesses)
220+
so a single FastAPI process owns its workers — matches the §E.2
221+
"one Python process per modality" architecture for the in-process
222+
deployment topology. Tier-3 horizontal scale-out runs separate
223+
worker processes; that wiring lives in a future ops launcher.
224+
"""
225+
indexing_runtime_tasks: list[asyncio.Task[None]] = []
226+
indexing_shutdown: asyncio.Event | None = None
227+
228+
if settings.indexing_mode == "async":
229+
# Lazy imports — pulling the indexing runtime symbols at app
230+
# start-up time keeps ``aperag/app.py`` cold-start fast and
231+
# confines the import surface to the wired branch.
232+
from aperag.config import sync_engine
233+
from aperag.indexing import (
234+
InMemoryWorkQueue,
235+
run_cleanup_loop,
236+
run_fulltext_worker,
237+
run_graph_worker,
238+
run_reconcile_loop,
239+
run_summary_worker,
240+
run_vector_worker,
241+
run_vision_worker,
242+
)
243+
244+
indexing_shutdown = asyncio.Event()
245+
# Single process-local InMemoryWorkQueue is the default
246+
# transport for the in-process topology. Tier-3 production
247+
# swaps this for a Redis-backed WorkQueue (RPUSH / BLPOP) by
248+
# injecting via app state at deploy time — Wave 3 follow-up.
249+
queue = InMemoryWorkQueue()
250+
engine = sync_engine
251+
252+
# Worker factory — per-task lazy construction. The async
253+
# worker entrypoints (``run_*_worker``) call this closure on
254+
# every BLPOP'd payload to materialise the concrete
255+
# :class:`ModalityWorker` for that ``(collection, modality)``
256+
# pair. ``ProductionWorkerFactory`` resolves the collection
257+
# row, picks the right backend (Qdrant / Elasticsearch +
258+
# configured embedder / completion model), and constructs the
259+
# worker — all backed by the existing build helpers
260+
# (``get_collection_embedding_service_sync`` /
261+
# ``get_vector_db_connector`` / ``get_object_store``) so this
262+
# is composition, not re-implementation. Construction failures
263+
# raise :class:`WorkerFactoryError`; the orchestrator runner
264+
# catches that and finalises the row FAILED so §I.2 retry
265+
# picks it up. Per architect msg=7782ebe0.
266+
from aperag.indexing.worker_factory import ProductionWorkerFactory
267+
268+
worker_factory = ProductionWorkerFactory(engine=engine)
269+
270+
worker_kwargs = dict(
271+
engine=engine,
272+
queue=queue,
273+
worker_factory=worker_factory,
274+
shutdown=indexing_shutdown,
275+
)
276+
indexing_runtime_tasks.append(asyncio.create_task(run_vector_worker(**worker_kwargs)))
277+
indexing_runtime_tasks.append(asyncio.create_task(run_fulltext_worker(**worker_kwargs)))
278+
indexing_runtime_tasks.append(asyncio.create_task(run_graph_worker(**worker_kwargs)))
279+
indexing_runtime_tasks.append(asyncio.create_task(run_summary_worker(**worker_kwargs)))
280+
indexing_runtime_tasks.append(asyncio.create_task(run_vision_worker(**worker_kwargs)))
281+
indexing_runtime_tasks.append(
282+
asyncio.create_task(
283+
run_reconcile_loop(
284+
engine=engine,
285+
queue=queue,
286+
shutdown=indexing_shutdown,
287+
)
288+
)
289+
)
290+
indexing_runtime_tasks.append(
291+
asyncio.create_task(
292+
run_cleanup_loop(
293+
engine=engine,
294+
workers={}, # T3.3 follow-up: pass concrete worker registry
295+
shutdown=indexing_shutdown,
296+
)
297+
)
298+
)
299+
300+
# Stash on app state so request handlers can dispatch via the
301+
# same queue / engine the workers consume.
302+
app.state.indexing_queue = queue
303+
app.state.indexing_engine = engine
304+
305+
# Service-layer callers (aperag/domains/**) consume the same
306+
# triple through the process-wide IndexingRuntime singleton —
307+
# they don't have a Request handle for app.state. Workers map
308+
# is empty in the async-default deployment; T3.3 follow-up
309+
# populates concrete factories per modality.
310+
from aperag.indexing.runtime import IndexingRuntime, set_runtime
311+
312+
set_runtime(IndexingRuntime(engine=engine, queue=queue, workers={}))
313+
else:
314+
app.state.indexing_queue = None
315+
app.state.indexing_engine = None
316+
from aperag.indexing.runtime import set_runtime
317+
318+
set_runtime(None)
319+
320+
try:
321+
async with mcp_app.lifespan(app):
322+
yield
323+
finally:
324+
if indexing_shutdown is not None:
325+
indexing_shutdown.set()
326+
if indexing_runtime_tasks:
327+
# Drain in-flight worker / reconciler / cleanup loops with
328+
# a short grace window so a SIGTERM does not abort mid-task.
329+
await asyncio.gather(*indexing_runtime_tasks, return_exceptions=True)
211330

212331

213332
# Create the main FastAPI app with combined lifespan

0 commit comments

Comments
 (0)