|
40 | 40 |
|
41 | 41 | import os |
42 | 42 | import uuid |
| 43 | +from typing import Any |
43 | 44 |
|
44 | 45 | import pytest |
45 | 46 |
|
| 47 | +from aperag.graph_curation.lineage_merge import ( |
| 48 | + CURATION_MERGE_DOCUMENT_ID, |
| 49 | + LineageEntityMerger, |
| 50 | +) |
46 | 51 | from aperag.indexing.graph import ( |
47 | 52 | EntityRecord, |
48 | 53 | InMemoryEntityLock, |
@@ -241,6 +246,78 @@ def _relation(source: str, target: str, *, relation_type: str = "knows") -> Rela |
241 | 246 | ) |
242 | 247 |
|
243 | 248 |
|
| 249 | +class _AliasRepoDouble: |
| 250 | + def __init__(self, *, raise_on_upsert: Exception | None = None) -> None: |
| 251 | + self.raise_on_upsert = raise_on_upsert |
| 252 | + self.resolve_calls: list[tuple[str, str]] = [] |
| 253 | + self.upsert_calls: list[dict[str, Any]] = [] |
| 254 | + |
| 255 | + async def resolve_canonical(self, *, collection_id: str, name: str) -> str: |
| 256 | + self.resolve_calls.append((collection_id, name)) |
| 257 | + return name |
| 258 | + |
| 259 | + async def upsert_alias( |
| 260 | + self, |
| 261 | + *, |
| 262 | + collection_id: str, |
| 263 | + alias_name: str, |
| 264 | + target: str, |
| 265 | + merged_by: str | None = None, |
| 266 | + ) -> str: |
| 267 | + if self.raise_on_upsert is not None: |
| 268 | + raise self.raise_on_upsert |
| 269 | + self.upsert_calls.append( |
| 270 | + { |
| 271 | + "collection_id": collection_id, |
| 272 | + "alias_name": alias_name, |
| 273 | + "target": target, |
| 274 | + "merged_by": merged_by, |
| 275 | + } |
| 276 | + ) |
| 277 | + return target |
| 278 | + |
| 279 | + |
| 280 | +class _ForbiddenDescriptionCollaborator: |
| 281 | + """Fail fast if the description-free variant touches legacy surfaces.""" |
| 282 | + |
| 283 | + async def compact_if_oversized(self, *_args: Any, **_kwargs: Any) -> str | None: |
| 284 | + raise AssertionError("description-free apply must not call GraphIndexCompactor") |
| 285 | + |
| 286 | + def embed_query(self, *_args: Any, **_kwargs: Any) -> list[float]: |
| 287 | + raise AssertionError("description-free apply must not embed description text") |
| 288 | + |
| 289 | + def upsert(self, *_args: Any, **_kwargs: Any) -> None: |
| 290 | + raise AssertionError("description-free apply must not upsert vector points") |
| 291 | + |
| 292 | + def delete(self, *_args: Any, **_kwargs: Any) -> None: |
| 293 | + raise AssertionError("description-free apply must not delete vector points") |
| 294 | + |
| 295 | + |
async def _forbidden_llm(_prompt: str) -> str:
    """LLM tripwire: awaiting it always raises ``AssertionError``."""
    llm_violation = AssertionError("description-free apply must not call LLM")
    raise llm_violation
| 298 | + |
| 299 | + |
def _description_free_merger(store, collection_id: str, alias_repo: _AliasRepoDouble | None = None):
    """Build a ``LineageEntityMerger`` wired for the description-free path.

    Args:
        store: Graph store the merger operates on (the backend under test).
        collection_id: Collection identifier forwarded to the merger.
        alias_repo: Alias repository double to inject; a fresh
            ``_AliasRepoDouble`` is created when omitted.

    Returns:
        A ``LineageEntityMerger`` whose compactor, vector connector, embedder
        and LLM are fail-fast doubles that raise ``AssertionError`` if called.
    """
    # Compare against None explicitly: a caller-supplied double must never be
    # swapped for a fresh one just because it happens to evaluate as falsy.
    if alias_repo is None:
        alias_repo = _AliasRepoDouble()
    forbidden = _ForbiddenDescriptionCollaborator()
    return LineageEntityMerger(
        store=store,
        alias_repo=alias_repo,
        compactor=forbidden,
        vector_connector=forbidden,
        embedder=forbidden,
        llm=_forbidden_llm,
        collection_id=collection_id,
    )
