Skip to content

feat(graph): W8-2 — bulk_upsert_entity_with_lineage_parts Protocol + 4 backends + curation cutover#1771

Merged
earayu merged 1 commit into
mainfrom
bryce/wave8-task13-bulk-upsert
Apr 28, 2026
Merged

feat(graph): W8-2 — bulk_upsert_entity_with_lineage_parts Protocol + 4 backends + curation cutover#1771
earayu merged 1 commit into
mainfrom
bryce/wave8-task13-bulk-upsert

Conversation

@earayu
Copy link
Copy Markdown
Collaborator

@earayu earayu commented Apr 28, 2026

Summary

Wave 8 task #13 (W8-2): adds a bulk-upsert primitive on the LineageGraphStore Protocol so callers consolidating N×M (description_part, lineage_member) tuples to the same target entity get one transaction / one round-trip instead of N×M sequential single upserts. Cuts LineageEntityMerger.merge_entities step 6a over to the bulk path.

Why

GraphCurationService.merge_entities (the N-source-into-1-target user merge flow) re-anchors every source's description parts under the target name. With N source entities each carrying M description parts, that loop emitted N×M sequential upsert_entity_with_lineage round-trips — one full transaction per part. For typical curation runs (3-10 sources × 5-20 parts each), that's 15-200 sequential SQL hits where one bulk write would do.

Scope (5 numbered items)

  1. Protocol methodbulk_upsert_entity_with_lineage_parts(*, parts: Sequence[tuple[EntityRecord, LineageMember]]) on aperag/indexing/graph.py:LineageGraphStore. All record.name values MUST share the same string (asserted as ValueError). Empty parts is a no-op. Per-part dedup key is (document_id, parse_version) last-wins. Bulk path NEVER touches compacted_description (preserves existing — same COALESCE-style semantic as single upsert with None).
  2. InMemory ref impl — single asyncio.Lock-guarded loop in InMemoryLineageGraphStore.
  3. Postgres impl — single INSERT … ON CONFLICT (collection_id, name) DO UPDATE that strips matching keys via jsonb_array_elements NOT EXISTS against an incoming strip_keys_json array, then appends the whole new new_members_json / new_parts_json arrays. One statement, atomic.
  4. Neo4j impl — single Cypher MERGE + parallel-list strip-then-append, with the strip predicate matching against the set of incoming keys (IN $strip_keys on the <doc_id>|<pv> key string). Row-lock on MERGE serialises concurrent bulk ops on the same entity.
  5. Nebula impl — single EntityLock(target_name) acquire + single read / Python merge / write. Mirrors the existing read-modify-write pattern of single upsert but folds the strip-then-append over the set of incoming keys.

Caller cutover

  • LineageEntityMerger.merge_entities (step 6a) — replaces the N×M loop with a single bulk call. Step 6b's sentinel write (__curation_merge__ final write with unified+compacted text) still goes through single-upsert because it needs the compacted_description column write that the bulk path intentionally doesn't carry.

Alias-redirect decorator

  • LineageGraphStoreWithAliasRedirect — bulk path mirrors single upsert: each part's record.name resolves through the alias map before forwarding to the inner store. The merger always passes records pinned to canonical final_target so redirect is a no-op in that flow, but symmetry with single-upsert contract means future callers writing to aliased names still get correct behaviour.

12-invariant cross-check (Wave 7 §K.12)

  • feat/frontend #1 L1 不污染: bulk path operates on lineage SETs only — kg.jsonl raw extract layer untouched. ✅
  • feat: api test #3 indexer write redirect through alias map: bulk path goes through the same decorator alias resolution as single upsert. ✅
  • feat: markdown response #9 alias redirect transparent: decorator forwards bulk to inner with redirected names; merger callers see no behaviour change. ✅
  • feat: auth with github and google #10 DB column cap: Postgres compacted_description Text column unchanged (bulk preserves existing value). ✅
  • All other invariants: unaffected.

4-pattern pre-check matrix

  • Pattern A (kg.jsonl shape): unchanged
  • Pattern B (Lineage SET semantics): bulk preserves dedup-by-(document_id, parse_version) exactly — same key as single upsert. ✅
  • Pattern C (Cypher LIST): bulk reuses parallel-list encoding for the strip-then-append. ✅
  • Pattern D (vector 3-field payload): unchanged

