Commit a7f0466

feat(graph): bulk_upsert_entity_with_lineage_parts Protocol + 4 backends + curation cutover (W8-2) (#1771)
Wave 8 task #13 (W8-2): adds a bulk-upsert primitive on the ``LineageGraphStore`` Protocol so that callers consolidating N×M ``(description_part, lineage_member)`` tuples onto the same target entity get one transaction / one round-trip instead of N×M sequential single upserts. Cuts ``LineageEntityMerger.merge_entities`` step 6a over to the bulk path.

## Why

``GraphCurationService.merge_entities`` (the N-source-into-1-target user merge flow) re-anchors every source's description parts under the target name. With N source entities each carrying M description parts, that loop emitted N×M sequential ``upsert_entity_with_lineage`` round-trips, one full transaction per part. For typical curation runs (3-10 sources × 5-20 parts each), that's 15-200 sequential SQL hits where one bulk write would do.

Per architect spec (Wave 8 candidate W8-2 sediment) + huangheng's task #6 PR #1758 perf observation, this folds into a single bulk write per backend.

## Scope (5 numbered items)

1. **Protocol method**: ``bulk_upsert_entity_with_lineage_parts(*, parts: Sequence[tuple[EntityRecord, LineageMember]])`` on ``aperag/indexing/graph.py:LineageGraphStore``. All ``record.name`` values MUST share the same string (a mismatch raises ``ValueError``). Empty ``parts`` is a no-op. The per-part dedup key is ``(document_id, parse_version)``, last-wins. The bulk path NEVER touches ``compacted_description`` (it preserves the existing value, the same ``COALESCE``-style semantic as a single upsert with ``None``).
2. **InMemory ref impl**: single ``asyncio.Lock``-guarded loop in ``InMemoryLineageGraphStore``.
3. **Postgres impl**: single ``INSERT … ON CONFLICT (collection_id, name) DO UPDATE`` that strips matching keys via ``jsonb_array_elements`` ``NOT EXISTS`` against an incoming ``strip_keys_json`` array, then appends the whole new ``new_members_json`` / ``new_parts_json`` arrays. One statement, atomic.
4. **Neo4j impl**: single Cypher MERGE + parallel-list strip-then-append, with the strip predicate matching against the **set** of incoming keys (``IN $strip_keys`` on the ``"<doc_id>|<pv>"`` key string). The row-lock on MERGE serialises concurrent bulk ops on the same entity.
5. **Nebula impl**: single ``EntityLock(target_name)`` acquire + single read / Python merge / write. Mirrors the existing read-modify-write pattern of the single upsert but folds the strip-then-append over the **set** of incoming keys.

## Caller cutover

* ``LineageEntityMerger.merge_entities`` (step 6a): replaces the N×M ``for src in source_entities: for part in src.description_parts: await self._store.upsert_entity_with_lineage(...)`` with a single ``bulk_upsert_entity_with_lineage_parts`` call. Step 6b's sentinel write (the ``__curation_merge__`` final write with unified + compacted text) still goes through the single-upsert path because it needs the ``compacted_description`` column write that the bulk path intentionally doesn't carry.

## Alias-redirect decorator

* ``LineageGraphStoreWithAliasRedirect``: the bulk path mirrors the single upsert. Each part's ``record.name`` resolves through the alias map before forwarding to the inner store. The merger always passes records pinned to the canonical ``final_target``, so the redirect is a no-op in that flow, but symmetry with the single-upsert contract means a future caller writing to an aliased name still gets correct behaviour.

## 12-invariant cross-check (Wave 7 §K.12)

* **#1 L1 not polluted**: the bulk path operates on lineage SETs only; the kg.jsonl raw extract layer is untouched. ✅
* **#3 indexer write redirect through alias map**: the bulk path goes through the same decorator alias resolution as the single upsert. ✅
* **#9 alias redirect transparent**: the decorator forwards bulk writes to the inner store with redirected names; merger callers see no behaviour change. ✅
* **#10 DB column cap**: the Postgres ``compacted_description`` Text column is unchanged. The bulk path preserves the existing value via a COALESCE-equivalent: it passes ``None`` end-to-end through the Postgres SQL, so the INSERT branch sets NULL and the ON CONFLICT branch preserves the stored value. ✅
* All other invariants: unaffected.

## 4-pattern pre-check matrix

* Pattern A (kg.jsonl shape): unchanged
* Pattern B (Lineage SET semantics): bulk preserves dedup-by-``(document_id, parse_version)`` exactly, the same key as the single upsert. ✅
* Pattern C (Cypher LIST<MAP>): bulk reuses the parallel-list encoding for the strip-then-append. ✅
* Pattern D (vector 3-field payload): unchanged

## Simple-stable 4-guardrail

* **#1 No unbounded scope growth**: 1 new Protocol method, no new public REST surface, no new schema column, no new alembic migration.
* **#2 Ship quickly**: single PR, no spec amendment needed (architect W8-2 candidate sediment + reuse of the Wave 7 task #1 3-backend Protocol pattern).
* **#3 Simple and stable**: bulk semantics mirror the single upsert exactly; the caller cutover is a 1-call replacement of the N×M loop.
* **#4 Maintenance-free private deployment**: backend-portable (all 4 backends shipped together); no operator-facing config knob.

## Test plan

- [x] InMemory contract: 7 new tests pin empty / create / replace / mismatched-names / dedup-within-input / preserves-compacted / last-entity-type-wins.
- [x] Alias-redirect decorator: 3 new tests pin redirect-each-part / no-alias-passthrough / empty-short-circuit.
- [x] LineageEntityMerger step 6a cutover: existing ``test_source_parts_reanchored_preserving_doc_lineage`` rewritten to assert the new single bulk call (was: 3 sequential single upserts) + single sentinel final write.
- [x] All 1166 unit tests pass (up from 1141 baseline; 25 new tests).
- [x] Wave 7 grep-zero contract: 10/10 still pass (intent-driven gate unaffected).
- [x] ruff format / check clean on touched files.
- [ ] Cross-backend integration test: deferred to a Wave 8 follow-up (out of scope for this PR; the 4-backend Protocol contract is structurally identical to the single upsert, which already has cross-backend integration coverage).
- [ ] CR by @huangheng (focus: invariant #1 L1, #9 alias redirect transparent, 4-backend cross-roundtrip on contract level).
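The contract rules in the scope list (same-name check, empty no-op, last-wins dedup, untouched ``compacted_description``) can be illustrated with a minimal sketch. ``Part``, ``Row``, and ``bulk_upsert`` are hypothetical stand-ins invented for this example; they are not the real ``EntityRecord`` / ``LineageMember`` types or any backend's implementation.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Part:
    """Hypothetical flattened stand-in for one (EntityRecord, LineageMember) tuple."""

    name: str
    entity_type: str
    document_id: str
    parse_version: str
    text: str


@dataclass
class Row:
    """Hypothetical stored entity row."""

    entity_type: str
    parts: dict = field(default_factory=dict)  # (document_id, parse_version) -> text
    compacted_description: Optional[str] = None


def bulk_upsert(rows: dict, parts: list) -> None:
    """Apply every part to one target row, following the contract rules."""
    if not parts:
        return  # empty input is a no-op
    target = parts[0].name
    if any(p.name != target for p in parts):
        raise ValueError("all records must share the same name")
    row = rows.setdefault(target, Row(entity_type=parts[0].entity_type))
    for p in parts:
        row.entity_type = p.entity_type  # most recently observed type wins
        row.parts[(p.document_id, p.parse_version)] = p.text  # last-wins per key
    # compacted_description is deliberately left untouched


rows = {"Acme": Row(entity_type="org", compacted_description="summary")}
bulk_upsert(
    rows,
    [
        Part("Acme", "org", "doc-1", "v1", "first draft"),
        Part("Acme", "company", "doc-1", "v1", "second draft"),
        Part("Acme", "company", "doc-2", "v1", "other doc"),
    ],
)
print(rows["Acme"].parts)
```

After the call, the row holds one part per ``(document_id, parse_version)`` key, the later ``doc-1`` text has replaced the earlier one, and the pre-existing compacted summary is unchanged.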
1 parent 66c6861 commit a7f0466

9 files changed

Lines changed: 592 additions & 35 deletions

aperag/graph_curation/lineage_merge.py

Lines changed: 23 additions & 16 deletions
@@ -260,28 +260,35 @@ async def merge_entities(

         # Step 6a: re-anchor source parts under the target name,
         # preserving their original lineage so per-doc tracking is not
-        # lost (invariant #1). DescriptionPart carries no chunk_ids of
-        # its own; we look up the matching LineageMember by
-        # ``(document_id, parse_version)`` to recover them.
+        # lost (invariant #1). Wave 8 W8-2: the N×M loop is folded
+        # into a single ``bulk_upsert_entity_with_lineage_parts`` call
+        # so the whole consolidation is one transaction / one
+        # round-trip per backend, not N×M sequential upserts.
+        bulk_parts: list[tuple[EntityRecord, LineageMember]] = []
         for src in source_entities:
             lineage_by_key = {m.key(): m for m in src.source_lineage}
+            tenant = self._tenant_scope_key_for(src)
             for part in src.description_parts:
                 member = lineage_by_key.get(part.key())
                 chunk_ids = tuple(member.chunk_ids) if member is not None else ()
-                await self._store.upsert_entity_with_lineage(
-                    record=EntityRecord(
-                        name=final_target,
-                        entity_type=target_entity.entity_type,
-                        description=part.text,
-                        source_chunk_ids=chunk_ids,
-                    ),
-                    lineage=LineageMember(
-                        document_id=part.document_id,
-                        parse_version=part.parse_version,
-                        tenant_scope_key=self._tenant_scope_key_for(src),
-                        chunk_ids=chunk_ids,
-                    ),
+                bulk_parts.append(
+                    (
+                        EntityRecord(
+                            name=final_target,
+                            entity_type=target_entity.entity_type,
+                            description=part.text,
+                            source_chunk_ids=chunk_ids,
+                        ),
+                        LineageMember(
+                            document_id=part.document_id,
+                            parse_version=part.parse_version,
+                            tenant_scope_key=tenant,
+                            chunk_ids=chunk_ids,
+                        ),
+                    )
                 )
+        if bulk_parts:
+            await self._store.bulk_upsert_entity_with_lineage_parts(parts=bulk_parts)

         # Step 6b: final write with unified text + compacted_description
         # under the curation-merge sentinel lineage.
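
To make the round-trip saving in this hunk concrete, here is a rough sketch that counts store calls for the old N×M loop and for the collect-then-bulk shape. ``CountingStore``, ``collect_parts``, ``merge_sequential``, and ``merge_bulk`` are hypothetical names made up for the example; only the overall shape follows the diff.

```python
import asyncio


class CountingStore:
    """Hypothetical store that only counts write round-trips."""

    def __init__(self) -> None:
        self.round_trips = 0

    async def upsert_one(self, part: tuple) -> None:
        self.round_trips += 1  # one transaction per part

    async def bulk_upsert(self, parts: list) -> None:
        self.round_trips += 1  # one transaction for the whole list


def collect_parts(sources: dict) -> list:
    """Flatten {source_name: [part, ...]} into a list of (source, part) tuples."""
    return [(src, part) for src, src_parts in sources.items() for part in src_parts]


async def merge_sequential(store: CountingStore, sources: dict) -> None:
    # Old shape: await one upsert inside the nested loop.
    for part in collect_parts(sources):
        await store.upsert_one(part)


async def merge_bulk(store: CountingStore, sources: dict) -> None:
    # New shape: build the list first, then write once if it is non-empty.
    parts = collect_parts(sources)
    if parts:
        await store.bulk_upsert(parts)


# 3 sources with 5 parts each, the low end of the range quoted in the message.
sources = {f"src{i}": [f"part{j}" for j in range(5)] for i in range(3)}
sequential, bulk = CountingStore(), CountingStore()
asyncio.run(merge_sequential(sequential, sources))
asyncio.run(merge_bulk(bulk, sources))
print(sequential.round_trips, bulk.round_trips)  # 15 1
```

The ``if parts:`` guard mirrors the ``if bulk_parts:`` check in the hunk, so a merge with no description parts issues no bulk write at all.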

aperag/indexing/alias_redirect_store.py

Lines changed: 28 additions & 0 deletions
@@ -163,6 +163,34 @@ async def upsert_relation_with_lineage(
             compacted_description=compacted_description,
         )

+    async def bulk_upsert_entity_with_lineage_parts(self, *, parts) -> None:
+        # Wave 8 W8-2: alias-redirect on the bulk write surface,
+        # mirror of the single-upsert path. Resolve each ``record.name``
+        # through the alias map; if any tuple's name redirects, the
+        # whole bulk write retargets to the canonical name. The
+        # ``LineageEntityMerger`` is the only known caller and always
+        # passes records already pinned to the canonical
+        # ``final_target`` so the redirect is a no-op in that flow,
+        # but we apply it here for symmetry with the single upsert
+        # contract (a future caller writing to an aliased name still
+        # gets the right behaviour).
+        if not parts:
+            return
+        redirected: list[tuple[EntityRecord, LineageMember]] = []
+        for record, lineage in parts:
+            canonical = await self._alias_repo.resolve_canonical(collection_id=self._collection_id, name=record.name)
+            if canonical != record.name:
+                logger.debug(
+                    "alias_redirect: bulk entity part %r → %r (collection=%s)",
+                    record.name,
+                    canonical,
+                    self._collection_id,
+                )
+                redirected.append((replace(record, name=canonical), lineage))
+            else:
+                redirected.append((record, lineage))
+        await self._inner.bulk_upsert_entity_with_lineage_parts(parts=redirected)
+
     # ------------------------------------------------------------------
     # Passthrough: forward every non-redirected Protocol method
     # unchanged. Pinned by
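
The per-part redirect in this hunk can be sketched in isolation. ``Record``, ``resolve_canonical``, and ``redirect_parts`` below are hypothetical stand-ins (the real alias repository API is only partly visible in the diff); the sketch assumes the record type is a dataclass, which is what the ``replace(record, name=canonical)`` call in the hunk suggests.

```python
import asyncio
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Record:
    """Hypothetical stand-in for EntityRecord."""

    name: str
    description: str


async def resolve_canonical(aliases: dict, name: str) -> str:
    """Hypothetical alias lookup: names without an alias are already canonical."""
    return aliases.get(name, name)


async def redirect_parts(aliases: dict, parts: list) -> list:
    """Return parts with each record renamed to its canonical name."""
    redirected = []
    for record, lineage in parts:
        canonical = await resolve_canonical(aliases, record.name)
        if canonical != record.name:
            record = replace(record, name=canonical)  # copy; the original is untouched
        redirected.append((record, lineage))
    return redirected


aliases = {"ACME Inc.": "Acme"}
parts = [
    (Record("ACME Inc.", "from doc-1"), "lineage-1"),
    (Record("Acme", "from doc-2"), "lineage-2"),
]
redirected = asyncio.run(redirect_parts(aliases, parts))
print([record.name for record, _ in redirected])  # ['Acme', 'Acme']
```

In this sketch a batch that mixes an alias with its canonical name ends up with one shared name, which is the shape the inner store's same-name check expects.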

aperag/indexing/graph.py

Lines changed: 59 additions & 0 deletions
@@ -572,6 +572,39 @@ async def upsert_relation_with_lineage(
         a non-None string overwrites.
         """

+    async def bulk_upsert_entity_with_lineage_parts(
+        self,
+        *,
+        parts: Sequence[tuple[EntityRecord, LineageMember]],
+    ) -> None:
+        """Wave 8 W8-2: atomic bulk variant of
+        :meth:`upsert_entity_with_lineage`.
+
+        Each ``(record, lineage)`` tuple lands as a separate description
+        part with the same ``(document_id, parse_version)`` dedup key
+        the single upsert uses; semantically equivalent to looping
+        :meth:`upsert_entity_with_lineage` but executed in **one
+        transaction / one round-trip** so callers consolidating N×M
+        parts (e.g. :class:`LineageEntityMerger.merge_entities` step 6a)
+        get O(1) network round-trips instead of O(N×M).
+
+        Contract:
+
+        * All ``record.name`` values MUST share the same string;
+          backends MAY assert and raise ``ValueError`` if they don't.
+        * Empty ``parts`` is a no-op.
+        * Per-part ``record.entity_type`` follows the single-upsert
+          "most recently observed value wins" rule (last tuple's type
+          is the post-write entity_type for the row).
+        * ``compacted_description`` is intentionally **not** a
+          parameter: the bulk path never touches the column. Callers
+          that need to set it run a separate single
+          :meth:`upsert_entity_with_lineage` afterwards (the merger's
+          step 6b sentinel write does exactly this).
+        * Forward-only retry safety: the dedup key is per-part, so a
+          mid-flight crash + retry replays each tuple idempotently.
+        """
+
     async def get_entity(self, entity_name: str) -> EntityWithLineage | None:
         """Read-path helper used by tests / read primitives. Returns
         the canonical lineage view, or ``None`` if the row was GC'd.
@@ -893,6 +926,32 @@ async def upsert_relation_with_lineage(
             if compacted_description is not None:
                 row.compacted_description = compacted_description

+    async def bulk_upsert_entity_with_lineage_parts(
+        self,
+        *,
+        parts: Sequence[tuple[EntityRecord, LineageMember]],
+    ) -> None:
+        if not parts:
+            return
+        target_name = parts[0][0].name
+        if any(record.name != target_name for record, _ in parts):
+            raise ValueError("bulk_upsert_entity_with_lineage_parts: all records must share the same name")
+        async with self._guard:
+            row = self._entities.get(target_name)
+            if row is None:
+                row = _InMemoryEntityRow(name=target_name, entity_type=parts[0][0].entity_type)
+                self._entities[target_name] = row
+            for record, lineage in parts:
+                # Type may evolve as new docs refine the entity; keep
+                # the most recently observed value (mirror single upsert).
+                row.entity_type = record.entity_type
+                row.source_lineage[lineage.key()] = lineage
+                row.description_parts[lineage.key()] = DescriptionPart(
+                    document_id=lineage.document_id,
+                    parse_version=lineage.parse_version,
+                    text=record.description,
+                )
+
     # ---- read path --------------------------------------------------

     async def get_entity(self, entity_name: str) -> EntityWithLineage | None:
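
The "forward-only retry safety" point in the docstring can be checked with a small experiment: because every write is keyed by ``(document_id, parse_version)``, replaying the same batch should leave the state unchanged. ``TinyStore`` is a hypothetical toy written for this example, not the ``InMemoryLineageGraphStore`` from the diff.

```python
import asyncio


class TinyStore:
    """Hypothetical in-memory store; one asyncio.Lock guards every bulk write."""

    def __init__(self) -> None:
        self._guard = asyncio.Lock()
        self.parts = {}  # (document_id, parse_version) -> text

    async def bulk_upsert(self, parts: list) -> None:
        if not parts:
            return
        async with self._guard:
            for document_id, parse_version, text in parts:
                # Keyed write: replaying a tuple overwrites it with itself.
                self.parts[(document_id, parse_version)] = text


async def demo() -> tuple:
    store = TinyStore()  # created inside the running event loop
    batch = [("doc-1", "v1", "alpha"), ("doc-2", "v1", "beta")]
    await store.bulk_upsert(batch)
    first = dict(store.parts)
    await store.bulk_upsert(batch)  # simulated retry after a mid-flight crash
    return first, dict(store.parts)


first, replayed = asyncio.run(demo())
print(first == replayed)  # True
```

The sketch only covers replaying an identical batch; a retry carrying different text for the same key would overwrite it, which is the last-wins rule rather than a retry guarantee.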

aperag/indexing/graph_storage/nebula.py

Lines changed: 66 additions & 0 deletions
@@ -845,6 +845,72 @@ def _upsert() -> None:

             await asyncio.to_thread(_upsert)

+    async def bulk_upsert_entity_with_lineage_parts(
+        self,
+        *,
+        parts,
+    ) -> None:
+        """Wave 8 W8-2: Nebula bulk variant. A single ``EntityLock``
+        acquire + single read-modify-write applies the whole ``parts``
+        list. Reuses the read/Python-merge/write pattern of
+        :meth:`upsert_entity_with_lineage` but folds the
+        strip-then-append over the **set** of incoming keys, so N×M
+        parts collapse to one write.
+        """
+        if not parts:
+            return
+        target_name = parts[0][0].name
+        if any(record.name != target_name for record, _ in parts):
+            raise ValueError("bulk_upsert_entity_with_lineage_parts: all records must share the same name")
+
+        # Dedup last-wins by (document_id, parse_version).
+        deduped: dict[tuple[str, str], tuple[EntityRecord, LineageMember]] = {}
+        for record, lineage in parts:
+            deduped[(lineage.document_id, lineage.parse_version)] = (record, lineage)
+
+        new_members_in: list[LineageMember] = []
+        new_parts_in: list[DescriptionPart] = []
+        last_entity_type: str = parts[0][0].entity_type
+        for record, lineage in deduped.values():
+            new_members_in.append(lineage)
+            new_parts_in.append(
+                DescriptionPart(
+                    document_id=lineage.document_id,
+                    parse_version=lineage.parse_version,
+                    text=record.description,
+                )
+            )
+            last_entity_type = record.entity_type
+
+        keys_to_strip = set(deduped.keys())
+
+        await self.ensure_schema()
+        async with self._entity_lock.acquire(target_name):
+
+            def _upsert() -> None:
+                row = self._read_entity_lineage(target_name)
+                if row is None:
+                    merged_members = list(new_members_in)
+                    merged_parts = list(new_parts_in)
+                    existing_compacted: str | None = None
+                else:
+                    _existing_type, members, parts_existing, existing_compacted = row
+                    kept_members = [m for m in members if m.key() not in keys_to_strip]
+                    kept_parts = [p for p in parts_existing if p.key() not in keys_to_strip]
+                    merged_members = kept_members + new_members_in
+                    merged_parts = kept_parts + new_parts_in
+                self._write_entity_vertex(
+                    name=target_name,
+                    type_value=last_entity_type,
+                    source_lineage=merged_members,
+                    description_parts=merged_parts,
+                    # Bulk path never touches compacted_description
+                    # (preserves existing, mirror Postgres / Neo4j).
+                    compacted_description=existing_compacted,
+                )
+
+            await asyncio.to_thread(_upsert)
+
     async def upsert_relation_with_lineage(
         self,
         *,
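
The strip-then-append merge used by the Nebula path can be sketched as a pure function over plain tuples. ``strip_then_append`` and the triple layout are assumptions made for this example; the real code works on ``LineageMember`` / ``DescriptionPart`` objects and their ``key()`` methods.

```python
def strip_then_append(existing: list, incoming: list) -> list:
    """Merge (document_id, parse_version, text) triples.

    Incoming triples are deduplicated last-wins, every existing triple whose
    key appears in the incoming set is dropped, and the survivors are
    followed by the deduplicated incoming triples.
    """
    deduped = {}
    for document_id, parse_version, text in incoming:
        deduped[(document_id, parse_version)] = (document_id, parse_version, text)
    keys_to_strip = set(deduped)
    kept = [item for item in existing if (item[0], item[1]) not in keys_to_strip]
    return kept + list(deduped.values())


existing = [("doc-1", "v1", "old"), ("doc-2", "v1", "keep")]
incoming = [("doc-1", "v1", "new-a"), ("doc-3", "v1", "add"), ("doc-1", "v1", "new-b")]
merged = strip_then_append(existing, incoming)
print(merged)
# [('doc-2', 'v1', 'keep'), ('doc-1', 'v1', 'new-b'), ('doc-3', 'v1', 'add')]
```

A Python dict keeps a re-assigned key at its original position, so the last value yielded by the deduplicated dict here is the ``doc-3`` entry, not the final ``doc-1`` tuple of the input. If a backend takes the post-write ``entity_type`` from that iteration order, it may diverge from the "last tuple wins" rule whenever a duplicate key arrives last, which could be worth pinning in a contract test.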

aperag/indexing/graph_storage/neo4j.py

Lines changed: 92 additions & 0 deletions
@@ -440,6 +440,98 @@ async def upsert_entity_with_lineage(
             compacted_description=compacted_description,
         )

+    async def bulk_upsert_entity_with_lineage_parts(
+        self,
+        *,
+        parts,
+    ) -> None:
+        """Wave 8 W8-2: bulk variant. A single Cypher statement covers
+        the whole ``parts`` list. Strip-then-append is expressed against
+        the **set** of incoming ``(document_id, parse_version)`` keys
+        rather than a single key, so the MERGE row-lock still serialises
+        concurrent bulk ops against the same entity.
+        """
+        if not parts:
+            return
+        target_name = parts[0][0].name
+        if any(record.name != target_name for record, _ in parts):
+            raise ValueError("bulk_upsert_entity_with_lineage_parts: all records must share the same name")
+
+        # Dedup last-wins by (document_id, parse_version).
+        deduped: dict[tuple[str, str], tuple[EntityRecord, LineageMember]] = {}
+        for record, lineage in parts:
+            deduped[(lineage.document_id, lineage.parse_version)] = (record, lineage)
+
+        new_member_jsons: list[str] = []
+        new_part_jsons: list[str] = []
+        new_doc_ids: list[str] = []
+        new_parse_versions: list[str] = []
+        last_entity_type: str = parts[0][0].entity_type
+        for record, lineage in deduped.values():
+            new_member_jsons.append(_lineage_member_json(lineage))
+            new_part_jsons.append(
+                _description_part_json(
+                    DescriptionPart(
+                        document_id=lineage.document_id,
+                        parse_version=lineage.parse_version,
+                        text=record.description,
+                    )
+                )
+            )
+            new_doc_ids.append(lineage.document_id)
+            new_parse_versions.append(lineage.parse_version)
+            last_entity_type = record.entity_type
+
+        # ``strip_keys`` is a list of strings ``"<doc_id>|<parse_version>"``;
+        # Cypher list-of-string membership is the easiest way to express
+        # "drop element if its key matches *any* incoming key" without
+        # nested list-comprehensions over a list of maps (which is harder
+        # to read and the same complexity).
+        strip_keys = [f"{doc_id}|{parse_version}" for (doc_id, parse_version) in deduped.keys()]
+
+        query = (
+            f"MERGE (n:{_ENTITY_LABEL} {{collection_id: $collection_id, name: $name}}) "
+            f"ON CREATE SET "
+            f" n.source_lineage = [], "
+            f" n.source_lineage_doc_ids = [], "
+            f" n.source_lineage_parse_versions = [], "
+            f" n.description_parts = [], "
+            f" n.description_parts_doc_ids = [], "
+            f" n.description_parts_parse_versions = [], "
+            f" n.compacted_description = NULL, "
+            f" n.gmt_created = datetime() "
+            f"WITH n, "
+            f" [i IN range(0, size(n.source_lineage_doc_ids) - 1) "
+            f" WHERE NOT (n.source_lineage_doc_ids[i] + '|' + n.source_lineage_parse_versions[i]) "
+            f" IN $strip_keys] AS sl_keep, "
+            f" [i IN range(0, size(n.description_parts_doc_ids) - 1) "
+            f" WHERE NOT (n.description_parts_doc_ids[i] + '|' + n.description_parts_parse_versions[i]) "
+            f" IN $strip_keys] AS dp_keep "
+            f"SET n.entity_type = $entity_type, "
+            f" n.source_lineage = [i IN sl_keep | n.source_lineage[i]] + $new_member_jsons, "
+            f" n.source_lineage_doc_ids = [i IN sl_keep | n.source_lineage_doc_ids[i]] + $new_doc_ids, "
+            f" n.source_lineage_parse_versions = "
+            f" [i IN sl_keep | n.source_lineage_parse_versions[i]] + $new_parse_versions, "
+            f" n.description_parts = [i IN dp_keep | n.description_parts[i]] + $new_part_jsons, "
+            f" n.description_parts_doc_ids = "
+            f" [i IN dp_keep | n.description_parts_doc_ids[i]] + $new_doc_ids, "
+            f" n.description_parts_parse_versions = "
+            f" [i IN dp_keep | n.description_parts_parse_versions[i]] + $new_parse_versions, "
+            f" n.gmt_updated = datetime()"
+        )
+        async with self._session() as session:
+            await session.run(
+                query,
+                collection_id=self._collection_id,
+                name=target_name,
+                entity_type=last_entity_type,
+                strip_keys=strip_keys,
+                new_member_jsons=new_member_jsons,
+                new_part_jsons=new_part_jsons,
+                new_doc_ids=new_doc_ids,
+                new_parse_versions=new_parse_versions,
+            )
+
     async def upsert_relation_with_lineage(
         self,
         *,
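
The index-based filter over parallel lists in the Cypher statement can be emulated in plain Python to see what it keeps. ``strip_parallel`` and the sample data are hypothetical; the sketch mirrors only the ``"<doc_id>|<parse_version>"`` key construction and the keep-index idea, not the Neo4j driver call.

```python
def strip_parallel(payloads: list, doc_ids: list, parse_versions: list, strip_keys: list) -> tuple:
    """Drop every index whose "<doc_id>|<parse_version>" key is in strip_keys.

    The three lists are parallel: index i in each describes the same element.
    """
    keep = [
        i
        for i in range(len(doc_ids))
        if f"{doc_ids[i]}|{parse_versions[i]}" not in strip_keys
    ]
    return (
        [payloads[i] for i in keep],
        [doc_ids[i] for i in keep],
        [parse_versions[i] for i in keep],
    )


# One incoming part, keyed by (document_id, parse_version).
incoming = {("doc-1", "v2"): '{"text": "new"}'}
strip_keys = [f"{doc_id}|{pv}" for doc_id, pv in incoming]

payloads, doc_ids, versions = strip_parallel(
    ['{"text": "old"}', '{"text": "keep"}'],
    ["doc-1", "doc-2"],
    ["v2", "v1"],
    strip_keys,
)
# Append the incoming element to all three lists so they stay aligned.
payloads += list(incoming.values())
doc_ids += [doc_id for doc_id, _ in incoming]
versions += [pv for _, pv in incoming]
print(payloads, doc_ids, versions)
```

One caveat of a joined string key: if a document id or parse version could itself contain ``|``, two different pairs could produce the same key. Whether that can happen depends on how those identifiers are generated, which this diff does not show.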
