Skip to content

Commit 1d03a3d

Browse files
earayu and claude committed
test(compat): task #61 P1 — bulk_upsert_entity_with_lineage_parts cross-backend
PM @不穷 elevated this Protocol method as a P0 audit gap (msg=10b753e8). Until now, `bulk_upsert_entity_with_lineage_parts` (Wave 8 W8-2) had no cross-backend test in `tests/integration/compat/`, even though all three production backends (Postgres / Neo4j / Nebula) implement it and the indexing worker uses it for the LineageEntityMerger merge step.

Bulk write paths are exactly where backend differences emerge — batch size limits, transaction atomicity, error handling, dedup contract — and the lack of a parametrized matrix here meant any silent drift in the bulk semantics would survive merge.

This adds 7 new parametrized cases that pin the Protocol contract declared in `aperag/indexing/graph.py:575+`:

* empty parts is a no-op (no implicit row creation)
* mixed-name parts raise ValueError (atomicity guarantee)
* round-trip: 3 distinct (document_id, parse_version) parts visible after
* dedup last-wins within a single bulk call
* bulk replaces existing rows on matching key (same as single upsert)
* bulk with distinct keys appends, never wipes pre-existing lineage
* per-part entity_type follows last-wins rule

Coverage delta: 30 → 37 cross-backend cases (collect-only verified).

Sister to chenyexuan PR #1926 — without that workflow path fix, this test never triggered on PRs that touch `aperag/indexing/graph_storage/*`. Both PRs together restore real CI gating on cross-backend regressions for the LineageGraphStore Protocol surface.

Part of task #61 DB compat audit (earayu2 directive msg=f26b703e), testing-lane slice (task #67, claimed via msg=e02c3028).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent a0403cf commit 1d03a3d

1 file changed

Lines changed: 147 additions & 0 deletions

File tree

tests/integration/compat/test_lineage_graph_compat.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,3 +838,150 @@ async def test_delete_relation_returns_false_when_absent(store, collection_id):
838838
_, s = store
839839
deleted = await s.delete_relation("Alice", "Bob", "knows")
840840
assert deleted is False
841+
842+
843+
# --- task #61 P0 — bulk_upsert_entity_with_lineage_parts cross-backend -----
844+
#
845+
# This Protocol method (Wave 8 W8-2) was previously not exercised by any
846+
# cross-backend test (冬柏 task #67 audit msg=3e93bb64). Bulk write paths
847+
# are exactly where backend differences emerge: batch limits / atomicity /
848+
# error handling / dedup contract. PM @不穷 elevated this to P0 in
849+
# msg=10b753e8. The Protocol contract (`aperag/indexing/graph.py:575+`):
850+
# * All record.name MUST share the same string — raise ValueError otherwise.
851+
# * Empty parts is a no-op.
852+
# * Per-part record.entity_type follows last-wins.
853+
# * compacted_description is intentionally NOT a parameter.
854+
# * Same dedup-by-(document_id, parse_version) key as single upsert.
855+
# * Forward-only retry safety: per-part dedup so replays are idempotent.
856+
857+
858+
@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_empty_is_noop(store, collection_id):
    """An empty ``parts`` list must be accepted and must not create an entity.

    Calls the bulk upsert with ``parts=[]`` (any exception fails the test),
    then reads back "Alice" -- a name this test never writes -- and expects
    ``None``.
    """
    # NOTE(review): ``collection_id`` is not referenced in the body; presumably
    # it is requested only for its fixture setup -- confirm against conftest.
    _backend_label, backend = store

    no_parts = []
    await backend.bulk_upsert_entity_with_lineage_parts(parts=no_parts)

    # The empty bulk call must not have implicitly created a row for a name
    # that was never written.
    alice = await backend.get_entity("Alice")
    assert alice is None
868+
869+
870+
@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_rejects_mixed_names(store, collection_id):
    """A bulk call whose parts name different entities must raise ``ValueError``.

    The Protocol wording quoted by the original author is "all record.name
    values MUST share the same string -- backends MAY assert and raise
    ValueError if they don't". This test is stricter: it requires the backend
    under test to actually raise, on the reasoning that a mixed-name bulk
    would otherwise fan out to several entity rows and make the bulk path's
    atomicity guarantee meaningless.
    """
    _backend_label, backend = store

    with pytest.raises(ValueError):
        # Two parts that disagree on the entity name ("Alice" vs "Bob").
        mixed_parts = [
            (_entity("Alice"), _LM_A_V1),
            (_entity("Bob"), _LM_B_V1),
        ]
        await backend.bulk_upsert_entity_with_lineage_parts(parts=mixed_parts)
886+
887+
888+
@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_round_trip(store, collection_id):
    """Three parts with distinct lineage keys must all be readable afterwards.

    Writes three parts for "Alice" in one bulk call, each with its own
    (document_id, parse_version) lineage, then checks that ``get_entity``
    reports exactly those three keys in ``source_lineage``.
    """
    # NOTE(review): each part carries a distinct description, but only the
    # lineage keys are asserted below; the per-part descriptions are not
    # verified here -- consider adding that check.
    _backend_label, backend = store

    alice_parts = [
        (_entity("Alice", description="from-doc-A-v1"), _LM_A_V1),
        (_entity("Alice", description="from-doc-A-v2"), _LM_A_V2),
        (_entity("Alice", description="from-doc-B-v1"), _LM_B_V1),
    ]
    await backend.bulk_upsert_entity_with_lineage_parts(parts=alice_parts)

    alice = await backend.get_entity("Alice")
    assert alice is not None

    # Collapse the stored lineage members to their dedup keys.
    keys = set()
    for member in alice.source_lineage:
        keys.add((member.document_id, member.parse_version))

    expected_keys = {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}
    assert keys == expected_keys, (
        f"all 3 (document_id, parse_version) members must be visible after bulk; got {keys}"
    )
