Commit 1091172

feat(Wave 8 W8-1 #12): activate search_relations vector recall (#1767)
Replaces the Wave 7 conservative 1-hop expansion in ``GraphSearchService.search_relations`` with direct vector recall filtered on ``Eq("indexer", "graph_relation")``. Task #3 (PR #1757) has been writing relation vectors all along; this is finally the consumer side, completing the LightRAG-style full recall the spec §K.12.6 expected (huangheng W8-1 sediment from task #3 PR CR msg=d4ad0259).

Algorithm
---------
1. Embed query via ``embedder.embed_query``.
2. ``vector_connector.search`` with ``Eq("indexer", "graph_relation")`` filter + the same threshold/top_k knobs ``search_entities`` uses.
3. Parse each hit's payload (``entity_name="src->tgt"`` + ``entity_type=relation_type`` per task #3 writer ``aperag/indexing/graph.py:1631``) by splitting on ``->``; skip hits that don't parse cleanly.
4. Reverse-lookup full ``RelationWithLineage`` via ``asyncio.gather(*store.get_relation(...))`` (architect ratify approach (a), msg=cf860ae4; preserves ``compose_context`` byte-parity rendering required by the task #5 invariant).
5. Drop ``None`` results (edge GC'd between sync and search).

Failure paths (embedder / vector store down) swallow and return ``[]``, mirroring ``search_entities``.

§K.12 invariant cross-check
---------------------------
- #4 vector store via abstraction: uses ``VectorStoreConnector``, no Qdrant-specific imports.
- #5 indexer filter: ``Eq("indexer", "graph_relation")`` pinned in ``test_search_relations_uses_graph_relation_filter``.
- #11 D-3 read-only: never invokes any mutation method.
- #12 grep-zero LightRAG: 0 hits in the new code.
- All other invariants n/a (read-only path, no schema changes).

Test coverage
-------------
10 new/replaced cases in ``test_graph_search_service.py``:
- empty query / zero-topk short-circuit (no embed, no search)
- ``Eq("indexer","graph_relation")`` filter + threshold + top_k pinning
- payload parse + ``store.get_relation`` reverse lookup happy path
- hit-order preservation + payload-key dedup
- payload skip cases: missing payload, missing arrow, missing entity_type, empty source, empty target
- GC tolerance: edge deleted between sync and search
- embedder failure swallowed
- vector store failure swallowed

Plus 14 pre-existing ``search_entities`` / ``get_subgraph`` / ``compose_context`` tests retained; total 24/24 pass.
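Step 3 of the algorithm in the commit message (payload parsing with skip and de-dup) can be sketched in isolation. This is a minimal illustration, not the committed code: ``relation_keys`` and ``ARROW`` are hypothetical names, and plain dicts stand in for the ``SearchHit.payload`` values.

```python
from typing import Any, Iterable, Mapping, Optional

ARROW = "->"  # assumed to mirror RELATION_PAYLOAD_ARROW from the diff


def relation_keys(
    payloads: Iterable[Optional[Mapping[str, Any]]],
) -> list[tuple[str, str, str]]:
    """Turn hit payloads into ordered, de-duplicated (source, target, type) keys."""
    seen: set[tuple[str, str, str]] = set()
    ordered: list[tuple[str, str, str]] = []
    for payload in payloads:
        payload = payload or {}
        composite = payload.get("entity_name")
        relation_type = payload.get("entity_type")
        if not composite or not relation_type:
            continue  # missing payload, name, or type
        # partition splits on the FIRST arrow only; sep is "" when absent.
        source, sep, target = str(composite).partition(ARROW)
        if not sep or not source or not target:
            continue  # missing arrow or an empty endpoint
        key = (source, target, str(relation_type))
        if key in seen:
            continue  # same edge already queued for lookup
        seen.add(key)
        ordered.append(key)
    return ordered
```

One consequence of splitting on the first arrow: an entity name that itself contains ``->`` would be cut at that first occurrence, so ``"A->B->C"`` yields source ``"A"`` and target ``"B->C"``.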
1 parent f3f2d69 commit 1091172

2 files changed

Lines changed: 259 additions & 40 deletions

File tree

aperag/indexing/graph_search_service.py

Lines changed: 111 additions & 21 deletions
@@ -57,12 +57,21 @@
5757
candidates are more expensive than false-positive recall."""
5858

5959

60-
# Vector point indexer tag — pinned by §K.12 invariant #5. Task #3
60+
# Vector point indexer tags — pinned by §K.12 invariant #5. Task #3
6161
# writes graph-entity vectors with ``payload["indexer"] ==
62-
# "graph_entity"``; this service filters on the same tag so chunk /
63-
# summary vectors sharing the physical collection never bleed into
64-
# graph recall.
62+
# "graph_entity"`` and graph-relation vectors with
63+
# ``payload["indexer"] == "graph_relation"``; this service filters on
64+
# the matching tag so chunk / summary vectors sharing the physical
65+
# collection never bleed into graph recall.
6566
GRAPH_ENTITY_INDEXER: str = "graph_entity"
67+
GRAPH_RELATION_INDEXER: str = "graph_relation"
68+
69+
70+
# ``entity_name`` payload field for ``graph_relation`` points carries the
71+
# composite ``f"{source}->{target}"`` per the task #3 writer
72+
# (``aperag/indexing/graph.py:1631``). The arrow has no surrounding
73+
# whitespace; we split exactly once to recover the endpoints.
74+
RELATION_PAYLOAD_ARROW: str = "->"
6675

6776

6877
class GraphSearchService:
@@ -73,8 +82,11 @@ class GraphSearchService:
7382
7483
* :meth:`search_entities` — embed the query, ANN-search the entity
7584
vector index, fetch the matching ``EntityWithLineage`` rows.
76-
* :meth:`search_relations` — derive relations attached to the
77-
vector-recalled entities (1-hop expansion).
85+
* :meth:`search_relations` — embed the query, ANN-search the
86+
relation vector index, fetch the matching ``RelationWithLineage``
87+
rows. Wave 8 W8-1 (task #12) replaced the Wave 7 conservative
88+
1-hop expansion with direct vector recall now that task #3 is
89+
writing relation vectors.
7890
* :meth:`get_subgraph` — wrap ``expand_neighbors_n_hops`` as a
7991
stable read primitive for MCP tools (task #7).
8092
* :meth:`compose_context` — render the ``EntityWithLineage`` /
@@ -184,31 +196,107 @@ async def _batch_get_entities(self, names: Sequence[str]) -> list[EntityWithLine
184196
return [e for e in results if e is not None]
185197

186198
# ------------------------------------------------------------------
187-
# search_relations — derived from search_entities
199+
# search_relations — vector recall path (Wave 8 W8-1 task #12)
188200
# ------------------------------------------------------------------
189201

190202
async def search_relations(
191203
self,
192204
query: str,
193205
top_k: int | None = None,
194206
) -> list[RelationWithLineage]:
195-
"""Recall relations attached to the entities that vector-search
196-
retrieved for ``query``.
197-
198-
Vector store does not carry separate relation vectors (Wave 7
199-
scope: only entity vectors are written by task #3; per-relation
200-
vectors are out-of-scope per architect msg=acbd0003 §K.12.5).
201-
We therefore derive relations as the 1-hop expansion of the
202-
entity-search result.
207+
"""Vector-recall the top-K relations matching ``query``.
208+
209+
Wave 8 W8-1 (architect ratify task #12): replaces the Wave 7
210+
conservative 1-hop expansion with a direct ANN search filtered
211+
on ``Eq("indexer", "graph_relation")`` — task #3 (PR #1757) has
212+
been writing relation vectors all along; this is finally the
213+
consumer side. Mirrors :meth:`search_entities` shape for
214+
symmetry.
215+
216+
Hits resolve to full :class:`RelationWithLineage` rows via
217+
per-hit ``store.get_relation`` reverse lookup so
218+
:meth:`compose_context` keeps its byte-parity with the legacy
219+
rendering (per task #5 invariant). Hits whose payload doesn't
220+
parse cleanly (missing arrow / empty endpoints / no
221+
``entity_type``) are dropped silently — better to skip a hit
222+
than to surface a relation we can't reconstruct.
203223
"""
204-
entities = await self.search_entities(query, top_k=top_k)
205-
if not entities:
224+
text = (query or "").strip()
225+
if not text:
226+
return []
227+
k = top_k if top_k is not None else self._top_k
228+
if k <= 0:
229+
return []
230+
231+
try:
232+
embedding = await asyncio.to_thread(self._embedder.embed_query, text)
233+
except Exception:
234+
logger.warning(
235+
"graph_search_service: embed failed for relation query=%r",
236+
text,
237+
exc_info=True,
238+
)
206239
return []
207-
_neighbour_entities, relations = await self._store.expand_neighbors_n_hops(
208-
entity_names=[e.name for e in entities],
209-
hops=1,
240+
241+
request = QueryRequest(
242+
embedding=embedding,
243+
top_k=k,
244+
flt=Eq("indexer", GRAPH_RELATION_INDEXER),
245+
score_threshold=self._score_threshold or None,
246+
)
247+
try:
248+
hits = await asyncio.to_thread(self._vector_connector.search, request)
249+
except Exception:
250+
logger.warning(
251+
"graph_search_service: vector search failed for relation query=%r",
252+
text,
253+
exc_info=True,
254+
)
255+
return []
256+
257+
keys = self._relation_keys_from_hits(hits)
258+
if not keys:
259+
return []
260+
return await self._batch_get_relations(keys)
261+
262+
@staticmethod
263+
def _relation_keys_from_hits(
264+
hits: Iterable[SearchHit],
265+
) -> list[tuple[str, str, str]]:
266+
# Preserve hit order (vector store ranked by score) and de-dup
267+
# so the same edge returned twice (e.g. alias-redirect side-effect)
268+
# only fetches once.
269+
seen: set[tuple[str, str, str]] = set()
270+
ordered: list[tuple[str, str, str]] = []
271+
for hit in hits:
272+
payload = hit.payload or {}
273+
composite = payload.get("entity_name") or payload.get("name")
274+
relation_type = payload.get("entity_type") or payload.get("relation_type")
275+
if not composite or not relation_type:
276+
continue
277+
composite_str = str(composite)
278+
if RELATION_PAYLOAD_ARROW not in composite_str:
279+
continue
280+
source, _, target = composite_str.partition(RELATION_PAYLOAD_ARROW)
281+
if not source or not target:
282+
continue
283+
key = (source, target, str(relation_type))
284+
if key in seen:
285+
continue
286+
seen.add(key)
287+
ordered.append(key)
288+
return ordered
289+
290+
async def _batch_get_relations(self, keys: Sequence[tuple[str, str, str]]) -> list[RelationWithLineage]:
291+
# Same ``asyncio.gather`` shape as :meth:`_batch_get_entities`:
292+
# N is bounded by ``top_k`` (≤ 10 in practice) so per-key
293+
# round-trip cost is acceptable, and the Protocol surface stays
294+
# narrow (no batch-get method).
295+
results = await asyncio.gather(
296+
*(self._store.get_relation(s, t, ty) for s, t, ty in keys),
297+
return_exceptions=False,
210298
)
211-
return relations
299+
return [r for r in results if r is not None]
212300

213301
# ------------------------------------------------------------------
214302
# get_subgraph — MCP-facing read primitive (task #7)
@@ -338,5 +426,7 @@ def build_graph_search_service_for(collection: Any) -> GraphSearchService:
338426
"DEFAULT_TOP_K",
339427
"DEFAULT_SCORE_THRESHOLD",
340428
"GRAPH_ENTITY_INDEXER",
429+
"GRAPH_RELATION_INDEXER",
430+
"RELATION_PAYLOAD_ARROW",
341431
"build_graph_search_service_for",
342432
]
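The reverse-lookup step in ``_batch_get_relations`` relies on ``asyncio.gather`` returning results in the same order as the awaitables it was given, so ranking from the vector store survives the fan-out and ``None`` rows can be filtered afterwards. A small sketch of that shape, assuming a hypothetical ``InMemoryStore`` in place of the real graph store and strings in place of ``RelationWithLineage``:

```python
import asyncio
from typing import Optional, Sequence


class InMemoryStore:
    """Hypothetical stand-in for the graph store; not the project's class."""

    def __init__(self, relations: dict[tuple[str, str, str], str]) -> None:
        self._relations = relations

    async def get_relation(self, source: str, target: str, type_: str) -> Optional[str]:
        # Returns None when the edge no longer exists (e.g. garbage-collected).
        return self._relations.get((source, target, type_))


async def batch_get_relations(
    store: InMemoryStore, keys: Sequence[tuple[str, str, str]]
) -> list[str]:
    # gather keeps result order aligned with the input order.
    results = await asyncio.gather(
        *(store.get_relation(s, t, ty) for s, t, ty in keys)
    )
    return [r for r in results if r is not None]


def demo() -> list[str]:
    store = InMemoryStore(
        {
            ("Alpha", "Beta", "founded"): "rel-ab",
            ("Beta", "Gamma", "acquired"): "rel-bc",
        }
    )
    keys = [
        ("Beta", "Gamma", "acquired"),
        ("Gone", "Edge", "deleted"),  # vanished between sync and search
        ("Alpha", "Beta", "founded"),
    ]
    return asyncio.run(batch_get_relations(store, keys))
```

Calling ``demo()`` returns the two surviving rows in hit order, with the vanished edge dropped rather than raising.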

tests/unit_test/indexing/test_graph_search_service.py

Lines changed: 148 additions & 19 deletions
@@ -13,18 +13,23 @@
1313
# limitations under the License.
1414

1515
"""Unit tests for ``aperag.indexing.graph_search_service`` — Wave 7
16-
task #5.
16+
task #5 + Wave 8 W8-1 task #12.
1717
18-
Pins the Wave 7 vector-recall contract:
18+
Pins the vector-recall contract:
1919
2020
* ``search_entities`` embeds the query, ANN-searches the
2121
``graph_entity`` indexer slice (filter + threshold pinned), then
2222
fetches matching ``EntityWithLineage`` rows via per-name
2323
``get_entity`` (asyncio.gather). De-dups payload names so an aliased
2424
entity returned twice doesn't double-fetch.
25-
* ``search_relations`` derives relations as the 1-hop expansion of the
26-
vector-recalled entities — vector store carries no per-relation
27-
vectors in Wave 7.
25+
* ``search_relations`` (Wave 8 W8-1 upgrade) embeds the query,
26+
ANN-searches the ``graph_relation`` indexer slice, parses each hit's
27+
``entity_name="src->tgt"`` + ``entity_type=relation_type`` payload
28+
per the task #3 writer, then reverse-looks-up
29+
``RelationWithLineage`` via ``store.get_relation``. Hits whose
30+
payload doesn't parse cleanly are skipped silently. Replaces the
31+
Wave 7 conservative 1-hop expansion path now that task #3 is
32+
writing relation vectors.
2833
* ``get_subgraph`` is a thin pass-through to
2934
``expand_neighbors_n_hops`` for MCP / retrieval callers.
3035
* ``compose_context`` renders byte-for-byte the same LightRAG-style
@@ -124,15 +129,22 @@ def __init__(
124129
self,
125130
entities: dict[str, EntityWithLineage] | None = None,
126131
expansions: dict[tuple[str, ...], tuple[list[EntityWithLineage], list[RelationWithLineage]]] | None = None,
132+
relations: dict[tuple[str, str, str], RelationWithLineage] | None = None,
127133
) -> None:
128134
self._entities = entities or {}
129135
self._expansions = expansions or {}
136+
self._relations = relations or {}
130137
self.get_entity_calls: list[str] = []
138+
self.get_relation_calls: list[tuple[str, str, str]] = []
131139

132140
async def get_entity(self, entity_name: str) -> EntityWithLineage | None:
133141
self.get_entity_calls.append(entity_name)
134142
return self._entities.get(entity_name)
135143

144+
async def get_relation(self, source: str, target: str, type: str) -> RelationWithLineage | None:
145+
self.get_relation_calls.append((source, target, type))
146+
return self._relations.get((source, target, type))
147+
136148
async def expand_neighbors_n_hops(
137149
self,
138150
*,
@@ -185,13 +197,14 @@ def _make_service(
185197
*,
186198
entities: dict[str, EntityWithLineage] | None = None,
187199
expansions: dict[tuple[str, ...], tuple[list[EntityWithLineage], list[RelationWithLineage]]] | None = None,
200+
relations: dict[tuple[str, str, str], RelationWithLineage] | None = None,
188201
hits: list[SearchHit] | None = None,
189202
embedder: Any | None = None,
190203
connector: Any | None = None,
191204
top_k: int = 10,
192205
score_threshold: float = 0.0,
193206
) -> tuple[GraphSearchService, _FakeStore, _FakeVectorConnector | Any, _FakeEmbedder | Any]:
194-
store = _FakeStore(entities=entities, expansions=expansions)
207+
store = _FakeStore(entities=entities, expansions=expansions, relations=relations)
195208
connector = connector if connector is not None else _FakeVectorConnector(hits=hits)
196209
embedder = embedder if embedder is not None else _FakeEmbedder()
197210
service = GraphSearchService(
@@ -322,31 +335,147 @@ async def test_search_entities_swallows_vector_store_failure():
322335

323336

324337
# ---------------------------------------------------------------------
325-
# search_relations
338+
# search_relations (Wave 8 W8-1 vector recall path)
326339
# ---------------------------------------------------------------------
327340

328341

329342
@pytest.mark.asyncio
330-
async def test_search_relations_empty_when_no_entities_match():
331-
service, _, _, _ = _make_service(entities={}, hits=[])
332-
assert await service.search_relations("query") == []
343+
async def test_search_relations_empty_query_returns_empty():
344+
service, _, connector, embedder = _make_service(entities={})
345+
assert await service.search_relations("") == []
346+
assert await service.search_relations(" ") == []
347+
assert connector.searches == []
348+
assert embedder.calls == []
333349

334350

335351
@pytest.mark.asyncio
336-
async def test_search_relations_returns_one_hop_expansion_of_entity_results():
337-
a = _entity("Alpha")
338-
b = _entity("Beta")
339-
rel = _relation("Alpha", "Beta")
340-
service, _, _, _ = _make_service(
341-
entities={"Alpha": a, "Beta": b},
352+
async def test_search_relations_zero_topk_returns_empty():
353+
service, _, connector, embedder = _make_service(entities={})
354+
assert await service.search_relations("query", top_k=0) == []
355+
assert connector.searches == []
356+
assert embedder.calls == []
357+
358+
359+
@pytest.mark.asyncio
360+
async def test_search_relations_uses_graph_relation_filter():
361+
"""Wave 8 W8-1: filter pinned to ``Eq("indexer", "graph_relation")``
362+
so the ANN never bleeds into entity / chunk vectors sharing the
363+
physical collection."""
364+
service, _, connector, _ = _make_service(entities={}, hits=[], score_threshold=0.42, top_k=5)
365+
await service.search_relations("query")
366+
assert len(connector.searches) == 1
367+
request = connector.searches[0].request
368+
from aperag.indexing.graph_search_service import GRAPH_RELATION_INDEXER
369+
370+
assert request.flt == Eq("indexer", GRAPH_RELATION_INDEXER)
371+
assert request.score_threshold == 0.42
372+
assert request.top_k == 5
373+
374+
375+
@pytest.mark.asyncio
376+
async def test_search_relations_parses_payload_and_resolves_via_get_relation():
377+
"""Hit payload carries ``entity_name="src->tgt"`` +
378+
``entity_type=relation_type`` (per task #3 writer
379+
``aperag/indexing/graph.py:1631``); we split, reverse-lookup via
380+
``store.get_relation`` and return the full ``RelationWithLineage``
381+
so :meth:`compose_context` keeps its byte-parity rendering."""
382+
rel = _relation("Alpha", "Beta", relation_type="founded")
383+
service, store, _, _ = _make_service(
384+
relations={("Alpha", "Beta", "founded"): rel},
342385
hits=[
343-
SearchHit(id="1", score=0.9, payload={"entity_name": "Alpha"}),
344-
SearchHit(id="2", score=0.8, payload={"entity_name": "Beta"}),
386+
SearchHit(
387+
id="1",
388+
score=0.9,
389+
payload={"entity_name": "Alpha->Beta", "entity_type": "founded"},
390+
),
345391
],
346-
expansions={("Alpha", "Beta"): ([a, b], [rel])},
347392
)
348393
relations = await service.search_relations("query")
349394
assert relations == [rel]
395+
assert store.get_relation_calls == [("Alpha", "Beta", "founded")]
396+
397+
398+
@pytest.mark.asyncio
399+
async def test_search_relations_preserves_hit_order_and_dedupes():
400+
rel_ab = _relation("Alpha", "Beta", relation_type="founded")
401+
rel_bc = _relation("Beta", "Gamma", relation_type="acquired")
402+
service, store, _, _ = _make_service(
403+
relations={
404+
("Alpha", "Beta", "founded"): rel_ab,
405+
("Beta", "Gamma", "acquired"): rel_bc,
406+
},
407+
hits=[
408+
SearchHit(id="1", score=0.9, payload={"entity_name": "Beta->Gamma", "entity_type": "acquired"}),
409+
SearchHit(id="2", score=0.8, payload={"entity_name": "Alpha->Beta", "entity_type": "founded"}),
410+
# Duplicate of the first hit (e.g. alias-redirect side-effect)
411+
# — must dedupe so we don't double-fetch the same edge.
412+
SearchHit(id="3", score=0.7, payload={"entity_name": "Beta->Gamma", "entity_type": "acquired"}),
413+
],
414+
)
415+
relations = await service.search_relations("query")
416+
assert [(r.source, r.target) for r in relations] == [
417+
("Beta", "Gamma"),
418+
("Alpha", "Beta"),
419+
]
420+
assert store.get_relation_calls == [("Beta", "Gamma", "acquired"), ("Alpha", "Beta", "founded")]
421+
422+
423+
@pytest.mark.asyncio
424+
async def test_search_relations_skips_payload_missing_arrow_or_type():
425+
"""A hit whose payload doesn't parse cleanly is dropped silently —
426+
better to skip than to surface an edge we can't reconstruct."""
427+
rel_ab = _relation("Alpha", "Beta", relation_type="founded")
428+
service, store, _, _ = _make_service(
429+
relations={("Alpha", "Beta", "founded"): rel_ab},
430+
hits=[
431+
SearchHit(id="ghost1", score=1.0, payload={}), # no payload at all
432+
SearchHit(
433+
id="ghost2", score=0.99, payload={"entity_name": "AlphaBeta", "entity_type": "founded"}
434+
), # missing arrow
435+
SearchHit(id="ghost3", score=0.98, payload={"entity_name": "Alpha->Beta"}), # missing entity_type
436+
SearchHit(
437+
id="ghost4", score=0.97, payload={"entity_name": "->Beta", "entity_type": "founded"}
438+
), # empty source
439+
SearchHit(
440+
id="ghost5", score=0.96, payload={"entity_name": "Alpha->", "entity_type": "founded"}
441+
), # empty target
442+
SearchHit(id="real", score=0.9, payload={"entity_name": "Alpha->Beta", "entity_type": "founded"}),
443+
],
444+
)
445+
relations = await service.search_relations("query")
446+
assert [(r.source, r.target) for r in relations] == [("Alpha", "Beta")]
447+
# Only the real hit hit the store.
448+
assert store.get_relation_calls == [("Alpha", "Beta", "founded")]
449+
450+
451+
@pytest.mark.asyncio
452+
async def test_search_relations_drops_edge_gced_from_store():
453+
"""Vector hit for an edge that was deleted between sync and search
454+
→ store.get_relation returns None → drop from result, no exception."""
455+
service, _, _, _ = _make_service(
456+
relations={}, # store empty: every get_relation returns None
457+
hits=[
458+
SearchHit(id="1", score=0.9, payload={"entity_name": "Alpha->Beta", "entity_type": "founded"}),
459+
],
460+
)
461+
assert await service.search_relations("query") == []
462+
463+
464+
@pytest.mark.asyncio
465+
async def test_search_relations_swallows_embedder_failure():
466+
service, store, connector, _ = _make_service(entities={}, embedder=_FailingEmbedder())
467+
assert await service.search_relations("query") == []
468+
assert connector.searches == []
469+
assert store.get_relation_calls == []
470+
471+
472+
@pytest.mark.asyncio
473+
async def test_search_relations_swallows_vector_store_failure():
474+
service, store, connector, embedder = _make_service(entities={}, connector=_FailingVectorConnector())
475+
assert await service.search_relations("query") == []
476+
assert embedder.calls == ["query"]
477+
assert len(connector.searches) == 1
478+
assert store.get_relation_calls == []
350479

351480

352481
# ---------------------------------------------------------------------
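The two failure-path tests above pin a "log and return ``[]``" contract around the blocking embedder and vector-store calls. The following sketch shows that contract under stated assumptions: ``recall``, ``FailingEmbedder`` and ``RecordingConnector`` are illustrative names, and the connector receives the raw embedding instead of a ``QueryRequest``.

```python
import asyncio
import logging

logger = logging.getLogger("graph_search_sketch")


class FailingEmbedder:
    """Hypothetical embedder that is always down."""

    def embed_query(self, text: str) -> list[float]:
        raise RuntimeError("embedder down")


class RecordingConnector:
    """Hypothetical connector that records every search it receives."""

    def __init__(self) -> None:
        self.searches: list[list[float]] = []

    def search(self, embedding: list[float]) -> list[dict]:
        self.searches.append(embedding)
        return []


async def recall(embedder, connector, query: str) -> list[dict]:
    text = (query or "").strip()
    if not text:
        return []  # short-circuit: no embed, no search
    try:
        # Blocking call moved off the event loop (Python 3.9+).
        embedding = await asyncio.to_thread(embedder.embed_query, text)
    except Exception:
        logger.warning("embed failed for query=%r", text, exc_info=True)
        return []
    try:
        return await asyncio.to_thread(connector.search, embedding)
    except Exception:
        logger.warning("vector search failed for query=%r", text, exc_info=True)
        return []
```

With ``FailingEmbedder`` the coroutine returns an empty list and the connector is never reached, which is the same observable behaviour ``test_search_relations_swallows_embedder_failure`` asserts on the real service.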
