Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 88 additions & 2 deletions aperag/indexing/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,25 @@ class EntityWithLineage:
storage representation (Nebula JSON STRING, Neo4j list-of-MAP,
in-memory dict) and the algorithm in :class:`GraphModalityWorker`
only ever sees this canonical form.

Wave 7 (W7-1) adds ``compacted_description``: a derived cache
column written by ``GraphIndexCompactor`` (Wave 7 W7-2) when the
aggregate ``description_parts`` text grows past
``max_description_chars`` / ``summarize_at_fragments`` thresholds.
The compactor calls an LLM to summarise the union of the parts and
persists the result here so the vector embedding step (Wave 7 W7-3)
has a bounded, coherent text source. The field is **derived**:
``description_parts`` remains the per-doc source of truth, and the
compacted value can be reproduced by re-running the compactor over
the parts at any time. NULL means "not yet compacted" — readers
should fall back to joining ``description_parts.text``.
"""

name: str
entity_type: str
source_lineage: tuple[LineageMember, ...]
description_parts: tuple[DescriptionPart, ...]
compacted_description: str | None = None


@dataclass
Expand All @@ -282,6 +295,7 @@ class RelationWithLineage:
relation_type: str
evidence_lineage: tuple[LineageMember, ...]
description_parts: tuple[DescriptionPart, ...]
compacted_description: str | None = None


# ---------------------------------------------------------------------
Expand Down Expand Up @@ -494,24 +508,63 @@ async def gc_entity_if_orphan(self, entity_name: str) -> bool:
async def gc_relation_if_orphan(self, source: str, target: str, type: str) -> bool:
    """Relation-edge counterpart of :meth:`gc_entity_if_orphan`.

    The edge is addressed by its ``(source, target, type)`` triple and
    follows the same contract as the entity variant.
    NOTE(review): the return value is presumably ``True`` when the edge
    was removed — confirm against :meth:`gc_entity_if_orphan`.
    """

async def delete_entity(self, entity_name: str) -> bool:
    """Remove the entity row for ``entity_name`` unconditionally (Wave 7 W7-1).

    This complements the ``gc_*_if_orphan`` family, which serves
    indexer-side garbage collection and only deletes once lineage is
    empty. Here the row is dropped even if lineage members remain.
    The caller is :class:`GraphCurationService.merge_entities` (W7-6):
    after a user-driven merge the canonical entity has absorbed the
    source entities' parts, so the source rows must go regardless.

    Returns:
        ``True`` if the row existed and a delete actually ran;
        ``False`` if the name was not present, which keeps a retried
        call idempotent.
    """

async def delete_relation(self, source: str, target: str, type: str) -> bool:
    """Remove the relation row unconditionally (Wave 7 W7-1).

    Relation-edge counterpart of :meth:`delete_entity`, addressed by
    the ``(source, target, type)`` triple. It serves the same use
    case: merging relations under a curation flow.
    """

async def upsert_entity_with_lineage(
    self,
    *,
    record: EntityRecord,
    lineage: LineageMember,
    compacted_description: str | None = None,
) -> None:
    """Add ``lineage`` to ``source_lineage`` and a corresponding
    ``DescriptionPart`` to ``description_parts``. Creates the
    entity if absent. Replaces an existing member with the same
    ``(document_id, parse_version)`` key.

    Wave 7 (W7-1) ``compacted_description`` semantics: ``None``
    (default) means "preserve any existing compacted_description on
    the row" — Wave 4 indexer-side per-chunk upserts MUST NOT clear
    a compacted value computed by ``GraphIndexCompactor`` (W7-2) on
    a previous sync. A non-None string overwrites the column.
    Backends implement this with a ``COALESCE``-style update so the
    Wave 4 hot path stays a single SQL statement (per Wave 4 §C.3
    forward-only retry contract).

    Args:
        record: The entity being upserted.
        lineage: The lineage member to add or replace, keyed by
            ``(document_id, parse_version)``.
        compacted_description: ``None`` preserves the stored value;
            a string overwrites it.
    """

async def upsert_relation_with_lineage(
    self,
    *,
    record: RelationRecord,
    lineage: LineageMember,
    compacted_description: str | None = None,
) -> None:
    """Symmetric to :meth:`upsert_entity_with_lineage` for relations.

    Wave 7 (W7-1) ``compacted_description`` semantics match the
    entity variant: ``None`` preserves the existing column value;
    a non-None string overwrites.

    Args:
        record: The relation being upserted.
        lineage: The lineage member to add for this relation.
        compacted_description: ``None`` preserves the stored value;
            a string overwrites it.
    """

async def get_entity(self, entity_name: str) -> EntityWithLineage | None:
"""Read-path helper used by tests / read primitives. Returns
Expand Down Expand Up @@ -615,6 +668,7 @@ class _InMemoryEntityRow:
entity_type: str
description_parts: dict[tuple[str, str], DescriptionPart] = field(default_factory=dict)
source_lineage: dict[tuple[str, str], LineageMember] = field(default_factory=dict)
compacted_description: str | None = None