908+
909+
910+
@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_dedup_last_wins_within_bulk(store, collection_id):
    """Two parts with the same lineage key in one bulk call collapse to one member.

    Both parts use the same lineage (``_LM_A_V1``); after the write, exactly
    one ``source_lineage`` member with key ("doc-A", "v1") must remain.
    """
    # NOTE(review): the Protocol contract quoted by the original author is
    # "parts sharing the same key collapse last-wins", i.e. the second part's
    # description ("last-write") should survive. Only the collapse to a single
    # member is asserted here; which description won is not checked.
    _backend_label, backend = store

    same_key_parts = [
        (_entity("Alice", description="first-write"), _LM_A_V1),
        (_entity("Alice", description="last-write"), _LM_A_V1),
    ]
    await backend.bulk_upsert_entity_with_lineage_parts(parts=same_key_parts)

    alice = await backend.get_entity("Alice")
    assert alice is not None

    # Keep only the members stored under the duplicated key.
    matching = []
    for member in alice.source_lineage:
        if member.document_id == "doc-A" and member.parse_version == "v1":
            matching.append(member)

    assert len(matching) == 1, f"same-key parts must collapse to one member; got {len(matching)}"
928+
929+
930+
@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_replaces_existing_same_key(store, collection_id):
    """A bulk write on a key already written by a single upsert leaves one member.

    First writes "Alice" through ``upsert_entity_with_lineage`` with lineage
    ``_LM_A_V1``, then bulk-writes one part with the same lineage. Afterwards
    exactly one ``source_lineage`` member with key ("doc-A", "v1") must exist,
    matching the single-upsert dedup contract (bulk strips existing rows whose
    key matches an incoming part, then appends the new parts).
    """
    # NOTE(review): the two writes use different descriptions ("single" then
    # "bulk"), but the test does not assert that the bulk value replaced the
    # earlier one -- only that the members did not duplicate.
    _backend_label, backend = store

    await backend.upsert_entity_with_lineage(
        record=_entity("Alice", description="single"),
        lineage=_LM_A_V1,
    )

    replacement_parts = [(_entity("Alice", description="bulk"), _LM_A_V1)]
    await backend.bulk_upsert_entity_with_lineage_parts(parts=replacement_parts)

    alice = await backend.get_entity("Alice")
    assert alice is not None

    # Keep only the members stored under the key written twice.
    matching = []
    for member in alice.source_lineage:
        if member.document_id == "doc-A" and member.parse_version == "v1":
            matching.append(member)

    assert len(matching) == 1, "single + bulk on same key must collapse to one member"
946+
947+
948+
@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_appends_distinct_keys(store, collection_id):
    """A bulk write with new keys must keep lineage written earlier.

    Seeds "Alice" with lineage ``_LM_A_V1`` through the single upsert, then
    bulk-writes two parts with different lineage (``_LM_A_V2``, ``_LM_B_V1``).
    All three keys must be present afterwards: the bulk path appends and must
    not wipe unrelated members.
    """
    _backend_label, backend = store

    # Pre-existing member written through the single-upsert path.
    await backend.upsert_entity_with_lineage(record=_entity("Alice"), lineage=_LM_A_V1)

    additional_parts = [
        (_entity("Alice"), _LM_A_V2),
        (_entity("Alice"), _LM_B_V1),
    ]
    await backend.bulk_upsert_entity_with_lineage_parts(parts=additional_parts)

    alice = await backend.get_entity("Alice")
    assert alice is not None

    # Collapse the stored lineage members to their dedup keys.
    keys = set()
    for member in alice.source_lineage:
        keys.add((member.document_id, member.parse_version))

    expected_keys = {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}
    assert keys == expected_keys, (
        f"bulk with distinct keys MUST NOT wipe pre-existing lineage; got {keys}"
    )
969+
970+
971+
@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_entity_type_last_wins(store, collection_id):
    """The entity_type of the last part in a bulk call is the one stored.

    Writes two parts for "Alice" with types "person" and then "researcher"
    (under different lineage), and expects the read-back entity to report
    "researcher" -- the "most recently observed value wins" rule that the
    single upsert also follows.
    """
    _backend_label, backend = store

    typed_parts = [
        (_entity("Alice", entity_type="person"), _LM_A_V1),
        (_entity("Alice", entity_type="researcher"), _LM_A_V2),
    ]
    await backend.bulk_upsert_entity_with_lineage_parts(parts=typed_parts)

    got = await backend.get_entity("Alice")
    assert got is not None

    # The final tuple's type must be the post-write type of the row.
    assert got.entity_type == "researcher", f"last bulk part's entity_type wins; got {got.entity_type}"

0 commit comments

Comments
 (0)