diff --git a/aperag/domains/knowledge_graph/db/models.py b/aperag/domains/knowledge_graph/db/models.py
index bfdc80fc4..68eae0b5f 100644
--- a/aperag/domains/knowledge_graph/db/models.py
+++ b/aperag/domains/knowledge_graph/db/models.py
@@ -133,9 +133,36 @@ class GraphCurationSuggestion(Base):
     gmt_operated = Column(DateTime(timezone=True), nullable=True)
 
 
+class LineageEntityAlias(Base):
+    """Persistent record of a user-driven entity merge — Wave 7 §K.12.7 +
+    §K.12.10b. The composite primary key ``(collection_id, alias_name)``
+    points each alias at its (already-flattened) ``canonical_name``; the
+    extra index covers lookups by ``(collection_id, canonical_name)``.
+    Survives the canonical entity's GC: a future indexer write to the
+    alias name still resolves to the (now-empty) canonical, preserving
+    user intent (spec §K.12.7 decision X)."""
+
+    __tablename__ = "aperag_lineage_entity_alias"
+    __table_args__ = (
+        Index(
+            "ix_aperag_lineage_entity_alias_canonical",
+            "collection_id",
+            "canonical_name",
+        ),
+    )
+
+    collection_id = Column(String(64), primary_key=True, nullable=False)
+    alias_name = Column(String(512), primary_key=True, nullable=False)
+    canonical_name = Column(String(512), nullable=False)
+    merged_by = Column(String(256), nullable=True)
+    gmt_created = Column(DateTime(timezone=True), default=utc_now, nullable=False)
+    gmt_updated = Column(DateTime(timezone=True), default=utc_now, onupdate=utc_now, nullable=False)
+
+
 __all__ = [
     "GraphCurationRun",
     "GraphCurationRunStatus",
     "GraphCurationSuggestion",
     "GraphCurationSuggestionStatus",
+    "LineageEntityAlias",
 ]
