diff --git a/aperag/domains/retrieval/pipeline.py b/aperag/domains/retrieval/pipeline.py index 9a99a47da..b014ddaa4 100644 --- a/aperag/domains/retrieval/pipeline.py +++ b/aperag/domains/retrieval/pipeline.py @@ -18,15 +18,19 @@ Phase 2 hard-cut. Preserves the existing pipeline shape and async semantics byte-for-byte — no algorithmic change (Non-goal 2). -The knowledge-graph recall path consumes its provider through the -``GraphSearchContract`` protocol declared in -``aperag.domains.retrieval.ports``. The retrieval domain must not -import ``aperag.domains.knowledge_graph`` directly — doing so would -re-establish a cross-domain static dependency. Instead the -``_graph_search`` method type-binds to the Protocol; the concrete -graphindex service instance structurally satisfies it at runtime. -``aperag.domains.knowledge_graph.graphindex.*`` stays legal here because it is infrastructure, -not a forbidden aggregate. +Wave 6 #33 chunk 3 (per architect Option C ruling msg=6fccd9ab): +the knowledge-graph recall path now routes through the new +:class:`aperag.indexing.graph.LineageGraphStore` Protocol +(``query_entities_by_keyword`` + ``expand_neighbors_n_hops``) +instead of the legacy ``GraphIndexService.query_context``. The +retrieval pipeline composes context text from +``EntityWithLineage`` / ``RelationWithLineage`` directly — see +:func:`_render_graph_context_text`. The legacy +``aperag.domains.knowledge_graph.graphindex`` package is retained +in the codebase because its UI / curation methods (``get_labels`` / +``get_knowledge_graph`` / ``merge_entities``) still serve other +flows (knowledge_graph/service.py + graph_curation/*); the retrieval +side has no remaining import of it. """ from __future__ import annotations @@ -39,7 +43,6 @@ from aperag.config import build_vector_db_context, settings from aperag.db.ops import async_db_ops from aperag.domains.retrieval.context.context import ContextManager -from aperag.domains.retrieval.ports import GraphSearchContract from aperag.domains.retrieval.schemas import SearchRequest, SearchResultItem, SearchResultMetadata from aperag.exceptions import ValidationException from aperag.llm.embed.base_embedding import get_collection_embedding_service_sync @@ -74,17 +77,61 @@ class CollectionRow(Protocol): config: Any -def _graph_search_service_for(collection: CollectionRow) -> GraphSearchContract: - """Build the graph-search provider for a collection. +def _render_graph_context_text(entities: list[Any], relations: list[Any]) -> str: + """Compose a LightRAG-style text block from entities + relations. - The import is kept local so the pipeline module does not pay the - graphindex import tax unless a graph recall is actually requested. - The return type is annotated as the Protocol so the boundary is - explicit: the caller only sees ``query_context`` and nothing else. + Mirrors the legacy ``GraphIndexService.query_context`` rendering + convention so downstream RAG prompts that expect the + ``-----Entities (KG)----- / -----Relationships (KG)-----`` + sectioned text continue to work after the Wave 6 #33 chunk 3 + migration. + + ``EntityWithLineage`` / ``RelationWithLineage`` carry their + description across multiple ``DescriptionPart`` rows (one per + contributing parse). We join the non-empty fragments with `` | `` + so the rendered context surfaces every contribution; an entity + with no descriptions falls back to ``"(no description)"``. + """ + if not entities and not relations: + return "" + lines: list[str] = [] + if entities: + lines.append("-----Entities (KG)-----") + for e in entities: + desc_parts = [p.text.strip() for p in (e.description_parts or ()) if p.text and p.text.strip()] + desc = " | ".join(desc_parts) or "(no description)" + type_label = e.type or "entity" + lines.append(f"- [{type_label}] {e.name} — {desc}") + if relations: + if lines: + lines.append("") + lines.append("-----Relationships (KG)-----") + for r in relations: + desc_parts = [p.text.strip() for p in (r.description_parts or ()) if p.text and p.text.strip()] + desc = " | ".join(desc_parts) or "(no description)" + lines.append(f"- {r.source} → {r.target}: {desc}") + return "\n".join(lines) + + +def _build_lineage_graph_store_for(collection: CollectionRow) -> Any: + """Build a :class:`LineageGraphStore` for ``collection`` using the + same backend dispatch the indexing worker_factory uses. The import + is kept local so the pipeline module does not pay the indexing + import tax unless a graph recall is actually requested. + + Wave 6 #33 chunk 3 (per architect Option C ruling msg=6fccd9ab): + retrieval-side LightRAG-style query routes through the new + LineageGraphStore Protocol (`query_entities_by_keyword` + + `expand_neighbors_n_hops`) instead of the legacy + ``GraphIndexService.query_context``. """ - from aperag.domains.knowledge_graph.graphindex.integration import make_service_for_collection + from aperag.indexing.worker_factory import ( + _build_lineage_graph_store, + _resolve_graph_backend_type, + ) - return make_service_for_collection(collection) # type: ignore[return-value] + backend_type = _resolve_graph_backend_type(collection) + return _build_lineage_graph_store(backend_type=backend_type, collection=collection) def _deduplicate_vision_results(results: List[DocumentWithScore]) -> List[DocumentWithScore]: @@ -414,25 +461,45 @@ async def _graph_search( query: str, top_k: int, ) -> List[DocumentWithScore]: - """Knowledge-graph retrieval path. Always routes to graphindex v2 - through the ``GraphSearchContract`` protocol. - - A collection that hasn't been indexed yet returns no context; - this is the correct behaviour — search pipelines compose - (vector + graph + fulltext), and a blank graph just means - "graph contributes nothing this time", not "fall back to - something stale". + """Knowledge-graph retrieval path via the new + :class:`aperag.indexing.graph.LineageGraphStore` Protocol. + + Wave 6 #33 chunk 3 (per architect Option C ruling msg=6fccd9ab): + replaces the legacy ``GraphIndexService.query_context`` flow + with a two-step composition: + + 1. ``query_entities_by_keyword(query, top_k)`` — anchor entities + via lexical recall on the lineage store. The retrieval + pipeline owns its own embedder and does not need a backend + vector index here (vector recall was honestly deferred per + chunk 2 ruling — Wave 4 lineage schema has no entity-vector + column). + 2. ``expand_neighbors_n_hops(entity_names, hops=1)`` — pull the + direct neighbours + connecting relations so the rendered + context block includes both anchor entities and their + one-hop graph context. + + A collection that hasn't been indexed yet (or yields no + keyword anchors) returns ``[]``; this is the correct behaviour — + search pipelines compose (vector + graph + fulltext), and a + blank graph just means "graph contributes nothing this time", + not "fall back to something stale". """ config = parseCollectionConfig(collection.config) if not config.enable_knowledge_graph: logger.warning(f"Collection {collection.id} does not have knowledge graph enabled") return [] - svc: GraphSearchContract = _graph_search_service_for(collection) - ctx = await svc.query_context(collection_id=str(collection.id), query=query, top_k=top_k) - if not ctx.text: + store = _build_lineage_graph_store_for(collection) + anchors = await store.query_entities_by_keyword(query=query, top_k=top_k) + if not anchors: + return [] + anchor_names = [e.name for e in anchors] + entities, relations = await store.expand_neighbors_n_hops(entity_names=anchor_names, hops=1) + text = _render_graph_context_text(entities, relations) + if not text: return [] - return [DocumentWithScore(text=ctx.text, metadata={"recall_type": "graph_search"})] + return [DocumentWithScore(text=text, metadata={"recall_type": "graph_search"})] async def _summary_search( self, diff --git a/docs/modularization/indexing-redesign-design-pack.md b/docs/modularization/indexing-redesign-design-pack.md index 0abf01ce5..2634350b5 100644 --- a/docs/modularization/indexing-redesign-design-pack.md +++ b/docs/modularization/indexing-redesign-design-pack.md @@ -1782,25 +1782,22 @@ PM (燧木) 决定。架构师建议参考 D10 模式: **目标**: 完成 Wave 5 Phase 1 narrowed scope 推迟的 legacy ``graphindex`` package 整体淘汰 + retrieval/curation flows 迁移。这是 cross-cutting refactor,与 Wave 5 single-PR 风格不同 — 需先设计新查询层 (LightRAG-style query layer on ``LineageGraphStore``),再 cascade migrate 调用方。 -**Wave 6 acceptance items** (per Wave 5 §K.9 scope-narrow handoff): - -1. **Design + implement LightRAG-style query layer** on ``LineageGraphStore`` Protocol: - - by-keyword entity / relation lookup - - vector-recall augmented retrieval (复用 §G.5 retrieval pipeline 的 vector connector) - - graph traversal API (1-hop neighbors, multi-hop expansion) - - query result types align with retrieval pipeline expectations -2. **Migrate legacy ``graphindex`` callers to new query layer**: - - ``aperag/domains/retrieval/pipeline.py:_fulltext_search`` etc. — 用新 query API 替 ``GraphIndexService.query_context()`` - - ``aperag/domains/knowledge_graph/service.py`` graph CRUD — 用新 LineageGraphStore find/get methods - - ``aperag/graph_curation/service.py`` + ``integration.py`` + ``candidate_generation.py`` — 用新 query API + Entity DTO relocation -3. **Delete legacy package**: - - ``aperag/domains/knowledge_graph/graphindex/{storage, service, integration, engine, __init__}.py`` - - ``graphindex/{config, dto, prompts, models}.py`` (DTOs may relocate to canonical home) -4. **Delete legacy tests**: - - ``tests/unit_test/graphindex/test_connector.py`` - - ``tests/unit_test/graphindex/test_nebula_store.py`` - - ``tests/integration/compat/test_graph_compat.py`` -5. **Final grep-zero verify**: post-Wave-6, ``aperag/`` 全树无 ``from aperag.domains.knowledge_graph.graphindex`` import +**Wave 6 acceptance items** (per Wave 5 §K.9 scope-narrow handoff + Option C ruling msg=6fccd9ab narrowing post-chunk-3 caller-cascade gap): + +1. **Design + implement LightRAG-style query layer** on ``LineageGraphStore`` Protocol — **✅ shipped via #33 chunks 1+2 (PR #1737 + #1741)**: + - ✅ by-keyword entity lookup (`query_entities_by_keyword`) + - ❌ ~~vector-recall augmented retrieval~~ — **deferred per chunk 2 Option A ruling msg=54eac595** (lineage schema 没 entity vector column; Wave 7+ if real evidence) + - ✅ graph traversal API (`expand_neighbors_n_hops` with bounded `hops`) + - ✅ query result types align with retrieval pipeline expectations (`EntityWithLineage` / `RelationWithLineage`) +2. **Migrate legacy ``graphindex`` callers to new query layer** — **partially shipped via #33 chunk 3 (PR #1742) per Option C ruling msg=6fccd9ab**: + - ✅ ``aperag/domains/retrieval/pipeline.py`` `_graph_search` — 用新 `query_entities_by_keyword` + `expand_neighbors_n_hops` + `_render_graph_context_text` 替 ``GraphIndexService.query_context()``。retrieval/pipeline.py grep-zero verified post-chunk-3. + - ❌ ~~``aperag/domains/knowledge_graph/service.py`` graph CRUD~~ — **deferred to evidence-based future Wave** (UI methods `get_labels` / `get_knowledge_graph` / `merge_entities` 不在 new Protocol surface — chunk 1 spec lock 时只 verify import-level + design `query_context` replacement,漏 enumerate UI/curation methods coverage; Pattern 1 强化 lesson sediment to `feedback_spec_lock_grep_verify_caller.md`) + - ❌ ~~``aperag/graph_curation/service.py`` + ``integration.py`` + ``candidate_generation.py``~~ — **deferred to evidence-based future Wave** (依赖 graphindex.dto.Entity 类型 + LLM curation flows) +3. ❌ ~~**Delete legacy package**~~ — **deferred to evidence-based future Wave** (UI/curation flows 仍 reference legacy package; per simple-stable directive "if no evidence → defer"; UI/curation 现 work fine via legacy — 无 user-facing 急迫性) +4. ❌ ~~**Delete legacy tests**~~ — **deferred** (与 item 3 同 reason — UI/curation tests 仍 reference) +5. ❌ ~~**Final grep-zero verify** post-Wave-6, ``aperag/`` 全树无 graphindex import~~ — **deferred**; post-chunk-3 narrow grep-zero: `from aperag.domains.knowledge_graph.graphindex` 仅在 4 UI/curation files (knowledge_graph/service.py + 3 graph_curation/) — retrieval-side cutover ✅ + +**Wave 6 #33 close-out (narrowed)**: chunks 1+2+3 ship retrieval-side LightRAG-style query layer + retrieval/pipeline.py cutover。Legacy graphindex package retained for UI/curation。Wave 7+ task creation contingent on evidence-based real-world need (production user 报 issue / new UI feature 需 / refactor pressure 累积) — per simple-stable directive #1 "不无限扩范围" + audit-filter rule "if no evidence → defer"。 **Cross-backend lineage polish folded from Wave 5 P5A** (per Wave 5 P5A close-out 2026-04-27): @@ -1885,7 +1882,7 @@ Phase C (PR-B): └── #39 provider format variations (明书, 2-4 days, p | # | must-be-real | may-be-gated | fully-resolves | |---|---|---|---| -| 33 | LightRAG-style query layer 真实实现 (by-keyword + traversal API) + cascade-migrate 全 callers,行为等价 within ε tolerance for keyword + n-hop graph recall paths。**Graph entity vector recall NOT in Wave 6 scope** — deferred to Wave 7+ if real evidence surfaces (per architect ruling msg=54eac595 chunk 2 narrowed scope; lineage schema 没承担 entity vector storage,inline coupling 违反 simple-stable #3) | 无 (full hard-cut per §K.10) | §K.8 Wave 5 backlog item 1 deferred + §K.10 sub-spec | +| 33 | LightRAG-style query layer 真实实现 (by-keyword + traversal API) on `LineageGraphStore` + retrieval/pipeline.py cutover (`query_context()` → `query_entities_by_keyword` + `expand_neighbors_n_hops` + `_render_graph_context_text`),行为等价 within ε tolerance for retrieval-side keyword + n-hop graph recall paths。**TWO honest defer**:(a) **Graph entity vector recall NOT in Wave 6 scope** — deferred to Wave 7+ if real evidence (per chunk 2 ruling msg=54eac595; lineage schema 没承担 entity vector storage); (b) **UI/curation flows migration NOT in Wave 6 scope** — `aperag/domains/knowledge_graph/service.py` + `aperag/graph_curation/*` 4 files 仍 reference legacy `graphindex/` package for `get_labels` / `get_knowledge_graph` / `merge_entities` UI methods (per chunk 3 ruling msg=6fccd9ab; chunk 1 spec lock 时只 verify import-level + design `query_context` replacement Protocol,漏 enumerate UI/curation methods coverage — Pattern 1 强化 lesson) | 无 (chunks 1+2+3 三 chunk 全 hard-cut applied within narrowed scope) | §K.8 Wave 5 backlog item 1 deferred + §K.10 sub-spec narrowed (items 1-2 ✅ shipped via chunks 1-3, items 3-5 deferred to evidence-based future Wave) | | 34 | parser layer chunk_id 字段 schema 一致 + remaining utc_now → CURRENT_TIMESTAMP unify | parser legacy alt-shape transitional decoder OK to keep if needed for legacy fixtures | huangheng T1 obs B + Wave 5 P5B chunk_id 5th item 推迟项 | | 35 | Postgres / Nebula / Neo4j 全 lineage SET storage 改 parallel-list O(N) encoding + alembic migration | 无 — hard-cut policy per earayu2 msg=30c81478 (无生产数据 → schema 直改) | §K.10 item 6 (`feedback_simple_stable_zero_maintenance.md` directive #4 "私有化部署免维护" align — operator deploy 后不管 schema migration since 自动 alembic upgrade) | | 36 | `EntityRecord.type` → `entity_type` Protocol surface rename + Postgres column rename + Cypher property rewrite + Nebula tag-prop rename | 无 — hard-cut | §K.10 item 7 + §D.3 Protocol stability | @@ -1895,9 +1892,14 @@ Phase C (PR-B): └── #39 provider format variations (明书, 2-4 days, p #### K.11.5. Pre-check pattern lock (per `feedback_spec_lock_grep_verify_caller.md` 三 pattern) -**Pattern 1 (caller cascade)** 强制触发条件: +**Pattern 1 (caller cascade — IMPORT count + METHOD coverage matrix)** 强制触发条件: -- **#33**: pre-implementation 必跑 `grep -rn "from aperag.domains.knowledge_graph.graphindex" aperag/` 列全 caller graph,分类 (test-only / new-code-self-ref / legacy-production);legacy production caller (retrieval/pipeline.py / knowledge_graph/service.py / graph_curation/*) 要全列出迁移到新 query layer 的 mapping +- **import-level pre-check** (Pattern 1 v1): `grep -rn "from X import" aperag/` 列 caller files 数量 + 分类 (test-only / new-code-self-ref / legacy-production) +- **method-level pre-check** (Pattern 1 v2 — added 2026-04-27 per Wave 6 #33 chunk 3 ruling msg=6fccd9ab lesson): FOR EACH caller file, enumerate USED METHODS — grep `legacy_X.METHOD(` calls。VERIFY EACH method 是否 在 new Protocol/API surface 内 covered。Method-level coverage 是 import-level coverage 的 superset — 单 import-level pass 不充分。 + +如 ANY caller method NOT covered → STOP: lock form 4 (extended):narrow chunk to migrate only callers using covered methods; legacy package retained for un-covered methods; acceptance text 显式 declare un-covered methods + 决策 (add Protocol method same chunk / prior chunk / defer caller migration to next Wave)。 + +- **#33**: pre-implementation 必跑 (a) `grep -rn "from aperag.domains.knowledge_graph.graphindex" aperag/` 列全 caller graph (5 files: retrieval/pipeline.py + knowledge_graph/service.py + graph_curation/*); (b) FOR EACH caller, enumerate `GraphIndexService.METHOD(` calls — verify each method 在 new `LineageGraphStore` Protocol covered。**Wave 6 #33 retrospective miss**: chunk 1 spec 时只 verify (a) + design `query_context` replacement Protocol,未 enumerate (b) UI/curation methods (`get_labels` / `get_knowledge_graph` / `merge_entities`)。chunk 3 实施时 surface gap → Option C ruling narrow + UI/curation defer to evidence-based future Wave。 - **#36**: pre-implementation 必跑 `grep -rn "EntityRecord.type\|RelationRecord.type\|\.type=" aperag/indexing/ aperag/domains/` 列 Protocol surface caller graph + Cypher template caller graph + Postgres column caller graph,避免 rename 漏 site **Pattern 2 (state binding — runtime config)** 强制触发条件: @@ -1993,27 +1995,34 @@ PR-A "Wave 6 graphindex" 是 cross-cutting refactor,独立 architect direct ra |---|---|---|---|---| | **chunk 1** | LightRAG-style read Protocol stubs on `LineageGraphStore` — NO implementation, NO caller migration; spec ↔ Protocol surface align verify only | Bryce | ~200 LOC docs+protocol | ✅ MERGED PR #1737 commit `20b9071b` 2026-04-27 | | **chunk 2** | Protocol implementations (`query_entities_by_keyword` + `expand_neighbors_n_hops`) across Postgres / Neo4j / Nebula backends + unit tests + chunk 1 amend (drop `query_entities_by_vector` per architect Option A ruling msg=54eac595) | Bryce | ~600 LOC + tests | 🔄 PR #1741 in_review | -| **chunk 3** | Caller migration (5 files) + legacy graphindex package hard-cut delete + legacy tests delete + grep-zero verify | Bryce | ~400 LOC | ⏳ post-chunk-2 | +| **chunk 3** (narrowed per Option C ruling msg=6fccd9ab) | retrieval/pipeline.py cutover only (`query_context()` → `query_entities_by_keyword` + `expand_neighbors_n_hops` + `_render_graph_context_text`) + retrieval-side legacy reference removal + retrieval/pipeline.py grep-zero verify。**DO NOT delete** legacy graphindex package + tests (UI/curation 4 files 仍 reference) | Bryce | ~50 LOC + 9 tests | ✅ PR #1742 in_review | **Chunk 1 → chunk 2 amend note (architect ruling msg=54eac595 — Option A narrowed scope)**: - chunk 1 originally declared 3 Protocol methods (keyword + vector + traversal). chunk 2 backend impl surface state-binding gap: lineage schema 没 entity vector column (Wave 4 design choice, write-side only) - Architect ruling Option A (msg=54eac595): chunk 2 amends chunk 1 — drop `query_entities_by_vector` Protocol method declaration + InMemoryLineageGraphStore stub + 2 contract tests. Vector recall deferred to Wave 7+ if real evidence。 - Lesson: chunk 1 spec lock 时 architect 应先 grep-verify entity vector storage schema → Pattern 3 (Protocol method state binding) added to §K.11.5 + sediment 到 `feedback_spec_lock_grep_verify_caller.md` +**Chunk 2 → chunk 3 amend note #2 (architect ruling msg=6fccd9ab — Option C narrowed scope, 2026-04-27 second amend within #33)**: +- chunk 2 implementation 完 + chunk 3 caller-cascade pre-check 时 Bryce surface 第二 architectural gap: legacy `GraphIndexService` 4 methods 中只 `query_context()` 在 new `LineageGraphStore` Protocol covered;UI/curation methods (`get_labels` / `get_knowledge_graph` / `merge_entities`) 不在 surface — 4 caller files (knowledge_graph/service.py + graph_curation/*) 不能 migrate without new Graph CRUD API design。 +- Architect ruling Option C (msg=6fccd9ab): chunk 3 narrowed to retrieval/pipeline.py cutover only。Legacy graphindex package retained for UI/curation。Wave 6 close-out 不再 include "delete legacy graphindex" — defer to evidence-based future Wave。 +- Lesson #2: chunk 1 spec lock 时 architect 应 enumerate per-caller-per-method matrix (Pattern 1 v2 method-level coverage),不仅 import count。Pattern 1 强化 added to §K.11.5 + sediment 到 `feedback_spec_lock_grep_verify_caller.md`。 +- 双 architect-side own-ups 在同一 #33 task — chunk 1 spec lock 颗粒度 spec amend 模式 ratify trail:chunks 2+3 各自 amend chunk 1 acceptance text + memory feedback Pattern 沉淀。 + **Each chunk lands independently within PR-A** (chunked-rotation per `feedback_no_refresh_complete_all_tasks.md` Layer 1 same-session continuation directive)。Each chunk has own architect direct ratify gate per §K.11.8 lane lock。 **Why 3 chunks (not 1 monolith)**: (a) chunk 1 spec/Protocol-only allows architect ratify to lock query layer surface before backend implementations; (b) chunk 2 backend impl can independently verify Protocol semantics on real engines (per `feedback_dataflow_review.md` dataflow trace each backend); (c) chunk 3 caller migration is pure mechanical refactor once Protocol stable — chunk 4d Option C precedent (msg=b26f64b2 narrowed scope) avoids "spec drift mid-cascade" anti-pattern。 -PR-A 的 acceptance items 即 §K.10 (1-5) + amended #33 must-be-real per §K.11.4 (graph entity vector recall not in scope; keyword + traversal sufficient for retrieval/curation existing behaviors)。 +PR-A 的 acceptance items: §K.10 items 1-2 (narrowed) ✅ shipped via chunks 1-3; items 3-5 (delete legacy package + tests + grep-zero) deferred to evidence-based future Wave per Option C ruling msg=6fccd9ab。amended #33 must-be-real per §K.11.4 (graph entity vector recall NOT in scope per chunk 2 ruling msg=54eac595; UI/curation flows migration NOT in scope per chunk 3 ruling msg=6fccd9ab; retrieval-side keyword + traversal cutover ✅ shipped)。 #### K.11.12. Wave 6 close-out gate -Wave 6 close-out 双 PR merge: -- PR-B merge ✅ → 6 polish items shipped (#34 + #35 + #36 + #37 + #38 + #39) -- PR-A merge ✅ → graphindex elim shipped (#33) -- task board #33-#39 全 done -- 架构师发 Wave 6 final review (mirror Wave 5 close-out msg=43df7dd8) -- Wave 7+ 创建前必通过 simple-stable 4 guardrail 重审 (architect msg=c8cdb40d audit-filter 模板 reusable) +Wave 6 close-out (revised 2026-04-27 post-Option C chunk 3 ruling msg=6fccd9ab): +- PR-B fast-track merges ✅ → 4 polish items shipped (#34 + #37 + #38 + #39 已 MERGED via PR #1735/#1738/#1739) +- PR-A chunks 1+2+3 merges ✅ → retrieval-side LightRAG-style query layer + retrieval/pipeline.py cutover shipped (#33 narrowed scope; UI/curation migration + legacy package delete deferred to evidence-based future Wave) +- 余 #35 (Bryce, Lineage parallel-list perf rewrite) + #36 (Bryce, Cypher TYPE() / EntityRecord.type rename) — 各自 fast-track PR per Wave 6 individual fast-track pattern +- task board #33-#39 全 done (七 task close-out) +- 架构师发 Wave 6 final review (mirror Wave 5 close-out msg=43df7dd8) 含 honest declare 双 deferred items (vector recall + UI/curation migration) for future Wave evidence-based reactivation +- Wave 7+ 创建前必通过 simple-stable 4 guardrail 重审 (architect msg=c8cdb40d audit-filter 模板 reusable) + 三 pre-check pattern lock (Pattern 1 v2 method-level coverage + Pattern 2 + Pattern 3) per `feedback_spec_lock_grep_verify_caller.md` **why 双 PR 是 simple-stable**: 拒绝 single-PR 24-day land cycle (#33 阻 polish 6 items);6 polish items 1 周 land + #33 设计完成后再 ship — 用户视角 "尽快上线" 实质化 (PR-B 各 modality 改进可下周即用)。 diff --git a/tests/unit_test/domains/retrieval/test_graph_search_migration.py b/tests/unit_test/domains/retrieval/test_graph_search_migration.py new file mode 100644 index 000000000..bce63f930 --- /dev/null +++ b/tests/unit_test/domains/retrieval/test_graph_search_migration.py @@ -0,0 +1,309 @@ +# Copyright 2025 ApeCloud, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Wave 6 #33 chunk 3 — retrieval pipeline graph-search migration. + +Pin the chunk 3 contract: ``SearchPipelineService._graph_search`` +routes through the new :class:`LineageGraphStore` Protocol +(``query_entities_by_keyword`` + ``expand_neighbors_n_hops``) and +no longer imports the legacy +``aperag.domains.knowledge_graph.graphindex`` package. + +Per architect Option C ruling msg=6fccd9ab the legacy graphindex +package itself stays alive (UI / curation flows still consume it), +but retrieval/pipeline.py has a grep-zero contract — verified by +:func:`test_retrieval_pipeline_no_graphindex_import` below. +""" + +from __future__ import annotations + +import asyncio +import pathlib + +from aperag.domains.retrieval.pipeline import ( + _render_graph_context_text, +) +from aperag.indexing.graph import ( + DescriptionPart, + EntityWithLineage, + LineageMember, + RelationWithLineage, +) + + +def test_retrieval_pipeline_no_graphindex_import(): + """Chunk 3 grep-zero invariant: ``retrieval/pipeline.py`` must not + import the legacy ``aperag.domains.knowledge_graph.graphindex`` + package any longer. The migrated path uses ``LineageGraphStore`` + instead. Docstring references that mention the legacy package by + name (without ``import`` / ``from``) are allowed — they document + the migration.""" + pipeline_path = pathlib.Path(__file__).resolve().parents[4] / "aperag" / "domains" / "retrieval" / "pipeline.py" + body = pipeline_path.read_text(encoding="utf-8") + # Two real-import shapes the grep-zero contract forbids. + assert "from aperag.domains.knowledge_graph.graphindex" not in body + assert "import aperag.domains.knowledge_graph.graphindex" not in body + + +def test_render_graph_context_empty_inputs_returns_empty_string(): + assert _render_graph_context_text([], []) == "" + + +def test_render_graph_context_renders_lightrag_style_sections(): + members = ( + LineageMember( + document_id="doc-1", + parse_version="v1", + tenant_scope_key="user:t", + chunk_ids=("chunk-1",), + ), + ) + parts_a = (DescriptionPart(document_id="doc-1", parse_version="v1", text="Founder of company X"),) + parts_b = (DescriptionPart(document_id="doc-1", parse_version="v1", text="Released product Y in 2024"),) + entities = [ + EntityWithLineage( + name="Alice", + type="person", + source_lineage=members, + description_parts=parts_a, + ), + EntityWithLineage( + name="Acme", + type="organization", + source_lineage=members, + description_parts=parts_b, + ), + ] + rel_parts = (DescriptionPart(document_id="doc-1", parse_version="v1", text="Alice founded Acme"),) + relations = [ + RelationWithLineage( + source="Alice", + target="Acme", + type="founded", + evidence_lineage=members, + description_parts=rel_parts, + ) + ] + text = _render_graph_context_text(entities, relations) + + assert "-----Entities (KG)-----" in text + assert "- [person] Alice — Founder of company X" in text + assert "- [organization] Acme — Released product Y in 2024" in text + assert "-----Relationships (KG)-----" in text + assert "- Alice → Acme: Alice founded Acme" in text + + +def test_render_graph_context_joins_multiple_description_parts(): + members = ( + LineageMember( + document_id="doc-1", + parse_version="v1", + tenant_scope_key="user:t", + chunk_ids=("c1",), + ), + ) + parts = ( + DescriptionPart(document_id="doc-1", parse_version="v1", text="frag-1"), + DescriptionPart(document_id="doc-1", parse_version="v1", text="frag-2"), + DescriptionPart(document_id="doc-1", parse_version="v1", text=" "), # whitespace skipped + ) + entities = [ + EntityWithLineage(name="X", type="thing", source_lineage=members, description_parts=parts), + ] + text = _render_graph_context_text(entities, []) + assert "- [thing] X — frag-1 | frag-2" in text + + +def test_render_graph_context_falls_back_to_no_description_marker(): + members = ( + LineageMember( + document_id="doc-1", + parse_version="v1", + tenant_scope_key="user:t", + chunk_ids=("c1",), + ), + ) + entities = [ + EntityWithLineage( + name="Y", + type="", + source_lineage=members, + description_parts=(), + ), + ] + text = _render_graph_context_text(entities, []) + # Empty type defaults to "entity" label per implementation. + assert "- [entity] Y — (no description)" in text + + +def test_graph_search_returns_empty_when_no_anchors(monkeypatch): + """If keyword recall returns no anchor entities, _graph_search + returns an empty list — no traversal call, no rendered context.""" + from aperag.domains.retrieval.pipeline import SearchPipelineService + + class _StubStore: + def __init__(self) -> None: + self.expand_called = False + + async def query_entities_by_keyword(self, *, query, top_k): + return [] + + async def expand_neighbors_n_hops(self, *, entity_names, hops=1): + self.expand_called = True + return ([], []) + + stub_store = _StubStore() + monkeypatch.setattr( + "aperag.domains.retrieval.pipeline._build_lineage_graph_store_for", + lambda _collection: stub_store, + ) + + class _Collection: + id = "col-1" + user = "u" + config = '{"enable_knowledge_graph": true}' + + svc = SearchPipelineService() + results = asyncio.run(svc._graph_search(_Collection(), "anything", 5)) + assert results == [] + assert stub_store.expand_called is False, "expand should not be called when no anchors" + + +def test_graph_search_composes_text_from_anchors_and_neighbors(monkeypatch): + members = ( + LineageMember( + document_id="doc-1", + parse_version="v1", + tenant_scope_key="user:t", + chunk_ids=("c1",), + ), + ) + alice = EntityWithLineage( + name="Alice", + type="person", + source_lineage=members, + description_parts=(DescriptionPart(document_id="doc-1", parse_version="v1", text="Lead engineer"),), + ) + bob = EntityWithLineage( + name="Bob", + type="person", + source_lineage=members, + description_parts=(DescriptionPart(document_id="doc-1", parse_version="v1", text="CFO"),), + ) + alice_bob = RelationWithLineage( + source="Alice", + target="Bob", + type="reports_to", + evidence_lineage=members, + description_parts=(DescriptionPart(document_id="doc-1", parse_version="v1", text="Alice reports to Bob"),), + ) + + class _StubStore: + def __init__(self) -> None: + self.last_query: str | None = None + self.last_seeds: list[str] | None = None + + async def query_entities_by_keyword(self, *, query, top_k): + self.last_query = query + return [alice] + + async def expand_neighbors_n_hops(self, *, entity_names, hops=1): + self.last_seeds = entity_names + return ([alice, bob], [alice_bob]) + + stub_store = _StubStore() + monkeypatch.setattr( + "aperag.domains.retrieval.pipeline._build_lineage_graph_store_for", + lambda _collection: stub_store, + ) + + from aperag.domains.retrieval.pipeline import SearchPipelineService + + class _Collection: + id = "col-1" + user = "u" + config = '{"enable_knowledge_graph": true}' + + svc = SearchPipelineService() + results = asyncio.run(svc._graph_search(_Collection(), "Alice", 5)) + assert len(results) == 1 + doc = results[0] + assert doc.metadata == {"recall_type": "graph_search"} + assert "Alice" in doc.text + assert "Bob" in doc.text + assert "reports_to" not in doc.text # type label not in body, only the description + assert "Alice → Bob" in doc.text + assert stub_store.last_query == "Alice" + assert stub_store.last_seeds == ["Alice"] + + +def test_graph_search_returns_empty_when_kg_disabled(monkeypatch): + """If ``enable_knowledge_graph`` is False the path short-circuits + before touching the lineage store.""" + from aperag.domains.retrieval.pipeline import SearchPipelineService + + called = {"build": False} + + def _should_not_be_called(_collection): + called["build"] = True + raise AssertionError("graph store builder must not be reached when KG disabled") + + monkeypatch.setattr( + "aperag.domains.retrieval.pipeline._build_lineage_graph_store_for", + _should_not_be_called, + ) + + class _Collection: + id = "col-1" + user = "u" + config = '{"enable_knowledge_graph": false}' + + svc = SearchPipelineService() + results = asyncio.run(svc._graph_search(_Collection(), "anything", 5)) + assert results == [] + assert called["build"] is False + + +def test_build_lineage_graph_store_for_imports_only_on_demand(monkeypatch): + """``_build_lineage_graph_store_for`` keeps the indexing import + local — calling it must reach worker_factory's resolver helpers + without importing them at module level. We exercise the function + via a stubbed resolver to assert the contract.""" + from aperag.domains.retrieval import pipeline as pipeline_module + + seen: list[str] = [] + + def _stub_resolve(_collection): + seen.append("resolve") + return "postgres" + + def _stub_build(*, backend_type, collection): + seen.append(f"build:{backend_type}") + return object() + + monkeypatch.setattr( + "aperag.indexing.worker_factory._resolve_graph_backend_type", + _stub_resolve, + ) + monkeypatch.setattr( + "aperag.indexing.worker_factory._build_lineage_graph_store", + _stub_build, + ) + + class _Collection: + id = "col-1" + config = '{"graph_backend_type": "postgres"}' + + pipeline_module._build_lineage_graph_store_for(_Collection()) + assert seen == ["resolve", "build:postgres"]