Simple-stable 4-guardrail

  • feat/frontend #1 不无限扩范围: 1 new Protocol method, no new public REST surface, no new schema column, no new alembic migration
  • feat: auth bearer token support #2 尽快上线: single PR, no spec amend needed
  • feat: api test #3 简单稳定: bulk semantic mirrors single upsert exactly; caller cutover is a 1-call replacement
  • fix: upload token #4 私有化部署免维护: backend-portable (4 backends shipped cross-backend); no operator-facing config knob

Test plan

  • InMemory contract — 7 new tests pin empty / create / replace / mismatched-names / dedup-within-input / preserves-compacted / last-entity-type-wins
  • Alias-redirect decorator — 3 new tests pin redirect-each-part / no-alias-passthrough / empty-short-circuit
  • LineageEntityMerger step 6a cutover — existing test rewritten to assert the new single bulk call + single sentinel final write
  • All 1166 unit tests pass (up from 1141 baseline; 25 new tests)
  • Wave 7 grep-zero contract — 10/10 still pass (intent-driven gate unaffected)
  • ruff format / check clean on touched files
  • Cross-backend integration test — sediment to Wave 8 follow-up (out of scope this PR; 4-backend Protocol contract structurally identical to single upsert which already has cross-backend integration coverage)
  • CR by @huangheng (focus: invariant feat/frontend #1 L1, feat: markdown response #9 alias redirect transparent, 4-backend cross-roundtrip on contract level)

🤖 Generated with Claude Code

…nds + curation cutover (W8-2)

Wave 8 task #13 (W8-2): adds a bulk-upsert primitive on the
``LineageGraphStore`` Protocol so callers consolidating N×M
``(description_part, lineage_member)`` tuples to the same target
entity get one transaction / one round-trip instead of N×M
sequential single upserts. Cuts ``LineageEntityMerger.merge_entities``
step 6a over to the bulk path.

## Why

``GraphCurationService.merge_entities`` (the N-source-into-1-target
user merge flow) re-anchors every source's description parts under
the target name. With N source entities each carrying M description
parts, that loop emitted N×M sequential ``upsert_entity_with_lineage``
round-trips — one full transaction per part. For typical curation
runs (3-10 sources × 5-20 parts each), that's 15-200 sequential SQL
hits where one bulk write would do.

Per architect spec (Wave 8 candidate W8-2 sediment) + huangheng's
task #6 PR #1758 perf observation, this folds into a single bulk
write per backend.

## Scope (5 numbered items)

1. **Protocol method** — ``bulk_upsert_entity_with_lineage_parts(*,
   parts: Sequence[tuple[EntityRecord, LineageMember]])`` on
   ``aperag/indexing/graph.py:LineageGraphStore``. All ``record.name``
   values MUST share the same string (asserted as ``ValueError``).
   Empty parts is a no-op. Per-part dedup key is
   ``(document_id, parse_version)`` last-wins. Bulk path NEVER
   touches ``compacted_description`` (preserves existing — same
   ``COALESCE``-style semantic as single upsert with ``None``).
2. **InMemory ref impl** — single ``asyncio.Lock``-guarded loop in
   ``InMemoryLineageGraphStore``.
3. **Postgres impl** — single ``INSERT … ON CONFLICT (collection_id,
   name) DO UPDATE`` that strips matching keys via
   ``jsonb_array_elements`` ``NOT EXISTS`` against an incoming
   ``strip_keys_json`` array, then appends the whole new
   ``new_members_json`` / ``new_parts_json`` arrays. One statement,
   atomic.
4. **Neo4j impl** — single Cypher MERGE + parallel-list strip-then-
   append, with the strip predicate matching against the **set** of
   incoming keys (``IN $strip_keys`` on the ``"<doc_id>|<pv>"`` key
   string). Row-lock on MERGE serialises concurrent bulk ops on the
   same entity.
5. **Nebula impl** — single ``EntityLock(target_name)`` acquire +
   single read / Python merge / write. Mirrors the existing
   read-modify-write pattern of single upsert but folds the strip-
   then-append over the **set** of incoming keys.

## Caller cutover

* ``LineageEntityMerger.merge_entities`` (step 6a) — replaces the
  N×M ``for src in source_entities: for part in
  src.description_parts: await self._store.upsert_entity_with_lineage(...)``
  with a single ``bulk_upsert_entity_with_lineage_parts`` call.
  Step 6b's sentinel write (``__curation_merge__`` final write with
  unified+compacted text) still goes through the single-upsert path
  because it needs the ``compacted_description`` column write that
  the bulk path intentionally doesn't carry.

