Commit 0348a78

earayu and claude authored
feat(celery Wave 1): Foundation + 5 modalities + observability (#1726)
* feat(celery T1.1): Foundation - schema + Modality ABC + object_store + parser

  Phase celery T1.1 per docs/modularization/indexing-redesign-design-pack.md (PR #1725 v3 head 5d7a60f). Foundation lane that the other Wave 1 modality lanes (T1.2 graph @bryce / T1.3 vector+fulltext / T1.4 summary+vision / T1.5 observability) depend on.

  What this adds (~600 LOC + tests):

  - alembic migration `f9c4d2a8e1b5_indexing_redesign_document_index.py` creates the `document_index` table with the §F.1 partial unique index `uniq_document_index_serving` enforcing "at most one is_serving=TRUE row per (document_id, modality)" at the DB layer (Bryce v2 review msg=7ccb176f #2). Postgres native; SQLite ≥3.8 supports the same syntax (Tier 1 §L deploy stays consistent).
  - `aperag/indexing/models.py`: DocumentIndex SQLAlchemy ORM mirroring the alembic schema, plus `Modality` (5 values per §C/§D) and `IndexStatus` (4 lifecycle states per §F.2) string-enums.
  - `aperag/indexing/base.py`: `ModalityWorker` ABC with `derive` + `sync` so per-modality workers (T1.2-T1.5) inherit the §D.1 "DELETE-by-(doc, parse_version) THEN INSERT" replace-idempotent contract. Graph reinterprets DELETE as the §D.3 lineage-level DELETE+INSERT internally; the ABC accepts that variation.
  - `aperag/indexing/object_store.py`: Atomic write helpers that wrap the existing `aperag.objectstore` package: LocalFS uses tmp+fsync+rename per §C.7; S3/MinIO relies on the single-request PutObject (or multipart CompleteMultipartUpload) visibility gate. Includes `read_or_none` for the §C.7 read contract and an `InMemoryObjectStore` test fixture so downstream T1.x can wire unit tests without touching disk.
  - `aperag/indexing/parser.py`: Deterministic parser entry point that produces the three shared artifacts (`markdown.md` / `outline.json` / `chunks.jsonl`) under `derived/parse_<v>/` per §C.1. parse_version is computed via the canonical D10.g §E.2 helper (`compute_parse_version`) so a chunking change rolls the version.
    T1.1 ships an in-process simulator implementation that proves the write contract; production parser integration (docparser/Marker/OCR) swaps in at T2.x without changing the artifact shape.
  - `aperag/migration/env.py`: Register `aperag.indexing.models` for alembic autogen (the new module deliberately lives outside the per-domain `db/models.py` tree because the `aperag.domains.indexing` surface is the Wave 3 hard-delete target).

  Tests cover the three Wave 1 acceptance gates locked by the design pack §K:

  1. Partial unique invariant: an INSERT of a second is_serving=TRUE row for the same (document_id, modality) raises IntegrityError; non-serving rows + a different modality on the same document are allowed; the §F.3 three-statement cutover transaction satisfies the constraint.
  2. Object-store atomic write: `_LocalAtomicWriter` produces a final destination file with no `.tmp.*` siblings remaining; concurrent writers to different artifacts in the same parse_version directory each land their own bytes; `InMemoryObjectStore` matches the single-call atomicity semantic.
  3. Parser → derived/ round-trip: parse_document writes the three canonical artifacts; identical inputs yield identical parse_version + identical artifact contents (§C.3 idempotent retry); a chunking config change rolls parse_version (§E.2 hash); chunks.jsonl round-trips via `read_chunks`; outline.json carries slash-separated `section_path` + slug `heading_anchor` per the D10.c §A.9 R1 lock; re-running parse_document overwrites atomically; missing/empty artifacts are treated as "derive not yet complete" (§C.7 read contract); path traversal is rejected.
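The partial unique invariant in acceptance gate 1 can be illustrated with a minimal sketch against an in-memory SQLite database. The column set below is a reduced, assumed subset of the real `document_index` schema, and the two-statement cutover is a simplification: the §F.3 transaction has three statements that are not reproduced here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE document_index (
        id INTEGER PRIMARY KEY,
        document_id TEXT NOT NULL,
        modality TEXT NOT NULL,
        parse_version TEXT NOT NULL,
        is_serving INTEGER NOT NULL DEFAULT 0
    )
    """
)
# Partial unique index: uniqueness applies only to rows where is_serving = 1.
conn.execute(
    """
    CREATE UNIQUE INDEX uniq_document_index_serving
    ON document_index (document_id, modality)
    WHERE is_serving = 1
    """
)


def insert_row(document_id, modality, parse_version, is_serving):
    # "with conn" commits on success and rolls back on error.
    with conn:
        conn.execute(
            "INSERT INTO document_index "
            "(document_id, modality, parse_version, is_serving) VALUES (?, ?, ?, ?)",
            (document_id, modality, parse_version, int(is_serving)),
        )


def serving_versions(document_id, modality):
    rows = conn.execute(
        "SELECT parse_version FROM document_index "
        "WHERE document_id = ? AND modality = ? AND is_serving = 1",
        (document_id, modality),
    ).fetchall()
    return [row[0] for row in rows]


def cutover(document_id, modality, new_version):
    # Hypothetical simplified cutover: demote first, then promote, in one
    # transaction, so the index never sees two serving rows.
    with conn:
        conn.execute(
            "UPDATE document_index SET is_serving = 0 "
            "WHERE document_id = ? AND modality = ? AND is_serving = 1",
            (document_id, modality),
        )
        conn.execute(
            "UPDATE document_index SET is_serving = 1 "
            "WHERE document_id = ? AND modality = ? AND parse_version = ?",
            (document_id, modality, new_version),
        )


insert_row("doc-1", "vector", "v1", True)
insert_row("doc-1", "vector", "v2", False)   # non-serving row is allowed
insert_row("doc-1", "fulltext", "v1", True)  # different modality is allowed

try:
    insert_row("doc-1", "vector", "v3", True)  # second serving row, same slot
    second_serving_rejected = False
except sqlite3.IntegrityError:
    second_serving_rejected = True

before = serving_versions("doc-1", "vector")
cutover("doc-1", "vector", "v2")
after = serving_versions("doc-1", "vector")
```

Demoting before promoting matters in this sketch because SQLite checks the unique index per statement, not at commit time.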
  Out of scope for this lane (per §K decomposition):

  - T1.2 graph modality + §D.3 lineage model (@bryce, task #7)
  - T1.3 vector + fulltext modalities (chenyexuan, task #8)
  - T1.4 summary + vision modalities (chenyexuan, task #9)
  - T1.5 observability OTLP emit (chenyexuan, task #10)

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(celery Wave 1): T1.3+T1.4+T1.5 modalities + observability + foundation fix-forward

  Bundles the architect-mandated fix-forward (msg=4a801b2b, msg=c3b0ba5b, msg=07b5b1e6) on top of T1.1 foundation:

  T1.1 fix-forward (per architect rulings on PR #1726 P0 bugs):

  - Bug1: object_store.py LocalObjectStore import alias (class is named Local)
  - Bug2: tablename → document_index_v2 + index renames (Wave 3 renames back to canonical via alembic per task #14 acceptance amendment)
  - NEW (§H.2): tenant_scope_key VARCHAR(64) NOT NULL column + idx_document_index_v2_tenant_scope index, locked into Wave 1 schema for T2.2 quota lane to consume without migration churn
  - Tests: fixture default tenant_scope_key="user:test"

  T1.3 Vector + Fulltext (§D.1 replace-idempotent contract):

  - VectorBackend / InMemoryVectorBackend Protocol + Qdrant-shaped delete_by_filter + upsert_point
  - VectorModality: derive no-op pass-through, sync DELETE-by-(doc, parse_v) THEN per-chunk upsert with placeholder embedding (sha256-derived 16-dim)
  - FulltextBackend / InMemoryFulltextBackend + delete_by_query + bulk_index
  - FulltextModality: shares chunks.jsonl with vector (§C.6); chunk_id parity preserved for hybrid dedup at search layer
  - Tests: replace-idempotent on double sync, new parse_version doesn't corrupt old slot, hybrid chunk_id parity, modality discriminator on payload, missing-artifact no-op (§C.7 reschedule semantic)

  T1.4 Summary + Vision (§C.6 + §D.2 expensive-derive split):

  - SummaryModality: derive reads parser markdown.md, runs placeholder summarizer (first non-heading paragraph), embeds, writes summary.json atomically; sync deletes by filter + upserts single point keyed summary:{document_id}:{parse_version}
  - VisionModality: derive reads synthetic image-records JSON (T1 simulator contract; T2.x replaces with real PDF extract + vision-LLM), writes vision/manifest.jsonl, sync upserts one point per image keyed vision:{document_id}:{parse_version}:{image_id}
  - Both backends use Qdrant-shaped Protocol + InMemory test fixtures
  - Tests: derive persists canonical artifact (cost preserved across retries), idempotent on double sync, new parse_version doesn't corrupt old slot, modality discriminator, missing-artifact no-op

  T1.5 Observability primitives (§J.1 SLI emission):

  - 5 metric name constants prefixed indexing.*: index_lag_seconds / index_failure_rate / index_success / queue_depth / worker_utilization
  - MetricsEmitter Protocol + NoopMetricsEmitter (Tier 1 deploy without OTLP) + InMemoryMetricsEmitter (test fixture)
  - emit_index_lag / emit_index_failure / emit_index_success / emit_queue_depth / emit_worker_utilization helpers: modality attribute optional, utilization clamps to [0, 1] and handles capacity=0
  - Tests: emission shape contract for each helper + metric name prefix lock

  Per architect msg=f21a79f0 + PM msg=07b5b1e6 + msg=95012fdb: this commit accumulates onto the Wave 1 mega-PR #1726. Bryce will rebase + push T1.2 graph commit on the same branch; once T1.2 lands, the full Wave 1 PR flips to in_review for huangheng's step 0+ / step 0 / step 0''' / cross-lane caller sweep CR + verdict.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(celery T1.2): graph modality - §D.3 lineage model + per-entity Redis lock + tenant_scope_key

  Per docs/modularization/indexing-redesign-design-pack.md §D.3 + architect msg=cc555e33 / msg=f2921ae0 / msg=c3b0ba5b lineage rulings.

  The graph modality is the only one whose backend rows are shared across documents, so the simple DELETE-by-(doc, parse_version) + re-INSERT pattern that vector/fulltext/summary/vision use breaks the shared-entity model. Bryce v2 review msg=7ccb176f #3 surfaced this. T1.2 lands the lineage-tracked entity / relation rows + the §D.3.2 two-phase sync algorithm + the per-entity Redis lock that protects Nebula's read-modify-write window from racing.

  Surface (aperag/indexing/graph.py, ~1017 lines):

  * Lineage data model: LineageMember(document_id, parse_version, tenant_scope_key, chunk_ids), DescriptionPart, EntityRecord, RelationRecord, EntityWithLineage, RelationWithLineage. The tenant_scope_key lives at SET-element level (not entity row level) per architect msg=c3b0ba5b. The placement was chosen so a shared entity cited by multiple tenants still has one row but each lineage member carries its own tenant attribution for read-path ACL filtering (§H.2 quota / organization key).
  * Per-entity lock: EntityLock Protocol + InMemoryEntityLock (asyncio.Lock per key, single-process default) + RedisEntityLock (Redis SETNX-style lock keyed by f"{prefix}:{slot}" where slot is crc32(entity_id) % 4096 to bound the key space). The lock is mandatory on the Nebula path because Nebula 3.x's list ops require application-layer read-modify-write; concurrent sync calls touching the same shared entity would otherwise lose lineage members at the network round-trip.
  * LineageGraphStore Protocol: backend abstraction so the §D.3.2 algorithm in GraphModalityWorker is portable across the three backends listed in §D.3.5 (Nebula 3.x, Neo4j with native list + APOC, in-memory). Methods filter lineage by document_id (not exact (doc, parse_version)) so re-parsing supersedes the old parse_version cleanly per §D.3.6 step 3 (see §D.3.2 amendment note in docstring).
  * InMemoryLineageGraphStore: Python-dict reference implementation used as the canonical correctness oracle.
    The §D.3.6 self-test + concurrent race test run against it; any future Nebula / Neo4j adapter that satisfies the Protocol inherits pass/fail status from this suite.
  * GraphExtractor: Callable injection point so the production LightRAG-based LLM extractor (Wave 2 wiring) and the test stubs share the same surface. The kg.jsonl artifact persists the extractor output so retries never re-charge LLM cost (§C.6 CANONICAL artifact contract).
  * serialize_kg_jsonl / parse_kg_jsonl: line-oriented JSON with a "kind" discriminator so future record types can be added without breaking the file format. Empty payloads encode as b"\n" (one byte) so the §C.7 "empty body == derive not finished" sentinel cannot collide with a deliberate "no records produced" payload. The deletion flow (§D.3.6 step 4 / step 5) publishes b"\n" to cleanly clear a document's lineage contribution.
  * GraphModalityWorker: implements ModalityWorker ABC. derive() reads chunks.jsonl produced by the T1.1 parser, calls the injected extractor, writes kg.jsonl atomically. sync() reads kg.jsonl and applies the §D.3.2 algorithm:

    Phase 1 (cleanup): for every entity / relation in the lineage backend whose lineage SET contains the document_id, remove ALL members for that document_id (across any parse_version) and GC rows that go orphan. Per-entity lock held during cleanup so concurrent syncs cannot race the read-modify-write.

    Phase 2 (rebuild): for every entity / relation in kg.jsonl, upsert with a fresh LineageMember stamped with the current (document_id, parse_version, tenant_scope_key). Per-entity lock held during rebuild for the same reason.

  Tests (tests/unit_test/indexing/test_t1_2_graph.py, ~1044 lines, 16 test cases):

  * §D.3.6 five-step idempotency suite (test_d3_6_step1..step5): doc_A v1 → doc_B v1 → doc_A v2 supersedes → delete doc_A → delete doc_B with full entity GC.
    Pins the §D.3.2 algorithm against the §D.3.6 narrative; the regression Bryce surfaced in msg=7ccb176f #3 fails this suite under the v1 append-on-conflict path.
  * Relation-lineage symmetric coverage (test_relation_lineage_doc_a_then_doc_b_then_delete_doc_a): proves the same lineage semantics flow through relation evidence_lineage.
  * §D.4 byte-equivalent re-sync (test_d4_byte_equivalent_resync): double-sync with the same artifact leaves the backend identical. This is the cross-modality idempotency contract every modality must pass.
  * Nebula race condition under per-entity lock (test_nebula_race_under_per_entity_lock_preserves_both_writes + test_nebula_race_without_lock_loses_a_writer): uses a _RaceProvocateurStore that emulates Nebula's read-modify-write network round-trip with a deterministic asyncio yield. Under the in-memory entity lock, both writers' lineage members end up in the SET (PASS); under a no-op lock, deterministically one writer loses (the negative control). Pins the architect msg=f2921ae0 invariant that per-entity serialization is mandatory, not optional.
  * tenant_scope_key propagation (test_tenant_scope_key_propagates_into_lineage_members): pins the SET-element placement decision per architect msg=c3b0ba5b so a future regression that drops the field or moves it to entity row level fails loudly.
  * kg.jsonl round-trip (test_kg_jsonl_*): entity / relation serialization, forward-compatible kind skipping, empty-body encoding (always at least b"\n" to distinguish "no records" from "derive crashed").
  * derive contract (test_derive_writes_kg_jsonl_under_canonical_path + test_sync_with_missing_artifact_is_a_noop): derive writes to derived/parse_<v>/kg.jsonl per §C.6 canonical layout; sync no-ops when the artifact is missing per §C.7 read contract.
  * End-to-end with the real T1.1 parser (test_end_to_end_with_real_parser_chunks): proves graph.derive cooperates with the chunks.jsonl shape that aperag.indexing.parser.parse_document actually produces, no hidden coupling beyond the §C.6 contract.

  aperag/indexing/__init__.py: re-export the T1.2 graph public surface alongside chenyexuan's T1.3/T1.4/T1.5 modality exports so the indexing package surface is uniform across all five modalities.

  Lint: ruff check + ruff format --check both clean across aperag/indexing/ and tests/unit_test/indexing/.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(celery T1.1 tests): leftover ImportError + bogus settings monkey-patch

  Three follow-up bugs surfaced by Bryce's local pytest collection (msg=464d5b70) on top of fix-forward 859f899:

  1. test_t1_1_foundation.py:69: ImportError. The class is named `Local` (not `LocalObjectStore`); my main-tree object_store.py already uses the `Local as LocalObjectStore` alias per architect ruling on Bug 1 (msg=4a801b2b), but the test file's own import line still referenced the wrong name. Fixed: same alias pattern.
  2. test_local_atomic_write_uses_tmp_rename_dance: AttributeError on `aperag.objectstore.local.settings`. The module never had a `settings` attribute; the monkey-patch block was unfounded. `Local(LocalConfig)` accepts the config struct directly, no module-level singleton dependency. Replaced the monkey-patch block with a direct `LocalObjectStore(LocalConfig(root_dir=...))` construction.
  3. test_concurrent_atomic_writes_dont_clobber_each_other: same AttributeError pattern, same fix.

  No production code changed; only the T1.1 test fixture is simplified (net -30 lines). Lint clean. Wave 1 PR #1726 ready to flip to in_review after this push.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(celery T1.1 audit): narrow allowlist for transitional DocumentIndex duplicate

  huangheng's Wave 1 CR (msg=8e67bf0e) flagged P1: test_phase3_classes_have_single_definition_site fails because Wave 1 introduces aperag/indexing/models.py:DocumentIndex alongside the legacy aperag/domains/indexing/db/models.py:DocumentIndex still owned by Celery.

  Architect ruling msg=5be9a535, option (a): add a narrow allowlist covering exactly this (class_name, file_pair); reject (b) renaming the new ORM class (would force a Wave 1 + Wave 3 double-rename, conflicts with msg=4a801b2b "Python class name stays canonical, only __tablename__ differs") and (c) xfail (does not express the "intentional transitional state" semantic).

  Wave 3 task #14 acceptance per architect amendment will remove this allowlist entry in the same PR that deletes the legacy file + alembic renames document_index_v2 back to document_index.

  The allowlist entry is narrow:

  - Only the exact frozenset of two file paths is accepted; any third duplicate (Wave 4+ regression, e.g. a copy-paste of the class) still flags as an offender.
  - Only matches `class DocumentIndex(Base):` definitions (the existing orm_pattern); pydantic schemas with the same name are unaffected.
  - No global widening: no other Phase 3 class gets an exception.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
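
The §D.3.2 two-phase graph sync described in the T1.2 notes above can be sketched with a plain dict. This is an illustrative toy, not the `InMemoryLineageGraphStore` from the commit: `ToyLineageStore` and its method names are hypothetical, relations and `chunk_ids` are left out, and the per-entity lock is omitted because the sketch is single-threaded.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LineageMember:
    # Reduced, assumed field set; the real record also carries chunk_ids.
    document_id: str
    parse_version: str
    tenant_scope_key: str


class ToyLineageStore:
    """Hypothetical entity_id -> set-of-lineage-members store."""

    def __init__(self):
        self.entities = {}

    def sync(self, document_id, parse_version, tenant_scope_key, entity_ids):
        # Phase 1 (cleanup): drop every member contributed by this document,
        # across any parse_version, and garbage-collect orphaned entities.
        for entity_id, members in list(self.entities.items()):
            kept = {m for m in members if m.document_id != document_id}
            if kept:
                self.entities[entity_id] = kept
            else:
                del self.entities[entity_id]
        # Phase 2 (rebuild): stamp a fresh member on each extracted entity.
        member = LineageMember(document_id, parse_version, tenant_scope_key)
        for entity_id in entity_ids:
            self.entities.setdefault(entity_id, set()).add(member)

    def snapshot(self):
        return {
            entity_id: sorted((m.document_id, m.parse_version) for m in members)
            for entity_id, members in self.entities.items()
        }


store = ToyLineageStore()
store.sync("doc_A", "v1", "user:test", ["alice", "acme"])   # step 1
store.sync("doc_B", "v1", "user:test", ["acme", "bob"])     # step 2
step2 = store.snapshot()
store.sync("doc_A", "v2", "user:test", ["acme"])            # step 3: v2 supersedes v1
step3 = store.snapshot()
store.sync("doc_A", "v2", "user:test", [])                  # step 4: delete doc_A
step4 = store.snapshot()
store.sync("doc_B", "v1", "user:test", [])                  # step 5: delete doc_B
step5 = store.snapshot()
```

Syncing an empty entity list plays the role of the deletion flow here: phase 1 removes the document's members and phase 2 has nothing to add, so shared entities survive only while another document still cites them.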
1 parent 704b3cf commit 0348a78

20 files changed

Lines changed: 5875 additions & 13 deletions

aperag/indexing/__init__.py

Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
# Copyright 2025 ApeCloud, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Phase celery indexing redesign: public surface.

Per ``docs/modularization/indexing-redesign-design-pack.md``. T1.1
Foundation lands the schema (``DocumentIndex`` + Modality / IndexStatus
enums), the ``ModalityWorker`` ABC that downstream T1.2-T1.5 modality
files implement, the atomic-write object-store helpers (§C.7), and a
deterministic parser entry point that produces the shared
``markdown.md`` / ``outline.json`` / ``chunks.jsonl`` artifacts
(§C.1 / §C.6).
"""

from aperag.indexing.base import DeriveResult, ModalityWorker
from aperag.indexing.fulltext import (
    FulltextBackend,
    FulltextModality,
    InMemoryFulltextBackend,
)
from aperag.indexing.graph import (
    KG_ARTIFACT_FILENAME,
    DescriptionPart,
    EntityLock,
    EntityRecord,
    EntityWithLineage,
    GraphExtractor,
    GraphModalityWorker,
    InMemoryEntityLock,
    InMemoryLineageGraphStore,
    LineageGraphStore,
    LineageMember,
    RedisEntityLock,
    RelationRecord,
    RelationWithLineage,
    parse_kg_jsonl,
    serialize_kg_jsonl,
)
from aperag.indexing.models import DocumentIndex, IndexStatus, Modality
from aperag.indexing.object_store import (
    InMemoryObjectStore,
    derived_artifact,
    derived_dir,
    read_or_none,
    read_or_none_async,
    source_artifact,
    write_atomic,
    write_atomic_async,
)
from aperag.indexing.observability import (
    INDEX_FAILURE_METRIC,
    INDEX_LAG_METRIC,
    INDEX_SUCCESS_METRIC,
    QUEUE_DEPTH_METRIC,
    WORKER_UTILIZATION_METRIC,
    InMemoryMetricsEmitter,
    MetricsEmitter,
    NoopMetricsEmitter,
    emit_index_failure,
    emit_index_lag,
    emit_index_success,
    emit_queue_depth,
    emit_worker_utilization,
)
from aperag.indexing.parser import (
    DEFAULT_CHUNK_OVERLAP,
    DEFAULT_CHUNK_SIZE,
    DEFAULT_PARSER_PIPELINE,
    ChunkingConfig,
    ParseConfig,
    ParseResult,
    parse_document,
    read_chunks,
)
from aperag.indexing.summary import (
    InMemorySummaryBackend,
    SummaryBackend,
    SummaryModality,
)
from aperag.indexing.vector import (
    InMemoryVectorBackend,
    VectorBackend,
    VectorModality,
)
from aperag.indexing.vision import (
    InMemoryVisionBackend,
    VisionBackend,
    VisionModality,
)

__all__ = [
    # Schema
    "DocumentIndex",
    "Modality",
    "IndexStatus",
    # ABC
    "ModalityWorker",
    "DeriveResult",
    # Object store helpers
    "derived_dir",
    "derived_artifact",
    "source_artifact",
    "write_atomic",
    "write_atomic_async",
    "read_or_none",
    "read_or_none_async",
    "InMemoryObjectStore",
    # Parser
    "ChunkingConfig",
    "ParseConfig",
    "ParseResult",
    "parse_document",
    "read_chunks",
    "DEFAULT_PARSER_PIPELINE",
    "DEFAULT_CHUNK_SIZE",
    "DEFAULT_CHUNK_OVERLAP",
    # Modalities (T1.2 graph / T1.3 / T1.4)
    "VectorModality",
    "VectorBackend",
    "InMemoryVectorBackend",
    "FulltextModality",
    "FulltextBackend",
    "InMemoryFulltextBackend",
    "GraphModalityWorker",
    "LineageGraphStore",
    "InMemoryLineageGraphStore",
    "EntityLock",
    "InMemoryEntityLock",
    "RedisEntityLock",
    "LineageMember",
    "DescriptionPart",
    "EntityRecord",
    "RelationRecord",
    "EntityWithLineage",
    "RelationWithLineage",
    "GraphExtractor",
    "KG_ARTIFACT_FILENAME",
    "serialize_kg_jsonl",
    "parse_kg_jsonl",
    "SummaryModality",
    "SummaryBackend",
    "InMemorySummaryBackend",
    "VisionModality",
    "VisionBackend",
    "InMemoryVisionBackend",
    # Observability (T1.5)
    "MetricsEmitter",
    "NoopMetricsEmitter",
    "InMemoryMetricsEmitter",
    "INDEX_LAG_METRIC",
    "INDEX_FAILURE_METRIC",
    "INDEX_SUCCESS_METRIC",
    "QUEUE_DEPTH_METRIC",
    "WORKER_UTILIZATION_METRIC",
    "emit_index_lag",
    "emit_index_failure",
    "emit_index_success",
    "emit_queue_depth",
    "emit_worker_utilization",
]
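
The `write_atomic` / `read_or_none` helpers re-exported above are not shown in this section. As a rough illustration of the tmp+fsync+rename pattern the commit message describes for LocalFS (§C.7), here is a minimal standard-library sketch. The names `write_atomic_local` and `read_or_none_local` are hypothetical and do not reflect the signatures in `aperag/indexing/object_store.py`.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def write_atomic_local(dest: Path, data: bytes) -> None:
    """Write data so readers see either the old file or the complete new one."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the destination directory so the final rename
    # stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(prefix=dest.name + ".tmp.", dir=str(dest.parent))
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
            handle.flush()
            os.fsync(handle.fileno())  # force bytes to disk before publishing
        os.replace(tmp_name, dest)     # atomic rename over the destination
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)        # leave no .tmp.* sibling behind
        raise


def read_or_none_local(path: Path) -> Optional[bytes]:
    """Treat a missing or empty artifact as 'derive not yet complete'."""
    try:
        data = path.read_bytes()
    except FileNotFoundError:
        return None
    return data or None


with tempfile.TemporaryDirectory() as root:
    target = Path(root) / "derived" / "parse_v1" / "chunks.jsonl"
    missing_before = read_or_none_local(target)
    write_atomic_local(target, b'{"chunk_id": "c0"}\n')
    write_atomic_local(target, b'{"chunk_id": "c1"}\n')  # overwrite atomically
    final_bytes = read_or_none_local(target)
    leftovers = [p.name for p in target.parent.iterdir() if ".tmp." in p.name]
```

On an object store such as S3 the temp-file step is unnecessary, since the commit message notes that a single PutObject (or CompleteMultipartUpload) already acts as the visibility gate.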

aperag/indexing/base.py

Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
# Copyright 2025 ApeCloud, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``ModalityWorker`` abstract base - celery T1.1 Foundation.

Per ``docs/modularization/indexing-redesign-design-pack.md`` §C/§D, every
modality (``vector`` / ``fulltext`` / ``graph`` / ``summary`` / ``vision``)
implements two operations:

- ``derive(document_id, parse_version, source_path) -> DeriveResult``
  Reads the source / parser output, calls any expensive LLM /
  embedding / vision pipelines, writes the canonical artifact under
  ``derived/parse_<version>/<modality_file>`` using the
  ``ObjectStore`` write-then-rename / multipart-then-complete contract
  (§C.7) so partial writes are never visible.

- ``sync(document_id, parse_version, derived_artifact_path) -> None``
  Reads the derived artifact and applies the §D.1 two-phase
  replace-idempotent contract: DELETE all backend entries WHERE
  (document_id=X, parse_version=Y), then INSERT all entries from the
  artifact. Cheap to retry; never re-runs ``derive``.

The graph modality is the one exception to the simple
DELETE-by-(doc, parse_version) shape; see §D.3 lineage model. The
ABC accepts that variation: ``sync`` is just "make backend
byte-equivalent to the artifact for this (doc, version) slot",
however that has to be done.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass

from aperag.indexing.models import Modality


@dataclass(frozen=True)
class DeriveResult:
    """Outcome of ``ModalityWorker.derive``.

    The ``derived_artifact_path`` is what ``sync`` will read on this
    and any subsequent retry. It is opaque to callers: only the
    matching ``ModalityWorker.sync`` knows how to interpret it.
    """

    derived_artifact_path: str


class ModalityWorker(ABC):
    """Abstract base for the 5 per-modality workers.

    Implementations live in ``aperag/indexing/{vector,fulltext,
    graph,summary,vision}.py`` and are instantiated by the
    orchestrator (T2.1). The ABC enforces the (derive, sync) split so
    the orchestrator can route retries to ``sync`` only, never
    re-charging the LLM / embedding cost.
    """

    #: The modality this worker owns. Subclasses MUST override.
    modality: Modality

    @abstractmethod
    async def derive(
        self,
        *,
        document_id: str,
        parse_version: str,
        source_path: str,
    ) -> DeriveResult:
        """Produce the canonical derived artifact for this modality.

        Implementations must use the ``ObjectStore`` write-then-rename
        / multipart-then-complete contract (§C.7) so a partial write
        is never visible. Implementations MUST be idempotent at the
        artifact level: re-running ``derive`` for the same
        ``(document_id, parse_version)`` produces a byte-equivalent
        artifact (modulo non-deterministic LLM noise, which is the
        whole reason the artifact gets persisted in the first place).
        """

    @abstractmethod
    async def sync(
        self,
        *,
        document_id: str,
        parse_version: str,
        derived_artifact_path: str,
    ) -> None:
        """Apply the §D.1 replace-idempotent contract to the backend.

        DELETE all backend entries WHERE
        ``(document_id=X, parse_version=Y)`` then INSERT the entries
        encoded in ``derived_artifact_path``. Re-running this method
        produces a backend state byte-equivalent to a fresh sync (§D.4).

        Graph modality reinterprets DELETE as the §D.3 lineage-level
        DELETE+INSERT: ``sync`` keeps the same signature; the §D.3
        algorithm is internal to the graph implementation.
        """


__all__ = ["ModalityWorker", "DeriveResult"]
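
To make the (derive, sync) split concrete, the following self-contained sketch re-declares a stand-in ABC with the same keyword-only signatures and implements a toy worker against in-memory dicts. `ToyChunkWorker`, its `artifacts` dict (standing in for the object store), its `backend` dict (standing in for a vector or fulltext index), and the chunk fields are all assumptions made for illustration. The real workers in `aperag/indexing/` are not shown in this section.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class DeriveResult:
    derived_artifact_path: str


class ModalityWorker(ABC):
    """Local stand-in mirroring the (derive, sync) signatures."""

    @abstractmethod
    async def derive(self, *, document_id: str, parse_version: str, source_path: str) -> DeriveResult: ...

    @abstractmethod
    async def sync(self, *, document_id: str, parse_version: str, derived_artifact_path: str) -> None: ...


class ToyChunkWorker(ModalityWorker):
    def __init__(self, artifacts):
        self.artifacts = artifacts  # path -> bytes, hypothetical object store
        self.backend = {}           # (doc, version, chunk_id) -> text

    async def derive(self, *, document_id, parse_version, source_path):
        # Pass-through derive: the parser's chunk file is already the artifact.
        return DeriveResult(derived_artifact_path=source_path)

    async def sync(self, *, document_id, parse_version, derived_artifact_path):
        raw = self.artifacts.get(derived_artifact_path)
        if not raw:
            return  # missing or empty artifact: derive not complete, no-op
        slot = (document_id, parse_version)
        # DELETE every entry in this (doc, version) slot ...
        for key in [k for k in self.backend if k[:2] == slot]:
            del self.backend[key]
        # ... THEN INSERT everything the artifact encodes.
        for line in raw.decode("utf-8").splitlines():
            if line.strip():
                chunk = json.loads(line)
                self.backend[slot + (chunk["chunk_id"],)] = chunk["text"]


async def demo():
    artifacts = {
        "derived/parse_v1/chunks.jsonl": b'{"chunk_id": "c0", "text": "hello"}\n'
                                         b'{"chunk_id": "c1", "text": "world"}\n',
        "derived/parse_v2/chunks.jsonl": b'{"chunk_id": "c0", "text": "hello world"}\n',
    }
    worker = ToyChunkWorker(artifacts)
    v1 = await worker.derive(
        document_id="doc-1", parse_version="v1",
        source_path="derived/parse_v1/chunks.jsonl",
    )
    await worker.sync(document_id="doc-1", parse_version="v1",
                      derived_artifact_path=v1.derived_artifact_path)
    first = dict(worker.backend)
    # Retry goes to sync only; derive is not re-run.
    await worker.sync(document_id="doc-1", parse_version="v1",
                      derived_artifact_path=v1.derived_artifact_path)
    second = dict(worker.backend)
    await worker.sync(document_id="doc-1", parse_version="v2",
                      derived_artifact_path="derived/parse_v2/chunks.jsonl")
    return first, second, dict(worker.backend)


first, second, final = asyncio.run(demo())
```

In this sketch the double sync leaves the backend unchanged, and syncing `v2` adds a new slot without touching the `v1` entries, which mirrors the "new parse_version doesn't corrupt old slot" test described in the commit message.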
