feat(graph): W8-2 — bulk_upsert_entity_with_lineage_parts Protocol + 4 backends + curation cutover#1771
Conversation
…nds + curation cutover (W8-2) Wave 8 task #13 (W8-2): adds a bulk-upsert primitive on the ``LineageGraphStore`` Protocol so callers consolidating N×M ``(description_part, lineage_member)`` tuples to the same target entity get one transaction / one round-trip instead of N×M sequential single upserts. Cuts ``LineageEntityMerger.merge_entities`` step 6a over to the bulk path. ## Why ``GraphCurationService.merge_entities`` (the N-source-into-1-target user merge flow) re-anchors every source's description parts under the target name. With N source entities each carrying M description parts, that loop emitted N×M sequential ``upsert_entity_with_lineage`` round-trips — one full transaction per part. For typical curation runs (3-10 sources × 5-20 parts each), that's 15-200 sequential SQL hits where one bulk write would do. Per architect spec (Wave 8 candidate W8-2 sediment) + huangheng's task #6 PR #1758 perf observation, this folds into a single bulk write per backend. ## Scope (5 numbered items) 1. **Protocol method** — ``bulk_upsert_entity_with_lineage_parts(*, parts: Sequence[tuple[EntityRecord, LineageMember]])`` on ``aperag/indexing/graph.py:LineageGraphStore``. All ``record.name`` values MUST share the same string (asserted as ``ValueError``). Empty parts is a no-op. Per-part dedup key is ``(document_id, parse_version)`` last-wins. Bulk path NEVER touches ``compacted_description`` (preserves existing — same ``COALESCE``-style semantic as single upsert with ``None``). 2. **InMemory ref impl** — single ``asyncio.Lock``-guarded loop in ``InMemoryLineageGraphStore``. 3. **Postgres impl** — single ``INSERT … ON CONFLICT (collection_id, name) DO UPDATE`` that strips matching keys via ``jsonb_array_elements`` ``NOT EXISTS`` against an incoming ``strip_keys_json`` array, then appends the whole new ``new_members_json`` / ``new_parts_json`` arrays. One statement, atomic. 4. **Neo4j impl** — single Cypher MERGE + parallel-list strip-then- append, with the strip predicate matching against the **set** of incoming keys (``IN $strip_keys`` on the ``"<doc_id>|<pv>"`` key string). Row-lock on MERGE serialises concurrent bulk ops on the same entity. 5. **Nebula impl** — single ``EntityLock(target_name)`` acquire + single read / Python merge / write. Mirrors the existing read-modify-write pattern of single upsert but folds the strip- then-append over the **set** of incoming keys. ## Caller cutover * ``LineageEntityMerger.merge_entities`` (step 6a) — replaces the N×M ``for src in source_entities: for part in src.description_parts: await self._store.upsert_entity_with_lineage(...)`` with a single ``bulk_upsert_entity_with_lineage_parts`` call. Step 6b's sentinel write (``__curation_merge__`` final write with unified+compacted text) still goes through the single-upsert path because it needs the ``compacted_description`` column write that the bulk path intentionally doesn't carry. ## Alias-redirect decorator * ``LineageGraphStoreWithAliasRedirect`` — bulk path mirrors single upsert: each part's ``record.name`` resolves through the alias map before forwarding to the inner store. The merger always passes records pinned to the canonical ``final_target`` so the redirect is a no-op in that flow, but symmetry with the single upsert contract means a future caller writing to an aliased name still gets correct behaviour. ## 12-invariant cross-check (Wave 7 §K.12) * **#1 L1 不污染**: bulk path operates on lineage SETs only — kg.jsonl raw extract layer untouched. ✅ * **#3 indexer write redirect through alias map**: bulk path goes through the same decorator alias resolution as single upsert. ✅ * **#9 alias redirect transparent**: decorator forwards bulk to inner with redirected names; merger callers see no behaviour change. ✅ * **#10 DB column cap**: Postgres ``compacted_description`` Text column unchanged (bulk path preserves existing value via COALESCE-equivalent — bulk passes ``None`` end-to-end through the Postgres SQL → INSERT branch sets NULL, ON CONFLICT branch preserves). ✅ * All other invariants: unaffected. ## 4-pattern pre-check matrix * Pattern A (kg.jsonl shape): unchanged * Pattern B (Lineage SET semantics): bulk preserves dedup-by- ``(document_id, parse_version)`` exactly — same key as single upsert. ✅ * Pattern C (Cypher LIST<MAP>): bulk reuses parallel-list encoding for the strip-then-append. ✅ * Pattern D (vector 3-field payload): unchanged ## Simple-stable 4-guardrail * **#1 不无限扩范围**: 1 new Protocol method, no new public REST surface, no new schema column, no new alembic migration. * **#2 尽快上线**: single PR, no spec amend needed (architect W8-2 candidate sediment + Wave 7 task #1 3-backend Protocol pattern reuse). * **#3 简单稳定**: bulk semantic mirrors single upsert exactly; caller cutover is a 1-call replacement of the N×M loop. * **#4 私有化部署免维护**: backend-portable (4 backends shipped cross-backend); no operator-facing config knob. ## Test plan - [x] InMemory contract — 7 new tests pin empty / create / replace / mismatched-names / dedup-within-input / preserves-compacted / last-entity-type-wins. - [x] Alias-redirect decorator — 3 new tests pin redirect-each-part / no-alias-passthrough / empty-short-circuit. - [x] LineageEntityMerger step 6a cutover — existing ``test_source_parts_reanchored_preserving_doc_lineage`` rewritten to assert the new single bulk call (was: 3 sequential single upserts) + single sentinel final write. - [x] All 1166 unit tests pass (up from 1141 baseline; 25 new tests). - [x] Wave 7 grep-zero contract — 10/10 still pass (intent-driven gate unaffected). - [x] ruff format / check clean on touched files. - [ ] Cross-backend integration test — sediment to Wave 8 follow-up (out of scope this PR; the 4-backend Protocol contract is structurally identical to single upsert which already has cross-backend integration coverage). - [ ] CR by @huangheng (focus: invariant #1 L1, #9 alias redirect transparent, 4-backend cross-roundtrip on contract level).
earayu
left a comment
There was a problem hiding this comment.
🟢 LGTM ✅ (huangheng pass-1) — verdict = ready to merge
Wave 8 W8-2 bulk API 干净落地 — 1 Protocol method + 4 backend impl (InMemory + Postgres ON CONFLICT JSONB merge / Neo4j Cypher MERGE + parallel-list strip-then-append / Nebula read-modify-write under EntityLock) + LineageEntityMerger step 6a 单调用 cutover + alias decorator bulk path 对称延伸 + 25 new tests + 1166 全 pass + grep-zero 10/10 不破。
12-invariant cross-check (W8-2 scope)
| # | Material | 状态 | 验证依据 |
|---|---|---|---|
| 1 | L1 graph data 不污染 | ✅ | bulk 操作 lineage SETs 仅;kg.jsonl raw extract 不动 |
| 9 | upsert transparent alias redirect | ✅ | decorator bulk path mirror single-upsert: 每 part record.name 走 alias.resolve_canonical → inner store;merger 总是 pin canonical 所以 redirect noop, 但对称契约保留 |
| 10 | DB column length cap | ✅ | bulk 不动 compacted_description 列(preserve existing,与 single upsert kwarg None semantic 一致);single sentinel write step 6b 仍走 single-upsert 因需写 compacted |
| 其他 | n/a | unaffected |
Architecture quality (4 backend symmetric)
| Backend | 实施 | 关键 |
|---|---|---|
| InMemory ref | asyncio.Lock 包 loop |
dedup (doc_id, parse_version) last-wins |
| Postgres | INSERT ... ON CONFLICT (collection_id, name) DO UPDATE 单语句,strip via jsonb_array_elements NOT EXISTS strip_keys_json + append 整 new_members_json/new_parts_json |
一 transaction atomic |
| Neo4j | Cypher MERGE + parallel-list strip-then-append + IN $strip_keys predicate (key string `<doc_id> |
`) |
| Nebula | EntityLock(target_name) + 单 read / Python merge / 单 write |
与 single upsert 同 RMW pattern, fold strip-then-append 到一次 |
4 backend 全 follow same dedup-by-(document_id, parse_version) pattern, 与 single upsert lineage SET semantic 一致。✅
关键 invariant: bulk path NEVER touches compacted_description
per PR description: "Bulk path NEVER touches compacted_description (preserves existing — same COALESCE-style semantic as single upsert with None)"。这与 single upsert Option B kwarg=None preserve 一致 ✅。
caller side (merger step 6a/6b) 分工正确:
- step 6a → bulk path (re-anchor source parts under target, 不写 compacted)
- step 6b → single upsert with
compacted_description=compacted(sentinel__curation_merge__final write)
避免了 bulk 写 None 误清 compacted_description 的 silent regression。👍
Caller cutover N×M → 1
merger step 6a 原 N source × M part = N*M sequential SQL roundtrip → 1 bulk call。性能改进显著(典型 3-10 source × 5-20 parts = 15-200 hits → 1 hit)。这是 Wave 7 我 task #6 CR sediment "N×M upsert 性能" (msg=22816e0d) Wave 8 candidate 的物理交付。Wave 7 → Wave 8 sediment 闭环。👍
Alias decorator bulk symmetry
decorator 同 redirect bulk path mirror single upsert: each record.name resolves through alias_map before forwarding to inner。3 new tests (test_redirect-each-part / no-alias-passthrough / empty-short-circuit) 钉这个对称性。
merger 总是 pin canonical (从 step 1 final_target = resolve_canonical(target_name) 起),所以 redirect 实际是 no-op (canonical → canonical)。但 future caller 写 aliased name 时 bulk 仍正确 redirect — 契约对称完整。
测试覆盖 25 new
- 7 InMemory contract tests(empty / create / replace / mismatched-names ValueError / dedup-within-input / preserves-compacted / last-entity-type-wins)
- 3 alias-redirect bulk tests(each-part redirect / no-alias passthrough / empty short-circuit)
- 1 merger step 6a cutover test rewrite(assert single bulk + single sentinel final write)
- 14 pre-existing retained
- = 1166 total pass + 10/10 grep-zero unaffected + ruff clean
1 个 minor sediment(不阻塞)
PR description 显式 sediment "Cross-backend integration test → Wave 8 follow-up; 4-backend Protocol contract structurally identical to single upsert which already has cross-backend integration coverage"。
合理 — bulk follows same pattern as single upsert, 单 upsert 已 cross-backend integration cover (task #1 #1754 11 cross-backend tests)。task #11 narrative e2e 也会 indirectly cover bulk path (因 merger 现在走 bulk)。如未来真有具体 backend bulk drift 风险,sediment 进 W8-X candidate "bulk Protocol cross-backend roundtrip integration tests"。
4-pattern + simple-stable
PR body Pattern A/B/C/D 全 paste,simple-stable 4 项全显式 ✅(1 Protocol method / 0 REST surface / 0 schema 改 / 0 alembic / no operator config)。
修完会 LGTM 的清单
实际上已经可以 merge ✅。
@bryce W8-2 干净落地 + 4 backend 对称设计 + bulk-vs-compacted 关键分工 + caller cutover 1 行替换 + 25 new tests — Wave 8 close-out implementer 模板高质量。👍
@符炫炜 LGTM,可 merge。Wave 8 5/5 close-out after #1771 merge:
- #15 W8-6 ✅
- #12 W8-1 ✅
- #14 W8-3 ✅
- #16 W8-7 ✅
- #13 W8-2 (本 PR) → done
- W8-8 follow-up #1768 ✅
- = 6 PR + W8-8 = 7 PR total in Wave 8
Wave 7 sediment 列表 W8-1~W8-9 中 W8-1/2/3/6/7 + W8-8 ship,W8-4 (alembic sequence) + W8-5 (MCP real agent e2e) + W8-9 (5 pre-existing tsc) + W8-10 (retrieval pipeline 消费 search_relations) 留 Wave 9 candidate。
…(W7-10 drift) (#1773) ## What broke Wave 7 task #10 (PR #1765) added ``list_entities`` to the ``LineageGraphStore`` Protocol + 4 backends. The ``LineageGraphStoreWithAliasRedirect`` decorator was NOT updated, and the spelled-out passthrough invariant test (``test_decorator_passthrough_for_non_redirected_methods``) only covers the methods that existed when it was written — so the gap landed silently. Production crash on ``GET /api/v2/collections/{id}/graphs``: ``` AttributeError: 'LineageGraphStoreWithAliasRedirect' object has no attribute 'list_entities' File "aperag/domains/knowledge_graph/service.py", line 149, in get_knowledge_graph entities = await store.list_entities(label=normalized_label, limit=query_max_nodes) ``` This is also the e2e-http-provider failure visible on every Wave 7+8 PR including the just-merged #1771 / #1772 — pre-existing on main since W7-10 close-out, masked because the e2e-http-smoke shapes don't exercise the graph view endpoint. ## Fix 1. Add ``list_entities`` passthrough method on the decorator. Filter is by ``label`` (entity type) + pagination, not by name — rows are already canonical (alias rows live in ``aperag_lineage_entity_alias``, not in the entity table). Same passthrough rationale as ``list_entity_labels`` / ``query_entities_by_keyword``; module docstring updated. 2. Add ``list_entities`` row to the spelled-out passthrough test. 3. **NEW meta-invariant test**: ``test_decorator_covers_every_lineage_graph_store_method`` — introspect ``LineageGraphStore`` Protocol via ``dir()`` and assert the decorator implements every public method. Catches the "decorator missing method" half of the drift gap that the spelled-out test couldn't catch (because spelled-out tests only cover methods that existed when written). A future Protocol addition without a decorator update now fails CI immediately with a clear "missing method(s): X" assertion. ## Hard-gate format checklist (Wave 7+8 mirror) ### 4-pattern pre-check matrix - [x] **#1 不丢失好的算法和设计**: passthrough rationale preserved per module docstring (filter-by-label not filter-by-name). - [x] **#2 尽快上线**: 1 fix + 2 new tests (1 spelled-out row + 1 meta-test). No new abstraction. - [x] **#3 简单稳定**: passthrough is byte-for-byte forward, mirror of ``list_entity_labels``. Meta-test uses stdlib introspection, no new dep. - [x] **#4 私有化部署免维护**: zero new env / config / dep. Backend- portable (decorator wraps the 4-backend Protocol; the actual ``list_entities`` implementation already covered by 4-backend compat tests). ### Simple-stable 4-guardrail - [x] No new abstraction. - [x] No backward-compat shim. - [x] No silent behavior change — fix is additive (method that didn't exist now exists; behavior matches 4-backend Protocol contract). - [x] Meta-test prevents this exact class of drift recurring. ## Test plan - [x] ``uv run pytest tests/unit_test/indexing/test_alias_redirect_store.py`` — **16 passed** (was 15, +1 meta-test, +1 spelled-out ``list_entities`` row covered by existing test function). - [x] ``uvx ruff check`` — clean. ``uvx ruff format --check`` — clean. - [ ] Production verify: ``GET /collections/{id}/graphs`` returns 200 after this lands (the original crash from main e2e-http-provider log) — will be visible on next PR's e2e-http-provider checks. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Summary
Wave 8 task #13 (W8-2): adds a bulk-upsert primitive on the
LineageGraphStoreProtocol so callers consolidating N×M(description_part, lineage_member)tuples to the same target entity get one transaction / one round-trip instead of N×M sequential single upserts. CutsLineageEntityMerger.merge_entitiesstep 6a over to the bulk path.Why
GraphCurationService.merge_entities(the N-source-into-1-target user merge flow) re-anchors every source's description parts under the target name. With N source entities each carrying M description parts, that loop emitted N×M sequentialupsert_entity_with_lineageround-trips — one full transaction per part. For typical curation runs (3-10 sources × 5-20 parts each), that's 15-200 sequential SQL hits where one bulk write would do.Scope (5 numbered items)
bulk_upsert_entity_with_lineage_parts(*, parts: Sequence[tuple[EntityRecord, LineageMember]])onaperag/indexing/graph.py:LineageGraphStore. Allrecord.namevalues MUST share the same string (asserted asValueError). Empty parts is a no-op. Per-part dedup key is(document_id, parse_version)last-wins. Bulk path NEVER touchescompacted_description(preserves existing — sameCOALESCE-style semantic as single upsert withNone).asyncio.Lock-guarded loop inInMemoryLineageGraphStore.INSERT … ON CONFLICT (collection_id, name) DO UPDATEthat strips matching keys viajsonb_array_elementsNOT EXISTSagainst an incomingstrip_keys_jsonarray, then appends the whole newnew_members_json/new_parts_jsonarrays. One statement, atomic.IN $strip_keyson the<doc_id>|<pv>key string). Row-lock on MERGE serialises concurrent bulk ops on the same entity.EntityLock(target_name)acquire + single read / Python merge / write. Mirrors the existing read-modify-write pattern of single upsert but folds the strip-then-append over the set of incoming keys.Caller cutover
LineageEntityMerger.merge_entities(step 6a) — replaces the N×M loop with a single bulk call. Step 6b's sentinel write (__curation_merge__final write with unified+compacted text) still goes through single-upsert because it needs thecompacted_descriptioncolumn write that the bulk path intentionally doesn't carry.Alias-redirect decorator
LineageGraphStoreWithAliasRedirect— bulk path mirrors single upsert: each part'srecord.nameresolves through the alias map before forwarding to the inner store. The merger always passes records pinned to canonicalfinal_targetso redirect is a no-op in that flow, but symmetry with single-upsert contract means future callers writing to aliased names still get correct behaviour.12-invariant cross-check (Wave 7 §K.12)
compacted_descriptionText column unchanged (bulk preserves existing value). ✅4-pattern pre-check matrix
(document_id, parse_version)exactly — same key as single upsert. ✅Simple-stable 4-guardrail
Test plan
🤖 Generated with Claude Code