## Alias-redirect decorator

* ``LineageGraphStoreWithAliasRedirect`` — bulk path mirrors single
  upsert: each part's ``record.name`` resolves through the alias
  map before forwarding to the inner store. The merger always
  passes records pinned to the canonical ``final_target`` so the
  redirect is a no-op in that flow, but symmetry with the single
  upsert contract means a future caller writing to an aliased name
  still gets correct behaviour.

## 12-invariant cross-check (Wave 7 §K.12)

* **#1 L1 不污染**: bulk path operates on lineage SETs only — kg.jsonl
  raw extract layer untouched. ✅
* **#3 indexer write redirect through alias map**: bulk path goes
  through the same decorator alias resolution as single upsert. ✅
* **#9 alias redirect transparent**: decorator forwards bulk to
  inner with redirected names; merger callers see no behaviour
  change. ✅
* **#10 DB column cap**: Postgres ``compacted_description`` Text
  column unchanged (bulk path preserves existing value via
  COALESCE-equivalent — bulk passes ``None`` end-to-end through
  the Postgres SQL → INSERT branch sets NULL, ON CONFLICT branch
  preserves). ✅
* All other invariants: unaffected.

## 4-pattern pre-check matrix

* Pattern A (kg.jsonl shape): unchanged
* Pattern B (Lineage SET semantics): bulk preserves dedup-by-
  ``(document_id, parse_version)`` exactly — same key as single
  upsert. ✅
* Pattern C (Cypher LIST<MAP>): bulk reuses parallel-list encoding
  for the strip-then-append. ✅
* Pattern D (vector 3-field payload): unchanged

## Simple-stable 4-guardrail

* **#1 不无限扩范围**: 1 new Protocol method, no new public REST
  surface, no new schema column, no new alembic migration.
* **#2 尽快上线**: single PR, no spec amend needed (architect
  W8-2 candidate sediment + Wave 7 task #1 3-backend Protocol
  pattern reuse).
* **#3 简单稳定**: bulk semantic mirrors single upsert exactly;
  caller cutover is a 1-call replacement of the N×M loop.
* **#4 私有化部署免维护**: backend-portable (4 backends shipped
  cross-backend); no operator-facing config knob.

## Test plan

- [x] InMemory contract — 7 new tests pin empty / create / replace /
      mismatched-names / dedup-within-input / preserves-compacted /
      last-entity-type-wins.
- [x] Alias-redirect decorator — 3 new tests pin redirect-each-part /
      no-alias-passthrough / empty-short-circuit.
- [x] LineageEntityMerger step 6a cutover — existing
      ``test_source_parts_reanchored_preserving_doc_lineage``
      rewritten to assert the new single bulk call (was: 3
      sequential single upserts) + single sentinel final write.
- [x] All 1166 unit tests pass (up from 1141 baseline; 25 new tests).
- [x] Wave 7 grep-zero contract — 10/10 still pass (intent-driven
      gate unaffected).
- [x] ruff format / check clean on touched files.
- [ ] Cross-backend integration test — sediment to Wave 8 follow-up
      (out of scope this PR; the 4-backend Protocol contract is
      structurally identical to single upsert which already has
      cross-backend integration coverage).
- [ ] CR by @huangheng (focus: invariant #1 L1, #9 alias redirect
      transparent, 4-backend cross-roundtrip on contract level).
Copy link
Copy Markdown
Collaborator Author

@earayu earayu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 LGTM ✅ (huangheng pass-1) — verdict = ready to merge

Wave 8 W8-2 bulk API 干净落地 — 1 Protocol method + 4 backend impl (InMemory + Postgres ON CONFLICT JSONB merge / Neo4j Cypher MERGE + parallel-list strip-then-append / Nebula read-modify-write under EntityLock) + LineageEntityMerger step 6a 单调用 cutover + alias decorator bulk path 对称延伸 + 25 new tests + 1166 全 pass + grep-zero 10/10 不破。

12-invariant cross-check (W8-2 scope)