diff --git a/aperag/graph_curation/alias_map.py b/aperag/graph_curation/alias_map.py
new file mode 100644
index 000000000..c5e9cf5e0
--- /dev/null
+++ b/aperag/graph_curation/alias_map.py
@@ -0,0 +1,213 @@
+# Copyright 2026 ApeCloud, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Alias-map repository — Wave 7 §K.12.7 + §K.12.10b task #6. + +Persists user-driven entity merge intent. The :class:`AliasMapRepository` +is the canonical write/read surface; downstream consumers use the two +methods :meth:`AliasMapRepository.resolve_canonical` (read) and +:meth:`AliasMapRepository.upsert_alias` (write, with cycle reject and +transitive flatten). + +Design notes +------------ + +* Per-collection scoping is a row-level concern (column + ``collection_id``). The repository takes ``collection_id`` on each + call rather than at construction to keep the repo a stateless + singleton (mirrors :class:`AsyncBaseRepository` convention). +* Cycle handling is the service-layer invariant from §K.12.10b: + ``upsert_alias`` ALWAYS resolves the requested ``target`` through + the existing chain first. If the resolved canonical equals the + ``alias_name`` itself, we raise :class:`AliasCycleError` instead of + writing a self-loop. +* Transitive flatten: when ``B → C`` is recorded, any prior + ``A → B`` row is rewritten to ``A → C`` in the same transaction. + Readers therefore always see at most one indirection (no chain + walks at read time, even after multi-step merges). 
+"""
+
+from __future__ import annotations
+
+import logging
+
+from sqlalchemy import delete, select, update
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from aperag.db.repositories.base import AsyncBaseRepository
+from aperag.domains.knowledge_graph.db.models import LineageEntityAlias
+from aperag.utils.utils import utc_now
+
+logger = logging.getLogger(__name__)
+
+
+class AliasCycleError(ValueError):
+    """Raised when an alias upsert would create a self-loop —
+    ``alias_name == resolve_canonical(target)``. The service layer
+    aborts the merge and surfaces this to the caller so the user can
+    pick a different target.
+    """
+
+
+class AliasMapRepository(AsyncBaseRepository):
+    """Read/write surface for ``aperag_lineage_entity_alias``.
+
+    Two write methods (:meth:`upsert_alias`, :meth:`purge_collection`)
+    and two read methods (:meth:`resolve_canonical`,
+    :meth:`list_aliases_pointing_at`). Everything else can be expressed
+    as composition.
+    """
+
+    # ------------------------------------------------------------------
+    # read path
+    # ------------------------------------------------------------------
+
+    async def resolve_canonical(self, *, collection_id: str, name: str) -> str:
+        """Return the canonical name for ``name`` in ``collection_id``.
+
+        ``name`` itself is returned when it has no alias row of its own
+        (i.e. ``name`` is already canonical or has never been merged).
+        Follows exactly one row and never walks a chain; this relies on
+        :meth:`upsert_alias` flattening transitively at write time.
+        """
+        if not name:
+            return name
+
+        async def _op(session: AsyncSession) -> str:
+            row = await session.get(LineageEntityAlias, (collection_id, name))
+            if row is None:
+                return name
+            return str(row.canonical_name)
+
+        return await self._execute_query(_op)
+
+    async def list_aliases_pointing_at(self, *, collection_id: str, canonical_name: str) -> list[str]:
+        """Return every ``alias_name`` whose row points at
+        ``canonical_name`` in ``collection_id``.
+
+        :meth:`upsert_alias` does not call this (it flattens with its own
+        bulk UPDATE); per the original note it serves tests / admin
+        tooling. Rows are ordered by ``alias_name`` for determinism."""
+
+        async def _op(session: AsyncSession) -> list[str]:
+            stmt = (
+                select(LineageEntityAlias.alias_name)
+                .where(
+                    LineageEntityAlias.collection_id == collection_id,
+                    LineageEntityAlias.canonical_name == canonical_name,
+                )
+                .order_by(LineageEntityAlias.alias_name)
+            )
+            result = await session.execute(stmt)
+            return [r[0] for r in result.all()]
+
+        return await self._execute_query(_op)
+
+    # ------------------------------------------------------------------
+    # write path
+    # ------------------------------------------------------------------
+
+    async def upsert_alias(
+        self,
+        *,
+        collection_id: str,
+        alias_name: str,
+        target: str,
+        merged_by: str | None = None,
+    ) -> str:
+        """Record that ``alias_name`` should resolve to ``target`` in
+        ``collection_id``.
+
+        Returns the resolved canonical name actually written (which may
+        differ from ``target`` if ``target`` itself was already an
+        alias — we flatten through to the terminal canonical so readers
+        never traverse a chain).
+
+        Cycle reject: if the resolved canonical equals ``alias_name``,
+        raise :class:`AliasCycleError` instead of writing a self-loop.
+
+        Transitive flatten: any existing row whose ``canonical_name``
+        equals ``alias_name`` (it treated ``alias_name`` as canonical)
+        is rewritten to point at the new canonical. The flatten + the
+        upsert run inside one transaction so a partial flatten can
+        never be observed.
+        """
+
+        async def _op(session: AsyncSession) -> str:
+            # Resolve target through any existing alias row (single
+            # indirection, since flatten keeps the table 1-deep).
+            target_row = await session.get(LineageEntityAlias, (collection_id, target))
+            canonical = str(target_row.canonical_name) if target_row is not None else target
+
+            if canonical == alias_name:
+                raise AliasCycleError(
+                    f"alias upsert would create a cycle: "
+                    f"alias={alias_name!r} → target={target!r} resolves to {canonical!r}"
+                )
+
+            now = utc_now()
+            existing = await session.get(LineageEntityAlias, (collection_id, alias_name))
+            if existing is not None:
+                existing.canonical_name = canonical
+                existing.merged_by = merged_by
+                existing.gmt_updated = now
+            else:
+                session.add(
+                    LineageEntityAlias(
+                        collection_id=collection_id,
+                        alias_name=alias_name,
+                        canonical_name=canonical,
+                        merged_by=merged_by,
+                        gmt_created=now,
+                        gmt_updated=now,
+                    )
+                )
+
+            # Transitive flatten: any row pointing at ``alias_name`` now
+            # needs to point at the new canonical instead. (The alias
+            # name was canonical from those rows' perspective; now it is
+            # itself an alias of ``canonical``.)
+            await session.execute(
+                update(LineageEntityAlias)
+                .where(
+                    LineageEntityAlias.collection_id == collection_id,
+                    LineageEntityAlias.canonical_name == alias_name,
+                )
+                .values(
+                    canonical_name=canonical,
+                    gmt_updated=now,
+                )
+            )
+            return canonical
+
+        return await self.execute_with_transaction(_op)
+
+    async def purge_collection(self, collection_id: str) -> int:
+        """Delete all alias rows for ``collection_id``. Returns the
+        number of rows deleted. Used by collection-purge / test
+        teardown."""
+
+        async def _op(session: AsyncSession) -> int:
+            result = await session.execute(
+                delete(LineageEntityAlias).where(LineageEntityAlias.collection_id == collection_id)
+            )
+            return int(result.rowcount or 0)
+
+        return await self.execute_with_transaction(_op)
+
+
+__all__ = [
+    "AliasCycleError",
+    "AliasMapRepository",
+]
diff --git a/aperag/graph_curation/lineage_merge.py b/aperag/graph_curation/lineage_merge.py
new file mode 100644
index 000000000..2ca5cee6e
--- /dev/null
+++ b/aperag/graph_curation/lineage_merge.py
@@ -0,0 +1,454 @@
+# Copyright 2026 ApeCloud, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""User-driven entity merge over the lineage graph store — Wave 7
+§K.12.6 / §K.12.10 task #6.
+
+Replaces the legacy ``GraphIndexService.merge_entities`` flow with one
+that operates on :class:`aperag.indexing.graph.LineageGraphStore` +
+:class:`aperag.graph_curation.alias_map.AliasMapRepository` +
+:class:`aperag.indexing.graph_compactor.GraphIndexCompactor` +
+:class:`aperag.vectorstore.base.VectorStoreConnector`. Per architect
+ratify msg=cf860ae4 + huangheng endorse msg=22816e0d:
+
+Step ordering (locked, invariant #2: L1 → L2 one-way derivation):
+
+1. ``alias_repo.resolve_canonical(target_name)`` to flatten any
+   pre-existing chain (1-hop guarantee — target may itself be an
+   alias).
+2. For each source: ``alias_repo.upsert_alias(alias=source,
+   target=final_target)``.
Cycle reject is :class:`AliasCycleError`
+   raised by the alias repo; we let it propagate so the user sees an
+   actionable error.
+3. Read target + sources via ``store.get_entity``; UNION their
+   ``description_parts`` lists.
+4. LLM unified description over the merged parts.
+5. Compactor pass via ``compact_if_oversized([unified],
+   subject_kind="entity", subject_label=final_target, language=...)``
+   — kwargs locked by drift #3.
+6. L1 target write — TWO sub-steps:
+   6a. For each source's ``description_parts``: re-upsert under the
+       target name preserving the original
+       ``(document_id, parse_version, chunk_ids)`` so per-doc lineage
+       is not lost (invariant #1: L1 is not polluted).
+   6b. Final upsert with the unified text + compacted_description,
+       tagged with sentinel ``document_id="__curation_merge__"``
+       (sentinel pick locked by huangheng msg=22816e0d) and
+       ``parse_version=str(merge_uuid)`` so each merge has a unique
+       lineage member that cannot be confused with a real-doc
+       contribution.
+7. Vector layer write — 3-field payload (``indexer / entity_name /
+   entity_type``), ``uuid5(NAMESPACE_DNS, "graph_entity:{cid}:{name}")``
+   for the deterministic point id (invariants #5 + #6).
+8. Delete sources — last. ``store.delete_entity(s)`` (single-arg, drift
+   #1) plus the corresponding vector point delete.
+
+Failure mode
+------------
+
+The merge is *not* a single SQL transaction across all eight steps —
+the lineage store layer commits each ``upsert_*_with_lineage`` call
+independently. Per huangheng msg=22816e0d this gives a "soft
+transition" mid-failure state: partial source parts already on the
+target, sources still readable until step 8 deletes them, and the
+caller can retry: step 6a upserts are keyed on ``(document_id,
+parse_version)``; step 6b gets a fresh ``merge_uuid`` key per call.
+
+Each per-source alias upsert (step 2) IS transactional inside :class:`AliasMapRepository`
+(row write + transitive flatten together), but the calls are separate transactions that
+run before the L1 steps, so alias rows already written persist if a later step fails.
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from typing import Any, Awaitable, Callable, Sequence
+from uuid import NAMESPACE_DNS, uuid4, uuid5
+
+from aperag.graph_curation.alias_map import AliasCycleError, AliasMapRepository
+from aperag.indexing.graph import (
+    DescriptionPart,
+    EntityRecord,
+    EntityWithLineage,
+    LineageGraphStore,
+    LineageMember,
+)
+from aperag.indexing.graph_compactor import GraphIndexCompactor
+from aperag.vectorstore.base import VectorStoreConnector
+from aperag.vectorstore.dto import VectorPoint
+
+logger = logging.getLogger(__name__)
+
+
+# Sentinel ``document_id`` for the unified+compacted lineage member
+# written in step 6b. ``__curation_merge__`` was picked by huangheng
+# msg=22816e0d over ``__user_merge__`` because ``user`` is misleading
+# (the API caller is ``GraphCurationService``, not the UI user).
+CURATION_MERGE_DOCUMENT_ID: str = "__curation_merge__"
+
+
+# Wave 7 §K.12 invariant #5: graph entity vector points carry exactly
+# these three payload fields. Locked by architect ratify (msg=acbd0003
+# Q4 + msg=cf860ae4 step 7'). Match this constant against task #3's
+# write payload to keep the indexer hot path and the curation merge
+# path in lock-step.
+GRAPH_ENTITY_INDEXER: str = "graph_entity"
+
+
+@dataclass(frozen=True)
+class LineageMergeResult:
+    """Return shape of :meth:`LineageEntityMerger.merge_entities`.
+
+    ``final_target`` is the canonical name actually written to (may
+    differ from the requested ``target_name`` if it was itself an
+    alias). ``merged_source_ids`` is the list of source names that
+    were folded in (in input order, duplicates removed).
+    ``unified_description`` is the consolidated text produced in step 4
+    (empty when there was nothing to consolidate).
+    ``compacted_description`` is the compactor output for that text —
+    ``None`` when the unified description was short enough to skip
+    compaction (Compactor returns ``None`` below threshold). The vector
+    point embeds ``compacted_description or unified_description``.
+    """
+
+    final_target: str
+    merged_source_ids: list[str]
+    unified_description: str
+    compacted_description: str | None
+
+
+# Embedder API: callers pass any object exposing a sync ``embed_query``
+# (matches :class:`aperag.llm.embed.embedding_service.EmbeddingService`).
+EmbedQueryFn = Callable[[str], list[float]]
+
+
+# LLM API: any async callable mapping prompt → response text. Reuses
+# the :class:`aperag.indexing.llm.LLMCall` shape so wiring is a direct
+# drop-in.
+LLMCall = Callable[[str], Awaitable[str]]
+
+
+class LineageEntityMerger:
+    """Per-collection user-driven merge orchestrator.
+
+    Constructor takes the inner store (NOT the alias-redirect decorator
+    — the merge writes the canonical names directly), the alias-map
+    repository, the compactor + LLM + embedder + vector connector, and
+    the bound ``collection_id``. Construction does no I/O.
+    """
+
+    def __init__(
+        self,
+        *,
+        store: LineageGraphStore,
+        alias_repo: AliasMapRepository,
+        compactor: GraphIndexCompactor,
+        vector_connector: VectorStoreConnector,
+        embedder: Any,
+        llm: LLMCall,
+        collection_id: str,
+        language: str = "Chinese",
+    ) -> None:
+        self._store = store
+        self._alias_repo = alias_repo
+        self._compactor = compactor
+        self._vector_connector = vector_connector
+        self._embedder = embedder
+        self._llm = llm
+        self._collection_id = collection_id
+        self._language = language
+
+    # ------------------------------------------------------------------
+    # public surface
+    # ------------------------------------------------------------------
+
+    async def merge_entities(
+        self,
+        *,
+        target_name: str,
+        source_names: Sequence[str],
+        merged_by: str | None = None,
+    ) -> LineageMergeResult:
+        """Merge ``source_names`` into ``target_name`` (or the canonical
+        ``target_name`` resolves to).
+
+        Duplicate names in ``source_names`` are processed once, in
+        first-occurrence order, so no source contributes its description
+        parts twice or is deleted twice.
+
+        Raises :class:`aperag.graph_curation.alias_map.AliasCycleError`
+        if any source → target relationship would form a cycle. A source
+        equal to the resolved target is rejected before any alias row
+        is written; the per-row check in ``upsert_alias`` stays in place
+        as a backstop. The caller should surface this to the user so
+        they can pick a different target.
+
+        Empty ``source_names`` is a no-op (returns immediately with
+        ``LineageMergeResult(final_target=target_name, ...,
+        merged_source_ids=[])``).
+        """
+        if not source_names:
+            return LineageMergeResult(
+                final_target=target_name,
+                merged_source_ids=[],
+                unified_description="",
+                compacted_description=None,
+            )
+
+        # Fold duplicate source names (dicts keep insertion order) so the
+        # read / re-anchor / delete loops below visit each source once.
+        sources = list(dict.fromkeys(source_names))
+
+        # Step 1 — flatten target chain (1-hop).
+        final_target = await self._alias_repo.resolve_canonical(collection_id=self._collection_id, name=target_name)
+
+        # Pre-flight cycle check. Every ``upsert_alias`` call in step 2
+        # runs in its own transaction, so a rejection on a later source
+        # would leave the earlier sources' alias rows behind for a merge
+        # that never ran. Reject before the first write instead.
+        if final_target in sources:
+            raise AliasCycleError(
+                f"alias upsert would create a cycle: "
+                f"alias={final_target!r} → target={target_name!r} resolves to {final_target!r}"
+            )
+
+        # Step 2 — record the alias rows so future indexer writes get
+        # transparently redirected. Cycle reject propagates as
+        # AliasCycleError so the caller can abort the merge cleanly.
+        for src_name in sources:
+            await self._alias_repo.upsert_alias(
+                collection_id=self._collection_id,
+                alias_name=src_name,
+                target=final_target,
+                merged_by=merged_by,
+            )
+
+        # Step 3 — read target + sources, accumulate description parts.
+        target_entity = await self._store.get_entity(final_target)
+        if target_entity is None:
+            # Target was GC'd between merge initiation and execution.
+            # We still wrote the alias rows so future indexer writes to
+            # the sources will resolve to the (now-empty) canonical;
+            # but the merge body has nothing to consolidate.
+            logger.warning(
+                "lineage_merger: target %r missing at merge time (collection=%s); "
+                "alias rows written but no L1/vector update performed",
+                final_target,
+                self._collection_id,
+            )
+            return LineageMergeResult(
+                final_target=final_target,
+                merged_source_ids=sources,
+                unified_description="",
+                compacted_description=None,
+            )
+
+        # Sources that no longer exist in the store are skipped here; their
+        # alias rows from step 2 still stand.
+        source_entities: list[EntityWithLineage] = []
+        for src_name in sources:
+            row = await self._store.get_entity(src_name)
+            if row is not None:
+                source_entities.append(row)
+
+        all_parts: list[DescriptionPart] = list(target_entity.description_parts)
+        for src_entity in source_entities:
+            all_parts.extend(src_entity.description_parts)
+        part_texts = [p.text for p in all_parts if p.text and p.text.strip()]
+
+        # Step 4 — LLM unified description.
+        unified = await self._unified_description(
+            entity_name=final_target,
+            entity_type=target_entity.entity_type,
+            part_texts=part_texts,
+        )
+
+        # Step 5 — compact if oversized.
+        compacted = await self._compactor.compact_if_oversized(
+            [unified],
+            subject_kind="entity",
+            subject_label=final_target,
+            language=self._language,
+        )
+
+        # Step 6a — re-anchor source parts under the target name,
+        # preserving their original lineage so per-doc tracking is not
+        # lost (invariant #1). DescriptionPart carries no chunk_ids of
+        # its own; we look up the matching LineageMember by
+        # ``(document_id, parse_version)`` to recover them.
+        for src_entity in source_entities:
+            lineage_by_key = {m.key(): m for m in src_entity.source_lineage}
+            for part in src_entity.description_parts:
+                member = lineage_by_key.get(part.key())
+                if member is not None:
+                    # Carry over the matched member's own scope key rather
+                    # than whichever member happens to be listed first.
+                    chunk_ids = tuple(member.chunk_ids)
+                    scope_key = member.tenant_scope_key
+                else:
+                    chunk_ids = ()
+                    scope_key = self._tenant_scope_key_for(src_entity)
+                await self._store.upsert_entity_with_lineage(
+                    record=EntityRecord(
+                        name=final_target,
+                        entity_type=target_entity.entity_type,
+                        description=part.text,
+                        source_chunk_ids=chunk_ids,
+                    ),
+                    lineage=LineageMember(
+                        document_id=part.document_id,
+                        parse_version=part.parse_version,
+                        tenant_scope_key=scope_key,
+                        chunk_ids=chunk_ids,
+                    ),
+                )
+
+        # Step 6b — final write with unified text + compacted_description
+        # under the curation-merge sentinel lineage.
+        merge_uuid = str(uuid4())
+        await self._store.upsert_entity_with_lineage(
+            record=EntityRecord(
+                name=final_target,
+                entity_type=target_entity.entity_type,
+                description=unified,
+                source_chunk_ids=(),
+            ),
+            lineage=LineageMember(
+                document_id=CURATION_MERGE_DOCUMENT_ID,
+                parse_version=merge_uuid,
+                tenant_scope_key=self._tenant_scope_key_for(target_entity),
+                chunk_ids=(),
+            ),
+            compacted_description=compacted,
+        )
+
+        # Step 7 — vector layer write (3-field payload, uuid5 point id).
+        await self._upsert_vector_point(
+            entity_name=final_target,
+            entity_type=target_entity.entity_type,
+            text=compacted or unified,
+        )
+
+        # Step 8 — delete sources from L1 and vector store, last.
+        for src_entity in source_entities:
+            await self._store.delete_entity(src_entity.name)
+            await self._delete_vector_point(src_entity.name)
+
+        return LineageMergeResult(
+            final_target=final_target,
+            merged_source_ids=sources,
+            unified_description=unified,
+            compacted_description=compacted,
+        )
+
+    # ------------------------------------------------------------------
+    # internals
+    # ------------------------------------------------------------------
+
+    async def _unified_description(
+        self,
+        *,
+        entity_name: str,
+        entity_type: str,
+        part_texts: list[str],
+    ) -> str:
+        """Use the LLM to produce one consolidated description from the
+        per-doc fragments. Returns ``""`` for no fragments and the single
+        fragment unchanged when there is only one (no merge needed).
+        Falls back to the fragments joined by blank lines when the LLM
+        call fails or returns only whitespace (graceful degrade —
+        partial merge still proceeds).
+        """
+        if not part_texts:
+            return ""
+        if len(part_texts) == 1:
+            return part_texts[0]
+        prompt = self._render_merge_prompt(
+            entity_name=entity_name,
+            entity_type=entity_type,
+            fragments=part_texts,
+        )
+        try:
+            unified = (await self._llm(prompt)).strip()
+        except Exception:
+            logger.warning(
+                "lineage_merger: unified-description LLM call failed for %r; falling back to joined parts",
+                entity_name,
+                exc_info=True,
+            )
+            return "\n\n".join(part_texts)
+        if not unified:
+            # A blank reply would otherwise be written as the unified
+            # description and make the vector upsert skip itself.
+            logger.warning(
+                "lineage_merger: unified-description LLM call returned empty text for %r; "
+                "falling back to joined parts",
+                entity_name,
+            )
+            return "\n\n".join(part_texts)
+        return unified
+
+    @staticmethod
+    def _render_merge_prompt(*, entity_name: str, entity_type: str, fragments: list[str]) -> str:
+        """Build the consolidation prompt for ``fragments`` of one entity."""
+        # Light-weight prompt — the heavy lifting (length cap, language
+        # control, S-P-O framing) lives in the Compactor (task #2). The
+        # merge prompt's only job is to consolidate fragments into one
+        # paragraph so the Compactor sees a single coherent input.
+        joined = "\n\n---\n\n".join(fragments)
+        return (
+            f"You are merging descriptions of the same {entity_type} "
+            f"entity ({entity_name}). Consolidate the following "
+            f"fragments into ONE coherent paragraph that preserves "
+            f"every fact. Do not add information that isn't in the "
+            f"fragments.\n\n"
+            f"Fragments:\n\n---\n\n{joined}\n\n---\n\n"
+            f"Consolidated description:"
+        )
+
+    def _vector_point_id(self, entity_name: str) -> str:
+        """Return the deterministic vector point id for ``entity_name``.
+
+        Shared by the upsert and delete paths so both always derive the
+        id from the same ``indexer:collection_id:entity_name`` string.
+        """
+        return str(uuid5(NAMESPACE_DNS, f"{GRAPH_ENTITY_INDEXER}:{self._collection_id}:{entity_name}"))
+
+    async def _upsert_vector_point(self, *, entity_name: str, entity_type: str, text: str) -> None:
+        """Embed ``text`` and upsert the entity's vector point.
+
+        Best-effort: empty ``text`` is skipped, and embed / upsert
+        failures are logged and swallowed rather than raised.
+        """
+        if not text:
+            return
+        try:
+            embedding = await _to_thread(self._embedder.embed_query, text)
+        except Exception:
+            logger.warning(
+                "lineage_merger: embed failed for %r; skipping vector upsert",
+                entity_name,
+                exc_info=True,
+            )
+            return
+        point = VectorPoint(
+            id=self._vector_point_id(entity_name),
+            vector=embedding,
+            payload={
+                "indexer": GRAPH_ENTITY_INDEXER,
+                "entity_name": entity_name,
+                "entity_type": entity_type,
+            },
+        )
+        try:
+            await _to_thread(self._vector_connector.upsert, [point])
+        except Exception:
+            logger.warning(
+                "lineage_merger: vector upsert failed for %r",
+                entity_name,
+                exc_info=True,
+            )
+
+    async def _delete_vector_point(self, entity_name: str) -> None:
+        """Delete the entity's vector point; failures are logged, not raised."""
+        point_id = self._vector_point_id(entity_name)
+        try:
+            await _to_thread(self._vector_connector.delete, [point_id])
+        except Exception:
+            logger.warning(
+                "lineage_merger: vector delete failed for %r",
+                entity_name,
+                exc_info=True,
+            )
+
+    @staticmethod
+    def _tenant_scope_key_for(entity: EntityWithLineage) -> str:
+        """Return the ``tenant_scope_key`` of ``entity``'s first lineage
+        member, or ``""`` when it has none."""
+        # Re-anchored parts inherit the tenant_scope_key of the source
+        # they came from; the curation-merge sentinel lineage inherits
+        # the target entity's. Fall back to empty string when the
+        # entity has no lineage members yet (defensive only —
+        # ``upsert_entity_with_lineage`` is always called with a real
+        # lineage member alongside a real record).
+        return next((member.tenant_scope_key for member in entity.source_lineage), "")
+
+
+async def _to_thread(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
+    """Local wrapper around ``asyncio.to_thread`` so the import stays
+    inside the function body when this module is imported eagerly.
+
+    Runs the blocking ``func(*args, **kwargs)`` in a worker thread and
+    returns its result."""
+    import asyncio
+
+    return await asyncio.to_thread(func, *args, **kwargs)
+
+
+__all__ = [
+    "LineageEntityMerger",
+    "LineageMergeResult",
+    "AliasCycleError",
+    "CURATION_MERGE_DOCUMENT_ID",
+    "GRAPH_ENTITY_INDEXER",
+]
diff --git a/aperag/indexing/alias_redirect_store.py b/aperag/indexing/alias_redirect_store.py
new file mode 100644
index 000000000..65880c031
--- /dev/null
+++ b/aperag/indexing/alias_redirect_store.py
@@ -0,0 +1,187 @@
+# Copyright 2026 ApeCloud, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Lineage graph store decorator that applies user-driven alias redirect.
+
+Wave 7 §K.12.4 invariant #3: when the indexer writes
+``upsert_entity_with_lineage(record=EntityRecord(name="A"))`` and the
+user previously merged ``A → C``, the write should land on ``C``
+transparently — the indexer hot path is not aware of curation merges.
+
+Implementation strategy (architect ratify msg=cf860ae4 + huangheng
+endorse msg=22816e0d / msg=93d9add1, **Option (b)**): write a thin
+decorator class that wraps any concrete
+:class:`aperag.indexing.graph.LineageGraphStore` plus an
+:class:`aperag.graph_curation.alias_map.AliasMapRepository`, intercepts
+the write methods to rewrite entity names through the alias map, and
+forwards every other Protocol method unchanged. This keeps the three
+backend implementations (Postgres / Neo4j / Nebula) untouched —
+critical for landing task #6 in one PR without rippling into Bryce's
+storage territory.
+
+Decorator passthrough invariant (huangheng CR lock,
+``test_decorator_passthrough_for_non_upsert_methods``): every method
+declared on :class:`LineageGraphStore` that is NOT an ``upsert_*``
+write must forward to ``_inner`` byte-for-byte — no silent behaviour
+change. Tests pin this so a future Protocol method addition can't slip
+past without an explicit decorator update.
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import replace
+from typing import TYPE_CHECKING
+
+from aperag.indexing.graph import (
+    EntityRecord,
+    EntityWithLineage,
+    LineageMember,
+    RelationRecord,
+    RelationWithLineage,
+)
+
+if TYPE_CHECKING:  # pragma: no cover - typing only
+    from aperag.graph_curation.alias_map import AliasMapRepository
+    from aperag.indexing.graph import LineageGraphStore
+
+logger = logging.getLogger(__name__)
+
+
+class LineageGraphStoreWithAliasRedirect:
+    """Wrap a :class:`LineageGraphStore` so writes go through the alias
+    map. Only the two ``upsert_*_with_lineage`` methods rewrite names;
+    every other method forwards its arguments to the inner store as-is.
+    Constructor takes the inner store, the alias-map repository, and
+    the ``collection_id`` the inner store is bound to. The decorator is
+    per-collection just like the inner store — both share the same
+    binding.
+    """
+
+    def __init__(
+        self,
+        *,
+        inner: "LineageGraphStore",
+        alias_repo: "AliasMapRepository",
+        collection_id: str,
+    ) -> None:
+        self._inner = inner
+        self._alias_repo = alias_repo
+        self._collection_id = collection_id
+
+    # ------------------------------------------------------------------
+    # Intercepted write paths — apply alias redirect
+    # ------------------------------------------------------------------
+
+    async def upsert_entity_with_lineage(
+        self,
+        *,
+        record: EntityRecord,
+        lineage: LineageMember,
+        compacted_description: str | None = None,
+    ) -> None:
+        canonical = await self._alias_repo.resolve_canonical(collection_id=self._collection_id, name=record.name)
+        redirected = record if canonical == record.name else replace(record, name=canonical)
+        if canonical != record.name:
+            logger.debug(
+                "alias_redirect: entity write %r → %r (collection=%s)",
+                record.name,
+                canonical,
+                self._collection_id,
+            )
+        await self._inner.upsert_entity_with_lineage(
+            record=redirected,
+            lineage=lineage,
+            compacted_description=compacted_description,
+        )
+
+    async def upsert_relation_with_lineage(
+        self,
+        *,
+        record: RelationRecord,
+        lineage: LineageMember,
+        compacted_description: str | None = None,
+    ) -> None:
+        # Both endpoints of a relation may have been merged; resolve both (the relation type is unaffected).
+        # NOTE(review): when both resolve to one canonical this forwards a self-loop relation — confirm intended.
+        new_source = await self._alias_repo.resolve_canonical(collection_id=self._collection_id, name=record.source)
+        new_target = await self._alias_repo.resolve_canonical(collection_id=self._collection_id, name=record.target)
+        if new_source != record.source or new_target != record.target:
+            logger.debug(
+                "alias_redirect: relation write (%r→%r) → (%r→%r) (collection=%s)",
+                record.source,
+                record.target,
+                new_source,
+                new_target,
+                self._collection_id,
+            )
+            redirected = replace(record, source=new_source, target=new_target)
+        else:
+            redirected = record
+        await self._inner.upsert_relation_with_lineage(
+            record=redirected,
+            lineage=lineage,
+            compacted_description=compacted_description,
+        )
+
+    # ------------------------------------------------------------------
+    # Passthrough — forward every non-write Protocol method unchanged.
+    # Pinned by ``test_decorator_passthrough_for_non_upsert_methods``.
+    # ------------------------------------------------------------------
+
+    async def find_entity_ids_with_lineage(self, *, document_id: str) -> list[str]:
+        return await self._inner.find_entity_ids_with_lineage(document_id=document_id)
+
+    async def find_relation_keys_with_lineage(self, *, document_id: str) -> list[tuple[str, str, str]]:
+        return await self._inner.find_relation_keys_with_lineage(document_id=document_id)
+
+    async def remove_entity_lineage_member(self, *, entity_name: str, document_id: str) -> None:
+        await self._inner.remove_entity_lineage_member(entity_name=entity_name, document_id=document_id)
+
+    async def remove_relation_lineage_member(self, *, source: str, target: str, type: str, document_id: str) -> None:
+        await self._inner.remove_relation_lineage_member(
+            source=source, target=target, type=type, document_id=document_id
+        )
+
+    async def gc_entity_if_orphan(self, entity_name: str) -> bool:
+        return await self._inner.gc_entity_if_orphan(entity_name)
+
+    async def gc_relation_if_orphan(self, source: str, target: str, type: str) -> bool:
+        return await self._inner.gc_relation_if_orphan(source, target, type)
+
+    async def delete_entity(self, entity_name: str) -> bool:
+        return await self._inner.delete_entity(entity_name)
+
+    async def delete_relation(self, source: str, target: str, type: str) -> bool:
+        return await self._inner.delete_relation(source, target, type)
+
+    async def get_entity(self, entity_name: str) -> EntityWithLineage | None:
+        return await self._inner.get_entity(entity_name)
+
+    async def get_relation(self, source: str, target: str, type: str) -> RelationWithLineage | None:
+        return await self._inner.get_relation(source, target, type)
+
+    async def query_entities_by_keyword(self, *, query: str, top_k: int) -> list[EntityWithLineage]:
+        return await self._inner.query_entities_by_keyword(query=query, top_k=top_k)
+
+    async def expand_neighbors_n_hops(
+        self, *, entity_names: list[str], hops: int = 1
+    ) -> tuple[list[EntityWithLineage], list[RelationWithLineage]]:
+        return await self._inner.expand_neighbors_n_hops(entity_names=entity_names, hops=hops)
+
+    async def list_entity_labels(self) -> list[str]:
+        return await self._inner.list_entity_labels()
+
+
+__all__ = ["LineageGraphStoreWithAliasRedirect"]
diff --git a/aperag/migration/versions/20260428030000-b5d2e8f1c9a4_alias_map_table.py b/aperag/migration/versions/20260428030000-b5d2e8f1c9a4_alias_map_table.py
new file mode 100644
index 000000000..477cde316
--- /dev/null
+++ b/aperag/migration/versions/20260428030000-b5d2e8f1c9a4_alias_map_table.py
@@ -0,0 +1,76 @@
+"""add ``aperag_lineage_entity_alias`` table for user-driven entity merge
+
+Wave 7 W7-6 (spec §K.12.7 / §K.12.10b): persists user-driven entity
+merge intent so that subsequent indexer ``upsert_*_with_lineage`` calls
+can transparently redirect a written ``record.name`` to the canonical
+target when an alias has been recorded. Distinct from
+``GraphCurationSuggestion`` (which records *suggested* merges pending
+review); this table records *applied* merges.
+ +Per spec §K.12.10b: + +* ``(collection_id, alias_name)`` is the primary key — one alias maps + to exactly one canonical at any time. Re-merging an alias to a new + canonical UPDATEs the row in-place. +* ``canonical_name`` always points at the final (flattened) target — + cycle detection in the service layer enforces transitive flattening, + so a chain ``A → B → C`` is rewritten to ``A → C`` and ``B → C`` in + one transaction. +* The table SURVIVES canonical entity GC (spec §K.12.7 decision X): + if the canonical entity is later deleted, the alias rows stay so a + future re-indexer write to the alias name still resolves correctly + to the (now-empty) canonical, preserving user intent. + +Pre-launch: no production users on the new lineage path → land +without backfill (per earayu2 hard-cut acceptance). + +Revision ID: b5d2e8f1c9a4 +Revises: a3b7c4d8e2f1 +Create Date: 2026-04-28 03:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "b5d2e8f1c9a4" +down_revision: Union[str, None] = "a3b7c4d8e2f1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "aperag_lineage_entity_alias", + sa.Column("collection_id", sa.String(length=64), nullable=False), + sa.Column("alias_name", sa.String(length=512), nullable=False), + sa.Column("canonical_name", sa.String(length=512), nullable=False), + sa.Column("merged_by", sa.String(length=256), nullable=True), + sa.Column( + "gmt_created", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.Column( + "gmt_updated", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.PrimaryKeyConstraint("collection_id", "alias_name", name="pk_aperag_lineage_entity_alias"), + ) + op.create_index( + "ix_aperag_lineage_entity_alias_canonical", + "aperag_lineage_entity_alias", 
+ ["collection_id", "canonical_name"], + ) + + +def downgrade() -> None: + op.drop_index( + "ix_aperag_lineage_entity_alias_canonical", + table_name="aperag_lineage_entity_alias", + ) + op.drop_table("aperag_lineage_entity_alias") diff --git a/tests/unit_test/graph_curation/test_alias_map.py b/tests/unit_test/graph_curation/test_alias_map.py new file mode 100644 index 000000000..d629b3ca9 --- /dev/null +++ b/tests/unit_test/graph_curation/test_alias_map.py @@ -0,0 +1,197 @@ +# Copyright 2026 ApeCloud, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ``aperag.graph_curation.alias_map`` — Wave 7 task #6. + +The repository talks to PostgreSQL in production; these tests exercise +the cycle-reject + transitive-flatten algorithm against an in-memory +SQLite engine so the cases that lock §K.12.10b can run without a +running Postgres. Cross-backend behaviour is covered separately by the +repository integration tests. + +Pinned cases (per spec §K.12.10b + huangheng CR plan msg=22816e0d): + +* Insert + read: A → B writes a row, ``resolve_canonical("A") == "B"``. +* Transitive flatten: B → C after A → B rewrites the existing + (A, B) row to (A, C) — readers never traverse a chain. +* Cycle reject: C → A when ``resolve_canonical("A") == "C"`` raises + :class:`AliasCycleError` instead of writing a self-loop. +* Persists across canonical GC (orphan canonical row): rows survive + even if the canonical entity no longer exists in the lineage table. 
+* Per-collection isolation: rows in collection X never leak into + collection Y. +""" + +from __future__ import annotations + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import sessionmaker + +from aperag.domains.knowledge_graph.db.models import LineageEntityAlias # noqa: F401 (registers metadata) +from aperag.graph_curation.alias_map import AliasCycleError, AliasMapRepository + + +@pytest_asyncio.fixture +async def session() -> AsyncSession: + engine = create_async_engine("sqlite+aiosqlite:///:memory:") + async with engine.begin() as conn: + await conn.run_sync(lambda sync_conn: LineageEntityAlias.__table__.create(sync_conn)) + factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as s: + yield s + await engine.dispose() + + +# --------------------------------------------------------------------- +# read path +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_resolve_canonical_returns_input_when_no_alias_row(session): + repo = AliasMapRepository(session=session) + assert await repo.resolve_canonical(collection_id="c1", name="Apple") == "Apple" + + +@pytest.mark.asyncio +async def test_resolve_canonical_after_simple_upsert(session): + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.") + assert await repo.resolve_canonical(collection_id="c1", name="Apple") == "Apple Inc." + assert await repo.resolve_canonical(collection_id="c1", name="Apple Inc.") == "Apple Inc." 
# ---------------------------------------------------------------------
# transitive flatten
# ---------------------------------------------------------------------


@pytest.mark.asyncio
async def test_transitive_flatten_rewrites_existing_alias_rows(session):
    """Recording ``B → C`` after ``A → B`` must re-point ``A`` at ``C``.

    Locks §K.12.10b: once the second merge lands, both ``A`` and ``B``
    resolve directly to ``C`` — no chain is left for readers to walk.
    """
    alias_map = AliasMapRepository(session=session)

    # First merge: A becomes an alias of B.
    await alias_map.upsert_alias(collection_id="c1", alias_name="A", target="B")
    resolved_before = await alias_map.resolve_canonical(collection_id="c1", name="A")
    assert resolved_before == "B"

    # Second merge: B becomes an alias of C; the earlier (A, B) row
    # is expected to follow along to the new terminal canonical.
    await alias_map.upsert_alias(collection_id="c1", alias_name="B", target="C")
    resolved_a = await alias_map.resolve_canonical(collection_id="c1", name="A")
    assert resolved_a == "C"
    resolved_b = await alias_map.resolve_canonical(collection_id="c1", name="B")
    assert resolved_b == "C"


@pytest.mark.asyncio
async def test_target_is_itself_an_alias_resolves_through(session):
    """A requested ``target`` that is already an alias is resolved first.

    With ``B → C`` on record, asking for ``A → B`` must report and
    store ``C`` as the canonical (1-hop guarantee).
    """
    alias_map = AliasMapRepository(session=session)
    await alias_map.upsert_alias(collection_id="c1", alias_name="B", target="C")

    # The value returned by the write is the flattened canonical ...
    written_canonical = await alias_map.upsert_alias(collection_id="c1", alias_name="A", target="B")
    assert written_canonical == "C"

    # ... and a subsequent read agrees with it.
    read_back = await alias_map.resolve_canonical(collection_id="c1", name="A")
    assert read_back == "C"


# ---------------------------------------------------------------------
# cycle reject
# ---------------------------------------------------------------------


@pytest.mark.asyncio
async def test_cycle_reject_self_loop(session):
    """Aliasing ``A`` to itself raises :class:`AliasCycleError`.

    After the rejected write, ``A`` still resolves to ``A``.
    """
    alias_map = AliasMapRepository(session=session)

    with pytest.raises(AliasCycleError):
        await alias_map.upsert_alias(collection_id="c1", alias_name="A", target="A")

    resolved_after_reject = await alias_map.resolve_canonical(collection_id="c1", name="A")
    assert resolved_after_reject == "A"
+@pytest.mark.asyncio +async def test_cycle_reject_through_existing_chain(session): + """``A → B`` then ``B → C`` then ``C → A`` is rejected — the third + upsert resolves target ``A`` to the terminal canonical ``C`` (via + transitive flatten); when that equals the new alias name (``C``) + we raise instead of writing a self-loop.""" + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="A", target="B") + await repo.upsert_alias(collection_id="c1", alias_name="B", target="C") + + with pytest.raises(AliasCycleError): + await repo.upsert_alias(collection_id="c1", alias_name="C", target="A") + + # No rows mutated by the failed upsert. + assert await repo.resolve_canonical(collection_id="c1", name="C") == "C" + assert await repo.resolve_canonical(collection_id="c1", name="A") == "C" + assert await repo.resolve_canonical(collection_id="c1", name="B") == "C" + + +# --------------------------------------------------------------------- +# orphan persistence (§K.12.7) +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_alias_persists_after_canonical_gc(session): + """Per spec §K.12.7 decision X: if the canonical entity is later + deleted from the lineage table, the alias rows MUST stay so a + future indexer write to the alias name still resolves correctly.""" + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="A", target="C") + # No coupling with the lineage table — alias persistence is a pure + # property of the alias-map repo. Resolving still works whether or + # not C currently has a lineage row. 
+ assert await repo.resolve_canonical(collection_id="c1", name="A") == "C" + + +# --------------------------------------------------------------------- +# per-collection isolation +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_aliases_isolated_per_collection(session): + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="A", target="B") + await repo.upsert_alias(collection_id="c2", alias_name="A", target="Z") + + assert await repo.resolve_canonical(collection_id="c1", name="A") == "B" + assert await repo.resolve_canonical(collection_id="c2", name="A") == "Z" + + +@pytest.mark.asyncio +async def test_purge_collection_only_drops_its_own_rows(session): + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="A", target="B") + await repo.upsert_alias(collection_id="c2", alias_name="A", target="Z") + + deleted = await repo.purge_collection("c1") + assert deleted == 1 + + assert await repo.resolve_canonical(collection_id="c1", name="A") == "A" + assert await repo.resolve_canonical(collection_id="c2", name="A") == "Z" + + +@pytest.mark.asyncio +async def test_list_aliases_pointing_at_returns_alphabetical(session): + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="Zeta", target="C") + await repo.upsert_alias(collection_id="c1", alias_name="Alpha", target="C") + await repo.upsert_alias(collection_id="c1", alias_name="Mu", target="C") + await repo.upsert_alias(collection_id="c1", alias_name="Other", target="D") + + pointing_at_C = await repo.list_aliases_pointing_at(collection_id="c1", canonical_name="C") + assert pointing_at_C == ["Alpha", "Mu", "Zeta"] diff --git a/tests/unit_test/graph_curation/test_lineage_merge.py b/tests/unit_test/graph_curation/test_lineage_merge.py new file mode 100644 index 000000000..dcde1317e --- /dev/null +++ 
b/tests/unit_test/graph_curation/test_lineage_merge.py @@ -0,0 +1,404 @@ +# Copyright 2026 ApeCloud, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ``aperag.graph_curation.lineage_merge.LineageEntityMerger`` +— Wave 7 §K.12.6 task #6. + +Pinned cases (per architect outline msg=cf860ae4 + huangheng CR plan +msg=22816e0d): + +* Step ordering ``L1 → vector → delete`` (invariant #2). +* Sentinel ``__curation_merge__`` document_id on the unified+compacted + lineage member (drift #2 lock). +* Compactor invocation passes the locked kwargs (``subject_kind``, + ``subject_label``, ``language``). +* Source parts are re-anchored under the target name preserving + per-doc lineage (invariant #1 L1 not polluted). +* Sources are deleted from L1 + vector last. +* Vector point id is the deterministic + ``uuid5(NAMESPACE_DNS, "graph_entity:<collection_id>:<entity_name>")`` with the + 3-field payload (invariants #5 + #6). +* Empty source list short-circuits. +* Cycle reject from the alias repo propagates ``AliasCycleError``. +* A target ``GC`` between merge initiation and execution is logged but + does not crash the merge.
+""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock +from uuid import NAMESPACE_DNS, uuid5 + +import pytest + +from aperag.graph_curation.alias_map import AliasCycleError +from aperag.graph_curation.lineage_merge import ( + CURATION_MERGE_DOCUMENT_ID, + GRAPH_ENTITY_INDEXER, + LineageEntityMerger, +) +from aperag.indexing.graph import ( + DescriptionPart, + EntityWithLineage, + LineageMember, +) + +# --------------------------------------------------------------------- +# Test doubles +# --------------------------------------------------------------------- + + +def _entity( + name: str, + *, + entity_type: str = "organization", + parts: list[tuple[str, str, str, tuple[str, ...]]] | None = None, +) -> EntityWithLineage: + """``parts`` is a list of ``(document_id, parse_version, text, chunk_ids)``.""" + parts = parts or [("doc1", "v1", f"description of {name}", ("c0",))] + return EntityWithLineage( + name=name, + entity_type=entity_type, + source_lineage=tuple( + LineageMember( + document_id=d, + parse_version=v, + tenant_scope_key="tenant-1", + chunk_ids=tuple(cids), + ) + for d, v, _t, cids in parts + ), + description_parts=tuple(DescriptionPart(document_id=d, parse_version=v, text=t) for d, v, t, _cids in parts), + ) + + +def _make_merger( + *, + target: EntityWithLineage, + sources: dict[str, EntityWithLineage] | None = None, + alias_resolutions: dict[str, str] | None = None, + llm_response: str = "unified description", + compacted: str | None = "compacted", +): + sources = sources or {} + alias_resolutions = alias_resolutions or {} + + store = AsyncMock() + store.get_entity = AsyncMock(side_effect=lambda name: sources.get(name) if name != target.name else target) + store.upsert_entity_with_lineage = AsyncMock(return_value=None) + store.delete_entity = AsyncMock(return_value=True) + + alias_repo = AsyncMock() + alias_repo.resolve_canonical = AsyncMock( + side_effect=lambda *, collection_id, name: 
alias_resolutions.get(name, name) + ) + alias_repo.upsert_alias = AsyncMock(return_value=target.name) + + compactor = MagicMock() + compactor.compact_if_oversized = AsyncMock(return_value=compacted) + + embedder = MagicMock() + embedder.embed_query = MagicMock(return_value=[1.0, 0.0, 0.0]) + + vector_connector = MagicMock() + vector_connector.upsert = MagicMock(return_value=["id"]) + vector_connector.delete = MagicMock(return_value=None) + + async def _llm(_prompt: str) -> str: + return llm_response + + merger = LineageEntityMerger( + store=store, + alias_repo=alias_repo, + compactor=compactor, + vector_connector=vector_connector, + embedder=embedder, + llm=_llm, + collection_id="col-1", + language="English", + ) + return merger, store, alias_repo, compactor, embedder, vector_connector + + +# --------------------------------------------------------------------- +# Empty source — short circuit +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_empty_source_list_short_circuits(): + target = _entity("Target") + merger, store, alias_repo, *_ = _make_merger(target=target) + + result = await merger.merge_entities(target_name="Target", source_names=[], merged_by="user1") + assert result.final_target == "Target" + assert result.merged_source_ids == [] + alias_repo.upsert_alias.assert_not_called() + store.upsert_entity_with_lineage.assert_not_called() + store.delete_entity.assert_not_called() + + +# --------------------------------------------------------------------- +# Step ordering: L1 → vector → delete +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_step_order_is_l1_then_vector_then_delete(): + """Invariant #2: L1 source-of-truth writes precede vector (derived) + writes; deletes run last.""" + target = _entity("Apple Inc.") + src = _entity("Apple") + merger, store, _, _, _, vector_connector = _make_merger(target=target, sources={"Apple": 
src}) + + call_order: list[str] = [] + + async def _track_l1(*args, **kwargs): + call_order.append("L1_upsert") + + async def _track_delete(*args, **kwargs): + call_order.append("L1_delete") + return True + + def _track_vec_upsert(*args, **kwargs): + call_order.append("vector_upsert") + return ["id"] + + def _track_vec_delete(*args, **kwargs): + call_order.append("vector_delete") + + store.upsert_entity_with_lineage.side_effect = _track_l1 + store.delete_entity.side_effect = _track_delete + vector_connector.upsert.side_effect = _track_vec_upsert + vector_connector.delete.side_effect = _track_vec_delete + + await merger.merge_entities(target_name="Apple Inc.", source_names=["Apple"], merged_by="u") + + # All L1 upserts come first, then vector upsert, then delete. + last_l1_upsert_idx = max(i for i, x in enumerate(call_order) if x == "L1_upsert") + vec_upsert_idx = call_order.index("vector_upsert") + l1_delete_idx = call_order.index("L1_delete") + vec_delete_idx = call_order.index("vector_delete") + assert last_l1_upsert_idx < vec_upsert_idx < l1_delete_idx + assert l1_delete_idx < vec_delete_idx + + +# --------------------------------------------------------------------- +# Sentinel + Compactor kwargs +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_unified_write_uses_curation_merge_sentinel(): + target = _entity("Apple Inc.") + src = _entity("Apple") + merger, store, *_ = _make_merger(target=target, sources={"Apple": src}) + + await merger.merge_entities(target_name="Apple Inc.", source_names=["Apple"], merged_by="u") + + # The LAST upsert call carries the sentinel + the compacted text. + upsert_calls = store.upsert_entity_with_lineage.call_args_list + final_kwargs = upsert_calls[-1].kwargs + assert final_kwargs["lineage"].document_id == CURATION_MERGE_DOCUMENT_ID + assert final_kwargs["compacted_description"] == "compacted" + assert final_kwargs["record"].name == "Apple Inc." 
+ + +@pytest.mark.asyncio +async def test_compactor_invoked_with_locked_kwargs(): + target = _entity("Apple Inc.") + src = _entity("Apple") + merger, _, _, compactor, *_ = _make_merger(target=target, sources={"Apple": src}) + + await merger.merge_entities(target_name="Apple Inc.", source_names=["Apple"], merged_by="u") + compactor.compact_if_oversized.assert_awaited_once() + call = compactor.compact_if_oversized.call_args + assert call.kwargs == { + "subject_kind": "entity", + "subject_label": "Apple Inc.", + "language": "English", + } + + +# --------------------------------------------------------------------- +# Source parts re-anchored preserving per-doc lineage +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_source_parts_reanchored_preserving_doc_lineage(): + """Invariant #1: per-doc tracking must survive the merge — each + source part is re-upserted under the target name with the original + ``(document_id, parse_version, chunk_ids)`` lineage.""" + target = _entity( + "Apple Inc.", + parts=[("docT", "v1", "target initial", ("ct",))], + ) + src = _entity( + "Apple", + parts=[ + ("docA", "v1", "fragment A", ("ca1",)), + ("docB", "v2", "fragment B", ("cb1", "cb2")), + ], + ) + merger, store, *_ = _make_merger(target=target, sources={"Apple": src}) + + await merger.merge_entities(target_name="Apple Inc.", source_names=["Apple"], merged_by="u") + + upsert_calls = store.upsert_entity_with_lineage.call_args_list + # The first 2 upserts are the source parts re-anchored under the + # target name (in order of the source's description_parts), then + # the final unified+compacted write. + assert len(upsert_calls) == 3 + first_call = upsert_calls[0].kwargs + assert first_call["record"].name == "Apple Inc." 
+ assert first_call["record"].description == "fragment A" + assert first_call["lineage"].document_id == "docA" + assert first_call["lineage"].parse_version == "v1" + + second_call = upsert_calls[1].kwargs + assert second_call["record"].name == "Apple Inc." + assert second_call["record"].description == "fragment B" + assert second_call["lineage"].document_id == "docB" + assert second_call["lineage"].parse_version == "v2" + + # Final write is the unified+compacted with sentinel. + final = upsert_calls[2].kwargs + assert final["lineage"].document_id == CURATION_MERGE_DOCUMENT_ID + + +# --------------------------------------------------------------------- +# Vector payload + uuid5 pinning +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_vector_payload_is_3_field_with_deterministic_uuid5(): + target = _entity("Apple Inc.") + src = _entity("Apple") + merger, _, _, _, _, vector_connector = _make_merger(target=target, sources={"Apple": src}) + + await merger.merge_entities(target_name="Apple Inc.", source_names=["Apple"], merged_by="u") + + # Upsert was called once with one VectorPoint. + vector_connector.upsert.assert_called_once() + points = vector_connector.upsert.call_args.args[0] + assert len(points) == 1 + point = points[0] + expected_id = str(uuid5(NAMESPACE_DNS, f"{GRAPH_ENTITY_INDEXER}:col-1:Apple Inc.")) + assert point.id == expected_id + # 3 fields exactly — no collection_id, no extras. + assert set(point.payload.keys()) == {"indexer", "entity_name", "entity_type"} + assert point.payload["indexer"] == GRAPH_ENTITY_INDEXER + assert point.payload["entity_name"] == "Apple Inc." + assert point.payload["entity_type"] == "organization" + + # Source vector point was deleted under the same uuid5 scheme. 
+ vector_connector.delete.assert_called_once() + deleted_ids = vector_connector.delete.call_args.args[0] + expected_src_id = str(uuid5(NAMESPACE_DNS, f"{GRAPH_ENTITY_INDEXER}:col-1:Apple")) + assert deleted_ids == [expected_src_id] + + +# --------------------------------------------------------------------- +# Cycle reject propagation +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_alias_cycle_propagates_through_merge(): + target = _entity("Target") + merger, _, alias_repo, *_ = _make_merger(target=target) + alias_repo.upsert_alias.side_effect = AliasCycleError("cycle") + + with pytest.raises(AliasCycleError): + await merger.merge_entities(target_name="Target", source_names=["Source"], merged_by="u") + + +# --------------------------------------------------------------------- +# Target flatten through alias chain +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_target_resolves_through_alias_chain(): + """If ``target_name`` is itself already an alias, the merge body + operates on the flattened canonical.""" + final = _entity("Canonical") + src = _entity("Source") + merger, store, alias_repo, *_ = _make_merger( + target=final, + sources={"Source": src, "AliasOfTarget": final}, + alias_resolutions={"AliasOfTarget": "Canonical"}, + ) + + result = await merger.merge_entities(target_name="AliasOfTarget", source_names=["Source"], merged_by="u") + assert result.final_target == "Canonical" + # Sources were aliased to the canonical, not the original target. 
+ alias_repo.upsert_alias.assert_called_once() + assert alias_repo.upsert_alias.call_args.kwargs["target"] == "Canonical" + + +# --------------------------------------------------------------------- +# Target GC tolerance +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_target_gced_between_initiation_and_execution_does_not_crash(): + """If the target entity was GC'd between merge initiation and + execution, the merge logs a warning, writes the alias rows + anyway (so future indexer writes still resolve), and returns + cleanly.""" + src = _entity("Source") + + store = AsyncMock() + + async def _get(name: str): + if name == "Source": + return src + return None # target GC'd + + store.get_entity = AsyncMock(side_effect=_get) + store.upsert_entity_with_lineage = AsyncMock() + store.delete_entity = AsyncMock(return_value=True) + + alias_repo = AsyncMock() + alias_repo.resolve_canonical = AsyncMock(return_value="Target") + alias_repo.upsert_alias = AsyncMock(return_value="Target") + + compactor = MagicMock() + compactor.compact_if_oversized = AsyncMock(return_value=None) + embedder = MagicMock() + vector_connector = MagicMock() + + async def _llm(_p: str) -> str: + return "" + + merger = LineageEntityMerger( + store=store, + alias_repo=alias_repo, + compactor=compactor, + vector_connector=vector_connector, + embedder=embedder, + llm=_llm, + collection_id="col-1", + ) + result = await merger.merge_entities(target_name="Target", source_names=["Source"], merged_by="u") + assert result.final_target == "Target" + # Alias was still recorded. + alias_repo.upsert_alias.assert_called_once() + # No L1 upsert, no vector activity, no delete (nothing to consolidate). 
+ store.upsert_entity_with_lineage.assert_not_called() + store.delete_entity.assert_not_called() diff --git a/tests/unit_test/indexing/test_alias_redirect_store.py b/tests/unit_test/indexing/test_alias_redirect_store.py new file mode 100644 index 000000000..6234d0ff5 --- /dev/null +++ b/tests/unit_test/indexing/test_alias_redirect_store.py @@ -0,0 +1,254 @@ +# Copyright 2026 ApeCloud, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ``aperag.indexing.alias_redirect_store`` — Wave 7 §K.12.4 +invariant #3 (transparent alias redirect on the indexer hot path). + +Pinned cases: + +* Indexer write to an alias name lands on the canonical entity name + in the underlying lineage store. This is the core invariant — + ``test_indexer_upsert_after_merge_redirects_to_canonical``. +* Relation writes redirect *both* endpoints when either has been + merged. +* Decorator passthrough (huangheng CR lock, + ``test_decorator_passthrough_for_non_upsert_methods``): every + ``LineageGraphStore`` Protocol method that is NOT an + ``upsert_*`` write forwards to ``_inner`` byte-for-byte. Pin this + so a future Protocol method addition cannot slip past unnoticed. 
+""" + +from __future__ import annotations + +from unittest.mock import AsyncMock + +import pytest + +from aperag.indexing.alias_redirect_store import LineageGraphStoreWithAliasRedirect +from aperag.indexing.graph import ( + EntityRecord, + LineageMember, + RelationRecord, +) + + +class _FakeAliasRepo: + def __init__(self, mapping: dict[str, str] | None = None) -> None: + self._mapping = mapping or {} + + async def resolve_canonical(self, *, collection_id: str, name: str) -> str: + return self._mapping.get(name, name) + + +def _record(name: str = "Apple") -> EntityRecord: + return EntityRecord( + name=name, + entity_type="organization", + description="desc", + source_chunk_ids=("c0",), + ) + + +def _member() -> LineageMember: + return LineageMember( + document_id="doc1", + parse_version="v1", + tenant_scope_key="tenant-1", + chunk_ids=("c0",), + ) + + +# --------------------------------------------------------------------- +# Entity write redirect — the core invariant +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_indexer_upsert_after_merge_redirects_to_canonical(): + """Wave 7 §K.12 invariant #3: indexer writes ``record.name="A"``; + user has merged ``A → C``; the underlying store sees the write on + ``C``, not ``A``.""" + inner = AsyncMock() + inner.upsert_entity_with_lineage = AsyncMock(return_value=None) + decorator = LineageGraphStoreWithAliasRedirect( + inner=inner, + alias_repo=_FakeAliasRepo({"A": "C"}), + collection_id="col-1", + ) + await decorator.upsert_entity_with_lineage( + record=_record("A"), + lineage=_member(), + compacted_description="cd", + ) + # Inner saw the write under the canonical name. + inner.upsert_entity_with_lineage.assert_awaited_once() + call_kwargs = inner.upsert_entity_with_lineage.call_args.kwargs + assert call_kwargs["record"].name == "C" + # Other fields untouched. 
+    assert call_kwargs["record"].entity_type == "organization"
+    assert call_kwargs["compacted_description"] == "cd"
+
+
+@pytest.mark.asyncio
+async def test_indexer_upsert_no_alias_pass_through():
+    """An entity write made while the alias repo is empty must reach the
+    inner store with ``record.name`` unchanged."""
+    inner = AsyncMock()
+    inner.upsert_entity_with_lineage = AsyncMock(return_value=None)
+    decorator = LineageGraphStoreWithAliasRedirect(
+        inner=inner,
+        alias_repo=_FakeAliasRepo(),  # empty: no alias entries registered
+        collection_id="col-1",
+    )
+    await decorator.upsert_entity_with_lineage(
+        record=_record("Untouched"),
+        lineage=_member(),
+    )
+    # Inspect the keyword arguments the decorator forwarded to the inner mock.
+    call_kwargs = inner.upsert_entity_with_lineage.call_args.kwargs
+    assert call_kwargs["record"].name == "Untouched"
+
+
+# ---------------------------------------------------------------------
+# Relation write redirect — both endpoints
+# ---------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_relation_write_redirects_both_endpoints():
+    """When both relation endpoints have alias entries, the record
+    forwarded to the inner store carries the mapped names for ``source``
+    and ``target``; ``relation_type`` is forwarded as written."""
+    inner = AsyncMock()
+    inner.upsert_relation_with_lineage = AsyncMock(return_value=None)
+    decorator = LineageGraphStoreWithAliasRedirect(
+        inner=inner,
+        # Fake repo seeded with two entries: "A" -> "C" and "B" -> "D".
+        alias_repo=_FakeAliasRepo({"A": "C", "B": "D"}),
+        collection_id="col-1",
+    )
+    await decorator.upsert_relation_with_lineage(
+        record=RelationRecord(
+            source="A",
+            target="B",
+            relation_type="founded",
+            description="desc",
+            source_chunk_ids=("c0",),
+        ),
+        lineage=_member(),
+    )
+    # Inspect the keyword arguments the decorator forwarded to the inner mock.
+    call_kwargs = inner.upsert_relation_with_lineage.call_args.kwargs
+    assert call_kwargs["record"].source == "C"
+    assert call_kwargs["record"].target == "D"
+    assert call_kwargs["record"].relation_type == "founded"
+
+
+@pytest.mark.asyncio
+async def test_relation_write_redirects_only_one_endpoint():
+    """When only ``source`` has an alias entry, only that endpoint is
+    rewritten; the un-aliased ``target`` is forwarded unchanged."""
+    inner = AsyncMock()
+    inner.upsert_relation_with_lineage = AsyncMock(return_value=None)
+    decorator = LineageGraphStoreWithAliasRedirect(
+        inner=inner,
+        alias_repo=_FakeAliasRepo({"A": "C"}),  # only A is aliased
+        collection_id="col-1",
+    )
+    await decorator.upsert_relation_with_lineage(
+        record=RelationRecord(
+            source="A",
+            target="B",
+            relation_type="founded",
+            description="desc",
+            source_chunk_ids=("c0",),
+        ),
+        lineage=_member(),
+    )
+    # Inspect the keyword arguments the decorator forwarded to the inner mock.
+    call_kwargs = inner.upsert_relation_with_lineage.call_args.kwargs
+    assert call_kwargs["record"].source == "C"
+    assert call_kwargs["record"].target == "B"
+
+
+# ---------------------------------------------------------------------
+# Decorator passthrough invariant (huangheng CR lock)
+# ---------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_decorator_passthrough_for_non_upsert_methods():
+    """Every ``LineageGraphStore`` Protocol method that is NOT an
+    ``upsert_*`` write must forward to ``_inner`` byte-for-byte. Spelled
+    out one method at a time so a future Protocol addition can't slip
+    past without an explicit decorator update.
+
+    Each stanza below stubs one method on ``inner``, calls the same
+    method on the decorator, and checks the returned value (where one is
+    asserted) plus the exact arguments the inner mock was awaited with.
+
+    NOTE(review): only the methods listed in this body are exercised; a
+    newly added Protocol method needs its own stanza here to be covered."""
+    inner = AsyncMock()
+    decorator = LineageGraphStoreWithAliasRedirect(
+        inner=inner,
+        alias_repo=_FakeAliasRepo(),
+        collection_id="col-1",
+    )
+
+    # find_entity_ids_with_lineage: result and kwargs pass through unchanged
+    inner.find_entity_ids_with_lineage = AsyncMock(return_value=["a", "b"])
+    out = await decorator.find_entity_ids_with_lineage(document_id="doc1")
+    assert out == ["a", "b"]
+    inner.find_entity_ids_with_lineage.assert_awaited_once_with(document_id="doc1")
+
+    # find_relation_keys_with_lineage: result and kwargs pass through unchanged
+    inner.find_relation_keys_with_lineage = AsyncMock(return_value=[("a", "b", "x")])
+    out = await decorator.find_relation_keys_with_lineage(document_id="doc1")
+    assert out == [("a", "b", "x")]
+    inner.find_relation_keys_with_lineage.assert_awaited_once_with(document_id="doc1")
+
+    # remove_entity_lineage_member: only the forwarded kwargs are checked
+    inner.remove_entity_lineage_member = AsyncMock(return_value=None)
+    await decorator.remove_entity_lineage_member(entity_name="x", document_id="d")
+    inner.remove_entity_lineage_member.assert_awaited_once_with(entity_name="x", document_id="d")
+
+    # remove_relation_lineage_member: only the forwarded kwargs are checked
+    inner.remove_relation_lineage_member = AsyncMock(return_value=None)
+    await decorator.remove_relation_lineage_member(source="a", target="b", type="t", document_id="d")
+    inner.remove_relation_lineage_member.assert_awaited_once_with(source="a", target="b", type="t", document_id="d")
+
+    # gc_entity_if_orphan / gc_relation_if_orphan: positional args and the
+    # boolean result (True and False respectively) are forwarded as-is
+    inner.gc_entity_if_orphan = AsyncMock(return_value=True)
+    assert await decorator.gc_entity_if_orphan("x") is True
+    inner.gc_entity_if_orphan.assert_awaited_once_with("x")
+
+    inner.gc_relation_if_orphan = AsyncMock(return_value=False)
+    assert await decorator.gc_relation_if_orphan("a", "b", "t") is False
+    inner.gc_relation_if_orphan.assert_awaited_once_with("a", "b", "t")
+
+    # delete_entity / delete_relation: positional args and result forwarded
+    inner.delete_entity = AsyncMock(return_value=True)
+    assert await decorator.delete_entity("x") is True
+    inner.delete_entity.assert_awaited_once_with("x")
+
+    inner.delete_relation = AsyncMock(return_value=True)
+    assert await decorator.delete_relation("a", "b", "t") is True
+    inner.delete_relation.assert_awaited_once_with("a", "b", "t")
+
+    # get_entity / get_relation: a ``None`` result from inner is returned as-is
+    inner.get_entity = AsyncMock(return_value=None)
+    assert await decorator.get_entity("x") is None
+    inner.get_entity.assert_awaited_once_with("x")
+
+    inner.get_relation = AsyncMock(return_value=None)
+    assert await decorator.get_relation("a", "b", "t") is None
+    inner.get_relation.assert_awaited_once_with("a", "b", "t")
+
+    # query_entities_by_keyword: kwargs and the (empty) result pass through
+    inner.query_entities_by_keyword = AsyncMock(return_value=[])
+    assert await decorator.query_entities_by_keyword(query="q", top_k=5) == []
+    inner.query_entities_by_keyword.assert_awaited_once_with(query="q", top_k=5)
+
+    # expand_neighbors_n_hops: kwargs and the 2-tuple result pass through
+    inner.expand_neighbors_n_hops = AsyncMock(return_value=([], []))
+    assert await decorator.expand_neighbors_n_hops(entity_names=["x"], hops=2) == ([], [])
+    inner.expand_neighbors_n_hops.assert_awaited_once_with(entity_names=["x"], hops=2)
+
+    # list_entity_labels: called with no arguments, result passes through
+    inner.list_entity_labels = AsyncMock(return_value=["a", "b"])
+    assert await decorator.list_entity_labels() == ["a", "b"]
+    inner.list_entity_labels.assert_awaited_once_with()