Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 97 additions & 30 deletions aperag/domains/retrieval/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
Phase 2 hard-cut. Preserves the existing pipeline shape and async
semantics byte-for-byte — no algorithmic change (Non-goal 2).

The knowledge-graph recall path consumes its provider through the
``GraphSearchContract`` protocol declared in
``aperag.domains.retrieval.ports``. The retrieval domain must not
import ``aperag.domains.knowledge_graph`` directly — doing so would
re-establish a cross-domain static dependency. Instead the
``_graph_search`` method type-binds to the Protocol; the concrete
graphindex service instance structurally satisfies it at runtime.
``aperag.domains.knowledge_graph.graphindex.*`` stays legal here because it is infrastructure,
not a forbidden aggregate.
Wave 6 #33 chunk 3 (per architect Option C ruling msg=6fccd9ab):
the knowledge-graph recall path now routes through the new
:class:`aperag.indexing.graph.LineageGraphStore` Protocol
(``query_entities_by_keyword`` + ``expand_neighbors_n_hops``)
instead of the legacy ``GraphIndexService.query_context``. The
retrieval pipeline composes context text from
``EntityWithLineage`` / ``RelationWithLineage`` directly — see
:func:`_render_graph_context_text`. The legacy
``aperag.domains.knowledge_graph.graphindex`` package is retained
in the codebase because its UI / curation methods (``get_labels`` /
``get_knowledge_graph`` / ``merge_entities``) still serve other
flows (knowledge_graph/service.py + graph_curation/*); the retrieval
side has no remaining import of it.
"""

from __future__ import annotations
Expand All @@ -39,7 +43,6 @@
from aperag.config import build_vector_db_context, settings
from aperag.db.ops import async_db_ops
from aperag.domains.retrieval.context.context import ContextManager
from aperag.domains.retrieval.ports import GraphSearchContract
from aperag.domains.retrieval.schemas import SearchRequest, SearchResultItem, SearchResultMetadata
from aperag.exceptions import ValidationException
from aperag.llm.embed.base_embedding import get_collection_embedding_service_sync
Expand Down Expand Up @@ -74,17 +77,61 @@ class CollectionRow(Protocol):
config: Any


def _graph_search_service_for(collection: CollectionRow) -> GraphSearchContract:
"""Build the graph-search provider for a collection.
def _render_graph_context_text(entities: list[Any], relations: list[Any]) -> str:
"""Compose a LightRAG-style text block from entities + relations.

The import is kept local so the pipeline module does not pay the
graphindex import tax unless a graph recall is actually requested.
The return type is annotated as the Protocol so the boundary is
explicit: the caller only sees ``query_context`` and nothing else.
Mirrors the legacy ``GraphIndexService.query_context`` rendering
convention so downstream RAG prompts that expect the
``-----Entities (KG)----- / -----Relationships (KG)-----``
sectioned text continue to work after the Wave 6 #33 chunk 3
migration.

``EntityWithLineage`` / ``RelationWithLineage`` carry their
description across multiple ``DescriptionPart`` rows (one per
contributing parse). We join the non-empty fragments with `` | ``
so the rendered context surfaces every contribution; an entity
with no descriptions falls back to ``"(no description)"``.
"""
if not entities and not relations:
return ""
lines: list[str] = []
if entities:
lines.append("-----Entities (KG)-----")
for e in entities:
desc_parts = [p.text.strip() for p in (e.description_parts or ()) if p.text and p.text.strip()]
desc = " | ".join(desc_parts) or "(no description)"
type_label = e.type or "entity"
lines.append(f"- [{type_label}] {e.name} — {desc}")
if relations:
if lines:
lines.append("")
lines.append("-----Relationships (KG)-----")
for r in relations:
desc_parts = [p.text.strip() for p in (r.description_parts or ()) if p.text and p.text.strip()]
desc = " | ".join(desc_parts) or "(no description)"
lines.append(f"- {r.source} → {r.target}: {desc}")
return "\n".join(lines)


def _build_lineage_graph_store_for(collection: CollectionRow) -> Any:
"""Build a :class:`LineageGraphStore` for ``collection`` using the
same backend dispatch the indexing worker_factory uses. The import
is kept local so the pipeline module does not pay the indexing
import tax unless a graph recall is actually requested.

Wave 6 #33 chunk 3 (per architect Option C ruling msg=6fccd9ab):
retrieval-side LightRAG-style query routes through the new
LineageGraphStore Protocol (`query_entities_by_keyword` +
`expand_neighbors_n_hops`) instead of the legacy
``GraphIndexService.query_context``.
"""
from aperag.domains.knowledge_graph.graphindex.integration import make_service_for_collection
from aperag.indexing.worker_factory import (
_build_lineage_graph_store,
_resolve_graph_backend_type,
)

return make_service_for_collection(collection) # type: ignore[return-value]
backend_type = _resolve_graph_backend_type(collection)
return _build_lineage_graph_store(backend_type=backend_type, collection=collection)


def _deduplicate_vision_results(results: List[DocumentWithScore]) -> List[DocumentWithScore]:
Expand Down Expand Up @@ -414,25 +461,45 @@ async def _graph_search(
query: str,
top_k: int,
) -> List[DocumentWithScore]:
"""Knowledge-graph retrieval path. Always routes to graphindex v2
through the ``GraphSearchContract`` protocol.

A collection that hasn't been indexed yet returns no context;
this is the correct behaviour — search pipelines compose
(vector + graph + fulltext), and a blank graph just means
"graph contributes nothing this time", not "fall back to
something stale".
"""Knowledge-graph retrieval path via the new
:class:`aperag.indexing.graph.LineageGraphStore` Protocol.

Wave 6 #33 chunk 3 (per architect Option C ruling msg=6fccd9ab):
replaces the legacy ``GraphIndexService.query_context`` flow
with a two-step composition:

1. ``query_entities_by_keyword(query, top_k)`` — anchor entities
via lexical recall on the lineage store. The retrieval
pipeline owns its own embedder and does not need a backend
vector index here (vector recall was honestly deferred per
chunk 2 ruling — Wave 4 lineage schema has no entity-vector
column).
2. ``expand_neighbors_n_hops(entity_names, hops=1)`` — pull the
direct neighbours + connecting relations so the rendered
context block includes both anchor entities and their
one-hop graph context.

A collection that hasn't been indexed yet (or yields no
keyword anchors) returns ``[]``; this is the correct behaviour —
search pipelines compose (vector + graph + fulltext), and a
blank graph just means "graph contributes nothing this time",
not "fall back to something stale".
"""
config = parseCollectionConfig(collection.config)
if not config.enable_knowledge_graph:
logger.warning(f"Collection {collection.id} does not have knowledge graph enabled")
return []

svc: GraphSearchContract = _graph_search_service_for(collection)
ctx = await svc.query_context(collection_id=str(collection.id), query=query, top_k=top_k)
if not ctx.text:
store = _build_lineage_graph_store_for(collection)
anchors = await store.query_entities_by_keyword(query=query, top_k=top_k)
if not anchors:
return []
anchor_names = [e.name for e in anchors]
entities, relations = await store.expand_neighbors_n_hops(entity_names=anchor_names, hops=1)
text = _render_graph_context_text(entities, relations)
if not text:
return []
return [DocumentWithScore(text=ctx.text, metadata={"recall_type": "graph_search"})]
return [DocumentWithScore(text=text, metadata={"recall_type": "graph_search"})]

async def _summary_search(
self,
Expand Down
Loading
Loading