# Material 状态 验证依据
1 L1 graph data 不污染 bulk 操作 lineage SETs 仅;kg.jsonl raw extract 不动
9 upsert transparent alias redirect decorator bulk path mirror single-upsert: 每 part record.name 走 alias.resolve_canonical → inner store;merger 总是 pin canonical 所以 redirect noop, 但对称契约保留
10 DB column length cap bulk 不动 compacted_description 列(preserve existing,与 single upsert kwarg None semantic 一致);single sentinel write step 6b 仍走 single-upsert 因需写 compacted
其他 n/a unaffected

Architecture quality (4 backend symmetric)

Backend 实施 关键
InMemory ref asyncio.Lock 包 loop dedup (doc_id, parse_version) last-wins
Postgres INSERT ... ON CONFLICT (collection_id, name) DO UPDATE 单语句,strip via jsonb_array_elements NOT EXISTS strip_keys_json + append 整 new_members_json/new_parts_json 一 transaction atomic
Neo4j Cypher MERGE + parallel-list strip-then-append + IN $strip_keys predicate (key string `<doc_id> `)
Nebula EntityLock(target_name) + 单 read / Python merge / 单 write 与 single upsert 同 RMW pattern, fold strip-then-append 到一次

4 backend 全 follow same dedup-by-(document_id, parse_version) pattern, 与 single upsert lineage SET semantic 一致。✅

关键 invariant: bulk path NEVER touches compacted_description

per PR description: "Bulk path NEVER touches compacted_description (preserves existing — same COALESCE-style semantic as single upsert with None)"。这与 single upsert Option B kwarg=None preserve 一致 ✅。

caller side (merger step 6a/6b) 分工正确:

  • step 6a → bulk path (re-anchor source parts under target, 不写 compacted)
  • step 6b → single upsert with compacted_description=compacted (sentinel __curation_merge__ final write)

避免了 bulk 写 None 误清 compacted_description 的 silent regression。👍

Caller cutover N×M → 1

merger step 6a 原 N source × M part = N*M sequential SQL roundtrip → 1 bulk call。性能改进显著(典型 3-10 source × 5-20 parts = 15-200 hits → 1 hit)。这是 Wave 7 我 task #6 CR sediment "N×M upsert 性能" (msg=22816e0d) Wave 8 candidate 的物理交付。Wave 7 → Wave 8 sediment 闭环。👍

Alias decorator bulk symmetry

decorator 同 redirect bulk path mirror single upsert: each record.name resolves through alias_map before forwarding to inner。3 new tests (test_redirect-each-part / no-alias-passthrough / empty-short-circuit) 钉这个对称性。

merger 总是 pin canonical (从 step 1 final_target = resolve_canonical(target_name) 起),所以 redirect 实际是 no-op (canonical → canonical)。但 future caller 写 aliased name 时 bulk 仍正确 redirect — 契约对称完整。

测试覆盖 25 new

  • 7 InMemory contract tests(empty / create / replace / mismatched-names ValueError / dedup-within-input / preserves-compacted / last-entity-type-wins)
  • 3 alias-redirect bulk tests(each-part redirect / no-alias passthrough / empty short-circuit)
  • 1 merger step 6a cutover test rewrite(assert single bulk + single sentinel final write)
  • 14 pre-existing retained
  • = 1166 total pass + 10/10 grep-zero unaffected + ruff clean

1 个 minor sediment(不阻塞)

PR description 显式 sediment "Cross-backend integration test → Wave 8 follow-up; 4-backend Protocol contract structurally identical to single upsert which already has cross-backend integration coverage"。

