Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 213 additions & 0 deletions tests/integration/compat/test_lineage_graph_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,3 +838,216 @@ async def test_delete_relation_returns_false_when_absent(store, collection_id):
_, s = store
deleted = await s.delete_relation("Alice", "Bob", "knows")
assert deleted is False


# --- task #61 P0 — bulk_upsert_entity_with_lineage_parts cross-backend -----
#
# This Protocol method (Wave 8 W8-2) was previously not exercised by any
# cross-backend test (冬柏 task #67 audit msg=3e93bb64). Bulk write paths
# are exactly where backend differences emerge: batch limits / atomicity /
# error handling / dedup contract. PM @不穷 elevated this to P0 in
# msg=10b753e8. The Protocol contract (`aperag/indexing/graph.py:575+`):
# * All record.name MUST share the same string — raise ValueError otherwise.
# * Empty parts is a no-op.
# * Per-part record.entity_type follows last-wins.
# * compacted_description is intentionally NOT a parameter.
# * Same dedup-by-(document_id, parse_version) key as single upsert.
# * Forward-only retry safety: per-part dedup so replays are idempotent.


@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_empty_is_noop(store, collection_id):
    """An empty ``parts`` list must be accepted silently and write nothing.

    Pins the Protocol clause `empty parts is a no-op`: the call must not
    raise, and no entity may become visible as a side effect.
    """
    _unused, graph_store = store

    await graph_store.bulk_upsert_entity_with_lineage_parts(parts=[])

    # "Alice" is never written by this test, so the lookup has to stay
    # empty — an empty bulk must not implicitly create an entity.
    alice = await graph_store.get_entity("Alice")
    assert alice is None


@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_rejects_mixed_names(store, collection_id):
    """Per Protocol contract `all record.name values MUST share the same
    string — backends MAY assert and raise ValueError if they don't`.
    All 3 production backends MUST raise to keep the bulk path's atomic
    guarantee honest (a mixed-name bulk would silently fan out to N
    different entity rows — atomicity meaningless).

    The test also pins "raise-then-zero-side-effect": after the
    ``ValueError`` neither name may be readable from the store.
    """
    _, s = store
    # Build the payload BEFORE entering ``pytest.raises`` so only the call
    # under test can satisfy the expectation. A ValueError raised while
    # constructing the fixtures (e.g. by a validating model inside
    # ``_entity`` — NOTE(review): not visible here, confirm) would
    # otherwise make the test pass without the store ever being exercised.
    mixed_parts = [
        (_entity("Alice"), _LM_A_V1),
        (_entity("Bob"), _LM_B_V1),
    ]
    with pytest.raises(ValueError):
        await s.bulk_upsert_entity_with_lineage_parts(parts=mixed_parts)
    # Per @huangheng msg=99b5ffd5 + @ziang msg=84f5c3cc NIT — Lesson
    # #12 v6.4 (aggregation chain): a backend that raised AFTER writing
    # the first row would silently leak partial state. These checks must
    # sit outside the ``with`` block, otherwise they would never run once
    # the expected exception propagates.
    for name in ("Alice", "Bob"):
        assert await s.get_entity(name) is None, "raise must occur before any row write"


@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_round_trip(store, collection_id):
    """One bulk call carrying three distinct (document_id, parse_version)
    parts for the same entity name must make every lineage member and
    every description text readable via ``get_entity``.
    """
    _unused, graph_store = store
    bulk_parts = [
        (_entity("Alice", description="from-doc-A-v1"), _LM_A_V1),
        (_entity("Alice", description="from-doc-A-v2"), _LM_A_V2),
        (_entity("Alice", description="from-doc-B-v1"), _LM_B_V1),
    ]
    await graph_store.bulk_upsert_entity_with_lineage_parts(parts=bulk_parts)

    alice = await graph_store.get_entity("Alice")
    assert alice is not None

    # Expected description text for each lineage key written above.
    expected_text = {
        ("doc-A", "v1"): "from-doc-A-v1",
        ("doc-A", "v2"): "from-doc-A-v2",
        ("doc-B", "v1"): "from-doc-B-v1",
    }

    keys = {(member.document_id, member.parse_version) for member in alice.source_lineage}
    assert keys == set(expected_text), (
        f"all 3 (document_id, parse_version) members must be visible after bulk; got {keys}"
    )

    # Per @huangzhangshu msg=5bbc5d1a — the description text has to
    # round-trip together with the lineage keys; writing the member but
    # dropping its text would silently break agent context retrieval.
    text_by_key = {(part.document_id, part.parse_version): part.text for part in alice.description_parts}
    for key, text in expected_text.items():
        assert text_by_key[key] == text


@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_dedup_last_wins_within_bulk(store, collection_id):
    """Per Protocol contract `parts sharing the same key collapse
    last-wins`. Two parts in the same bulk with the same
    (document_id, parse_version) MUST collapse to one row, with the
    second part's description winning.

    Both the lineage members and the description parts are checked for
    exactly one surviving entry under the shared key.
    """
    _, s = store
    await s.bulk_upsert_entity_with_lineage_parts(
        parts=[
            (_entity("Alice", description="first-write"), _LM_A_V1),
            (_entity("Alice", description="last-write"), _LM_A_V1),
        ],
    )
    got = await s.get_entity("Alice")
    assert got is not None
    key = ("doc-A", "v1")
    matching = [lm for lm in got.source_lineage if (lm.document_id, lm.parse_version) == key]
    assert len(matching) == 1, f"same-key parts must collapse to one member; got {len(matching)}"
    # Per @huangzhangshu msg=5bbc5d1a — last-wins is on description
    # text, not just the lineage member. Collect the texts as a LIST:
    # folding them into a dict keyed by (document_id, parse_version) would
    # itself be last-wins and hide a backend that kept both description
    # parts. A list also reports a missing key as [] in the assertion
    # message instead of failing with a bare KeyError.
    texts = [p.text for p in got.description_parts if (p.document_id, p.parse_version) == key]
    assert texts == ["last-write"], (
        f"same-key dedup MUST keep exactly one description part holding the last text; got {texts!r}"
    )


