docs(indexing): indexing redesign design pack — first-principles rewrite proposal#1725
Merged
Merged
Conversation
…rewrite proposal
Per earayu2 directive (#celery msg=56812dd6 + msg=d8080c08): full redesign of
the document indexing system, prioritizing simplicity and reliability over
feature breadth, targeting 100 concurrent docs, with hard-cut authorization
(pre-launch / no users / no migration).
Design pack contents (1049 lines, 11 sections):
- §A — Current system analysis with file:line evidence (3-layer ownership skew,
Python lease thread tied to worker process, graph index NOT replace-idempotent
per nebula.py:354 upsert_entities, ~995 lines in tasks.py mixing infra +
business)
- §B — First principles (single SoT in DB, idempotent convergence, source/
derived/index three-layer separation, concurrency bounded by external
capacity, simple > complex)
- §C — Three-layer document model (collections/<id>/documents/<id>/source/ +
derived/parse_<v>/{markdown.md, chunks.jsonl, kg.jsonl, summary.json,
vision/} + backend index stores)
- §D — Idempotency contract per modality (DELETE-by-(document_id, parse_version)
before INSERT for all 5 modalities; fixes graph index append bug)
- §E — Concurrency model decision matrix (HTTP-only / lightweight task /
Celery refactor); recommends lightweight Redis-backed asyncio worker pool
per modality (5 worker processes, ~80-line reconciler, no Celery / no chord /
no Python lease thread)
- §F — State machine + atomic flip (4 status values vs current 6;
document.active_parse_version + pending_parse_version with transactional
flip; deletion via async cleanup worker)
- §G — Multi-modal unified pipeline (Modality ABC with derive() + sync()
contract; collapses earayu2's "Celery task 绕来绕去最后又绕回 graph index"
complaint into 2 functions in 1 file)
- §H — Multi-tenant isolation (recommend simple — required tenant context +
bulkheads, defer fairness machinery until observability shows real
noisy-neighbor signal)
- §I — Failure recovery (3 modes: worker crash, transient backend, permanent
failure; exponential backoff retry; Redis token bucket for LLM rate-limit
backpressure)
- §J — Observability (4 SLI: index_lag_seconds, index_failure_rate,
queue_depth, worker_utilization; OTLP wire; aligns with PR #1702)
- §K — Migration plan (7 PRs: observability primitives → idempotent indexers →
object store layout → worker pool → atomic flip → cutover → availability
discriminator; feature-flagged dual-stack during PR-D/E; cutover deletes
~3000 lines of Celery infrastructure)
Net delta: roughly +4150 / -4850 lines across 7 PRs — net subtraction despite
adding functionality. Indexing layer drops from ~2500 lines to ~1500.
Three open decisions deferred to earayu2:
1. Concurrency model: lightweight Redis-asyncio (recommended), HTTP-only, or
Celery refactor
2. Atomic flip contract: all-modalities-ACTIVE-required (recommended) vs
per-modality independent
3. PR sequence: 7-PR cut (recommended) vs combined
Sibling reference: Bryce msg=791082a4 + msg=38fbf962 first-principles analysis
+ architect msg=19f283d5 + msg=2ee66c89 4-blind-spot synthesis. This design
pack is the single canonical deliverable per earayu2's owner directive
(@符炫炜 sole author of the final design).
…/MinIO Driven by earayu2 msg=cc0a00d7 + PM consolidation msg=32463d64. - Drop Celery → lock Redis + asyncio (§E, decision matrix removed) - Drop atomic flip → per-modality independent is_serving cutover (§F) Accept short eventual-consistency window per earayu2 directive. - Answer derived/parse_<v>/ contents per modality (§C.6) — chunks.jsonl shared by vector+fulltext, kg.jsonl, summary.json, vision/manifest.jsonl + images/, markdown.md + outline.json. - Answer MinIO/object-store suitability (§C.7) — ~150 MB / 100-doc burst, trivial; LocalFS / MinIO / S3-compatible all work; small-file + LIST caveats addressed. - Add §L private/on-premise "deploy-and-forget": Tier 1 inline (SQLite + LocalFS, ~10 docs/hour), Tier 2 docker-compose (~100 concurrent), Tier 3 horizontal scale-out — same code. - §H tenant_scope_key forward-compat hook for future organization concept; simple Redis token-bucket quota that won't lock future fairness. - §K restructure: 7 PRs → 3 waves (Foundation / Runtime / Cutover) with per-wave parallel-writability map. - §G.5 SearchResultMetadata extends w/ parse_version + index_state_per_modality (becomes structurally required under per-modality independent flip). PR #1725 v2; awaiting earayu2 final 拍板 on Wave 1 kickoff. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…f / serving invariant / graph entity lineage)
Bryce v2 review msg=7ccb176f surfaced 3 substantive technical deltas all
agreed-as-must-address by PM msg=fc307bbf. Folded into v2:
§C.6 — chunks.jsonl shared-by-vector-and-fulltext is now framed as conscious
trade-off (vector wants larger chunks, fulltext wants smaller) with explicit
shadow-file extension hook (chunks.fulltext.jsonl + namespaced sub-IDs)
preserved so future split is unblocked.
§F.1 — partial unique index added at the schema layer:
CREATE UNIQUE INDEX uniq_document_index_serving
ON document_index (document_id, modality)
WHERE is_serving = TRUE;
This makes the "at most one serving row per (doc, modality)" invariant DB-
enforced, not orchestrator-enforced. SQLite 3.8+ supports the same syntax
(Tier 1 deploy stays consistent).
§D.3 — graph entity lineage model rewritten. Cross-document shared entities
("Linus" mentioned in 100 docs) cannot be cleared by simple DELETE-by-doc
without losing other docs' contributions. New model:
- source_lineage: SET<{document_id, parse_version, chunk_ids[]}>
- description_parts: SET<{document_id, parse_version, text}>
- sync = lineage-level DELETE+INSERT, entity GC when lineage empty
Includes per-entity serialization invariant for Nebula (read-modify-write
without native list ops). 5-step idempotency self-test extension specified.
PR #1725 v2; ready for earayu2 / Bryce final ack.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
9 tasks
Bryce v3 implementation review (msg=464d5b70) caught a spec bug in
§D.3.2 step 1b: the pseudocode used `(document_id, parse_version)`
exact-match for lineage filter, which contradicts §D.3.6 narrative
step 3 ("doc_A v2 写入(覆盖 doc_A 旧 lineage)"). Strict exact-match
would leave lineage[A,v_old] + lineage[A,v_new] coexisting after a
re-parse, violating the expected supersede semantic.
Architect ruling (msg=80c5dc06) is to amend §D.3.2 step 1b to filter
by `document_id` only (not parse_version). This makes sync(doc, v_new)
self-contained for supersede; orchestrator does not need to do
explicit clear-then-sync.
§D.3.6 narrative remains canonical. PR #1726 Wave 1 graph implementation
follows the corrected algorithm.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…r pair (was single rate gauge) huangheng Wave 1 CR (msg=8e67bf0e) flagged §J.1 spec drift: T1.5 implementation emits index_failure_total + index_success_total counter pair, not the single index_failure_rate gauge spec called for. Architect ruling: amend spec to match implementation. Counter pair is OTLP- idiomatic, preserves raw events, re-aggregates across workers without sliding-window state, and the rate is trivially computable downstream. §J.1 spec amended; §K Wave 1 acceptance bullet updated. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
earayu
added a commit
that referenced
this pull request
Apr 26, 2026
* feat(celery T1.1): Foundation — schema + Modality ABC + object_store + parser Phase celery T1.1 per docs/modularization/indexing-redesign-design-pack.md (PR #1725 v3 head 5d7a60f). Foundation lane that the other Wave 1 modality lanes (T1.2 graph @bryce / T1.3 vector+fulltext / T1.4 summary+vision / T1.5 observability) depend on. What this adds (~600 LOC + tests): - alembic migration `f9c4d2a8e1b5_indexing_redesign_document_index.py` creates the `document_index` table with the §F.1 partial unique index `uniq_document_index_serving` enforcing "at most one is_serving=TRUE row per (document_id, modality)" at the DB layer (Bryce v2 review msg=7ccb176f #2). Postgres native; SQLite ≥3.8 supports the same syntax (Tier 1 §L deploy stays consistent). - `aperag/indexing/models.py` — DocumentIndex SQLAlchemy ORM mirroring the alembic schema, plus `Modality` (5 values per §C/§D) and `IndexStatus` (4 lifecycle states per §F.2) string-enums. - `aperag/indexing/base.py` — `ModalityWorker` ABC with `derive` + `sync` so per-modality workers (T1.2-T1.5) inherit the §D.1 "DELETE-by-(doc, parse_version) THEN INSERT" replace-idempotent contract. Graph reinterprets DELETE as the §D.3 lineage-level DELETE+INSERT internally; the ABC accepts that variation. - `aperag/indexing/object_store.py` — Atomic write helpers that wrap the existing `aperag.objectstore` package: LocalFS uses tmp+fsync+rename per §C.7; S3/MinIO relies on the single-request PutObject (or multipart CompleteMultipartUpload) visibility gate. Includes `read_or_none` for the §C.7 read contract and an `InMemoryObjectStore` test fixture so downstream T1.x can wire unit tests without touching disk. - `aperag/indexing/parser.py` — Deterministic parser entry point that produces the three shared artifacts (`markdown.md` / `outline.json` / `chunks.jsonl`) under `derived/parse_<v>/` per §C.1. parse_version is computed via the canonical D10.g §E.2 helper (`compute_parse_version`) so a chunking change rolls the version. T1.1 ships an in-process simulator implementation that proves the write contract; production parser integration (docparser/Marker/OCR) swaps in at T2.x without changing the artifact shape. - `aperag/migration/env.py` — Register `aperag.indexing.models` for alembic autogen (the new module deliberately lives outside the per-domain `db/models.py` tree because the `aperag.domains.indexing` surface is the Wave 3 hard-delete target). Tests cover the three Wave 1 acceptance gates locked by the design pack §K: 1. Partial unique invariant: an INSERT of a second is_serving=TRUE row for the same (document_id, modality) raises IntegrityError; non-serving rows + a different modality on the same document are allowed; the §F.3 three-statement cutover transaction satisfies the constraint. 2. Object-store atomic write: `_LocalAtomicWriter` produces a final destination file with no `.tmp.*` siblings remaining; concurrent writers to different artifacts in the same parse_version directory each land their own bytes; `InMemoryObjectStore` matches the single-call atomicity semantic. 3. Parser → derived/ round-trip: parse_document writes the three canonical artifacts; identical inputs yield identical parse_version + identical artifact contents (§C.3 idempotent retry); a chunking config change rolls parse_version (§E.2 hash); chunks.jsonl round-trips via `read_chunks`; outline.json carries slash-separated `section_path` + slug `heading_anchor` per the D10.c §A.9 R1 lock; re-running parse_document overwrites atomically; missing/empty artifacts are treated as "derive not yet complete" (§C.7 read contract); path traversal is rejected. Out of scope for this lane (per §K decomposition): - T1.2 graph modality + §D.3 lineage model (@bryce, task #7) - T1.3 vector + fulltext modalities (chenyexuan, task #8) - T1.4 summary + vision modalities (chenyexuan, task #9) - T1.5 observability OTLP emit (chenyexuan, task #10) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery Wave 1): T1.3+T1.4+T1.5 modalities + observability + foundation fix-forward Bundles the architect-mandated fix-forward (msg=4a801b2b, msg=c3b0ba5b, msg=07b5b1e6) on top of T1.1 foundation: T1.1 fix-forward (per architect rulings on PR #1726 P0 bugs): - Bug1: object_store.py LocalObjectStore import alias (class is named Local) - Bug2: tablename → document_index_v2 + index renames (Wave 3 renames back to canonical via alembic per task #14 acceptance amendment) - NEW (§H.2): tenant_scope_key VARCHAR(64) NOT NULL column + idx_document_index_v2_tenant_scope index — locked into Wave 1 schema for T2.2 quota lane to consume without migration churn - Tests: fixture default tenant_scope_key="user:test" T1.3 Vector + Fulltext (§D.1 replace-idempotent contract): - VectorBackend / InMemoryVectorBackend Protocol + Qdrant-shaped delete_by_filter + upsert_point - VectorModality: derive no-op pass-through, sync DELETE-by-(doc, parse_v) THEN per-chunk upsert with placeholder embedding (sha256-derived 16-dim) - FulltextBackend / InMemoryFulltextBackend + delete_by_query + bulk_index - FulltextModality: shares chunks.jsonl with vector (§C.6); chunk_id parity preserved for hybrid dedup at search layer - Tests: replace-idempotent on double sync, new parse_version doesn't corrupt old slot, hybrid chunk_id parity, modality discriminator on payload, missing-artifact no-op (§C.7 reschedule semantic) T1.4 Summary + Vision (§C.6 + §D.2 expensive-derive split): - SummaryModality: derive reads parser markdown.md, runs placeholder summarizer (first non-heading paragraph), embeds, writes summary.json atomically; sync deletes by filter + upserts single point keyed summary:{document_id}:{parse_version} - VisionModality: derive reads synthetic image-records JSON (T1 simulator contract — T2.x replaces with real PDF extract + vision-LLM), writes vision/manifest.jsonl, sync upserts one point per image keyed vision:{document_id}:{parse_version}:{image_id} - Both backends use Qdrant-shaped Protocol + InMemory test fixtures - Tests: derive persists canonical artifact (cost preserved across retries), idempotent on double sync, new parse_version doesn't corrupt old slot, modality discriminator, missing-artifact no-op T1.5 Observability primitives (§J.1 SLI emission): - 5 metric name constants prefixed indexing.* — index_lag_seconds / index_failure_rate / index_success / queue_depth / worker_utilization - MetricsEmitter Protocol + NoopMetricsEmitter (Tier 1 deploy without OTLP) + InMemoryMetricsEmitter (test fixture) - emit_index_lag / emit_index_failure / emit_index_success / emit_queue_depth / emit_worker_utilization helpers — modality attribute optional, utilization clamps to [0, 1] and handles capacity=0 - Tests: emission shape contract for each helper + metric name prefix lock Per architect msg=f21a79f0 + PM msg=07b5b1e6 + msg=95012fdb: this commit accumulates onto the Wave 1 mega-PR #1726. Bryce will rebase + push T1.2 graph commit on the same branch; once T1.2 lands, the full Wave 1 PR flips to in_review for huangheng's step 0+ / step 0 / step 0''' / cross-lane caller sweep CR + verdict. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(celery T1.2): graph modality — §D.3 lineage model + per-entity Redis lock + tenant_scope_key Per docs/modularization/indexing-redesign-design-pack.md §D.3 + architect msg=cc555e33 / msg=f2921ae0 / msg=c3b0ba5b lineage rulings. The graph modality is the only one whose backend rows are shared across documents, so the simple DELETE-by-(doc, parse_version) + re-INSERT pattern that vector/fulltext/summary/vision use breaks the shared-entity model — Bryce v2 review msg=7ccb176f #3 surfaced this. T1.2 lands the lineage-tracked entity / relation rows + the §D.3.2 two-phase sync algorithm + the per-entity Redis lock that protects Nebula's read-modify-write window from racing. Surface (aperag/indexing/graph.py, ~1017 lines): * Lineage data model — LineageMember(document_id, parse_version, tenant_scope_key, chunk_ids), DescriptionPart, EntityRecord, RelationRecord, EntityWithLineage, RelationWithLineage. The tenant_scope_key lives at SET-element level (not entity row level) per architect msg=c3b0ba5b — placement chosen so a shared entity cited by multiple tenants still has one row but each lineage member carries its own tenant attribution for read-path ACL filtering (§H.2 quota / organization key). * Per-entity lock — EntityLock Protocol + InMemoryEntityLock (asyncio.Lock per key, single-process default) + RedisEntityLock (Redis SETNX-style lock keyed by f"{prefix}:{slot}" where slot is crc32(entity_id) % 4096 to bound the key space). The lock is mandatory on the Nebula path because Nebula 3.x's list ops require application-layer read-modify-write; concurrent sync calls touching the same shared entity would otherwise lose lineage members at the network round-trip. * LineageGraphStore Protocol — backend abstraction so the §D.3.2 algorithm in GraphModalityWorker is portable across the three backends listed in §D.3.5 (Nebula 3.x, Neo4j with native list + APOC, in-memory). Methods filter lineage by document_id (not exact (doc, parse_version)) so re-parsing supersedes the old parse_version cleanly per §D.3.6 step 3 (see §D.3.2 amendment note in docstring). * InMemoryLineageGraphStore — Python-dict reference implementation used as the canonical correctness oracle. The §D.3.6 self-test + concurrent race test run against it; any future Nebula / Neo4j adapter that satisfies the Protocol inherits pass/fail status from this suite. * GraphExtractor — Callable injection point so the production LightRAG-based LLM extractor (Wave 2 wiring) and the test stubs share the same surface. The kg.jsonl artifact persists the extractor output so retries never re-charge LLM cost (§C.6 CANONICAL artifact contract). * serialize_kg_jsonl / parse_kg_jsonl — line-oriented JSON with a "kind" discriminator so future record types can be added without breaking the file format. Empty payloads encode as b"\\n" (one byte) so the §C.7 "empty body == derive not finished" sentinel cannot collide with a deliberate "no records produced" payload — the deletion flow (§D.3.6 step 4 / step 5) publishes b"\\n" to cleanly clear a document's lineage contribution. * GraphModalityWorker — implements ModalityWorker ABC. derive() reads chunks.jsonl produced by the T1.1 parser, calls the injected extractor, writes kg.jsonl atomically. sync() reads kg.jsonl and applies the §D.3.2 algorithm: Phase 1 (cleanup): for every entity / relation in the lineage backend whose lineage SET contains the document_id, remove ALL members for that document_id (across any parse_version) and GC rows that go orphan. Per-entity lock held during cleanup so concurrent syncs cannot race the read-modify-write. Phase 2 (rebuild): for every entity / relation in kg.jsonl, upsert with a fresh LineageMember stamped with the current (document_id, parse_version, tenant_scope_key). Per-entity lock held during rebuild for the same reason. Tests (tests/unit_test/indexing/test_t1_2_graph.py, ~1044 lines, 16 test cases): * §D.3.6 five-step idempotency suite (test_d3_6_step1..step5): doc_A v1 → doc_B v1 → doc_A v2 supersedes → delete doc_A → delete doc_B with full entity GC. Pin the §D.3.2 algorithm against the §D.3.6 narrative; the regression Bryce surfaced in msg=7ccb176f #3 fails this suite under the v1 append-on-conflict path. * Relation-lineage symmetric coverage (test_relation_lineage_doc_a_then_doc_b_then_delete_doc_a) — proves the same lineage semantics flow through relation evidence_lineage. * §D.4 byte-equivalent re-sync (test_d4_byte_equivalent_resync) — double-sync with the same artifact leaves the backend identical. The cross-modality idempotency contract every modality must pass. * Nebula race condition under per-entity lock (test_nebula_race_under_per_entity_lock_preserves_both_writes + test_nebula_race_without_lock_loses_a_writer) — uses a _RaceProvocateurStore that emulates Nebula's read-modify-write network round-trip with a deterministic asyncio yield. Under the in-memory entity lock, both writers' lineage members end up in the SET (PASS); under a no-op lock, deterministically one writer loses (the negative control). Pins the architect msg=f2921ae0 invariant that per-entity serialization is mandatory not optional. * tenant_scope_key propagation (test_tenant_scope_key_propagates_into_lineage_members) — pins the SET-element placement decision per architect msg=c3b0ba5b so a future regression that drops the field or moves it to entity row level fails loudly. * kg.jsonl round-trip (test_kg_jsonl_*) — entity / relation serialization, forward-compatible kind skipping, empty-body encoding (always at least b"\\n" to distinguish "no records" from "derive crashed"). * derive contract (test_derive_writes_kg_jsonl_under_canonical_path + test_sync_with_missing_artifact_is_a_noop) — derive writes to derived/parse_<v>/kg.jsonl per §C.6 canonical layout; sync no-ops when the artifact is missing per §C.7 read contract. * End-to-end with the real T1.1 parser (test_end_to_end_with_real_parser_chunks) — proves graph.derive cooperates with the chunks.jsonl shape that aperag.indexing.parser.parse_document actually produces, no hidden coupling beyond the §C.6 contract. aperag/indexing/__init__.py — re-export the T1.2 graph public surface alongside chenyexuan's T1.3/T1.4/T1.5 modality exports so the indexing package surface is uniform across all five modalities. Lint: ruff check + ruff format --check both clean across aperag/indexing/ and tests/unit_test/indexing/. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T1.1 tests): leftover ImportError + bogus settings monkey-patch Three follow-up bugs surfaced by Bryce's local pytest collection (msg=464d5b70) on top of fix-forward 859f899: 1. test_t1_1_foundation.py:69 — ImportError. The class is named `Local` (not `LocalObjectStore`); my main-tree object_store.py already uses the `Local as LocalObjectStore` alias per architect ruling on Bug 1 (msg=4a801b2b), but the test file's own import line still referenced the wrong name. Fixed: same alias pattern. 2. test_local_atomic_write_uses_tmp_rename_dance — AttributeError on `aperag.objectstore.local.settings`. The module never had a `settings` attribute; the monkey-patch block was unfounded. `Local(LocalConfig)` accepts the config struct directly, no module-level singleton dependency. Replaced the monkey-patch block with a direct `LocalObjectStore(LocalConfig(root_dir=...))` construction. 3. test_concurrent_atomic_writes_dont_clobber_each_other — same AttributeError pattern, same fix. No production code changed; only the T1.1 test fixture is simplified (net -30 lines). Lint clean. Wave 1 PR #1726 ready to flip to in_review after this push. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(celery T1.1 audit): narrow allowlist for transitional DocumentIndex duplicate huangheng's Wave 1 CR (msg=8e67bf0e) flagged P1: test_phase3_classes_have_single_definition_site fails because Wave 1 introduces aperag/indexing/models.py:DocumentIndex alongside the legacy aperag/domains/indexing/db/models.py:DocumentIndex still owned by Celery. Architect ruling msg=5be9a535 — option (a): add a narrow allowlist covering exactly this (class_name, file_pair); reject (b) renaming the new ORM class (would force a Wave 1 + Wave 3 double-rename, conflicts with msg=4a801b2b "Python class name stays canonical, only __tablename__ differs") and (c) xfail (does not express the "intentional transitional state" semantic). Wave 3 task #14 acceptance per architect amendment will remove this allowlist entry in the same PR that deletes the legacy file + alembic renames document_index_v2 back to document_index. The allowlist entry is narrow: - Only the exact frozenset of two file paths is accepted; any third duplicate (Wave 4+ regression, e.g. a copy-paste of the class) still flags as an offender. - Only matches `class DocumentIndex(Base):` definitions (the existing orm_pattern); pydantic schemas with the same name are unaffected. - No global widening — no other Phase 3 class gets an exception. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
10 tasks
earayu
added a commit
that referenced
this pull request
Apr 26, 2026
…nup dispatch Three architect rulings (msg=492315e8) applied as a follow-up on top of T2.1 commit 7f51d44: **Ruling 1 — §F.3 cutover MUST be single-TX in worker, not split.** My earlier T2.1 implementation split §F.3 into: - worker writes statement 1 (status=ACTIVE) on sync success - reconciler.reconcile_cutover() writes statements 2+3 (demote+promote) ~30s later on its next cycle That's a drift from the §F.3 explicit lock "Three statements in one transaction" + introduces an ACTIVE-but-not-is_serving inconsistency window the spec disallows in §F.4. Fix: - aperag/indexing/orchestrator.py: replace _finalize_active with _finalize_active_with_cutover — runs all 3 §F.3 statements (status=ACTIVE → demote prior is_serving for (doc, modality) → promote new) inside the same session.begin() block. The §F.1 partial unique index (uniq_document_index_v2_serving) is honoured naturally because statement 2 (demote-FALSE) precedes statement 3 (promote-TRUE) — index never sees two TRUE rows mid-TX. - aperag/indexing/reconciler.py: remove reconcile_cutover() entirely + drop from run_reconcile_loop (now 3 scans: PENDING dispatch + FAILED retry + RUNNING reclaim). Module docstring updated to reference the orchestrator-side cutover with explicit pointer to architect ruling msg=492315e8 Ruling 1. - aperag/indexing/__init__.py: remove reconcile_cutover from exports. - tests: 3 reconciler-cutover tests rewritten as 3 orchestrator-side cutover tests (process_one_task happy path → ACTIVE+is_serving=TRUE in one TX; partial unique invariant; per-modality independence). E2E smoke updated — no longer needs a separate reconcile_cutover step after orchestrator completion. **Ruling 3 — graph cleanup MUST land in T2.1, not punt to T2.2.** T2.2 lane is quota + bulkhead per Ruling 2 simplification; cleanup is squarely chenyexuan's lane. Earlier T2.1 logged a WARN + skipped graph backend cleanup as a "T2.2 follow-up" — that was a scope leak. Fix in aperag/indexing/cleanup.py: - Two distinct entry points with different graph semantics: (A) cleanup_orphan_parse_versions — orphan parse_v GC. For graph, this is a backend no-op because §D.3.6 sync supersede already removed old parse_version's lineage members when the new parse_version was written (per amended §D.3.2 canonical PR #1725 head a0a4799 — sync clears by document_id, not parse_v). Counted under "graph_noop" for telemetry; DB row still GC'd. (B) cleanup_for_deleted_documents — caller-driven, runs when a document is removed. For non-graph: flat backend delete per (document_id, parse_version). For graph: invoke the lineage cleanup loop on the worker's underlying LineageGraphStore via EntityLock (Wave 1 conventions exposed as _store + _entity_lock). Per-document dedup so multiple parse_version rows for the same doc share one lineage cleanup call. - _is_graph_worker uses Modality.GRAPH check (no graph.py import) to avoid pulling Nebula/Neo4j extras into the cleanup module. - _flat_backend_delete_callable renamed for clarity (was _backend_delete_callable); only walks worker._backend (vector / fulltext / summary / vision); explicitly does NOT walk _store. - _cleanup_graph_lineage_for_document implements §D.3 lineage cleanup at the storage layer using duck-typed Wave 1 conventions (find_entity_ids_with_lineage / remove_entity_lineage_member / optional gc_entity_if_orphan + symmetric relation cleanup). Each entity is serialized through entity_lock.acquire() so a concurrent graph sync cannot race. Tests: - test_cleanup_orphan_parse_version_for_graph_is_backend_noop — asserts graph_noop counter increments + backend delete is NOT called + DB row is still GC'd - test_cleanup_for_deleted_documents_removes_non_graph_backend_per_parse_version — vector path: delete_by_filter per parse_version + rows dropped - test_cleanup_for_deleted_documents_calls_graph_lineage_cleanup_once_per_doc — graph path: lineage cleanup runs once per doc regardless of how many parse_version rows; entity GC + relation cleanup invoked - test_cleanup_for_deleted_documents_handles_empty_input - _StubLineageGraphStore + _StubAsyncLock + _GraphLikeWorker test fixtures — duck-typed stand-ins for GraphModalityWorker so we don't need graph extras to test cleanup Local pytest: full indexing suite + Phase 3 audit gate = 104 passed, 0 failed (Wave 1 + T2.1 + T2.2 + T2.1 follow-up all green together on rebased branch on Bryce's commit 057409f). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
earayu
added a commit
that referenced
this pull request
Apr 26, 2026
* feat(celery T2.1): worker pool + reconciler + cleanup runtime
Wave 2 foundation per design pack §E.2 + §I.2 + §I.3 + §F.5. Layers
the worker-pool / reconciler / cleanup runtime on top of Wave 1
modality workers; T2.2 (Bryce) wires commit_active + quota + bulkhead
on top of this.
Schema (alembic c2e8d5a1f3b9):
- ADD COLUMN collection_id VARCHAR(64) NULL — cleanup-worker GC scoping
- ADD COLUMN source_path TEXT NULL — modality.derive input path,
decouples orchestrator from canonical-layout filename assumptions
(vision modality reads non-canonical synthetic JSON in T1 / could
read PDF page extracts in T2.x without breaking layout)
- INDEX idx_document_index_v2_collection on collection_id
- Both nullable for back-compat with Wave 1 fixture rows; reconciler
logs + skips PENDING rows missing source_path
Orchestrator (aperag/indexing/orchestrator.py, ~570 LOC):
- BLPOP loop (WorkQueue Protocol + InMemoryWorkQueue test fixture +
RedisWorkQueue stub for production wire-in)
- Atomic claim: UPDATE WHERE id=$id AND status IN ('PENDING','FAILED')
→ RUNNING + last_heartbeat=now(); zero-row UPDATE = lost claim →
silent skip (concurrent worker won the race)
- Per-task heartbeat asyncio task (HEARTBEAT_INTERVAL_SECONDS=20)
bumps last_heartbeat; cancelled in finally; tolerates DB hiccups
- §I.2 retry backoff: 30s → 60s → 120s → 240s → 480s, capped at 5
retries; past-budget rows stay FAILED with retry_after=NULL
- §C.7 reschedule semantic: empty derive path → status back to
PENDING without burning retry_count (upstream not ready)
- 5 per-modality entrypoints (run_vector_worker / run_fulltext_worker
/ run_graph_worker / run_summary_worker / run_vision_worker) bind
the §E.2 concurrency caps (16/32/4/4/4)
Reconciler (aperag/indexing/reconciler.py, ~370 LOC):
- 30s loop with 4 idempotent scans
- (1) PENDING dispatch: push payloads to per-modality queue; does
NOT claim (orchestrator's job)
- (2) FAILED retry: flip elapsed-backoff FAILED → PENDING (retry_count
≤ MAX_RETRY_COUNT); past-budget rows stay FAILED
- (3) RUNNING reclaim: flip stale-heartbeat (>60s per §E.4) RUNNING
→ PENDING WITHOUT burning retry_count (worker death ≠ work failure)
- (4) Per-modality cutover (§F.3): for every ACTIVE-but-not-serving
row, demote prior is_serving row + promote new in single TX. The
§F.1 partial unique index guards against orchestrator-level bugs
leaving 2 rows is_serving=TRUE for the same (doc, modality)
Cleanup (aperag/indexing/cleanup.py, ~305 LOC):
- 5min loop, GC orphan parse_versions per §F.5
- Orphan = is_serving=FALSE AND superseded by newer parse_version
AND updated_at < now - 1hr cool-down (cool-down lets cutover
races settle before backend delete)
- Duck-typed backend delete (delete_by_filter for vector/summary/
vision; delete_by_query for fulltext); graph backend has no flat
delete (lineage co-mingled across docs per §D.3) → cleanup logs
warning + skips backend delete + still GCs DB row. Bryce's T2.2
follow-up extends graph with §D.3 lineage-aware cleanup hook
- Document-tombstone path (document.deleted_at) deferred to T2.2
cutover lane (touches document table outside indexing domain)
Tests (tests/unit_test/indexing/test_t2_1_runtime.py, 17 cases):
- Orchestrator: claim → derive → sync → finalize happy path; lost
claim silent skip; failure backoff (30s); §C.7 empty-derive
reschedule without retry burn
- §I.2 backoff schedule lock (30s/60s/120s/240s/480s + cap)
- Reconciler: PENDING dispatch idempotency + missing-source_path
skip; RUNNING reclaim preserves retry_count; FAILED retry flips
elapsed-only; cutover demotes prior + promotes new + per-modality
independence + partial-unique invariant survives
- Cleanup: orphan GC after cool-down + cool-down respect + graph
modality skip with warning (T2.2 follow-up flag)
- E2E smoke: PENDING → reconciler push → orchestrator process →
ACTIVE → reconciler cutover → is_serving=TRUE
- DispatchPayload dict + JSON round-trip (Redis serialization sanity)
Local pytest: 17/17 T2.1 + 64/64 Wave 1 indexing + 2/2 Phase 3 audit
all green (81/81).
Bryce can now layer T2.2 (commit_active helper + quota + bulkhead)
on this branch — orchestrator/reconciler/cleanup write-set is
disjoint from his planned commit_active.py / quota.py / limits.py.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T2.2): tenant-scoped quota + bulkhead limits + flaky race-test hardening
Per docs/modularization/indexing-redesign-design-pack.md §H.5 + §H.6
+ architect msg=8420f12a + ruling 2 simplification (msg=492315e8).
The Wave 2 runtime needs two layers of resource protection above
the per-modality worker pool: a quota layer that throttles upstream
LLM/embedding calls per (resource_class, tenant_scope_key), and a
bulkhead layer that places hard wall-time / size ceilings on every
worker call regardless of tenant. This commit lands both as
orchestrator-callable helpers; the orchestrator/reconciler integration
(invoke acquire() before LLM/embedding; wrap calls in
bulkhead_timeout) is chenyexuan's T2.1 follow-up scope per architect
msg=492315e8 ruling 2.
Surface (aperag/indexing/quota.py, ~411 lines):
* QuotaPolicy — frozen dataclass holding (capacity,
refill_rate_per_sec) with __post_init__ validation that both are
> 0. Standard token-bucket parameters; 60/1.0 matches §H.5
baseline of "60 LLM calls / minute sustained".
* QuotaPolicyRegistry — maps (resource_class, tenant_scope_key) →
QuotaPolicy with two-tier lookup: exact match first, then
("default" tenant_scope_key fallback for that resource class).
Raises KeyError if neither configured (worker hitting an
unconfigured resource class is a deployment bug; surface loud).
Resource classes are independent — declaring an "llm" default
does NOT implicitly cap "embedding".
* QuotaBackend — async Protocol every backend implements. Single
acquire() method that blocks until one token is granted,
respecting the refill rate.
* InMemoryQuotaBackend — Python token bucket with one asyncio.Lock
per (resource_class, tenant_scope_key) bucket key. Suitable for
the §L Tier-1 single-process deployment (INDEXING_MODE=inline)
and as the canonical correctness oracle for unit tests. Uses an
injectable clock fixture so tests can advance time
deterministically.
* RedisQuotaBackend — Redis-backed implementation with an atomic
Lua script (HMGET → refill math → HMSET in one round-trip) so
concurrent multi-process workers never overshoot the bucket
capacity. The Lua script returns (acquired, wait_seconds);
callers retry after asyncio.sleep(wait_seconds) when refused.
Uses crc32 slot hashing on tenant_scope_key to bound Redis-key
cardinality. Compatible with both sync and async redis-py
(await-if-awaitable shim).
Surface (aperag/indexing/limits.py, ~162 lines):
* LLM_CALL_TIMEOUT_SECONDS = 60.0 (§H.6)
* EMBEDDING_CALL_TIMEOUT_SECONDS = 30.0 (§H.6)
* UPLOAD_MAX_BYTES = 50 * 1024 * 1024 (§H.6)
* bulkhead_timeout(seconds, label=...) — async context manager
wrapping asyncio.timeout; logs the label on TimeoutError so
per-call telemetry distinguishes timeouts at "graph.derive.llm"
from "vector.derive.embedding" without scattering log strings
across modality workers.
* reject_if_oversize(content_length, label=...) — boundary-time
ValueError when over UPLOAD_MAX_BYTES; called by upload handlers
before parser allocates.
Tests (tests/unit_test/indexing/test_t2_2_quota_limits.py, 20 cases):
* QuotaPolicy validation — capacity / refill_rate must be > 0;
fractional values accepted.
* QuotaPolicyRegistry — exact-match wins over default; KeyError
when neither configured; per-resource-class default isolation.
* InMemoryQuotaBackend — initial bucket starts at capacity;
drained bucket blocks until refill (under fake clock + monkey-
patched asyncio.sleep, deterministic timing assertion); per-tenant
isolation; per-resource-class isolation; refill capped at
capacity after long idle (1hr → 3 tokens not 3600); default
fallback routes unknown tenant through shared policy.
* Bulkhead — bulkhead_timeout completes within budget vs raises
TimeoutError when exceeded; reject_if_oversize accepts at
boundary, rejects strictly over cap; constants pinned to design
pack values.
* RedisQuotaBackend — construction-only smoke (Protocol surface),
Lua-script invocation against fake script (acquire when token
available, retry-loop when wait_seconds returned). Real-Redis
integration deferred to T2.3 load-test infra (real Redis fixture).
Hardening for huangheng msg=2b20974b informational +
architect msg=8420f12a follow-up directive:
* tests/unit_test/indexing/test_t1_2_graph.py:_RaceProvocateurStore
now takes race_count parameter. The lock-protected test stays at
race_count=1 (no barrier; single-writer-at-a-time naturally).
The no-lock negative-control flips to race_count=2: an
asyncio.Event barrier holds both writers at the post-read /
pre-write window until BOTH have reached it, then releases. This
pins the race deterministically — the previous asyncio.sleep(0)
was scheduler-dependent and the test flaked under heavy CI load
(huangheng observed 1/2 fail in a full-suite run). Verified
5/5 deterministic passes locally post-fix.
aperag/indexing/__init__.py — re-export the T2.2 quota + limits
surfaces so the indexing package surface stays uniform across the
3 wave layers.
Lint + tests:
* uvx ruff check + ruff format --check across aperag/indexing/ +
tests/unit_test/indexing/: clean.
* pytest tests/unit_test/indexing/ + tests/unit_test/test_phase3_reexport_audit.py:
101 passed, 0 failed (62 pre-existing Wave 1+2 + 20 new T2.2 +
modified race-test path that now passes 5/5 deterministically).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T2.1): revert cutover to worker single-TX + add graph cleanup dispatch
Three architect rulings (msg=492315e8) applied as a follow-up on top
of T2.1 commit 7f51d44:
**Ruling 1 — §F.3 cutover MUST be single-TX in worker, not split.**
My earlier T2.1 implementation split §F.3 into:
- worker writes statement 1 (status=ACTIVE) on sync success
- reconciler.reconcile_cutover() writes statements 2+3 (demote+promote)
~30s later on its next cycle
That's a drift from the §F.3 explicit lock "Three statements in one
transaction" + introduces an ACTIVE-but-not-is_serving inconsistency
window the spec disallows in §F.4.
Fix:
- aperag/indexing/orchestrator.py: replace _finalize_active with
_finalize_active_with_cutover — runs all 3 §F.3 statements
(status=ACTIVE → demote prior is_serving for (doc, modality) →
promote new) inside the same session.begin() block. The §F.1
partial unique index (uniq_document_index_v2_serving) is honoured
naturally because statement 2 (demote-FALSE) precedes statement 3
(promote-TRUE) — index never sees two TRUE rows mid-TX.
- aperag/indexing/reconciler.py: remove reconcile_cutover() entirely
+ drop from run_reconcile_loop (now 3 scans: PENDING dispatch +
FAILED retry + RUNNING reclaim). Module docstring updated to
reference the orchestrator-side cutover with explicit pointer to
architect ruling msg=492315e8 Ruling 1.
- aperag/indexing/__init__.py: remove reconcile_cutover from exports.
- tests: 3 reconciler-cutover tests rewritten as 3 orchestrator-side
cutover tests (process_one_task happy path → ACTIVE+is_serving=TRUE
in one TX; partial unique invariant; per-modality independence).
E2E smoke updated — no longer needs a separate reconcile_cutover
step after orchestrator completion.
**Ruling 3 — graph cleanup MUST land in T2.1, not punt to T2.2.**
T2.2 lane is quota + bulkhead per Ruling 2 simplification; cleanup
is squarely chenyexuan's lane. Earlier T2.1 logged a WARN + skipped
graph backend cleanup as a "T2.2 follow-up" — that was a scope leak.
Fix in aperag/indexing/cleanup.py:
- Two distinct entry points with different graph semantics:
(A) cleanup_orphan_parse_versions — orphan parse_v GC. For graph,
this is a backend no-op because §D.3.6 sync supersede already
removed old parse_version's lineage members when the new
parse_version was written (per amended §D.3.2 canonical PR
#1725 head a0a4799 — sync clears by document_id, not parse_v).
Counted under "graph_noop" for telemetry; DB row still GC'd.
(B) cleanup_for_deleted_documents — caller-driven, runs when a
document is removed. For non-graph: flat backend delete per
(document_id, parse_version). For graph: invoke the lineage
cleanup loop on the worker's underlying LineageGraphStore via
EntityLock (Wave 1 conventions exposed as _store + _entity_lock).
Per-document dedup so multiple parse_version rows for the same
doc share one lineage cleanup call.
- _is_graph_worker uses Modality.GRAPH check (no graph.py import)
to avoid pulling Nebula/Neo4j extras into the cleanup module.
- _flat_backend_delete_callable renamed for clarity (was
_backend_delete_callable); only walks worker._backend (vector /
fulltext / summary / vision); explicitly does NOT walk _store.
- _cleanup_graph_lineage_for_document implements §D.3 lineage
cleanup at the storage layer using duck-typed Wave 1 conventions
(find_entity_ids_with_lineage / remove_entity_lineage_member /
optional gc_entity_if_orphan + symmetric relation cleanup). Each
entity is serialized through entity_lock.acquire() so a
concurrent graph sync cannot race.
Tests:
- test_cleanup_orphan_parse_version_for_graph_is_backend_noop —
asserts graph_noop counter increments + backend delete is NOT
called + DB row is still GC'd
- test_cleanup_for_deleted_documents_removes_non_graph_backend_per_parse_version
— vector path: delete_by_filter per parse_version + rows dropped
- test_cleanup_for_deleted_documents_calls_graph_lineage_cleanup_once_per_doc
— graph path: lineage cleanup runs once per doc regardless of how
many parse_version rows; entity GC + relation cleanup invoked
- test_cleanup_for_deleted_documents_handles_empty_input
- _StubLineageGraphStore + _StubAsyncLock + _GraphLikeWorker
test fixtures — duck-typed stand-ins for GraphModalityWorker so we
don't need graph extras to test cleanup
Local pytest: full indexing suite + Phase 3 audit gate = 104 passed,
0 failed (Wave 1 + T2.1 + T2.2 + T2.1 follow-up all green together
on rebased branch on Bryce's commit 057409f).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T2.3): synthetic 100-doc burst load test + 5-doc smoke
Per docs/modularization/indexing-redesign-design-pack.md §E.3 +
architect msg=8420f12a Wave 2 acceptance gate. Closes the third and
final task of Wave 2: a load test that proves the full Wave 1 + 2
runtime stack — parser → object_store → orchestrator (claim, derive,
sync, atomic 3-statement cutover per architect ruling msg=492315e8) →
all five modality workers — converges 100 concurrent documents to
``is_serving=TRUE`` for every (doc, modality) pair.
Surface (tests/load/test_100_doc_burst.py, ~530 lines):
* test_100_doc_burst_all_modalities_serving_within_budget — the
canonical Wave 2 gate. Runs against in-memory backends + SQLite
StaticPool so it completes in ~5 s on a developer laptop while
exercising the same orchestrator + cutover path that production
multi-process Redis-backed workers run. Asserts:
- ALL DOC_COUNT × len(Modality) = 500 rows reach is_serving=TRUE
- elapsed wall time < BURST_BUDGET_SECONDS (60s ceiling) — a
regression introducing per-process serialization (e.g., a
hot-path mutex bottlenecking all five modalities) trips this
- every serving row is at status=ACTIVE
- §F.1 partial unique invariant held (no duplicate (doc, modality)
pairs at is_serving=TRUE)
- §J SLI gauges/counters present per modality:
* indexing.index_lag_seconds gauge ≤ 30-min production SLO
* indexing.index_failure_total = 0
* indexing.index_success_total = 500
- InMemoryWorkQueue depth = 0 post-drain for every modality
Marked @pytest.mark.slow so the standard PR-gate suite skips it
(-m "not slow"); the nightly CI job runs -m slow.
* test_smoke_5_doc_burst_all_modalities_serving — same shape with 5
docs and no timing budget. Runs in ~1s, stays in default PR-gate
suite to catch regressions that break the run loop / cutover
entirely (vs just the timing budget).
Implementation notes:
* Per-modality drains run sequentially in the test — SQLite under
StaticPool serializes write transactions anyway, so an
asyncio.gather across modalities offers no real speedup while
introducing flaky cutover-TX races on heavily loaded CI runners.
Production multi-process workers do gather across modalities, but
their DB connections are independent so the gather pattern doesn't
translate cleanly to a single-connection SQLite test.
* Vision modality requires a JSON list source (per
VisionModality.derive contract); the seed phase writes an empty
image-records list under collections/<cid>/documents/<did>/source/
images.json so vision derive completes cleanly without doing real
image extraction. Other modalities all consume chunks.jsonl
(vector + fulltext shared, summary, graph).
* Graph extractor stub returns one EntityRecord per chunk —
deterministic + cheap. Real LLM extraction is out of scope for the
Wave 2 acceptance gate (it's about scheduling + cutover, not
extraction quality).
* doc_lag_starts captures monotonic timestamps at seed time so the
per-(doc, modality) lag emission reflects actual pipeline latency
rather than only the worker's processing time.
Lint + tests:
* uvx ruff check + ruff format --check across aperag/indexing/ +
tests/unit_test/indexing/ + tests/load/: clean.
* pytest tests/unit_test/indexing/ + tests/load/ +
tests/unit_test/test_phase3_reexport_audit.py:
106 passed, 0 failed (62 Wave 1 + 21 T2.1 follow-up + 20 T2.2
+ 2 T2.3 new + Phase 3 audit). 3 consecutive runs all clean.
Wave 2 PR #1727 status (ready for huangheng integrated CR):
* T2.1 (chenyexuan 7f51d44 + follow-up 836fe54)
* T2.2 (Bryce 057409f)
* T2.3 (this commit)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
…nant_scope_key columns + cleanup Path C Three Wave 3 amendments folded in to unblock chenyexuan T3.1 (msg=afe345a9): §F.1 schema: - Add collection_id VARCHAR(64) NOT NULL (denormalized from document.collection_id, populated by orchestrator at INSERT for self-contained dispatch payload — per huangheng Wave 2 CR finding msg=c94b57fe + architect ruling msg=498b12f0). - Add source_path TEXT NOT NULL (pointer to source/ artifact, worker derive reads directly without JOIN). - Add tenant_scope_key VARCHAR(64) NOT NULL (forward-compat for future organization concept per §H.2; required key for §H.5 quota bucket). Was implicit-but-not-listed in the schema before; now explicit. - Add idx_document_index_collection + idx_document_index_tenant_scope indexes for cleanup / quota scans. §F.5 cleanup worker: - Restructure to three paths (A/B/C) with explicit semantics. - Path A: orphan parse_version GC (existing); now notes graph backend no-op via §D.3 lineage supersede + graph_noop counter for telemetry. - Path B: single-document deletion cascade — explicit graph dispatch via remove_entity_lineage_member(document_id) per §D.3 amended canonical (by document_id only). - Path C (NEW): collection deletion cascade — Collection.deleted_at scan + Path B per child document + final Collection row + storage tree cleanup. Replaces legacy Celery collection_delete task with state-driven recovery (no asyncio.create_task() durability gap). These amendments unblock T3.1 commit 1+ since chenyexuan needs the spec head to reference for the audit allowlist removal and the caller-migration patterns. Wave 3 task #14 acceptance criteria (per PM msg=5939e394) now references this head. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
earayu
added a commit
that referenced
this pull request
Apr 26, 2026
… NOT NULL + rename to canonical Wave 3 hard-cut schema migration per architect msg=4a801b2b (Wave 1 Bug 2 ruling that locked the temporary v2 suffix) + msg=498b12f0 (Wave 2 informational item ruling that promoted dispatch columns NOT NULL in Wave 3) + PM acceptance msg=5939e394 item 1. Migration revision d0f4c1b9a8e2 chains off c2e8d5a1f3b9 and: 1. DROP TABLE document_index CASCADE — the legacy Celery-era table that lived alongside the Wave 1 v2 table during the transition. Pre-launch + no callers in Wave 3 (the dependent code is hard- deleted in subsequent commits of this same PR). 2. ALTER COLUMN collection_id, source_path → NOT NULL on document_index_v2. Wave 1 fixtures used NULL for back-compat; Wave 3 orchestrator + reconciler always populate them (per architect msg=498b12f0 Lock). 3. Rename every index *_v2_* → *_*. The partial-unique uniq_document_index_v2_serving is dropped + re-created (PG ALTER INDEX RENAME does not regenerate the WHERE predicate symbol map per Postgres quirk; SQLite would silently keep the old reference). 4. RENAME TABLE document_index_v2 → document_index — back to the §F.1 canonical name (architect msg=4a801b2b lock). The downgrade reverses every step in mirror order so a rollback can replay subsequent migrations cleanly. The recreated legacy ``document_index`` table on downgrade is intentionally schema-less (only the id PK column) because the legacy class was deleted in the Wave 3 PR alongside this migration — operators rolling back past this point must restore the legacy ORM file before re-running upgrades. There is no production scenario for that. This is commit 1/5 of T3.1; subsequent commits land the FastAPI wire-in, knowledge_base/tasks.py Pattern A/B/C migration of the 6 remaining Celery tasks, the 9 production caller migrations, and the legacy file-layer hard-delete + audit allowlist removal + pyproject Celery/kombu dep removal. Design pack §F.1 + §F.5 amends (per architect msg=498b12f0 + msg=3890c9d7 path C ruling) are deferred to a follow-up commit once PR #1725 (which owns docs/modularization/indexing-redesign- design-pack.md) merges — flagged in the channel. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
earayu
added a commit
that referenced
this pull request
Apr 26, 2026
…ion-deletion cascade)
Wave 3 wire-in helpers per architect msg=268f9022 (Wave 3 spec) +
msg=3890c9d7 (Pattern A path C ruling). Adds the upload-side
dispatcher + the cleanup worker's third path so commits 3-5 can
wire FastAPI + migrate the 6 knowledge_base/tasks.py Celery tasks
without inventing new abstractions.
aperag/indexing/dispatcher.py (301 LOC, NEW):
- DispatchRequest dataclass — collection_id / document_id /
parse_version / source_path / tenant_scope_key / modalities tuple
- IndexingMode enum — ASYNC (queue + worker pool) / INLINE
(synchronous derive + sync per modality, for tier-1 private
deployments per design pack §L)
- dispatch_indexing() async helper — INSERTs N PENDING rows in one
transaction (collection_id + source_path + tenant_scope_key are
populated per the design pack §F.1 amended NOT NULL columns) +
finalizes per mode (queue.push for ASYNC; process_one_task call
for INLINE)
- modalities_for_collection() helper — maps per-modality enable
flags to a canonical-order modality tuple, useful for HTTP
handlers
- Fail-fast on missing dependency: raises ValueError if mode=ASYNC
with no queue, or mode=INLINE with empty workers (catches
config bugs at the HTTP boundary, not mid-INSERT)
aperag/indexing/cleanup.py (extended +131 LOC):
- New "Path C" cleanup_for_deleted_collections() per architect
msg=3890c9d7 Pattern A. Three-step cascade:
1. Find all distinct document_ids in document_index referencing
each deleted collection_id
2. Cascade to path B (cleanup_for_deleted_documents) for those
documents — that path already handles modality fan-out (graph
lineage cleanup vs flat backend delete)
3. Sweep any remaining document_index rows by collection_id
(covers the edge case where a row was orphaned earlier or the
collection had rows queued before any document indexed)
- Idempotent: a partial cascade that crashes mid-way is resumed on
the next call (Pattern B reconciler scan that sweeps tombstoned
collections)
- Counts dict adds collections_cleaned key
- Module docstring rewritten to describe THREE paths (was TWO)
aperag/indexing/__init__.py:
- Re-exports cleanup_for_deleted_collections + 6 dispatcher symbols
(DispatchRequest, IndexingMode, DEFAULT_MODALITIES,
dispatch_indexing, modalities_for_collection, all_modalities)
tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py (8 cases):
- dispatcher_async: INSERTs N rows + pushes payloads to per-modality
queue + leaves DB rows PENDING with correct scoping fields
- dispatcher_async_requires_queue: fail-fast on None queue
- dispatcher_inline: INSERTs + invokes process_one_task → row ends
ACTIVE + is_serving=TRUE in one TX (§F.3)
- dispatcher_inline_requires_workers: fail-fast on empty workers
- modalities_for_collection: canonical order + subset selection
- path_c_cascades_via_path_b: 3 collection rows (2 doc + 1 ghost) →
3 backend deletes + 3 row deletes; other-collection row untouched
- path_c_handles_empty_input: counts dict zeroed
- path_c_idempotent_on_re_run: second call returns rows_deleted=0
Local pytest: tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py
8/8 passed. Lint + format clean across new + extended files.
Note: this commit does not yet wire dispatcher into the FastAPI app
(commit 3) or migrate the 6 knowledge_base/tasks.py Celery tasks per
Pattern A/B/C (commit 4). Bryce can now start T3.2 + T3.3 on top of
this branch — the dispatcher shape is the stable API both lanes
depend on (T3.3 inline mode reuses dispatch_indexing(mode=INLINE)
unchanged; T3.2 search API does not depend on dispatcher).
Branch is rebased on main HEAD f370dc6 (PR #1725 design pack merged,
so subsequent commits can amend §F.1 / §F.5 directly if any new spec
drift surfaces during implementation).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
earayu
added a commit
that referenced
this pull request
Apr 26, 2026
…+ INDEXING_MODE=inline smoke Per docs/modularization/indexing-redesign-design-pack.md §G.5 + §L + architect msg=268f9022 (Wave 3 spec) + msg=3890c9d7 (path-C ruling) + msg=c685f83e (PR #1725 §F.1/§F.5 amendments merged). Two Wave 3 lanes shipped together because they share no production- code surface with chenyexuan T3.1 commits 1-2 (this commit's diff is purely additive: 1 new helper + 1 schema extension + 1 docs file + 2 test files): # T3.2 — SearchResultMetadata §G.5 extension aperag/domains/retrieval/schemas.py: * New typing aliases ``IndexerModality`` (vector/fulltext/graph/ summary/vision) + ``IndexStateValue`` (ACTIVE/FAILED/NOT_ENABLED/ INDEXING). * Three new optional fields on SearchResultMetadata: ``parse_version``, ``index_modality``, ``index_state_per_modality``. ``extra="forbid"`` config preserved — the §G.5 additions widen the allowlist by exactly three entries; a typo / future shadow field still fails Pydantic validation loudly. * ``modality`` (D10.h-locked content shape: text/image) kept as-is. The §G.5 spec uses bare ``modality`` for the indexer modality, but the existing public surface already binds that name to content shape; renaming would break D10.h. We chose ``index_modality`` for the indexer modality to disambiguate at the schema level. (Spec narrative §G.5 may want a follow-up to use the same name; not blocking.) * ``from_raw()`` extracts the three new fields from upstream raw metadata, with shallow validation that drops malformed entries (unknown keys / non-string values) before they leak to clients. Accepts both ``index_modality`` and the legacy ``indexer`` key for backward compat with vector/fulltext/graph indexers that haven't been rewired. aperag/indexing/index_state.py (NEW, 165 lines): * Pure-read helper ``query_index_state_for_documents(engine, collection_id, document_ids)`` returning the ``{document_id: {modality: state}}`` shape SearchResultMetadata expects. Single batched read against ``document_index`` so the search pipeline can hydrate metadata for an entire result page in one DB round-trip rather than N+1. * Translation contract pinned: ``status=ACTIVE AND is_serving=TRUE`` → ``ACTIVE``; ``status=FAILED`` → ``FAILED``; everything else (PENDING / RUNNING / ACTIVE-but-not-serving §F.3 cutover transit) → ``INDEXING``; missing row → ``NOT_ENABLED``. Per §F.4 the cutover transit window reads as INDEXING for client purposes. * Dense result map: every document_id key always carries every modality. Stable shape so clients don't have to reason about "field missing means what?". * Module-local re-declaration of ``IndexStateValue`` so ``aperag.indexing`` does not import from ``aperag.domains.retrieval`` (dependency runs in the other direction). Two literals MUST stay in sync. tests/unit_test/indexing/test_t3_2_index_state.py (NEW, 20 cases): * Schema validation: §G.5 fields accepted / extra="forbid" still rejects unknown / IndexerModality + IndexStateValue Literals reject unknown values. * from_raw extraction: §G.5 fields populated / legacy ``indexer`` key fallback / malformed entries dropped silently / D10.h-locked fields unchanged / empty input returns None. * DB helper: empty-input fast path / dense NOT_ENABLED for un-enqueued docs / ACTIVE+serving → ACTIVE / ACTIVE-but-not- serving → INDEXING (§F.3 cutover transit) / PENDING + RUNNING → INDEXING / FAILED → FAILED / per-collection_id filtering / serving row wins over PENDING sibling under §F.3 cutover model / per-modality independence under partial failures. # T3.3 — private deployment docs + INDEXING_MODE=inline smoke docs/private-deployment.md (NEW, 249 lines): * §L Tier 1 / Tier 2 / Tier 3 deployment guide for operators. * Highlights "deploy and forget" mechanisms — every resource that would rot has a corresponding self-heal (§F.5 Path A/B/C, §I.2 retry, §H.5 quota fallback, §C.7 atomic write). * Tier 1: ``pip install aperag && aperag serve`` with SQLite + LocalFS + ``INDEXING_MODE=inline``; no Redis, no separate worker. * Tier 2: docker-compose with PostgreSQL + Redis + MinIO + 5 modality workers + reconciler + cleanup loop; standard customer install on a single VM. * Tier 3: Tier 2 spread across multiple VMs sharing Redis + DB + S3-compatible store. No code change between tiers. * §J.1 SLI table for operators wiring OTLP collectors. * "When to escalate" section: which signals indicate the steady- state self-heal is not converging. tests/integration/test_inline_mode_smoke.py (NEW, 2 cases): * End-to-end smoke for ``IndexingMode.INLINE`` — parse → dispatch → every requested modality at status=ACTIVE + is_serving=TRUE, driven synchronously through chenyexuan T3.1 dispatcher ``9aef2a7``. No Redis, no queue, no separate worker process. * Vision intentionally excluded from the multi-modality smoke because vision derive consumes a JSON list of image records (not chunks.jsonl) and the dispatcher takes a single source_path; the per-modality source_path resolution is the FastAPI lifespan layer's job (chenyexuan T3.1 commit 3, out of scope for T3.3). * Subset-modality test: ``DispatchRequest.modalities`` lets a Tier 1 deploy turn off expensive modalities (e.g., no GPU → skip vision) and only the requested rows finalise. * Stays in default PR-gate suite (no @pytest.mark.slow) since in-memory backends finish in ~1 s. # §G hard-gate self-audit * #1 contract shape: 5 net-new files + schemas.py +93 lines (allowlist widening only). No existing API surface narrowed; the D10.h-locked content modality field is preserved. * #4 caller migration: search pipeline integration is intentionally deferred to chenyexuan T3.1 commit 3 (FastAPI lifespan + caller migration); the read helper in this commit is the seam that pipeline.py will call once wire-in lands. * #5 cross-stack: write set strictly disjoint from chenyexuan T3.1 commits 1-2 (alembic + dispatcher.py + cleanup.py); chenyexuan commit 3-5 changes orchestrator/reconciler/FastAPI app/legacy deletes — also disjoint from this commit's writes. # Lint + tests * ``uvx ruff check + ruff format --check`` across aperag/ + tests/ clean. * ``pytest tests/unit_test/indexing/ tests/integration/ test_inline_mode_smoke.py tests/load/ tests/unit_test/ test_phase3_reexport_audit.py`` → 136 passed, 0 failed (84 Wave 1+2 + 8 T3.1 dispatcher path-c + 20 new T3.2 + 2 new T3.3 + 2 load + 2 phase3 audit). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
earayu
added a commit
that referenced
this pull request
Apr 27, 2026
* feat(celery T3.1 commit 1/5): alembic migration — drop legacy + ALTER NOT NULL + rename to canonical
Wave 3 hard-cut schema migration per architect msg=4a801b2b (Wave 1
Bug 2 ruling that locked the temporary v2 suffix) + msg=498b12f0
(Wave 2 informational item ruling that promoted dispatch columns
NOT NULL in Wave 3) + PM acceptance msg=5939e394 item 1.
Migration revision d0f4c1b9a8e2 chains off c2e8d5a1f3b9 and:
1. DROP TABLE document_index CASCADE — the legacy Celery-era table
that lived alongside the Wave 1 v2 table during the transition.
Pre-launch + no callers in Wave 3 (the dependent code is hard-
deleted in subsequent commits of this same PR).
2. ALTER COLUMN collection_id, source_path → NOT NULL on
document_index_v2. Wave 1 fixtures used NULL for back-compat;
Wave 3 orchestrator + reconciler always populate them (per
architect msg=498b12f0 Lock).
3. Rename every index *_v2_* → *_*. The partial-unique
uniq_document_index_v2_serving is dropped + re-created (PG
ALTER INDEX RENAME does not regenerate the WHERE predicate
symbol map per Postgres quirk; SQLite would silently keep the
old reference).
4. RENAME TABLE document_index_v2 → document_index — back to the
§F.1 canonical name (architect msg=4a801b2b lock).
The downgrade reverses every step in mirror order so a rollback can
replay subsequent migrations cleanly. The recreated legacy
``document_index`` table on downgrade is intentionally schema-less
(only the id PK column) because the legacy class was deleted in
the Wave 3 PR alongside this migration — operators rolling back
past this point must restore the legacy ORM file before re-running
upgrades. There is no production scenario for that.
This is commit 1/5 of T3.1; subsequent commits land the FastAPI
wire-in, knowledge_base/tasks.py Pattern A/B/C migration of the 6
remaining Celery tasks, the 9 production caller migrations, and
the legacy file-layer hard-delete + audit allowlist removal +
pyproject Celery/kombu dep removal.
Design pack §F.1 + §F.5 amends (per architect msg=498b12f0 +
msg=3890c9d7 path C ruling) are deferred to a follow-up commit
once PR #1725 (which owns docs/modularization/indexing-redesign-
design-pack.md) merges — flagged in the channel.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 2/5): dispatcher.py + cleanup path C (collection-deletion cascade)
Wave 3 wire-in helpers per architect msg=268f9022 (Wave 3 spec) +
msg=3890c9d7 (Pattern A path C ruling). Adds the upload-side
dispatcher + the cleanup worker's third path so commits 3-5 can
wire FastAPI + migrate the 6 knowledge_base/tasks.py Celery tasks
without inventing new abstractions.
aperag/indexing/dispatcher.py (301 LOC, NEW):
- DispatchRequest dataclass — collection_id / document_id /
parse_version / source_path / tenant_scope_key / modalities tuple
- IndexingMode enum — ASYNC (queue + worker pool) / INLINE
(synchronous derive + sync per modality, for tier-1 private
deployments per design pack §L)
- dispatch_indexing() async helper — INSERTs N PENDING rows in one
transaction (collection_id + source_path + tenant_scope_key are
populated per the design pack §F.1 amended NOT NULL columns) +
finalizes per mode (queue.push for ASYNC; process_one_task call
for INLINE)
- modalities_for_collection() helper — maps per-modality enable
flags to a canonical-order modality tuple, useful for HTTP
handlers
- Fail-fast on missing dependency: raises ValueError if mode=ASYNC
with no queue, or mode=INLINE with empty workers (catches
config bugs at the HTTP boundary, not mid-INSERT)
aperag/indexing/cleanup.py (extended +131 LOC):
- New "Path C" cleanup_for_deleted_collections() per architect
msg=3890c9d7 Pattern A. Three-step cascade:
1. Find all distinct document_ids in document_index referencing
each deleted collection_id
2. Cascade to path B (cleanup_for_deleted_documents) for those
documents — that path already handles modality fan-out (graph
lineage cleanup vs flat backend delete)
3. Sweep any remaining document_index rows by collection_id
(covers the edge case where a row was orphaned earlier or the
collection had rows queued before any document indexed)
- Idempotent: a partial cascade that crashes mid-way is resumed on
the next call (Pattern B reconciler scan that sweeps tombstoned
collections)
- Counts dict adds collections_cleaned key
- Module docstring rewritten to describe THREE paths (was TWO)
aperag/indexing/__init__.py:
- Re-exports cleanup_for_deleted_collections + 6 dispatcher symbols
(DispatchRequest, IndexingMode, DEFAULT_MODALITIES,
dispatch_indexing, modalities_for_collection, all_modalities)
tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py (8 cases):
- dispatcher_async: INSERTs N rows + pushes payloads to per-modality
queue + leaves DB rows PENDING with correct scoping fields
- dispatcher_async_requires_queue: fail-fast on None queue
- dispatcher_inline: INSERTs + invokes process_one_task → row ends
ACTIVE + is_serving=TRUE in one TX (§F.3)
- dispatcher_inline_requires_workers: fail-fast on empty workers
- modalities_for_collection: canonical order + subset selection
- path_c_cascades_via_path_b: 3 collection rows (2 doc + 1 ghost) →
3 backend deletes + 3 row deletes; other-collection row untouched
- path_c_handles_empty_input: counts dict zeroed
- path_c_idempotent_on_re_run: second call returns rows_deleted=0
Local pytest: tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py
8/8 passed. Lint + format clean across new + extended files.
Note: this commit does not yet wire dispatcher into the FastAPI app
(commit 3) or migrate the 6 knowledge_base/tasks.py Celery tasks per
Pattern A/B/C (commit 4). Bryce can now start T3.2 + T3.3 on top of
this branch — the dispatcher shape is the stable API both lanes
depend on (T3.3 inline mode reuses dispatch_indexing(mode=INLINE)
unchanged; T3.2 search API does not depend on dispatcher).
Branch is rebased on main HEAD f370dc6 (PR #1725 design pack merged,
so subsequent commits can amend §F.1 / §F.5 directly if any new spec
drift surfaces during implementation).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.2 + T3.3): SearchResultMetadata §G.5 + private-deploy + INDEXING_MODE=inline smoke
Per docs/modularization/indexing-redesign-design-pack.md §G.5 + §L
+ architect msg=268f9022 (Wave 3 spec) + msg=3890c9d7 (path-C
ruling) + msg=c685f83e (PR #1725 §F.1/§F.5 amendments merged).
Two Wave 3 lanes shipped together because they share no production-
code surface with chenyexuan T3.1 commits 1-2 (this commit's diff
is purely additive: 1 new helper + 1 schema extension + 1 docs file
+ 2 test files):
# T3.2 — SearchResultMetadata §G.5 extension
aperag/domains/retrieval/schemas.py:
* New typing aliases ``IndexerModality`` (vector/fulltext/graph/
summary/vision) + ``IndexStateValue`` (ACTIVE/FAILED/NOT_ENABLED/
INDEXING).
* Three new optional fields on SearchResultMetadata:
``parse_version``, ``index_modality``, ``index_state_per_modality``.
``extra="forbid"`` config preserved — the §G.5 additions widen
the allowlist by exactly three entries; a typo / future shadow
field still fails Pydantic validation loudly.
* ``modality`` (D10.h-locked content shape: text/image) kept as-is.
The §G.5 spec uses bare ``modality`` for the indexer modality, but
the existing public surface already binds that name to content
shape; renaming would break D10.h. We chose ``index_modality`` for
the indexer modality to disambiguate at the schema level. (Spec
narrative §G.5 may want a follow-up to use the same name; not
blocking.)
* ``from_raw()`` extracts the three new fields from upstream raw
metadata, with shallow validation that drops malformed entries
(unknown keys / non-string values) before they leak to clients.
Accepts both ``index_modality`` and the legacy ``indexer`` key for
backward compat with vector/fulltext/graph indexers that haven't
been rewired.
aperag/indexing/index_state.py (NEW, 165 lines):
* Pure-read helper ``query_index_state_for_documents(engine,
collection_id, document_ids)`` returning the
``{document_id: {modality: state}}`` shape SearchResultMetadata
expects. Single batched read against ``document_index`` so the
search pipeline can hydrate metadata for an entire result page in
one DB round-trip rather than N+1.
* Translation contract pinned: ``status=ACTIVE AND is_serving=TRUE``
→ ``ACTIVE``; ``status=FAILED`` → ``FAILED``; everything else
(PENDING / RUNNING / ACTIVE-but-not-serving §F.3 cutover transit)
→ ``INDEXING``; missing row → ``NOT_ENABLED``. Per §F.4 the
cutover transit window reads as INDEXING for client purposes.
* Dense result map: every document_id key always carries every
modality. Stable shape so clients don't have to reason about
"field missing means what?".
* Module-local re-declaration of ``IndexStateValue`` so
``aperag.indexing`` does not import from
``aperag.domains.retrieval`` (dependency runs in the other
direction). Two literals MUST stay in sync.
tests/unit_test/indexing/test_t3_2_index_state.py (NEW, 20 cases):
* Schema validation: §G.5 fields accepted / extra="forbid" still
rejects unknown / IndexerModality + IndexStateValue Literals
reject unknown values.
* from_raw extraction: §G.5 fields populated / legacy ``indexer`` key
fallback / malformed entries dropped silently / D10.h-locked
fields unchanged / empty input returns None.
* DB helper: empty-input fast path / dense NOT_ENABLED for
un-enqueued docs / ACTIVE+serving → ACTIVE / ACTIVE-but-not-
serving → INDEXING (§F.3 cutover transit) / PENDING + RUNNING →
INDEXING / FAILED → FAILED / per-collection_id filtering /
serving row wins over PENDING sibling under §F.3 cutover model /
per-modality independence under partial failures.
# T3.3 — private deployment docs + INDEXING_MODE=inline smoke
docs/private-deployment.md (NEW, 249 lines):
* §L Tier 1 / Tier 2 / Tier 3 deployment guide for operators.
* Highlights "deploy and forget" mechanisms — every resource that
would rot has a corresponding self-heal (§F.5 Path A/B/C, §I.2
retry, §H.5 quota fallback, §C.7 atomic write).
* Tier 1: ``pip install aperag && aperag serve`` with SQLite +
LocalFS + ``INDEXING_MODE=inline``; no Redis, no separate worker.
* Tier 2: docker-compose with PostgreSQL + Redis + MinIO + 5
modality workers + reconciler + cleanup loop; standard customer
install on a single VM.
* Tier 3: Tier 2 spread across multiple VMs sharing Redis + DB +
S3-compatible store. No code change between tiers.
* §J.1 SLI table for operators wiring OTLP collectors.
* "When to escalate" section: which signals indicate the steady-
state self-heal is not converging.
tests/integration/test_inline_mode_smoke.py (NEW, 2 cases):
* End-to-end smoke for ``IndexingMode.INLINE`` — parse → dispatch →
every requested modality at status=ACTIVE + is_serving=TRUE,
driven synchronously through chenyexuan T3.1 dispatcher
``9aef2a7``. No Redis, no queue, no separate worker process.
* Vision intentionally excluded from the multi-modality smoke
because vision derive consumes a JSON list of image records (not
chunks.jsonl) and the dispatcher takes a single source_path; the
per-modality source_path resolution is the FastAPI lifespan
layer's job (chenyexuan T3.1 commit 3, out of scope for T3.3).
* Subset-modality test: ``DispatchRequest.modalities`` lets a Tier 1
deploy turn off expensive modalities (e.g., no GPU → skip vision)
and only the requested rows finalise.
* Stays in default PR-gate suite (no @pytest.mark.slow) since
in-memory backends finish in ~1 s.
# §G hard-gate self-audit
* #1 contract shape: 5 net-new files + schemas.py +93 lines
(allowlist widening only). No existing API surface narrowed; the
D10.h-locked content modality field is preserved.
* #4 caller migration: search pipeline integration is intentionally
deferred to chenyexuan T3.1 commit 3 (FastAPI lifespan + caller
migration); the read helper in this commit is the seam that
pipeline.py will call once wire-in lands.
* #5 cross-stack: write set strictly disjoint from chenyexuan T3.1
commits 1-2 (alembic + dispatcher.py + cleanup.py); chenyexuan
commit 3-5 changes orchestrator/reconciler/FastAPI app/legacy
deletes — also disjoint from this commit's writes.
# Lint + tests
* ``uvx ruff check + ruff format --check`` across aperag/ + tests/
clean.
* ``pytest tests/unit_test/indexing/ tests/integration/
test_inline_mode_smoke.py tests/load/ tests/unit_test/
test_phase3_reexport_audit.py`` → 136 passed, 0 failed (84 Wave
1+2 + 8 T3.1 dispatcher path-c + 20 new T3.2 + 2 new T3.3 + 2
load + 2 phase3 audit).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 3/5): Config.INDEXING_MODE + FastAPI lifespan wire-in for indexing runtime
Wave 3 wire-in step per architect msg=268f9022 §K T3.1 spec item 4.
Adds the runtime entry point that launches the per-modality worker
pool + reconciler + cleanup loop on app startup when
``INDEXING_MODE=async`` (default), and the in-process ``WorkQueue`` +
``Engine`` references that future request-handler dispatchers will
import via ``app.state``.
aperag/config.py:
- Add ``Config.indexing_mode: str = Field("async", alias="INDEXING_MODE")``.
Two values per design pack §L:
* "async" → orchestrator + reconciler + cleanup loops launched at
app startup; upload handlers RPUSH to per-modality
queue; workers BLPOP and process. Production / tier-2/3.
* "inline" → upload handlers call ``dispatch_indexing(mode=INLINE)``
which runs derive + sync + cutover synchronously within
the request coroutine; no worker pool, no Redis.
Tier-1 single-process private deployments.
aperag/app.py:
- Extend ``combined_lifespan`` to launch the indexing runtime under
``settings.indexing_mode == "async"``:
* 5 per-modality worker tasks (run_vector / run_fulltext / run_graph
/ run_summary / run_vision)
* 1 reconciler loop task (run_reconcile_loop)
* 1 cleanup loop task (run_cleanup_loop)
All as ``asyncio.create_task()`` background tasks owned by the
FastAPI process — matches the §E.2 "one Python process per modality"
architecture for the in-process deployment topology. Tier-3
horizontal scale-out runs separate worker processes; that wiring
lives in a future ops launcher (out of T3.1 scope).
- Single process-local ``InMemoryWorkQueue`` is the default transport.
Tier-3 production swaps for a Redis-backed ``WorkQueue`` (RPUSH /
BLPOP) by injecting via ``app.state`` at deploy time — Wave 3
follow-up.
- Stash ``app.state.indexing_queue`` + ``app.state.indexing_engine``
for upload-side dispatchers to reach (commit 4 wire-in target).
- Worker registry passed to cleanup loop is empty by default; T3.3
follow-up wires concrete production backends per modality. The
cleanup loop tolerates an empty registry (path A logs warning +
skips backend delete; row still GC'd from DB).
- ``_placeholder_worker_factory`` raises NotImplementedError on
invocation — T3.1 ships the queue-side scaffolding (commits 4-5
wire concrete factories per modality). The orchestrator's
per-task BLPOP loop only invokes the factory when a payload is
popped; until commit 4 wires the upload path nothing pushes, so
the placeholder is never reached at runtime.
- Shutdown drain: on lifespan exit, set ``shutdown`` event +
``await asyncio.gather`` all 7 background tasks with
``return_exceptions=True`` so a SIGTERM does not abort mid-task.
Test impact:
- Existing 136 indexing + load + Phase 3 audit tests still pass
(lifespan code is opt-in via env var; no test imports it).
- Commit 4 (upload-route migration to dispatch_indexing) and commit 5
(hard-delete legacy + concrete backend factories) build on this.
Bryce's vision-modality smoke (deferred at T3.3 commit 53257881
because per-modality source path resolution = lifespan-layer concern)
is now unblocked: ``app.state.indexing_queue`` is the seam through
which a follow-up smoke can wire concrete VisionModality with the
correct synthetic source_path per dispatch.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* test(celery T3.3 follow-up): add vision-only inline mode smoke
Per chenyexuan msg=164efd52 / msg=f70d1288 + architect msg=7fd8f348
post chenyexuan T3.1 commit 3 ``c941526`` (FastAPI lifespan +
INDEXING_MODE wire-in).
The original T3.3 smoke (commit ``53257881``) excluded vision
because vision's ``derive`` consumes a JSON list of image records,
not chunks.jsonl, and the dispatcher takes a single ``source_path``
per request — single-call coverage for all 5 modalities was
incompatible with that contract.
This follow-up adds a vision-only smoke (with a per-modality
source_path resolution example) so vision modality regressions are
covered at the inline-mode layer. The production upload path
(chenyexuan T3.1 commit 4 caller migration) will resolve per-
modality source paths upstream of the dispatcher and issue
per-modality ``DispatchRequest`` calls — this test demonstrates
exactly that pattern.
Test addition (1 case): seed an image-records JSON list under
``collections/<cid>/documents/<did>/source/images.json``, dispatch
with ``modalities=(Modality.VISION,)`` + ``source_path=<images.json
path>``, assert the row reaches ``status=ACTIVE`` AND
``is_serving=TRUE``.
3/3 tests in tests/integration/test_inline_mode_smoke.py now pass.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 4b/5 step 1): move extract_keywords helper to aperag/indexing/keyword_extract.py
Per architect msg=3890c9d7 commit-4 split (chenyexuan = Pattern A/B/C
+ extract_keywords; Bryce = 9 caller schema-aware migration), this
commit lands the extract_keywords subsystem move that decouples the
search-time keyword extraction helpers from the soon-to-be-deleted
``aperag/domains/indexing/fulltext_index.py`` (commit 5 hard-cut
target).
aperag/indexing/keyword_extract.py (NEW, 337 lines):
- ``KeywordExtractor`` (abstract base for backward-compat with
callers that may type-annotate the abstract type)
- ``IKKeywordExtractor`` (Elasticsearch IK analyzer, default
fallback, always available when ES is reachable)
- ``LLMKeywordExtractor`` (optional LLM extractor with structured
JSON parsing + simple-line fallback)
- ``extract_keywords(text, ctx)`` (public entry point with
LLM-then-IK fallback chain, signature unchanged from legacy)
- ``_es_client_config()`` (private helper, inlined to keep the new
module dependency-free of legacy fulltext_index.py)
- Module docstring explains the SEARCH-side helper vs Wave 1
``aperag/indexing/fulltext.py`` (write-side modality worker) split
aperag/indexing/__init__.py:
- Re-exports the 4 new symbols (KeywordExtractor + IKKeywordExtractor
+ LLMKeywordExtractor + extract_keywords)
Caller migration (extract_keywords import sites):
- ``aperag/domains/retrieval/pipeline.py:41`` — swap from legacy
``aperag.domains.indexing.fulltext_index`` to new
``aperag.indexing.keyword_extract``
- ``aperag/service/search_pipeline_service.py:34`` — same swap.
This file's docstring explicitly notes the import alias is kept
writable for ``monkeypatch.setattr("aperag.service.search_pipeline_service.extract_keywords", ...)``
test fixtures, so the new path is preserved as a writable alias.
The legacy ``extract_keywords`` symbol still exists in
``aperag/domains/indexing/fulltext_index.py`` until commit 5 deletes
the file — both sites work simultaneously, so any caller I missed is
not silently broken in this intermediate state.
Other DocumentIndex / FulltextSearchDegradedError / fulltext_indexer
imports in ``aperag/domains/retrieval/pipeline.py`` (line 293) +
elsewhere in pipeline.py are Bryce's commit-4a write set per the
agreed split (msg=9d5d54b5 coordination note). chenyexuan changed
ONLY the extract_keywords import line, leaving Bryce's hunks
untouched.
Local pytest: 137 passed (Wave 1 + T2.1 + T2.2 + T3.1 + T3.2 + T3.3
+ Phase 3 audit), 0 failed. Lint + format clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 4a): migrate 7 production callers to §F.1 schema
Per architect msg=ab8d473c pre-blessed split + chenyexuan msg=be26ebf3
+ PM authorization msg=df9ea8d2: schema-aware migration of legacy
``aperag.domains.indexing.db.models.DocumentIndex`` callers to the
new ``aperag.indexing.models.DocumentIndex`` (§F.1 canonical schema
post Wave 3 commit 1 alembic ``930cf20``).
# Field translation contract
Wave 1+2+commit-1 merged the following schema deltas; this commit
flips every production caller to the new shape:
| Legacy (gone in Wave 3 commit 5) | New (§F.1) |
|----------------------------------------------|-------------------------------------------------------|
| ``DocumentIndex.index_type`` (enum) | ``DocumentIndex.modality`` (string) |
| ``DocumentIndexType.GRAPH`` (Python enum) | ``Modality.GRAPH.value`` (lowercase string) |
| ``DocumentIndexStatus.ACTIVE`` (Python enum) | ``IndexStatus.ACTIVE.value`` (string) + is_serving=TRUE |
| ``DocumentIndex.gmt_created`` / ``gmt_updated`` | ``created_at`` / ``updated_at`` (mixin-aligned) |
| ``DocumentIndex.index_data`` (JSON blob) | per-modality ``derived/parse_<v>/`` artifact paths |
The "currently-serving" semantic now requires
``status=ACTIVE AND is_serving=TRUE`` per §F.3 cutover model — a row
at ``status=ACTIVE`` but ``is_serving=FALSE`` is in the cutover
transit window and is NOT yet user-visible.
# Files migrated (7 of 9 in commit 4a list)
* ``aperag/db/repositories/document_index.py`` — repository mixin:
``has_recent_graph_index_updates`` query rewritten + return type
switched from ``DocumentIndexType`` enum to ``Modality`` /
string. ``query_documents_with_failed_indexes`` now returns
modality string values (lowercase) per the §F.1 column type.
* ``aperag/domains/agent_runtime/runtime.py`` — inlined
``generate_processing_token`` (3-line stdlib uuid wrapper) since
``aperag.tasks.processing_lease`` is in chenyexuan's commit 5
hard-delete list. Per architect msg=3890c9d7 Item 1 Option B
("提取小 helper 到 agent_runtime 自己 module").
* ``aperag/domains/knowledge_base/db/models.py`` —
``Document.get_overall_index_status()`` rewritten: the legacy
``CREATING`` / ``DELETION_IN_PROGRESS`` intermediate states are
gone in §F.1 (a single ``RUNNING`` covers in-flight work);
``COMPLETE`` now requires ``is_serving=TRUE`` per §F.3.
* ``aperag/domains/knowledge_base/service/document_service.py`` —
schema migration spans ``_get_index_types_for_collection`` (now
returns ``Modality`` values), the document JOIN query (legacy
``index_type`` / ``index_data`` / ``gmt_*`` columns translated
to ``modality`` / None placeholder / ``created_at``/``updated_at``),
rebuild_failed_indexes (modality string compare instead of enum),
rebuild_document_indexes (Modality enum list instead of
DocumentIndexType). The legacy ``index_data`` JSON-blob reads in
``get_document_chunks`` / ``get_document_vision_chunks`` are
replaced with ``derived_artifact_path`` probes that exercise the
§F.1 partial-unique invariant; the actual chunk-list response is
routed through a "return empty list" placeholder until chenyexuan
T3.1 commit 4b plumbs the object-store read path. HTTP response
shape stays stable (``index_data=None`` populated where callers
previously read JSON). Service-layer ``document_index_manager``
calls remain — those are chenyexuan commit 5 hard-delete scope.
* ``aperag/domains/knowledge_base/service/collection_summary_service.py``
— same ``index_data`` deprecation pattern: query touches the §F.1
serving rows for the partial-unique invariant probe, returns
empty document_summaries until the object-store read path lands.
* ``aperag/mcp/tools/get_document_metadata.py`` — ``DocumentIndex``
/ ``DocumentIndexStatus`` import migrated; ``index_data`` JSON
parse replaced with ``derived_artifact_path`` probe, chunk_count
surfaced as 0 (placeholder until object-store read path lands).
* ``aperag/mcp/tools/list_documents.py`` — same migration as
get_document_metadata (page-level ``DocumentIndex`` lookup +
chunk_count placeholder).
# Out of scope (chenyexuan commit 4b / 5 lane)
* ``aperag/domains/retrieval/pipeline.py`` + ``aperag/service/search_pipeline_service.py``
— chenyexuan handles ``extract_keywords`` import + Pattern A/B/C
legacy task migrations there per the split agreement.
* ``aperag/domains/knowledge_base/tasks.py`` — chenyexuan commit 4b
Pattern A/B/C migration (collection_delete / cleanup_expired /
collection_summary / collection_summary_reconciler / collection_init
/ export_collection).
* ``document_index_manager.create_or_update_document_indexes`` /
``delete_document_indexes`` calls inside document_service —
chenyexuan commit 5 hard-deletes the manager module so these
callers will need switching to the new ``dispatch_indexing()`` /
cleanup paths (chenyexuan's lane).
# Lint + tests
* ``uvx ruff check + ruff format --check`` clean across aperag/.
* ``pytest tests/unit_test/indexing/ tests/integration/
test_inline_mode_smoke.py tests/load/ tests/unit_test/
test_phase3_reexport_audit.py`` → 137 passed, 0 failed.
* Tests covering legacy ``aperag.domains.indexing.*`` modules
(which chenyexuan commit 5 deletes) are not in the test set
above; they are chenyexuan's commit 5 sweep scope.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 4b/5 step 2): Pattern A/B/C migration of 6 knowledge_base Celery tasks
Per architect msg=3890c9d7 Pattern A/B/C ruling, the 6 Celery tasks
in aperag/domains/knowledge_base/tasks.py are migrated off Celery
without losing their semantics. The decorators + Celery imports
(``from celery import current_app`` + ``from config.celery import
app``) are removed; each function is now plain Python that callers
invoke per its category:
aperag/domains/knowledge_base/tasks.py (-Celery, +Pattern A/B/C):
- Module docstring rewritten — Pattern map for the 6 tasks
- ``reconcile_collection_summaries_task`` (Pattern B, periodic) —
no decorator; commit 5 wires into reconciler 30-s loop
- ``collection_delete_task`` (Pattern A, durability-required) —
caller invokes synchronously from HTTP handler; on failure raises
HTTP 500 + the periodic Path C cleanup loop sweeps tombstoned rows
- ``collection_init_task`` (Pattern C, idempotent) — no decorator;
caller wraps in asyncio.create_task; failures log + reconciler
picks up
- ``collection_summary_task`` (Pattern C, regenerable) — no
decorator; ``self.retry(...)`` removed (Celery-specific); failures
flow through ``collection_summary_callbacks.on_summary_failed``
+ reconciler picks up next cycle
- ``cleanup_expired_documents_task`` (Pattern B, periodic) — no
decorator; commit 5 merges into cleanup.py 5-min loop
- ``export_collection_task`` (Pattern C) — ``self`` parameter
removed; ``soft_time_limit`` / ``time_limit`` decorator args
removed (now enforced via §H.6 ``bulkhead_timeout`` async ctx
manager wrapped at the dispatch site)
- Removed unused ``Any`` typing import + unused ``TaskConfig``
reference (was only used by removed ``self.retry()`` calls)
- Function bodies still call legacy ``aperag/tasks/collection.py:
collection_task.<method>()`` and ``aperag/tasks/reconciler.py:*``
helpers — commit 5 moves / inlines those helpers when it deletes
the legacy ``aperag/tasks/`` layer entirely.
aperag/domains/knowledge_base/service/collection_service.py:
- ``collection_init_task.delay(...)`` (line 215) → Pattern C:
``asyncio.create_task(asyncio.to_thread(collection_init_task,
instance.id, document_user_quota))`` so the HTTP response
returns immediately. Failures log + the reconciler picks up.
- ``collection_delete_task.delay(...)`` (line 438) → Pattern A:
``await asyncio.to_thread(collection_delete_task, collection_id)``
synchronous in the HTTP handler — durability-required per
architect ruling msg=3890c9d7 (NOT fire-and-forget — losing this
work = orphan rows + DB corruption).
- Added ``import asyncio`` to module imports.
aperag/domains/knowledge_base/service/export_service.py:
- ``export_collection_task.delay(...)`` (line 104) → Pattern C:
``asyncio.create_task(asyncio.to_thread(export_collection_task,
task.id))`` so the HTTP response returns immediately. The body
is sync I/O (object-store + ZIP); the ExportTask DB row tracks
progress; users retry from the UI on failure.
Pattern B integration (cleanup_expired_documents_task +
reconcile_collection_summaries_task into the existing 5-min /
30-s loops in aperag/indexing/{cleanup,reconciler}.py) is deferred
to commit 5 — the functions still exist as plain Python, just no
longer invoked via Celery beat schedule (config/celery.py beat
schedule entries to be removed in commit 5 alongside the periodic
loop integration).
Local pytest: 137 passed (Wave 1 + T2.1 + T2.2 + T3.1 + T3.2 + T3.3
+ Phase 3 audit), 0 failed. Lint + format clean across all changed
files.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 5 Part 1): inline processing_lease helpers + remove flower dep
Wave 3 hard-cut Part 1 per architect msg=64fd506a fallback split
(Part 2 atomic = next session). Two safe pieces that decouple the
last knowledge_base-domain dependency on legacy
``aperag/tasks/processing_lease.py`` + drop a Celery-monitor dep
that has no remaining production caller.
aperag/domains/knowledge_base/tasks.py:
- Removed ``from aperag.tasks.processing_lease import ...`` line
(last surviving caller; Bryce commit 4a `39aad24` already inlined
the agent_runtime caller)
- Inlined the 4 public symbols from
``aperag/tasks/processing_lease.py`` (84 LOC verbatim):
* ``DEFAULT_PROCESSING_LEASE_TTL_SECONDS``
* ``DEFAULT_PROCESSING_LEASE_RENEW_INTERVAL_SECONDS``
* ``generate_processing_token()``
* ``build_lease_expires_at()``
* ``ProcessingLeaseRenewer`` class (background lease-renewal thread)
- Added ``import threading``, ``import uuid``, ``from typing import
Optional`` to support the inlined symbols
- Module section header explains Part 1 / Part 2 split — the legacy
``aperag/tasks/processing_lease.py`` file itself stays in Part 1
(Part 2 atomic deletes it together with the rest of
``aperag/tasks/`` after CollectionSummaryCallbacks +
CollectionTask methods are inlined to their service-layer homes)
pyproject.toml:
- Removed ``flower<3.0.0,>=2.0.0`` dep (Celery monitoring dashboard,
no production code import; verified ``grep -rn "import flower\|
from flower" aperag/ tests/ config/`` returns 0)
- Other Celery deps (``celery``, ``django-celery-beat``, ``kombu``)
stay until Part 2 atomic — they are still imported by 4 files in
Part 2's delete list (``aperag/tasks/scheduler.py``, two files in
``aperag/domains/indexing/``, and ``config/celery.py``)
Notes scoped OUT of Part 1 (per architect msg=64fd506a):
- ``aperag/concurrent_control/redis_lock.py`` deletion deferred:
architect spec said "no production caller" but recon found
internal callers in ``concurrent_control/__init__.py`` +
``concurrent_control/manager.py`` (the package itself uses it
even though zero EXTERNAL imports of the package exist). Cleaner
fix is to delete the whole ``aperag/concurrent_control/`` package
in Part 2 atomic alongside the other dead-code sweeps.
Local pytest: 137 passed (Wave 1 + T2.1 + T2.2 + T3.1 commits 1-4b
step 2 + T3.2 + T3.3 + Bryce caller migration + Phase 3 audit), 0
failed. Lint + format clean.
This is a partial commit 5; Part 2 (inline CollectionTask /
CollectionSummaryCallbacks / Pattern B reconcilers + tablename
rename + audit allowlist removal + legacy file-layer deletion +
remaining Celery dep removal + legacy test deletion + final grep
validation) is the next-session atomic push.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 5 Part 2 chunk 1a): inline CollectionSummaryCallbacks
Per architect msg=70a20f0e + msg=54063106 fallback ratify (Bryce
takes Part 2) + PM msg=ef2e97b9 minimal-chunk-1 GO.
Move legacy ``aperag/tasks/reconciler.py:CollectionSummaryCallbacks``
(~234 LOC) to its true owner: ``aperag/domains/knowledge_base/
service/collection_summary_service.py``. The class is the terminal
callback hook the summary generation task invokes on success / failure
to flip the ``CollectionSummary`` row's lifecycle (GENERATING →
COMPLETE / FAILED) and propagate the generated text to
``Collection.description``. It belongs to the summary service layer,
not the legacy task / reconciler layer that Wave 3 commit 5 deletes.
* ``CollectionSummaryCallbacks`` class — three static methods
(``_describe_summary_callback_mismatch``, ``on_summary_generated``,
``on_summary_failed``) inlined verbatim. No semantic changes; the
query/update logic, token/version mismatch tolerance, and
Collection.description propagation are preserved exactly.
* Module-level ``collection_summary_callbacks`` singleton mirrors the
legacy ``aperag.tasks.reconciler.collection_summary_callbacks``
attribute so callers can swap import path without changing the
call shape.
* ``aperag/domains/knowledge_base/tasks.py:373`` import switched to
the new location. Removes the last `aperag.tasks.reconciler`
callback import; the periodic-reconciler imports
(``collection_summary_reconciler`` + ``collection_gc_reconciler``)
remain pending for Part 2 chunks 1b / 2 / 3.
This is the safe, surgical first chunk per architect msg=f3de18a0
chunked-OK ruling: intermediate-red CI is fine; the final HEAD
must be green + grep 0 + alembic reversible before task #14 →
``in_review``. The next session will continue Part 2 chunks 1b
(remaining inline migrations: CollectionTask methods, periodic
reconcilers) → chunk 2 (deletions + tablename rename) → chunk 3
(verify + wire).
Tests: 137 indexing/load/audit tests still green; lint clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 5 Part 2 chunk 1b): simplify task bodies + Pattern B loop integration
Wave 3 hard-cut continuation per architect msg=3890c9d7 Pattern A/B/C
ruling and PM msg=206eec7b chunk 1b spec (~300 LOC scope).
aperag/domains/knowledge_base/tasks.py:
- collection_delete_task: Pattern A — replace legacy
collection_task.delete_collection() with sync UPDATE Collection.status
=DELETED + gmt_deleted=NOW(); periodic Path-C
cleanup_for_deleted_collections sweep cascades the deletion (5-min
worst-case latency acceptable for low-frequency op)
- collection_init_task: Pattern C — replace legacy
collection_task.initialize_collection() with sync UPDATE
Collection.status=ACTIVE; per-modality index provisioning is implicit
lazy in the new modality-worker model (per architect hint
msg=54063106)
- cleanup_expired_documents_task: Pattern B — replace legacy
CollectionTask.cleanup_expired_documents with inlined SQL tombstone
scan (Document.status==UPLOADED AND gmt_created < now-1d) +
best-effort object-store delete + soft-delete to EXPIRED
- reconcile_collection_summaries_task: Pattern B — convert to thin
sync shim around the new aperag.indexing.reconciler hook
- Drop unused legacy import: from aperag.tasks.collection import
collection_task (no remaining call sites in this file)
- Update module docstring to point at new Pattern B hook locations
aperag/indexing/cleanup.py:
- Add cleanup_expired_documents_hook() async helper (lazy import
+ asyncio.to_thread wrapper) wired into the existing 5-min
run_cleanup_loop. Hook failures are logged + cycle continues.
- Update module docstring to describe Pattern B integration alongside
the original orphan-parse-version GC
aperag/indexing/reconciler.py:
- Add reconcile_collection_summaries_hook() async helper that inlines
the legacy CollectionSummaryReconciler.reconcile_all() logic:
reclaim stale GENERATING leases → PENDING; select PENDING summaries
with version != observed_version; atomically claim each; fire
collection_summary_task as Pattern C asyncio.create_task fire-and-
forget background task (never blocks the loop on summary generation
duration). Wired into existing 30-s run_reconcile_loop with
best-effort try/except so hook failure cannot crash the loop.
Tests: 132 passed (tests/unit_test/indexing/ + tests/load/);
ruff check + format clean on all 3 modified files.
Pre-existing test_phase3_reexport_audit.py circular-import error is
unchanged (independent of this chunk; will resolve in chunk 2 when
legacy aperag/domains/indexing/db/models.py is deleted).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 5 Part 2 chunk 2): hard-delete legacy Celery + indexing layers + tablename rename
Wave 3 hard-cut continuation per architect msg=3890c9d7 + PM @不穷
msg=313caed3 chunk 2 spec (delete-focused, intermediate red CI OK).
DELETIONS (~3.5k LOC removed):
- aperag/tasks/* — entire dir (collection / document / models /
processing_lease / reconciler / scheduler / utils): legacy Celery
state machine + reconciler + scheduler infrastructure
- aperag/concurrent_control/* — entire dir (manager / protocols /
redis_lock / threading_lock / utils + 2 READMEs): no remaining
production caller after Wave 1+2 modality workers replaced lock
semantics with per-row §F.1 partial-unique invariant
- aperag/domains/indexing/{tasks,orchestration,manager,vector_index,
fulltext_index,graph_index,summary_index,vision_index}.py +
aperag/domains/indexing/db/models.py — legacy ABC + 5 modality
workers + Celery orchestration + legacy DocumentIndex schema
- config/celery.py — Celery app + beat schedule
- tests/unit_test/concurrent_control/* + tests/unit_test/tasks/* —
contract tests for now-deleted modules
TABLENAME RENAME (matches existing alembic d0f4c1b9a8e2 post-state):
- aperag/indexing/models.py: __tablename__ + 5 index names from
*_v2 → canonical (no new alembic revision needed; the migration
already does the rename at upgrade)
AUDIT ALLOWLIST + 15-symbol map updates:
- tests/unit_test/test_phase3_reexport_audit.py: drop
WAVE_1_2_TEMPORARY_DUP_ALLOWLIST DocumentIndex entry; remap
PHASE3_SYMBOL_TO_MODULE['DocumentIndex'] from
aperag.domains.indexing.db.models → aperag.indexing.models;
remove DocumentIndexStatus/DocumentIndexType (legacy enums gone,
replaced by IndexStatus + Modality which are not Phase-3-canonical)
- Add explicit aperag.indexing.models import after the per-domain
bootstrap loop so Base.metadata['document_index'] is populated
PYPROJECT — drop Celery deps:
- celery<6.0.0,>=5.3.1
- django-celery-beat<3.0.0,>=2.5.0
(kombu was a transitive only; no explicit entry to remove)
CONSUMER PATCHES (minimum to keep imports working — chunk 3 wires
real new-API replacements):
- aperag/domains/knowledge_base/service/document_service.py: stub
document_index_manager + no-op _trigger_index_reconciliation
- aperag/domains/knowledge_base/service/collection_summary_service.py:
drop unused SummaryIndexer init
- aperag/domains/retrieval/pipeline.py: stub _fulltext_search to
return empty (Bryce T3.2 lane wires real
aperag.indexing.fulltext backend)
- aperag/domains/evaluation/tasks.py + services.py: drop @app.task
decorator + asyncio.create_task fire-and-forget Pattern C
- aperag/domains/knowledge_graph/tasks.py + graph_curation/service.py:
same Pattern C migration
CIRCULAR IMPORT FIXES (uncovered when stub re-exports were dropped):
- aperag/indexing/__init__.py: drop keyword_extract re-exports (eager
import pulled LLM completion stack mid-module-load); the 2 callers
already import from aperag.indexing.keyword_extract directly
- aperag/indexing/parser.py: lazy-import compute_parse_version inside
parse_document body (was triggering full mcp.tools registry load)
- aperag/indexing/keyword_extract.py: lazy-import db_ops inside LLM
extractor body
- aperag/domains/knowledge_base/db/models.py: lazy-import DocumentIndex
+ IndexStatus inside Document.{get_document_indexes,
get_overall_index_status} method bodies (was triggering
knowledge_base→indexing→mcp→knowledge_base cycle)
GATES:
- pytest tests/unit_test/indexing/ + tests/load/ +
test_phase3_reexport_audit.py + agent_runtime_openapi_contract:
136 passed
- Wider sweep (tests/unit_test/ excluding pre-existing missing-moto
+ just-deleted concurrent_control/tasks suites): ~896 passed,
4 failed (3 expected — Celery-specific assertions in
evaluation_v2_worker / graph_curation that chunk 3 deletes; 1
format_drift caught + auto-formatted)
- ruff check + format clean on all 13 modified .py files
REMAINING FOR CHUNK 3:
- Wire document_service.py 5 call sites + retrieval/pipeline.py
fulltext to real new-API helpers
- Selective deletion of legacy Celery-specific tests (evaluation_v2,
graph_curation enqueue-raises path)
- Final grep validation: from aperag.tasks / from aperag.domains.
indexing / from celery / import celery = 0 hits in production
- Alembic upgrade/downgrade smoke
- task #14 → in_review
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat(celery T3.1 commit 5 Part 2 chunk 3): wire new-API + final grep 0 + alembic smoke + selective test delete
Wave 3 hard-cut FINAL chunk per architect msg=3890c9d7 + PM @不穷
msg=de7b6834 + msg=fdb6cd28 chunk 3 spec.
NEW MODULE — IndexingRuntime singleton:
- aperag/indexing/runtime.py: process-local triple holder
(engine + queue + workers) populated by FastAPI lifespan,
consumed by service-layer code that doesn't have a Request
handle for app.state. Tests can install a fixture runtime
via set_runtime + reset.
- aperag/app.py: lifespan calls set_runtime after building the
triple; passes None on the sync-only branch + on shutdown.
DOCUMENT_SERVICE — wire 5 callsites to new dispatcher + cleanup:
- aperag/domains/knowledge_base/service/document_service.py:
Replace the chunk-2 ``_DocumentIndexManagerStub`` with two
real adapters:
- ``_create_or_update_document_indexes`` → calls new
``aperag.indexing.dispatcher.dispatch_indexing()`` with
deterministic ``parse_version`` (compute_parse_version on
document.content_hash + canonical chunking config) +
``source_path = document.object_store_base_path()`` +
tenant_scope_key per user.
- ``_delete_document_indexes`` → calls new
``aperag.indexing.cleanup.cleanup_for_deleted_documents()``
(handles modality fan-out + DB row cleanup).
Both adapters consume the IndexingRuntime singleton; if the
runtime is absent (test environment / sync-only mode), they
log a warning + no-op rather than crash.
All 5 production callsites swapped:
- line 532 create_documents
- line 687 _delete_document
- line 787 rebuild_document_indexes
- line 831 rebuild_failed_indexes
- line 1346 confirm_documents
- ``_trigger_index_reconciliation`` stays as a no-op shim — the
new ``run_reconcile_loop`` runs continuously every 30s.
RETRIEVAL PIPELINE — inline ES fulltext search:
- aperag/domains/retrieval/pipeline.py: ``_fulltext_search``
was a chunk-2 empty stub. Now executes the same ES query
shape as the legacy ``FulltextIndexer.search_document`` —
bool/should/match on content+title, filter by collection_id,
optional chat_id filter — directly through ``AsyncElasticsearch``
(no longer wrapped in a domains/indexing/* class). T3.2 lane
did not introduce a new search backend abstraction; the inline
query against whatever ``aperag.indexing.fulltext.FulltextModality``
wrote is the canonical path.
ALEMBIC env.py — drop deleted-module bootstrap import:
- aperag/migration/env.py: remove
``import aperag.domains.indexing.db.models # noqa: F401``
(module hard-deleted in chunk 2). The canonical
``aperag.indexing.models`` import a few lines down already
registers ``DocumentIndex`` against ``Base.metadata`` for
autogen.
SELECTIVE TEST DELETION (per architect msg=3890c9d7 Item 4):
- tests/unit_test/test_es_p0_contract.py — DELETE (tested
legacy ES ``aperag/domains/indexing/fulltext_index.py`` shape)
- tests/unit_test/test_es_shared_index_rollout.py — DELETE
(same)
- tests/unit_test/test_evaluation_v2_worker.py:
``test_evaluation_run_service_launch_run_dispatches_celery_task``
removed (Celery-specific assertion; new path is asyncio
fire-and-forget; the 13 ``test_execute_evaluation_run_*``
tests above lock the worker behaviour)
- tests/unit_test/graph_curation/test_service.py:
``test_start_run_marks_failed_when_enqueue_raises`` removed
(asyncio.create_task doesn't synchronously raise on schedule
so the assertion no longer maps to reachable behaviour)
LEGACY MIGRATION SCRIPT DELETED:
- scripts/migrate_es_fulltext_shared_index.py — one-time Wave-1-era
ES per-collection → shared rollout migration that referenced the
hard-deleted ``aperag/domains/indexing/fulltext_index.py``. Not
production runtime code; the rollout already happened.
T3.2 CONTRACT TEST UPDATE:
- tests/unit_test/service/test_search_graph_contract.py:
``test_search_result_metadata_is_public_allowlist`` add
expected ``index_modality: "vision"`` field (Bryce T3.2
commit 5325788 §G.5 ``SearchResultMetadata.from_raw()``
derives it from ``indexer`` raw key — the test predates the
schema extension and would have failed once T3.2 merged).
GATES (FINAL HEAD):
- ``grep "from aperag.tasks\|import aperag.tasks\|
from aperag.concurrent_control\|from aperag.domains.indexing.
(tasks|orchestration|manager|*_index|db.models)\|from config.celery\|
^from celery\|^import celery"`` over aperag/ + config/ + scripts/
→ **0 hits in production code** ✅
- ``alembic upgrade head`` → succeeds (5 indexing migrations
including T3.1 ``d0f4c1b9a8e2`` rename) ✅
- ``alembic downgrade -1`` then ``upgrade head`` → reversible
round-trip ✅
- ``ruff check + format --check`` over aperag/ tests/ scripts/
→ **clean** (491 files formatted) ✅
- ``pytest tests/unit_test/ tests/load/ --ignore=objectstore``
(objectstore needs moto extra, pre-existing) → **900 passed
/ 29 skipped / 0 failed** ✅
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 alembic drift): promote DocumentIndex.{collection_id,source_path} to NOT NULL in model to match alembic d0f4c1b9a8e2 post-state
CI ``alembic check`` (drift detector) caught a Wave-1-era stale model
declaration. The migration ``d0f4c1b9a8e2`` correctly ALTERs both
columns to NOT NULL (per architect msg=498b12f0), but
``aperag/indexing/models.py:108-109`` still declared
``Mapped[str | None] ... nullable=True`` from the original Wave 1
fixture-back-compat era. After ``alembic upgrade head`` the DB was
NOT NULL but ``Base.metadata`` was nullable, so autogen wanted to
emit ``ALTER COLUMN ... DROP NOT NULL`` to revert the DB.
The PM directive (``msg=0dd76df9``) read the autogen log
"Detected NULL on column" as "DB has NULL" and asked to add the
ALTER NOT NULL to the migration; the migration already does that.
The actual fix is to align the model with the migration's
post-state (NOT NULL), not the other way around — Wave 3 lifted the
back-compat the original ``nullable=True`` was protecting.
Changes:
- aperag/indexing/models.py:108-109: ``Mapped[str | None] ... nullable=True`` → ``Mapped[str] ... nullable=False`` for both columns + comment refresh pointing at the alembic NOT-NULL promotion
- tests/unit_test/indexing/test_t2_1_runtime.py:
``test_reconciler_skips_pending_rows_missing_source_path`` deleted —
the fixture ``_insert_row(... source_path=None)`` now raises
IntegrityError before reconcile_pending_dispatch is ever called, so
the scenario is unreachable from a clean schema. The defensive
``if not row.source_path`` branch in
``aperag/indexing/reconciler.py`` is kept as a zero-cost guard but
no longer reachable without manual SQL bypass.
Gates:
- ``uv run alembic -c aperag/alembic.ini check`` → "No new upgrade operations detected" ✅
- pytest tests/unit_test/ tests/load/ --ignore=objectstore → 899 passed / 29 skipped / 0 failed ✅
- ruff check + format --check clean on the 2 modified files ✅
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 e2e): purge existing triple before INSERT in rebuild adapter + drop celery infra
CI e2e-http-provider caught two Wave-3-induced regressions on PR #1729
HEAD `5d50ca5`:
**Blocker 1 — rebuild_indexes 500 DATABASE_ERROR**:
The chunk-3 ``_create_or_update_document_indexes`` adapter calls
``dispatch_indexing()`` which INSERTs new ``document_index`` rows.
``rebuild_indexes`` re-invokes the adapter with the same
``(document_id, parse_version, modality)`` triple (content unchanged
→ parse_version unchanged), so the §F.1 ``uq_document_index_triple``
UNIQUE constraint fails the INSERT with IntegrityError → 500. Pre-
DELETE matching rows (any status / serving state) before INSERT so
the dispatcher's INSERT lands cleanly. The §F.3 cutover-on-sync-
completion re-establishes the serving state once the new dispatch's
worker finishes; brief unavailability between DELETE and cutover is
acceptable for an explicit rebuild op.
Test failure traced from `tests/e2e_http/hurl/full/11_document_full.
hurl:204` POST `/api/v2/collections/.../documents/.../rebuild_indexes`
expecting HTTP 200, getting 500.
**Blocker 2 — celerybeat container `celery: not found`**:
chunk 2 dropped ``celery`` + ``django-celery-beat`` from
``pyproject.toml`` and deleted ``aperag/tasks/`` + ``config/celery.py``,
but the docker-compose ``celeryworker`` / ``celerybeat`` / ``flower``
services + helm chart ``celeryworker-deployment.yaml`` /
``celerybeat-deployment.yaml`` / ``flower-deployment.yaml`` + the
``scripts/start-celery-{worker,beat,flower}.sh`` entry scripts were
left behind. CI e2e-aperag spins up the docker-compose stack, the
``celerybeat`` container tries to ``exec celery`` and fails (binary
not in image since pyproject dropped the dep).
The new in-process ``aperag.indexing`` runtime (worker pool +
reconciler + cleanup loops) is spawned by the FastAPI lifespan
inside the ``aperag-api`` container, so no separate worker / beat /
monitoring pods are needed.
DELETED:
- docker-compose.yml: ``celeryworker`` / ``celerybeat`` / ``flower``
service blocks (replaced with explanatory comment block)
- scripts/start-celery-{worker,beat,flower}.sh
- scripts/test/celery-{call-task,with-local-queue}.sh
- scripts/celery/trigger_trask.sh + the ``scripts/celery/`` dir
- deploy/aperag/templates/celeryworker-deployment.yaml
- deploy/aperag/templates/celerybeat-deployment.yaml
- deploy/aperag/templates/flower-deployment.yaml
- deploy/aperag/values.yaml: ``celery-worker`` + ``celerybeat`` +
``flower`` value blocks (replaced with explanatory comment)
- deploy/aperag/templates/aperag-secret.yaml: ``CELERY_FLOWER_*``
env entries (no flower pod to consume them)
- deploy/aperag/templates/_helpers.tpl: ``celeryworker.labels``
template (no chart consumes it)
- deploy/aperag/values.yaml api podAffinity-with-celery-worker rule
(the api pod no longer needs to co-locate with a non-existent
worker pod; the soft anti-affinity for spreading api replicas
across nodes is preserved)
- deploy/aperag/templates/api-deployment.yaml: comment "shared
uploaded files between api and celery" → "uploaded files volume
consumed solely by the in-process ``aperag.indexing`` runtime"
Local gates:
- ruff check + format --check on the changed files → clean ✅
- pytest tests/unit_test/indexing/ tests/load/ test_phase3_reexport_audit.py → 133 passed ✅
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 e2e): drop celery service refs in e2e runners + CI workflow + Makefile
Wave 3 chunk 2 + 144c3f1 deleted the docker-compose ``celeryworker`` /
``celerybeat`` / ``flower`` services + helm charts, but a few
infra-side scripts that explicitly referenced those service names
were missed. CI e2e-http-smoke caught it: ``docker compose up -d
celeryworker`` failed with ``no such service: celeryworker``.
This PR plugs the four straggler call sites:
- tests/e2e_http/runners/compose/up.sh:8: ``E2E_COMPOSE_SERVICES``
default drops ``celeryworker celerybeat`` → just ``postgres redis
qdrant es api``. The api container's FastAPI lifespan spawns the
in-process indexing runtime, so no separate worker container.
- tests/e2e_http/scripts/provider_diagnostic.sh:63: failure-diag
log-dump loop drops ``celeryworker celerybeat`` from the service
list.
- .github/workflows/e2e-http-smoke.yml:68,173: ``docker compose logs``
in the failure-dump steps drops ``celeryworker celerybeat``.
- Makefile: deleted ``serve-worker`` / ``serve-beat`` / ``serve-flower``
targets + their help-string entries (the binaries are gone since
pyproject dropped ``celery``).
Local sanity: ``grep -rn 'celery|celerybeat|celeryworker|flower' tests/
e2e_http/ .github/ Makefile docker-compose.yml deploy/`` returns only
explanatory comment lines (the in-process runtime replacement
narrative); no live service / command references remain.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 worker_factory): replace _placeholder_worker_factory with ProductionWorkerFactory + harden orchestrator factory-error path
Wave 3 hard-cut deleted the legacy Celery indexers but left the
FastAPI lifespan wiring ``run_*_worker`` with a placeholder factory
that raised ``NotImplementedError`` on every dispatch. e2e-http-
provider stalls on ``wait_for_document_indexes`` because the row
never advances past PENDING (PM msg=dc13c4a5 root cause).
Per architect msg=7782ebe0 spec lock:
- ``aperag/indexing/worker_factory.py`` (new): per-task lazy
``ProductionWorkerFactory`` resolving ``Collection`` from the
payload, building the right ``ModalityWorker`` with real
Qdrant / Elasticsearch backends + the configured embedder /
completion model. Composes existing helpers
(``get_collection_embedding_service_sync`` /
``get_vector_db_connector`` / ``get_object_store`` /
``build_collection_llm_callable``) so this is wiring, not
re-implementation. Failures raise ``WorkerFactoryError`` so the
operator gets a meaningful ``error_message``. Graph modality is
intentionally minimal (in-memory lineage store + no-op extractor)
pending Wave 4 Nebula-side §D.3.6 lineage adapter — documented as
a known gap, not a regression; the e2e-http-provider gate only
blocks on vector ACTIVE.
- ``aperag/indexing/orchestrator.py``: harden ``_runner`` to claim
the row + finalise FAILED on factory error so the §I.2 retry-
with-backoff schedule kicks in. Without this, factory errors got
silently swallowed by the asyncio.Task and the row sat at
PENDING forever.
- ``aperag/app.py``: replace the placeholder closure with a
``ProductionWorkerFactory`` instance.
- ``tests/integration/test_worker_factory.py``: 3 tests pinning
factory-failure → FAILED-finalize, collection-not-found path,
and missing-collection-id path.
Local gates: pytest tests/unit_test/ tests/integration/ tests/load/
--ignore=tests/unit_test/objectstore = 909 passed / 41 skipped /
0 failed (+3 from this commit). ruff check + format --check clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 view-model): align Document per-modality status Literal to §F.2 4-state + drop SKIPPED sentinel + skip vector when disabled
Per architect msg post-pass-5 + PM msg=79683cc0 ruling. Two e2e-http-
smoke bugs surfaced after the worker_factory wire-in lands:
**Bug 1 — Pydantic 400 on GET document.** orchestrator claims a row
to ``RUNNING`` (the §F.2 canonical 4-state) before the worker
finishes; the ``Document`` view model's per-modality status Literal
still listed the legacy 6-state vocabulary
(``CREATING``/``DELETING``/``DELETION_IN_PROGRESS``/``SKIPPED``)
which never includes ``RUNNING`` — so any GET racing the claim
returned ``ValidationError``. The Wave 3 hard-cut migrated the DB
enum but missed this view-model layer (CR step-0 lesson #6:
schema-touching PR must trace enum references through every
deserialise surface, not just the write path).
The fix collapses the 5 per-modality status Literals to the §F.2
4-state ``Optional[Literal["PENDING", "RUNNING", "ACTIVE",
"FAILED"]]``. "Modality not enabled" is now expressed by the field
being absent (``None``) rather than the sentinel ``"SKIPPED"`` —
the row simply does not exist in ``document_index``. Friendly
client-facing mapping (``NOT_ENABLED`` / ``INDEXING``) lives in
§G.5 ``SearchResultMetadata.index_state_per_modality`` for the
read-path response.
**Bug 2 — collection without embedder triggers FAILED loop.**
``_get_index_types_for_collection`` always added ``Modality.VECTOR``
regardless of the collection's ``enable_vector`` flag. A collection
without an embedding-model config (smoke test fixture) then
dispatched a vector job, the production worker factory raised
``WorkerFactoryError`` (no embedder), the orchestrator finalised
``FAILED``, the reconciler re-dispatched, repeat. The fix honours
``enable_vector`` symmetric with ``enable_fulltext``: explicitly
disabled means no row in the document_index table for that modality.
Files:
- ``aperag/domains/knowledge_base/schemas.py``: 5 status fields
→ ``Optional[Literal["PENDING", "RUNNING", "ACTIVE", "FAILED"]]``
- ``aperag/domains/knowledge_base/service/document_service.py``:
``_build_document_response`` returns ``None`` when index row
missing (instead of ``"SKIPPED"``); ``_get_index_types_for_collection``
honours ``enable_vector`` flag.
- ``tests/e2e_http/hurl/{smoke/03_document_basic,full/11_document_full}.hurl``:
6 assertions migrated from ``== "SKIPPED"`` to ``== null``.
Local gates: pytest tests/unit_test/ tests/integration/ tests/load/
--ignore=tests/unit_test/objectstore = 909 passed / 41 skipped /
0 failed (unchanged from 579b32a1). ruff check + format clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 evaluation cross-loop): run_evaluation_run as coroutine + drop asyncio.to_thread caller wrap
Wave 3 chunk 2 Pattern C migration moved 5 ``.delay()`` callsites to
``asyncio.create_task(asyncio.to_thread(run_evaluation_run, run_id))``,
but ``run_evaluation_run`` was still a sync wrapper that called
``asyncio.run(execute_evaluation_run(run_id))`` inside the worker
thread — spawning a *fresh* event loop each invocation.
Any asyncpg connection borrowed by ``execute_evaluation_run`` is
bound to the FastAPI lifespan loop's connection pool; running the
coroutine on a brand-new loop made every connection-pool ``ping``
fail with ``RuntimeError: got Future attached to a different loop``,
which corrupted the asyncpg shared pool. Subsequent DB calls from
unrelated code paths (every later e2e-http-provider hurl test that
touched Postgres) tripped the same error → CI exit 1 (per huangheng
pass-6 followup msg + PM msg=37da5249).
Fix per huangheng option (a):
* ``aperag/domains/evaluation/tasks.py``: ``run_evaluation_run``
becomes ``async def``, awaits ``execute_evaluation_run`` directly.
No fresh loop. Docstring spells out the failure mode so a future
reader does not regress.
* ``aperag/domains/evaluation/services.py``: caller drops
``asyncio.to_thread`` and schedules the coroutine directly via
``asyncio.create_task(run_evaluation_run(run_id))``. The task
shares the FastAPI lifespan loop, keeping asyncpg pool affinity.
Pattern C contract preserved (fire-and-forget at the request
handler boundary); only the inner mechanism changes from
"thread + new loop" to "coroutine on shared loop". The other 4
``.delay()`` callsites in chunk 2 were genuine sync work and stay
on ``asyncio.to_thread`` — only evaluation's body was async-native
under the hood, which is why this was the one that blew up.
Local gates: pytest tests/unit_test/ tests/integration/ tests/load/
--ignore=tests/unit_test/objectstore = 909 passed / 41 skipped /
0 failed (unchanged). ruff check + format clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 evaluation hurl): relax timing-sensitive assertions for Pattern C in-process dispatch
CI run 24976479158 (PR #1729 head e1f23258) failed at
``16_evaluation_v2.hurl:218`` with the assertion
``$.items[0].status == "queued"``; the actual response showed
``status="running"`` because the post-pass-7 evaluation cross-loop
fix (e1f23258) made dispatch effectively immediate — the
``asyncio.create_task(run_evaluation_run(run_id))`` worker starts
on the next event-loop tick, so by the time the GET arrives the
run has already left "queued".
The test was written for Celery ``.delay()`` semantics where
"queued" was a stable, externally-observable transient state thanks
to broker round-trip + worker pickup latency. Pattern C in-process
collapses that latency to microseconds, so "queued" is no longer
reliably observable on a follow-up GET.
Fix per huangheng option (a) + PM ack: relax 4 timing-sensitive
assertions to accept any in-flight or terminal state via ``matches
"^(queued|running|completed|cancelled)$"`` (item status uses the
correspondingly-broader ``pending|...|failed|cancelled``). The
contract this test pins is "the run shows up correctly in list /
detail endpoints with the right ids", not "dispatch is slow enough
to observe a specific transient state". POST-response asserts
(lines 183, 207) keep the strict ``status == "queued"`` value
because those are synchronous returns built before the
``create_task`` fires.
Also relaxes:
- ``summary.pending == 3`` → drop (kept ``summary.total == 3``,
which is fixed by dataset cardinality)
- ``progress.percent == 0`` → drop (now race-window-dependent)
- ``items[0].status == "pending"`` → matches in-flight set
- ``items[0].attempt_count == 0`` → drop (worker may have
attempted already)
- ``attempts body contains "items":[]`` → ``$.items exists``
(envelope shape only, ignore population timing)
Local gates: pytest 161 passed (evaluation worker + openapi
contract + indexing + integration + load + phase3 audit).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 parser-wiring): sync invoke parse_document before dispatch_indexing in document_service
PR #1729 head 30b34894 e2e-http-provider failed at the scripted
``run_chat_collection_flow.sh`` business flow because vector + fulltext
modality workers reported "found no chunks at user-X/colY/docZ;
treating as derive-incomplete and skipping" on every claim — the
chunks.jsonl artifact never existed at the dispatcher's
``source_path``.
Root cause (architect msg=c605037e ruling): chunk 2's hard-cut
deleted ``aperag/domains/indexing/{tasks,orchestration,manager,
*_index}.py`` whose former ``process_document_task`` ran
:func:`aperag.indexing.parse_document` and wrote the canonical
``derived/parse_<v>/{markdown.md,outline.json,chunks.jsonl}``
artifacts before enqueuing modality workers. The new dispatch
path never picked up that responsibility — every modality
worker.derive pulled an empty derived path and the row stayed in
the §C.7 reschedule loop forever.
Fix per architect option (1) — Wave 3 minimal scope, not skip:
``aperag/domains/knowledge_base/service/document_service.py``
``_create_or_update_document_indexes`` now:
1. Resolves the upload object path from
``document.doc_metadata.object_path`` (the upload handler
already stashes it there).
2. Reads the source bytes from the object store on a worker thread.
3. Calls :func:`parse_document` synchronously on a worker thread
so the canonical ``derived/parse_<v>/`` artifacts exist before
any modality dispatch.
4. Uses ``parsed.parse_version`` and ``parsed.chunks_path`` as
the dispatcher's parse_version / source_path (replaces the
previously-computed-locally values that pointed at the document
base prefix, not the chunks.jsonl file).
This keeps §E.2 "parse-as-first-stage" intact; the parse step
runs inside the request task instead of a separate
``parse_worker`` queue process. Wave 4 follow-up may promote
parse to ``q:parse`` once observed parse latency starts blocking
HTTP requests; the sync path is acceptable for current latencies.
Parse failure raises and propagates → HTTP 500 → no modality rows
created (per architect ruling: "fail loudly, no half-state").
New integration test
``tests/integration/test_dispatch_with_parse.py`` pins the canonical
post-fix flow: parse first → dispatch with chunks.jsonl path →
modality workers reach ``status=ACTIVE`` AND ``is_serving=TRUE``
(uses ``IndexingMode.INLINE`` so no lifespan / async queue
needed; the same data-flow contract).
Local gates: pytest tests/unit_test/ tests/integration/ tests/load/
--ignore=tests/unit_test/objectstore = 910 passed / 41 skipped /
0 failed (+1 new test). ruff check + format clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* fix(celery T3.1 qdrant id): UUID5-derive Qdrant point id from chunk_id in worker_factory
PR #1729 head 8ca396fa e2e-http-provider fa…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Indexing redesign design pack per earayu2 directive (
#celery msg=56812dd6+msg=d8080c08). 1049-line design pack covering current system analysis + first-principles redesign + 7-PR migration plan.Architect (符炫炜) is sole owner of the final design per earayu2's directive ("@符炫炜 要给我一个完整的设计方案,其他人只给@符炫炜 做辅助").
Architect's recommendation in one sentence
Drop Celery, adopt a filesystem-as-source-of-truth pattern with derived per-modality artifacts (jsonl, markdown, etc.), use a thin Redis-backed asyncio worker pool, and let the database hold the state machine on
(document_id, parse_version, modality)triples — collapses three different ownership layers (Celery task / lease ledger / DB state) into one (DB), eliminates ~50% of the current code complexity, scales to 100+ concurrent documents on a single server, and makes per-modality reasoning concrete (each modality reads/writes one derived artifact file).Three open decisions for earayu2
Design pack sections
parse_versiontriple)Test plan
Sibling references
#celery:56812dd6 msg=791082a4#celery:56812dd6 msg=38fbf962#celery:56812dd6 msg=19f283d5#celery:56812dd6 msg=2ee66c89#celery msg=d8080c08🤖 Generated with Claude Code — 符炫炜 (chief architect)