合理 — bulk follows same pattern as single upsert, 单 upsert 已 cross-backend integration cover (task #1 #1754 11 cross-backend tests)。task #11 narrative e2e 也会 indirectly cover bulk path (因 merger 现在走 bulk)。如未来真有具体 backend bulk drift 风险,sediment 进 W8-X candidate "bulk Protocol cross-backend roundtrip integration tests"。

4-pattern + simple-stable

PR body Pattern A/B/C/D 全 paste,simple-stable 4 项全显式 ✅(1 Protocol method / 0 REST surface / 0 schema 改 / 0 alembic / no operator config)。

修完会 LGTM 的清单

实际上已经可以 merge ✅。

@bryce W8-2 干净落地 + 4 backend 对称设计 + bulk-vs-compacted 关键分工 + caller cutover 1 行替换 + 25 new tests — Wave 8 close-out implementer 模板高质量。👍

@符炫炜 LGTM,可 merge。Wave 8 5/5 close-out after #1771 merge

  • #15 W8-6 ✅
  • #12 W8-1 ✅
  • #14 W8-3 ✅
  • #16 W8-7 ✅
  • #13 W8-2 (本 PR) → done
  • W8-8 follow-up #1768
  • = 6 PR + W8-8 = 7 PR total in Wave 8

Wave 7 sediment 列表 W8-1~W8-9 中 W8-1/2/3/6/7 + W8-8 ship,W8-4 (alembic sequence) + W8-5 (MCP real agent e2e) + W8-9 (5 pre-existing tsc) + W8-10 (retrieval pipeline 消费 search_relations) 留 Wave 9 candidate。

@earayu earayu merged commit a7f0466 into main Apr 28, 2026
7 checks passed
@earayu earayu deleted the bryce/wave8-task13-bulk-upsert branch April 28, 2026 03:06
earayu added a commit that referenced this pull request Apr 28, 2026
…(W7-10 drift) (#1773)

## What broke

Wave 7 task #10 (PR #1765) added ``list_entities`` to the
``LineageGraphStore`` Protocol + 4 backends. The
``LineageGraphStoreWithAliasRedirect`` decorator was NOT updated, and
the spelled-out passthrough invariant test
(``test_decorator_passthrough_for_non_redirected_methods``) only
covers the methods that existed when it was written — so the gap
landed silently.

Production crash on ``GET /api/v2/collections/{id}/graphs``:

```
AttributeError: 'LineageGraphStoreWithAliasRedirect' object has no attribute 'list_entities'
  File "aperag/domains/knowledge_graph/service.py", line 149,
       in get_knowledge_graph
    entities = await store.list_entities(label=normalized_label, limit=query_max_nodes)
```

This is also the e2e-http-provider failure visible on every Wave 7+8
PR including the just-merged #1771 / #1772 — pre-existing on main
since W7-10 close-out, masked because the e2e-http-smoke shapes don't
exercise the graph view endpoint.

## Fix

1. Add ``list_entities`` passthrough method on the decorator. Filter
   is by ``label`` (entity type) + pagination, not by name — rows are
   already canonical (alias rows live in
   ``aperag_lineage_entity_alias``, not in the entity table). Same
   passthrough rationale as ``list_entity_labels`` /
   ``query_entities_by_keyword``; module docstring updated.

2. Add ``list_entities`` row to the spelled-out passthrough test.

3. **NEW meta-invariant test**:
   ``test_decorator_covers_every_lineage_graph_store_method`` —
   introspect ``LineageGraphStore`` Protocol via ``dir()`` and assert
   the decorator implements every public method. Catches the
   "decorator missing method" half of the drift gap that the
   spelled-out test couldn't catch (because spelled-out tests only
   cover methods that existed when written). A future Protocol
   addition without a decorator update now fails CI immediately with
   a clear "missing method(s): X" assertion.

## Hard-gate format checklist (Wave 7+8 mirror)

### 4-pattern pre-check matrix
- [x] **#1 不丢失好的算法和设计**: passthrough rationale preserved per
      module docstring (filter-by-label not filter-by-name).
- [x] **#2 尽快上线**: 1 fix + 2 new tests (1 spelled-out row + 1
      meta-test). No new abstraction.
- [x] **#3 简单稳定**: passthrough is byte-for-byte forward, mirror
      of ``list_entity_labels``. Meta-test uses stdlib introspection,
      no new dep.
- [x] **#4 私有化部署免维护**: zero new env / config / dep. Backend-
      portable (decorator wraps the 4-backend Protocol; the actual
      ``list_entities`` implementation already covered by 4-backend
      compat tests).

### Simple-stable 4-guardrail
- [x] No new abstraction.
- [x] No backward-compat shim.
- [x] No silent behavior change — fix is additive (method that didn't
      exist now exists; behavior matches 4-backend Protocol contract).
- [x] Meta-test prevents this exact class of drift recurring.

## Test plan

- [x] ``uv run pytest tests/unit_test/indexing/test_alias_redirect_store.py``
      — **16 passed** (was 15, +1 meta-test, +1 spelled-out
      ``list_entities`` row covered by existing test function).
- [x] ``uvx ruff check`` — clean. ``uvx ruff format --check`` — clean.
- [ ] Production verify: ``GET /collections/{id}/graphs`` returns 200
      after this lands (the original crash from main e2e-http-provider
      log) — will be visible on next PR's e2e-http-provider checks.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant