diff --git a/aperag/indexing/graph.py b/aperag/indexing/graph.py index 0eff7e99b..796704d96 100644 --- a/aperag/indexing/graph.py +++ b/aperag/indexing/graph.py @@ -267,12 +267,25 @@ class EntityWithLineage: storage representation (Nebula JSON STRING, Neo4j list-of-MAP, in-memory dict) and the algorithm in :class:`GraphModalityWorker` only ever sees this canonical form. + + Wave 7 (W7-1) adds ``compacted_description``: a derived cache + column written by ``GraphIndexCompactor`` (Wave 7 W7-2) when the + aggregate ``description_parts`` text grows past + ``max_description_chars`` / ``summarize_at_fragments`` thresholds. + The compactor calls an LLM to summarise the union of the parts and + persists the result here so the vector embedding step (Wave 7 W7-3) + has a bounded, coherent text source. The field is **derived**: + ``description_parts`` remains the per-doc source of truth, and the + compacted value can be reproduced by re-running the compactor over + the parts at any time. NULL means "not yet compacted" — readers + should fall back to joining ``description_parts.text``. """ name: str entity_type: str source_lineage: tuple[LineageMember, ...] description_parts: tuple[DescriptionPart, ...] + compacted_description: str | None = None @dataclass @@ -282,6 +295,7 @@ class RelationWithLineage: relation_type: str evidence_lineage: tuple[LineageMember, ...] description_parts: tuple[DescriptionPart, ...] + compacted_description: str | None = None # --------------------------------------------------------------------- @@ -494,24 +508,63 @@ async def gc_entity_if_orphan(self, entity_name: str) -> bool: async def gc_relation_if_orphan(self, source: str, target: str, type: str) -> bool: """Same as :meth:`gc_entity_if_orphan` for a relation edge.""" + async def delete_entity(self, entity_name: str) -> bool: + """Wave 7 W7-1: unconditionally delete the entity row. + + Used by :class:`GraphCurationService.merge_entities` (W7-6) to + remove the source entities after a user-driven merge — we want + the row gone regardless of any remaining lineage members, + because the canonical entity has absorbed those parts. The + ``gc_*_if_orphan`` family is for indexer-side garbage + collection (only delete when lineage is empty); curation merge + needs an explicit unconditional delete that complements it. + + Returns ``True`` when a delete actually ran (the row existed), + ``False`` when the name was not present (idempotent retry). + """ + + async def delete_relation(self, source: str, target: str, type: str) -> bool: + """Wave 7 W7-1: unconditionally delete the relation row. + + Symmetric to :meth:`delete_entity` for a relation edge — + same use case (merge of relations under a curation flow). + """ + async def upsert_entity_with_lineage( self, *, record: EntityRecord, lineage: LineageMember, + compacted_description: str | None = None, ) -> None: """Add ``lineage`` to ``source_lineage`` and a corresponding ``DescriptionPart`` to ``description_parts``. Creates the entity if absent. Replaces an existing member with the same - ``(document_id, parse_version)`` key.""" + ``(document_id, parse_version)`` key. + + Wave 7 (W7-1) ``compacted_description`` semantics: ``None`` + (default) means "preserve any existing compacted_description on + the row" — Wave 4 indexer-side per-chunk upserts MUST NOT clear + a compacted value computed by ``GraphIndexCompactor`` (W7-2) on + a previous sync. A non-None string overwrites the column. + Backends implement this with a ``COALESCE``-style update so the + Wave 4 hot path stays a single SQL statement (per Wave 4 §C.3 + forward-only retry contract). + """ async def upsert_relation_with_lineage( self, *, record: RelationRecord, lineage: LineageMember, + compacted_description: str | None = None, ) -> None: - """Symmetric to :meth:`upsert_entity_with_lineage` for relations.""" + """Symmetric to :meth:`upsert_entity_with_lineage` for relations. + + Wave 7 (W7-1) ``compacted_description`` semantics match the + entity variant: ``None`` preserves the existing column value; + a non-None string overwrites. + """ async def get_entity(self, entity_name: str) -> EntityWithLineage | None: """Read-path helper used by tests / read primitives. Returns @@ -615,6 +668,7 @@ class _InMemoryEntityRow: entity_type: str description_parts: dict[tuple[str, str], DescriptionPart] = field(default_factory=dict) source_lineage: dict[tuple[str, str], LineageMember] = field(default_factory=dict) + compacted_description: str | None = None @dataclass @@ -624,6 +678,7 @@ class _InMemoryRelationRow: relation_type: str description_parts: dict[tuple[str, str], DescriptionPart] = field(default_factory=dict) evidence_lineage: dict[tuple[str, str], LineageMember] = field(default_factory=dict) + compacted_description: str | None = None class InMemoryLineageGraphStore: @@ -729,6 +784,23 @@ async def gc_relation_if_orphan(self, source: str, target: str, type: str) -> bo del self._relations[rel_key] return True + # ---- unconditional delete (Wave 7 W7-1) ------------------------- + + async def delete_entity(self, entity_name: str) -> bool: + async with self._guard: + if entity_name not in self._entities: + return False + del self._entities[entity_name] + return True + + async def delete_relation(self, source: str, target: str, type: str) -> bool: + rel_key = (source, target, type) + async with self._guard: + if rel_key not in self._relations: + return False + del self._relations[rel_key] + return True + # ---- upserts ---------------------------------------------------- async def upsert_entity_with_lineage( @@ -736,6 +808,7 @@ async def upsert_entity_with_lineage( *, record: EntityRecord, lineage: LineageMember, + compacted_description: str | None = None, ) -> None: async with self._guard: row = self._entities.get(record.name) @@ -752,12 +825,16 @@ async def upsert_entity_with_lineage( parse_version=lineage.parse_version, text=record.description, ) + # Wave 7 W7-1: ``None`` preserves; non-None overwrites. + if compacted_description is not None: + row.compacted_description = compacted_description async def upsert_relation_with_lineage( self, *, record: RelationRecord, lineage: LineageMember, + compacted_description: str | None = None, ) -> None: rel_key = record.relation_key() async with self._guard: @@ -775,6 +852,9 @@ async def upsert_relation_with_lineage( parse_version=lineage.parse_version, text=record.description, ) + # Wave 7 W7-1: ``None`` preserves; non-None overwrites. + if compacted_description is not None: + row.compacted_description = compacted_description # ---- read path -------------------------------------------------- @@ -788,6 +868,7 @@ async def get_entity(self, entity_name: str) -> EntityWithLineage | None: entity_type=row.entity_type, source_lineage=tuple(_sorted_lineage(row.source_lineage.values())), description_parts=tuple(_sorted_description_parts(row.description_parts.values())), + compacted_description=row.compacted_description, ) async def get_relation(self, source: str, target: str, type: str) -> RelationWithLineage | None: @@ -802,6 +883,7 @@ async def get_relation(self, source: str, target: str, type: str) -> RelationWit relation_type=row.relation_type, evidence_lineage=tuple(_sorted_lineage(row.evidence_lineage.values())), description_parts=tuple(_sorted_description_parts(row.description_parts.values())), + compacted_description=row.compacted_description, ) # ------------------------------------------------------------------ @@ -828,6 +910,7 @@ async def query_entities_by_keyword( entity_type=row.entity_type, source_lineage=tuple(_sorted_lineage(row.source_lineage.values())), description_parts=tuple(_sorted_description_parts(row.description_parts.values())), + compacted_description=row.compacted_description, ) ) # Deterministic ordering for stable test output. @@ -859,6 +942,7 @@ async def expand_neighbors_n_hops( entity_type=row.entity_type, source_lineage=tuple(_sorted_lineage(row.source_lineage.values())), description_parts=tuple(_sorted_description_parts(row.description_parts.values())), + compacted_description=row.compacted_description, ) current = set(frontier) @@ -874,6 +958,7 @@ async def expand_neighbors_n_hops( relation_type=rel_row.relation_type, evidence_lineage=tuple(_sorted_lineage(rel_row.evidence_lineage.values())), description_parts=tuple(_sorted_description_parts(rel_row.description_parts.values())), + compacted_description=rel_row.compacted_description, ) for neighbour in (src, tgt): if neighbour in seen_entities: @@ -886,6 +971,7 @@ async def expand_neighbors_n_hops( entity_type=row.entity_type, source_lineage=tuple(_sorted_lineage(row.source_lineage.values())), description_parts=tuple(_sorted_description_parts(row.description_parts.values())), + compacted_description=row.compacted_description, ) next_frontier.add(neighbour) if not next_frontier: diff --git a/aperag/indexing/graph_storage/nebula.py b/aperag/indexing/graph_storage/nebula.py index 3f7aee544..9eafea7b6 100644 --- a/aperag/indexing/graph_storage/nebula.py +++ b/aperag/indexing/graph_storage/nebula.py @@ -158,6 +158,28 @@ def _is_schema_visibility_error(exc: BaseException) -> bool: return any(fragment in msg for fragment in _SCHEMA_VISIBILITY_ERROR_FRAGMENTS) +# Wave 7 W7-1: Nebula has no ``IF NOT EXISTS`` for ``ALTER TAG ADD``; +# running ADD against a column that already exists raises an error +# whose text differs by version ("Column already exists" / "Duplicated +# property"). The fragments below cover both 3.x phrasings so the +# idempotent ALTER survives both fresh deploys (column added by CREATE +# TAG, ALTER raises duplicate) and upgrades (column missing, ALTER +# adds). +_DUPLICATE_PROPERTY_ERROR_FRAGMENTS = ( + "Existed", + "existed", + "already exist", + "Duplicated", + "duplicated", + "duplicate", +) + + +def _is_duplicate_property_error(exc: BaseException) -> bool: + msg = str(exc) + return any(fragment in msg for fragment in _DUPLICATE_PROPERTY_ERROR_FRAGMENTS) + + # --------------------------------------------------------------------- # Helpers — VID encoding, JSON SET manipulation, escaping. # --------------------------------------------------------------------- @@ -351,17 +373,33 @@ def _ensure_schema_sync(self) -> None: # Wave 6 #36: tag-prop renamed ``type`` → ``entity_type`` / # ``relation_type`` per architect Pattern 3 ruling. # Hard-cut per earayu2 msg=30c81478 (no production data). + # + # Wave 7 W7-1: ``compacted_description`` is the + # GraphIndexCompactor-derived unified description (W7-2). + # Nullable; new deployments get it via CREATE TAG, older + # deployments via ALTER TAG ADD below (idempotent). f"CREATE TAG IF NOT EXISTS `{_ENTITY_TAG}`(" f"name string, entity_type string, " f"source_lineage_json string, description_parts_json string, " + f"compacted_description string NULL, " f"gmt_created datetime, gmt_updated datetime)", f"CREATE TAG IF NOT EXISTS `{_RELATION_TAG}`(" f"source string, target string, relation_type string, " f"evidence_lineage_json string, description_parts_json string, " + f"compacted_description string NULL, " f"gmt_created datetime, gmt_updated datetime)", f"CREATE TAG INDEX IF NOT EXISTS `idx_{_ENTITY_TAG}_name` ON `{_ENTITY_TAG}`(name(256))", f"CREATE TAG INDEX IF NOT EXISTS `idx_{_RELATION_TAG}_source` ON `{_RELATION_TAG}`(source(256))", ] + # Wave 7 W7-1 backfill: ALTER TAG ADD for tags created on a + # pre-Wave-7 schema. Nebula has no ``IF NOT EXISTS`` for ALTER + # TAG ADD; running it on a tag that already has the column + # raises a "field already exists" / "duplicated property" + # error which we swallow (idempotent contract). + alter_stmts = [ + f"ALTER TAG `{_ENTITY_TAG}` ADD (compacted_description string NULL)", + f"ALTER TAG `{_RELATION_TAG}` ADD (compacted_description string NULL)", + ] last_error: RuntimeError | None = None for _ in range(_SCHEMA_VISIBILITY_RETRIES): try: @@ -370,6 +408,16 @@ def _ensure_schema_sync(self) -> None: continue for stmt in tag_stmts: self._execute(self._space, stmt) + # Wave 7 W7-1: idempotent ALTER TAG for pre-Wave-7 + # schemas. Swallow "duplicate property" errors so + # fresh tags (which already have the column from + # CREATE TAG above) don't trip the migration. + for stmt in alter_stmts: + try: + self._execute(self._space, stmt) + except RuntimeError as alter_exc: + if not _is_duplicate_property_error(alter_exc): + raise # Allow heartbeat to propagate the new tags before # any caller writes; otherwise the first INSERT # races schema visibility. @@ -391,15 +439,22 @@ async def ensure_schema(self) -> None: # -- read helpers (sync-blocking, called via to_thread) ----------- - def _read_entity_lineage(self, entity_name: str) -> tuple[str, list[LineageMember], list[DescriptionPart]] | None: - """Return ``(type, source_lineage, description_parts)`` for the - entity, or ``None`` if the vertex doesn't exist yet.""" + def _read_entity_lineage( + self, entity_name: str + ) -> tuple[str, list[LineageMember], list[DescriptionPart], str | None] | None: + """Return ``(type, source_lineage, description_parts, compacted_description)`` + for the entity, or ``None`` if the vertex doesn't exist yet. + + Wave 7 W7-1: ``compacted_description`` is ``None`` if not yet + computed (NULL column) or the empty string was never written. + """ vid = _entity_vid(entity_name) stmt = ( f'FETCH PROP ON `{_ENTITY_TAG}` "{_escape_str(vid)}" ' f"YIELD `{_ENTITY_TAG}`.entity_type AS entity_type, " f"`{_ENTITY_TAG}`.source_lineage_json AS sl, " - f"`{_ENTITY_TAG}`.description_parts_json AS dp" + f"`{_ENTITY_TAG}`.description_parts_json AS dp, " + f"`{_ENTITY_TAG}`.compacted_description AS cd" ) result = self._execute_with_schema_retry(self._space, stmt) if result.row_size() == 0: @@ -408,16 +463,18 @@ def _read_entity_lineage(self, entity_name: str) -> tuple[str, list[LineageMembe type_value = row[0].as_string() if row[0].is_string() else "" sl_raw = row[1].as_string() if row[1].is_string() else "" dp_raw = row[2].as_string() if row[2].is_string() else "" - return type_value, _members_from_json(sl_raw), _parts_from_json(dp_raw) + compacted = row[3].as_string() if row[3].is_string() else None + return type_value, _members_from_json(sl_raw), _parts_from_json(dp_raw), compacted def _read_relation_lineage( self, source: str, target: str, type: str - ) -> tuple[list[LineageMember], list[DescriptionPart]] | None: + ) -> tuple[list[LineageMember], list[DescriptionPart], str | None] | None: vid = _relation_vid(source, target, type) stmt = ( f'FETCH PROP ON `{_RELATION_TAG}` "{_escape_str(vid)}" ' f"YIELD `{_RELATION_TAG}`.evidence_lineage_json AS el, " - f"`{_RELATION_TAG}`.description_parts_json AS dp" + f"`{_RELATION_TAG}`.description_parts_json AS dp, " + f"`{_RELATION_TAG}`.compacted_description AS cd" ) result = self._execute_with_schema_retry(self._space, stmt) if result.row_size() == 0: @@ -425,7 +482,8 @@ def _read_relation_lineage( row = result.row_values(0) el_raw = row[0].as_string() if row[0].is_string() else "" dp_raw = row[1].as_string() if row[1].is_string() else "" - return _members_from_json(el_raw), _parts_from_json(dp_raw) + compacted = row[2].as_string() if row[2].is_string() else None + return _members_from_json(el_raw), _parts_from_json(dp_raw), compacted def _list_all_entity_vids(self) -> list[str]: """Return all VIDs tagged with ``lineage_entity`` in the @@ -478,15 +536,19 @@ def _write_entity_vertex( type_value: str, source_lineage: list[LineageMember], description_parts: list[DescriptionPart], + compacted_description: str | None, ) -> None: vid = _entity_vid(name) + compacted_literal = "NULL" if compacted_description is None else f'"{_escape_str(compacted_description)}"' stmt = ( f"INSERT VERTEX `{_ENTITY_TAG}`" - f"(name, entity_type, source_lineage_json, description_parts_json, gmt_created, gmt_updated) " + f"(name, entity_type, source_lineage_json, description_parts_json, " + f"compacted_description, gmt_created, gmt_updated) " f'VALUES "{_escape_str(vid)}":(' f'"{_escape_str(name)}", "{_escape_str(type_value)}", ' f'"{_escape_str(_members_to_json(source_lineage))}", ' f'"{_escape_str(_parts_to_json(description_parts))}", ' + f"{compacted_literal}, " f"datetime(), datetime())" ) self._execute_with_schema_retry(self._space, stmt) @@ -499,16 +561,20 @@ def _write_relation_vertex( type_value: str, evidence_lineage: list[LineageMember], description_parts: list[DescriptionPart], + compacted_description: str | None, ) -> None: vid = _relation_vid(source, target, type_value) + compacted_literal = "NULL" if compacted_description is None else f'"{_escape_str(compacted_description)}"' stmt = ( f"INSERT VERTEX `{_RELATION_TAG}`" f"(source, target, relation_type, " - f"evidence_lineage_json, description_parts_json, gmt_created, gmt_updated) " + f"evidence_lineage_json, description_parts_json, " + f"compacted_description, gmt_created, gmt_updated) " f'VALUES "{_escape_str(vid)}":(' f'"{_escape_str(source)}", "{_escape_str(target)}", "{_escape_str(type_value)}", ' f'"{_escape_str(_members_to_json(evidence_lineage))}", ' f'"{_escape_str(_parts_to_json(description_parts))}", ' + f"{compacted_literal}, " f"datetime(), datetime())" ) self._execute_with_schema_retry(self._space, stmt) @@ -534,7 +600,7 @@ def _scan() -> list[str]: row = self._read_entity_lineage_by_vid(vid) if row is None: continue - name, _type, members, _parts = row + name, _type, members, _parts, _compacted = row if any(m.document_id == document_id for m in members): names.append(name) return names @@ -550,7 +616,7 @@ def _scan() -> list[tuple[str, str, str]]: row = self._read_relation_lineage_by_vid(vid) if row is None: continue - source, target, type_value, members, _parts = row + source, target, type_value, members, _parts, _compacted = row if any(m.document_id == document_id for m in members): keys.append((source, target, type_value)) return keys @@ -561,13 +627,14 @@ def _scan() -> list[tuple[str, str, str]]: # lineage so the doc-scan helpers don't have to re-parse the VID. def _read_entity_lineage_by_vid( self, vid: str - ) -> tuple[str, str, list[LineageMember], list[DescriptionPart]] | None: + ) -> tuple[str, str, list[LineageMember], list[DescriptionPart], str | None] | None: stmt = ( f'FETCH PROP ON `{_ENTITY_TAG}` "{_escape_str(vid)}" ' f"YIELD `{_ENTITY_TAG}`.name AS name, " f"`{_ENTITY_TAG}`.entity_type AS entity_type, " f"`{_ENTITY_TAG}`.source_lineage_json AS sl, " - f"`{_ENTITY_TAG}`.description_parts_json AS dp" + f"`{_ENTITY_TAG}`.description_parts_json AS dp, " + f"`{_ENTITY_TAG}`.compacted_description AS cd" ) result = self._execute_with_schema_retry(self._space, stmt) if result.row_size() == 0: @@ -577,18 +644,20 @@ def _read_entity_lineage_by_vid( type_value = row[1].as_string() if row[1].is_string() else "" sl_raw = row[2].as_string() if row[2].is_string() else "" dp_raw = row[3].as_string() if row[3].is_string() else "" - return name, type_value, _members_from_json(sl_raw), _parts_from_json(dp_raw) + compacted = row[4].as_string() if row[4].is_string() else None + return name, type_value, _members_from_json(sl_raw), _parts_from_json(dp_raw), compacted def _read_relation_lineage_by_vid( self, vid: str - ) -> tuple[str, str, str, list[LineageMember], list[DescriptionPart]] | None: + ) -> tuple[str, str, str, list[LineageMember], list[DescriptionPart], str | None] | None: stmt = ( f'FETCH PROP ON `{_RELATION_TAG}` "{_escape_str(vid)}" ' f"YIELD `{_RELATION_TAG}`.source AS source, " f"`{_RELATION_TAG}`.target AS target, " f"`{_RELATION_TAG}`.relation_type AS relation_type, " f"`{_RELATION_TAG}`.evidence_lineage_json AS el, " - f"`{_RELATION_TAG}`.description_parts_json AS dp" + f"`{_RELATION_TAG}`.description_parts_json AS dp, " + f"`{_RELATION_TAG}`.compacted_description AS cd" ) result = self._execute_with_schema_retry(self._space, stmt) if result.row_size() == 0: @@ -599,12 +668,14 @@ def _read_relation_lineage_by_vid( type_value = row[2].as_string() if row[2].is_string() else "" el_raw = row[3].as_string() if row[3].is_string() else "" dp_raw = row[4].as_string() if row[4].is_string() else "" + compacted = row[5].as_string() if row[5].is_string() else None return ( source, target, type_value, _members_from_json(el_raw), _parts_from_json(dp_raw), + compacted, ) # -- strip-by-document (pre-rebuild phase) ------------------------ @@ -617,7 +688,7 @@ def _strip() -> None: row = self._read_entity_lineage(entity_name) if row is None: return - type_value, members, parts = row + type_value, members, parts, compacted = row new_members = [m for m in members if m.document_id != document_id] new_parts = [p for p in parts if p.document_id != document_id] if len(new_members) == len(members) and len(new_parts) == len(parts): @@ -627,6 +698,10 @@ def _strip() -> None: type_value=type_value, source_lineage=new_members, description_parts=new_parts, + # W7-1: strip preserves compacted_description so a + # subsequent re-sync of the SAME doc keeps the + # cache; compactor decides when to recompute. + compacted_description=compacted, ) await asyncio.to_thread(_strip) @@ -639,7 +714,7 @@ def _strip() -> None: row = self._read_relation_lineage(source, target, type) if row is None: return - members, parts = row + members, parts, compacted = row new_members = [m for m in members if m.document_id != document_id] new_parts = [p for p in parts if p.document_id != document_id] if len(new_members) == len(members) and len(new_parts) == len(parts): @@ -650,6 +725,7 @@ def _strip() -> None: type_value=type, evidence_lineage=new_members, description_parts=new_parts, + compacted_description=compacted, ) await asyncio.to_thread(_strip) @@ -664,7 +740,7 @@ def _gc() -> bool: row = self._read_entity_lineage(entity_name) if row is None: return False - _type_value, members, _parts = row + _type_value, members, _parts, _compacted = row if members: return False self._delete_entity_vertex(entity_name) @@ -680,7 +756,7 @@ def _gc() -> bool: row = self._read_relation_lineage(source, target, type) if row is None: return False - members, _parts = row + members, _parts, _compacted = row if members: return False self._delete_relation_vertex(source, target, type) @@ -688,9 +764,43 @@ def _gc() -> bool: return await asyncio.to_thread(_gc) + # -- unconditional delete (Wave 7 W7-1, used by curation merge) --- + + async def delete_entity(self, entity_name: str) -> bool: + await self.ensure_schema() + async with self._entity_lock.acquire(entity_name): + + def _delete() -> bool: + row = self._read_entity_lineage(entity_name) + if row is None: + return False + self._delete_entity_vertex(entity_name) + return True + + return await asyncio.to_thread(_delete) + + async def delete_relation(self, source: str, target: str, type: str) -> bool: + await self.ensure_schema() + async with self._entity_lock.acquire(_relation_vid(source, target, type)): + + def _delete() -> bool: + row = self._read_relation_lineage(source, target, type) + if row is None: + return False + self._delete_relation_vertex(source, target, type) + return True + + return await asyncio.to_thread(_delete) + # -- upserts (rebuild phase) -------------------------------------- - async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: LineageMember) -> None: + async def upsert_entity_with_lineage( + self, + *, + record: EntityRecord, + lineage: LineageMember, + compacted_description: str | None = None, + ) -> None: """Add (or replace by ``(document_id, parse_version)`` key) the lineage member + corresponding description part. @@ -699,6 +809,11 @@ async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: Lin same entity serialise. Without this serialisation, Nebula's property-update semantics (no native list ops) would race — per architect msg=f2921ae0. + + Wave 7 W7-1: ``compacted_description=None`` (default) preserves + the existing column; non-None overwrites. The Postgres COALESCE + equivalent is implemented in Python here because Nebula has no + native COALESCE on INSERT VERTEX (full-row overwrite semantics). """ await self.ensure_schema() new_part = DescriptionPart( @@ -713,20 +828,30 @@ def _upsert() -> None: if row is None: new_members = [lineage] new_parts = [new_part] + existing_compacted: str | None = None else: - _existing_type, members, parts = row + _existing_type, members, parts, existing_compacted = row new_members = [m for m in members if m.key() != lineage.key()] + [lineage] new_parts = [p for p in parts if p.key() != new_part.key()] + [new_part] + # W7-1: preserve existing if param is None. + final_compacted = compacted_description if compacted_description is not None else existing_compacted self._write_entity_vertex( name=record.name, type_value=record.entity_type, source_lineage=new_members, description_parts=new_parts, + compacted_description=final_compacted, ) await asyncio.to_thread(_upsert) - async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: LineageMember) -> None: + async def upsert_relation_with_lineage( + self, + *, + record: RelationRecord, + lineage: LineageMember, + compacted_description: str | None = None, + ) -> None: await self.ensure_schema() new_part = DescriptionPart( document_id=lineage.document_id, @@ -740,16 +865,19 @@ def _upsert() -> None: if row is None: new_members = [lineage] new_parts = [new_part] + existing_compacted: str | None = None else: - members, parts = row + members, parts, existing_compacted = row new_members = [m for m in members if m.key() != lineage.key()] + [lineage] new_parts = [p for p in parts if p.key() != new_part.key()] + [new_part] + final_compacted = compacted_description if compacted_description is not None else existing_compacted self._write_relation_vertex( source=record.source, target=record.target, type_value=record.relation_type, evidence_lineage=new_members, description_parts=new_parts, + compacted_description=final_compacted, ) await asyncio.to_thread(_upsert) @@ -763,12 +891,13 @@ def _read() -> EntityWithLineage | None: row = self._read_entity_lineage(entity_name) if row is None: return None - type_value, members, parts = row + type_value, members, parts, compacted = row return EntityWithLineage( name=entity_name, entity_type=type_value, source_lineage=tuple(members), description_parts=tuple(parts), + compacted_description=compacted, ) return await asyncio.to_thread(_read) @@ -780,13 +909,14 @@ def _read() -> RelationWithLineage | None: row = self._read_relation_lineage(source, target, type) if row is None: return None - members, parts = row + members, parts, compacted = row return RelationWithLineage( source=source, target=target, relation_type=type, evidence_lineage=tuple(members), description_parts=tuple(parts), + compacted_description=compacted, ) return await asyncio.to_thread(_read) @@ -817,7 +947,7 @@ def _scan() -> list[EntityWithLineage]: row = self._read_entity_lineage_by_vid(vid) if row is None: continue - name, type_value, members, parts = row + name, type_value, members, parts, compacted = row if needle not in name.lower(): continue matches.append( @@ -826,6 +956,7 @@ def _scan() -> list[EntityWithLineage]: entity_type=type_value, source_lineage=tuple(members), description_parts=tuple(parts), + compacted_description=compacted, ) ) matches.sort(key=lambda e: e.name) @@ -853,12 +984,13 @@ def _add_entity(name: str) -> None: row = self._read_entity_lineage(name) if row is None: return - type_value, members, parts = row + type_value, members, parts, compacted = row seen_entities[name] = EntityWithLineage( name=name, entity_type=type_value, source_lineage=tuple(members), description_parts=tuple(parts), + compacted_description=compacted, ) current = {n for n in entity_names if n} @@ -877,7 +1009,7 @@ def _add_entity(name: str) -> None: row = self._read_relation_lineage_by_vid(vid) if row is None: continue - src, tgt, rtype, members, parts = row + src, tgt, rtype, members, parts, compacted = row if src not in current and tgt not in current: continue key = (src, tgt, rtype) @@ -888,6 +1020,7 @@ def _add_entity(name: str) -> None: relation_type=rtype, evidence_lineage=tuple(members), description_parts=tuple(parts), + compacted_description=compacted, ) for endpoint in (src, tgt): if endpoint not in seen_entities and endpoint not in next_frontier: @@ -919,7 +1052,7 @@ def _scan() -> list[str]: row = self._read_entity_lineage_by_vid(vid) if row is None: continue - _, type_value, _, _ = row + _, type_value, _, _, _ = row if type_value: labels.add(type_value) return sorted(labels) diff --git a/aperag/indexing/graph_storage/neo4j.py b/aperag/indexing/graph_storage/neo4j.py index 2b31cec7e..76ee879e7 100644 --- a/aperag/indexing/graph_storage/neo4j.py +++ b/aperag/indexing/graph_storage/neo4j.py @@ -331,14 +331,54 @@ async def gc_relation_if_orphan(self, source: str, target: str, type: str) -> bo rec = await result.single() return bool(rec and rec["deleted"] > 0) + # -- unconditional delete (Wave 7 W7-1, used by curation merge) --- + + async def delete_entity(self, entity_name: str) -> bool: + query = ( + f"MATCH (n:{_ENTITY_LABEL} {{collection_id: $collection_id, name: $name}}) " + f"DELETE n " + f"RETURN count(n) AS deleted" + ) + async with self._session() as session: + result = await session.run(query, collection_id=self._collection_id, name=entity_name) + rec = await result.single() + return bool(rec and rec["deleted"] > 0) + + async def delete_relation(self, source: str, target: str, type: str) -> bool: + query = ( + f"MATCH (r:{_RELATION_LABEL} " + f" {{collection_id: $collection_id, source: $source, target: $target, relation_type: $relation_type}}) " + f"DELETE r " + f"RETURN count(r) AS deleted" + ) + async with self._session() as session: + result = await session.run( + query, + collection_id=self._collection_id, + source=source, + target=target, + relation_type=type, + ) + rec = await result.single() + return bool(rec and rec["deleted"] > 0) + # -- upserts (rebuild phase) -------------------------------------- - async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: LineageMember) -> None: + async def upsert_entity_with_lineage( + self, + *, + record: EntityRecord, + lineage: LineageMember, + compacted_description: str | None = None, + ) -> None: """Add (or replace by ``(document_id, parse_version)`` key) the lineage member + its description part. Single Cypher statement so two concurrent rebuilds against the same entity cannot race on read-modify-write — Neo4j MERGE locks the merged row for the duration of the trailing SET. + + Wave 7 W7-1: ``compacted_description=None`` (default) preserves + any existing column value via ``COALESCE``; non-None overwrites. """ member_json = _lineage_member_json(lineage) part_json = _description_part_json( @@ -352,6 +392,10 @@ async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: Lin f"MERGE (n:{_ENTITY_LABEL} {{collection_id: $collection_id, name: $name}}) " # initialise on create — start with empty parallel lists so # the strip-then-append below has a defined input list. + # ``compacted_description`` initialises to NULL on create + # (W7-1: derived field; the trailing ``COALESCE`` SET below + # writes a non-None value if ``$compacted_description`` is + # supplied). f"ON CREATE SET " f" n.source_lineage = [], " f" n.source_lineage_doc_ids = [], " @@ -359,6 +403,7 @@ async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: Lin f" n.description_parts = [], " f" n.description_parts_doc_ids = [], " f" n.description_parts_parse_versions = [], " + f" n.compacted_description = NULL, " f" n.gmt_created = datetime() " f"WITH n, " f" [i IN range(0, size(n.source_lineage_doc_ids) - 1) " @@ -377,6 +422,9 @@ async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: Lin f" [i IN dp_keep | n.description_parts_doc_ids[i]] + [$document_id], " f" n.description_parts_parse_versions = " f" [i IN dp_keep | n.description_parts_parse_versions[i]] + [$parse_version], " + # W7-1: COALESCE preserves existing if param is NULL, else + # overwrites. On CREATE both sides are NULL → still NULL. + f" n.compacted_description = COALESCE($compacted_description, n.compacted_description), " f" n.gmt_updated = datetime()" ) async with self._session() as session: @@ -389,9 +437,16 @@ async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: Lin parse_version=lineage.parse_version, member_json=member_json, part_json=part_json, + compacted_description=compacted_description, ) - async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: LineageMember) -> None: + async def upsert_relation_with_lineage( + self, + *, + record: RelationRecord, + lineage: LineageMember, + compacted_description: str | None = None, + ) -> None: member_json = _lineage_member_json(lineage) part_json = _description_part_json( DescriptionPart( @@ -410,6 +465,7 @@ async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: f" r.description_parts = [], " f" r.description_parts_doc_ids = [], " f" r.description_parts_parse_versions = [], " + f" r.compacted_description = NULL, " f" r.gmt_created = datetime() " f"WITH r, " f" [i IN range(0, size(r.evidence_lineage_doc_ids) - 1) " @@ -428,6 +484,7 @@ async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: f" [i IN dp_keep | r.description_parts_doc_ids[i]] + [$document_id], " f" r.description_parts_parse_versions = " f" [i IN dp_keep | r.description_parts_parse_versions[i]] + [$parse_version], " + f" r.compacted_description = COALESCE($compacted_description, r.compacted_description), " f" r.gmt_updated = datetime()" ) async with self._session() as session: @@ -441,6 +498,7 @@ async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: parse_version=lineage.parse_version, member_json=member_json, part_json=part_json, + compacted_description=compacted_description, ) # -- read-path ---------------------------------------------------- @@ -450,7 +508,8 @@ async def get_entity(self, entity_name: str) -> EntityWithLineage | None: f"MATCH (n:{_ENTITY_LABEL} {{collection_id: $collection_id, name: $name}}) " f"RETURN n.name AS name, n.entity_type AS entity_type, " f" n.source_lineage AS source_lineage, " - f" n.description_parts AS description_parts" + f" n.description_parts AS description_parts, " + f" n.compacted_description AS compacted_description" ) async with self._session() as session: result = await session.run(query, collection_id=self._collection_id, name=entity_name) @@ -464,6 +523,7 @@ async def get_entity(self, entity_name: str) -> EntityWithLineage | None: description_parts=tuple( DescriptionPart.from_dict(json.loads(s)) for s in (rec["description_parts"] or []) ), + compacted_description=rec["compacted_description"], ) async def get_relation(self, source: str, target: str, type: str) -> RelationWithLineage | None: @@ -472,7 +532,8 @@ async def get_relation(self, source: str, target: str, type: str) -> RelationWit f" {{collection_id: $collection_id, source: $source, target: $target, relation_type: $relation_type}}) " f"RETURN r.source AS source, r.target AS target, r.relation_type AS relation_type, " f" r.evidence_lineage AS evidence_lineage, " - f" r.description_parts AS description_parts" + f" r.description_parts AS description_parts, " + f" r.compacted_description AS compacted_description" ) async with self._session() as session: result = await session.run( @@ -493,6 +554,7 @@ async def get_relation(self, source: str, target: str, type: str) -> RelationWit description_parts=tuple( DescriptionPart.from_dict(json.loads(s)) for s in (rec["description_parts"] or []) ), + compacted_description=rec["compacted_description"], ) # -- LightRAG-style query layer (Wave 6 #33 chunk 2) -------------- @@ -520,7 +582,8 @@ async def query_entities_by_keyword( f"WHERE toLower(n.name) CONTAINS toLower($keyword) " f"RETURN n.name AS name, n.entity_type AS entity_type, " f" n.source_lineage AS source_lineage, " - f" n.description_parts AS description_parts " + f" n.description_parts AS description_parts, " + f" n.compacted_description AS compacted_description " f"ORDER BY n.name " f"LIMIT $top_k" ) @@ -539,6 +602,7 @@ async def query_entities_by_keyword( description_parts=tuple( DescriptionPart.from_dict(json.loads(s)) for s in (rec["description_parts"] or []) ), + compacted_description=rec["compacted_description"], ) async for rec in result ] @@ -563,7 +627,8 @@ async def _fetch_entities(session, names: list[str]) -> None: f"WHERE n.name IN $names " f"RETURN n.name AS name, n.entity_type AS entity_type, " f" n.source_lineage AS source_lineage, " - f" n.description_parts AS description_parts" + f" n.description_parts AS description_parts, " + f" n.compacted_description AS compacted_description" ) result = await session.run(cypher, collection_id=self._collection_id, names=names) async for rec in result: @@ -576,6 +641,7 @@ async def _fetch_entities(session, names: list[str]) -> None: description_parts=tuple( DescriptionPart.from_dict(json.loads(s)) for s in (rec["description_parts"] or []) ), + compacted_description=rec["compacted_description"], ) async def _fetch_relations_touching(session, names: list[str]) -> set[str]: @@ -586,7 +652,8 @@ async def _fetch_relations_touching(session, names: list[str]) -> set[str]: f"WHERE r.source IN $names OR r.target IN $names " f"RETURN r.source AS source, r.target AS target, r.relation_type AS relation_type, " f" r.evidence_lineage AS evidence_lineage, " - f" r.description_parts AS description_parts" + f" r.description_parts AS description_parts, " + f" r.compacted_description AS compacted_description" ) result = await session.run(cypher, collection_id=self._collection_id, names=names) next_frontier: set[str] = set() @@ -603,6 +670,7 @@ async def _fetch_relations_touching(session, names: list[str]) -> set[str]: description_parts=tuple( DescriptionPart.from_dict(json.loads(s)) for s in (rec["description_parts"] or []) ), + compacted_description=rec["compacted_description"], ) for endpoint in (rec["source"], rec["target"]): if endpoint not in seen_entities and endpoint not in next_frontier: diff --git a/aperag/indexing/graph_storage/postgres.py b/aperag/indexing/graph_storage/postgres.py index c7d287b23..70c453775 100644 --- a/aperag/indexing/graph_storage/postgres.py +++ b/aperag/indexing/graph_storage/postgres.py @@ -62,6 +62,7 @@ DateTime, Index, String, + Text, select, text, ) @@ -124,6 +125,13 @@ class _LineageEntityRow(_LineageGraphBase): entity_type = Column(String(64), nullable=False) source_lineage = Column(JSONB, nullable=False, server_default=text("'[]'::jsonb")) description_parts = Column(JSONB, nullable=False, server_default=text("'[]'::jsonb")) + # Wave 7 W7-1: derived cache column written by ``GraphIndexCompactor`` + # (W7-2) when ``description_parts`` aggregate text grows past + # threshold. Nullable: ``NULL`` means "not yet compacted" — readers + # fall back to joining ``description_parts.text``. PostgreSQL + # ``Text`` is unlimited; the application layer (Compactor) enforces + # ``max_description_chars`` (default 8000) per spec §K.12.5. + compacted_description = Column(Text, nullable=True) # Wave 5 P5B: ORM uses the same ``server_default=CURRENT_TIMESTAMP`` # the alembic migration declares — strict ORM↔migration mirror so # ``alembic check`` cannot drift on schema-touching follow-ups. @@ -164,6 +172,8 @@ class _LineageRelationRow(_LineageGraphBase): # standalone ``description`` field so the column had no consumer. evidence_lineage = Column(JSONB, nullable=False, server_default=text("'[]'::jsonb")) description_parts = Column(JSONB, nullable=False, server_default=text("'[]'::jsonb")) + # Wave 7 W7-1: derived cache column, semantics mirror entity row. + compacted_description = Column(Text, nullable=True) # Wave 5 P5B: same ORM↔migration mirror discipline as # ``_LineageEntityRow`` above. gmt_created = Column( @@ -365,12 +375,59 @@ async def gc_relation_if_orphan(self, source: str, target: str, type: str) -> bo ) return (result.rowcount or 0) > 0 + # -- unconditional delete (Wave 7 W7-1, used by curation merge) --- + + async def delete_entity(self, entity_name: str) -> bool: + sql = text( + f""" + DELETE FROM {ENTITY_TABLE} + WHERE collection_id = :collection_id AND name = :name + """ + ) + async with self._engine.begin() as conn: + result = await conn.execute(sql, {"collection_id": self._collection_id, "name": entity_name}) + return (result.rowcount or 0) > 0 + + async def delete_relation(self, source: str, target: str, type: str) -> bool: + sql = text( + f""" + DELETE FROM {RELATION_TABLE} + WHERE collection_id = :collection_id + AND source = :source AND target = :target AND relation_type = :relation_type + """ + ) + async with self._engine.begin() as conn: + result = await conn.execute( + sql, + { + "collection_id": self._collection_id, + "source": source, + "target": target, + "relation_type": type, + }, + ) + return (result.rowcount or 0) > 0 + # -- upserts (rebuild phase) -------------------------------------- - async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: LineageMember) -> None: + async def upsert_entity_with_lineage( + self, + *, + record: EntityRecord, + lineage: LineageMember, + compacted_description: str | None = None, + ) -> None: """Add (or replace by `(document_id, parse_version)` key) the lineage member + a corresponding description part. Single - statement so concurrent rebuilds do not race.""" + statement so concurrent rebuilds do not race. + + Wave 7 W7-1: ``compacted_description=None`` (default) preserves + any existing column value via ``COALESCE``; non-None overwrites. + This keeps the Wave 4 indexer hot path (per-chunk upserts that + don't compute compacted summaries) idempotent across syncs — + only ``GraphIndexCompactor`` (W7-2) actually populates the + column with a non-None value. + """ member_json = _lineage_to_json_array(lineage) part_json = _description_part_to_json( DescriptionPart( @@ -383,12 +440,14 @@ async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: Lin f""" INSERT INTO {ENTITY_TABLE} ( collection_id, name, entity_type, source_lineage, description_parts, + compacted_description, gmt_created, gmt_updated ) VALUES ( :collection_id, :name, :entity_type, jsonb_build_array(CAST(:member_json AS jsonb)), jsonb_build_array(CAST(:part_json AS jsonb)), + :compacted_description, NOW(), NOW() ) ON CONFLICT (collection_id, name) DO UPDATE @@ -410,6 +469,11 @@ async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: Lin '[]'::jsonb ) || jsonb_build_array(CAST(:part_json AS jsonb)) ), + -- W7-1: preserve existing if param is NULL, else overwrite. + compacted_description = COALESCE( + EXCLUDED.compacted_description, + {ENTITY_TABLE}.compacted_description + ), gmt_updated = NOW() """ ) @@ -424,10 +488,17 @@ async def upsert_entity_with_lineage(self, *, record: EntityRecord, lineage: Lin "parse_version": lineage.parse_version, "member_json": member_json, "part_json": part_json, + "compacted_description": compacted_description, }, ) - async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: LineageMember) -> None: + async def upsert_relation_with_lineage( + self, + *, + record: RelationRecord, + lineage: LineageMember, + compacted_description: str | None = None, + ) -> None: member_json = _lineage_to_json_array(lineage) part_json = _description_part_to_json( DescriptionPart( @@ -441,12 +512,14 @@ async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: INSERT INTO {RELATION_TABLE} ( collection_id, source, target, relation_type, evidence_lineage, description_parts, + compacted_description, gmt_created, gmt_updated ) VALUES ( :collection_id, :source, :target, :relation_type, jsonb_build_array(CAST(:member_json AS jsonb)), jsonb_build_array(CAST(:part_json AS jsonb)), + :compacted_description, NOW(), NOW() ) ON CONFLICT (collection_id, source, target, relation_type) DO UPDATE @@ -464,6 +537,11 @@ async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: '[]'::jsonb ) || jsonb_build_array(CAST(:part_json AS jsonb)) ), + -- W7-1: preserve existing if param is NULL, else overwrite. + compacted_description = COALESCE( + EXCLUDED.compacted_description, + {RELATION_TABLE}.compacted_description + ), gmt_updated = NOW() """ ) @@ -479,6 +557,7 @@ async def upsert_relation_with_lineage(self, *, record: RelationRecord, lineage: "parse_version": lineage.parse_version, "member_json": member_json, "part_json": part_json, + "compacted_description": compacted_description, }, ) @@ -492,6 +571,7 @@ async def get_entity(self, entity_name: str) -> EntityWithLineage | None: _LineageEntityRow.entity_type, _LineageEntityRow.source_lineage, _LineageEntityRow.description_parts, + _LineageEntityRow.compacted_description, ).where( _LineageEntityRow.collection_id == self._collection_id, _LineageEntityRow.name == entity_name, @@ -505,6 +585,7 @@ async def get_entity(self, entity_name: str) -> EntityWithLineage | None: entity_type=row.entity_type, source_lineage=tuple(LineageMember.from_dict(elem) for elem in (row.source_lineage or [])), description_parts=tuple(DescriptionPart.from_dict(part) for part in (row.description_parts or [])), + compacted_description=row.compacted_description, ) async def get_relation(self, source: str, target: str, type: str) -> RelationWithLineage | None: @@ -516,6 +597,7 @@ async def get_relation(self, source: str, target: str, type: str) -> RelationWit _LineageRelationRow.relation_type, _LineageRelationRow.evidence_lineage, _LineageRelationRow.description_parts, + _LineageRelationRow.compacted_description, ).where( _LineageRelationRow.collection_id == self._collection_id, _LineageRelationRow.source == source, @@ -532,6 +614,7 @@ async def get_relation(self, source: str, target: str, type: str) -> RelationWit relation_type=row.relation_type, evidence_lineage=tuple(LineageMember.from_dict(elem) for elem in (row.evidence_lineage or [])), description_parts=tuple(DescriptionPart.from_dict(part) for part in (row.description_parts or [])), + compacted_description=row.compacted_description, ) # -- LightRAG-style query layer (Wave 6 #33 chunk 2) -------------- @@ -554,6 +637,7 @@ async def query_entities_by_keyword( _LineageEntityRow.entity_type, _LineageEntityRow.source_lineage, _LineageEntityRow.description_parts, + _LineageEntityRow.compacted_description, ) .where( _LineageEntityRow.collection_id == self._collection_id, @@ -568,6 +652,7 @@ async def query_entities_by_keyword( entity_type=row.entity_type, source_lineage=tuple(LineageMember.from_dict(elem) for elem in (row.source_lineage or [])), description_parts=tuple(DescriptionPart.from_dict(part) for part in (row.description_parts or [])), + compacted_description=row.compacted_description, ) for row in result ] @@ -593,6 +678,7 @@ async def _fetch_entities(names: set[str]) -> None: _LineageEntityRow.entity_type, _LineageEntityRow.source_lineage, _LineageEntityRow.description_parts, + _LineageEntityRow.compacted_description, ).where( _LineageEntityRow.collection_id == self._collection_id, _LineageEntityRow.name.in_(list(names)), @@ -606,6 +692,7 @@ async def _fetch_entities(names: set[str]) -> None: entity_type=row.entity_type, source_lineage=tuple(LineageMember.from_dict(elem) for elem in (row.source_lineage or [])), description_parts=tuple(DescriptionPart.from_dict(part) for part in (row.description_parts or [])), + compacted_description=row.compacted_description, ) async def _fetch_relations_touching(names: set[str]) -> set[str]: @@ -621,6 +708,7 @@ async def _fetch_relations_touching(names: set[str]) -> set[str]: _LineageRelationRow.relation_type, _LineageRelationRow.evidence_lineage, _LineageRelationRow.description_parts, + _LineageRelationRow.compacted_description, ).where( _LineageRelationRow.collection_id == self._collection_id, (_LineageRelationRow.source.in_(list(names))) | (_LineageRelationRow.target.in_(list(names))), @@ -638,6 +726,7 @@ async def _fetch_relations_touching(names: set[str]) -> set[str]: description_parts=tuple( DescriptionPart.from_dict(part) for part in (row.description_parts or []) ), + compacted_description=row.compacted_description, ) for endpoint in (row.source, row.target): if endpoint not in seen_entities and endpoint not in next_frontier: diff --git a/aperag/migration/versions/20260428020000-a3b7c4d8e2f1_lineage_graph_compacted_description.py b/aperag/migration/versions/20260428020000-a3b7c4d8e2f1_lineage_graph_compacted_description.py new file mode 100644 index 000000000..167e6e251 --- /dev/null +++ b/aperag/migration/versions/20260428020000-a3b7c4d8e2f1_lineage_graph_compacted_description.py @@ -0,0 +1,56 @@ +"""add ``compacted_description`` column to ``aperag_lineage_*`` tables + +Wave 7 W7-1 (spec §K.12.4): adds a nullable ``Text`` column to both +lineage tables so :class:`GraphIndexCompactor` (W7-2) can persist the +LLM-summarised unified description for each entity / relation. + +The column is **derived**: ``description_parts`` (JSONB) remains the +per-doc source of truth; ``compacted_description`` is the cache fed to +the vector embedding step (W7-3) so embedded text stays bounded under +``max_description_chars`` (default 8000 — application-layer enforced +by Compactor, not a DB ``CHECK``). NULL means "not yet compacted" — +readers fall back to joining ``description_parts.text``. + +PostgreSQL ``Text`` is unlimited; per spec §K.12.5 the length cap +lives in the application layer (Compactor + ``graph_extractor``) +because the same Protocol must run against Neo4j ``STRING`` and Nebula +``string`` properties which also have no fixed cap. Schema CHECK +constraints would have to be replicated cross-backend; the Compactor +truncate-not-fail policy is the canonical enforcement. + +Hard-cut per spec §K.12 (legacy ``graphindex_*`` deployment was gated +behind ``enable_knowledge_graph=False`` and Wave 4 hard-cut already +locked-in no-production-data on the new ``aperag_lineage_*`` tables). +The downgrade drops the column so a rollback can replay subsequent +migrations cleanly. + +Revision ID: a3b7c4d8e2f1 +Revises: f8a4c5b9d3e1 +Create Date: 2026-04-28 02:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "a3b7c4d8e2f1" +down_revision: Union[str, None] = "f8a4c5b9d3e1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "aperag_lineage_entity", + sa.Column("compacted_description", sa.Text(), nullable=True), + ) + op.add_column( + "aperag_lineage_relation", + sa.Column("compacted_description", sa.Text(), nullable=True), + ) + + +def downgrade() -> None: + op.drop_column("aperag_lineage_relation", "compacted_description") + op.drop_column("aperag_lineage_entity", "compacted_description") diff --git a/tests/integration/compat/test_lineage_graph_compat.py b/tests/integration/compat/test_lineage_graph_compat.py index d7aab66ed..bce89743d 100644 --- a/tests/integration/compat/test_lineage_graph_compat.py +++ b/tests/integration/compat/test_lineage_graph_compat.py @@ -600,3 +600,241 @@ async def test_list_entity_labels_excludes_orphan_after_gc(store, collection_id) labels = await s.list_entity_labels() assert labels == ["person"] + + +# --- tests: Wave 7 W7-1 ``compacted_description`` field -------------------- +# +# These pin the cross-backend behaviour of the new W7-1 column added by +# the alembic migration (Postgres) / Neo4j property / Nebula tag-prop: +# +# - default value is ``None`` after a Wave 4-style indexer upsert +# (kwarg not supplied) +# - explicit non-None kwarg writes through and is read back +# - ``None`` kwarg on a subsequent upsert PRESERVES the existing +# compacted value (the COALESCE invariant — the per-chunk indexer +# hot path MUST NOT clobber a Compactor-written value) +# - 100k-char roundtrip (huangheng safety gate: verifies driver +# buffers don't truncate / corrupt at large sizes) +# - lineage-preserve safety gate (huangheng msg=828c83cc 1131981c): +# setting compacted via upsert MUST NOT clobber other lineage state +# - ``delete_entity`` / ``delete_relation`` unconditional remove + + +@pytest.mark.asyncio +async def test_compacted_description_defaults_to_none_when_kwarg_omitted(store, collection_id): + """A Wave 4-style upsert (no ``compacted_description`` kwarg) + leaves the column at its default ``None``.""" + + _, s = store + await s.upsert_entity_with_lineage( + record=_entity("Alice", description="raw extracted text"), + lineage=_LM_A_V1, + ) + got = await s.get_entity("Alice") + assert got is not None + assert got.compacted_description is None + + +@pytest.mark.asyncio +async def test_compacted_description_round_trip_when_supplied(store, collection_id): + """Explicit non-None kwarg writes through and reads back unchanged.""" + + _, s = store + summary = "Alice is a senior engineer working on graph indexing." + await s.upsert_entity_with_lineage( + record=_entity("Alice"), + lineage=_LM_A_V1, + compacted_description=summary, + ) + got = await s.get_entity("Alice") + assert got is not None + assert got.compacted_description == summary + + +@pytest.mark.asyncio +async def test_compacted_description_preserved_on_subsequent_none_upsert(store, collection_id): + """The COALESCE invariant: once the Compactor writes a non-None + value, a subsequent Wave 4 indexer upsert (no kwarg → ``None`` + default) MUST preserve it. Without this contract every per-chunk + upsert in the indexer hot path would clear the Compactor cache and + force a re-summary on every doc add. This test pins the + cross-backend behaviour (Postgres COALESCE, Neo4j COALESCE, + Nebula in-Python preserve).""" + + _, s = store + summary = "compacted v1 — synthesised by GraphIndexCompactor" + await s.upsert_entity_with_lineage( + record=_entity("Alice"), + lineage=_LM_A_V1, + compacted_description=summary, + ) + # Indexer-side upsert with the same lineage key but no compacted + # kwarg — emulates a re-sync of the same doc/parse_version. + await s.upsert_entity_with_lineage( + record=_entity("Alice", description="re-extracted text"), + lineage=_LM_A_V1, + ) + got = await s.get_entity("Alice") + assert got is not None + assert got.compacted_description == summary # preserved + + +@pytest.mark.asyncio +async def test_compacted_description_overwritten_on_subsequent_non_none_upsert(store, collection_id): + """The Compactor refreshing a stale summary MUST overwrite the + existing value. (Compactor passes a non-None kwarg deliberately.)""" + + _, s = store + await s.upsert_entity_with_lineage( + record=_entity("Alice"), + lineage=_LM_A_V1, + compacted_description="v1 summary", + ) + await s.upsert_entity_with_lineage( + record=_entity("Alice"), + lineage=_LM_A_V1, + compacted_description="v2 summary", + ) + got = await s.get_entity("Alice") + assert got is not None + assert got.compacted_description == "v2 summary" + + +@pytest.mark.asyncio +async def test_compacted_description_supports_100k_chars_roundtrip(store, collection_id): + """100k-char roundtrip safety gate (per huangheng msg=4d93a6c5 + gate #1). The Postgres ``Text`` / Neo4j ``STRING`` / Nebula + ``string`` types are theoretically unbounded but driver-level + buffers (asyncpg / neo4j-python-driver / nebula-python) have + historically clipped large payloads silently. The application + layer caps at 8000 chars (per spec §K.12.5), but the schema layer + MUST tolerate the 12.5× margin — otherwise a Compactor bug that + drops the cap could surface as a database-level corruption rather + than an obvious truncation. This test pins the schema-side margin + cross-backend.""" + + _, s = store + big = "x" * 100_000 + await s.upsert_entity_with_lineage( + record=_entity("Bob"), + lineage=_LM_A_V1, + compacted_description=big, + ) + got = await s.get_entity("Bob") + assert got is not None + assert got.compacted_description is not None + assert len(got.compacted_description) == 100_000 + assert got.compacted_description == big + + +@pytest.mark.asyncio +async def test_compacted_description_does_not_clobber_lineage_on_compactor_write(store, collection_id): + """huangheng safety gate (msg=828c83cc): writing + ``compacted_description`` via the existing + ``upsert_*_with_lineage`` kwarg path MUST preserve every other + lineage field on the row. This is the principal risk of Option B + (chosen over Option A's separate ``set_compacted_description`` + method): a Compactor that mis-constructs the EntityRecord could + silently drop ``description_parts`` from other (doc, parse_v) + slices. The test seeds multi-doc lineage, then simulates a + Compactor write (re-passing the same lineage), and asserts ALL + fields intact.""" + + _, s = store + # Seed two distinct (doc, parse_v) lineage slices with distinct + # description text. The compactor will be invoked once, with + # lineage A-v1; A-v2 must remain untouched. + await s.upsert_entity_with_lineage( + record=_entity("Alice", description="from doc-A v1"), + lineage=_LM_A_V1, + ) + await s.upsert_entity_with_lineage( + record=_entity("Alice", description="from doc-A v2"), + lineage=_LM_A_V2, + ) + pre = await s.get_entity("Alice") + assert pre is not None + pre_lineage = sorted(pre.source_lineage, key=lambda m: (m.document_id, m.parse_version)) + pre_parts = sorted(pre.description_parts, key=lambda p: (p.document_id, p.parse_version)) + assert {(m.document_id, m.parse_version) for m in pre_lineage} == {("doc-A", "v1"), ("doc-A", "v2")} + + # Compactor write — passes the EXISTING (doc-A, v1) record (text + # unchanged) plus a compacted_description. After this, both + # lineage slices MUST still be present, neither description_parts + # text has changed, and compacted_description is set. + await s.upsert_entity_with_lineage( + record=_entity("Alice", description="from doc-A v1"), + lineage=_LM_A_V1, + compacted_description="LLM-merged summary across both v1 and v2", + ) + after = await s.get_entity("Alice") + assert after is not None + after_lineage = sorted(after.source_lineage, key=lambda m: (m.document_id, m.parse_version)) + after_parts = sorted(after.description_parts, key=lambda p: (p.document_id, p.parse_version)) + assert after_lineage == pre_lineage + # description_parts text values for v1/v2 must be unchanged. + assert {(p.document_id, p.parse_version, p.text) for p in after_parts} == { + (p.document_id, p.parse_version, p.text) for p in pre_parts + } + assert after.compacted_description == "LLM-merged summary across both v1 and v2" + + +@pytest.mark.asyncio +async def test_compacted_description_relation_round_trip(store, collection_id): + """Symmetric coverage for relations — the W7-1 schema is added on + both ``aperag_lineage_entity`` and ``aperag_lineage_relation``.""" + + _, s = store + await s.upsert_relation_with_lineage( + record=_relation("Alice", "Bob"), + lineage=_LM_A_V1, + compacted_description="Alice and Bob co-authored the paper.", + ) + got = await s.get_relation("Alice", "Bob", "knows") + assert got is not None + assert got.compacted_description == "Alice and Bob co-authored the paper." + + +# --- tests: Wave 7 W7-1 ``delete_entity`` / ``delete_relation`` ------------ + + +@pytest.mark.asyncio +async def test_delete_entity_unconditionally_removes(store, collection_id): + """``delete_entity`` removes the row regardless of lineage state. + Distinguishes from ``gc_entity_if_orphan`` which only deletes when + ``source_lineage`` is empty.""" + + _, s = store + await s.upsert_entity_with_lineage(record=_entity("Alice"), lineage=_LM_A_V1) + deleted = await s.delete_entity("Alice") + assert deleted is True + assert await s.get_entity("Alice") is None + # Idempotent: a second delete returns False (row no longer exists). + again = await s.delete_entity("Alice") + assert again is False + + +@pytest.mark.asyncio +async def test_delete_entity_returns_false_when_absent(store, collection_id): + """``delete_entity`` is idempotent: returns False when the row was + not present to begin with.""" + + _, s = store + deleted = await s.delete_entity("DoesNotExist") + assert deleted is False + + +@pytest.mark.asyncio +async def test_delete_relation_unconditionally_removes(store, collection_id): + _, s = store + await s.upsert_relation_with_lineage(record=_relation("Alice", "Bob"), lineage=_LM_A_V1) + deleted = await s.delete_relation("Alice", "Bob", "knows") + assert deleted is True + assert await s.get_relation("Alice", "Bob", "knows") is None + + +@pytest.mark.asyncio +async def test_delete_relation_returns_false_when_absent(store, collection_id): + _, s = store + deleted = await s.delete_relation("Alice", "Bob", "knows") + assert deleted is False diff --git a/tests/unit_test/indexing/test_t1_2_graph.py b/tests/unit_test/indexing/test_t1_2_graph.py index 7049ce425..150a15fb6 100644 --- a/tests/unit_test/indexing/test_t1_2_graph.py +++ b/tests/unit_test/indexing/test_t1_2_graph.py @@ -1084,3 +1084,191 @@ async def extractor(chunks): entities, _relations = parse_kg_jsonl(body.read()) assert len(entities) >= 1 assert all(e.name.startswith("E_") for e in entities) + + +# --------------------------------------------------------------------- +# Group 5: Wave 7 W7-1 — ``compacted_description`` field + unconditional +# ``delete_entity`` / ``delete_relation`` against the InMemory reference +# implementation. The cross-backend versions of these tests live in +# ``tests/integration/compat/test_lineage_graph_compat.py``; the unit +# tests here pin the reference oracle (the InMemory store is the +# canonical correctness target every backend must match). +# --------------------------------------------------------------------- + + +_LINEAGE_W7_DOC_A_V1 = LineageMember( + document_id="doc_A", + parse_version="v1", + tenant_scope_key="tenant-X", + chunk_ids=("c0",), +) + +_LINEAGE_W7_DOC_A_V2 = LineageMember( + document_id="doc_A", + parse_version="v2", + tenant_scope_key="tenant-X", + chunk_ids=("c1",), +) + + +def _record(name: str = "Alice", *, description: str = "raw text", chunk: str = "c0") -> EntityRecord: + return EntityRecord( + name=name, + entity_type="Person", + description=description, + source_chunk_ids=(chunk,), + ) + + +def _relation_record( + source: str = "Alice", + target: str = "Bob", + *, + relation_type: str = "knows", + description: str = "they know each other", + chunk: str = "c0", +) -> RelationRecord: + return RelationRecord( + source=source, + target=target, + relation_type=relation_type, + description=description, + source_chunk_ids=(chunk,), + ) + + +@pytest.mark.asyncio +async def test_w7_compacted_description_defaults_to_none(): + store = InMemoryLineageGraphStore() + await store.upsert_entity_with_lineage(record=_record(), lineage=_LINEAGE_W7_DOC_A_V1) + got = await store.get_entity("Alice") + assert got is not None + assert got.compacted_description is None + + +@pytest.mark.asyncio +async def test_w7_compacted_description_round_trip(): + store = InMemoryLineageGraphStore() + await store.upsert_entity_with_lineage( + record=_record(), + lineage=_LINEAGE_W7_DOC_A_V1, + compacted_description="LLM summary text", + ) + got = await store.get_entity("Alice") + assert got is not None + assert got.compacted_description == "LLM summary text" + + +@pytest.mark.asyncio +async def test_w7_compacted_description_preserved_on_subsequent_none_kwarg(): + """COALESCE invariant on the InMemory reference store — ``None`` + kwarg on a subsequent upsert MUST preserve the existing value.""" + store = InMemoryLineageGraphStore() + await store.upsert_entity_with_lineage( + record=_record(), + lineage=_LINEAGE_W7_DOC_A_V1, + compacted_description="v1 summary", + ) + # Indexer-side re-sync — same lineage key, no compacted kwarg. + await store.upsert_entity_with_lineage( + record=_record(description="re-extracted"), + lineage=_LINEAGE_W7_DOC_A_V1, + ) + got = await store.get_entity("Alice") + assert got is not None + assert got.compacted_description == "v1 summary" + + +@pytest.mark.asyncio +async def test_w7_compacted_description_overwritten_on_non_none_kwarg(): + store = InMemoryLineageGraphStore() + await store.upsert_entity_with_lineage( + record=_record(), + lineage=_LINEAGE_W7_DOC_A_V1, + compacted_description="v1", + ) + await store.upsert_entity_with_lineage( + record=_record(), + lineage=_LINEAGE_W7_DOC_A_V1, + compacted_description="v2", + ) + got = await store.get_entity("Alice") + assert got is not None + assert got.compacted_description == "v2" + + +@pytest.mark.asyncio +async def test_w7_compacted_write_does_not_clobber_lineage_state(): + """huangheng safety gate (msg=828c83cc): the Compactor write path + must preserve every other lineage field on the row.""" + store = InMemoryLineageGraphStore() + await store.upsert_entity_with_lineage(record=_record(description="v1 text"), lineage=_LINEAGE_W7_DOC_A_V1) + await store.upsert_entity_with_lineage( + record=_record(description="v2 text", chunk="c1"), + lineage=_LINEAGE_W7_DOC_A_V2, + ) + pre = await store.get_entity("Alice") + assert pre is not None + pre_lineage_keys = {(m.document_id, m.parse_version) for m in pre.source_lineage} + pre_part_keys = {(p.document_id, p.parse_version, p.text) for p in pre.description_parts} + + # Compactor-style write — passes the v1 record again with a + # compacted summary covering both v1 and v2. + await store.upsert_entity_with_lineage( + record=_record(description="v1 text"), + lineage=_LINEAGE_W7_DOC_A_V1, + compacted_description="v1+v2 unified summary", + ) + after = await store.get_entity("Alice") + assert after is not None + after_lineage_keys = {(m.document_id, m.parse_version) for m in after.source_lineage} + after_part_keys = {(p.document_id, p.parse_version, p.text) for p in after.description_parts} + + assert after_lineage_keys == pre_lineage_keys + assert after_part_keys == pre_part_keys + assert after.compacted_description == "v1+v2 unified summary" + + +@pytest.mark.asyncio +async def test_w7_relation_compacted_description_round_trip(): + store = InMemoryLineageGraphStore() + await store.upsert_relation_with_lineage( + record=_relation_record(), + lineage=_LINEAGE_W7_DOC_A_V1, + compacted_description="rel summary", + ) + got = await store.get_relation("Alice", "Bob", "knows") + assert got is not None + assert got.compacted_description == "rel summary" + + +@pytest.mark.asyncio +async def test_w7_delete_entity_unconditionally_removes(): + store = InMemoryLineageGraphStore() + await store.upsert_entity_with_lineage(record=_record(), lineage=_LINEAGE_W7_DOC_A_V1) + deleted = await store.delete_entity("Alice") + assert deleted is True + assert await store.get_entity("Alice") is None + + +@pytest.mark.asyncio +async def test_w7_delete_entity_returns_false_when_absent(): + store = InMemoryLineageGraphStore() + deleted = await store.delete_entity("DoesNotExist") + assert deleted is False + + +@pytest.mark.asyncio +async def test_w7_delete_relation_unconditionally_removes(): + store = InMemoryLineageGraphStore() + await store.upsert_relation_with_lineage(record=_relation_record(), lineage=_LINEAGE_W7_DOC_A_V1) + deleted = await store.delete_relation("Alice", "Bob", "knows") + assert deleted is True + assert await store.get_relation("Alice", "Bob", "knows") is None + + +@pytest.mark.asyncio +async def test_w7_delete_relation_returns_false_when_absent(): + store = InMemoryLineageGraphStore() + deleted = await store.delete_relation("Alice", "Bob", "knows") + assert deleted is False