From 401decede5dab31f4987efb352ed49d1e319babf Mon Sep 17 00:00:00 2001 From: earayu Date: Tue, 28 Apr 2026 02:20:31 +0800 Subject: [PATCH] feat(graph): GraphModalityWorker.sync() Phase 3 (Wave 7 W7-3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds Phase 3 to ``GraphModalityWorker.sync()``: the four steps that turn the lineage-store rebuild (Phase 1+2, Wave 4-6) into a complete LightRAG-style graph index — compact, embed + vector upsert, snapshot-diff delete, and merge-candidate detect. Step ordering (per spec §K.12.3 + huangheng msg=16a38734 invariant list): 1. **Compactor** (W7-2): for each entity / relation just touched by this sync, run ``GraphIndexCompactor.compact_if_oversized`` over the per-doc ``description_parts`` and write the unified summary back via ``upsert_*_with_lineage(..., compacted_description=...)``. Returning ``None`` (below threshold or compactor unwired) leaves the COALESCE-preserved column alone. 2. **Embed + vector upsert**: embed the compacted summary (or the ``name`` + raw parts fallback when the compactor opted out) and ``await asyncio.to_thread(connector.upsert, [VectorPoint(...)])`` the result. Vector point id is ``uuid5(NAMESPACE_DNS, f"graph_entity:{collection_id}:{name}")`` — deterministic so a re-sync overwrites instead of leaking. Payload is the 3-field shape locked at spec §K.12.5 ratify msg=acbd0003 / msg=d3f4e6f8: ``{indexer, entity_name, entity_type}``. No ``collection_id`` payload — that lives in the uuid5 id (cross-collection uniqueness in a shared backing store) while the connector handles per-tenant guard. 3. **Snapshot-diff delete**: pre-sync entity-name set (``find_entity_ids_with_lineage`` from Phase 1) minus post-sync set (kg.jsonl entities ∪ pre-sync entities still alive after gc) → ids to delete. Computed against the lineage store, never an ANN ``list_all`` (invariant #7). 4. 
**MergeCandidateDetector** (W7-4, optional): pass the affected entity names to ``detect_for_sync`` so PENDING auto-detect suggestions get persisted for the curator UI. D-3 lock — detector never auto-merges. The Phase 3 dependencies (``compactor``, ``embedder``, ``vector_connector``, ``merge_detector``) are all optional kwargs. Wave 6 callers that don't wire them get the lineage-only behaviour unchanged — Phase 3 returns early when the vector connector or embedder is unset. Failures inside the guarded calls (compactor LLM flake, embedder error, vector upsert hiccup, detector raise) are logged and swallowed so the lineage critical path survives a partial Phase 3. Note that the step 3 vector ``delete`` calls and the Phase 3 lineage-store reads / ``compacted_description`` write-backs are not wrapped in this patch, so an error there still propagates out of ``sync()``. ``worker_factory._build_graph_worker`` wires production deps: * ``compactor`` via the shared ``build_collection_llm_callable`` (the same resolver the graph extractor uses). * ``vector_connector`` + ``embedder`` via the existing ``_build_collection_qdrant_connector`` helper — the graph entity / relation vectors go into the same Qdrant collection as chunk vectors, distinguished by the ``indexer`` payload key. * ``merge_detector`` constructed once the connector + embedder have resolved, with a thin shim adapting the sync ``(text -> list[float])`` callable used by the graph worker into the ``embed_query`` shape the detector expects. When any of these fail to construct (no completion model, collection's embedder unresolved, etc.) the factory logs a warning and falls back to the no-op default, mirroring the summary worker pattern. Tests: * 12 new InMemory unit tests in ``tests/unit_test/indexing/test_t1_2_graph.py`` — covering twelve scenarios (Phase 3 skipped without deps; 3-field payload exact; uuid5 id deterministic across resyncs; compactor before embed; fallback on compactor None; snapshot-diff delete on doc cascade; cross-doc shared entity not deleted; relation Phase 3 mirrors entity; detector receives correct names; detector failure non-fatal; compactor failure non-fatal; vector upsert failure non-fatal). 
All 1117 unit tests pass. §K.12 invariant cross-check: this PR materially honours #2 (L1 → L2 single-direction derivation), #3 (Compactor runs before vector embed), #4 (vector store via ``VectorStoreConnector(Adaptor)`` abstraction — no Qdrant client import), #5 (3-field payload + ``indexer`` filter), #6 (uuid5 deterministic id), #7 (snapshot-diff via lineage name set, not ANN list-all), #11 (D-3 detector writes-only, never auto-merges), #12 (no LightRAG strings introduced). 4-pattern pre-check matrix: * P1 v1: ``GraphModalityWorker.sync`` callers — orchestrator wiring in ``aperag/indexing/`` only; the new optional kwargs default to ``None`` so call shape is backward-compat. * P1 v2: caller return-shape expectations — sync still returns ``None``; nothing observable beyond Phase 3 side effects on the vector store + suggestion table. * P2 (state binding): ``GraphIndexCompactor`` is W7-2 (merged ``c1c48429``), ``MergeCandidateDetector`` is W7-4 (merged ``0dbf9fd1``), ``VectorStoreConnector`` is the existing Wave 4 abstraction. All three already in main. * P3 (Protocol method state): no new ``LineageGraphStore`` Protocol methods needed — Phase 3 reads via existing ``get_entity`` / ``get_relation`` and writes via the ``compacted_description`` kwarg shipped in W7-1. Closes Wave 7 task #3. 
Co-Authored-By: Claude Opus 4.7 --- aperag/indexing/graph.py | 391 +++++++++++- aperag/indexing/worker_factory.py | 126 ++++ tests/unit_test/indexing/test_t1_2_graph.py | 658 ++++++++++++++++++++ 3 files changed, 1174 insertions(+), 1 deletion(-) diff --git a/aperag/indexing/graph.py b/aperag/indexing/graph.py index 796704d96..c81a78d2a 100644 --- a/aperag/indexing/graph.py +++ b/aperag/indexing/graph.py @@ -93,7 +93,8 @@ from collections.abc import AsyncIterator, Iterable, Sequence from contextlib import asynccontextmanager from dataclasses import dataclass, field -from typing import Any, Awaitable, Callable, Protocol +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Protocol +from uuid import NAMESPACE_DNS, uuid5 from aperag.indexing.base import DeriveResult, ModalityWorker from aperag.indexing.models import Modality @@ -105,6 +106,11 @@ from aperag.indexing.parser import read_chunks from aperag.objectstore.base import ObjectStore as _SyncObjectStore +if TYPE_CHECKING: # pragma: no cover — TYPE_CHECKING-only imports avoid circular deps + from aperag.indexing.graph_compactor import GraphIndexCompactor + from aperag.indexing.merge_candidate_detector import MergeCandidateDetector + from aperag.vectorstore.base import VectorStoreConnector + logger = logging.getLogger(__name__) @@ -1118,6 +1124,10 @@ def __init__( object_store: _SyncObjectStore, collection_id: str, tenant_scope_key: str, + compactor: "GraphIndexCompactor | None" = None, + embedder: Callable[[str], list[float]] | None = None, + vector_connector: "VectorStoreConnector | None" = None, + merge_detector: "MergeCandidateDetector | None" = None, ) -> None: """Construct a graph worker bound to a single tenant scope. @@ -1127,6 +1137,28 @@ def __init__( ``sync`` so the SET-level tenant attribution survives the artifact round-trip and is available to read-path ACL filtering. 
+ + Wave 7 W7-3 dependencies (all optional — when omitted the + sync() worker degrades to Wave 6 lineage-only behaviour and + skips Phase 3 entirely): + + * ``compactor`` (Wave 7 W7-2): produces a single bounded LLM + summary from the per-doc ``description_parts`` list. Skipped + when ``None`` — the entity's existing ``compacted_description`` + (potentially set by a prior sync or curation merge) is left + alone. + * ``embedder``: sync ``(text -> list[float])`` callable; mirrors + the ``EmbeddingService.embed_query`` signature used by the + vector worker. Required for Step B (vector upsert) and the + merge detector to function. + * ``vector_connector``: a :class:`VectorStoreConnector` already + per-tenant bound (the connector enforces the tenant guard so + the per-point payload is 3-field — see §K.12.5 ratify + msg=acbd0003 / msg=d3f4e6f8). Required for Steps B + C. + * ``merge_detector`` (Wave 7 W7-4): writes PENDING auto-detect + ``GraphCurationSuggestion`` rows. Skipped when ``None`` — + the detector is a write-only auxiliary, not on the lineage + critical path. """ self._store = store self._extractor = extractor @@ -1134,6 +1166,10 @@ def __init__( self._object_store = object_store self._collection_id = collection_id self._tenant_scope_key = tenant_scope_key + self._compactor = compactor + self._embedder = embedder + self._vector_connector = vector_connector + self._merge_detector = merge_detector # ---- derive ----------------------------------------------------- @@ -1266,6 +1302,359 @@ async def sync( len(relations), ) + # ---- Phase 3: Wave 7 W7-3 — compact + embed + vector upsert + # + snapshot-diff vector cleanup + merge candidate detect. + # + # The phase is opt-in: a worker constructed without + # ``embedder`` / ``vector_connector`` (e.g. unit tests, dev + # scripts) skips all four steps and degrades to Wave 6 + # lineage-only behaviour. Production deployments wire the + # dependencies in ``worker_factory._build_graph_worker``. 
+ # + # Step ordering (per spec §K.12.3 + huangheng msg=16a38734 + # 7-invariant): Compactor → embed → vector upsert → + # snapshot-diff delete → merge detector. Out-of-order would + # break invariant #3 (Compactor must run before vector embed) + # or invariant #7 (snapshot-diff entity-name set must be + # captured *after* lineage rebuild + gc). + await self._post_sync_vector_pipeline( + document_id=document_id, + parse_version=parse_version, + kg_entity_records=entities, + kg_relation_records=relations, + pre_sync_entity_names=set(affected_entity_ids), + pre_sync_relation_keys=set(affected_relation_keys), + ) + + # ---- Phase 3 helpers (Wave 7 W7-3) ------------------------------ + + async def _post_sync_vector_pipeline( + self, + *, + document_id: str, + parse_version: str, + kg_entity_records: Sequence[EntityRecord], + kg_relation_records: Sequence[RelationRecord], + pre_sync_entity_names: set[str], + pre_sync_relation_keys: set[tuple[str, str, str]], + ) -> None: + """Phase 3: compact → embed → vector upsert → snapshot-diff + delete → merge detector. See ``sync`` docstring for invariant + ordering rationale. + + Skips entirely when both ``embedder`` and ``vector_connector`` + are unwired — the post-Wave-7 production wiring always pairs + them, but unit tests for Phase 1+2 algorithm coverage can + construct the worker without these dependencies. + """ + if self._vector_connector is None or self._embedder is None: + return # Wave 6 backward-compat: lineage-only sync. + + # Step A — Compactor: write ``compacted_description`` for each + # entity / relation that this sync just touched. The + # ``upsert_*_with_lineage`` kwarg ``compacted_description`` + # uses COALESCE-style preserve, so passing ``None`` (when + # compactor opts out or is unwired) does NOT clear an existing + # cache; only a real summary string overwrites. 
+ affected_entities_for_phase3: list[EntityWithLineage] = [] + for entity_record in kg_entity_records: + entity = await self._store.get_entity(entity_record.name) + if entity is None: + # Entity was extracted in this sync but is not in the + # store anymore — should be impossible after Phase 2, + # but guard against partial-write races. + continue + compacted = await self._maybe_compact(entity) + if compacted is not None: + lineage = LineageMember( + document_id=document_id, + parse_version=parse_version, + tenant_scope_key=self._tenant_scope_key, + chunk_ids=entity_record.source_chunk_ids, + ) + async with self._entity_lock.acquire(entity_record.name): + await self._store.upsert_entity_with_lineage( + record=entity_record, + lineage=lineage, + compacted_description=compacted, + ) + # Refresh the in-memory copy so Step B embeds the new + # compacted string, not the stale read-before-write. + entity = EntityWithLineage( + name=entity.name, + entity_type=entity.entity_type, + source_lineage=entity.source_lineage, + description_parts=entity.description_parts, + compacted_description=compacted, + ) + affected_entities_for_phase3.append(entity) + + affected_relations_for_phase3: list[RelationWithLineage] = [] + for relation_record in kg_relation_records: + relation = await self._store.get_relation( + relation_record.source, + relation_record.target, + relation_record.relation_type, + ) + if relation is None: + continue + compacted_rel = await self._maybe_compact_relation(relation) + if compacted_rel is not None: + lineage = LineageMember( + document_id=document_id, + parse_version=parse_version, + tenant_scope_key=self._tenant_scope_key, + chunk_ids=relation_record.source_chunk_ids, + ) + lock_key = _relation_lock_key( + relation_record.source, + relation_record.target, + relation_record.relation_type, + ) + async with self._entity_lock.acquire(lock_key): + await self._store.upsert_relation_with_lineage( + record=relation_record, + lineage=lineage, + 
compacted_description=compacted_rel, + ) + relation = RelationWithLineage( + source=relation.source, + target=relation.target, + relation_type=relation.relation_type, + evidence_lineage=relation.evidence_lineage, + description_parts=relation.description_parts, + compacted_description=compacted_rel, + ) + affected_relations_for_phase3.append(relation) + + # Step B — Embed + vector upsert (3-field payload, uuid5 id + # includes collection_id for cross-collection uniqueness in a + # shared backing store; per-tenant guard is the connector's + # responsibility — see spec §K.12.5 lock). + for entity in affected_entities_for_phase3: + await self._upsert_entity_vector_point(entity) + for relation in affected_relations_for_phase3: + await self._upsert_relation_vector_point(relation) + + # Step C — Snapshot-diff delete: drop vector points for any + # entity / relation that pre-sync had a row but post-sync does + # not (gc_*_if_orphan deleted it during Phase 1). Computing the + # diff on lineage-store names instead of an ANN list-all is + # invariant #7 — the vector backend is treated as derivable + # state, the lineage store is the source of truth. 
+ post_sync_entity_names = await self._capture_entity_names( + kg_entity_records=kg_entity_records, + pre_sync_entity_names=pre_sync_entity_names, + ) + gc_entity_names = pre_sync_entity_names - post_sync_entity_names + if gc_entity_names: + ids = [self._entity_vector_id(name) for name in gc_entity_names] + await asyncio.to_thread(self._vector_connector.delete, ids) + + post_sync_relation_keys = await self._capture_relation_keys( + kg_relation_records=kg_relation_records, + pre_sync_relation_keys=pre_sync_relation_keys, + ) + gc_relation_keys = pre_sync_relation_keys - post_sync_relation_keys + if gc_relation_keys: + ids = [self._relation_vector_id(*key) for key in gc_relation_keys] + await asyncio.to_thread(self._vector_connector.delete, ids) + + # Step D — Merge candidate detection (write-only, D-3 lock — + # detector writes PENDING ``GraphCurationSuggestion`` rows for + # the curator UI; never auto-merges). + if self._merge_detector is not None: + affected_names = [e.name for e in affected_entities_for_phase3] + sync_run_id = f"{self._collection_id}:{document_id}:{parse_version}" + try: + await self._merge_detector.detect_for_sync( + sync_run_id=sync_run_id, + affected_entity_names=affected_names, + ) + except Exception: # noqa: BLE001 — detector failure must not break sync + logger.warning( + "graph.sync merge_detector failed (non-fatal) collection=%s document=%s", + self._collection_id, + document_id, + exc_info=True, + ) + + async def _maybe_compact(self, entity: EntityWithLineage) -> str | None: + """Run the compactor over an entity's ``description_parts``; + return the new summary or ``None`` (caller leaves the column + alone — COALESCE preserve).""" + if self._compactor is None: + return None + parts_text = [p.text for p in entity.description_parts if p.text] + try: + return await self._compactor.compact_if_oversized(parts_text) + except Exception: # noqa: BLE001 — compactor LLM may flake + logger.warning( + "graph.sync compactor failed for entity=%r 
(non-fatal)", + entity.name, + exc_info=True, + ) + return None + + async def _maybe_compact_relation(self, relation: RelationWithLineage) -> str | None: + if self._compactor is None: + return None + parts_text = [p.text for p in relation.description_parts if p.text] + try: + return await self._compactor.compact_if_oversized(parts_text) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync compactor failed for relation=%s->%s:%s (non-fatal)", + relation.source, + relation.target, + relation.relation_type, + exc_info=True, + ) + return None + + async def _upsert_entity_vector_point(self, entity: EntityWithLineage) -> None: + """Embed the entity's compacted description (or fallback to + ``name + parts`` concat when compactor declined or unwired) and + upsert one Qdrant-style point. Connector handles tenant guard; + payload is the 3-field shape locked at spec §K.12.5.""" + text = self._embed_text_for_entity(entity) + if not text: + return + try: + embedding = await asyncio.to_thread(self._embedder, text) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync embedder failed for entity=%r (non-fatal)", + entity.name, + exc_info=True, + ) + return + from aperag.vectorstore.dto import VectorPoint + + point = VectorPoint( + id=self._entity_vector_id(entity.name), + vector=list(embedding), + payload={ + "indexer": "graph_entity", + "entity_name": entity.name, + "entity_type": entity.entity_type, + }, + ) + try: + await asyncio.to_thread(self._vector_connector.upsert, [point]) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync vector upsert failed for entity=%r (non-fatal)", + entity.name, + exc_info=True, + ) + + async def _upsert_relation_vector_point(self, relation: RelationWithLineage) -> None: + text = self._embed_text_for_relation(relation) + if not text: + return + try: + embedding = await asyncio.to_thread(self._embedder, text) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync embedder failed for 
relation=%s->%s:%s (non-fatal)", + relation.source, + relation.target, + relation.relation_type, + exc_info=True, + ) + return + from aperag.vectorstore.dto import VectorPoint + + point = VectorPoint( + id=self._relation_vector_id(relation.source, relation.target, relation.relation_type), + vector=list(embedding), + payload={ + "indexer": "graph_relation", + "entity_name": f"{relation.source}->{relation.target}", + "entity_type": relation.relation_type, + }, + ) + try: + await asyncio.to_thread(self._vector_connector.upsert, [point]) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync vector upsert failed for relation=%s->%s:%s (non-fatal)", + relation.source, + relation.target, + relation.relation_type, + exc_info=True, + ) + + @staticmethod + def _embed_text_for_entity(entity: EntityWithLineage) -> str: + # Prefer compactor's summary; fallback joins ``name`` + raw + # description parts so newly-extracted entities (compactor + # below threshold) still get vectorised. + if entity.compacted_description: + return entity.compacted_description + body = "\n\n".join(p.text for p in entity.description_parts if p.text) + if body: + return f"{entity.name}\n\n{body}" + return entity.name + + @staticmethod + def _embed_text_for_relation(relation: RelationWithLineage) -> str: + if relation.compacted_description: + return relation.compacted_description + body = "\n\n".join(p.text for p in relation.description_parts if p.text) + head = f"{relation.source} -[{relation.relation_type}]-> {relation.target}" + if body: + return f"{head}\n\n{body}" + return head + + def _entity_vector_id(self, entity_name: str) -> str: + return str(uuid5(NAMESPACE_DNS, f"graph_entity:{self._collection_id}:{entity_name}")) + + def _relation_vector_id(self, source: str, target: str, relation_type: str) -> str: + return str( + uuid5( + NAMESPACE_DNS, + f"graph_relation:{self._collection_id}:{source}->{target}:{relation_type}", + ) + ) + + async def _capture_entity_names( + self, + *, + 
kg_entity_records: Sequence[EntityRecord], + pre_sync_entity_names: set[str], + ) -> set[str]: + """Build the post-sync entity-name set: every entity that was + in the sync's kg.jsonl plus any pre-existing entity from + ``pre_sync_entity_names`` that survived ``gc_entity_if_orphan`` + (it had lineage from another doc / parse_version).""" + post: set[str] = {r.name for r in kg_entity_records} + # Pre-existing entities — only include those still present after + # Phase 1 gc (other documents' lineage kept them alive). + for name in pre_sync_entity_names: + if name in post: + continue + row = await self._store.get_entity(name) + if row is not None: + post.add(name) + return post + + async def _capture_relation_keys( + self, + *, + kg_relation_records: Sequence[RelationRecord], + pre_sync_relation_keys: set[tuple[str, str, str]], + ) -> set[tuple[str, str, str]]: + post: set[tuple[str, str, str]] = {r.relation_key() for r in kg_relation_records} + for key in pre_sync_relation_keys: + if key in post: + continue + row = await self._store.get_relation(*key) + if row is not None: + post.add(key) + return post + def _relation_lock_key(source: str, target: str, type: str) -> str: """Lock key that serialises read-modify-write on a relation edge. diff --git a/aperag/indexing/worker_factory.py b/aperag/indexing/worker_factory.py index c994dd24c..291b25bee 100644 --- a/aperag/indexing/worker_factory.py +++ b/aperag/indexing/worker_factory.py @@ -513,6 +513,22 @@ def _build_graph_worker(*, collection: Any, object_store: Any, payload: Dispatch lock = _resolve_entity_lock(backend_type=backend_type) extractor = build_collection_graph_extractor(collection) tenant_scope_key = _resolve_tenant_scope_key(payload=payload) + + # Wave 7 W7-3 wiring: compactor + embedder + vector connector + + # merge candidate detector. The Phase 3 sync extension is opt-in — + # if any dependency fails to construct (e.g. 
completion model not + # configured for this collection), we degrade to Wave 6 + # lineage-only sync. Failures are logged and the Wave 4-6 lineage + # path stays intact. + compactor = _build_collection_graph_compactor(collection) + vector_connector, embedder = _build_collection_graph_vector_writer(collection) + merge_detector = _build_collection_merge_candidate_detector( + collection=collection, + store=store, + vector_connector=vector_connector, + embedder=embedder, + ) + return _GraphModalityWorker( store=store, extractor=extractor, @@ -520,9 +536,119 @@ def _build_graph_worker(*, collection: Any, object_store: Any, payload: Dispatch object_store=object_store, collection_id=collection.id, tenant_scope_key=tenant_scope_key, + compactor=compactor, + embedder=embedder, + vector_connector=vector_connector, + merge_detector=merge_detector, ) +def _build_collection_graph_compactor(collection: Any) -> Any: + """Build a per-collection :class:`GraphIndexCompactor` if the + collection has a completion model configured. Returns ``None`` + otherwise so :class:`GraphModalityWorker` skips the compaction + step gracefully (Phase 3 still runs vector upsert against the + raw description fallback).""" + from aperag.indexing.graph_compactor import GraphIndexCompactor + + try: + from aperag.domains.knowledge_graph.graphindex.integration import ( + build_collection_llm_callable, + ) + + llm = build_collection_llm_callable(collection) + except Exception: # noqa: BLE001 — best-effort; missing model is non-fatal. + logger.warning( + "graph compactor: completion model not configured for collection %s; " + "compactor will not run on sync (vector embed falls back to raw description)", + getattr(collection, "id", ""), + ) + return None + return GraphIndexCompactor(llm=llm) + + +def _build_collection_graph_vector_writer(collection: Any) -> tuple[Any, Any]: + """Resolve the per-collection vector connector + sync embedder for + the graph entity / relation vector path. 
Returns ``(None, None)`` + if the embedder fails to resolve (no completion model, broken + config) — Phase 3 then degrades to lineage-only sync. + + Reuses :func:`_build_collection_qdrant_connector` so the graph + write path shares the exact connector / dimension resolution the + vector worker uses (one Qdrant collection per ApeRAG collection, + distinguished from chunk vectors by the ``indexer`` payload key). + """ + try: + adaptor, embedding_service, _vector_size = _build_collection_qdrant_connector(collection) + except Exception: # noqa: BLE001 + logger.warning( + "graph vector writer: qdrant connector resolve failed for collection %s; " + "Phase 3 vector path will be skipped", + getattr(collection, "id", ""), + exc_info=True, + ) + return None, None + + if embedding_service is None: + return adaptor.connector, None + + def _embed(text: str) -> list[float]: + return embedding_service.embed_query(text) + + return adaptor.connector, _embed + + +def _build_collection_merge_candidate_detector( + *, + collection: Any, + store: Any, + vector_connector: Any, + embedder: Any, +) -> Any: + """Build the per-collection :class:`MergeCandidateDetector` if + both vector connector and embedder resolved. 
Returns ``None`` when + any dependency is missing — the detector is a write-only auxiliary + so a missing one degrades cleanly to "no auto-detect candidates + written" without breaking the sync.""" + if vector_connector is None or embedder is None: + return None + + from aperag.indexing.merge_candidate_detector import MergeCandidateDetector + + class _SyncEmbedderShim: + """Adapt the sync ``(text -> list[float])`` callable used by + the graph worker into the ``embed_query`` shape the detector + expects (mirrors :class:`EmbeddingService` surface).""" + + def __init__(self, fn: Callable[[str], list[float]]) -> None: + self._fn = fn + + def embed_query(self, text: str) -> list[float]: + return self._fn(text) + + user_id = _resolve_collection_user_id(collection) + return MergeCandidateDetector( + store=store, + vector_connector=vector_connector, + embedder=_SyncEmbedderShim(embedder), + collection_id=collection.id, + user_id=user_id, + ) + + +def _resolve_collection_user_id(collection: Any) -> str: + """Best-effort recovery of the collection owner's user id, used by + :class:`MergeCandidateDetector` to attribute the auto-detect run. + Falls back to the literal ``"system"`` so the detector still runs + when the field is unavailable (the suggestion's ``source`` already + distinguishes auto-detect from user-triggered runs).""" + for attr in ("user_id", "user", "owner_id"): + value = getattr(collection, attr, None) + if isinstance(value, str) and value: + return value + return "system" + + # --------------------------------------------------------------------- # Helpers — backend dispatch + per-process client singletons + lock # selection. 
diff --git a/tests/unit_test/indexing/test_t1_2_graph.py b/tests/unit_test/indexing/test_t1_2_graph.py index 150a15fb6..310386b9e 100644 --- a/tests/unit_test/indexing/test_t1_2_graph.py +++ b/tests/unit_test/indexing/test_t1_2_graph.py @@ -1272,3 +1272,661 @@ async def test_w7_delete_relation_returns_false_when_absent(): store = InMemoryLineageGraphStore() deleted = await store.delete_relation("Alice", "Bob", "knows") assert deleted is False + + +# --------------------------------------------------------------------- +# Group 6: Wave 7 W7-3 — ``GraphModalityWorker.sync()`` Phase 3 +# extension (compactor → embed → vector upsert → snapshot-diff +# delete → merge candidate detector). Covers the InMemory reference +# store; cross-backend coverage of the storage primitives lives in +# ``tests/integration/compat/test_lineage_graph_compat.py`` (W7-1). +# --------------------------------------------------------------------- + + +from uuid import NAMESPACE_DNS, uuid5 # noqa: E402 — Wave 7 W7-3 group section import + +from aperag.vectorstore.dto import VectorPoint # noqa: E402 — same + + +class _StubCompactor: + """Stub :class:`GraphIndexCompactor` — returns whatever the test + wired into ``response`` (None to opt-out, str to overwrite).""" + + def __init__(self, response: str | None = None, raise_after: int = 0) -> None: + self.response = response + self.calls: list[list[str]] = [] + self._raise_after = raise_after + + async def compact_if_oversized(self, parts: list[str]) -> str | None: + self.calls.append(list(parts)) + if self._raise_after and len(self.calls) >= self._raise_after: + raise RuntimeError("simulated compactor failure") + return self.response + + +class _StubVectorConnector: + """Captures upsert / delete calls against a deterministic embedder.""" + + def __init__(self, raise_on_upsert: bool = False) -> None: + self.upserts: list[list[VectorPoint]] = [] + self.deletes: list[list[str]] = [] + self._raise_on_upsert = raise_on_upsert + + def upsert(self, 
points: list[VectorPoint]) -> list[str]: + if self._raise_on_upsert: + raise RuntimeError("simulated vector upsert failure") + self.upserts.append(list(points)) + return [p.id for p in points] + + def delete(self, ids: list[str]) -> None: + self.deletes.append(list(ids)) + + +def _stub_embedder(text: str) -> list[float]: + # Deterministic 4-dim embedding so tests can compare equality. + h = abs(hash(text)) % 1000 + return [float(h % 7) / 7.0, float(h % 11) / 11.0, float(h % 13) / 13.0, float(h % 17) / 17.0] + + +class _StubMergeDetector: + def __init__(self) -> None: + self.calls: list[tuple[str, list[str]]] = [] + + async def detect_for_sync(self, *, sync_run_id: str, affected_entity_names: Sequence[str]) -> int: + self.calls.append((sync_run_id, list(affected_entity_names))) + return len(affected_entity_names) + + +def _phase3_worker( + *, + store: InMemoryLineageGraphStore, + entity_lock: EntityLock, + object_store: InMemoryObjectStore, + document_id: str, + entities_per_doc: dict[str, list[EntityRecord]] | None = None, + relations_per_doc: dict[str, list[RelationRecord]] | None = None, + compactor: Any = None, + embedder: Any = None, + vector_connector: Any = None, + merge_detector: Any = None, +) -> GraphModalityWorker: + async def extractor( + chunks: Sequence[dict[str, Any]], + ) -> tuple[list[EntityRecord], list[RelationRecord]]: + del chunks + return ( + list((entities_per_doc or {}).get(document_id, [])), + list((relations_per_doc or {}).get(document_id, [])), + ) + + return GraphModalityWorker( + store=store, + extractor=extractor, + entity_lock=entity_lock, + object_store=object_store, + collection_id=COLLECTION_ID, + tenant_scope_key=DEFAULT_TENANT, + compactor=compactor, + embedder=embedder, + vector_connector=vector_connector, + merge_detector=merge_detector, + ) + + +def _expected_entity_id(name: str) -> str: + return str(uuid5(NAMESPACE_DNS, f"graph_entity:{COLLECTION_ID}:{name}")) + + +def _expected_relation_id(source: str, target: str, type_: 
str) -> str: + return str(uuid5(NAMESPACE_DNS, f"graph_relation:{COLLECTION_ID}:{source}->{target}:{type_}")) + + +@pytest.mark.asyncio +async def test_w7_phase3_skipped_when_vector_deps_unwired(store, entity_lock, object_store): + """Wave 6 backward-compat: a worker with no embedder / + vector_connector behaves exactly as the lineage-only Wave 6 + version — no Phase 3 side effects.""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker", + source_chunk_ids=("doc_A-v1-c0",), + ) + ] + } + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + # All Phase 3 deps deliberately None. + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Lineage rebuild still happened (Wave 6 path). + entity = await store.get_entity("Linus") + assert entity is not None + assert entity.compacted_description is None # Phase 3 skipped — no compactor write. 
+ + +@pytest.mark.asyncio +async def test_w7_phase3_writes_vector_point_with_3_field_payload(store, entity_lock, object_store): + """Spec §K.12.5 lock: vector point payload is 3 fields exactly + (``indexer`` / ``entity_name`` / ``entity_type``); no + ``collection_id`` payload (that lives in the uuid5 id instead so + a shared backing store still has cross-collection unique ids).""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker", + source_chunk_ids=("doc_A-v1-c0",), + ) + ] + } + vector = _StubVectorConnector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + + assert len(vector.upserts) == 1 + [point] = vector.upserts[0] + assert point.id == _expected_entity_id("Linus") + assert point.payload == { + "indexer": "graph_entity", + "entity_name": "Linus", + "entity_type": "Person", + } + # 3 fields strict — no collection_id leakage. + assert "collection_id" not in point.payload + + +@pytest.mark.asyncio +async def test_w7_phase3_uuid5_id_is_deterministic(store, entity_lock, object_store): + """Same (collection, name) MUST produce identical uuid5 across + calls so vector upsert overwrites the existing point instead of + leaving stale duplicates (forward-only retry safety).""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="rev1", + source_chunk_ids=("c0",), + ) + ] + } + vector = _StubVectorConnector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + # First sync. 
+ await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + first_id = vector.upserts[0][0].id + # Re-sync same doc same content — id MUST match (deterministic). + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + assert vector.upserts[1][0].id == first_id + assert first_id == _expected_entity_id("Linus") + + +@pytest.mark.asyncio +async def test_w7_phase3_compactor_runs_before_embed(store, entity_lock, object_store): + """Spec §K.12.3 invariant #3 ordering: compactor MUST run before + the vector embed step so the embedded text is the LLM-summarised + version, not the raw concat. We assert by wiring a compactor that + returns ``"COMPACTED"`` and checking that the embed input matches.""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="raw text", + source_chunk_ids=("c0",), + ) + ] + } + compactor = _StubCompactor(response="COMPACTED") + vector = _StubVectorConnector() + captured_embed_inputs: list[str] = [] + + def capturing_embedder(text: str) -> list[float]: + captured_embed_inputs.append(text) + return _stub_embedder(text) + + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + compactor=compactor, + embedder=capturing_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Compactor was called with the per-doc parts text. + assert compactor.calls == [["raw text"]] + # Embed input is the compacted summary, not the raw text. + assert captured_embed_inputs == ["COMPACTED"] + # Storage row also carries the compacted value for downstream readers. 
+ entity = await store.get_entity("Linus") + assert entity is not None + assert entity.compacted_description == "COMPACTED" + + +@pytest.mark.asyncio +async def test_w7_phase3_compactor_none_falls_back_to_raw_text(store, entity_lock, object_store): + """Compactor returning ``None`` (below threshold) leaves the + storage column untouched and the embedder gets the + ``name + raw parts`` fallback string.""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="short", + source_chunk_ids=("c0",), + ) + ] + } + compactor = _StubCompactor(response=None) + vector = _StubVectorConnector() + captured: list[str] = [] + + def capturing(text: str) -> list[float]: + captured.append(text) + return _stub_embedder(text) + + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + compactor=compactor, + embedder=capturing, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + assert captured == ["Linus\n\nshort"] + entity = await store.get_entity("Linus") + assert entity is not None + assert entity.compacted_description is None # COALESCE preserve. + + +@pytest.mark.asyncio +async def test_w7_phase3_snapshot_diff_deletes_gc_vector_points(store, entity_lock, object_store): + """Doc-delete cascade: the entity gc'd by Phase 1 MUST have its + vector point deleted in Phase 3 step C — using the lineage-store + name set diff (not an ANN list-all per invariant #7).""" + # Sync 1: doc_A produces ``Linus``. 
+ entities_per_doc_v1 = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker", + source_chunk_ids=("c0",), + ) + ] + } + vector = _StubVectorConnector() + worker_v1 = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc_v1, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker_v1, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + assert len(vector.upserts) == 1 # Linus upserted. + + # Sync 2: doc_A re-parsed but no longer mentions Linus → entity + # gc'd → vector point should be deleted by snapshot-diff. + entities_per_doc_v2 = {"doc_A": []} + worker_v2 = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc_v2, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker_v2, + document_id="doc_A", + parse_version="v2", + object_store=object_store, + ) + # No new upsert (kg.jsonl empty). + assert len(vector.upserts) == 1 + # Snapshot-diff issued a delete for Linus' vector point. + assert vector.deletes == [[_expected_entity_id("Linus")]] + + +@pytest.mark.asyncio +async def test_w7_phase3_snapshot_diff_preserves_cross_doc_entity(store, entity_lock, object_store): + """Cross-doc shared entity: when doc_A is re-parsed and drops + Linus, but doc_B still mentions him, Phase 3 MUST NOT delete the + vector point (post_sync set still contains Linus from doc_B).""" + # Seed both docs. 
+ vector = _StubVectorConnector() + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker per doc_A", + source_chunk_ids=("c0",), + ) + ], + "doc_B": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker per doc_B", + source_chunk_ids=("c10",), + ) + ], + } + worker_a = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + worker_b = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_B", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker_a, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + await _derive_then_sync( + worker=worker_b, + document_id="doc_B", + parse_version="v1", + object_store=object_store, + ) + # Re-sync doc_A with no entities — Linus survives via doc_B. + entities_per_doc_v2 = {"doc_A": [], "doc_B": entities_per_doc["doc_B"]} + worker_a_v2 = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc_v2, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker_a_v2, + document_id="doc_A", + parse_version="v2", + object_store=object_store, + ) + # No deletes — Linus still alive via doc_B's lineage. 
+ assert vector.deletes == [] + + +@pytest.mark.asyncio +async def test_w7_phase3_relation_vector_upsert_payload_and_id(store, entity_lock, object_store): + """Relation Phase 3 mirrors entity: 3-field payload with + ``indexer="graph_relation"``, uuid5 id formatted + ``graph_relation:{collection_id}:{source}->{target}:{type}``.""" + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Alice", entity_type="Person", description="a", source_chunk_ids=("c0",)), + EntityRecord(name="Bob", entity_type="Person", description="b", source_chunk_ids=("c0",)), + ] + } + relations_per_doc = { + "doc_A": [ + RelationRecord( + source="Alice", + target="Bob", + relation_type="knows", + description="Alice knows Bob", + source_chunk_ids=("c0",), + ) + ] + } + vector = _StubVectorConnector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + relations_per_doc=relations_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # 3 upserts: 2 entities + 1 relation. + upserted_ids = {pts[0].id for pts in vector.upserts} + upserted_payloads = {pts[0].payload["indexer"] for pts in vector.upserts} + assert _expected_relation_id("Alice", "Bob", "knows") in upserted_ids + assert "graph_relation" in upserted_payloads + # Find the relation point and validate payload shape.
+ relation_points = [pts[0] for pts in vector.upserts if pts[0].payload["indexer"] == "graph_relation"] + assert len(relation_points) == 1 + rel_payload = relation_points[0].payload + assert rel_payload == { + "indexer": "graph_relation", + "entity_name": "Alice->Bob", + "entity_type": "knows", + } + + +@pytest.mark.asyncio +async def test_w7_phase3_merge_detector_invoked_with_affected_names(store, entity_lock, object_store): + """Step D wiring: when a detector is wired, Phase 3 calls + ``detect_for_sync`` exactly once per sync, with the entity names + just touched by this sync (per D-3 — incremental detection only).""" + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Alice", entity_type="Person", description="a", source_chunk_ids=("c0",)), + EntityRecord(name="Bob", entity_type="Person", description="b", source_chunk_ids=("c0",)), + ] + } + detector = _StubMergeDetector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=_StubVectorConnector(), + merge_detector=detector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + assert len(detector.calls) == 1 + sync_run_id, names = detector.calls[0] + assert sync_run_id == f"{COLLECTION_ID}:doc_A:v1" + assert sorted(names) == ["Alice", "Bob"] + + +@pytest.mark.asyncio +async def test_w7_phase3_merge_detector_failure_is_non_fatal(store, entity_lock, object_store): + """Detector throwing must NOT abort sync — Phase 3 step D is + best-effort (write-only auxiliary, not on the lineage critical + path). 
The lineage rebuild + vector upsert that ran before it + must remain intact.""" + + class _RaisingDetector: + async def detect_for_sync(self, *, sync_run_id, affected_entity_names): + raise RuntimeError("simulated detector failure") + + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Linus", entity_type="Person", description="raw", source_chunk_ids=("c0",)), + ] + } + vector = _StubVectorConnector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + merge_detector=_RaisingDetector(), + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Lineage + vector upsert succeeded. + assert (await store.get_entity("Linus")) is not None + assert len(vector.upserts) == 1 + + +@pytest.mark.asyncio +async def test_w7_phase3_compactor_failure_falls_back(store, entity_lock, object_store): + """Compactor failing (LLM flake) must NOT abort sync — vector + upsert continues with the raw description fallback (forward-only + retry safety).""" + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Linus", entity_type="Person", description="raw", source_chunk_ids=("c0",)), + ] + } + compactor = _StubCompactor(raise_after=1) + vector = _StubVectorConnector() + captured: list[str] = [] + + def capturing(text: str) -> list[float]: + captured.append(text) + return _stub_embedder(text) + + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + compactor=compactor, + embedder=capturing, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Embedder ran with the fallback. + assert captured == ["Linus\n\nraw"] + # Vector upsert still happened. 
+ assert len(vector.upserts) == 1 + + +@pytest.mark.asyncio +async def test_w7_phase3_vector_upsert_failure_is_non_fatal(store, entity_lock, object_store): + """Vector connector raising on upsert must NOT abort sync — the + failure is logged and lineage state is unaffected.""" + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Linus", entity_type="Person", description="raw", source_chunk_ids=("c0",)), + ] + } + vector = _StubVectorConnector(raise_on_upsert=True) + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Lineage row still present. + assert (await store.get_entity("Linus")) is not None + # No upsert recorded (raised before append). + assert vector.upserts == []