diff --git a/aperag/indexing/graph.py b/aperag/indexing/graph.py index 796704d96..c81a78d2a 100644 --- a/aperag/indexing/graph.py +++ b/aperag/indexing/graph.py @@ -93,7 +93,8 @@ from collections.abc import AsyncIterator, Iterable, Sequence from contextlib import asynccontextmanager from dataclasses import dataclass, field -from typing import Any, Awaitable, Callable, Protocol +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Protocol +from uuid import NAMESPACE_DNS, uuid5 from aperag.indexing.base import DeriveResult, ModalityWorker from aperag.indexing.models import Modality @@ -105,6 +106,11 @@ from aperag.indexing.parser import read_chunks from aperag.objectstore.base import ObjectStore as _SyncObjectStore +if TYPE_CHECKING: # pragma: no cover — TYPE_CHECKING-only imports avoid circular deps + from aperag.indexing.graph_compactor import GraphIndexCompactor + from aperag.indexing.merge_candidate_detector import MergeCandidateDetector + from aperag.vectorstore.base import VectorStoreConnector + logger = logging.getLogger(__name__) @@ -1118,6 +1124,10 @@ def __init__( object_store: _SyncObjectStore, collection_id: str, tenant_scope_key: str, + compactor: "GraphIndexCompactor | None" = None, + embedder: Callable[[str], list[float]] | None = None, + vector_connector: "VectorStoreConnector | None" = None, + merge_detector: "MergeCandidateDetector | None" = None, ) -> None: """Construct a graph worker bound to a single tenant scope. @@ -1127,6 +1137,28 @@ def __init__( ``sync`` so the SET-level tenant attribution survives the artifact round-trip and is available to read-path ACL filtering. + + Wave 7 W7-3 dependencies (all optional — when omitted the + sync() worker degrades to Wave 6 lineage-only behaviour and + skips Phase 3 entirely): + + * ``compactor`` (Wave 7 W7-2): produces a single bounded LLM + summary from the per-doc ``description_parts`` list. 
Skipped + when ``None`` — the entity's existing ``compacted_description`` + (potentially set by a prior sync or curation merge) is left + alone. + * ``embedder``: sync ``(text -> list[float])`` callable; mirrors + the ``EmbeddingService.embed_query`` signature used by the + vector worker. Required for Step B (vector upsert) and the + merge detector to function. + * ``vector_connector``: a :class:`VectorStoreConnector` already + per-tenant bound (the connector enforces the tenant guard so + the per-point payload is 3-field — see §K.12.5 ratify + msg=acbd0003 / msg=d3f4e6f8). Required for Steps B + C. + * ``merge_detector`` (Wave 7 W7-4): writes PENDING auto-detect + ``GraphCurationSuggestion`` rows. Skipped when ``None`` — + the detector is a write-only auxiliary, not on the lineage + critical path. """ self._store = store self._extractor = extractor @@ -1134,6 +1166,10 @@ def __init__( self._object_store = object_store self._collection_id = collection_id self._tenant_scope_key = tenant_scope_key + self._compactor = compactor + self._embedder = embedder + self._vector_connector = vector_connector + self._merge_detector = merge_detector # ---- derive ----------------------------------------------------- @@ -1266,6 +1302,359 @@ async def sync( len(relations), ) + # ---- Phase 3: Wave 7 W7-3 — compact + embed + vector upsert + # + snapshot-diff vector cleanup + merge candidate detect. + # + # The phase is opt-in: a worker constructed without + # ``embedder`` / ``vector_connector`` (e.g. unit tests, dev + # scripts) skips all four steps and degrades to Wave 6 + # lineage-only behaviour. Production deployments wire the + # dependencies in ``worker_factory._build_graph_worker``. + # + # Step ordering (per spec §K.12.3 + huangheng msg=16a38734 + # 7-invariant): Compactor → embed → vector upsert → + # snapshot-diff delete → merge detector. 
Out-of-order would + # break invariant #3 (Compactor must run before vector embed) + # or invariant #7 (snapshot-diff entity-name set must be + # captured *after* lineage rebuild + gc). + await self._post_sync_vector_pipeline( + document_id=document_id, + parse_version=parse_version, + kg_entity_records=entities, + kg_relation_records=relations, + pre_sync_entity_names=set(affected_entity_ids), + pre_sync_relation_keys=set(affected_relation_keys), + ) + + # ---- Phase 3 helpers (Wave 7 W7-3) ------------------------------ + + async def _post_sync_vector_pipeline( + self, + *, + document_id: str, + parse_version: str, + kg_entity_records: Sequence[EntityRecord], + kg_relation_records: Sequence[RelationRecord], + pre_sync_entity_names: set[str], + pre_sync_relation_keys: set[tuple[str, str, str]], + ) -> None: + """Phase 3: compact → embed → vector upsert → snapshot-diff + delete → merge detector. See ``sync`` docstring for invariant + ordering rationale. + + Skips entirely when either ``embedder`` or ``vector_connector`` + is unwired — the post-Wave-7 production wiring always pairs + them, but unit tests for Phase 1+2 algorithm coverage can + construct the worker without these dependencies. + """ + if self._vector_connector is None or self._embedder is None: + return # Wave 6 backward-compat: lineage-only sync. + + # Step A — Compactor: write ``compacted_description`` for each + # entity / relation that this sync just touched. The + # ``upsert_*_with_lineage`` kwarg ``compacted_description`` + # uses COALESCE-style preserve, so passing ``None`` (when + # compactor opts out or is unwired) does NOT clear an existing + # cache; only a real summary string overwrites. 
+ affected_entities_for_phase3: list[EntityWithLineage] = [] + for entity_record in kg_entity_records: + entity = await self._store.get_entity(entity_record.name) + if entity is None: + # Entity was extracted in this sync but is not in the + # store anymore — should be impossible after Phase 2, + # but guard against partial-write races. + continue + compacted = await self._maybe_compact(entity) + if compacted is not None: + lineage = LineageMember( + document_id=document_id, + parse_version=parse_version, + tenant_scope_key=self._tenant_scope_key, + chunk_ids=entity_record.source_chunk_ids, + ) + async with self._entity_lock.acquire(entity_record.name): + await self._store.upsert_entity_with_lineage( + record=entity_record, + lineage=lineage, + compacted_description=compacted, + ) + # Refresh the in-memory copy so Step B embeds the new + # compacted string, not the stale read-before-write. + entity = EntityWithLineage( + name=entity.name, + entity_type=entity.entity_type, + source_lineage=entity.source_lineage, + description_parts=entity.description_parts, + compacted_description=compacted, + ) + affected_entities_for_phase3.append(entity) + + affected_relations_for_phase3: list[RelationWithLineage] = [] + for relation_record in kg_relation_records: + relation = await self._store.get_relation( + relation_record.source, + relation_record.target, + relation_record.relation_type, + ) + if relation is None: + continue + compacted_rel = await self._maybe_compact_relation(relation) + if compacted_rel is not None: + lineage = LineageMember( + document_id=document_id, + parse_version=parse_version, + tenant_scope_key=self._tenant_scope_key, + chunk_ids=relation_record.source_chunk_ids, + ) + lock_key = _relation_lock_key( + relation_record.source, + relation_record.target, + relation_record.relation_type, + ) + async with self._entity_lock.acquire(lock_key): + await self._store.upsert_relation_with_lineage( + record=relation_record, + lineage=lineage, + 
compacted_description=compacted_rel, + ) + relation = RelationWithLineage( + source=relation.source, + target=relation.target, + relation_type=relation.relation_type, + evidence_lineage=relation.evidence_lineage, + description_parts=relation.description_parts, + compacted_description=compacted_rel, + ) + affected_relations_for_phase3.append(relation) + + # Step B — Embed + vector upsert (3-field payload, uuid5 id + # includes collection_id for cross-collection uniqueness in a + # shared backing store; per-tenant guard is the connector's + # responsibility — see spec §K.12.5 lock). + for entity in affected_entities_for_phase3: + await self._upsert_entity_vector_point(entity) + for relation in affected_relations_for_phase3: + await self._upsert_relation_vector_point(relation) + + # Step C — Snapshot-diff delete: drop vector points for any + # entity / relation that pre-sync had a row but post-sync does + # not (gc_*_if_orphan deleted it during Phase 1). Computing the + # diff on lineage-store names instead of an ANN list-all is + # invariant #7 — the vector backend is treated as derivable + # state, the lineage store is the source of truth. 
+ post_sync_entity_names = await self._capture_entity_names( + kg_entity_records=kg_entity_records, + pre_sync_entity_names=pre_sync_entity_names, + ) + gc_entity_names = pre_sync_entity_names - post_sync_entity_names + if gc_entity_names: + ids = [self._entity_vector_id(name) for name in gc_entity_names] + await asyncio.to_thread(self._vector_connector.delete, ids) + + post_sync_relation_keys = await self._capture_relation_keys( + kg_relation_records=kg_relation_records, + pre_sync_relation_keys=pre_sync_relation_keys, + ) + gc_relation_keys = pre_sync_relation_keys - post_sync_relation_keys + if gc_relation_keys: + ids = [self._relation_vector_id(*key) for key in gc_relation_keys] + await asyncio.to_thread(self._vector_connector.delete, ids) + + # Step D — Merge candidate detection (write-only, D-3 lock — + # detector writes PENDING ``GraphCurationSuggestion`` rows for + # the curator UI; never auto-merges). + if self._merge_detector is not None: + affected_names = [e.name for e in affected_entities_for_phase3] + sync_run_id = f"{self._collection_id}:{document_id}:{parse_version}" + try: + await self._merge_detector.detect_for_sync( + sync_run_id=sync_run_id, + affected_entity_names=affected_names, + ) + except Exception: # noqa: BLE001 — detector failure must not break sync + logger.warning( + "graph.sync merge_detector failed (non-fatal) collection=%s document=%s", + self._collection_id, + document_id, + exc_info=True, + ) + + async def _maybe_compact(self, entity: EntityWithLineage) -> str | None: + """Run the compactor over an entity's ``description_parts``; + return the new summary or ``None`` (caller leaves the column + alone — COALESCE preserve).""" + if self._compactor is None: + return None + parts_text = [p.text for p in entity.description_parts if p.text] + try: + return await self._compactor.compact_if_oversized(parts_text) + except Exception: # noqa: BLE001 — compactor LLM may flake + logger.warning( + "graph.sync compactor failed for entity=%r 
(non-fatal)", + entity.name, + exc_info=True, + ) + return None + + async def _maybe_compact_relation(self, relation: RelationWithLineage) -> str | None: + if self._compactor is None: + return None + parts_text = [p.text for p in relation.description_parts if p.text] + try: + return await self._compactor.compact_if_oversized(parts_text) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync compactor failed for relation=%s->%s:%s (non-fatal)", + relation.source, + relation.target, + relation.relation_type, + exc_info=True, + ) + return None + + async def _upsert_entity_vector_point(self, entity: EntityWithLineage) -> None: + """Embed the entity's compacted description (or fallback to + ``name + parts`` concat when compactor declined or unwired) and + upsert one Qdrant-style point. Connector handles tenant guard; + payload is the 3-field shape locked at spec §K.12.5.""" + text = self._embed_text_for_entity(entity) + if not text: + return + try: + embedding = await asyncio.to_thread(self._embedder, text) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync embedder failed for entity=%r (non-fatal)", + entity.name, + exc_info=True, + ) + return + from aperag.vectorstore.dto import VectorPoint + + point = VectorPoint( + id=self._entity_vector_id(entity.name), + vector=list(embedding), + payload={ + "indexer": "graph_entity", + "entity_name": entity.name, + "entity_type": entity.entity_type, + }, + ) + try: + await asyncio.to_thread(self._vector_connector.upsert, [point]) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync vector upsert failed for entity=%r (non-fatal)", + entity.name, + exc_info=True, + ) + + async def _upsert_relation_vector_point(self, relation: RelationWithLineage) -> None: + text = self._embed_text_for_relation(relation) + if not text: + return + try: + embedding = await asyncio.to_thread(self._embedder, text) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync embedder failed for 
relation=%s->%s:%s (non-fatal)", + relation.source, + relation.target, + relation.relation_type, + exc_info=True, + ) + return + from aperag.vectorstore.dto import VectorPoint + + point = VectorPoint( + id=self._relation_vector_id(relation.source, relation.target, relation.relation_type), + vector=list(embedding), + payload={ + "indexer": "graph_relation", + "entity_name": f"{relation.source}->{relation.target}", + "entity_type": relation.relation_type, + }, + ) + try: + await asyncio.to_thread(self._vector_connector.upsert, [point]) + except Exception: # noqa: BLE001 + logger.warning( + "graph.sync vector upsert failed for relation=%s->%s:%s (non-fatal)", + relation.source, + relation.target, + relation.relation_type, + exc_info=True, + ) + + @staticmethod + def _embed_text_for_entity(entity: EntityWithLineage) -> str: + # Prefer compactor's summary; fallback joins ``name`` + raw + # description parts so newly-extracted entities (compactor + # below threshold) still get vectorised. + if entity.compacted_description: + return entity.compacted_description + body = "\n\n".join(p.text for p in entity.description_parts if p.text) + if body: + return f"{entity.name}\n\n{body}" + return entity.name + + @staticmethod + def _embed_text_for_relation(relation: RelationWithLineage) -> str: + if relation.compacted_description: + return relation.compacted_description + body = "\n\n".join(p.text for p in relation.description_parts if p.text) + head = f"{relation.source} -[{relation.relation_type}]-> {relation.target}" + if body: + return f"{head}\n\n{body}" + return head + + def _entity_vector_id(self, entity_name: str) -> str: + return str(uuid5(NAMESPACE_DNS, f"graph_entity:{self._collection_id}:{entity_name}")) + + def _relation_vector_id(self, source: str, target: str, relation_type: str) -> str: + return str( + uuid5( + NAMESPACE_DNS, + f"graph_relation:{self._collection_id}:{source}->{target}:{relation_type}", + ) + ) + + async def _capture_entity_names( + self, + *, + 
kg_entity_records: Sequence[EntityRecord], + pre_sync_entity_names: set[str], + ) -> set[str]: + """Build the post-sync entity-name set: every entity that was + in the sync's kg.jsonl plus any pre-existing entity from + ``pre_sync_entity_names`` that survived ``gc_entity_if_orphan`` + (it had lineage from another doc / parse_version).""" + post: set[str] = {r.name for r in kg_entity_records} + # Pre-existing entities — only include those still present after + # Phase 1 gc (other documents' lineage kept them alive). + for name in pre_sync_entity_names: + if name in post: + continue + row = await self._store.get_entity(name) + if row is not None: + post.add(name) + return post + + async def _capture_relation_keys( + self, + *, + kg_relation_records: Sequence[RelationRecord], + pre_sync_relation_keys: set[tuple[str, str, str]], + ) -> set[tuple[str, str, str]]: + post: set[tuple[str, str, str]] = {r.relation_key() for r in kg_relation_records} + for key in pre_sync_relation_keys: + if key in post: + continue + row = await self._store.get_relation(*key) + if row is not None: + post.add(key) + return post + def _relation_lock_key(source: str, target: str, type: str) -> str: """Lock key that serialises read-modify-write on a relation edge. diff --git a/aperag/indexing/worker_factory.py b/aperag/indexing/worker_factory.py index c994dd24c..291b25bee 100644 --- a/aperag/indexing/worker_factory.py +++ b/aperag/indexing/worker_factory.py @@ -513,6 +513,22 @@ def _build_graph_worker(*, collection: Any, object_store: Any, payload: Dispatch lock = _resolve_entity_lock(backend_type=backend_type) extractor = build_collection_graph_extractor(collection) tenant_scope_key = _resolve_tenant_scope_key(payload=payload) + + # Wave 7 W7-3 wiring: compactor + embedder + vector connector + + # merge candidate detector. The Phase 3 sync extension is opt-in — + # if any dependency fails to construct (e.g. 
completion model not + # configured for this collection), we degrade to Wave 6 + # lineage-only sync. Failures are logged and the Wave 4-6 lineage + # path stays intact. + compactor = _build_collection_graph_compactor(collection) + vector_connector, embedder = _build_collection_graph_vector_writer(collection) + merge_detector = _build_collection_merge_candidate_detector( + collection=collection, + store=store, + vector_connector=vector_connector, + embedder=embedder, + ) + return _GraphModalityWorker( store=store, extractor=extractor, @@ -520,9 +536,119 @@ def _build_graph_worker(*, collection: Any, object_store: Any, payload: Dispatch object_store=object_store, collection_id=collection.id, tenant_scope_key=tenant_scope_key, + compactor=compactor, + embedder=embedder, + vector_connector=vector_connector, + merge_detector=merge_detector, ) +def _build_collection_graph_compactor(collection: Any) -> Any: + """Build a per-collection :class:`GraphIndexCompactor` if the + collection has a completion model configured. Returns ``None`` + otherwise so :class:`GraphModalityWorker` skips the compaction + step gracefully (Phase 3 still runs vector upsert against the + raw description fallback).""" + from aperag.indexing.graph_compactor import GraphIndexCompactor + + try: + from aperag.domains.knowledge_graph.graphindex.integration import ( + build_collection_llm_callable, + ) + + llm = build_collection_llm_callable(collection) + except Exception: # noqa: BLE001 — best-effort; missing model is non-fatal. + logger.warning( + "graph compactor: completion model not configured for collection %s; " + "compactor will not run on sync (vector embed falls back to raw description)", + getattr(collection, "id", ""), + ) + return None + return GraphIndexCompactor(llm=llm) + + +def _build_collection_graph_vector_writer(collection: Any) -> tuple[Any, Any]: + """Resolve the per-collection vector connector + sync embedder for + the graph entity / relation vector path. 
Returns ``(None, None)`` + if the connector fails to resolve and ``(connector, None)`` if no + embedding service is available — Phase 3 then degrades to lineage-only sync. + + Reuses :func:`_build_collection_qdrant_connector` so the graph + write path shares the exact connector / dimension resolution the + vector worker uses (one Qdrant collection per ApeRAG collection, + distinguished from chunk vectors by the ``indexer`` payload key). + """ + try: + adaptor, embedding_service, _vector_size = _build_collection_qdrant_connector(collection) + except Exception: # noqa: BLE001 + logger.warning( + "graph vector writer: qdrant connector resolve failed for collection %s; " + "Phase 3 vector path will be skipped", + getattr(collection, "id", ""), + exc_info=True, + ) + return None, None + + if embedding_service is None: + return adaptor.connector, None + + def _embed(text: str) -> list[float]: + return embedding_service.embed_query(text) + + return adaptor.connector, _embed + + +def _build_collection_merge_candidate_detector( + *, + collection: Any, + store: Any, + vector_connector: Any, + embedder: Any, +) -> Any: + """Build the per-collection :class:`MergeCandidateDetector` if + both vector connector and embedder resolved. 
Returns ``None`` when + any dependency is missing — the detector is a write-only auxiliary + so a missing one degrades cleanly to "no auto-detect candidates + written" without breaking the sync.""" + if vector_connector is None or embedder is None: + return None + + from aperag.indexing.merge_candidate_detector import MergeCandidateDetector + + class _SyncEmbedderShim: + """Adapt the sync ``(text -> list[float])`` callable used by + the graph worker into the ``embed_query`` shape the detector + expects (mirrors :class:`EmbeddingService` surface).""" + + def __init__(self, fn: Callable[[str], list[float]]) -> None: + self._fn = fn + + def embed_query(self, text: str) -> list[float]: + return self._fn(text) + + user_id = _resolve_collection_user_id(collection) + return MergeCandidateDetector( + store=store, + vector_connector=vector_connector, + embedder=_SyncEmbedderShim(embedder), + collection_id=collection.id, + user_id=user_id, + ) + + +def _resolve_collection_user_id(collection: Any) -> str: + """Best-effort recovery of the collection owner's user id, used by + :class:`MergeCandidateDetector` to attribute the auto-detect run. + Falls back to the literal ``"system"`` so the detector still runs + when the field is unavailable (the suggestion's ``source`` already + distinguishes auto-detect from user-triggered runs).""" + for attr in ("user_id", "user", "owner_id"): + value = getattr(collection, attr, None) + if isinstance(value, str) and value: + return value + return "system" + + # --------------------------------------------------------------------- # Helpers — backend dispatch + per-process client singletons + lock # selection. 
diff --git a/tests/unit_test/indexing/test_t1_2_graph.py b/tests/unit_test/indexing/test_t1_2_graph.py index 150a15fb6..310386b9e 100644 --- a/tests/unit_test/indexing/test_t1_2_graph.py +++ b/tests/unit_test/indexing/test_t1_2_graph.py @@ -1272,3 +1272,661 @@ async def test_w7_delete_relation_returns_false_when_absent(): store = InMemoryLineageGraphStore() deleted = await store.delete_relation("Alice", "Bob", "knows") assert deleted is False + + +# --------------------------------------------------------------------- +# Group 6: Wave 7 W7-3 — ``GraphModalityWorker.sync()`` Phase 3 +# extension (compactor → embed → vector upsert → snapshot-diff +# delete → merge candidate detector). Covers the InMemory reference +# store; cross-backend coverage of the storage primitives lives in +# ``tests/integration/compat/test_lineage_graph_compat.py`` (W7-1). +# --------------------------------------------------------------------- + + +from uuid import NAMESPACE_DNS, uuid5 # noqa: E402 — Wave 7 W7-3 group section import + +from aperag.vectorstore.dto import VectorPoint # noqa: E402 — same + + +class _StubCompactor: + """Stub :class:`GraphIndexCompactor` — returns whatever the test + wired into ``response`` (None to opt-out, str to overwrite).""" + + def __init__(self, response: str | None = None, raise_after: int = 0) -> None: + self.response = response + self.calls: list[list[str]] = [] + self._raise_after = raise_after + + async def compact_if_oversized(self, parts: list[str]) -> str | None: + self.calls.append(list(parts)) + if self._raise_after and len(self.calls) >= self._raise_after: + raise RuntimeError("simulated compactor failure") + return self.response + + +class _StubVectorConnector: + """Captures upsert / delete calls against a deterministic embedder.""" + + def __init__(self, raise_on_upsert: bool = False) -> None: + self.upserts: list[list[VectorPoint]] = [] + self.deletes: list[list[str]] = [] + self._raise_on_upsert = raise_on_upsert + + def upsert(self, 
points: list[VectorPoint]) -> list[str]: + if self._raise_on_upsert: + raise RuntimeError("simulated vector upsert failure") + self.upserts.append(list(points)) + return [p.id for p in points] + + def delete(self, ids: list[str]) -> None: + self.deletes.append(list(ids)) + + +def _stub_embedder(text: str) -> list[float]: + # Deterministic 4-dim embedding so tests can compare equality. + h = abs(hash(text)) % 1000 + return [float(h % 7) / 7.0, float(h % 11) / 11.0, float(h % 13) / 13.0, float(h % 17) / 17.0] + + +class _StubMergeDetector: + def __init__(self) -> None: + self.calls: list[tuple[str, list[str]]] = [] + + async def detect_for_sync(self, *, sync_run_id: str, affected_entity_names: Sequence[str]) -> int: + self.calls.append((sync_run_id, list(affected_entity_names))) + return len(affected_entity_names) + + +def _phase3_worker( + *, + store: InMemoryLineageGraphStore, + entity_lock: EntityLock, + object_store: InMemoryObjectStore, + document_id: str, + entities_per_doc: dict[str, list[EntityRecord]] | None = None, + relations_per_doc: dict[str, list[RelationRecord]] | None = None, + compactor: Any = None, + embedder: Any = None, + vector_connector: Any = None, + merge_detector: Any = None, +) -> GraphModalityWorker: + async def extractor( + chunks: Sequence[dict[str, Any]], + ) -> tuple[list[EntityRecord], list[RelationRecord]]: + del chunks + return ( + list((entities_per_doc or {}).get(document_id, [])), + list((relations_per_doc or {}).get(document_id, [])), + ) + + return GraphModalityWorker( + store=store, + extractor=extractor, + entity_lock=entity_lock, + object_store=object_store, + collection_id=COLLECTION_ID, + tenant_scope_key=DEFAULT_TENANT, + compactor=compactor, + embedder=embedder, + vector_connector=vector_connector, + merge_detector=merge_detector, + ) + + +def _expected_entity_id(name: str) -> str: + return str(uuid5(NAMESPACE_DNS, f"graph_entity:{COLLECTION_ID}:{name}")) + + +def _expected_relation_id(source: str, target: str, type_: 
str) -> str: + return str(uuid5(NAMESPACE_DNS, f"graph_relation:{COLLECTION_ID}:{source}->{target}:{type_}")) + + +@pytest.mark.asyncio +async def test_w7_phase3_skipped_when_vector_deps_unwired(store, entity_lock, object_store): + """Wave 6 backward-compat: a worker with no embedder / + vector_connector behaves exactly as the lineage-only Wave 6 + version — no Phase 3 side effects.""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker", + source_chunk_ids=("doc_A-v1-c0",), + ) + ] + } + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + # All Phase 3 deps deliberately None. + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Lineage rebuild still happened (Wave 6 path). + entity = await store.get_entity("Linus") + assert entity is not None + assert entity.compacted_description is None # Phase 3 skipped — no compactor write. 
+ + +@pytest.mark.asyncio +async def test_w7_phase3_writes_vector_point_with_3_field_payload(store, entity_lock, object_store): + """Spec §K.12.5 lock: vector point payload is 3 fields exactly + (``indexer`` / ``entity_name`` / ``entity_type``); no + ``collection_id`` payload (that lives in the uuid5 id instead so + a shared backing store still has cross-collection unique ids).""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker", + source_chunk_ids=("doc_A-v1-c0",), + ) + ] + } + vector = _StubVectorConnector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + + assert len(vector.upserts) == 1 + [point] = vector.upserts[0] + assert point.id == _expected_entity_id("Linus") + assert point.payload == { + "indexer": "graph_entity", + "entity_name": "Linus", + "entity_type": "Person", + } + # 3 fields strict — no collection_id leakage. + assert "collection_id" not in point.payload + + +@pytest.mark.asyncio +async def test_w7_phase3_uuid5_id_is_deterministic(store, entity_lock, object_store): + """Same (collection, name) MUST produce identical uuid5 across + calls so vector upsert overwrites the existing point instead of + leaving stale duplicates (forward-only retry safety).""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="rev1", + source_chunk_ids=("c0",), + ) + ] + } + vector = _StubVectorConnector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + # First sync. 
+ await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + first_id = vector.upserts[0][0].id + # Re-sync same doc same content — id MUST match (deterministic). + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + assert vector.upserts[1][0].id == first_id + assert first_id == _expected_entity_id("Linus") + + +@pytest.mark.asyncio +async def test_w7_phase3_compactor_runs_before_embed(store, entity_lock, object_store): + """Spec §K.12.3 invariant #3 ordering: compactor MUST run before + the vector embed step so the embedded text is the LLM-summarised + version, not the raw concat. We assert by wiring a compactor that + returns ``"COMPACTED"`` and checking that the embed input matches.""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="raw text", + source_chunk_ids=("c0",), + ) + ] + } + compactor = _StubCompactor(response="COMPACTED") + vector = _StubVectorConnector() + captured_embed_inputs: list[str] = [] + + def capturing_embedder(text: str) -> list[float]: + captured_embed_inputs.append(text) + return _stub_embedder(text) + + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + compactor=compactor, + embedder=capturing_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Compactor was called with the per-doc parts text. + assert compactor.calls == [["raw text"]] + # Embed input is the compacted summary, not the raw text. + assert captured_embed_inputs == ["COMPACTED"] + # Storage row also carries the compacted value for downstream readers. 
+ entity = await store.get_entity("Linus") + assert entity is not None + assert entity.compacted_description == "COMPACTED" + + +@pytest.mark.asyncio +async def test_w7_phase3_compactor_none_falls_back_to_raw_text(store, entity_lock, object_store): + """Compactor returning ``None`` (below threshold) leaves the + storage column untouched and the embedder gets the + ``name + raw parts`` fallback string.""" + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="short", + source_chunk_ids=("c0",), + ) + ] + } + compactor = _StubCompactor(response=None) + vector = _StubVectorConnector() + captured: list[str] = [] + + def capturing(text: str) -> list[float]: + captured.append(text) + return _stub_embedder(text) + + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + compactor=compactor, + embedder=capturing, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + assert captured == ["Linus\n\nshort"] + entity = await store.get_entity("Linus") + assert entity is not None + assert entity.compacted_description is None # COALESCE preserve. + + +@pytest.mark.asyncio +async def test_w7_phase3_snapshot_diff_deletes_gc_vector_points(store, entity_lock, object_store): + """Doc-delete cascade: the entity gc'd by Phase 1 MUST have its + vector point deleted in Phase 3 step C — using the lineage-store + name set diff (not an ANN list-all per invariant #7).""" + # Sync 1: doc_A produces ``Linus``. 
+ entities_per_doc_v1 = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker", + source_chunk_ids=("c0",), + ) + ] + } + vector = _StubVectorConnector() + worker_v1 = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc_v1, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker_v1, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + assert len(vector.upserts) == 1 # Linus upserted. + + # Sync 2: doc_A re-parsed but no longer mentions Linus → entity + # gc'd → vector point should be deleted by snapshot-diff. + entities_per_doc_v2 = {"doc_A": []} + worker_v2 = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc_v2, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker_v2, + document_id="doc_A", + parse_version="v2", + object_store=object_store, + ) + # No new upsert (kg.jsonl empty). + assert len(vector.upserts) == 1 + # Snapshot-diff issued a delete for Linus' vector point. + assert vector.deletes == [[_expected_entity_id("Linus")]] + + +@pytest.mark.asyncio +async def test_w7_phase3_snapshot_diff_preserves_cross_doc_entity(store, entity_lock, object_store): + """Cross-doc shared entity: when doc_A is re-parsed and drops + Linus, but doc_B still mentions him, Phase 3 MUST NOT delete the + vector point (post_sync set still contains Linus from doc_B).""" + # Seed both docs. 
+ vector = _StubVectorConnector() + entities_per_doc = { + "doc_A": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker per doc_A", + source_chunk_ids=("c0",), + ) + ], + "doc_B": [ + EntityRecord( + name="Linus", + entity_type="Person", + description="kernel hacker per doc_B", + source_chunk_ids=("c10",), + ) + ], + } + worker_a = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + worker_b = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_B", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker_a, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + await _derive_then_sync( + worker=worker_b, + document_id="doc_B", + parse_version="v1", + object_store=object_store, + ) + # Re-sync doc_A with no entities — Linus survives via doc_B. + entities_per_doc_v2 = {"doc_A": [], "doc_B": entities_per_doc["doc_B"]} + worker_a_v2 = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc_v2, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker_a_v2, + document_id="doc_A", + parse_version="v2", + object_store=object_store, + ) + # No deletes — Linus still alive via doc_B's lineage. 
+ assert vector.deletes == [] + + +@pytest.mark.asyncio +async def test_w7_phase3_relation_vector_upsert_payload_and_id(store, entity_lock, object_store): + """Relation Phase 3 mirrors entity: 3-field payload with + ``indexer="graph_relation"``, uuid5 id formatted + ``graph_relation::->:``.""" + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Alice", entity_type="Person", description="a", source_chunk_ids=("c0",)), + EntityRecord(name="Bob", entity_type="Person", description="b", source_chunk_ids=("c0",)), + ] + } + relations_per_doc = { + "doc_A": [ + RelationRecord( + source="Alice", + target="Bob", + relation_type="knows", + description="Alice knows Bob", + source_chunk_ids=("c0",), + ) + ] + } + vector = _StubVectorConnector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + relations_per_doc=relations_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # 3 upserts: 2 entities + 1 relation. + upserted_ids = {pts[0].id for pts in vector.upserts} + upserted_payloads = {pts[0].payload["indexer"] for pts in vector.upserts} + assert _expected_relation_id("Alice", "Bob", "knows") in upserted_ids + assert "graph_relation" in upserted_payloads + # Find the relation point and validate payload shape. 
+ relation_points = [pts[0] for pts in vector.upserts if pts[0].payload["indexer"] == "graph_relation"] + assert len(relation_points) == 1 + rel_payload = relation_points[0].payload + assert rel_payload == { + "indexer": "graph_relation", + "entity_name": "Alice->Bob", + "entity_type": "knows", + } + + +@pytest.mark.asyncio +async def test_w7_phase3_merge_detector_invoked_with_affected_names(store, entity_lock, object_store): + """Step D wiring: when a detector is wired, Phase 3 calls + ``detect_for_sync`` exactly once per sync, with the entity names + just touched by this sync (per D-3 — incremental detection only).""" + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Alice", entity_type="Person", description="a", source_chunk_ids=("c0",)), + EntityRecord(name="Bob", entity_type="Person", description="b", source_chunk_ids=("c0",)), + ] + } + detector = _StubMergeDetector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=_StubVectorConnector(), + merge_detector=detector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + assert len(detector.calls) == 1 + sync_run_id, names = detector.calls[0] + assert sync_run_id == f"{COLLECTION_ID}:doc_A:v1" + assert sorted(names) == ["Alice", "Bob"] + + +@pytest.mark.asyncio +async def test_w7_phase3_merge_detector_failure_is_non_fatal(store, entity_lock, object_store): + """Detector throwing must NOT abort sync — Phase 3 step D is + best-effort (write-only auxiliary, not on the lineage critical + path). 
The lineage rebuild + vector upsert that ran before it + must remain intact.""" + + class _RaisingDetector: + async def detect_for_sync(self, *, sync_run_id, affected_entity_names): + raise RuntimeError("simulated detector failure") + + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Linus", entity_type="Person", description="raw", source_chunk_ids=("c0",)), + ] + } + vector = _StubVectorConnector() + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + merge_detector=_RaisingDetector(), + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Lineage + vector upsert succeeded. + assert (await store.get_entity("Linus")) is not None + assert len(vector.upserts) == 1 + + +@pytest.mark.asyncio +async def test_w7_phase3_compactor_failure_falls_back(store, entity_lock, object_store): + """Compactor failing (LLM flake) must NOT abort sync — vector + upsert continues with the raw description fallback (forward-only + retry safety).""" + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Linus", entity_type="Person", description="raw", source_chunk_ids=("c0",)), + ] + } + compactor = _StubCompactor(raise_after=1) + vector = _StubVectorConnector() + captured: list[str] = [] + + def capturing(text: str) -> list[float]: + captured.append(text) + return _stub_embedder(text) + + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + compactor=compactor, + embedder=capturing, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Embedder ran with the fallback. + assert captured == ["Linus\n\nraw"] + # Vector upsert still happened. 
+ assert len(vector.upserts) == 1 + + +@pytest.mark.asyncio +async def test_w7_phase3_vector_upsert_failure_is_non_fatal(store, entity_lock, object_store): + """Vector connector raising on upsert must NOT abort sync — the + failure is logged and lineage state is unaffected.""" + entities_per_doc = { + "doc_A": [ + EntityRecord(name="Linus", entity_type="Person", description="raw", source_chunk_ids=("c0",)), + ] + } + vector = _StubVectorConnector(raise_on_upsert=True) + worker = _phase3_worker( + store=store, + entity_lock=entity_lock, + object_store=object_store, + document_id="doc_A", + entities_per_doc=entities_per_doc, + embedder=_stub_embedder, + vector_connector=vector, + ) + await _derive_then_sync( + worker=worker, + document_id="doc_A", + parse_version="v1", + object_store=object_store, + ) + # Lineage row still present. + assert (await store.get_entity("Linus")) is not None + # No upsert recorded (raised before append). + assert vector.upserts == []