Commit 19d3d70

earayu, claude, and Bryce authored
feat(celery Wave 4): real backends + 11 production-readiness items (#1731)
* feat(celery Wave 4 T8 chunk 1): PostgresLineageGraphStore reference adapter First chunk of Wave 4 T8 (graph 3-backend wiring + legacy hard-cut 2nd round). Per architect msg=803a2757 plan, lands the Postgres adapter alone so the schema mapping for the §D.3.5 ``LineageGraphStore`` Protocol can be ratified before mirroring into Neo4j / Nebula in chunks 2 / 3, with the legacy ``aperag/domains/knowledge_graph/graphindex/storage/*`` hard-cut deletion in chunk 4. Schema design (two tables, JSONB SET storage): * ``aperag_lineage_entity (collection_id, name, type, source_lineage, description_parts, gmt_created, gmt_updated)`` — composite primary key on ``(collection_id, name)`` keeps tenants isolated; both JSONB columns default to ``'[]'::jsonb``. Composite uniqueness named ``uq_lineage_entity_collection_name`` so the adapter can ``ON CONFLICT ON CONSTRAINT`` for atomic upserts. * ``aperag_lineage_relation (collection_id, source, target, type, description, evidence_lineage, description_parts, ...)`` — same shape, primary key on the four-tuple, named constraint ``uq_lineage_relation_collection_triple``. * ``description`` on the relation row is the LATEST canonical description (so reads do not always have to walk ``description_parts``); per-(doc, version) slices live in ``description_parts`` JSONB array. §D.3.5 Protocol semantics mapped to single-statement SQL: * ``find_*_with_lineage(document_id)`` — JSONB containment ``source_lineage @> jsonb_build_array(jsonb_build_object('document_id', $d))`` scans the SET in one expression. * ``remove_*_lineage_member(..., document_id)`` — single ``UPDATE ... SET source_lineage = (SELECT jsonb_agg(elem) ... WHERE elem->>'document_id' != $d)`` so concurrent syncs cannot race on read-modify-write. * ``upsert_*_with_lineage`` — ``INSERT ... ON CONFLICT DO UPDATE`` with a strip-then-append expression that replaces any element with the same ``(document_id, parse_version)`` key and appends the new one. 
Matching ``DescriptionPart`` mirrors the same shape. * ``gc_*_if_orphan`` — ``DELETE ... WHERE jsonb_array_length(...) = 0``. * ``get_entity`` / ``get_relation`` — ORM SELECT with ``_row_to_entity_with_lineage`` / ``_row_to_relation_with_lineage`` materialisers that reconstruct the canonical ``EntityWithLineage`` / ``RelationWithLineage`` dataclasses. Tenant isolation: the store is constructed with the ``collection_id`` it serves; all queries filter on it. The worker_factory caches one ``PostgresLineageGraphStore`` instance per collection (chunk 4 follow-up), so the in-process singleton pattern is replaced by a per-collection pool. Hard-cut second-round target: this module replaces ``aperag/domains/knowledge_graph/graphindex/storage/postgres.py`` when chunk 4 deletes the legacy storage subpackage. The legacy ``GraphStore.upsert_entities(collection_id, [Entity])`` interface is fundamentally entity-level merge-append; the new lineage Protocol is SET-element add-or-replace, so this is a clean rewrite, not a wrap. Pending in subsequent chunks (per architect msg=803a2757 plan): * chunk 2 — Neo4j adapter mirror. * chunk 3 — Nebula adapter mirror with per-entity Redis lock activation. * chunk 4 — alembic migration for the two tables + factory dispatch on ``collection.config.graph_backend_type`` + ``aperag/indexing/worker_factory.py:_build_graph_worker`` rewire + delete legacy ``aperag/domains/knowledge_graph/graphindex/ storage/*`` + GraphIndexService consumer migration. * contract tests round out each adapter chunk. Local gates: import smoke (``from aperag.indexing.graph_storage. postgres import PostgresLineageGraphStore``) OK. ``ruff check + format --check`` clean. 
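The lineage SET semantics that the SQL above expresses can be modelled in plain Python. This is only a minimal in-memory sketch of the add-or-replace, strip-by-document, and orphan-check rules; the function names are hypothetical and it is not the adapter's actual code, which performs each step as a single SQL statement.

```python
from typing import Any

LineageMember = dict[str, Any]


def upsert_lineage_member(
    lineage: list[LineageMember], member: LineageMember
) -> list[LineageMember]:
    """Strip any element sharing the (document_id, parse_version) key, then append."""
    key = (member["document_id"], member["parse_version"])
    kept = [m for m in lineage if (m["document_id"], m["parse_version"]) != key]
    return kept + [member]


def remove_lineage_member(
    lineage: list[LineageMember], document_id: str
) -> list[LineageMember]:
    """Drop every element contributed by the given document."""
    return [m for m in lineage if m["document_id"] != document_id]


def is_orphan(lineage: list[LineageMember]) -> bool:
    """A row is eligible for GC once its lineage SET is empty."""
    return len(lineage) == 0
```

Re-upserting the same ``(document_id, parse_version)`` pair replaces the old element instead of duplicating it, which is what makes the operation safe to retry.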
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T4 chunk 1): RedisWorkQueue + lifespan dispatch on INDEXING_QUEUE_BACKEND Per architect msg=803a2757 owner assignment + Wave 4 backlog #6 (Real Redis WorkQueue replacing single-process InMemoryWorkQueue per design pack §E.2 + Wave 1+2 gap report msg=fab88774). aperag/indexing/orchestrator.py: - Add ``RedisWorkQueue`` class implementing the existing ``WorkQueue`` Protocol. RPUSH on enqueue + BLPOP-with-timeout on dequeue, JSON- encoded payloads. Per-modality keying as ``q:indexing:<modality>`` so each modality has its own BLPOP queue. Lazy ``redis.asyncio.Redis`` client (one per process), ``close()`` for lifespan-shutdown drain. - Includes ``qsize()`` inspector helper for the §J.1 ``queue_depth`` SLI emitter (Wave 4 #8 follow-up will wire OTLP) + tests. aperag/indexing/__init__.py: - Re-export ``RedisWorkQueue`` alongside ``InMemoryWorkQueue`` + ``WorkQueue`` so consumers can dispatch on the protocol without importing the orchestrator module directly. aperag/config.py: - Add ``INDEXING_QUEUE_BACKEND`` setting (default ``inmemory`` for backward-compat with Wave 3 single-pod deployments; production multi-pod sets ``redis``). - Add ``INDEXING_QUEUE_REDIS_URL`` setting; auto-derive ``redis://...:port/2`` from existing ``REDIS_HOST/PORT/USER/PASSWORD`` when unset, using a separate logical DB (db=2) from broker (db=0) and memory (db=1) so BLPOP queues never collide with cache / memory backends. aperag/app.py: - Lifespan dispatch: when ``settings.indexing_queue_backend == "redis"`` instantiate ``RedisWorkQueue(settings.indexing_queue_redis_url)``, else ``InMemoryWorkQueue()``. - Shutdown path calls ``queue.close()`` (guarded by ``hasattr``) so the Redis client's connection pool drains cleanly on SIGTERM. - Add ``import contextlib`` for the suppress-on-close guard. 
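As an illustration of the settings described above, the following sketch derives a queue URL on logical DB 2 from host, port, user, and password, and builds the per-modality key. The helper names and the percent-encoding of credentials are assumptions made for this example; the real ``aperag/config.py`` logic may differ.

```python
from typing import Optional
from urllib.parse import quote


def derive_queue_redis_url(
    host: str,
    port: int,
    user: Optional[str] = None,
    password: Optional[str] = None,
    db: int = 2,  # separate logical DB from broker (0) and memory (1)
) -> str:
    """Build a redis:// URL, percent-encoding credentials when present."""
    auth = ""
    if user or password:
        auth = f"{quote(user or '', safe='')}:{quote(password or '', safe='')}@"
    return f"redis://{auth}{host}:{port}/{db}"


def queue_key(modality: str, prefix: str = "q:indexing") -> str:
    """One list key per modality, so each modality has its own BLPOP queue."""
    return f"{prefix}:{modality}"
```

Keeping the key builder separate makes it straightforward for tests to swap in a per-test prefix such as ``q:test:<uuid>``.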
tests/integration/test_redis_workqueue.py (NEW, +6 tests): - Pin Wave 4 #6 acceptance: roundtrip / timeout-returns-None / per-modality isolation / qsize / close-and-reconnect / **multi- consumer atomic demux** (the §E.2 multi-process scale-out invariant — N consumers BLPOP'ing N pushed payloads see N disjoint payloads, no duplicates, no losses). - Skip-when-Redis-unreachable so the lint-and-unit CI lane stays green; the e2e-http-compose lane has Redis up so these run there. - Per-test key prefix (``q:test:<uuid>:<modality>``) avoids cross- test / cross-CI-run leakage on shared Redis DB. Production-readiness invariant declaration (per architect Wave 3 lesson `feedback_production_readiness_invariant.md`): - must-be-real: ``RedisWorkQueue`` is the production transport; multi- process scale-out depends on it - may-be-gated: ``InMemoryWorkQueue`` remains as TEST/SINGLE-POD ONLY default (operators must set ``INDEXING_QUEUE_BACKEND=redis`` for multi-pod correctness) - follow-up Wave: none — T4 fully resolves Wave 4 #6 Local gates: - pytest tests/integration/test_redis_workqueue.py → 6 passed (against real Redis at db=15) - pytest tests/unit_test/indexing/ tests/load/ → 142 passed - ruff check + format --check clean Wave 4 chunk 1 of 4 (T4). Next chunks (chenyexuan, parallel with Bryce T8): T2 cleanup fan-out / T3 real parser / T6 OTLP emitter. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T8 chunk 2): Neo4jLineageGraphStore mirror adapter Mirror PostgresLineageGraphStore (chunk 1 reference) to Neo4j 5.x via Cypher parallel-list encoding — Neo4j node properties cannot hold LIST<MAP>, so the JSONB-shaped lineage SET is split into one list<string> of JSON-encoded members plus parallel list<string> properties for the dedup key fields (doc_ids, parse_versions). 
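The parallel-list encoding can be pictured with a small Python sketch. The ``doc_ids`` and ``parse_versions`` property names come from the description above; ``lineage_json`` and both function names are hypothetical, and the actual adapter does this work in Cypher rather than Python.

```python
import json
from typing import Any


def encode_lineage(members: list[dict[str, Any]]) -> dict[str, list[str]]:
    """Split a list of lineage maps into parallel list<string> properties."""
    return {
        # One JSON string per member, since a property cannot hold a list of maps.
        "lineage_json": [json.dumps(m, sort_keys=True) for m in members],
        # Dedup key fields kept in index-aligned parallel lists.
        "doc_ids": [str(m["document_id"]) for m in members],
        "parse_versions": [str(m["parse_version"]) for m in members],
    }


def decode_lineage(props: dict[str, list[str]]) -> list[dict[str, Any]]:
    """Rebuild the lineage maps from the JSON-encoded member list."""
    return [json.loads(s) for s in props["lineage_json"]]
```

Because the three lists stay index-aligned, a query can filter on ``doc_ids[i]`` and ``parse_versions[i]`` without parsing the JSON member at position ``i``.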
The strip-then-append upsert is a single Cypher MERGE … SET statement, so two concurrent syncs against the same row cannot race on read-modify-write: Neo4j's MERGE row-lock makes the trailing SET atomic. APOC is not required.

Six contract integration tests against a real Neo4j 5.x instance (skip-if-COMPAT_NEO4J_URI-unset) lock the §D.3.5 lineage SET semantics: roundtrip / multi-doc lineage / doc re-parse v1→v2 via remove+upsert orchestrator flow / orphan GC / relation-independent lineage / per-collection_id tenant isolation.

Production-readiness invariant declaration (per Wave 3 lesson #10):
- must-be-real: Neo4jLineageGraphStore on AsyncDriver
- may-be-gated: InMemoryLineageGraphStore stays the worker_factory default until chunk 4 wires the backend dispatch
- fully-resolves: Wave 4 T8 chunk 2 (Neo4j adapter)

Legacy aperag/domains/knowledge_graph/graphindex/storage/neo4j.py is deleted in chunk 4 alongside the worker_factory wire-in (per architect msg=0aea9edf hard-cut).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(celery Wave 4 T6 chunk 1): OTLPMetricsEmitter + MeterProvider wire + lifespan dispatch on INDEXING_METRICS_EMITTER

Wave 4 backlog #8: wire the §J.1 SLIs (index_lag_seconds / index_failure_total / index_success_total / queue_depth / worker_utilization) onto a real OpenTelemetry SDK MeterProvider so operators running multi-pod production deployments actually see indexing health on their OTLP collector instead of silently dropping samples through NoopMetricsEmitter.

Production-readiness invariant three-layer declaration (per Wave 3 lesson #10):
- must-be-real: aperag.indexing.OTLPMetricsEmitter materialises real SDK Counter / Gauge instruments via opentelemetry.metrics.get_meter and forwards every gauge / counter call to the configured exporter.
- may-be-gated: NoopMetricsEmitter remains the INDEXING_METRICS_EMITTER default and silently drops every sample — TEST / dev / single-machine only; production multi-pod operators must opt into INDEXING_METRICS_EMITTER=otlp to ship data to the collector. - fully-resolves: Wave 4 backlog #8. Implementation: - aperag/observability/metrics.py adds init_metrics_provider(config) / shutdown_metrics_provider(). configure_metrics() now drives the SDK MeterProvider install (PeriodicExportingMetricReader wrapping OTLPMetricExporter, http/protobuf or grpc per config.otlp_protocol) whenever config.metrics_enabled. Without this, the OTel global provider is _ProxyMeterProvider and every metric call is a no-op. - aperag/indexing/observability.py adds OTLPMetricsEmitter — caches Counter / Gauge instruments per metric name and tolerates instrument creation / set / add failures so a misconfigured exporter degrades to no-op instead of crashing indexing. Constructor accepts an injectable meter for tests because opentelemetry.metrics.set_meter_provider is one-shot per process and cannot be reset between test cases. - aperag/config.py adds INDEXING_METRICS_EMITTER setting (default noop; production sets otlp). - aperag/indexing/runtime.py extends IndexingRuntime with a metrics_emitter field (default NoopMetricsEmitter via field default_factory) so service-layer callers and lifespan-spawned workers can pull a single emitter from the runtime singleton. - aperag/app.py lifespan dispatches on settings.indexing_metrics_emitter: otlp → OTLPMetricsEmitter() / else → NoopMetricsEmitter(); stashes on app.state.indexing_metrics_emitter and feeds the runtime singleton. tests/integration/test_otlp_metrics_emitter.py (NEW, 7 tests): - counter round-trip: SDK MeterProvider with InMemoryMetricReader sees the running total, attribute-keyed. - gauge round-trip: Gauge instrument latest-value-wins per attribute set. - per-modality isolation: VECTOR + SUMMARY queue_depth fan out to separate data points. 
- instrument reuse: repeated emits for the same metric name reuse one SDK instrument handle. - init_metrics_provider endpoint guard: short-circuits when OTEL_EXPORTER_OTLP_ENDPOINT missing, app keeps booting. - init_metrics_provider idempotent: second call returns True without re-initialising. - default meter resolves global: emitter constructed without a meter binds to opentelemetry.metrics.get_meter("aperag.indexing") and tolerates the proxy provider so Tier-1 boot before OTLP collector is reachable does not crash. Local gates: - ruff check + format clean (7 files) - pytest tests/integration/test_otlp_metrics_emitter.py: 7 passed - pytest tests/unit_test/indexing/ tests/load/: 142 passed - pytest tests/unit_test/ (sans env-dep moto): 861 passed / 27 skipped Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T6 chunk 1 follow-up): wire shutdown_metrics_provider into lifespan finally for graceful OTLP drain Addresses huangheng pass-1 observation A (msg=5d450300): OTLP MeterProvider was never flushed on lifespan shutdown, so the PeriodicExportingMetricReader could lose pending metric samples on SIGTERM before the process exits. Mirrors the T4 ``queue_obj.close()`` graceful shutdown pattern. ``shutdown_metrics_provider`` is a no-op when the SDK MeterProvider was never installed (default ``INDEXING_METRICS_EMITTER=noop`` / OTLP endpoint missing) so single-machine and dev deployments are unaffected. ``asyncio.to_thread`` wrap because the OTel SDK ``shutdown()`` blocks while draining the exporter pool. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T3 chunk 1): wire DocParser into parse_document for real PDF/Word/image inputs Wave 4 backlog #4 — replace the T1.1 simulator's UTF-8 markdown-only shortcut with a real DocParser dispatch on non-text extensions so production uploads of PDF / DOCX / PPTX / XLSX / EPUB / HTML / image files actually parse instead of raising "simulator parser only handles UTF-8 markdown". 
The simulator path stays for ``.md`` / ``.markdown`` / ``.txt`` / no-extension inputs so existing unit tests + dev workflows keep working unchanged. Production-readiness invariant three-layer declaration (per Wave 3 lesson #10): - must-be-real: aperag.docparser.doc_parser.DocParser chain dispatches on extension and runs the real MarkItDown / MinerU / ImageParser / AudioParser stack on production deployments. ``parser_config`` forwarded so per-collection MinerU API token / use_markitdown toggles flow through. - may-be-gated: the simulator's UTF-8 markdown decode stays the default for text-only inputs; tests that pass markdown bytes without a filename hint are unchanged. - partially-resolves: Wave 4 backlog #4. Chunk 2 (next session) promotes the parser to its own ``q:parse`` async queue per design pack §E.2 so an upload handler no longer blocks on a 30-second OCR run inside the request thread. Implementation: - aperag/indexing/parser.py: * New ``source_filename`` + ``parser_config`` parameters on ``parse_document``. ``_normalise_extension`` + ``_SIMULATOR_EXTENSIONS`` drive the dispatch decision. * New ``_docparser_extract_markdown`` helper materialises the bytes into a tempfile (DocParser only accepts a real path, not a stream), runs ``DocParser.parse_file``, concatenates every produced ``MarkdownPart``, and unlinks the tempfile in a try/finally so a raised ``ParserError`` does not leak files. Image-/audio-only inputs (no MarkdownPart) emit empty markdown so the artifacts still exist + the chunks/outline pipeline produces zero chunks (vision modality consumes the original asset directly per §C.6). * DocParser import is lazy inside ``parse_document`` per the same discipline as ``compute_parse_version`` (avoids pulling MarkItDown / MinerU / pikepdf at indexing-package import time). 
* Unsupported extensions raise a clear ``ValueError`` with the DocParser supported list embedded so callers can diagnose; we do NOT fall back silently to the simulator (Wave 3 lesson — silent-fallback masks wiring bugs). - aperag/domains/knowledge_base/service/document_service.py: * ``_create_or_update_document_indexes`` passes ``source_filename= object_path`` so the upload's ``user-<u>/<col>/<doc>/original<ext>`` object key drives the dispatch; non-markdown uploads now route through DocParser instead of dying on the UTF-8 decode. tests/integration/test_real_parser_dispatch.py (NEW, 7 tests): - markdown simulator path regression (no filename + ``.md`` filename) - simulator rejects non-UTF-8 with no extension hint (clear error, not silent corruption) - HTML routes through DocParser → MarkItDown → MarkdownPart → outline + chunks (full chain, no extra system deps) - unsupported extension surfaces ``ValueError`` with supported-list - empty PDF dispatches through DocParser (verifies the path runs even when the parser produces no markdown body — pikepdf-only, graceful skip without it) - .docx round-trip when python-docx is installed (skip otherwise so CI hosts without the optional dep stay green) Local gates: - ruff check + format clean (3 files) - pytest tests/integration/test_real_parser_dispatch.py: 6 passed, 1 skipped (python-docx) - pytest tests/unit_test/indexing/ tests/load/ tests/integration/{otlp,real_parser_dispatch}: 155 passed, 1 skipped — including all T1.1 simulator regression tests still green Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T8 chunk 3): NebulaLineageGraphStore mirror adapter Mirror PostgresLineageGraphStore (chunk 1 reference) and Neo4jLineageGraphStore (chunk 2) over Nebula 3.x. Nebula's property type system has neither LIST<MAP> nor parallel-list-comprehension support; the §D.3.5 lineage SET ports via a JSON STRING property encoding (each SET serialises into one string column holding a JSON array). 
Reads parse the JSON, writes mutate in Python and write the new JSON back — inherently a read-modify-write loop. Per architect msg=f2921ae0, the Nebula backend therefore activates the EntityLock injected at construction time around every read-modify-write (upsert / strip / GC). Postgres + Neo4j don't need this because their backends have native single-statement strip-then-append; Nebula does. InMemoryEntityLock for tests, RedisEntityLock for production multi-process worker pools. One Nebula SPACE per collection_id (natural Nebula tenancy boundary mirroring the legacy adapter); the store-instance binds to a collection_id at construction so SPACE selection is automatic. Schema-visibility retry handles "TagNotFound" + "EdgeNotFound" + "SpaceNotFound" + "No schema found for" — the heartbeat-propagation window after CREATE TAG / CREATE SPACE. Six contract integration tests against a real Nebula 3.8.0 instance (skip-if-COMPAT_NEBULA_HOSTS-unset) lock the §D.3.5 semantics: roundtrip / multi-doc lineage / doc re-parse v1→v2 via remove+upsert orchestrator flow + (doc, parse_v) dedup-key invariant / orphan GC / relation-independent lineage / per-collection_id tenant isolation via separate SPACE. Production-readiness invariant declaration (per Wave 3 lesson #10): - must-be-real: NebulaLineageGraphStore on nebula3.gclient.net.ConnectionPool + injected RedisEntityLock for multi-process serialisation - may-be-gated: InMemoryLineageGraphStore stays the worker_factory default until chunk 4 wires the backend dispatch - fully-resolves: Wave 4 T8 chunk 3 (Nebula adapter) Three backends (Postgres / Neo4j / Nebula) now mirror; chunk 4 finishes T8 with alembic + worker_factory wire + 3-backend contract fixture + delete legacy adapters + e2e production smoke. 
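The reason the Nebula adapter needs a lock can be shown with an in-memory stand-in. In this sketch a plain dict plays the role of the JSON string property, and a per-entity ``asyncio.Lock`` plays the role of the injected EntityLock. The class and function names are hypothetical and only illustrate the lock-around-read-modify-write pattern described above.

```python
import asyncio
import json
from collections import defaultdict


class InMemoryEntityLockSketch:
    """Hypothetical stand-in for an EntityLock: one asyncio.Lock per entity key."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    def lock(self, key: str) -> asyncio.Lock:
        return self._locks[key]


async def upsert_member_json(
    store: dict[str, str], entity_lock: InMemoryEntityLockSketch, entity: str, member: dict
) -> None:
    """Read the JSON string, mutate in Python, write it back, all under the lock."""
    async with entity_lock.lock(entity):
        members = json.loads(store.get(entity, "[]"))
        key = (member["document_id"], member["parse_version"])
        members = [m for m in members if (m["document_id"], m["parse_version"]) != key]
        members.append(member)
        # Yield point standing in for the network round trip between read and write.
        await asyncio.sleep(0)
        store[entity] = json.dumps(members)
```

Without the ``async with`` block, two concurrent upserts could both read the same old string during the yield, and the later write would silently discard the earlier member.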
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T3 chunk 2): q:parse async promotion — upload handler returns 202 + parse worker pool Promotes :func:`aperag.indexing.parse_document` off the HTTP request thread per design pack §E.2. Without this, every PDF / Word / image upload blocks the request for 30s+ on DocParser (MarkItDown / MinerU / OCR), destroying responsiveness. The new flow: * upload handler ``_create_or_update_document_indexes`` pushes a :class:`ParseDispatchPayload` onto ``q:parse`` and returns immediately (202-equivalent — pending visible via ``document.status``) * ``run_parse_worker`` (FastAPI lifespan asyncio task, concurrency 8 per §E.2) BLPOPs payloads, runs :func:`parse_document`, then fans out to the 5 per-modality queues via :func:`dispatch_indexing`. Deltas: * ``aperag/indexing/parse_orchestrator.py`` (new, +311) — ``ParseDispatchPayload`` + ``process_one_parse_task`` (read source → parse → optional rebuild-purge → dispatch) + ``run_parse_worker_loop`` + ``run_parse_worker`` thin entrypoint mirroring the per-modality ``run_*_worker`` shape. * ``aperag/indexing/orchestrator.py`` — ``WorkQueue`` protocol gains ``push_parse`` / ``pop_parse``; ``InMemoryWorkQueue`` + ``RedisWorkQueue`` (key ``q:parse``) implement them. * ``aperag/app.py`` lifespan starts ``run_parse_worker`` alongside the 5 modality workers + reconciler + cleanup. * ``aperag/domains/knowledge_base/service/document_service.py`` — ``_create_or_update_document_indexes`` swapped to ``push_parse`` (no more inline ``parse_document`` + ``dispatch_indexing``); new ``_resolve_parser_config_for_collection`` reads optional ``parser_config`` from ``collection.config`` so deployments can opt into MinerU / per-collection OCR without code changes. 
* ``tests/integration/test_parse_async_roundtrip.py`` (new) — 10 tests across single-task happy / failure paths, rebuild purge, parse-only, full async roundtrip (push_parse → parse worker → modality workers → ACTIVE), and payload to_dict/from_dict round-trip pinning. Failure semantics for chunk 2 minimal scope: a failed parse logs + drops the job (no DocumentIndex rows materialised). Wave 5 follow-up will extend the reconciler to detect "documents created N minutes ago without document_index rows" and re-enqueue. The per-modality §I.2 backoff still covers post-parse failures. Three-class tag (Wave 3 production-readiness invariant): * must-be-real: ``RedisWorkQueue.push_parse / pop_parse`` (Redis ``q:parse`` BLPOP backed by the existing T4 client) * may-be-gated: ``InMemoryWorkQueue.push_parse / pop_parse`` for unit + integration tests; private deployments still use ``q:parse`` via the same protocol * fully-resolves: Wave 4 backlog #4 (parser real PDF/Word/image + q:parse async promotion) — chunk 1 wired DocParser into ``parse_document``; chunk 2 takes parse off the HTTP thread. Local gates: * ``pytest tests/unit_test/indexing/ tests/integration/ tests/load/`` — 180 passed / 25 skipped (incl. 10 new ``test_parse_async_roundtrip``). * ``ruff check aperag/ tests/integration/test_parse_async_roundtrip.py`` — clean. * ``ruff format`` — applied. Pushed: rebased on b0bdc09 (Bryce T8 chunk 3 Nebula adapter). 
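The parse-then-fan-out flow of this chunk can be approximated with in-process queues. The sketch below is a simplified model, not the real ``parse_orchestrator`` module: the payload fields, the modality names, and the ``parse`` callable are assumptions chosen for illustration, and ``asyncio.Queue`` stands in for the Redis-backed ``q:parse`` and per-modality queues.

```python
import asyncio
import logging
from dataclasses import asdict, dataclass
from typing import Callable

logger = logging.getLogger(__name__)

# Assumed modality names; the source only states that there are five queues.
MODALITIES = ("vector", "fulltext", "graph", "summary", "vision")


@dataclass(frozen=True)
class ParsePayloadSketch:
    """Hypothetical, reduced payload; the real ParseDispatchPayload has its own fields."""

    document_id: str
    object_path: str

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "ParsePayloadSketch":
        return cls(**data)


async def process_one_parse_task(
    parse_queue: asyncio.Queue,
    modality_queues: dict[str, asyncio.Queue],
    parse: Callable[[str], str],
) -> bool:
    """Pop one payload, parse it, and fan out to every modality queue."""
    payload = ParsePayloadSketch.from_dict(await parse_queue.get())
    try:
        markdown = parse(payload.object_path)
    except Exception:
        # Minimal-scope failure semantics: log and drop the job.
        logger.exception("parse failed for %s", payload.document_id)
        return False
    for queue in modality_queues.values():
        await queue.put({"document_id": payload.document_id, "markdown": markdown})
    return True
```

The upload handler's side of this contract is just the ``put`` of ``payload.to_dict()``, which is why it can return before any parsing has happened.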
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T8 chunk 4a): alembic migration mirror ORM 100% + drop relation description column Wave 4 T8 chunk 4a per docs/modularization/indexing-redesign-design-pack.md §D.3.5 + chunk 4 acceptance lock items 1 + 2 (architect msg=baf6618e / huangheng msg=b6f20096): * New alembic migration ``e7a3b9c2d1f6`` creates the two PostgreSQL tables that back PostgresLineageGraphStore (T8 chunk 1 msg=f0571f98): - ``aperag_lineage_entity`` (collection_id, name) PK + JSONB ``source_lineage`` / ``description_parts`` SETs + GIN containment index for ``find_entity_ids_with_lineage`` doc-id scans. - ``aperag_lineage_relation`` (collection_id, source, target, type) PK + JSONB ``evidence_lineage`` / ``description_parts`` SETs + GIN index on ``evidence_lineage``. * env.py registers the adapter's private ``_LineageGraphBase.metadata`` alongside the application's ``Base.metadata`` so autogen comparison sees both — alembic check returns "No new upgrade operations detected" (ORM 100% mirror invariant honored, schema drift = Wave 3 T3.1 lesson). * Dropped legacy ``description`` Text/string column from the relation row across all 3 backend adapters (Postgres + Neo4j + Nebula) — the per-document fragments in ``description_parts`` are the canonical source per §D.3.3 Option A; ``RelationWithLineage`` does not expose a standalone ``description`` field so the column had no read-path consumer. Cross-backend "ORM 100% mirror" stays honest. * PostgresLineageGraphStore SQL fixes surfaced by chunk 4a smoke: - ``ON CONFLICT (col1, col2)`` column-list form (vs. named constraint) so the upsert no longer depends on the redundant UniqueConstraint name (PG collapses PK + UC covering same columns into one index). - ``CAST(:document_id AS TEXT)`` inside ``jsonb_build_object`` so asyncpg can infer the parameter type (otherwise raises ``IndeterminateDatatypeError``). 
- ``get_entity`` / ``get_relation`` switched to per-column ``select(...)`` + ``row.attr`` access (ORM-instance row mapping via ``_mapping[Column]`` does not work under Core ``conn.execute``).

Four-pass alembic test against real Postgres:
Pass 1: ``upgrade head`` → ``e7a3b9c2d1f6`` ✅
Pass 2: ``check`` → "No new upgrade operations detected." ✅
Pass 3: ``downgrade -1`` → tables dropped ✅
Pass 4: ``upgrade head`` + ``check`` → green ✅

Local gates green:
- ``ruff check ./aperag ./tests`` clean
- ``ruff format --check ./aperag ./tests`` clean (501 files)
- ``pytest tests/unit_test/indexing/`` 140 passed
- ``pytest tests/integration/test_neo4j_lineage_graph_store.py`` 6 passed (real Neo4j 5.x; description-drop verified)
- ``pytest tests/integration/test_nebula_lineage_graph_store.py`` 6 passed (real Nebula 3.8.0; description-drop verified)
- PostgresLineageGraphStore inline smoke: PASS (entity + relation upsert / lineage SET preserve / strip-by-doc / GC / dedup-key / find-by-doc scan, all against real Postgres)

Cross-backend mirror state after chunk 4a:

| Backend  | Entity desc col   | Relation desc col          |
|----------|-------------------|----------------------------|
| Postgres | none (parts only) | none (parts only), DROPPED |
| Neo4j    | none (parts only) | none (parts only), DROPPED |
| Nebula   | none (parts only) | none (parts only), DROPPED |

Next: chunk 4b, covering worker_factory dispatch on ``collection.config.graph_backend_type ∈ {postgres, neo4j, nebula}`` + EntityLock injection (Nebula RedisEntityLock, Postgres + Neo4j no-op) + async driver cross-event-loop verify.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T8 chunk 4b): worker_factory dispatch on graph_backend_type + EntityLock injection Wave 4 T8 chunk 4b per docs/modularization/indexing-redesign-design-pack.md §D.3.5 + chunk 4 acceptance lock items 3 + 5 + 6 (architect msg=baf6618e / huangheng msg=b6f20096): * Adds ``CollectionConfig.graph_backend_type`` Literal["postgres", "neo4j", "nebula"] (default "postgres" — the §D.3.5 reference adapter that shares the application's own PostgreSQL). * Refactors ``aperag/indexing/worker_factory.py`` graph builder to read ``collection.config.graph_backend_type`` and dispatch to the matching :class:`LineageGraphStore` adapter: - postgres → :class:`PostgresLineageGraphStore` bound to a shared process-wide ``AsyncEngine`` (lazily created from ``settings.database_url``, postgresql:// auto-promoted to postgresql+asyncpg://). Strip-then-append RMW is single-statement so InMemoryEntityLock no-op suffices. - neo4j → :class:`Neo4jLineageGraphStore` bound to a shared ``AsyncDriver`` (lazily created from ``settings.neo4j_uri`` / ``settings.neo4j_username`` / ``settings.neo4j_password``). Same single-statement RMW guarantee under MERGE row-lock so InMemoryEntityLock no-op suffices. - nebula → :class:`NebulaLineageGraphStore` bound to a shared sync ``ConnectionPool`` (32 connections). EntityLock injection is :class:`RedisEntityLock` (binding to ``settings.indexing_queue_redis_url`` via ``redis.asyncio.from_url``) so the read-modify-write loop serialises across worker processes (architect msg=f2921ae0 invariant). Falls back to InMemoryEntityLock only when ``indexing_queue_redis_url`` is unset (single-process tests). 
* Cross-event-loop verify (acceptance lock item 3): backend client singletons are created inside the builder thread (``asyncio.to_thread`` from ProductionWorkerFactory.__call__) but loop binding is deferred to first use — async engines / drivers attach to whatever event loop the worker's ``sync(...)`` coroutine runs on, which is the orchestrator loop. No ``asyncio.run`` near the factory. * "Wave 4 wiring T1 extractor" gate kept explicit even with backend wired: the no-op extractor stub is identity-checked in the builder so collections that opt into knowledge_graph today land on WorkerFactoryError → §I.2 retry-with-backoff (Wave 3 lesson #10: no silent ACTIVE-with-empty-graph). The gate self-disables when T1 PR replaces ``_no_op_extractor`` with the real LightRAG-style extractor. * Adds 9 unit tests in tests/integration/test_worker_factory.py: - graph_backend_type defaults to "postgres" when config absent - reads from pydantic attribute / dict / JSON-string config - rejects unknown backend with clear WorkerFactoryError - returns InMemoryEntityLock for postgres + neo4j (no Redis dep) - the T1-wiring gate raises WorkerFactoryError with "Wave 4 wiring" + "T1" in message even when backend is wired (the e2e Phase 1 smoke pin in chunk 4e relies on this exact message). * Adds ``_reset_graph_backend_singletons_for_tests()`` helper so fixtures can drop cached clients between runs (used by chunk 4c cross-backend contract test fixture). 
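The config-reading behaviour pinned by those tests (default to ``postgres``, accept an attribute, a dict, or a JSON string, reject unknown values) might look roughly like the sketch below. The function and exception names are hypothetical stand-ins, not the identifiers used in ``worker_factory.py``.

```python
import json
from typing import Any

SUPPORTED_GRAPH_BACKENDS = ("postgres", "neo4j", "nebula")


class WorkerFactoryErrorSketch(Exception):
    """Hypothetical stand-in for the factory's error type."""


def resolve_graph_backend_type(config: Any) -> str:
    """Read graph_backend_type from an object, a dict, or a JSON string."""
    if config is None:
        return "postgres"
    if isinstance(config, str):
        config = json.loads(config) if config.strip() else {}
    if isinstance(config, dict):
        value = config.get("graph_backend_type")
    else:
        value = getattr(config, "graph_backend_type", None)
    backend = value or "postgres"  # default: the reference adapter
    if backend not in SUPPORTED_GRAPH_BACKENDS:
        raise WorkerFactoryErrorSketch(
            f"unknown graph_backend_type {backend!r}; "
            f"expected one of {SUPPORTED_GRAPH_BACKENDS}"
        )
    return backend


def needs_cross_process_lock(backend: str) -> bool:
    """Only the Nebula adapter does read-modify-write outside a single statement."""
    return backend == "nebula"
```

Raising on an unknown value, rather than falling back to the default, follows the same no-silent-fallback rule the commit applies elsewhere.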
Local gates green: - ``ruff check ./aperag ./tests`` clean - ``ruff format --check ./aperag ./tests`` clean (503 files) - ``pytest tests/unit_test/indexing/ tests/integration/test_worker_factory.py tests/integration/test_neo4j_lineage_graph_store.py``: 152 passed, 6 skipped (Nebula skipped without COMPAT_NEBULA_HOSTS) Production-readiness three-class layer: - must-be-real: per-backend client singletons (AsyncEngine / AsyncDriver / ConnectionPool) + RedisEntityLock for Nebula - may-be-gated: InMemoryEntityLock fallback only when indexing_queue_redis_url is unset (single-process test/dev) - fully-resolves: chunk 4 acceptance items 3 (async cross-event-loop) + 5 (EntityLock injection) + 6 (factory dispatch on graph_backend_type) Next: chunk 4c — 6-case cross-backend contract test fixture parametrizing postgres + neo4j + nebula against the same Protocol invariants. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T2): cleanup loop 5 modality singleton fan-out — per-row worker_factory Per Wave 4 backlog #3 + architect msg=c79e9a3f gap-report. Pre-T2 the cleanup loop ran with ``workers={}`` empty (placeholder from Wave 3 T3.1 chunk 3). Every row hit ``worker is None`` → ``backend_skipped``, so production cleanup was a silent no-op for backend artefacts: Qdrant points / ES docs / graph entities leaked forever after document or collection delete. The DB rows were dropped but the backend state was not, leaving the indexes with orphan vectors that search would still match. T2 wires the per-row factory the orchestrator already uses for dispatch (Bryce T8 chunk 4b shipped backend dispatch on ``collection.config.graph_backend_type``). The cleanup loop calls ``ProductionWorkerFactory.build_for_cleanup_row(row)`` for every row, getting the right per-(collection, modality) worker view. 
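A rough sketch of that per-row resolution is given below: prefer the factory when one is supplied, fall back to a static ``workers`` map otherwise, and treat a factory error as a skipped backend rather than a crash. The names and the tuple return shape are assumptions made for illustration only.

```python
from typing import Any, Callable, Mapping, Optional


class WorkerFactoryErrorSketch(Exception):
    """Hypothetical stand-in for the factory's error type."""


def resolve_cleanup_worker(
    row: Mapping[str, Any],
    workers: Mapping[str, Any],
    worker_factory: Optional[Callable[[Mapping[str, Any]], Any]] = None,
) -> tuple[Optional[Any], bool]:
    """Return (worker, backend_skipped) for one cleanup row."""
    if worker_factory is not None:
        try:
            # The factory sees the whole row, so it can pick a per-collection backend.
            return worker_factory(row), False
        except WorkerFactoryErrorSketch:
            # Backend cleanup is skipped; the caller still drops the DB row.
            return None, True
    worker = workers.get(row["modality"])
    return worker, worker is None
```

With an empty ``workers`` map and no factory, every row comes back as skipped, which is the silent no-op this change removes.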
Deltas: * ``aperag/indexing/worker_factory.py`` — ``CleanupWorkerView`` (minimal :class:`ModalityWorker` shape with ``modality`` + either ``_backend`` for flat modalities or ``_store + _entity_lock`` for graph; ``derive`` / ``sync`` raise loudly so misrouting into the dispatch path surfaces immediately) + ``build_for_cleanup_row`` + ``_build_qdrant_cleanup_backend`` / ``_build_es_cleanup_backend`` / ``_build_cleanup_view_sync`` helpers. Bypasses dispatch-time gates that block construction but are irrelevant to deletion — graph "Wave 4 T1 extractor" gate (``_build_graph_worker:429``) and vision multimodal gate (``_build_vision_worker:349``) still raise for dispatch but cleanup must drop their backend artefacts even when those gates are active. * ``aperag/indexing/cleanup.py`` — ``WorkerFactoryForRow`` type alias + ``_resolve_cleanup_worker`` that prefers ``worker_factory(row)`` over ``workers.get(modality)``, gracefully catches :class:`WorkerFactoryError` (counts as ``backend_skipped`` + still drops the DB row so the cleanup index does not grow unboundedly while the operator triages the gate). All three cleanup entry points (``cleanup_orphan_parse_versions`` / ``cleanup_for_deleted_documents`` / ``cleanup_for_deleted_collections``) + ``run_cleanup_loop`` accept the optional ``worker_factory`` parameter alongside the legacy ``workers`` map. * ``aperag/indexing/runtime.py`` — ``IndexingRuntime.cleanup_worker_factory`` field so service-layer code (``_delete_document_indexes``) can read the same factory the lifespan installed. * ``aperag/app.py`` — lifespan wires ``run_cleanup_loop(worker_factory=worker_factory.build_for_cleanup_row)`` and stashes the bound method on ``IndexingRuntime``. * ``aperag/domains/knowledge_base/service/document_service.py`` — ``_delete_document_indexes`` forwards ``runtime.cleanup_worker_factory`` to the cleanup call so user-initiated document deletes also run the per-row factory. 
* ``tests/integration/test_cleanup_fan_out.py`` (new) — 7 tests across factory-wins-over-map, per-collection dispatch, factory raises ⇒ skip+drop, orphan parse_v GC with factory, collection cascade with factory, ``CleanupWorkerView`` derive/sync raise, workers-only backward compat. All factory-side cases use ``InMemoryVectorBackend`` / ``InMemoryFulltextBackend`` to drive the cleanup loop without standing up real Qdrant / ES. Three-class tag (Wave 3 production-readiness invariant): * must-be-real: ``ProductionWorkerFactory.build_for_cleanup_row`` returns real Qdrant / ES / LineageGraphStore-backed views per row * may-be-gated: tests inject closures that return InMemory backends * fully-resolves: Wave 4 backlog #3 (Cleanup loop 5 modality singleton fan-out) — production cleanup actually cleans backend artefacts now (was silent no-op pre-T2) Local gates: * ``pytest tests/unit_test/indexing/ tests/integration/`` — 192 passed / 25 skipped (incl. 7 new ``test_cleanup_fan_out``). * ``ruff check aperag/ tests/integration/test_cleanup_fan_out.py`` — clean. * ``ruff format`` — applied. Pushed: rebased on Bryce T8 chunk 4b ``d1713e2`` (graph backend dispatch dependency). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T8 chunk 4c): cross-backend lineage graph store contract test fixture Wave 4 T8 chunk 4c per chunk 4 acceptance lock item 4 + architect msg=87e2b187 amendment (cross-event-loop scenario): * New ``tests/integration/test_lineage_graph_store_contract.py`` parametrizes the §D.3.5 ``LineageGraphStore`` Protocol contract across all three production backends (Postgres / Neo4j / Nebula) in a single 6-case fixture so any backend that drifts from spec fails here regardless of which adapter it is. Each backend skips its parametrize cell when the env-var is unset (lint-and-unit CI lane stays green; e2e-http-compose lane runs them all). 6 contract cases: 1. roundtrip_entity_with_one_lineage_member — basic upsert + read 2. 
two_documents_cite_same_entity_preserves_both_lineage — §D.3 cross-doc lineage SET coexistence 3. doc_re_parse_replaces_old_parse_version_member — §D.3.6 step 3 remove + upsert + (document_id, parse_version) composite dedup 4. remove_then_gc_orphan_entity — §D.3.2 phase-1 strip → phase-2 GC; orphan-only deletion 5. relation_lineage_set_independent_from_entity — relation evidence_lineage SET independent of entity source_lineage 6. tenant_isolation_collection_id_filters_all_queries — §H.2 per-store-instance binding tenant double-layer * Adds a 7th case (per architect msg=87e2b187 chunk 4c amendment): ``test_cross_event_loop_construct_then_call`` parametrized across all three backends. Pins acceptance lock item 3 ("async driver wire cross-event-loop verify; no asyncio.run near factory") at the contract layer rather than relying on driver-vendor lazy bind behaviour. Forward-prevent for Wave 3 evaluation cross-loop bug msg=e1f23258 if a future driver upgrade flips to eager bind. The test mirrors ``ProductionWorkerFactory.__call__`` exactly: - sync per-process client creation (no loop binding) - per-collection adapter constructor inside ``asyncio.to_thread`` (sync ``__init__`` only) - first async op runs on the orchestrator event loop (lazy bind) * Surfaces + fixes a Nebula schema-visibility error fragment Layer 1 same-session: rapid SPACE drop/recreate cycles in the cross-backend fixture exposed a 5th + 6th propagation-window error variant ``Storage Error: Tag not found`` / ``Edge not found`` (storaged-side text spelling differs from metad's TagNotFound / EdgeNotFound). Added both fragments to ``_SCHEMA_VISIBILITY_ERROR_FRAGMENTS`` so retry catches them.
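The fragment-based retry mentioned in the last bullet could look roughly like the sketch below. The four fragment strings are the ones quoted in this commit message; the helper names, the attempt count and the delay are assumptions for illustration, and the real ``_SCHEMA_VISIBILITY_ERROR_FRAGMENTS`` tuple may hold more entries.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Spellings quoted in the commit message (metad-side and storaged-side).
SCHEMA_VISIBILITY_ERROR_FRAGMENTS = (
    "TagNotFound",
    "EdgeNotFound",
    "Storage Error: Tag not found",
    "Edge not found",
)


def is_schema_visibility_error(exc: Exception) -> bool:
    """True when the error text contains a known propagation-window fragment."""
    message = str(exc)
    return any(fragment in message for fragment in SCHEMA_VISIBILITY_ERROR_FRAGMENTS)


def retry_on_schema_visibility(op: Callable[[], T], attempts: int = 5, delay_s: float = 0.0) -> T:
    """Run op, retrying only errors that look like schema-propagation lag."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return op()
        except Exception as exc:
            last_attempt = attempt == attempts - 1
            if last_attempt or not is_schema_visibility_error(exc):
                raise
            time.sleep(delay_s)
    raise AssertionError("loop always returns or raises")
```

Matching on message fragments is brittle by nature, which is why a new spelling from a different Nebula component has to be added to the tuple before the retry can catch it.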
Local gates green: - ``ruff check ./aperag ./tests`` clean - ``ruff format --check ./aperag ./tests`` clean (505 files) - ``pytest tests/integration/test_lineage_graph_store_contract.py`` 21 passed against real Postgres + Neo4j 5.x + Nebula 3.8.0 (6 cases × 3 backends + 3 cross-event-loop variants) - ``pytest tests/unit_test/indexing/ + test_worker_factory.py + test_cleanup_fan_out.py``: 159 passed Production-readiness three-class layer: - must-be-real: cross-backend Protocol contract enforced against real Postgres / Neo4j / Nebula - may-be-gated: backends skip when env-var unset (CI lint-and-unit lane stays green) - fully-resolves: chunk 4 acceptance item 4 (cross-backend contract test) + acceptance item 3 (cross-event-loop verify, locked at contract layer per architect amendment) The per-backend ``tests/integration/test_neo4j_lineage_graph_store.py`` and ``test_nebula_lineage_graph_store.py`` retain backend-specific extras (parallel-list encoding shape, schema-visibility retry path) that cannot be expressed as Protocol-level invariants; this contract file covers only Protocol-level invariants. Next: chunk 4d — delete legacy ``aperag/domains/knowledge_graph/graphindex/storage/{base,postgres,neo4j,nebula,connector}.py`` + grep-zero verify (Wave 3 hard-cut second round). Pending architect clarification on consumer migration scope (legacy graphindex/service.py + engine/indexer.py + integration.py still import the storage modules). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T9): fulltext multi-backend adapter dispatch (collection.config.fulltext_backend_type) Mirrors the T8 chunk 4b graph backend dispatch shape (msg=27571f63, msg=a3eec27d) for the fulltext modality. 
Pre-T9 every collection hard-coded to Elasticsearch via ``settings.es_host``; T9 introduces a per-collection ``fulltext_backend_type ∈ {elasticsearch, opensearch}`` field with the same dispatch pattern so a deployment can run OpenSearch (open-licence alternative) without code changes. Both backends share the same Elasticsearch-compatible HTTP API, so the existing ``_ElasticsearchFulltextBackend`` adapter wraps either client unchanged. Deltas: * ``aperag/schema/common.py`` — ``CollectionConfig.fulltext_backend_type`` ``Literal["elasticsearch", "opensearch"]`` field, default ``"elasticsearch"`` so existing collections keep their pre-T9 behavior. * ``aperag/indexing/worker_factory.py`` — - ``_resolve_fulltext_backend_type(collection)`` mirrors ``_resolve_graph_backend_type``: pydantic attr / Mapping / JSON string shape coverage + clear ``WorkerFactoryError`` on unknown. - ``_build_fulltext_backend(*, backend_type, index_name)`` dispatcher that constructs the right driver client and wraps it in the shared ``_ElasticsearchFulltextBackend`` adapter. - ``_build_elasticsearch_client`` / ``_build_opensearch_client`` helpers — ``opensearch-py`` lazy-imported with the same gating pattern used by ``graph-neo4j`` / ``graph-nebula`` extras (clear ``WorkerFactoryError`` pointing at the ``fulltext-opensearch`` extra when the dependency is missing). - ``_build_es_cleanup_backend`` now also dispatches on the resolved backend type so cleanup hits the same backend a switched-over collection currently writes to. * ``tests/integration/test_worker_factory.py`` — 9 new tests: default → "elasticsearch" / pydantic attr × 2 backends / dict / JSON string / unknown raises / dispatch builds ES adapter / opensearch gate raises when driver missing / ES_HOST gate raises when unset. Mirrors the chunk 4b coverage shape exactly. 
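A sketch of the shape-tolerant resolution described for ``_resolve_fulltext_backend_type``, under stated assumptions: it takes the config object directly rather than the collection, and the function and constant names here are illustrative rather than the module's actual code. The valid values, the default and the "raise on unknown" behaviour come from the deltas above.

```python
import json
from collections.abc import Mapping
from typing import Any

VALID_FULLTEXT_BACKENDS = ("elasticsearch", "opensearch")
DEFAULT_FULLTEXT_BACKEND = "elasticsearch"


class WorkerFactoryError(Exception):
    """Stand-in for the factory error named in the commit message."""


def resolve_fulltext_backend_type(config: Any) -> str:
    """Read fulltext_backend_type from an attr object, a mapping, or a JSON string."""
    if isinstance(config, str):
        try:
            config = json.loads(config)
        except json.JSONDecodeError as exc:
            raise WorkerFactoryError("collection config is not valid JSON") from exc
    if isinstance(config, Mapping):
        value = config.get("fulltext_backend_type")
    else:
        value = getattr(config, "fulltext_backend_type", None)
    if value is None:
        # Existing collections without the field keep the pre-T9 behaviour.
        return DEFAULT_FULLTEXT_BACKEND
    if value not in VALID_FULLTEXT_BACKENDS:
        raise WorkerFactoryError(
            f"unknown fulltext_backend_type {value!r}; expected one of {VALID_FULLTEXT_BACKENDS}"
        )
    return value
```

Adding a backend under this pattern means extending the tuple of valid values and supplying a client builder; the resolver itself does not change.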
Three-class tag (Wave 3 production-readiness invariant): * must-be-real: ``Elasticsearch`` + ``OpenSearch`` clients (drivers, not stubs); ``settings.es_host`` enforced for both. * may-be-gated: ``opensearch`` backend gates on ``opensearch-py`` presence — operator must opt-in via the extra. ``elasticsearch`` is the always-available default. * fully-resolves: Wave 4 backlog — fulltext multi-backend adapter dispatch pattern in place. Wave 5 follow-up may add MeiliSearch / Typesense by extending ``_VALID_FULLTEXT_BACKENDS`` + a new ``_build_*_client`` helper without touching dispatch / cleanup. Local gates: * ``pytest tests/unit_test/indexing/ tests/integration/`` — 201 passed / 25 skipped (incl. 9 new T9 tests in ``test_worker_factory.py``). * ``ruff check aperag/ tests/integration/test_worker_factory.py`` — clean. * ``ruff format`` — applied. Pushed: rebased on T2 ``97dbe61`` (cleanup fan-out) — chunk 4b graph dispatch + T2 cleanup factory + T9 fulltext dispatch all share the per-(collection, modality) factory pattern now. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T8 chunk 4d+4e): spec amendments + Phase 1 production e2e smoke (ratify-ready) Wave 4 T8 chunk 4d narrowed scope (architect msg=87e2b187 Option C ruling) + chunk 4e (architect msg=da3012a4 / msg=87e2b187 / PM msg=067c18e5): CHUNK 4d (acceptance lock item 7, narrowed) Per architect Option C ruling: chunk 4d does NOT delete legacy ``aperag/domains/knowledge_graph/graphindex/storage/{base,postgres,neo4j,nebula,connector}.py`` because the legacy ``GraphStore`` 24-method LightRAG-style Protocol has many active callers in retrieval / curation paths that would break the build (Wave 3 lesson #9 hard-cut delete-side cascade). Instead chunk 4d locks an invariant: the new indexing pipeline (``aperag/indexing/*``) must NOT cross-reference the legacy storage modules. Verified via: grep -rn "from aperag.domains.knowledge_graph.graphindex.storage" aperag/indexing/ → 0 results. Locked.
Legacy ``graphindex`` package elimination is re-scoped to Wave 5 follow-up (cross-cutting refactor with retrieval/curation migration to §G.5 read primitives). CHUNK 4e — spec amendments (5 items per architect ratify history) * §D.3.5.1 — explicit "(document_id, parse_version) composite dedup key" constraint, three-backend align (architect msg=baf6618e). Tested by chunk 4c case 3 ``test_doc_re_parse_replaces_old_parse_version_member``. * §H.5.1 — Redis logical-db assignment table (broker=0/memory=1/WorkQueue=2/Quota=3) so BLPOP queues do not collide with cache or broker (architect msg=baf6618e). * §H.5.2 — Nebula multi-process EntityLock invariant: when ``graph_backend_type=nebula`` with worker pool concurrency >= 2, ``INDEXING_QUEUE_REDIS_URL`` MUST be set to bind RedisEntityLock cross-process (architect msg=87e2b187). * §C.3.1 — ``collection.config.parser_config`` per-collection parser override description (architect msg=9a6de002 from chunk 4e amendment scope). * §K.7 — Wave 4 production-readiness section + chunk 4d narrowed scope explanation. * §K.8 — Wave 5 backlog list (legacy graphindex elimination + accumulated Wave-5 candidates from chunks T2/T3/T6/T8). CHUNK 4e — Phase 1 production e2e smoke New ``tests/integration/test_full_indexing_pipeline.py`` with two layers: * Layer 1 (always run, 4 tests): gate invariants verified against real ``ProductionWorkerFactory``. Even with chunk 4b backend dispatch wired, graph + vision modalities raise ``WorkerFactoryError`` with ``"Wave 4 wiring"`` (T1 extractor / T7 multimodal embedder gates remain effective). Plus dispatch-table cleanup-view verification: ``build_for_cleanup_row`` returns a non-None view for ALL 5 modalities (Wave 3 lesson #10 surgical-gate corollary — dispatch raises but cleanup doesn't, or partial-gated artefacts leak forever).
* Layer 2 (gated by ``RUN_E2E_PHASE1_SMOKE=1``, 1 test): canonical full-pipeline contract per architect msg=da3012a4 — real Postgres + Redis + Qdrant + ES + OTel SDK; vector + fulltext + summary reach ACTIVE; graph + vision finalise FAILED with "Wave 4 wiring"; delete+cleanup roundtrip removes backend artefacts (Qdrant points / ES docs / lineage entities all 0). Skipped by default (requires stub model-provider fixture from e2e-http-compose lane scaffolding). Layer split keeps local-dev signal fast while preserving canonical CI contract. Wave 4 close-out (Phase 2 — after T1 + T7 land) flips Layer 1 to expect graph/vision ACTIVE. Local gates green: - ``ruff check ./aperag ./tests`` clean - ``ruff format --check ./aperag ./tests`` clean (506 files) - ``pytest tests/integration/test_full_indexing_pipeline.py + test_worker_factory.py``: 25 passed / 1 skipped (Layer 2) Production-readiness three-class layer: - must-be-real: Phase 1 gate invariants verified via real ``ProductionWorkerFactory``; cleanup view for all 5 modalities - may-be-gated: Layer 2 full-pipeline gated on ``RUN_E2E_PHASE1_SMOKE=1`` (e2e-http-compose lane only); legacy ``graphindex`` package elimination deferred to Wave 5 - fully-resolves: chunk 4 acceptance items 7 (narrowed) + 8 (5 spec amendments) + 9 (Phase 1 e2e smoke + delete+cleanup roundtrip) Wave 4 T8 chunk 4 SUMMARY (4a + 4b + 4c + 4d + 4e): - 4a: alembic migration + ORM mirror + drop relation description - 4b: factory dispatch on graph_backend_type + EntityLock - 4c: cross-backend contract test fixture + cross-loop scenario - 4d: grep-zero verify (narrowed Option C ruling) - 4e: 5 spec amendments + Phase 1 e2e smoke (this commit) All 9 chunk 4 acceptance items locked. Ready for architect direct ratify on chunks 4d + 4e (per msg=a3eec27d trigger 4d/4e lane). 
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * docs(celery Wave 4 T8 chunk 4e follow-up): §G.2.2.1 fulltext_backend_type spec amendment Wave 4 T8 chunk 4e 6th spec amendment (architect msg=eba26fc2 + PM msg=8069d807): the T9 push (chenyexuan, HEAD 944805e7) added ``CollectionConfig.fulltext_backend_type`` Literal["elasticsearch", "opensearch"] but the design pack §G.2.2 did not declare the user-facing config contract. Spec drift would let private deployments miss the field on copy-paste. Adds new §G.2.2.1 subsection to the design pack: * Document the field shape + default value * Document the OpenSearch lazy-import / fulltext-opensearch extra pattern (mirrors graph-neo4j / graph-nebula extras from chunk 4b) * Document the single-ES_HOST design decision (one fulltext cluster per deployment) + how to point ES_HOST at an OpenSearch endpoint * Note the extension pattern for future backends (Solr / Typesense / MeiliSearch) — same _VALID_FULLTEXT_BACKENDS + _build_*_client helper extension; dispatch / cleanup paths unchanged. Chunk 4e spec amendment scope is now 6 items (5 prior + this G.2.2.1 follow-up): 1. §D.3.5.1 — composite (document_id, parse_version) dedup key 2. §H.5.1 — Redis logical-db assignment table 3. §H.5.2 — Nebula multi-process EntityLock invariant 4. §C.3.1 — collection.config.parser_config per-collection override 5. §K.7+§K.8 — Wave 4 production-readiness + Wave 5 backlog 6. §G.2.2.1 — fulltext_backend_type backend dispatch (this commit) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * docs(celery Wave 4): T9 sweep A + B + C — fulltext multi-backend follow-up Wave 4 sweeps fold-in per architect msg=eba26fc2 + msg=803a2757 + huangheng pass-1 surface (msg=7f063f2c) + PM ruling (msg=...): the T7 / T9 follow-ups that should land alongside the dispatch chunks to keep the operator-facing docs + comments truthful.
Three sweeps in this commit: * **Sweep A** (T7 #23 acceptance, doc lane) — ``docs/private-deployment.md:19`` Wave 3 release scope intro drops "vision" from the production-ready list because vision is Wave 4-gated (raises ``WorkerFactoryError`` until a real multimodal vision-LLM is configured). Listing it as production-ready was inherited from a draft and never resynced after the Wave 3 gate landed. Listing the modality there now would mislead operators into thinking ``enable_vision=true`` is supported on Wave 3 ship. * **Sweep B** (T9 #25 acceptance, doc lane) — ``docs/private-deployment.md:53-56`` Wave 4 backlog description resynced from the draft 5-item phrasing to the 11-item locked scope (graph adapters / extractor / parser / cleanup / contract tests / Redis WorkQueue / Redis QuotaBackend / OTLP / vision / fulltext multi-backend / chunk-4e amendments). Operators reading this section now see the real scope instead of an incomplete earlier checkpoint. * **Sweep C** (T7 #23 acceptance, code-comment lane) — ``aperag/indexing/worker_factory.py:_build_vision_worker._embed`` comment resynced. The pre-T9 comment claimed the call routed through the multimodal model "rather than the string-concat placeholder", but the body still composes ``f"{image_id}|{alt_text}"`` and feeds it to ``embed_query`` on whatever embedder the operator configured. The ``is_multimodal()`` gate above only verifies the embedder is configured as multimodal; it does not change the call shape. Comment now accurately describes the Wave 3 placeholder and points at T7 for the real image-bytes path. Sweep D (``_fulltext_search:350`` ``minimum_should_match`` calc gap) was deferred to chunk 4e Phase 1 e2e smoke per architect ruling — the chunk 4e multi-keyword smoke case will surface whether the 80% threshold over a 2N-clause should-set is actually a bug or just algebra fear; pre-fixing without verification was deemed over-engineering. 
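The arithmetic behind the sweep D concern can be worked through in a few lines. This is only an illustration under stated assumptions: one ``content`` and one ``title`` should clause per keyword (the 2N clause count mentioned above), an 80% threshold, and Elasticsearch's documented behaviour of rounding a percentage ``minimum_should_match`` down. The function names are hypothetical, and whether the real query in ``_fulltext_search`` behaves this way is what the deferred smoke test is meant to establish.

```python
def required_should_matches(n_keywords: int, threshold_pct: int = 80) -> int:
    """Clauses that must match when the threshold applies to 2N should clauses."""
    total_clauses = 2 * n_keywords  # one content + one title clause per keyword
    return (total_clauses * threshold_pct) // 100  # percentage result is rounded down


def content_only_match_passes(n_keywords: int, threshold_pct: int = 80) -> bool:
    """A document holding every keyword in content, none in title, matches N clauses."""
    return n_keywords >= required_should_matches(n_keywords, threshold_pct)


if __name__ == "__main__":
    for n in (1, 2, 3, 5):
        print(n, required_should_matches(n), content_only_match_passes(n))
```

Under these assumptions a single-keyword query needs 1 of 2 clauses, while a two-keyword query needs 3 of 4, so a document that contains both keywords only in its body would fall short. That is the case a multi-keyword smoke test against a real index would confirm or rule out.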
No new tests — sweep A + B are docs only; sweep C is a comment update with no behaviour change. Ruff clean; T9 9-test suite still green (9 passed in ``test_worker_factory.py::*fulltext*``). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(celery Wave 4 T8 chunk 4e follow-up): sweep D multi-keyword fulltext smoke (Layer 2 stub) Wave 4 T8 chunk 4e sweep D fold (architect msg=fdd53586 ruling + PM msg=89946df8): adds a Layer 2 stub case ``test_phase1_multi_keyword_fulltext_search_returns_hits`` to ``tests/integration/test_full_indexing_pipeline.py`` documenting the sweep D verification path. Architect msg=2721a5e7 concern D flagged a latent ``_fulltext_search:350`` ``minimum_should_match`` arithmetic gap — the 80% threshold over N×content + N×title (= 2N) should clauses may underflow on multi-keyword queries (per huangheng msg=fb64468c). The fix philosophy locked by architect msg=fdd53586: real-world verification beats algebraic pre-fix. Run the actual ES query semantics against a real ES instance with a real indexed document; if it passes, the latent risk did not materialise; if it fails, fix-forward in chunk 4e (or escalate per architect msg=fdd53586 ruling). Layer 2 (gated by RUN_E2E_PHASE1_SMOKE=1) — the implementation lives behind the same e2e-http-compose lane scaffolding as the canonical Phase 1 smoke test, since it needs a real ES instance + retrieval pipeline + indexed document fixture. Until the Wave 4 close-out PR wires that scaffolding, the test is a documented stub that points at the existing ``test_fulltext_roundtrip_fields.py`` path which exercises ``bulk_index + search`` round-trip on a real ES index (intermediate signal for the same invariant). 
Local gates green: - ``ruff check ./aperag ./tests`` clean - ``ruff format --check ./aperag ./tests`` clean (506 files) - ``pytest test_full_indexing_pipeline.py`` 4 passed / 2 skipped (Layer 2 canonical smoke + Layer 2 sweep D smoke) Chunk 4e acceptance scope is now 9 items (per architect msg=fdd53586, cumulative chunk 4e scope): 1-6: 6 spec amendments (locked across prior commits) 7. Phase 1 e2e smoke (vector+fulltext+summary ACTIVE + graph/vision WorkerFactoryError) — Layer 1 always-run + Layer 2 stub 8. delete + cleanup roundtrip (via Layer 1 cleanup-view dispatch test for all 5 modalities + Layer 2 stub) 9. multi-keyword fulltext search smoke (sweep D verify path) — Layer 2 stub (this commit) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * docs(celery Wave 4 T8 chunk 4e follow-up): add Wave 5 backlog item — e2e Layer 2 fixture activation Per architect msg=87e2b187 chunk 4d/4e ratify decision condition #3: the chunk 4e Phase 1 e2e smoke Layer 2 tests are currently ``pytest.skip(...)`` stubs (canonical full-pipeline + sweep D multi-keyword) because the embedders need a configured model-provider that local dev does not have. The contract is documented but the test does not actually execute end-to-end yet. Adds an explicit Wave 5 backlog item to §K.8: wire the e2e-http-compose lane stub model-provider fixture so Layer 2 tests run for real instead of skip. The fixture lives in the lane scaffolding (``tests/e2e_http/scripts/run_full.sh``) — extending it to expose a deterministic stub embedder + completion model is straightforward; gating Layer 2 activation behind that work keeps chunk 4e scope clean. Closes the third architect ratify condition; chunk 4d+4e now fully addressed (1: §G.2.2.1 amendment shipped in 9d75f61f, 2: sweep D smoke stub shipped in 919d3789→4d36c7fb rebase, 3: this Wave 5 backlog lock).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T1): real LightRAG-style LLM graph extractor Wave 4 backlog T1: replaces the chunk 4b ``_no_op_extractor`` placeholder with an actual LLM-driven entity/relation extractor that calls the collection's configured completion model for each chunk and parses the JSON output into the new ``aperag.indexing.graph.EntityRecord`` / ``RelationRecord`` shapes. The chunk 4b "Wave 4 wiring T1" symbol-identity gate self-disables because ``_no_op_extractor`` is gone — the worker factory now wires ``build_collection_graph_extractor(collection)`` directly. * New ``aperag/indexing/graph_extractor.py`` (~280 LOC) implementing the closure factory: - Builds a per-collection LLM callable via the existing legacy ``build_collection_llm_callable`` (Wave 5 backlog will relocate this helper to ``aperag/indexing/llm.py`` per architect msg=87e2b187 chunk 4d Option C ruling). - Reads ``collection.config.knowledge_graph_config.entity_types`` + ``collection.config.language`` with tolerant readers (handles pydantic attr / dict / JSON-string config shapes). - Per-chunk LLM call with a 60s ``asyncio.wait_for`` timeout so a stuck LLM does not block the worker forever. - JSON response parser tolerates fenced ``\`\`\`json … \`\`\``` wrappers + skips individual malformed records (preserves the rest). * Failure semantics (per Wave 3 lesson #10 ship-incomplete-but-don't-silently-lie): - No completion model configured for the collection → factory raises :class:`WorkerFactoryError` with "completion model not configured" so the orchestrator finalises the row FAILED with operator-facing diagnostics. - Per-chunk LLM failures (malformed JSON / transient errors / etc.) log a warning and skip the chunk's entities/relations; other chunks still contribute. Mirrors the legacy LightRAG extractor failure semantics — one bad chunk does not poison the document. - Empty chunks (no ``text``) short-circuit without an LLM call.
* worker_factory updates: - ``_build_graph_worker`` removes the ``_no_op_extractor`` symbol and the symbol-identity gate; constructs the extractor via the new builder. - The "Wave 4 wiring T1" message is gone from the codebase — the new failure mode for missing-completion-model rows is "graph extractor: completion model not configured" which the orchestrator surfaces in ``error_message``. * Test updates: - ``test_full_indexing_pipeline.py``: rename ``test_phase1_graph_modality_raises_wave4_wiring_t1_gate`` → ``test_phase1_graph_modality_raises_when_completion_model_missing`` and update the assertion to allow either old or new message (forward-compat for cherry-picks). - ``test_worker_factory.py``: rename ``test_build_graph_worker_raises_t1_wiring_gate`` → ``test_build_graph_worker_raises_when_completion_model_missing`` and update the assertion accordingly. - New ``test_graph_extractor.py`` (14 unit tests): cover the JSON parser invariants (well-formed / fenced / malformed / per-record skip), config readers (entity_types / language across 3 shapes), chunk-skip-on-empty-text, per-chunk failure isolation, and the completion-model-missing gate. 
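A sketch of the tolerant parsing the T1 notes describe: strip an optional fenced wrapper, then keep well-formed records and skip malformed ones. The payload layout (``entities`` / ``relations`` keys and their required fields) and every name below are assumptions for illustration; the commit message does not spell out the extractor's JSON schema.

```python
import json
import re
from typing import Any

FENCE = "`" * 3  # built at runtime so this example nests cleanly in a fenced block
_FENCE_RE = re.compile(
    r"^\s*" + FENCE + r"(?:json)?\s*(.*?)\s*" + FENCE + r"\s*$",
    re.DOTALL | re.IGNORECASE,
)


def strip_fence(text: str) -> str:
    """Return the body of a fenced JSON wrapper, or the trimmed text if unfenced."""
    match = _FENCE_RE.match(text)
    return match.group(1) if match else text.strip()


def _valid_records(payload: dict, key: str, required: tuple[str, ...]) -> list[dict]:
    """Keep dict records that carry every required field; skip the rest."""
    items = payload.get(key)
    if not isinstance(items, list):
        return []
    return [r for r in items if isinstance(r, dict) and all(r.get(f) for f in required)]


def parse_extraction(raw: str) -> tuple[list[dict], list[dict]]:
    """Parse one LLM response into (entities, relations); bad input yields empty lists."""
    try:
        payload: Any = json.loads(strip_fence(raw))
    except json.JSONDecodeError:
        return [], []  # the caller would log a warning and skip this chunk
    if not isinstance(payload, dict):
        return [], []
    entities = _valid_records(payload, "entities", ("name", "type"))
    relations = _valid_records(payload, "relations", ("source", "target"))
    return entities, relations
```

Filtering per record rather than rejecting the whole response matches the stated goal that one malformed record should not discard the rest of a chunk's output.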
Local gates green: - ``ruff check ./aperag ./tests`` clean - ``ruff format --check ./aperag ./tests`` clean (508 files) - ``pytest tests/integration/test_graph_extractor.py``: 14 passed - ``pytest tests/unit_test/indexing/ + test_full_indexing_pipeline.py + test_worker_factory.py``: 165 passed / 2 skipped (Layer 2 stubs) Production-readiness three-class layer: - must-be-real: ``build_collection_graph_extractor`` constructs a real LLM-driven extractor per collection - may-be-gated: legacy ``build_collection_llm_callable`` import will be relocated to ``aperag/indexing/llm.py`` in Wave 5 cross-cutting refactor (per architect Wave 5 backlog item) - fully-resolves: Wave 4 backlog T1 (real graph LLM extractor — the chunk 4b "Wave 4 wiring T1" gate self-disabled) Cross-task interaction: - chunk 4b's ``Wave 4 wiring T1`` gate is now obsolete; the Phase 1 e2e smoke tests are updated accordingly. - chunk 4e Layer 2 canonical full-pipeline test (currently a ``pytest.skip`` stub) can be activated by the Wave 5 e2e-http-compose lane fixture work — graph modality will reach ACTIVE when the real LLM is configured + the lane wires it up. Next: T5 (real Redis QuotaBackend Lua atomic) → T7 (real multimodal vision-LLM + vision modality production wiring). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 4 T5): wire RedisQuotaBackend into lifespan + IndexingRuntime Wave 4 backlog T5: the ``RedisQuotaBackend`` Lua-atomic token-bucket implementation has existed in ``aperag/indexing/quota.py`` since Wave 1 (per existing 411 LOC). T5 wires it through to the orchestrator runtime so multi-pod production deployments share §H.5 token state via Redis logical-db=3 (per chunk 4e §H.5.1 amendment) instead of each pod independently exhausting tenant quota with the :class:`InMemoryQuotaBackend` default. Changes: * ``aperag/config.py``: new ``INDEXING_QUOTA_BACKEND`` setting (default ``inmemory``; production multi-pod sets ``redis``).
Mirrors the chunk 4 ``INDEXING_QUEUE_BACKEND`` / ``INDEXING_METRICS_EMITTER`` dispatch pattern. * ``aperag/indexing/runtime.py``: ``IndexingRuntime`` adds a ``quota_backend: QuotaBackend`` field with a default :class:`InMemoryQuotaBackend` (``QuotaPolicyRegistry`` empty — every tenant routes to fallback policy until operators populate the policy table). Existing callers that build the runtime without specifying ``quota_backend`` keep working. * ``aperag/app.py`` lifespan: dispatches on ``settings.indexing_quota_backend`` — when ``redis``, builds a ``redis.asyncio`` client at ``settings.indexing_queue_redis_url`` (decode_responses=False because the Lua script returns raw byte/int payloads), wraps it in :class:`RedisQuotaBackend`, and feeds the runtime. Stashes the underlying client on ``app.state.indexing_quota_redis`` so the lifespan ``finally`` can ``aclose()`` it on shutdown — mirrors the T4 RedisWorkQueue graceful close pattern. * The actual call sites (graph_extractor / summary modality / vision modality) calling ``await runtime.quota_backend.acquire(...)`` are Wave 5 follow-up — T5 lands the wiring contract today; the per-callsite acquire calls follow in the Wave 5 ``aperag/indexing/llm.py`` relocate batch (per architect msg=87e2b187 chunk 4d Option C ruling).
Local gates green: - ``ruff check ./aperag ./tests`` clean - ``ruff format --check ./aperag ./tests`` clean (508 files) - ``pytest tests/integration/test_full_indexing_pipeline.py test_worker_factory.py test_graph_extractor.py + tests/unit_test/indexing/``: 179 passed / 2 skipped (Layer 2) Production-readiness three-class layer: - must-be-real: RedisQuotaBackend Lua-atomic acquire/release against shared Redis (db=3 per §H.5.1) - may-be-gated: InMemoryQuotaBackend default keeps single-pod deployments working; Wave 5 wires the per-callsite acquire - fully-resolves: Wave 4 backlog T5 (real Redis QuotaBackend Lua atomic — wiring contract closed; per-callsite consumer relocate follows Wave 5 ``aperag/indexing/llm.py`` task) Next: T7 (real multimodal vision-LLM + vision modality production wiring). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery Wave 4 T5+T7): db=3 spec compliance + T7 vision wiring scope amendment Two coordinated fixes addressing architect msg=c279a0ff (T5 db=3 violation fix-forward) + msg=8c456789 (T7 wiring scope clarification). T5 db=3 spec violation fix-forward (architect direct ratify pre-condition): * T5 chunk wiring (commit 5f42209d) used ``settings.indexing_queue_redis_url`` for the RedisQuotaBackend, but chunk 4e §H.5.1 amendment locked ``broker=0 / memory=1 / WorkQueue=2 / Quota+EntityLock=3`` — ``indexing_queue_redis_url`` is db=2 (WorkQueue), not db=3 (Quota). * Architect msg=c279a0ff catches the spec drift; chunk 4e §H.5.1 amendment text locked the invariant but my T5 code violated it. Fixes: * ``aperag/config.py``: new ``INDEXING_QUOTA_REDIS_URL`` setting + default-derive that builds ``redis://USER:PASS@HOST:PORT/3``. Mirrors the chunk 4 ``indexing_queue_redis_url`` derive pattern but on db=3. * ``aperag/app.py``: the T5 lifespan dispatch now binds the Redis client to ``settings.indexing_quota_redis_url`` (db=3) instead of the WorkQueue URL. Aligns code with §H.5.1 spec text. 
* ``aperag/indexing/worker_factory.py``: the chunk 4b ``_redis_entity_lock_singleton`` also moved from ``indexing_queue_redis_url`` (db=2) to ``indexing_quota_redis_url`` (db=3) — §H.5.1 specifies BOTH Quota AND EntityLock keyspaces live in db=3 (the ``indexing:graph:entity:<slot>`` keys are quota-adjacent + the test fixture / docker compose runs both off the same logical DB so production wiring should match). T7 Wave 4 wiring scope amendment (architect-side scope clarification): * ``docs/modularization/indexing-redesign-design-pack.md`` §G.2.5.1 documents the T7 wiring scope split: items 1+3 (multimodal embedding API surface + provider capability flag) close in Wave 4; item 2 (real PDF / image-source extraction in parser pipeline) defers to Wave 5 alongside the parser canonical schema unification (per huangheng T1 obs B). * The Wave 3 lesson #10 explicit-gate (chunk 4b vision gate) stays effective until item 1 lands the ``embed_image(bytes) -> list[float]`` API surface and the operator opts into a multimodal model. * Until item 2 lands, T7 ships the embedder API surface + gate behaviour but the actual ``vision/images/<image_id>.<ext>`` reads fall back gracefully — the operator sees a clean ``error_message`` in the DocumentIndex row noting the parser-side gap. Architect-side meta lesson (folded into ``feedback_spec_lock_grep_verify_caller.md`` via ratify msg trail): spec amendment lock "logical db assignment" type invariants must **simultaneously grep-verify** the current code binding, otherwise spec text locks the invariant while code keeps violating it. The chunk 4e §H.5.1 amendment ratify did not grep-verify the chunk 4b ``_redis_entity_lock_singleton`` URL choice — this T5 fix-forward closes both gaps.
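The db=3 default-derive for the quota URL can be illustrated with the standard library alone. This is a sketch, not the code in ``aperag/config.py``: ``with_redis_db`` and ``REDIS_LOGICAL_DBS`` are hypothetical names, and only the logical-db numbers come from the §H.5.1 assignment quoted above.

```python
from urllib.parse import urlsplit, urlunsplit

# Logical-db assignment as quoted from §H.5.1 (Quota and EntityLock share db=3).
REDIS_LOGICAL_DBS = {"broker": 0, "memory": 1, "work_queue": 2, "quota": 3}


def with_redis_db(url: str, db: int) -> str:
    """Return the same Redis URL pointed at a different logical database."""
    parts = urlsplit(url)
    # Credentials, host and port live in netloc and are left untouched.
    return urlunsplit(parts._replace(path=f"/{db}"))


if __name__ == "__main__":
    queue_url = "redis://user:secret@redis.internal:6379/2"
    print(with_redis_db(queue_url, REDIS_LOGICAL_DBS["quota"]))
```

Deriving both URLs from one base keeps credentials and host in a single place, while the db suffix is the only part that has to agree with the spec table.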
Local gates green: - ``ruff check ./aperag ./tests`` clean - ``ruff format --check ./aperag ./tests`` clean (508 files) - ``pytest`` 179 passed / 2 skipped (Layer 2 stubs) Production-readiness three-class layer: - must-be-real: db=3 logical isolation per §H.5.1 amendment - may-be-gated: T7 item 2 (parser image-source extraction) defers to Wave 5 - fully-resolves: T5 ``INDEXING_QUOTA_BACKEND=redis`` wiring + T7 wiring scope spec amendment Wave 4 backlog status after this commit: - T8 chunk 4 (4a-4e + follow-ups): closed ✅ - T1: closed ✅ - T5: closed ✅ (per-callsite consumer relocate → Wave 5) - T7: scope amendment + gate behaviour locked; multimodal embedding API surface + parser image extraction → Wave 5 - T2 / T3 / T4 / T6 / T9: closed by chenyexuan…
1 parent a7c8f2f commit 19d3d70

33 files changed

Lines changed: 8790 additions & 231 deletions

aperag/app.py

Lines changed: 136 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# limitations under the License.
1515

1616
import asyncio # noqa: E402
17+
import contextlib # noqa: E402
1718

1819
from aperag.config import settings
1920
from aperag.observability import (
@@ -232,23 +233,77 @@ async def combined_lifespan(app: FastAPI):
         from aperag.config import sync_engine
         from aperag.indexing import (
             InMemoryWorkQueue,
+            NoopMetricsEmitter,
+            OTLPMetricsEmitter,
+            RedisWorkQueue,
             run_cleanup_loop,
             run_fulltext_worker,
             run_graph_worker,
+            run_parse_worker,
             run_reconcile_loop,
             run_summary_worker,
             run_vector_worker,
             run_vision_worker,
         )
 
         indexing_shutdown = asyncio.Event()
-        # Single process-local InMemoryWorkQueue is the default
-        # transport for the in-process topology. Tier-3 production
-        # swaps this for a Redis-backed WorkQueue (RPUSH / BLPOP) by
-        # injecting via app state at deploy time — Wave 3 follow-up.
-        queue = InMemoryWorkQueue()
+        # Wave 4 T4: dispatch on ``INDEXING_QUEUE_BACKEND`` setting
+        # (default ``inmemory`` for backward-compat single-pod
+        # deployments; production multi-pod sets ``redis`` to enable
+        # BLPOP transport per design pack §E.2). InMemoryWorkQueue is
+        # process-local — multi-pod deployments lose tasks pushed to
+        # one process and BLPOP'd by another, so production must run
+        # ``INDEXING_QUEUE_BACKEND=redis`` for correctness.
+        if settings.indexing_queue_backend.lower() == "redis":
+            queue = RedisWorkQueue(redis_url=settings.indexing_queue_redis_url)
+        else:
+            queue = InMemoryWorkQueue()
         engine = sync_engine
 
+        # Wave 4 T6: dispatch on ``INDEXING_METRICS_EMITTER`` setting
+        # (default ``noop`` for backward-compat; production multi-pod
+        # sets ``otlp`` to wire the four §J.1 SLIs onto the
+        # ``MeterProvider`` configured by ``aperag.observability``).
+        # NoopMetricsEmitter silently drops every sample, so operators
+        # running Tier 2/3 production must explicitly opt into ``otlp``
+        # — otherwise queue-backlog / failure-rate alerts on the
+        # collector side never receive data.
+        if settings.indexing_metrics_emitter.lower() == "otlp":
+            metrics_emitter = OTLPMetricsEmitter()
+        else:
+            metrics_emitter = NoopMetricsEmitter()
+
+        # Wave 4 T5: dispatch on ``INDEXING_QUOTA_BACKEND`` setting
+        # (default ``inmemory`` for backward-compat single-pod
+        # deployments; production multi-pod sets ``redis`` so worker
+        # processes share §H.5 token-bucket state via Redis logical
+        # db=3 per §H.5.1 amendment). InMemoryQuotaBackend is process-
+        # local — multi-pod deployments running ``inmemory`` would
+        # have each pod's worker independently exhaust its tenant
+        # quota, which silently breaks the per-tenant rate limit
+        # invariant (§H.5).
+        from aperag.indexing.quota import (
+            InMemoryQuotaBackend,
+            QuotaPolicyRegistry,
+            RedisQuotaBackend,
+        )
+
+        quota_registry = QuotaPolicyRegistry()
+        if settings.indexing_quota_backend.lower() == "redis":
+            try:
+                from redis import asyncio as redis_asyncio
+            except ImportError as exc:  # pragma: no cover — redis is a base dep
+                raise RuntimeError("INDEXING_QUOTA_BACKEND=redis but redis package not installed") from exc
+            quota_redis = redis_asyncio.from_url(
+                settings.indexing_quota_redis_url,
+                encoding="utf-8",
+                decode_responses=False,
+            )
+            quota_backend = RedisQuotaBackend(quota_redis, quota_registry)
+        else:
+            quota_redis = None
+            quota_backend = InMemoryQuotaBackend(quota_registry)
+
         # Worker factory — per-task lazy construction. The async
         # worker entrypoints (``run_*_worker``) call this closure on
         # every BLPOP'd payload to materialise the concrete
@@ -278,6 +333,32 @@ async def combined_lifespan(app: FastAPI):
         indexing_runtime_tasks.append(asyncio.create_task(run_graph_worker(**worker_kwargs)))
         indexing_runtime_tasks.append(asyncio.create_task(run_summary_worker(**worker_kwargs)))
         indexing_runtime_tasks.append(asyncio.create_task(run_vision_worker(**worker_kwargs)))
+
+        # Wave 4 T3 chunk 2: parse worker reads ``q:parse``, runs
+        # :class:`DocParser`, and dispatches the per-modality jobs.
+        # Without this, the upload handler's :func:`push_parse` call
+        # would land in Redis with no consumer — the per-modality
+        # rows would never get inserted and documents would stay
+        # PENDING forever. The object store factory is async per the
+        # production resolver signature; this closure adapts the
+        # synchronous ``get_object_store`` helper into the
+        # ``ObjectStoreFactory`` shape :func:`run_parse_worker`
+        # expects.
+        from aperag.objectstore.base import get_object_store
+
+        async def _resolve_object_store():
+            return await asyncio.to_thread(get_object_store)
+
+        indexing_runtime_tasks.append(
+            asyncio.create_task(
+                run_parse_worker(
+                    engine=engine,
+                    queue=queue,
+                    object_store_factory=_resolve_object_store,
+                    shutdown=indexing_shutdown,
+                )
+            )
+        )
         indexing_runtime_tasks.append(
             asyncio.create_task(
                 run_reconcile_loop(
@@ -287,11 +368,17 @@ async def combined_lifespan(app: FastAPI):
                 )
             )
         )
+        # Wave 4 T2: cleanup loop now consumes a per-row worker
+        # factory so each ``DocumentIndex`` row is cleaned against the
+        # right per-(collection, modality) backend. Without this the
+        # cleanup loop ran with ``workers={}`` and silently skipped
+        # every backend delete (Qdrant points / ES docs / graph
+        # entities leaked forever after document or collection delete).
         indexing_runtime_tasks.append(
             asyncio.create_task(
                 run_cleanup_loop(
                     engine=engine,
-                    workers={},  # T3.3 follow-up: pass concrete worker registry
+                    worker_factory=worker_factory.build_for_cleanup_row,
                     shutdown=indexing_shutdown,
                 )
             )
@@ -301,6 +388,13 @@ async def combined_lifespan(app: FastAPI):
         # same queue / engine the workers consume.
         app.state.indexing_queue = queue
         app.state.indexing_engine = engine
+        app.state.indexing_metrics_emitter = metrics_emitter
+        app.state.indexing_quota_backend = quota_backend
+        # Wave 4 T5: stash the underlying Redis client (only when
+        # ``INDEXING_QUOTA_BACKEND=redis``) so the lifespan finally
+        # block can close it on shutdown — mirrors the T4 RedisWorkQueue
+        # close lifecycle.
+        app.state.indexing_quota_redis = quota_redis
 
         # Service-layer callers (aperag/domains/**) consume the same
         # triple through the process-wide IndexingRuntime singleton —
@@ -309,10 +403,20 @@ async def combined_lifespan(app: FastAPI):
         # populates concrete factories per modality.
         from aperag.indexing.runtime import IndexingRuntime, set_runtime
 
-        set_runtime(IndexingRuntime(engine=engine, queue=queue, workers={}))
+        set_runtime(
+            IndexingRuntime(
+                engine=engine,
+                queue=queue,
+                workers={},
+                metrics_emitter=metrics_emitter,
+                cleanup_worker_factory=worker_factory.build_for_cleanup_row,
+                quota_backend=quota_backend,
+            )
+        )
     else:
         app.state.indexing_queue = None
         app.state.indexing_engine = None
+        app.state.indexing_metrics_emitter = None
         from aperag.indexing.runtime import set_runtime
 
         set_runtime(None)
@@ -327,6 +431,31 @@ async def combined_lifespan(app: FastAPI):
         # Drain in-flight worker / reconciler / cleanup loops with
         # a short grace window so a SIGTERM does not abort mid-task.
         await asyncio.gather(*indexing_runtime_tasks, return_exceptions=True)
+        # Wave 4 T4: release the indexing queue's underlying connection
+        # pool (Redis client owns one); InMemoryWorkQueue has no
+        # ``close`` so guard with hasattr.
+        queue_obj = getattr(app.state, "indexing_queue", None)
+        if queue_obj is not None and hasattr(queue_obj, "close"):
+            with contextlib.suppress(Exception):
+                await queue_obj.close()
+        # Wave 4 T5: release the quota Redis client (only present when
+        # ``INDEXING_QUOTA_BACKEND=redis`` was selected at startup).
+        # ``InMemoryQuotaBackend`` has no underlying client.
+        quota_redis_obj = getattr(app.state, "indexing_quota_redis", None)
+        if quota_redis_obj is not None:
+            with contextlib.suppress(Exception):
+                await quota_redis_obj.aclose()
+        # Wave 4 T6: flush + shut down the OTLP MeterProvider so the
+        # PeriodicExportingMetricReader drains any pending metric
+        # samples before the process exits. Mirrors the T4 graceful
+        # shutdown pattern and addresses huangheng pass-1 observation A
+        # (msg=5d450300). ``shutdown_metrics_provider`` is a no-op when
+        # the SDK MeterProvider was never installed (default
+        # ``noop`` emitter / OTLP endpoint missing).
+        from aperag.observability.metrics import shutdown_metrics_provider
+
+        with contextlib.suppress(Exception):
+            await asyncio.to_thread(shutdown_metrics_provider)
 
 
 # Create the main FastAPI app with combined lifespan
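The backend dispatch and the ``hasattr``-guarded shutdown in the hunks above follow a pattern that can be sketched in isolation. This is a minimal illustration rather than the project's code: ``InMemoryQueue``, ``FakeRedisQueue``, ``build_queue`` and ``close_if_supported`` are hypothetical stand-ins, and no real Redis connection is opened.

```python
import asyncio
import contextlib


class InMemoryQueue:
    """Stand-in for a process-local queue; deliberately has no close()."""


class FakeRedisQueue:
    """Stand-in for a Redis-backed queue that owns a connection pool."""

    def __init__(self, redis_url: str) -> None:
        self.redis_url = redis_url
        self.closed = False

    async def close(self) -> None:
        self.closed = True


def build_queue(backend: str, redis_url: str):
    """Pick the transport from a case-insensitive setting; default to in-memory."""
    if backend.lower() == "redis":
        return FakeRedisQueue(redis_url=redis_url)
    return InMemoryQueue()


async def close_if_supported(obj) -> None:
    """Close obj when it exposes close(); swallow errors so shutdown continues."""
    if obj is not None and hasattr(obj, "close"):
        with contextlib.suppress(Exception):
            await obj.close()


async def demo() -> bool:
    queue = build_queue("Redis", "redis://localhost:6379/2")
    await close_if_supported(queue)
    # The in-memory stand-in has no close(), so the guard skips it.
    await close_if_supported(build_queue("inmemory", ""))
    return queue.closed
```

One consequence of dispatching with a plain ``else`` branch, both in this sketch and in the diff, is that an unrecognised or misspelled backend value silently selects the in-memory implementation; a stricter variant could raise on unknown values instead.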

aperag/config.py

Lines changed: 77 additions & 0 deletions
@@ -119,6 +119,69 @@ class Config(BaseSettings):
     # per design pack §L.
     indexing_mode: str = Field("async", alias="INDEXING_MODE")
 
+    # Indexing queue backend (Wave 4 T4 — replaces InMemoryWorkQueue
+    # default with Redis BLPOP for multi-process scale-out per design
+    # pack §E.2). Values:
+    #
+    #   ``inmemory`` → ``aperag.indexing.InMemoryWorkQueue`` (Wave 1+2
+    #       default, single-process, asyncio.Queue per modality;
+    #       multi-pod deployments lose tasks pushed to one
+    #       process and BLPOP'd by another — TEST / SINGLE-POD
+    #       ONLY).
+    #   ``redis`` → ``aperag.indexing.RedisWorkQueue`` (production
+    #       BLPOP transport keyed ``q:indexing:<modality>``
+    #       on the URL ``INDEXING_QUEUE_REDIS_URL`` if set,
+    #       else derived from ``REDIS_HOST`` / ``REDIS_PORT``
+    #       / ``REDIS_USER`` / ``REDIS_PASSWORD`` on a
+    #       separate logical DB (db=2) from the cache /
+    #       memory backends).
+    indexing_queue_backend: str = Field("inmemory", alias="INDEXING_QUEUE_BACKEND")
+    indexing_queue_redis_url: Optional[str] = Field(None, alias="INDEXING_QUEUE_REDIS_URL")
+
+    # Indexing metrics emitter (Wave 4 T6 — replaces NoopMetricsEmitter
+    # default with OTLP wire-in for §J.1 SLIs per design pack §J).
+    # Values:
+    #
+    #   ``noop`` → ``aperag.indexing.NoopMetricsEmitter`` (default,
+    #       metrics silently dropped — TEST / dev / single-machine
+    #       deployments without observability infra). Operators
+    #       running production multi-pod deployments MUST set
+    #       ``INDEXING_METRICS_EMITTER=otlp`` to ship the four
+    #       §J.1 SLIs (``index_lag_seconds`` / ``queue_depth`` /
+    #       ``index_success_total`` / ``index_failure_total`` /
+    #       ``worker_utilization``) to the OTLP collector.
+    #   ``otlp`` → ``aperag.indexing.OTLPMetricsEmitter`` (production —
+    #       instruments materialised on the OpenTelemetry SDK
+    #       ``MeterProvider`` configured by
+    #       ``aperag.observability``; requires
+    #       ``APERAG_OBSERVABILITY_MODE`` ∈ {``otlp``, ``collector``}
+    #       with a populated ``OTEL_EXPORTER_OTLP_ENDPOINT`` —
+    #       without those the OTLP exporter falls back to no-op
+    #       even though the emitter dispatch path is taken).
+    indexing_metrics_emitter: str = Field("noop", alias="INDEXING_METRICS_EMITTER")
+
+    # Indexing quota backend (Wave 4 T5 — wires the Redis token-bucket
+    # quota across LLM / embedding callsites). Values:
+    #
+    #   ``inmemory`` → ``aperag.indexing.quota.InMemoryQuotaBackend``
+    #       (default; per-process token state, suitable for
+    #       tests / single-pod deployments).
+    #   ``redis`` → ``aperag.indexing.quota.RedisQuotaBackend`` (Lua-
+    #       atomic token bucket on shared Redis at
+    #       ``indexing_queue_redis_url`` logical db=3 per
+    #       §H.5.1 amendment; multi-pod production MUST set
+    #       ``INDEXING_QUOTA_BACKEND=redis`` so worker
+    #       processes share token state instead of each pod
+    #       exhausting capacity independently).
+    indexing_quota_backend: str = Field("inmemory", alias="INDEXING_QUOTA_BACKEND")
+
+    # Indexing quota / EntityLock Redis URL (chunk 4e §H.5.1 lock: db=3
+    # for `quota:<class>:<tenant>:tokens` + `indexing:graph:entity:<slot>`
+    # — separate from broker (db=0) / memory (db=1) / WorkQueue (db=2)).
+    # When unset the default-derive chain in ``_apply_defaults`` builds
+    # `redis://USER:PASS@HOST:PORT/3` from the same Redis credentials.
+    indexing_quota_redis_url: Optional[str] = Field(None, alias="INDEXING_QUOTA_REDIS_URL")
+
     # Model configs
     model_configs: Dict[str, Any] = {}
 
@@ -269,6 +332,20 @@ def __init__(self, **kwargs):
             self.memory_redis_url = (
                 f"redis://{self.redis_user}:{self.redis_password}@{self.redis_host}:{self.redis_port}/1"
             )
+        # INDEXING_QUEUE_REDIS_URL — separate logical DB (db=2) from
+        # broker (db=0) and memory (db=1) so BLPOP queues never collide
+        # with cache or memory backends.
+        if not self.indexing_queue_redis_url:
+            self.indexing_queue_redis_url = (
+                f"redis://{self.redis_user}:{self.redis_password}@{self.redis_host}:{self.redis_port}/2"
+            )
+        # INDEXING_QUOTA_REDIS_URL — chunk 4e §H.5.1 lock: separate
+        # logical DB (db=3) for quota token-bucket + EntityLock keyspace
+        # (broker=0 / memory=1 / WorkQueue=2 / Quota+EntityLock=3).
+        if not self.indexing_quota_redis_url:
+            self.indexing_quota_redis_url = (
+                f"redis://{self.redis_user}:{self.redis_password}@{self.redis_host}:{self.redis_port}/3"
+            )
         # ES_HOST
         if not self.es_host:
             if self.es_user and self.es_password:
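The quota-backend comments in this file state that per-process token state lets each pod exhaust capacity independently. A minimal sketch of why, assuming a plain in-memory token bucket: ``TokenBucket``, ``new_bucket`` and ``granted`` are hypothetical names, and the Lua-atomic Redis implementation referenced above is not reproduced here.

```python
from dataclasses import dataclass


@dataclass
class TokenBucket:
    capacity: float
    refill_per_sec: float
    tokens: float
    updated_at: float

    def try_acquire(self, now: float, cost: float = 1.0) -> bool:
        """Refill for the elapsed time, then spend `cost` tokens if available."""
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.updated_at = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


def new_bucket() -> TokenBucket:
    # Illustrative tenant limit: 5 tokens, no refill during the demo window.
    return TokenBucket(capacity=5, refill_per_sec=0.0, tokens=5, updated_at=0.0)


def granted(buckets: list[TokenBucket], requests_per_pod: int, now: float = 0.0) -> int:
    """Count requests admitted when each pod draws from its assigned bucket."""
    return sum(
        1
        for bucket in buckets
        for _ in range(requests_per_pod)
        if bucket.try_acquire(now)
    )


# Three pods, each with its own bucket (the ``inmemory`` situation).
per_pod_total = granted([new_bucket() for _ in range(3)], requests_per_pod=5)

# Three pods drawing from one shared bucket (what a shared store provides).
shared = new_bucket()
shared_total = granted([shared, shared, shared], requests_per_pod=5)
```

With separate buckets the three pods together admit 15 requests against a nominal limit of 5, while the shared bucket admits exactly 5. A shared backend additionally has to make the refill-and-spend step atomic across processes, which this single-process sketch does not need to address.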

0 commit comments