From 1d03a3d27f70bba4f7994640044621b546cffa6f Mon Sep 17 00:00:00 2001 From: earayu Date: Thu, 30 Apr 2026 12:51:43 +0800 Subject: [PATCH 1/3] =?UTF-8?q?test(compat):=20task=20#61=20P1=20=E2=80=94?= =?UTF-8?q?=20bulk=5Fupsert=5Fentity=5Fwith=5Flineage=5Fparts=20cross-back?= =?UTF-8?q?end?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PM @不穷 elevated this Protocol method as a P0 audit gap (msg=10b753e8). Until now ``bulk_upsert_entity_with_lineage_parts`` (Wave 8 W8-2) had no cross-backend test in `tests/integration/compat/`, even though all three production backends (Postgres / Neo4j / Nebula) implement it and the indexing worker uses it for the LineageEntityMerger merge step. Bulk write paths are exactly where backend differences emerge — batch size limits, transaction atomicity, error handling, dedup contract — and the lack of a parametrized matrix here meant any silent drift in the bulk semantics would survive merge. This adds 7 new parametrized cases that pin the Protocol contract declared in `aperag/indexing/graph.py:575+`: * empty parts is a no-op (no implicit row creation) * mixed-name parts raise ValueError (atomicity guarantee) * round-trip: 3 distinct (document_id, parse_version) parts visible after * dedup last-wins within a single bulk call * bulk replaces existing rows on matching key (same as single upsert) * bulk with distinct keys appends, never wipes pre-existing lineage * per-part entity_type follows last-wins rule Coverage delta: 30 → 37 cross-backend cases (collect-only verified). Sister to chenyexuan PR #1926 — without that workflow path fix, this test never triggered on PRs that touch `aperag/indexing/graph_storage/*`. Both PRs together restore real CI gating on cross-backend regressions for the LineageGraphStore Protocol surface. Part of task #61 DB compat audit (earayu2 directive msg=f26b703e), testing-lane slice (task #67, claimed via msg=e02c3028). 
Co-Authored-By: Claude Opus 4.7 --- .../compat/test_lineage_graph_compat.py | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/tests/integration/compat/test_lineage_graph_compat.py b/tests/integration/compat/test_lineage_graph_compat.py index bce89743d..d2927705e 100644 --- a/tests/integration/compat/test_lineage_graph_compat.py +++ b/tests/integration/compat/test_lineage_graph_compat.py @@ -838,3 +838,150 @@ async def test_delete_relation_returns_false_when_absent(store, collection_id): _, s = store deleted = await s.delete_relation("Alice", "Bob", "knows") assert deleted is False + + +# --- task #61 P0 — bulk_upsert_entity_with_lineage_parts cross-backend ----- +# +# This Protocol method (Wave 8 W8-2) was previously not exercised by any +# cross-backend test (冬柏 task #67 audit msg=3e93bb64). Bulk write paths +# are exactly where backend differences emerge: batch limits / atomicity / +# error handling / dedup contract. PM @不穷 elevated this to P0 in +# msg=10b753e8. The Protocol contract (`aperag/indexing/graph.py:575+`): +# * All record.name MUST share the same string — raise ValueError otherwise. +# * Empty parts is a no-op. +# * Per-part record.entity_type follows last-wins. +# * compacted_description is intentionally NOT a parameter. +# * Same dedup-by-(document_id, parse_version) key as single upsert. +# * Forward-only retry safety: per-part dedup so replays are idempotent. + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_empty_is_noop(store, collection_id): + """Per Protocol contract `empty parts is a no-op`. The store MUST + not raise and MUST not create any row.""" + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts(parts=[]) + # Round-trip: get_entity should return None for a name that was never + # written — the empty bulk must not implicitly create any entity. 
+ assert await s.get_entity("Alice") is None + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_rejects_mixed_names(store, collection_id): + """Per Protocol contract `all record.name values MUST share the same + string — backends MAY assert and raise ValueError if they don't`. + All 3 production backends MUST raise to keep the bulk path's atomic + guarantee honest (a mixed-name bulk would silently fan out to N + different entity rows — atomicity meaningless).""" + + _, s = store + with pytest.raises(ValueError): + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice"), _LM_A_V1), + (_entity("Bob"), _LM_B_V1), + ], + ) + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_round_trip(store, collection_id): + """Bulk write 3 distinct (document_id, parse_version) parts for the + same entity name; all 3 lineage members + description parts must be + visible after a single round-trip.""" + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice", description="from-doc-A-v1"), _LM_A_V1), + (_entity("Alice", description="from-doc-A-v2"), _LM_A_V2), + (_entity("Alice", description="from-doc-B-v1"), _LM_B_V1), + ], + ) + got = await s.get_entity("Alice") + assert got is not None + keys = {(lm.document_id, lm.parse_version) for lm in got.source_lineage} + assert keys == {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}, ( + f"all 3 (document_id, parse_version) members must be visible after bulk; got {keys}" + ) + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_dedup_last_wins_within_bulk(store, collection_id): + """Per Protocol contract `parts sharing the same key collapse + last-wins`. 
Two parts in the same bulk with the same + (document_id, parse_version) MUST collapse to one row, with the + second part's description winning.""" + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice", description="first-write"), _LM_A_V1), + (_entity("Alice", description="last-write"), _LM_A_V1), + ], + ) + got = await s.get_entity("Alice") + assert got is not None + matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] + assert len(matching) == 1, f"same-key parts must collapse to one member; got {len(matching)}" + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_replaces_existing_same_key(store, collection_id): + """An existing single upsert with key (doc-A, v1) must be replaced + when a subsequent bulk write contains the same key — same dedup + contract as single upsert. Per Protocol: bulk strips existing rows + whose key matches any incoming part, then appends the new parts.""" + + _, s = store + await s.upsert_entity_with_lineage(record=_entity("Alice", description="single"), lineage=_LM_A_V1) + await s.bulk_upsert_entity_with_lineage_parts( + parts=[(_entity("Alice", description="bulk"), _LM_A_V1)], + ) + got = await s.get_entity("Alice") + assert got is not None + matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] + assert len(matching) == 1, "single + bulk on same key must collapse to one member" + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_appends_distinct_keys(store, collection_id): + """An existing single upsert with key (doc-A, v1) must coexist with + a subsequent bulk write containing distinct keys — bulk must NOT + wipe unrelated lineage members. 
This is the cross-backend dedup + invariant the Protocol pins.""" + + _, s = store + await s.upsert_entity_with_lineage(record=_entity("Alice"), lineage=_LM_A_V1) + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice"), _LM_A_V2), + (_entity("Alice"), _LM_B_V1), + ], + ) + got = await s.get_entity("Alice") + assert got is not None + keys = {(lm.document_id, lm.parse_version) for lm in got.source_lineage} + assert keys == {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}, ( + f"bulk with distinct keys MUST NOT wipe pre-existing lineage; got {keys}" + ) + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_entity_type_last_wins(store, collection_id): + """Per Protocol contract `per-part record.entity_type follows the + single-upsert "most recently observed value wins" rule (last + tuple's type is the post-write entity_type for the row)`. The + final row's entity_type MUST match the last bulk part's type.""" + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice", entity_type="person"), _LM_A_V1), + (_entity("Alice", entity_type="researcher"), _LM_A_V2), + ], + ) + got = await s.get_entity("Alice") + assert got is not None + assert got.entity_type == "researcher", f"last bulk part's entity_type wins; got {got.entity_type}" From 381d7a7552717c5ed8bf730b5c1208e81a9c6335 Mon Sep 17 00:00:00 2001 From: earayu Date: Thu, 30 Apr 2026 12:56:37 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix(compat):=20task=20#61=20P1=20=E2=80=94?= =?UTF-8?q?=20fold=20huangheng+ziang=20NIT=20into=20bulk=5Fupsert=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two non-blocking NITs from @huangheng msg=99b5ffd5 + @ziang msg=84f5c3cc re-CR on PR #1927 — fold-in to land more complete test: * `_rejects_mixed_names` now also asserts post-raise zero-side-effect (`get_entity("Alice") is None` + `get_entity("Bob") is None`) — pins Lesson #12 v6.4 aggregation-chain 
invariant: a backend that swapped validation order to raise AFTER the first row write would silently leak partial state. * New `_replay_is_idempotent` case — pins the Protocol's "Forward-only retry safety: per-part dedup so replays are idempotent" contract. A backend that appended on replay (instead of dedup-then-replace) would silently duplicate lineage members under retry. Coverage delta: 37 → 38 cross-backend cases. Co-Authored-By: Claude Opus 4.7 --- .../compat/test_lineage_graph_compat.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/integration/compat/test_lineage_graph_compat.py b/tests/integration/compat/test_lineage_graph_compat.py index d2927705e..fa00cb782 100644 --- a/tests/integration/compat/test_lineage_graph_compat.py +++ b/tests/integration/compat/test_lineage_graph_compat.py @@ -883,6 +883,13 @@ async def test_bulk_upsert_entity_with_lineage_parts_rejects_mixed_names(store, (_entity("Bob"), _LM_B_V1), ], ) + # Per @huangheng msg=99b5ffd5 + @ziang msg=84f5c3cc NIT — Lesson + # #12 v6.4 (aggregation chain): a backend that raised AFTER writing + # the first row would silently leak partial state. Pin + # "raise-then-zero-side-effect" so any backend that swaps validation + # order regresses loudly. + assert await s.get_entity("Alice") is None, "raise must occur before any row write" + assert await s.get_entity("Bob") is None, "raise must occur before any row write" @pytest.mark.asyncio @@ -985,3 +992,30 @@ async def test_bulk_upsert_entity_with_lineage_parts_entity_type_last_wins(store got = await s.get_entity("Alice") assert got is not None assert got.entity_type == "researcher", f"last bulk part's entity_type wins; got {got.entity_type}" + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_replay_is_idempotent(store, collection_id): + """Per Protocol contract `Forward-only retry safety: the dedup key + is per-part, so a mid-flight crash + retry replays each tuple + idempotently`. 
Two consecutive bulk calls with the same key MUST + collapse to one lineage member (idempotent) AND the second call's + description MUST overwrite the first (last-wins on replay). + + Per @huangheng msg=99b5ffd5 + @ziang msg=84f5c3cc NIT — replay + safety is the critical retry-correctness invariant; a backend that + appended on replay (instead of dedup-then-replace) would silently + duplicate lineage members under retry. + """ + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts( + parts=[(_entity("Alice", description="v1-first"), _LM_A_V1)], + ) + await s.bulk_upsert_entity_with_lineage_parts( + parts=[(_entity("Alice", description="v1-replay"), _LM_A_V1)], + ) + got = await s.get_entity("Alice") + assert got is not None + matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] + assert len(matching) == 1, f"replay must be idempotent (no duplicate member); got {len(matching)}" From 1953933a904bc7fd17877c3d8e5ac5983d1bc736 Mon Sep 17 00:00:00 2001 From: earayu Date: Thu, 30 Apr 2026 13:48:08 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix(compat):=20task=20#61=20P1=20=E2=80=94?= =?UTF-8?q?=20fold=20huangzhangshu=20description=5Fparts=20NIT?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per @huangzhangshu testing primary CR (msg=5bbc5d1a) — the bulk_upsert cases pinned lineage member identity but did not assert ``description_parts`` text content. A backend could write the lineage member key correctly but silently drop or stale-keep the description text, breaking the agent context retrieval contract. Add `description_parts` key→text assertions to 4 cases: * `_round_trip` — all 3 (doc_id, parse_version) parts must carry their source bulk's description text (not silently dropped). * `_dedup_last_wins_within_bulk` — same-key collapse must keep the LAST description text within the bulk (not first). 
* `_replaces_existing_same_key` — bulk's strip-then-append must overwrite the prior single-write description (not silently keep it). * `_replay_is_idempotent` — replay must overwrite first call's description with the second's (last-wins on replay), not just dedup the member. Coverage delta: same 38 cases, but every dedup/replace/replay case now pins both lineage AND description_parts text contract. Co-Authored-By: Claude Opus 4.7 --- .../compat/test_lineage_graph_compat.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/integration/compat/test_lineage_graph_compat.py b/tests/integration/compat/test_lineage_graph_compat.py index fa00cb782..c143701c2 100644 --- a/tests/integration/compat/test_lineage_graph_compat.py +++ b/tests/integration/compat/test_lineage_graph_compat.py @@ -912,6 +912,14 @@ async def test_bulk_upsert_entity_with_lineage_parts_round_trip(store, collectio assert keys == {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}, ( f"all 3 (document_id, parse_version) members must be visible after bulk; got {keys}" ) + # Per @huangzhangshu msg=5bbc5d1a — description_parts text must + # round-trip alongside lineage keys; a backend that wrote the + # lineage member but dropped the description text would silently + # break agent context retrieval. 
+ parts_by_key = {(p.document_id, p.parse_version): p.text for p in got.description_parts} + assert parts_by_key[("doc-A", "v1")] == "from-doc-A-v1" + assert parts_by_key[("doc-A", "v2")] == "from-doc-A-v2" + assert parts_by_key[("doc-B", "v1")] == "from-doc-B-v1" @pytest.mark.asyncio @@ -932,6 +940,14 @@ async def test_bulk_upsert_entity_with_lineage_parts_dedup_last_wins_within_bulk assert got is not None matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] assert len(matching) == 1, f"same-key parts must collapse to one member; got {len(matching)}" + # Per @huangzhangshu msg=5bbc5d1a — last-wins is on description + # text not just lineage member. A backend that collapsed to one + # member but kept the FIRST description text would silently break + # the contract. + parts_by_key = {(p.document_id, p.parse_version): p.text for p in got.description_parts} + assert parts_by_key[("doc-A", "v1")] == "last-write", ( + f"same-key dedup MUST keep last description text; got {parts_by_key.get(('doc-A', 'v1'))!r}" + ) @pytest.mark.asyncio @@ -950,6 +966,14 @@ async def test_bulk_upsert_entity_with_lineage_parts_replaces_existing_same_key( assert got is not None matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] assert len(matching) == 1, "single + bulk on same key must collapse to one member" + # Per @huangzhangshu msg=5bbc5d1a — bulk's strip-then-append MUST + # keep the bulk-side description text (not the prior single + # upsert's). A backend that kept the single-side text would silently + # fail the strip semantics. 
+ parts_by_key = {(p.document_id, p.parse_version): p.text for p in got.description_parts} + assert parts_by_key[("doc-A", "v1")] == "bulk", ( + f"bulk write MUST overwrite single-write description; got {parts_by_key.get(('doc-A', 'v1'))!r}" + ) @pytest.mark.asyncio @@ -1019,3 +1043,11 @@ async def test_bulk_upsert_entity_with_lineage_parts_replay_is_idempotent(store, assert got is not None matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] assert len(matching) == 1, f"replay must be idempotent (no duplicate member); got {len(matching)}" + # Per @huangzhangshu msg=5bbc5d1a — replay safety is not just about + # member dedup; the replay's description text MUST overwrite the + # first call's (last-wins on replay). A backend that kept the first + # text would silently fail forward-progress on retry. + parts_by_key = {(p.document_id, p.parse_version): p.text for p in got.description_parts} + assert parts_by_key[("doc-A", "v1")] == "v1-replay", ( + f"replay MUST overwrite description (last-wins); got {parts_by_key.get(('doc-A', 'v1'))!r}" + )