Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 90 additions & 3 deletions aperag/graph_curation/alias_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

Persists user-driven entity merge intent. The :class:`AliasMapRepository`
is the canonical write/read surface; downstream consumers use the two
methods :meth:`AliasMapRepository.resolve_canonical` (read) and
:meth:`AliasMapRepository.upsert_alias` (write, with cycle reject and
transitive flatten).
methods :meth:`AliasMapRepository.resolve_canonical` (read,
single-name) / :meth:`AliasMapRepository.resolve_canonical_many` (read,
batch) and :meth:`AliasMapRepository.upsert_alias` (write, with cycle
reject and transitive flatten).

Design notes
------------
Expand All @@ -41,6 +42,7 @@
from __future__ import annotations

import logging
from typing import Sequence

from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
Expand Down Expand Up @@ -80,6 +82,11 @@ async def resolve_canonical(self, *, collection_id: str, name: str) -> str:
(i.e. ``name`` is already canonical or has never been merged).
Returns at most one indirection because :meth:`upsert_alias`
flattens transitively at write time.

Single-name reads are cheap; for batched (n > 1) callers prefer
:meth:`resolve_canonical_many` which folds N lookups into one
SQL roundtrip — see task #61 P2-S1 (Planetegg msg=db7fb085 +
msg=1314ac59 batch alias resolution P2-HIGH).
"""
if not name:
return name
Expand All @@ -92,6 +99,86 @@ async def _op(session: AsyncSession) -> str:

return await self._execute_query(_op)

async def resolve_canonical_many(
self,
*,
collection_id: str,
names: Sequence[str],
) -> dict[str, str]:
"""Batch alias resolution — single SQL ``SELECT ... WHERE ... IN``
roundtrip (task #61 P2-S1+S2).

Returns a mapping from each input name to its canonical form.
Names with no alias row map to themselves (mirrors
:meth:`resolve_canonical` semantics). Empty / falsy names also
map to themselves so callers don't have to filter input.

Why this exists: pre-task-#61-P2 the only public API was the
per-name :meth:`resolve_canonical`. Callers that needed to
resolve N names did so via ``asyncio.gather`` of N parallel
coroutines — each one acquired a separate ``AsyncSession`` /
DB connection. On
:meth:`LineageGraphStoreWithAliasRedirect.expand_neighbors_n_hops`
N is the seed cap of the calling endpoint, which can be large:

* ``GET /api/v2/collections/{id}/graphs?max_nodes=1000``
→ up to **2 × max_nodes = 2000** seeds (per Planetegg
msg=db7fb085 + spec § 2.4 P2-S1 quantification).
* ``GET /graphs/hybrid``: default 1000 / max 5000 seeds.

2000 parallel ``resolve_canonical`` calls translate to 2000
connection-pool checkouts — Singapore production observed PG
connection saturation on the ``/graphs`` endpoint
(Planetegg msg=4043adf4 SRE diagnostic).

Implementation: in-place dedupe + single ``SELECT alias_name,
canonical_name FROM aperag_lineage_entity_alias WHERE
collection_id = ? AND alias_name IN (...)`` reads all matching
rows in one shot. Names absent from the result set fall back
to themselves. Total connections checked out: **1**.

Order of the input is preserved on the dict's iteration order
(Python ``dict`` preserves insertion order since 3.7).
"""
# Map empty / falsy names to themselves up-front, then dedupe
# the rest. ``dict`` insertion order preserves caller order.
out: dict[str, str] = {}
unique_names: list[str] = []
seen: set[str] = set()
for n in names:
if not n:
out[n] = n
continue
if n in seen:
continue
seen.add(n)
unique_names.append(n)
if not unique_names:
return out

async def _op(session: AsyncSession) -> dict[str, str]:
stmt = select(
LineageEntityAlias.alias_name,
LineageEntityAlias.canonical_name,
).where(
LineageEntityAlias.collection_id == collection_id,
LineageEntityAlias.alias_name.in_(unique_names),
)
result = await session.execute(stmt)
return {str(row[0]): str(row[1]) for row in result.all()}

resolved = await self._execute_query(_op)

# Restore caller order: every input ``name`` (in the order it
# was passed) gets a key in the output. Names that didn't show
# up in the SQL result map to themselves (no alias row → name
# is already canonical).
for n in names:
if n in out: # already added (empty / falsy short-circuit)
continue
out[n] = resolved.get(n, n)
return out

async def list_aliases_pointing_at(self, *, collection_id: str, canonical_name: str) -> list[str]:
"""Return every ``alias_name`` whose row points at
``canonical_name`` in ``collection_id``.
Expand Down
35 changes: 24 additions & 11 deletions aperag/indexing/alias_redirect_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@

from __future__ import annotations

import asyncio
import logging
from dataclasses import replace
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -263,20 +262,34 @@ async def expand_neighbors_n_hops(
) -> tuple[list[EntityWithLineage], list[RelationWithLineage]]:
if not entity_names:
return await self._inner.expand_neighbors_n_hops(entity_names=entity_names, hops=hops)
# Resolve every anchor through the alias map so a caller seeding
# ``["Alicia"]`` walks the canonical ``Alice`` neighbourhood.
# ``asyncio.gather`` keeps the per-anchor lookup cost flat —
# N is bounded by the caller (typically small).
resolved = await asyncio.gather(
*(self._alias_repo.resolve_canonical(collection_id=self._collection_id, name=n) for n in entity_names),
return_exceptions=False,
# Resolve every anchor through the alias map so a caller
# seeding ``["Alicia"]`` walks the canonical ``Alice``
# neighbourhood.
#
# task #61 P2-S1+S2 (per Planetegg msg=db7fb085 +
# msg=1314ac59 + spec § 2.4): batched resolution via
# ``resolve_canonical_many`` so we issue ONE SQL roundtrip
# regardless of the seed count. Pre-fix this site used
# ``asyncio.gather`` of N per-name ``resolve_canonical``
# coroutines, which checked out N PG connections in
# parallel. ``GET /graphs?max_nodes=1000`` → up to 2 ×
# 1000 = 2000 seeds (per spec quantification); ``GET
# /graphs/hybrid`` allows up to 5000 seeds. At those
# cardinalities the connection pool saturates and unrelated
# API requests stall — Singapore production observed this
# exact saturation pattern (Planetegg msg=4043adf4 SRE
# diagnostic).
resolved_map = await self._alias_repo.resolve_canonical_many(
collection_id=self._collection_id,
names=entity_names,
)
# De-dup so ``["Alicia", "Alice"]`` (alias + canonical mixed)
# doesn't double-traverse the same anchor; preserve input order
# for deterministic output.
# doesn't double-traverse the same anchor; preserve input
# order for deterministic output.
seen: set[str] = set()
canonical_anchors: list[str] = []
for original, canonical in zip(entity_names, resolved):
for original in entity_names:
canonical = resolved_map.get(original, original)
if canonical not in seen:
seen.add(canonical)
canonical_anchors.append(canonical)
Expand Down
127 changes: 127 additions & 0 deletions tests/unit_test/graph_curation/test_alias_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,130 @@ async def test_list_aliases_pointing_at_returns_alphabetical(session):

pointing_at_C = await repo.list_aliases_pointing_at(collection_id="c1", canonical_name="C")
assert pointing_at_C == ["Alpha", "Mu", "Zeta"]


# ---------------------------------------------------------------------
# task #61 P2-S1+S2 — batch resolve_canonical_many
# ---------------------------------------------------------------------


@pytest.mark.asyncio
async def test_resolve_canonical_many_returns_self_for_unmapped_names(session):
"""No alias rows → every input name maps to itself (mirrors the
single-name :meth:`resolve_canonical` semantic)."""
repo = AliasMapRepository(session=session)
out = await repo.resolve_canonical_many(collection_id="c1", names=["Apple", "Banana", "Cherry"])
assert out == {"Apple": "Apple", "Banana": "Banana", "Cherry": "Cherry"}


@pytest.mark.asyncio
async def test_resolve_canonical_many_mixed_alias_and_canonical(session):
"""Some inputs are aliases, some are already canonical, some
don't exist at all — each maps to its correct resolution."""
repo = AliasMapRepository(session=session)
await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.")
await repo.upsert_alias(collection_id="c1", alias_name="MS", target="Microsoft")

out = await repo.resolve_canonical_many(
collection_id="c1",
names=["Apple", "MS", "Apple Inc.", "Banana"], # alias / alias / canonical / unknown
)
assert out == {
"Apple": "Apple Inc.",
"MS": "Microsoft",
"Apple Inc.": "Apple Inc.",
"Banana": "Banana",
}


@pytest.mark.asyncio
async def test_resolve_canonical_many_dedupes_input(session):
"""Duplicate input names result in a single dict entry (Python
dict semantics) but with the same canonical resolution. Pinned to
catch a future refactor that accidentally returns multi-value
output."""
repo = AliasMapRepository(session=session)
await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.")
out = await repo.resolve_canonical_many(
collection_id="c1",
names=["Apple", "Apple", "Apple Inc.", "Apple"],
)
# ``dict`` collapses duplicates by key; insertion-order is
# preserved (Apple first, Apple Inc. second).
assert out == {"Apple": "Apple Inc.", "Apple Inc.": "Apple Inc."}


@pytest.mark.asyncio
async def test_resolve_canonical_many_empty_input(session):
"""Empty input → empty output (defensive — caller bridges the
edge without an extra ``if names`` branch)."""
repo = AliasMapRepository(session=session)
assert await repo.resolve_canonical_many(collection_id="c1", names=[]) == {}


@pytest.mark.asyncio
async def test_resolve_canonical_many_handles_empty_string(session):
"""Empty / falsy names short-circuit to themselves without an SQL
lookup (mirrors single-name :meth:`resolve_canonical` defensive
branch)."""
repo = AliasMapRepository(session=session)
await repo.upsert_alias(collection_id="c1", alias_name="Real", target="Resolved")
out = await repo.resolve_canonical_many(collection_id="c1", names=["", "Real", ""])
assert out == {"": "", "Real": "Resolved"}


@pytest.mark.asyncio
async def test_resolve_canonical_many_per_collection_isolation(session):
"""Same alias_name in different collections resolves to different
canonical names — task #61 spec § 2.4 P2-S1 cross-collection seed
cap test pinpoints this isolation."""
repo = AliasMapRepository(session=session)
await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.")
await repo.upsert_alias(collection_id="c2", alias_name="Apple", target="Apple Records")

c1_out = await repo.resolve_canonical_many(collection_id="c1", names=["Apple"])
c2_out = await repo.resolve_canonical_many(collection_id="c2", names=["Apple"])
assert c1_out == {"Apple": "Apple Inc."}
assert c2_out == {"Apple": "Apple Records"}


@pytest.mark.asyncio
async def test_resolve_canonical_many_large_seed_cap(session):
"""Pinned to catch a future regression that re-introduces per-name
DB roundtrips: even at the ``/graphs?max_nodes=1000`` worst case
(2 × max_nodes = 2000 seeds, per spec § 2.4 P2-S1 quantification),
the batch API must complete with a single SQL roundtrip.

We can't directly assert "1 SQL roundtrip" in a pure-unit test
against in-memory SQLite, but we can pin the *result correctness*
at the spec-quantified seed cardinality so a future refactor that
silently re-fans-out would either time out (in-memory SQLite is
fast enough that 2000 SELECT roundtrips is ~10ms — observable
only via a perf timeout) or break correctness.

The companion :func:`test_expand_neighbors_uses_batch_alias_resolution`
in ``test_alias_redirect_store.py`` pins the call-graph: the
``expand_neighbors_n_hops`` site MUST go through
``resolve_canonical_many`` exactly once (not N
``resolve_canonical`` calls).
"""
repo = AliasMapRepository(session=session)
# Seed 50 aliases (cheap on sqlite); query 2000 names where the
# first 50 are mapped + the remaining 1950 are unmapped (resolve
# to themselves).
for i in range(50):
await repo.upsert_alias(
collection_id="c1",
alias_name=f"alias_{i}",
target=f"canonical_{i}",
)

names = [f"alias_{i}" for i in range(50)] + [f"unmapped_{i}" for i in range(1950)]
out = await repo.resolve_canonical_many(collection_id="c1", names=names)

# Spot-check shape + a few representative rows.
assert len(out) == 2000
assert out["alias_0"] == "canonical_0"
assert out["alias_49"] == "canonical_49"
assert out["unmapped_0"] == "unmapped_0"
assert out["unmapped_1949"] == "unmapped_1949"
Loading
Loading