@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_replaces_existing_same_key(store, collection_id):
    """An existing single upsert with key (doc-A, v1) must be replaced
    when a subsequent bulk write contains the same key — same dedup
    contract as single upsert. Per Protocol: bulk strips existing rows
    whose key matches any incoming part, then appends the new parts.

    After the bulk call exactly one lineage member and exactly one
    description part (the bulk-side text) may remain under that key.
    """
    _, s = store
    await s.upsert_entity_with_lineage(record=_entity("Alice", description="single"), lineage=_LM_A_V1)
    await s.bulk_upsert_entity_with_lineage_parts(
        parts=[(_entity("Alice", description="bulk"), _LM_A_V1)],
    )
    got = await s.get_entity("Alice")
    assert got is not None
    key = ("doc-A", "v1")
    matching = [lm for lm in got.source_lineage if (lm.document_id, lm.parse_version) == key]
    assert len(matching) == 1, "single + bulk on same key must collapse to one member"
    # Per @huangzhangshu msg=5bbc5d1a — strip-then-append MUST leave only
    # the bulk-side description text. The texts are gathered as a list
    # because a dict keyed by (document_id, parse_version) keeps the last
    # entry per key and would therefore accept a backend that appended
    # "bulk" without stripping the earlier "single" part.
    texts = [p.text for p in got.description_parts if (p.document_id, p.parse_version) == key]
    assert texts == ["bulk"], (
        f"bulk write MUST replace the single-write description (exactly one part); got {texts!r}"
    )


@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_appends_distinct_keys(store, collection_id):
    """A bulk write whose keys differ from an already-stored (doc-A, v1)
    member must add to the entity's lineage, never replace it.

    Seeds one member through the single-upsert path, bulk-writes two
    further keys, and expects all three to be readable afterwards.
    """
    _unused, graph_store = store

    # Seed the pre-existing member via the single-upsert path.
    await graph_store.upsert_entity_with_lineage(record=_entity("Alice"), lineage=_LM_A_V1)

    # Bulk-write two keys that do not collide with the seeded one.
    later_lineages = (_LM_A_V2, _LM_B_V1)
    await graph_store.bulk_upsert_entity_with_lineage_parts(
        parts=[(_entity("Alice"), lineage) for lineage in later_lineages],
    )

    alice = await graph_store.get_entity("Alice")
    assert alice is not None

    keys = set()
    for member in alice.source_lineage:
        keys.add((member.document_id, member.parse_version))
    expected_keys = {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}
    assert keys == expected_keys, (
        f"bulk with distinct keys MUST NOT wipe pre-existing lineage; got {keys}"
    )


@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_entity_type_last_wins(store, collection_id):
    """The stored ``entity_type`` after a bulk write must be the one
    carried by the LAST part in the list.

    Mirrors the Protocol's "most recently observed value wins" rule for
    single upserts: two parts with different types are written in one
    bulk and the later type is expected on the resulting entity.
    """
    _unused, graph_store = store

    earlier_part = (_entity("Alice", entity_type="person"), _LM_A_V1)
    final_part = (_entity("Alice", entity_type="researcher"), _LM_A_V2)
    await graph_store.bulk_upsert_entity_with_lineage_parts(parts=[earlier_part, final_part])

    alice = await graph_store.get_entity("Alice")
    assert alice is not None
    observed_type = alice.entity_type
    assert observed_type == "researcher", f"last bulk part's entity_type wins; got {observed_type}"


@pytest.mark.asyncio
async def test_bulk_upsert_entity_with_lineage_parts_replay_is_idempotent(store, collection_id):
    """Per Protocol contract `Forward-only retry safety: the dedup key
    is per-part, so a mid-flight crash + retry replays each tuple
    idempotently`. Two consecutive bulk calls with the same key MUST
    collapse to one lineage member (idempotent) AND the second call's
    description MUST overwrite the first (last-wins on replay).

    Per @huangheng msg=99b5ffd5 + @ziang msg=84f5c3cc NIT — replay
    safety is the critical retry-correctness invariant; a backend that
    appended on replay (instead of dedup-then-replace) would silently
    duplicate lineage members under retry.
    """
    _, s = store
    await s.bulk_upsert_entity_with_lineage_parts(
        parts=[(_entity("Alice", description="v1-first"), _LM_A_V1)],
    )
    await s.bulk_upsert_entity_with_lineage_parts(
        parts=[(_entity("Alice", description="v1-replay"), _LM_A_V1)],
    )
    got = await s.get_entity("Alice")
    assert got is not None
    key = ("doc-A", "v1")
    matching = [lm for lm in got.source_lineage if (lm.document_id, lm.parse_version) == key]
    assert len(matching) == 1, f"replay must be idempotent (no duplicate member); got {len(matching)}"
    # Per @huangzhangshu msg=5bbc5d1a — replay safety also covers the
    # description text: the replay's text MUST be the only one left.
    # Checking the list of texts (rather than a dict keyed by
    # (document_id, parse_version), which keeps only the last entry per
    # key) catches a backend that appended the replayed description part
    # next to the first one.
    texts = [p.text for p in got.description_parts if (p.document_id, p.parse_version) == key]
    assert texts == ["v1-replay"], (
        f"replay MUST overwrite description (last-wins, exactly one part); got {texts!r}"
    )
Loading