| 311 | + |
| 312 | + |
def _lineage_keys(entity) -> set[tuple[str, str]]:
    """Return the ``(document_id, parse_version)`` pair of each lineage member."""
    keys: set[tuple[str, str]] = set()
    for lineage_member in entity.source_lineage:
        keys.add((lineage_member.document_id, lineage_member.parse_version))
    return keys
| 315 | + |
| 316 | + |
def _description_texts_by_key(entity) -> dict[tuple[str, str], str]:
    """Map each description part's ``(document_id, parse_version)`` to its text.

    When two parts share a key, the later one in ``description_parts`` wins.
    """
    texts: dict[tuple[str, str], str] = {}
    for description_part in entity.description_parts:
        texts[(description_part.document_id, description_part.parse_version)] = description_part.text
    return texts
| 319 | + |
| 320 | + |
244 | 321 | # --- tests: lineage write + read round-trip ------------------------------- |
245 | 322 |
|
246 | 323 |
|
@@ -909,9 +986,11 @@ async def test_bulk_upsert_entity_with_lineage_parts_round_trip(store, collectio |
909 | 986 | got = await s.get_entity("Alice") |
910 | 987 | assert got is not None |
911 | 988 | keys = {(lm.document_id, lm.parse_version) for lm in got.source_lineage} |
912 | | - assert keys == {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}, ( |
913 | | - f"all 3 (document_id, parse_version) members must be visible after bulk; got {keys}" |
914 | | - ) |
| 989 | + assert keys == { |
| 990 | + ("doc-A", "v1"), |
| 991 | + ("doc-A", "v2"), |
| 992 | + ("doc-B", "v1"), |
| 993 | + }, f"all 3 (document_id, parse_version) members must be visible after bulk; got {keys}" |
915 | 994 | # Per @huangzhangshu msg=5bbc5d1a — description_parts text must |
916 | 995 | # round-trip alongside lineage keys; a backend that wrote the |
917 | 996 | # lineage member but dropped the description text would silently |
@@ -994,9 +1073,11 @@ async def test_bulk_upsert_entity_with_lineage_parts_appends_distinct_keys(store |
994 | 1073 | got = await s.get_entity("Alice") |
995 | 1074 | assert got is not None |
996 | 1075 | keys = {(lm.document_id, lm.parse_version) for lm in got.source_lineage} |
997 | | - assert keys == {("doc-A", "v1"), ("doc-A", "v2"), ("doc-B", "v1")}, ( |
998 | | - f"bulk with distinct keys MUST NOT wipe pre-existing lineage; got {keys}" |
999 | | - ) |
| 1076 | + assert keys == { |
| 1077 | + ("doc-A", "v1"), |
| 1078 | + ("doc-A", "v2"), |
| 1079 | + ("doc-B", "v1"), |
| 1080 | + }, f"bulk with distinct keys MUST NOT wipe pre-existing lineage; got {keys}" |
1000 | 1081 |
|
1001 | 1082 |
|
1002 | 1083 | @pytest.mark.asyncio |
@@ -1051,3 +1132,227 @@ async def test_bulk_upsert_entity_with_lineage_parts_replay_is_idempotent(store, |
1051 | 1132 | assert parts_by_key[("doc-A", "v1")] == "v1-replay", ( |
1052 | 1133 | f"replay MUST overwrite description (last-wins); got {parts_by_key.get(('doc-A', 'v1'))!r}" |
1053 | 1134 | ) |
| 1135 | + |
| 1136 | + |
| 1137 | +# --- task #31 B2 — description-free LineageEntityMerger cross-backend ----- |
| 1138 | +# |
| 1139 | +# task #31 A3 introduced ``LineageEntityMerger.merge_entities_apply_description_free`` |
| 1140 | +# as the async accept-apply path. This is application-layer logic built from |
| 1141 | +# ``LineageGraphStore`` primitives, so the cross-backend contract belongs here |
| 1142 | +# (not on a nonexistent ``LineageGraphStore.merge_entities`` method). |
| 1143 | +# |
| 1144 | +# Contract pinned across PostgreSQL / Neo4j / Nebula: |
| 1145 | +# * source lineage members are re-anchored under the canonical target |
| 1146 | +# without copying stale source description text; |
| 1147 | +# * sources are deleted from L1, but the vector layer is untouched; |
| 1148 | +# * no ``__curation_merge__`` sentinel lineage is written; |
| 1149 | +# * replay after the sources are already consumed is idempotent; |
| 1150 | +# * failures before graph-store writes leave entity rows unchanged. |
| 1151 | + |
| 1152 | + |
@pytest.mark.asyncio
async def test_lineage_entity_merger_description_free_apply_reanchors_lineage_cross_backend(store, collection_id):
    """Description-free accept-apply re-anchors source lineage on the target.

    Drives ``LineageEntityMerger.merge_entities_apply_description_free``
    against the backend store supplied by the ``store`` fixture, with the
    LLM, compactor, embedder and vector collaborators replaced by fail-fast
    doubles. Pins that:

    * the source entities are gone and their lineage keys sit on the target;
    * the re-anchored description parts are empty (no source text leaks);
    * no ``CURATION_MERGE_DOCUMENT_ID`` lineage member is written;
    * replaying the same call leaves lineage and descriptions unchanged.
    """

    _, backend = store
    aliases = _AliasRepoDouble()
    merger = _description_free_merger(backend, collection_id, aliases)

    def _member(document_id: str, parse_version: str, *chunk_ids: str) -> LineageMember:
        # All fixtures in this test share one tenant scope.
        return LineageMember(
            document_id=document_id,
            parse_version=parse_version,
            tenant_scope_key="tenant-X",
            chunk_ids=chunk_ids,
        )

    def _apply():
        # Fresh argument objects on every call, exactly like a redelivered payload.
        return merger.merge_entities_apply_description_free(
            target_name="Apple Inc.",
            source_names=["Apple", "AAPL"],
            merged_by="reviewer-1",
        )

    target_member = _member("target-doc", "v1", "target-c1")
    # (entity name, description that must not leak, lineage member, chunk ids)
    source_seeds = (
        ("Apple", "SHOULD_NOT_LEAK_source_a_v1", _member("source-doc-a", "v1", "source-a-c1"), ("source-a-c1",)),
        ("Apple", "SHOULD_NOT_LEAK_source_a_v2", _member("source-doc-a", "v2", "source-a-c2"), ("source-a-c2",)),
        (
            "AAPL",
            "SHOULD_NOT_LEAK_source_b_v1",
            _member("source-doc-b", "v1", "source-b-c1", "source-b-c2"),
            ("source-b-c1", "source-b-c2"),
        ),
    )

    await backend.upsert_entity_with_lineage(
        record=EntityRecord(
            name="Apple Inc.",
            entity_type="organization",
            description="",
            source_chunk_ids=("target-c1",),
        ),
        lineage=target_member,
        compacted_description="legacy-target-summary",
    )
    for source_name, stale_description, source_member, source_chunks in source_seeds:
        await backend.upsert_entity_with_lineage(
            record=EntityRecord(
                name=source_name,
                entity_type="organization",
                description=stale_description,
                source_chunk_ids=source_chunks,
            ),
            lineage=source_member,
        )

    result = await _apply()

    assert result.final_target == "Apple Inc."
    assert result.merged_source_ids == ["Apple", "AAPL"]
    assert result.unified_description == ""
    assert result.compacted_description is None
    assert aliases.upsert_calls == [
        {
            "collection_id": collection_id,
            "alias_name": alias_name,
            "target": "Apple Inc.",
            "merged_by": "reviewer-1",
        }
        for alias_name in ("Apple", "AAPL")
    ]

    target_after = await backend.get_entity("Apple Inc.")
    assert target_after is not None
    for consumed_name in ("Apple", "AAPL"):
        assert await backend.get_entity(consumed_name) is None

    source_keys = [
        ("source-doc-a", "v1"),
        ("source-doc-a", "v2"),
        ("source-doc-b", "v1"),
    ]
    expected_keys = {("target-doc", "v1"), *source_keys}
    assert _lineage_keys(target_after) == expected_keys
    sentinel_members = [
        member for member in target_after.source_lineage if member.document_id == CURATION_MERGE_DOCUMENT_ID
    ]
    assert not sentinel_members, (
        "description-free apply must not write the legacy __curation_merge__ sentinel lineage"
    )

    descriptions = _description_texts_by_key(target_after)
    for source_key in source_keys:
        assert descriptions[source_key] == ""
    assert all("SHOULD_NOT_LEAK" not in text for text in descriptions.values())

    # Replay after the queue redelivers the same accept-apply payload:
    # source rows are already gone, so the method should not duplicate
    # target lineage or resurrect descriptions.
    await _apply()
    replayed = await backend.get_entity("Apple Inc.")
    assert replayed is not None
    assert _lineage_keys(replayed) == expected_keys
    replayed_descriptions = _description_texts_by_key(replayed)
    for source_key in source_keys:
        assert replayed_descriptions[source_key] == ""
| 1295 | + |
| 1296 | + |
@pytest.mark.asyncio
async def test_lineage_entity_merger_description_free_apply_alias_failure_has_no_graph_side_effect(
    store,
    collection_id,
):
    """An alias-layer exception must leave the backend entity rows untouched.

    The alias repository double raises ``RuntimeError("alias cycle")`` from
    ``upsert_alias``. The test checks that the error propagates out of
    ``merge_entities_apply_description_free`` and that both the target and
    the source entity still exist afterwards with their original lineage
    keys, and that the source keeps its original description text.
    """

    _, backend = store
    failing_aliases = _AliasRepoDouble(raise_on_upsert=RuntimeError("alias cycle"))
    merger = _description_free_merger(backend, collection_id, failing_aliases)

    # (entity name, description, lineage document id, chunk ids)
    seed_rows = (
        ("Canonical", "", "target-doc", ("target-c1",)),
        ("Alias", "SHOULD_STILL_EXIST", "source-doc", ("source-c1",)),
    )
    for entity_name, description, document_id, chunk_ids in seed_rows:
        await backend.upsert_entity_with_lineage(
            record=EntityRecord(
                name=entity_name,
                entity_type="organization",
                description=description,
                source_chunk_ids=chunk_ids,
            ),
            lineage=LineageMember(
                document_id=document_id,
                parse_version="v1",
                tenant_scope_key="tenant-X",
                chunk_ids=chunk_ids,
            ),
        )

    with pytest.raises(RuntimeError, match="alias cycle"):
        await merger.merge_entities_apply_description_free(
            target_name="Canonical",
            source_names=["Alias"],
            merged_by="reviewer-1",
        )

    canonical_after = await backend.get_entity("Canonical")
    alias_after = await backend.get_entity("Alias")
    assert canonical_after is not None
    assert alias_after is not None
    assert _lineage_keys(canonical_after) == {("target-doc", "v1")}
    assert _lineage_keys(alias_after) == {("source-doc", "v1")}
    surviving_descriptions = _description_texts_by_key(alias_after)
    assert surviving_descriptions[("source-doc", "v1")] == "SHOULD_STILL_EXIST"
0 commit comments