@dataclass
Expand All @@ -624,6 +678,7 @@ class _InMemoryRelationRow:
relation_type: str
description_parts: dict[tuple[str, str], DescriptionPart] = field(default_factory=dict)
evidence_lineage: dict[tuple[str, str], LineageMember] = field(default_factory=dict)
compacted_description: str | None = None


class InMemoryLineageGraphStore:
Expand Down Expand Up @@ -729,13 +784,31 @@ async def gc_relation_if_orphan(self, source: str, target: str, type: str) -> bo
del self._relations[rel_key]
return True

# ---- unconditional delete (Wave 7 W7-1) -------------------------

async def delete_entity(self, entity_name: str) -> bool:
    """Drop ``entity_name`` from the in-memory entity table unconditionally.

    The removal runs while ``self._guard`` is held. No lineage check
    is made: the row is removed whenever it exists.

    Returns:
        ``True`` if a row was removed, ``False`` if the name was
        absent.
    """
    async with self._guard:
        try:
            del self._entities[entity_name]
        except KeyError:
            # Nothing stored under this name; report that no delete ran.
            return False
    return True

async def delete_relation(self, source: str, target: str, type: str) -> bool:
    """Drop the ``(source, target, type)`` edge from the in-memory relation table.

    The removal runs while ``self._guard`` is held. No lineage check
    is made: the row is removed whenever it exists.

    Returns:
        ``True`` if a row was removed, ``False`` if no such edge was
        stored.
    """
    edge = (source, target, type)
    absent = object()  # private sentinel so any stored value counts as "present"
    async with self._guard:
        removed = self._relations.pop(edge, absent)
    return removed is not absent

# ---- upserts ----------------------------------------------------

async def upsert_entity_with_lineage(
self,
*,
record: EntityRecord,
lineage: LineageMember,
compacted_description: str | None = None,
) -> None:
async with self._guard:
row = self._entities.get(record.name)
Expand All @@ -752,12 +825,16 @@ async def upsert_entity_with_lineage(
parse_version=lineage.parse_version,
text=record.description,
)
# Wave 7 W7-1: ``None`` preserves; non-None overwrites.
if compacted_description is not None:
row.compacted_description = compacted_description

async def upsert_relation_with_lineage(
self,
*,
record: RelationRecord,
lineage: LineageMember,
compacted_description: str | None = None,
) -> None:
rel_key = record.relation_key()
async with self._guard:
Expand All @@ -775,6 +852,9 @@ async def upsert_relation_with_lineage(
parse_version=lineage.parse_version,
text=record.description,
)
# Wave 7 W7-1: ``None`` preserves; non-None overwrites.
if compacted_description is not None:
row.compacted_description = compacted_description

# ---- read path --------------------------------------------------

Expand All @@ -788,6 +868,7 @@ async def get_entity(self, entity_name: str) -> EntityWithLineage | None:
entity_type=row.entity_type,
source_lineage=tuple(_sorted_lineage(row.source_lineage.values())),
description_parts=tuple(_sorted_description_parts(row.description_parts.values())),
compacted_description=row.compacted_description,
)

async def get_relation(self, source: str, target: str, type: str) -> RelationWithLineage | None:
Expand All @@ -802,6 +883,7 @@ async def get_relation(self, source: str, target: str, type: str) -> RelationWit
relation_type=row.relation_type,
evidence_lineage=tuple(_sorted_lineage(row.evidence_lineage.values())),
description_parts=tuple(_sorted_description_parts(row.description_parts.values())),
compacted_description=row.compacted_description,
)

# ------------------------------------------------------------------
Expand All @@ -828,6 +910,7 @@ async def query_entities_by_keyword(
entity_type=row.entity_type,
source_lineage=tuple(_sorted_lineage(row.source_lineage.values())),
description_parts=tuple(_sorted_description_parts(row.description_parts.values())),
compacted_description=row.compacted_description,
)
)
# Deterministic ordering for stable test output.
Expand Down Expand Up @@ -859,6 +942,7 @@ async def expand_neighbors_n_hops(
entity_type=row.entity_type,
source_lineage=tuple(_sorted_lineage(row.source_lineage.values())),
description_parts=tuple(_sorted_description_parts(row.description_parts.values())),
compacted_description=row.compacted_description,
)

current = set(frontier)
Expand All @@ -874,6 +958,7 @@ async def expand_neighbors_n_hops(
relation_type=rel_row.relation_type,
evidence_lineage=tuple(_sorted_lineage(rel_row.evidence_lineage.values())),
description_parts=tuple(_sorted_description_parts(rel_row.description_parts.values())),
compacted_description=rel_row.compacted_description,
)
for neighbour in (src, tgt):
if neighbour in seen_entities:
Expand All @@ -886,6 +971,7 @@ async def expand_neighbors_n_hops(
entity_type=row.entity_type,
source_lineage=tuple(_sorted_lineage(row.source_lineage.values())),
description_parts=tuple(_sorted_description_parts(row.description_parts.values())),
compacted_description=row.compacted_description,
)
next_frontier.add(neighbour)
if not next_frontier:
Expand Down
Loading
Loading