diff --git a/aperag/graph_curation/alias_map.py b/aperag/graph_curation/alias_map.py index c5e9cf5e0..4bd62bda2 100644 --- a/aperag/graph_curation/alias_map.py +++ b/aperag/graph_curation/alias_map.py @@ -16,9 +16,10 @@ Persists user-driven entity merge intent. The :class:`AliasMapRepository` is the canonical write/read surface; downstream consumers use the two -methods :meth:`AliasMapRepository.resolve_canonical` (read) and -:meth:`AliasMapRepository.upsert_alias` (write, with cycle reject and -transitive flatten). +methods :meth:`AliasMapRepository.resolve_canonical` (read, +single-name) / :meth:`AliasMapRepository.resolve_canonical_many` (read, +batch) and :meth:`AliasMapRepository.upsert_alias` (write, with cycle +reject and transitive flatten). Design notes ------------ @@ -41,6 +42,7 @@ from __future__ import annotations import logging +from typing import Sequence from sqlalchemy import delete, select, update from sqlalchemy.ext.asyncio import AsyncSession @@ -80,6 +82,11 @@ async def resolve_canonical(self, *, collection_id: str, name: str) -> str: (i.e. ``name`` is already canonical or has never been merged). Returns at most one indirection because :meth:`upsert_alias` flattens transitively at write time. + + Single-name reads are cheap; for batched (n > 1) callers prefer + :meth:`resolve_canonical_many` which folds N lookups into one + SQL roundtrip — see task #61 P2-S1 (Planetegg msg=db7fb085 + + msg=1314ac59 batch alias resolution P2-HIGH). """ if not name: return name @@ -92,6 +99,86 @@ async def _op(session: AsyncSession) -> str: return await self._execute_query(_op) + async def resolve_canonical_many( + self, + *, + collection_id: str, + names: Sequence[str], + ) -> dict[str, str]: + """Batch alias resolution — single SQL ``SELECT ... WHERE ... IN`` + roundtrip (task #61 P2-S1+S2). + + Returns a mapping from each input name to its canonical form. + Names with no alias row map to themselves (mirrors + :meth:`resolve_canonical` semantics). Empty / falsy names also + map to themselves so callers don't have to filter input. + + Why this exists: pre-task-#61-P2 the only public API was the + per-name :meth:`resolve_canonical`. Callers that needed to + resolve N names did so via ``asyncio.gather`` of N parallel + coroutines — each one acquired a separate ``AsyncSession`` / + DB connection. On + :meth:`LineageGraphStoreWithAliasRedirect.expand_neighbors_n_hops` + N is the seed cap of the calling endpoint, which can be large: + + * ``GET /api/v2/collections/{id}/graphs?max_nodes=1000`` + → up to **2 × max_nodes = 2000** seeds (per Planetegg + msg=db7fb085 + spec § 2.4 P2-S1 quantification). + * ``GET /graphs/hybrid``: default 1000 / max 5000 seeds. + + 2000 parallel ``resolve_canonical`` calls translate to 2000 + connection-pool checkouts — Singapore production observed PG + connection saturation on the ``/graphs`` endpoint + (Planetegg msg=4043adf4 SRE diagnostic). + + Implementation: in-place dedupe + single ``SELECT alias_name, + canonical_name FROM aperag_lineage_entity_alias WHERE + collection_id = ? AND alias_name IN (...)`` reads all matching + rows in one shot. Names absent from the result set fall back + to themselves. Total connections checked out: **1**. + + Order of the input is preserved on the dict's iteration order + (Python ``dict`` preserves insertion order since 3.7). + """ + # Map empty / falsy names to themselves up-front, then dedupe + # the rest. ``dict`` insertion order preserves caller order. + out: dict[str, str] = {} + unique_names: list[str] = [] + seen: set[str] = set() + for n in names: + if not n: + out[n] = n + continue + if n in seen: + continue + seen.add(n) + unique_names.append(n) + if not unique_names: + return out + + async def _op(session: AsyncSession) -> dict[str, str]: + stmt = select( + LineageEntityAlias.alias_name, + LineageEntityAlias.canonical_name, + ).where( + LineageEntityAlias.collection_id == collection_id, + LineageEntityAlias.alias_name.in_(unique_names), + ) + result = await session.execute(stmt) + return {str(row[0]): str(row[1]) for row in result.all()} + + resolved = await self._execute_query(_op) + + # Restore caller order: every input ``name`` (in the order it + # was passed) gets a key in the output. Names that didn't show + # up in the SQL result map to themselves (no alias row → name + # is already canonical). + for n in names: + if n in out: # already added (empty / falsy short-circuit) + continue + out[n] = resolved.get(n, n) + return out + async def list_aliases_pointing_at(self, *, collection_id: str, canonical_name: str) -> list[str]: """Return every ``alias_name`` whose row points at ``canonical_name`` in ``collection_id``. diff --git a/aperag/indexing/alias_redirect_store.py b/aperag/indexing/alias_redirect_store.py index 272e9b68d..7f7d03353 100644 --- a/aperag/indexing/alias_redirect_store.py +++ b/aperag/indexing/alias_redirect_store.py @@ -70,7 +70,6 @@ from __future__ import annotations -import asyncio import logging from dataclasses import replace from typing import TYPE_CHECKING @@ -263,20 +262,34 @@ async def expand_neighbors_n_hops( ) -> tuple[list[EntityWithLineage], list[RelationWithLineage]]: if not entity_names: return await self._inner.expand_neighbors_n_hops(entity_names=entity_names, hops=hops) - # Resolve every anchor through the alias map so a caller seeding - # ``["Alicia"]`` walks the canonical ``Alice`` neighbourhood. - # ``asyncio.gather`` keeps the per-anchor lookup cost flat — - # N is bounded by the caller (typically small). - resolved = await asyncio.gather( - *(self._alias_repo.resolve_canonical(collection_id=self._collection_id, name=n) for n in entity_names), - return_exceptions=False, + # Resolve every anchor through the alias map so a caller + # seeding ``["Alicia"]`` walks the canonical ``Alice`` + # neighbourhood. + # + # task #61 P2-S1+S2 (per Planetegg msg=db7fb085 + + # msg=1314ac59 + spec § 2.4): batched resolution via + # ``resolve_canonical_many`` so we issue ONE SQL roundtrip + # regardless of the seed count. Pre-fix this site used + # ``asyncio.gather`` of N per-name ``resolve_canonical`` + # coroutines, which checked out N PG connections in + # parallel. ``GET /graphs?max_nodes=1000`` → up to 2 × + # 1000 = 2000 seeds (per spec quantification); ``GET + # /graphs/hybrid`` allows up to 5000 seeds. At those + # cardinalities the connection pool saturates and unrelated + # API requests stall — Singapore production observed this + # exact saturation pattern (Planetegg msg=4043adf4 SRE + # diagnostic). + resolved_map = await self._alias_repo.resolve_canonical_many( + collection_id=self._collection_id, + names=entity_names, ) # De-dup so ``["Alicia", "Alice"]`` (alias + canonical mixed) - # doesn't double-traverse the same anchor; preserve input order - # for deterministic output. + # doesn't double-traverse the same anchor; preserve input + # order for deterministic output. seen: set[str] = set() canonical_anchors: list[str] = [] - for original, canonical in zip(entity_names, resolved): + for original in entity_names: + canonical = resolved_map.get(original, original) if canonical not in seen: seen.add(canonical) canonical_anchors.append(canonical) diff --git a/tests/unit_test/graph_curation/test_alias_map.py b/tests/unit_test/graph_curation/test_alias_map.py index d629b3ca9..925988af1 100644 --- a/tests/unit_test/graph_curation/test_alias_map.py +++ b/tests/unit_test/graph_curation/test_alias_map.py @@ -195,3 +195,130 @@ async def test_list_aliases_pointing_at_returns_alphabetical(session): pointing_at_C = await repo.list_aliases_pointing_at(collection_id="c1", canonical_name="C") assert pointing_at_C == ["Alpha", "Mu", "Zeta"] + + +# --------------------------------------------------------------------- +# task #61 P2-S1+S2 — batch resolve_canonical_many +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_resolve_canonical_many_returns_self_for_unmapped_names(session): + """No alias rows → every input name maps to itself (mirrors the + single-name :meth:`resolve_canonical` semantic).""" + repo = AliasMapRepository(session=session) + out = await repo.resolve_canonical_many(collection_id="c1", names=["Apple", "Banana", "Cherry"]) + assert out == {"Apple": "Apple", "Banana": "Banana", "Cherry": "Cherry"} + + +@pytest.mark.asyncio +async def test_resolve_canonical_many_mixed_alias_and_canonical(session): + """Some inputs are aliases, some are already canonical, some + don't exist at all — each maps to its correct resolution.""" + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.") + await repo.upsert_alias(collection_id="c1", alias_name="MS", target="Microsoft") + + out = await repo.resolve_canonical_many( + collection_id="c1", + names=["Apple", "MS", "Apple Inc.", "Banana"], # alias / alias / canonical / unknown + ) + assert out == { + "Apple": "Apple Inc.", + "MS": "Microsoft", + "Apple Inc.": "Apple Inc.", + "Banana": "Banana", + } + + +@pytest.mark.asyncio +async def test_resolve_canonical_many_dedupes_input(session): + """Duplicate input names result in a single dict entry (Python + dict semantics) but with the same canonical resolution. Pinned to + catch a future refactor that accidentally returns multi-value + output.""" + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.") + out = await repo.resolve_canonical_many( + collection_id="c1", + names=["Apple", "Apple", "Apple Inc.", "Apple"], + ) + # ``dict`` collapses duplicates by key; insertion-order is + # preserved (Apple first, Apple Inc. second). + assert out == {"Apple": "Apple Inc.", "Apple Inc.": "Apple Inc."} + + +@pytest.mark.asyncio +async def test_resolve_canonical_many_empty_input(session): + """Empty input → empty output (defensive — caller bridges the + edge without an extra ``if names`` branch).""" + repo = AliasMapRepository(session=session) + assert await repo.resolve_canonical_many(collection_id="c1", names=[]) == {} + + +@pytest.mark.asyncio +async def test_resolve_canonical_many_handles_empty_string(session): + """Empty / falsy names short-circuit to themselves without an SQL + lookup (mirrors single-name :meth:`resolve_canonical` defensive + branch).""" + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="Real", target="Resolved") + out = await repo.resolve_canonical_many(collection_id="c1", names=["", "Real", ""]) + assert out == {"": "", "Real": "Resolved"} + + +@pytest.mark.asyncio +async def test_resolve_canonical_many_per_collection_isolation(session): + """Same alias_name in different collections resolves to different + canonical names — task #61 spec § 2.4 P2-S1 cross-collection seed + cap test pinpoints this isolation.""" + repo = AliasMapRepository(session=session) + await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.") + await repo.upsert_alias(collection_id="c2", alias_name="Apple", target="Apple Records") + + c1_out = await repo.resolve_canonical_many(collection_id="c1", names=["Apple"]) + c2_out = await repo.resolve_canonical_many(collection_id="c2", names=["Apple"]) + assert c1_out == {"Apple": "Apple Inc."} + assert c2_out == {"Apple": "Apple Records"} + + +@pytest.mark.asyncio +async def test_resolve_canonical_many_large_seed_cap(session): + """Pinned to catch a future regression that re-introduces per-name + DB roundtrips: even at the ``/graphs?max_nodes=1000`` worst case + (2 × max_nodes = 2000 seeds, per spec § 2.4 P2-S1 quantification), + the batch API must complete with a single SQL roundtrip. + + We can't directly assert "1 SQL roundtrip" in a pure-unit test + against in-memory SQLite, but we can pin the *result correctness* + at the spec-quantified seed cardinality so a future refactor that + silently re-fans-out would either time out (in-memory SQLite is + fast enough that 2000 SELECT roundtrips is ~10ms — observable + only via a perf timeout) or break correctness. + + The companion :func:`test_expand_neighbors_uses_batch_alias_resolution` + in ``test_alias_redirect_store.py`` pins the call-graph: the + ``expand_neighbors_n_hops`` site MUST go through + ``resolve_canonical_many`` exactly once (not N + ``resolve_canonical`` calls). + """ + repo = AliasMapRepository(session=session) + # Seed 50 aliases (cheap on sqlite); query 2000 names where the + # first 50 are mapped + the remaining 1950 are unmapped (resolve + # to themselves). + for i in range(50): + await repo.upsert_alias( + collection_id="c1", + alias_name=f"alias_{i}", + target=f"canonical_{i}", + ) + + names = [f"alias_{i}" for i in range(50)] + [f"unmapped_{i}" for i in range(1950)] + out = await repo.resolve_canonical_many(collection_id="c1", names=names) + + # Spot-check shape + a few representative rows. + assert len(out) == 2000 + assert out["alias_0"] == "canonical_0" + assert out["alias_49"] == "canonical_49" + assert out["unmapped_0"] == "unmapped_0" + assert out["unmapped_1949"] == "unmapped_1949" diff --git a/tests/unit_test/indexing/test_alias_redirect_store.py b/tests/unit_test/indexing/test_alias_redirect_store.py index be5aa46eb..bb7354690 100644 --- a/tests/unit_test/indexing/test_alias_redirect_store.py +++ b/tests/unit_test/indexing/test_alias_redirect_store.py @@ -52,12 +52,38 @@ class _FakeAliasRepo: + """Stub implementing both ``resolve_canonical`` (single-name, used + by the upsert / get / delete redirect paths) and + ``resolve_canonical_many`` (batch, used by + ``expand_neighbors_n_hops`` per task #61 P2-S1+S2). + + Tracks call counts so tests can pin the call-graph (e.g. the + expand path goes through the batch primitive once, not N + single-name calls).""" + def __init__(self, mapping: dict[str, str] | None = None) -> None: self._mapping = mapping or {} + self.resolve_canonical_calls = 0 + self.resolve_canonical_many_calls = 0 + self.last_many_names: list[str] | None = None async def resolve_canonical(self, *, collection_id: str, name: str) -> str: + self.resolve_canonical_calls += 1 return self._mapping.get(name, name) + async def resolve_canonical_many(self, *, collection_id: str, names) -> dict[str, str]: + self.resolve_canonical_many_calls += 1 + self.last_many_names = list(names) + out: dict[str, str] = {} + for n in names: + if not n: + out[n] = n + continue + if n in out: + continue + out[n] = self._mapping.get(n, n) + return out + def _record(name: str = "Apple") -> EntityRecord: return EntityRecord( @@ -351,6 +377,80 @@ async def test_expand_neighbors_empty_seeds_passes_through(): inner.expand_neighbors_n_hops.assert_awaited_once_with(entity_names=[], hops=1) +# --------------------------------------------------------------------- +# task #61 P2-S1+S2 — expand_neighbors goes through batch alias +# resolution (not per-name asyncio.gather) +# --------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_expand_neighbors_uses_batch_alias_resolution(): + """Pinned by task #61 P2-S1+S2 (Planetegg msg=db7fb085 + + msg=1314ac59 P2-HIGH): the per-anchor ``resolve_canonical`` + fan-out via ``asyncio.gather`` was the Singapore PG connection + saturation root cause. After this PR ``expand_neighbors_n_hops`` + MUST call ``resolve_canonical_many`` exactly once per invocation + — never falls back to per-name ``resolve_canonical`` regardless + of seed count. + + A regression that re-introduces the gather pattern would either: + (a) fall back to per-name ``resolve_canonical`` calls (caught by + the call-count assertion below), or (b) call + ``resolve_canonical_many`` once *per name* (caught by the count + == 1 assertion). + """ + inner = AsyncMock() + inner.expand_neighbors_n_hops = AsyncMock(return_value=([], [])) + repo = _FakeAliasRepo({"Alicia": "Alice", "Bobby": "Bob"}) + decorator = LineageGraphStoreWithAliasRedirect( + inner=inner, + alias_repo=repo, + collection_id="col-1", + ) + + # Use a "spec-cap-shaped" 5-name seed — large enough to make + # per-name fan-out visible if it were re-introduced. + await decorator.expand_neighbors_n_hops( + entity_names=["Alicia", "Bobby", "Charlie", "Dorothy", "Edward"], + hops=2, + ) + + # The single batch call carries every input name in order. + assert repo.resolve_canonical_many_calls == 1 + assert repo.last_many_names == ["Alicia", "Bobby", "Charlie", "Dorothy", "Edward"] + # Per-name gather path is gone. + assert repo.resolve_canonical_calls == 0 + # Inner saw the deduped + canonicalised anchor list. + inner.expand_neighbors_n_hops.assert_awaited_once_with( + entity_names=["Alice", "Bob", "Charlie", "Dorothy", "Edward"], + hops=2, + ) + + +@pytest.mark.asyncio +async def test_expand_neighbors_large_seed_cap_uses_single_batch_call(): + """Spec § 2.4 P2-S1 quantification: ``GET + /api/v2/collections/{id}/graphs?max_nodes=1000`` produces up to + ``2 × max_nodes = 2000`` seeds. Pinned that even at this seed + cap the worker-side alias resolution stays at one batch call. + """ + inner = AsyncMock() + inner.expand_neighbors_n_hops = AsyncMock(return_value=([], [])) + repo = _FakeAliasRepo() # no aliases — every input maps to itself + decorator = LineageGraphStoreWithAliasRedirect( + inner=inner, + alias_repo=repo, + collection_id="col-1", + ) + seeds = [f"entity_{i}" for i in range(2000)] + + await decorator.expand_neighbors_n_hops(entity_names=seeds, hops=1) + + assert repo.resolve_canonical_many_calls == 1 + assert len(repo.last_many_names or []) == 2000 + assert repo.resolve_canonical_calls == 0 + + # --------------------------------------------------------------------- # Decorator passthrough invariant (huangheng CR lock) # ---------------------------------------------------------------------