diff --git a/tests/integration/compat/test_lineage_graph_compat.py b/tests/integration/compat/test_lineage_graph_compat.py index bce89743d..c143701c2 100644 --- a/tests/integration/compat/test_lineage_graph_compat.py +++ b/tests/integration/compat/test_lineage_graph_compat.py @@ -838,3 +838,216 @@ async def test_delete_relation_returns_false_when_absent(store, collection_id): _, s = store deleted = await s.delete_relation("Alice", "Bob", "knows") assert deleted is False + + +# --- task #61 P0 — bulk_upsert_entity_with_lineage_parts cross-backend ----- +# +# This Protocol method (Wave 8 W8-2) was previously not exercised by any +# cross-backend test (冬柏 task #67 audit msg=3e93bb64). Bulk write paths +# are exactly where backend differences emerge: batch limits / atomicity / +# error handling / dedup contract. PM @不穷 elevated this to P0 in +# msg=10b753e8. The Protocol contract (`aperag/indexing/graph.py:575+`): +# * All record.name MUST share the same string — raise ValueError otherwise. +# * Empty parts is a no-op. +# * Per-part record.entity_type follows last-wins. +# * compacted_description is intentionally NOT a parameter. +# * Same dedup-by-(document_id, parse_version) key as single upsert. +# * Forward-only retry safety: per-part dedup so replays are idempotent. + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_empty_is_noop(store, collection_id): + """Per Protocol contract `empty parts is a no-op`. The store MUST + not raise and MUST not create any row.""" + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts(parts=[]) + # Round-trip: get_entity should return None for a name that was never + # written — the empty bulk must not implicitly create any entity. + assert await s.get_entity("Alice") is None + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_rejects_mixed_names(store, collection_id): + """Per Protocol contract `all record.name values MUST share the same + string — backends MAY assert and raise ValueError if they don't`. + All 3 production backends MUST raise to keep the bulk path's atomic + guarantee honest (a mixed-name bulk would silently fan out to N + different entity rows — atomicity meaningless).""" + + _, s = store + with pytest.raises(ValueError): + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice"), _LM_A_V1), + (_entity("Bob"), _LM_B_V1), + ], + ) + # Per @huangheng msg=99b5ffd5 + @ziang msg=84f5c3cc NIT — Lesson + # #12 v6.4 (aggregation chain): a backend that raised AFTER writing + # the first row would silently leak partial state. Pin + # "raise-then-zero-side-effect" so any backend that swaps validation + # order regresses loudly. + assert await s.get_entity("Alice") is None, "raise must occur before any row write" + assert await s.get_entity("Bob") is None, "raise must occur before any row write" + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_round_trip(store, collection_id): + """Bulk write 3 distinct (document_id, parse_version) parts for the + same entity name; all 3 lineage members + description parts must be + visible after a single round-trip.""" + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice", description="from-doc-A-v1"), _LM_A_V1), + (_entity("Alice", description="from-doc-A-v2"), _LM_A_V2), + (_entity("Alice", description="from-doc-B-v1"), _LM_B_V1), + ], + ) + got = await s.get_entity("Alice") + assert got is not None + keys = {(lm.document_id, lm.parse_version) for lm in got.source_lineage} + assert keys == {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}, ( + f"all 3 (document_id, parse_version) members must be visible after bulk; got {keys}" + ) + # Per @huangzhangshu msg=5bbc5d1a — description_parts text must + # round-trip alongside lineage keys; a backend that wrote the + # lineage member but dropped the description text would silently + # break agent context retrieval. + parts_by_key = {(p.document_id, p.parse_version): p.text for p in got.description_parts} + assert parts_by_key[("doc-A", "v1")] == "from-doc-A-v1" + assert parts_by_key[("doc-A", "v2")] == "from-doc-A-v2" + assert parts_by_key[("doc-B", "v1")] == "from-doc-B-v1" + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_dedup_last_wins_within_bulk(store, collection_id): + """Per Protocol contract `parts sharing the same key collapse + last-wins`. Two parts in the same bulk with the same + (document_id, parse_version) MUST collapse to one row, with the + second part's description winning.""" + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice", description="first-write"), _LM_A_V1), + (_entity("Alice", description="last-write"), _LM_A_V1), + ], + ) + got = await s.get_entity("Alice") + assert got is not None + matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] + assert len(matching) == 1, f"same-key parts must collapse to one member; got {len(matching)}" + # Per @huangzhangshu msg=5bbc5d1a — last-wins is on description + # text not just lineage member. A backend that collapsed to one + # member but kept the FIRST description text would silently break + # the contract. + parts_by_key = {(p.document_id, p.parse_version): p.text for p in got.description_parts} + assert parts_by_key[("doc-A", "v1")] == "last-write", ( + f"same-key dedup MUST keep last description text; got {parts_by_key.get(('doc-A', 'v1'))!r}" + ) + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_replaces_existing_same_key(store, collection_id): + """An existing single upsert with key (doc-A, v1) must be replaced + when a subsequent bulk write contains the same key — same dedup + contract as single upsert. Per Protocol: bulk strips existing rows + whose key matches any incoming part, then appends the new parts.""" + + _, s = store + await s.upsert_entity_with_lineage(record=_entity("Alice", description="single"), lineage=_LM_A_V1) + await s.bulk_upsert_entity_with_lineage_parts( + parts=[(_entity("Alice", description="bulk"), _LM_A_V1)], + ) + got = await s.get_entity("Alice") + assert got is not None + matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] + assert len(matching) == 1, "single + bulk on same key must collapse to one member" + # Per @huangzhangshu msg=5bbc5d1a — bulk's strip-then-append MUST + # keep the bulk-side description text (not the prior single + # upsert's). A backend that kept the single-side text would silently + # fail the strip semantics. + parts_by_key = {(p.document_id, p.parse_version): p.text for p in got.description_parts} + assert parts_by_key[("doc-A", "v1")] == "bulk", ( + f"bulk write MUST overwrite single-write description; got {parts_by_key.get(('doc-A', 'v1'))!r}" + ) + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_appends_distinct_keys(store, collection_id): + """An existing single upsert with key (doc-A, v1) must coexist with + a subsequent bulk write containing distinct keys — bulk must NOT + wipe unrelated lineage members. This is the cross-backend dedup + invariant the Protocol pins.""" + + _, s = store + await s.upsert_entity_with_lineage(record=_entity("Alice"), lineage=_LM_A_V1) + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice"), _LM_A_V2), + (_entity("Alice"), _LM_B_V1), + ], + ) + got = await s.get_entity("Alice") + assert got is not None + keys = {(lm.document_id, lm.parse_version) for lm in got.source_lineage} + assert keys == {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}, ( + f"bulk with distinct keys MUST NOT wipe pre-existing lineage; got {keys}" + ) + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_entity_type_last_wins(store, collection_id): + """Per Protocol contract `per-part record.entity_type follows the + single-upsert "most recently observed value wins" rule (last + tuple's type is the post-write entity_type for the row)`. The + final row's entity_type MUST match the last bulk part's type.""" + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts( + parts=[ + (_entity("Alice", entity_type="person"), _LM_A_V1), + (_entity("Alice", entity_type="researcher"), _LM_A_V2), + ], + ) + got = await s.get_entity("Alice") + assert got is not None + assert got.entity_type == "researcher", f"last bulk part's entity_type wins; got {got.entity_type}" + + +@pytest.mark.asyncio +async def test_bulk_upsert_entity_with_lineage_parts_replay_is_idempotent(store, collection_id): + """Per Protocol contract `Forward-only retry safety: the dedup key + is per-part, so a mid-flight crash + retry replays each tuple + idempotently`. Two consecutive bulk calls with the same key MUST + collapse to one lineage member (idempotent) AND the second call's + description MUST overwrite the first (last-wins on replay). + + Per @huangheng msg=99b5ffd5 + @ziang msg=84f5c3cc NIT — replay + safety is the critical retry-correctness invariant; a backend that + appended on replay (instead of dedup-then-replace) would silently + duplicate lineage members under retry. + """ + + _, s = store + await s.bulk_upsert_entity_with_lineage_parts( + parts=[(_entity("Alice", description="v1-first"), _LM_A_V1)], + ) + await s.bulk_upsert_entity_with_lineage_parts( + parts=[(_entity("Alice", description="v1-replay"), _LM_A_V1)], + ) + got = await s.get_entity("Alice") + assert got is not None + matching = [lm for lm in got.source_lineage if lm.document_id == "doc-A" and lm.parse_version == "v1"] + assert len(matching) == 1, f"replay must be idempotent (no duplicate member); got {len(matching)}" + # Per @huangzhangshu msg=5bbc5d1a — replay safety is not just about + # member dedup; the replay's description text MUST overwrite the + # first call's (last-wins on replay). A backend that kept the first + # text would silently fail forward-progress on retry. + parts_by_key = {(p.document_id, p.parse_version): p.text for p in got.description_parts} + assert parts_by_key[("doc-A", "v1")] == "v1-replay", ( + f"replay MUST overwrite description (last-wins); got {parts_by_key.get(('doc-A', 'v1'))!r}" + )