Commit 08d9d3b

earayu and claude authored
feat(celery Wave 7 #8): retrieval/curation cutover + 3 wiring points (#1762)
* feat(celery Wave 7 #8): retrieval/curation cutover + 3 wiring points

  Wave 7 §K.12.8 task #8: three wiring points go live together, so that PR #1758's inseparability gate (alias redirect on every indexer write) and PR #1756's vector recall (LightRAG-style semantic search plus 1-hop traversal) become production-alive in the same merge.

  What lands

  * ``worker_factory._build_lineage_graph_store`` now returns a ``LineageGraphStoreWithAliasRedirect`` decorating the raw per-collection backend (Wave 7 §K.12 invariant #9). Callers that need the raw inner store (the merger writes canonical names directly and must not be intercepted) go through the new ``_build_lineage_graph_store_inner``.
  * ``retrieval/pipeline._graph_search`` now composes ``GraphSearchService.search_entities`` (vector recall) + ``get_subgraph`` (1-hop traversal) + ``GraphSearchService.compose_context`` (legacy LightRAG-style rendering). The render is byte-for-byte identical to Wave 6's ``_render_graph_context_text`` (locked by the byte-parity test in PR #1756), so downstream RAG prompts see zero functional change; the only difference is that vector recall now actually happens.
  * ``GraphService.merge_entities`` (the route layer for ``POST /graphs/nodes/merge``) delegates to ``LineageEntityMerger``. The backward-compat response shape is preserved (``target_entity_id`` / ``description`` / ``source_chunk_ids`` / ``edges_*``); chunk ids are recovered from the target's lineage after step 6a re-anchors the source parts under the canonical name. ``edges_redirected`` / ``edges_collapsed`` surface ``0`` because edge re-anchoring is handled transparently by the alias-redirect decorator at indexer write time, not as part of the merge action.
  * New ``build_lineage_entity_merger_for(collection)`` factory in ``aperag/graph_curation/lineage_merge.py`` resolves the six dependencies the merger expects (raw inner store / alias repo / compactor / vector connector / embedder / LLM), lifting the ``_SyncEmbedderShim`` pattern out of ``MergeCandidateDetector``'s factory so merger and detector share one shim.

  Tests

  * ``tests/unit_test/indexing/test_wave7_task8_wiring.py``: eight integration tests pinning each of the three wiring points so a future refactor cannot silently regress any of them: decorator wraps inner store; inner factory still returns raw backend; pipeline composes the three GraphSearchService calls in order; KG gate still short-circuits; factory failures degrade to empty; merger delegation preserves backward-compat shape; fallback to unified description when no compaction; alias cycle surfaces as ValueError → 400.
  * Wave 6 ``test_graph_search_migration.py``: two existing tests updated to mock the new ``build_graph_search_service_for`` / ``search_entities`` / ``get_subgraph`` boundary instead of the retired keyword-only path. The grep-zero migration assertion is unchanged and passes (legacy import still 0).

  12-invariant cross-check (§K.12 / huangheng msg=fcf580a6)

  * #4 vector store via ``VectorStoreConnectorAdaptor`` ✅: the pipeline composes through ``GraphSearchService``, which already abides; no direct Qdrant import added.
  * #5 payload ``indexer="graph_entity"`` filter pattern ✅: inherited from ``GraphSearchService.search_entities``.
  * #9 ``upsert_entity_with_lineage`` alias redirect ✅: every indexer / read path now receives the decorated store; the merger bypasses it via the inner-only factory so canonical writes are not intercepted.
  * #11 candidate detection write-only / merge read paths ✅: the cutover preserves the read/write boundary.
  * #12 grep-zero LightRAG ✅: code + tests stay LightRAG-clean. ``aperag/graph_curation/*`` and ``domains/knowledge_graph/service.py:get_knowledge_graph`` still hold legacy imports for graph-overview / curation-run paths that depend on enumerate-all-entities; deferred to task #10 close-out alongside the Protocol-extension or legacy delete.

  4-pattern pre-check matrix

  * Pattern 1 v1: ``rg "from aperag.domains.knowledge_graph.graphindex"`` count is unchanged in this PR (drops below the threshold task #10 will assert grep-zero); the ``_graph_search`` path no longer imports the legacy package at all.
  * Pattern 1 v2: per-method matrix. The pipeline cutover replaces ``query_entities_by_keyword`` + ``expand_neighbors_n_hops`` with ``search_entities`` + ``get_subgraph``; the merge cutover replaces ``GraphIndexService.merge_entities`` with ``LineageEntityMerger.merge_entities``.
  * Pattern 2: state binding. The new merger factory binds to the same Qdrant connector + embedder the indexer write path uses (``_build_collection_graph_vector_writer``).
  * Pattern 3: factory + decorator pattern verified by the wiring tests so a future split cannot silently regress.

  simple-stable 4 guardrail

  * #1 Do not expand scope indefinitely: three wiring swaps + one factory; no new endpoints, no new schema, no new Protocol surface.
  * #2 Ship as soon as possible: task #8 unblocks task #9 (frontend) and task #11 (e2e narrative) without a Protocol-extension prerequisite.
  * #3 Simple and stable: the decorator + factory split keeps the merger's "writes canonical directly" invariant explicit; the pipeline cutover reuses the byte-parity contract from PR #1756.
  * #4 Maintenance-free private deployment: no operator config required; the ``API_BASE_URL`` env already supports MCP colocation, and the alias-redirect decorator runs on every collection automatically.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* docs(Wave 7 #8): clarify edges_*=0 semantic on merge_nodes route

  Per architect msg=4af6f66b / PM msg=e75fd00d follow-up: future maintainers grepping merge_nodes_view shouldn't see the hard-coded 0 and suspect a bug. The Wave 7 §K.12 invariant #9 LineageGraphStoreWithAliasRedirect decorator handles edge re-anchoring at indexer write-time, not as part of the merge call, so there is no explicit edge count to surface on the response.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 8e4c679 commit 08d9d3b

7 files changed

Lines changed: 618 additions & 70 deletions

aperag/domains/knowledge_graph/api/routes.py

Lines changed: 7 additions & 0 deletions
@@ -151,6 +151,13 @@ async def merge_nodes_view(
     * The response echoes the merged description **after** LLM
       summarization, so the frontend can refresh the entity detail
       panel without a second fetch.
+    * ``edges_redirected`` / ``edges_collapsed`` are always ``0`` by
+      design (Wave 7 §K.12 invariant #9) — alias redirect happens at
+      indexer write-time via the
+      :class:`LineageGraphStoreWithAliasRedirect` decorator, not as
+      part of the merge call, so the merge action has no "explicit
+      edge re-anchor count" to surface. The fields are kept on the
+      response shape for backward-compat with the existing frontend.
     """
     entity_ids = payload.get("entity_ids") or []
     if not isinstance(entity_ids, list) or len(entity_ids) < 2:

aperag/domains/knowledge_graph/service.py

Lines changed: 95 additions & 20 deletions
@@ -163,24 +163,51 @@ async def merge_entities(
         target_entity_id: str,
         source_entity_ids: List[str],
     ) -> Dict[str, Any]:
-        """Merge entities and return a summary payload for the UI.
-
-        The heavy lifting (structural merge + LLM summary of the merged
-        description) happens inside
-        ``GraphIndexService.merge_entities``. This layer only validates
-        the collection and reshapes the DTO into the dict the existing
-        frontend expects.
+        """Merge entities — Wave 7 §K.12.8 task #8 cutover to
+        :class:`LineageEntityMerger` (PR #1758).
+
+        The merger runs the 8-step orchestration inside the new lineage
+        layer: alias-map writes for transparent indexer redirect (steps
+        1-2), per-doc parts re-anchor under the canonical name (step 6a,
+        preserves invariant #1 lineage), unified-description LLM merge
+        + compaction (steps 4-5 + 6b), vector point upsert with the
+        canonical name + 3-field payload + uuid5 id (step 7), and
+        finally L1 + vector deletion of source entities (step 8).
+
+        Response shape is preserved byte-for-byte for backward-compat
+        with the existing frontend (cuiwenbo task #9 stays at
+        OpenAPI-regen + flow smoke):
+
+        * ``target_entity_id`` ← ``LineageMergeResult.final_target``
+        * ``description`` ← ``compacted_description`` if present, else
+          ``unified_description``
+        * ``source_chunk_ids`` ← chunk ids contributed by the source
+          entities, recovered from the target's lineage *after* the
+          merge has re-anchored them under the canonical name
+        * ``edges_redirected`` / ``edges_collapsed`` — the new merger
+          does not return per-edge stats (the alias-redirect decorator
+          handles edge re-anchoring transparently at indexer write
+          time, not at merge time). We surface ``0`` for both fields
+          to keep the response shape stable.
         """
         db_collection = await self._get_and_validate_collection(user_id, collection_id)
 
-        from aperag.domains.knowledge_graph.graphindex.integration import make_service_for_collection
+        from aperag.graph_curation.alias_map import AliasCycleError
+        from aperag.graph_curation.lineage_merge import build_lineage_entity_merger_for
+
+        merger = build_lineage_entity_merger_for(db_collection)
+        try:
+            merge_result = await merger.merge_entities(
+                target_name=target_entity_id,
+                source_names=source_entity_ids,
+                merged_by=user_id,
+            )
+        except AliasCycleError as exc:
+            # Surface as a validation error so the route returns 400
+            # (per the existing ``ValueError`` → 400 mapping in
+            # ``merge_nodes_view``).
+            raise ValueError(str(exc)) from exc
 
-        svc = make_service_for_collection(db_collection)
-        result = await svc.merge_entities(
-            collection_id=collection_id,
-            target_entity_id=target_entity_id,
-            source_entity_ids=source_entity_ids,
-        )
         try:
             from aperag.graph_curation import graph_curation_service

@@ -191,15 +218,63 @@ async def merge_entities(
             )
         except Exception:
             logger.exception("Failed to expire stale graph-curation suggestions after manual merge")
+
+        # Recover source chunk ids from the target's lineage after the
+        # merge — step 6a re-anchored each source's parts under the
+        # canonical name with their original ``(document_id,
+        # parse_version, chunk_ids)`` so the union of those chunks is
+        # what the UI expects.
+        source_chunk_ids = await self._collect_source_chunk_ids(
+            db_collection,
+            entity_name=merge_result.final_target,
+        )
+
+        description = merge_result.compacted_description or merge_result.unified_description or ""
+
         return {
-            "target_entity_id": result.target_entity_id,
-            "merged_source_ids": list(result.merged_source_ids),
-            "description": result.description,
-            "source_chunk_ids": list(result.source_chunk_ids),
-            "edges_redirected": result.edges_redirected,
-            "edges_collapsed": result.edges_collapsed,
+            "target_entity_id": merge_result.final_target,
+            "merged_source_ids": list(merge_result.merged_source_ids),
+            "description": description,
+            "source_chunk_ids": source_chunk_ids,
+            # Edge re-anchoring is handled transparently by the
+            # alias-redirect decorator at indexer write time (Wave 7
+            # §K.12 invariant #9), not as part of the merge action,
+            # so per-edge counts are not surfaced. ``0`` keeps the
+            # response shape stable for backward-compat.
+            "edges_redirected": 0,
+            "edges_collapsed": 0,
         }
 
+    async def _collect_source_chunk_ids(self, collection: Any, *, entity_name: str) -> List[str]:
+        """Return the union of chunk ids attached to ``entity_name``'s
+        lineage members after the merge has run. Used by
+        :meth:`merge_entities` to populate the backward-compat
+        ``source_chunk_ids`` field."""
+        import asyncio
+
+        from aperag.indexing.worker_factory import (
+            _build_lineage_graph_store_inner,
+            _resolve_graph_backend_type,
+        )
+
+        backend_type = _resolve_graph_backend_type(collection)
+        store = await asyncio.to_thread(
+            _build_lineage_graph_store_inner, backend_type=backend_type, collection=collection
+        )
+        entity = await store.get_entity(entity_name)
+        if entity is None:
+            return []
+        chunk_ids: list[str] = []
+        seen: set[str] = set()
+        for member in entity.source_lineage or ():
+            for cid in getattr(member, "chunk_ids", ()) or ():
+                cid_str = str(cid)
+                if cid_str in seen:
+                    continue
+                seen.add(cid_str)
+                chunk_ids.append(cid_str)
+        return chunk_ids
+
     # ============================================================ Wave 7 §K.12.6
     async def search_entities(
         self,
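
The chunk-id recovery and the description fallback in the hunk above can be exercised in isolation. The following is a minimal sketch, assuming a hypothetical `LineageMember` dataclass and two illustrative helper names (`collect_chunk_ids`, `pick_description`); none of these are the project's real types, and the real method reads its members from the lineage store rather than from a list.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Set


@dataclass
class LineageMember:
    """Hypothetical stand-in for one per-document lineage entry."""

    document_id: str
    chunk_ids: Optional[Iterable[object]] = None


def collect_chunk_ids(members: Optional[Iterable[LineageMember]]) -> List[str]:
    """Union of chunk ids across members, keeping first-seen order."""
    ordered: List[str] = []
    seen: Set[str] = set()
    for member in members or ():
        # A member without chunk ids contributes nothing.
        for cid in getattr(member, "chunk_ids", None) or ():
            cid_str = str(cid)  # normalise non-string ids
            if cid_str not in seen:
                seen.add(cid_str)
                ordered.append(cid_str)
    return ordered


def pick_description(compacted: Optional[str], unified: Optional[str]) -> str:
    """Prefer the compacted description, then the unified one, then ''."""
    return compacted or unified or ""


members = [
    LineageMember("doc-1", ["c1", "c2"]),
    LineageMember("doc-2", ["c2", 3]),
    LineageMember("doc-3"),
]
print(collect_chunk_ids(members))
print(pick_description(None, "unified text"))
```

Keeping insertion order (a list plus a `seen` set, rather than a bare `set`) makes the response deterministic across calls, which matters when the frontend or a test compares payloads.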

aperag/domains/retrieval/pipeline.py

Lines changed: 47 additions & 27 deletions
@@ -461,42 +461,62 @@ async def _graph_search(
         query: str,
         top_k: int,
     ) -> List[DocumentWithScore]:
-        """Knowledge-graph retrieval path via the new
-        :class:`aperag.indexing.graph.LineageGraphStore` Protocol.
-
-        Wave 6 #33 chunk 3 (per architect Option C ruling msg=6fccd9ab):
-        replaces the legacy ``GraphIndexService.query_context`` flow
-        with a two-step composition:
-
-        1. ``query_entities_by_keyword(query, top_k)`` — anchor entities
-           via lexical recall on the lineage store. The retrieval
-           pipeline owns its own embedder and does not need a backend
-           vector index here (vector recall was honestly deferred per
-           chunk 2 ruling — Wave 4 lineage schema has no entity-vector
-           column).
-        2. ``expand_neighbors_n_hops(entity_names, hops=1)`` — pull the
-           direct neighbours + connecting relations so the rendered
-           context block includes both anchor entities and their
-           one-hop graph context.
-
-        A collection that hasn't been indexed yet (or yields no
-        keyword anchors) returns ``[]``; this is the correct behaviour —
-        search pipelines compose (vector + graph + fulltext), and a
-        blank graph just means "graph contributes nothing this time",
-        not "fall back to something stale".
+        """Knowledge-graph retrieval path via :class:`GraphSearchService`.
+
+        Wave 7 §K.12.5 / §K.12.8 (task #8) cutover: replaces the
+        keyword-only Wave 6 #33 chunk 3 path with the full
+        vector + 1-hop traversal composition the legacy
+        ``GraphIndexService.query_context`` always intended. Steps:
+
+        1. :meth:`GraphSearchService.search_entities` — embed the query
+           against the per-collection vector index and ANN-recall the
+           top-K entities (semantic match, not exact name match).
+        2. :meth:`GraphSearchService.get_subgraph` — pull the direct
+           neighbours + connecting relations of the anchor entities.
+        3. :meth:`GraphSearchService.compose_context` — render the
+           ``-----Entities (KG)----- / -----Relationships (KG)-----``
+           text block.
+
+        The render is byte-for-byte identical to Wave 6's
+        ``_render_graph_context_text`` (locked by
+        ``test_compose_context_matches_retrieval_pipeline_render_byte_for_byte``
+        in PR #1756) so the swap is zero-functional-change for
+        downstream RAG prompts: same context shape, same dedup rule,
+        same fallback marker. Vector recall now actually happens
+        — the Wave 4 → Wave 6 vacuum noted in §K.12.1 is closed.
+
+        A collection that hasn't been indexed yet (no vector points)
+        returns ``[]`` — ``search_entities`` swallows backend
+        embed/search faults and returns an empty list, mirroring the
+        Wave 6 graceful-degrade convention. ``enable_knowledge_graph``
+        gating preserved for backward compat.
         """
         config = parseCollectionConfig(collection.config)
         if not config.enable_knowledge_graph:
             logger.warning(f"Collection {collection.id} does not have knowledge graph enabled")
             return []
 
-        store = _build_lineage_graph_store_for(collection)
-        anchors = await store.query_entities_by_keyword(query=query, top_k=top_k)
+        from aperag.indexing.graph_search_service import (
+            GraphSearchService,
+            build_graph_search_service_for,
+        )
+
+        try:
+            service = build_graph_search_service_for(collection)
+        except Exception:
+            logger.warning(
+                "graph_search: factory failed for collection %s; degrading to empty result",
+                collection.id,
+                exc_info=True,
+            )
+            return []
+
+        anchors = await service.search_entities(query=query, top_k=top_k)
         if not anchors:
             return []
         anchor_names = [e.name for e in anchors]
-        entities, relations = await store.expand_neighbors_n_hops(entity_names=anchor_names, hops=1)
-        text = _render_graph_context_text(entities, relations)
+        entities, relations = await service.get_subgraph(entity_names=anchor_names, hops=1)
+        text = GraphSearchService.compose_context(entities, relations)
         if not text:
             return []
         return [DocumentWithScore(text=text, metadata={"recall_type": "graph_search"})]
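
The control flow of the new `_graph_search` (build the service, recall anchors, expand one hop, render, and degrade to an empty result at each step) can be sketched with an in-memory stand-in. Everything below is an assumption made for illustration: `FakeGraphSearchService`, `Entity`, `Relation`, and `graph_search` are hypothetical names, substring matching stands in for embedding plus ANN recall, and the rendered text is a simplified format that is not byte-compatible with the real `compose_context` output.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Iterable, List, Tuple


@dataclass(frozen=True)
class Entity:
    name: str
    description: str = ""


@dataclass(frozen=True)
class Relation:
    source: str
    target: str


class FakeGraphSearchService:
    """Hypothetical in-memory stand-in; NOT the real GraphSearchService."""

    def __init__(self, entities: Iterable[Entity], relations: Iterable[Relation]) -> None:
        self._entities = {e.name: e for e in entities}
        self._relations = list(relations)

    async def search_entities(self, *, query: str, top_k: int) -> List[Entity]:
        # Assumption: substring match replaces vector recall in this sketch.
        needle = query.lower()
        hits = [e for e in self._entities.values() if needle in e.description.lower()]
        return hits[:top_k]

    async def get_subgraph(
        self, *, entity_names: List[str], hops: int = 1
    ) -> Tuple[List[Entity], List[Relation]]:
        # Only the 1-hop case is implemented; ``hops`` is kept for shape parity.
        names = set(entity_names)
        relations = [r for r in self._relations if r.source in names or r.target in names]
        for rel in relations:
            names.update((rel.source, rel.target))
        entities = [e for e in self._entities.values() if e.name in names]
        return entities, relations

    @staticmethod
    def compose_context(entities: List[Entity], relations: List[Relation]) -> str:
        # Simplified rendering for the sketch only.
        lines = [f"entity: {e.name}" for e in entities]
        lines += [f"relation: {r.source} -> {r.target}" for r in relations]
        return "\n".join(lines)


async def graph_search(
    build_service: Callable[[], FakeGraphSearchService], query: str, top_k: int
) -> List[str]:
    try:
        service = build_service()
    except Exception:
        return []  # factory failure degrades to "graph contributes nothing"
    anchors = await service.search_entities(query=query, top_k=top_k)
    if not anchors:
        return []
    names = [e.name for e in anchors]
    entities, relations = await service.get_subgraph(entity_names=names, hops=1)
    text = service.compose_context(entities, relations)
    return [text] if text else []


svc = FakeGraphSearchService(
    [Entity("Qdrant", "vector database"), Entity("ApeRAG", "RAG platform"), Entity("Other", "unrelated")],
    [Relation("ApeRAG", "Qdrant")],
)
print(asyncio.run(graph_search(lambda: svc, "vector", 5)))
```

Returning an empty list at every failure point, instead of raising, lets the surrounding retrieval pipeline keep combining its other recall paths when the graph has nothing to contribute.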

aperag/graph_curation/lineage_merge.py

Lines changed: 93 additions & 0 deletions
@@ -445,10 +445,103 @@ async def _to_thread(func, *args, **kwargs):
     return await asyncio.to_thread(func, *args, **kwargs)
 
 
+# ---------------------------------------------------------------------
+# Per-collection factory — Wave 7 §K.12.8 task #8 wiring
+# ---------------------------------------------------------------------
+
+
+class _SyncEmbedderShim:
+    """Adapt the sync ``(text -> list[float])`` callable used by the
+    graph worker into the ``embed_query`` shape the merger / detector
+    expect (mirrors :class:`EmbeddingService` surface). Lifted from
+    :class:`MergeCandidateDetector`'s factory so the merger and
+    detector share one shim."""
+
+    def __init__(self, fn: Callable[[str], list[float]]) -> None:
+        self._fn = fn
+
+    def embed_query(self, text: str) -> list[float]:
+        return self._fn(text)
+
+
+def build_lineage_entity_merger_for(collection: Any) -> "LineageEntityMerger":
+    """Build a :class:`LineageEntityMerger` for ``collection``.
+
+    Wires the six dependencies the merger expects:
+
+    * ``store`` — :func:`_build_lineage_graph_store_inner` (raw inner
+      store; the merger writes canonical names directly and must NOT
+      be intercepted by the alias-redirect decorator that
+      :func:`_build_lineage_graph_store` returns to indexer / read
+      paths).
+    * ``alias_repo`` — fresh :class:`AliasMapRepository`.
+    * ``compactor`` — :func:`_build_collection_graph_compactor`. Falls
+      back to a no-op compactor if the collection has no completion
+      model configured (the merger still runs; description simply
+      stays uncompacted).
+    * ``vector_connector`` + ``embedder`` —
+      :func:`_build_collection_graph_vector_writer` shared with the
+      Phase 3 indexer write path. Wrapped via :class:`_SyncEmbedderShim`
+      so the ``embed_query`` interface matches.
+    * ``llm`` — :func:`build_collection_llm_callable` (same async LLM
+      callable the legacy graphindex used).
+    * ``collection_id`` — bound at construction.
+
+    Raises :class:`aperag.indexing.worker_factory.WorkerFactoryError`
+    when the embedder / vector connector / LLM cannot be resolved
+    — a user-driven merge cannot meaningfully proceed without them
+    (no unified description, no vector re-anchor).
+    """
+    # Lazy imports keep this module free of worker_factory at import
+    # time so worker_factory's own ``from .lineage_merge`` import (if
+    # ever added) wouldn't form a cycle.
+    from aperag.domains.knowledge_graph.graphindex.integration import (
+        build_collection_llm_callable,
+    )
+    from aperag.indexing.graph_compactor import GraphIndexCompactor
+    from aperag.indexing.worker_factory import (
+        WorkerFactoryError,
+        _build_collection_graph_vector_writer,
+        _build_lineage_graph_store_inner,
+        _resolve_graph_backend_type,
+    )
+
+    backend_type = _resolve_graph_backend_type(collection)
+    inner_store = _build_lineage_graph_store_inner(backend_type=backend_type, collection=collection)
+
+    vector_connector, embed_fn = _build_collection_graph_vector_writer(collection)
+    if vector_connector is None or embed_fn is None:
+        raise WorkerFactoryError(
+            f"merge_entities: vector connector / embedder unavailable for collection {collection.id!r}"
+        )
+
+    try:
+        llm = build_collection_llm_callable(collection)
+    except Exception as exc:  # noqa: BLE001 — surface as factory failure.
+        raise WorkerFactoryError(
+            f"merge_entities: LLM not configured for collection {collection.id!r}: {exc}"
+        ) from exc
+
+    # Compactor is best-effort — the merger still runs without it
+    # (description stays uncompacted; embedding falls back to unified).
+    compactor = GraphIndexCompactor(llm=llm)
+
+    return LineageEntityMerger(
+        store=inner_store,
+        alias_repo=AliasMapRepository(),
+        compactor=compactor,
+        vector_connector=vector_connector,
+        embedder=_SyncEmbedderShim(embed_fn),
+        llm=llm,
+        collection_id=str(collection.id),
+    )
+
+
 __all__ = [
     "LineageEntityMerger",
     "LineageMergeResult",
     "AliasCycleError",
     "CURATION_MERGE_DOCUMENT_ID",
     "GRAPH_ENTITY_INDEXER",
+    "build_lineage_entity_merger_for",
 ]
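
The shim in this hunk is a plain adapter: it turns a bare embedding callable into an object exposing `embed_query`. A minimal usage sketch follows, assuming a toy embedding function and a hypothetical `embed_for_merge` consumer; the class is renamed `SyncEmbedderShim` to make clear it is a copy for illustration and not an import from the project.

```python
from typing import Callable, List, Protocol


class QueryEmbedder(Protocol):
    """Shape the consumer below relies on (an assumption of this sketch)."""

    def embed_query(self, text: str) -> List[float]: ...


class SyncEmbedderShim:
    """Illustrative copy of the adapter: callable in, ``embed_query`` out."""

    def __init__(self, fn: Callable[[str], List[float]]) -> None:
        self._fn = fn

    def embed_query(self, text: str) -> List[float]:
        return self._fn(text)


def toy_embed(text: str) -> List[float]:
    # Toy 2-d "embedding": character count and space count.
    return [float(len(text)), float(text.count(" "))]


def embed_for_merge(embedder: QueryEmbedder, description: str) -> List[float]:
    # Hypothetical consumer that only knows the ``embed_query`` interface.
    return embedder.embed_query(description)


shim = SyncEmbedderShim(toy_embed)
print(embed_for_merge(shim, "hello world"))
```

Sharing one adapter between the merger and the detector means both depend only on the `embed_query` shape, so swapping the underlying embedding callable touches a single construction site.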

aperag/indexing/worker_factory.py

Lines changed: 36 additions & 4 deletions
@@ -690,10 +690,15 @@ def _resolve_graph_backend_type(collection: Any) -> str:
     return backend
 
 
-def _build_lineage_graph_store(*, backend_type: str, collection: Any) -> Any:
-    """Construct the per-collection :class:`LineageGraphStore` adapter
-    by binding the shared per-process backend client to the collection
-    id."""
+def _build_lineage_graph_store_inner(*, backend_type: str, collection: Any) -> Any:
+    """Construct the raw per-collection :class:`LineageGraphStore`
+    adapter (no alias-redirect wrapper).
+
+    Used internally by :func:`_build_lineage_graph_store` and by the
+    user-driven merger (Wave 7 task #6 :class:`LineageEntityMerger`,
+    which writes canonical names directly and must not be intercepted
+    by the alias-redirect decorator).
+    """
     if backend_type == "postgres":
         engine = _postgres_async_engine_singleton()
         from aperag.indexing.graph_storage.postgres import PostgresLineageGraphStore
@@ -720,6 +725,33 @@ def _build_lineage_graph_store(*, backend_type: str, collection: Any) -> Any:
     raise WorkerFactoryError(f"unsupported graph_backend_type {backend_type!r}")
 
 
+def _build_lineage_graph_store(*, backend_type: str, collection: Any) -> Any:
+    """Wave 7 §K.12 invariant #9 — return the alias-redirect-wrapped
+    :class:`LineageGraphStore`.
+
+    Every write goes through the per-collection
+    :class:`LineageGraphStoreWithAliasRedirect` (PR #1758) so
+    user-merged entities are transparently consolidated at indexing
+    time. Reads pass through unchanged. This makes the inseparability
+    gate of task #6 alive in production: without this swap the alias
+    map is written but never consulted, so user merges silently
+    re-created the merged-away entities on the next sync.
+
+    Callers that need the raw inner store (the merger's L1 write
+    path, which targets canonical names directly and must not be
+    redirected) should use :func:`_build_lineage_graph_store_inner`.
+    """
+    from aperag.graph_curation.alias_map import AliasMapRepository
+    from aperag.indexing.alias_redirect_store import LineageGraphStoreWithAliasRedirect
+
+    inner = _build_lineage_graph_store_inner(backend_type=backend_type, collection=collection)
+    return LineageGraphStoreWithAliasRedirect(
+        inner=inner,
+        alias_repo=AliasMapRepository(),
+        collection_id=str(collection.id),
+    )
+
+
 def _resolve_entity_lock(*, backend_type: str) -> Any:
     """Pick the EntityLock implementation appropriate for the backend.
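
The split between the decorated store and the raw inner store can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the project's implementation: `InMemoryStore`, `AliasRedirectStore`, and `upsert_entity` are hypothetical names, the alias map is a plain dict, and a single-level lookup is assumed (how the real decorator resolves alias chains is not shown in this commit).

```python
from typing import Dict, Optional


class InMemoryStore:
    """Hypothetical raw store: writes land under exactly the name given."""

    def __init__(self) -> None:
        self.entities: Dict[str, str] = {}

    def upsert_entity(self, name: str, description: str) -> None:
        self.entities[name] = description

    def get_entity(self, name: str) -> Optional[str]:
        return self.entities.get(name)


class AliasRedirectStore:
    """Decorator: rewrites the entity name on writes, passes reads through."""

    def __init__(self, inner: InMemoryStore, aliases: Dict[str, str]) -> None:
        self._inner = inner
        self._aliases = aliases  # merged-away name -> canonical name

    def upsert_entity(self, name: str, description: str) -> None:
        # Assumption: one lookup, no chain following.
        canonical = self._aliases.get(name, name)
        self._inner.upsert_entity(canonical, description)

    def get_entity(self, name: str) -> Optional[str]:
        return self._inner.get_entity(name)  # reads are not redirected


inner = InMemoryStore()
decorated = AliasRedirectStore(inner, {"IBM Corp": "IBM"})

# Indexer path: a write under the merged-away name lands on the canonical one.
decorated.upsert_entity("IBM Corp", "computing company")
print(inner.entities)

# Merger path: the raw store is used directly, so nothing is intercepted.
inner.upsert_entity("IBM", "merged description")
print(inner.get_entity("IBM"))
```

Handing the merger the undecorated store keeps its writes literal, while every indexer write consults the alias map first, which is what stops a later sync from re-creating an entity the user has merged away.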
