Commit eb4c4f3

earayu and claude authored
feat(graph_curation): task #61 P2-S1+S2 — batch alias resolution + N-seed PG connection saturation fix (#1950)
Closes task #88 per PM @不穷 dispatch (msg=8f130f25). Implements task #61 spec v1 § 2.4 P2-S1 + Planetegg P2-HIGH (msg=db7fb085 + msg=1314ac59) + Singapore SRE diagnostic (Planetegg msg=4043adf4) batch alias resolution.

Background
----------

``LineageGraphStoreWithAliasRedirect.expand_neighbors_n_hops`` is on the ``GET /api/v2/collections/{id}/graphs`` and ``/graphs/hybrid`` read paths. Pre-fix, it called ``AliasMapRepository.resolve_canonical`` once per anchor name via ``asyncio.gather``, which checks out one PG connection per name. Spec § 2.4 P2-S1 quantification:

* ``GET /graphs?max_nodes=1000`` → up to **2 × max_nodes = 2000** seeds.
* ``GET /graphs/hybrid``: default 1000 / max 5000 seeds.

At those cardinalities the PG connection pool saturates, as observed in Singapore production (Planetegg msg=4043adf4 SRE diagnostic).

Changes
-------

* ``aperag/graph_curation/alias_map.py``: new :meth:`AliasMapRepository.resolve_canonical_many` batch primitive. A single SQL ``SELECT alias_name, canonical_name FROM aperag_lineage_entity_alias WHERE collection_id=? AND alias_name IN (...)`` reads all matching rows in one shot. Names absent from the result set fall back to themselves (mirrors single-name ``resolve_canonical`` semantics). Empty / falsy names short-circuit without an SQL lookup. Total connections checked out: **1** per call regardless of seed count. Caller order is preserved in the dict iteration order (insertion order semantics).
* ``aperag/indexing/alias_redirect_store.py``: rewrite ``LineageGraphStoreWithAliasRedirect.expand_neighbors_n_hops`` to use the batch primitive. The ``asyncio.gather`` per-name fan-out is gone; ``import asyncio`` is no longer needed at module top-level.
* Test stub ``_FakeAliasRepo`` in ``tests/unit_test/indexing/test_alias_redirect_store.py``: now implements both ``resolve_canonical`` (single, used by upsert/get/delete redirect paths) and ``resolve_canonical_many`` (batch, used by expand) and tracks call counts so tests can pin the call-graph (i.e. the expand path goes through the batch primitive exactly once).

Tests
-----

* ``tests/unit_test/graph_curation/test_alias_map.py`` (7 new):
  - ``test_resolve_canonical_many_returns_self_for_unmapped_names``
  - ``test_resolve_canonical_many_mixed_alias_and_canonical``
  - ``test_resolve_canonical_many_dedupes_input``
  - ``test_resolve_canonical_many_empty_input``
  - ``test_resolve_canonical_many_handles_empty_string``
  - ``test_resolve_canonical_many_per_collection_isolation``
  - ``test_resolve_canonical_many_large_seed_cap`` (2000-name spec quantification: pins correctness at the spec cap so a future regression that re-introduces per-name fan-out either times out or breaks the result shape).
* ``tests/unit_test/indexing/test_alias_redirect_store.py`` (2 new):
  - ``test_expand_neighbors_uses_batch_alias_resolution``: call-graph gate, exactly 1 ``resolve_canonical_many`` call and zero ``resolve_canonical`` calls, regardless of seed count. A regression that re-introduces the gather pattern is caught immediately.
  - ``test_expand_neighbors_large_seed_cap_uses_single_batch_call``: 2000-seed spec cap pinned at the call-graph level.

Local: ``uv run pytest tests/unit_test/graph_curation/ tests/unit_test/indexing/test_alias_redirect_store.py`` → **56 passed, 1 warning**.

Spec / scope alignment
----------------------

* task #61 spec v1 § 2.4 P2-S1: batch resolve primitive ✅
* task #61 spec v1 § 2.4 P2-S2: ``expand_neighbors_n_hops`` seed cap test ✅
* Lesson #17 backend convergence contract: a single primitive replaces N-parallel fan-out at the same caller, no FE / API changes required ✅
* Lesson #18 mechanical gate codification: the call-graph assertion in the redirect-store test is the mechanical gate (caught by CI on any future regression that bypasses the batch primitive) ✅

Follow-ups (NOT in this PR)
---------------------------

* P3 cross-cutting concern: every ``LineageGraphStore`` consumer that currently invokes the alias path per-name (e.g. some GraphCurationService internals) should migrate to the batch primitive. This is an independent task, gated on production data showing the residual N-fan-out is a real bottleneck.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 7d1adfa commit eb4c4f3
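
The connection-count argument in the commit message can be illustrated with a toy model. This is a minimal sketch, not the project's code: ``CountingPool``, ``resolve_per_name``, ``resolve_batch`` and ``measure`` are hypothetical names, and the "pool" only counts checkouts instead of talking to PostgreSQL.

```python
import asyncio


class CountingPool:
    """Hypothetical stand-in for a DB connection pool; it only records usage."""

    def __init__(self) -> None:
        self.in_use = 0      # connections currently checked out
        self.peak = 0        # highest number checked out at the same time
        self.checkouts = 0   # total checkouts

    async def run(self, work):
        # Hold one "connection" for the duration of ``work``.
        self.in_use += 1
        self.checkouts += 1
        self.peak = max(self.peak, self.in_use)
        try:
            await asyncio.sleep(0)  # yield so concurrent callers overlap
            return work()
        finally:
            self.in_use -= 1


ALIASES = {"Alicia": "Alice"}  # toy alias table (assumption)


async def resolve_per_name(pool, names):
    # Pre-fix shape: one checkout per name, all in flight at once.
    return await asyncio.gather(
        *(pool.run(lambda n=n: ALIASES.get(n, n)) for n in names)
    )


async def resolve_batch(pool, names):
    # Post-fix shape: one checkout for the whole batch.
    return await pool.run(lambda: {n: ALIASES.get(n, n) for n in names})


def measure(resolver, names):
    """Return (total checkouts, peak concurrent checkouts, result)."""
    pool = CountingPool()
    result = asyncio.run(resolver(pool, names))
    return pool.checkouts, pool.peak, result
```

Under this model the per-name variant holds as many connections at once as there are seeds, while the batch variant holds one, which is the saturation mechanism the commit message describes.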

4 files changed

Lines changed: 341 additions & 14 deletions

aperag/graph_curation/alias_map.py

Lines changed: 90 additions & 3 deletions
@@ -16,9 +16,10 @@
 
 Persists user-driven entity merge intent. The :class:`AliasMapRepository`
 is the canonical write/read surface; downstream consumers use the two
-methods :meth:`AliasMapRepository.resolve_canonical` (read) and
-:meth:`AliasMapRepository.upsert_alias` (write, with cycle reject and
-transitive flatten).
+methods :meth:`AliasMapRepository.resolve_canonical` (read,
+single-name) / :meth:`AliasMapRepository.resolve_canonical_many` (read,
+batch) and :meth:`AliasMapRepository.upsert_alias` (write, with cycle
+reject and transitive flatten).
 
 Design notes
 ------------
@@ -41,6 +42,7 @@
 from __future__ import annotations
 
 import logging
+from typing import Sequence
 
 from sqlalchemy import delete, select, update
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -80,6 +82,11 @@ async def resolve_canonical(self, *, collection_id: str, name: str) -> str:
         (i.e. ``name`` is already canonical or has never been merged).
         Returns at most one indirection because :meth:`upsert_alias`
         flattens transitively at write time.
+
+        Single-name reads are cheap; for batched (n > 1) callers prefer
+        :meth:`resolve_canonical_many` which folds N lookups into one
+        SQL roundtrip -- see task #61 P2-S1 (Planetegg msg=db7fb085 +
+        msg=1314ac59 batch alias resolution P2-HIGH).
         """
         if not name:
             return name
@@ -92,6 +99,86 @@ async def _op(session: AsyncSession) -> str:
 
         return await self._execute_query(_op)
 
+    async def resolve_canonical_many(
+        self,
+        *,
+        collection_id: str,
+        names: Sequence[str],
+    ) -> dict[str, str]:
+        """Batch alias resolution -- single SQL ``SELECT ... WHERE ... IN``
+        roundtrip (task #61 P2-S1+S2).
+
+        Returns a mapping from each input name to its canonical form.
+        Names with no alias row map to themselves (mirrors
+        :meth:`resolve_canonical` semantics). Empty / falsy names also
+        map to themselves so callers don't have to filter input.
+
+        Why this exists: pre-task-#61-P2 the only public API was the
+        per-name :meth:`resolve_canonical`. Callers that needed to
+        resolve N names did so via ``asyncio.gather`` of N parallel
+        coroutines -- each one acquired a separate ``AsyncSession`` /
+        DB connection. On
+        :meth:`LineageGraphStoreWithAliasRedirect.expand_neighbors_n_hops`
+        N is the seed cap of the calling endpoint, which can be large:
+
+        * ``GET /api/v2/collections/{id}/graphs?max_nodes=1000``
+          → up to **2 × max_nodes = 2000** seeds (per Planetegg
+          msg=db7fb085 + spec § 2.4 P2-S1 quantification).
+        * ``GET /graphs/hybrid``: default 1000 / max 5000 seeds.
+
+        2000 parallel ``resolve_canonical`` calls translate to 2000
+        connection-pool checkouts -- Singapore production observed PG
+        connection saturation on the ``/graphs`` endpoint
+        (Planetegg msg=4043adf4 SRE diagnostic).
+
+        Implementation: in-place dedupe + single ``SELECT alias_name,
+        canonical_name FROM aperag_lineage_entity_alias WHERE
+        collection_id = ? AND alias_name IN (...)`` reads all matching
+        rows in one shot. Names absent from the result set fall back
+        to themselves. Total connections checked out: **1**.
+
+        Order of the input is preserved on the dict's iteration order
+        (Python ``dict`` preserves insertion order since 3.7).
+        """
+        # Map empty / falsy names to themselves up-front, then dedupe
+        # the rest. ``dict`` insertion order preserves caller order.
+        out: dict[str, str] = {}
+        unique_names: list[str] = []
+        seen: set[str] = set()
+        for n in names:
+            if not n:
+                out[n] = n
+                continue
+            if n in seen:
+                continue
+            seen.add(n)
+            unique_names.append(n)
+        if not unique_names:
+            return out
+
+        async def _op(session: AsyncSession) -> dict[str, str]:
+            stmt = select(
+                LineageEntityAlias.alias_name,
+                LineageEntityAlias.canonical_name,
+            ).where(
+                LineageEntityAlias.collection_id == collection_id,
+                LineageEntityAlias.alias_name.in_(unique_names),
+            )
+            result = await session.execute(stmt)
+            return {str(row[0]): str(row[1]) for row in result.all()}
+
+        resolved = await self._execute_query(_op)
+
+        # Restore caller order: every input ``name`` (in the order it
+        # was passed) gets a key in the output. Names that didn't show
+        # up in the SQL result map to themselves (no alias row → name
+        # is already canonical).
+        for n in names:
+            if n in out:  # already added (empty / falsy short-circuit)
+                continue
+            out[n] = resolved.get(n, n)
+        return out
+
     async def list_aliases_pointing_at(self, *, collection_id: str, canonical_name: str) -> list[str]:
         """Return every ``alias_name`` whose row points at
         ``canonical_name`` in ``collection_id``.
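
The batch semantics above (dedupe, one ``IN`` query, fall back to self, falsy names pass through) can be tried without the project's SQLAlchemy stack. The following is a minimal sketch against the standard-library ``sqlite3`` module; the ``alias`` table, ``make_demo_db`` and ``resolve_many`` are hypothetical names chosen for illustration. One deliberate difference, stated as an observation about the diff: the committed method inserts falsy names into ``out`` in its first loop, so a falsy name that follows real names in the input moves to the front of the dict, whereas this sketch assigns keys in a single final pass.

```python
import sqlite3
from typing import Sequence


def make_demo_db() -> sqlite3.Connection:
    # Hypothetical schema, loosely modelled on the columns named in the diff.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE alias (collection_id TEXT, alias_name TEXT, canonical_name TEXT)"
    )
    conn.executemany(
        "INSERT INTO alias VALUES (?, ?, ?)",
        [("c1", "Apple", "Apple Inc."), ("c2", "Apple", "Apple Records")],
    )
    conn.commit()
    return conn


def resolve_many(
    conn: sqlite3.Connection, collection_id: str, names: Sequence[str]
) -> dict[str, str]:
    # Order-preserving dedupe of the non-empty names.
    unique = list(dict.fromkeys(n for n in names if n))
    resolved: dict[str, str] = {}
    if unique:
        # One statement for the whole batch; one placeholder per name.
        placeholders = ", ".join("?" for _ in unique)
        rows = conn.execute(
            "SELECT alias_name, canonical_name FROM alias "
            f"WHERE collection_id = ? AND alias_name IN ({placeholders})",
            [collection_id, *unique],
        ).fetchall()
        resolved = dict(rows)
    # Single pass in caller order; unmapped and falsy names map to themselves.
    out: dict[str, str] = {}
    for n in names:
        out.setdefault(n, resolved.get(n, n))
    return out
```

For example, ``resolve_many(make_demo_db(), "c1", ["Banana", "", "Apple", "Apple"])`` yields one entry per distinct input name, with ``"Apple"`` resolved through the ``c1`` row only.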

aperag/indexing/alias_redirect_store.py

Lines changed: 24 additions & 11 deletions
@@ -70,7 +70,6 @@
 
 from __future__ import annotations
 
-import asyncio
 import logging
 from dataclasses import replace
 from typing import TYPE_CHECKING
@@ -263,20 +262,34 @@ async def expand_neighbors_n_hops(
     ) -> tuple[list[EntityWithLineage], list[RelationWithLineage]]:
         if not entity_names:
             return await self._inner.expand_neighbors_n_hops(entity_names=entity_names, hops=hops)
-        # Resolve every anchor through the alias map so a caller seeding
-        # ``["Alicia"]`` walks the canonical ``Alice`` neighbourhood.
-        # ``asyncio.gather`` keeps the per-anchor lookup cost flat --
-        # N is bounded by the caller (typically small).
-        resolved = await asyncio.gather(
-            *(self._alias_repo.resolve_canonical(collection_id=self._collection_id, name=n) for n in entity_names),
-            return_exceptions=False,
+        # Resolve every anchor through the alias map so a caller
+        # seeding ``["Alicia"]`` walks the canonical ``Alice``
+        # neighbourhood.
+        #
+        # task #61 P2-S1+S2 (per Planetegg msg=db7fb085 +
+        # msg=1314ac59 + spec § 2.4): batched resolution via
+        # ``resolve_canonical_many`` so we issue ONE SQL roundtrip
+        # regardless of the seed count. Pre-fix this site used
+        # ``asyncio.gather`` of N per-name ``resolve_canonical``
+        # coroutines, which checked out N PG connections in
+        # parallel. ``GET /graphs?max_nodes=1000`` → up to 2 ×
+        # 1000 = 2000 seeds (per spec quantification); ``GET
+        # /graphs/hybrid`` allows up to 5000 seeds. At those
+        # cardinalities the connection pool saturates and unrelated
+        # API requests stall -- Singapore production observed this
+        # exact saturation pattern (Planetegg msg=4043adf4 SRE
+        # diagnostic).
+        resolved_map = await self._alias_repo.resolve_canonical_many(
+            collection_id=self._collection_id,
+            names=entity_names,
         )
         # De-dup so ``["Alicia", "Alice"]`` (alias + canonical mixed)
-        # doesn't double-traverse the same anchor; preserve input order
-        # for deterministic output.
+        # doesn't double-traverse the same anchor; preserve input
+        # order for deterministic output.
         seen: set[str] = set()
         canonical_anchors: list[str] = []
-        for original, canonical in zip(entity_names, resolved):
+        for original in entity_names:
+            canonical = resolved_map.get(original, original)
             if canonical not in seen:
                 seen.add(canonical)
                 canonical_anchors.append(canonical)
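
The commit message describes a call-graph gate for this site: a test stub that counts calls so a regression back to per-name fan-out fails immediately. The diff for ``tests/unit_test/indexing/test_alias_redirect_store.py`` does not appear above, so the following is only a sketch of how such a gate might look. ``FakeAliasRepo``, ``expand_seeds`` and ``run_gate`` are hypothetical names, and ``expand_seeds`` stands in for the alias-resolution step of ``expand_neighbors_n_hops`` rather than reproducing it.

```python
import asyncio


class FakeAliasRepo:
    """Hypothetical stub: same two read methods, plus call counters."""

    def __init__(self, aliases):
        self._aliases = dict(aliases)
        self.single_calls = 0
        self.batch_calls = 0

    async def resolve_canonical(self, *, collection_id, name):
        self.single_calls += 1
        return self._aliases.get(name, name)

    async def resolve_canonical_many(self, *, collection_id, names):
        self.batch_calls += 1
        return {n: self._aliases.get(n, n) for n in names}


async def expand_seeds(repo, collection_id, entity_names):
    # Stand-in for the resolution step: one batch call, then a
    # per-name dict lookup that never touches the repository again.
    resolved_map = await repo.resolve_canonical_many(
        collection_id=collection_id, names=entity_names
    )
    return [resolved_map.get(n, n) for n in entity_names]


def run_gate(seed_count):
    """Return (batch calls, single calls, resolved names) for ``seed_count`` seeds."""
    repo = FakeAliasRepo({"Alicia": "Alice"})
    names = ["Alicia"] + [f"seed_{i}" for i in range(seed_count - 1)]
    resolved = asyncio.run(expand_seeds(repo, "c1", names))
    return repo.batch_calls, repo.single_calls, resolved
```

The design point is that the assertion is on call counts rather than timing, so it holds at any seed count and does not depend on how fast the test database is.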

tests/unit_test/graph_curation/test_alias_map.py

Lines changed: 127 additions & 0 deletions
@@ -195,3 +195,130 @@ async def test_list_aliases_pointing_at_returns_alphabetical(session):
 
     pointing_at_C = await repo.list_aliases_pointing_at(collection_id="c1", canonical_name="C")
     assert pointing_at_C == ["Alpha", "Mu", "Zeta"]
+
+
+# ---------------------------------------------------------------------
+# task #61 P2-S1+S2 -- batch resolve_canonical_many
+# ---------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_resolve_canonical_many_returns_self_for_unmapped_names(session):
+    """No alias rows → every input name maps to itself (mirrors the
+    single-name :meth:`resolve_canonical` semantic)."""
+    repo = AliasMapRepository(session=session)
+    out = await repo.resolve_canonical_many(collection_id="c1", names=["Apple", "Banana", "Cherry"])
+    assert out == {"Apple": "Apple", "Banana": "Banana", "Cherry": "Cherry"}
+
+
+@pytest.mark.asyncio
+async def test_resolve_canonical_many_mixed_alias_and_canonical(session):
+    """Some inputs are aliases, some are already canonical, some
+    don't exist at all -- each maps to its correct resolution."""
+    repo = AliasMapRepository(session=session)
+    await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.")
+    await repo.upsert_alias(collection_id="c1", alias_name="MS", target="Microsoft")
+
+    out = await repo.resolve_canonical_many(
+        collection_id="c1",
+        names=["Apple", "MS", "Apple Inc.", "Banana"],  # alias / alias / canonical / unknown
+    )
+    assert out == {
+        "Apple": "Apple Inc.",
+        "MS": "Microsoft",
+        "Apple Inc.": "Apple Inc.",
+        "Banana": "Banana",
+    }
+
+
+@pytest.mark.asyncio
+async def test_resolve_canonical_many_dedupes_input(session):
+    """Duplicate input names result in a single dict entry (Python
+    dict semantics) but with the same canonical resolution. Pinned to
+    catch a future refactor that accidentally returns multi-value
+    output."""
+    repo = AliasMapRepository(session=session)
+    await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.")
+    out = await repo.resolve_canonical_many(
+        collection_id="c1",
+        names=["Apple", "Apple", "Apple Inc.", "Apple"],
+    )
+    # ``dict`` collapses duplicates by key; insertion-order is
+    # preserved (Apple first, Apple Inc. second).
+    assert out == {"Apple": "Apple Inc.", "Apple Inc.": "Apple Inc."}
+
+
+@pytest.mark.asyncio
+async def test_resolve_canonical_many_empty_input(session):
+    """Empty input → empty output (defensive -- caller bridges the
+    edge without an extra ``if names`` branch)."""
+    repo = AliasMapRepository(session=session)
+    assert await repo.resolve_canonical_many(collection_id="c1", names=[]) == {}
+
+
+@pytest.mark.asyncio
+async def test_resolve_canonical_many_handles_empty_string(session):
+    """Empty / falsy names short-circuit to themselves without an SQL
+    lookup (mirrors single-name :meth:`resolve_canonical` defensive
+    branch)."""
+    repo = AliasMapRepository(session=session)
+    await repo.upsert_alias(collection_id="c1", alias_name="Real", target="Resolved")
+    out = await repo.resolve_canonical_many(collection_id="c1", names=["", "Real", ""])
+    assert out == {"": "", "Real": "Resolved"}
+
+
+@pytest.mark.asyncio
+async def test_resolve_canonical_many_per_collection_isolation(session):
+    """Same alias_name in different collections resolves to different
+    canonical names -- task #61 spec § 2.4 P2-S1 cross-collection seed
+    cap test pinpoints this isolation."""
+    repo = AliasMapRepository(session=session)
+    await repo.upsert_alias(collection_id="c1", alias_name="Apple", target="Apple Inc.")
+    await repo.upsert_alias(collection_id="c2", alias_name="Apple", target="Apple Records")
+
+    c1_out = await repo.resolve_canonical_many(collection_id="c1", names=["Apple"])
+    c2_out = await repo.resolve_canonical_many(collection_id="c2", names=["Apple"])
+    assert c1_out == {"Apple": "Apple Inc."}
+    assert c2_out == {"Apple": "Apple Records"}
+
+
+@pytest.mark.asyncio
+async def test_resolve_canonical_many_large_seed_cap(session):
+    """Pinned to catch a future regression that re-introduces per-name
+    DB roundtrips: even at the ``/graphs?max_nodes=1000`` worst case
+    (2 × max_nodes = 2000 seeds, per spec § 2.4 P2-S1 quantification),
+    the batch API must complete with a single SQL roundtrip.
+
+    We can't directly assert "1 SQL roundtrip" in a pure-unit test
+    against in-memory SQLite, but we can pin the *result correctness*
+    at the spec-quantified seed cardinality so a future refactor that
+    silently re-fans-out would either time out (in-memory SQLite is
+    fast enough that 2000 SELECT roundtrips is ~10ms -- observable
+    only via a perf timeout) or break correctness.
+
+    The companion :func:`test_expand_neighbors_uses_batch_alias_resolution`
+    in ``test_alias_redirect_store.py`` pins the call-graph: the
+    ``expand_neighbors_n_hops`` site MUST go through
+    ``resolve_canonical_many`` exactly once (not N
+    ``resolve_canonical`` calls).
+    """
+    repo = AliasMapRepository(session=session)
+    # Seed 50 aliases (cheap on sqlite); query 2000 names where the
+    # first 50 are mapped + the remaining 1950 are unmapped (resolve
+    # to themselves).
+    for i in range(50):
+        await repo.upsert_alias(
+            collection_id="c1",
+            alias_name=f"alias_{i}",
+            target=f"canonical_{i}",
+        )
+
+    names = [f"alias_{i}" for i in range(50)] + [f"unmapped_{i}" for i in range(1950)]
+    out = await repo.resolve_canonical_many(collection_id="c1", names=names)
+
+    # Spot-check shape + a few representative rows.
+    assert len(out) == 2000
+    assert out["alias_0"] == "canonical_0"
+    assert out["alias_49"] == "canonical_49"
+    assert out["unmapped_0"] == "unmapped_0"
+    assert out["unmapped_1949"] == "unmapped_1949"
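
The last test's docstring notes that the single-roundtrip property is not asserted directly. As a point of comparison only, the standard-library ``sqlite3`` driver exposes ``Connection.set_trace_callback``, which reports each statement SQLite executes and so makes the statement count observable. The sketch below uses plain ``sqlite3`` rather than the project's async SQLAlchemy session; whether an equivalent hook fits the project's ``session`` fixture is an assumption, and ``make_db``, ``batch_lookup``, ``per_name_lookup`` and ``count_selects`` are hypothetical names.

```python
import sqlite3


def make_db(n_aliases):
    # Hypothetical single-collection alias table for the demo.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE alias (alias_name TEXT PRIMARY KEY, canonical_name TEXT)")
    conn.executemany(
        "INSERT INTO alias VALUES (?, ?)",
        [(f"alias_{i}", f"canonical_{i}") for i in range(n_aliases)],
    )
    conn.commit()
    return conn


def batch_lookup(conn, names):
    # One SELECT with an IN list covering every name.
    placeholders = ", ".join("?" for _ in names)
    rows = conn.execute(
        f"SELECT alias_name, canonical_name FROM alias WHERE alias_name IN ({placeholders})",
        list(names),
    ).fetchall()
    found = dict(rows)
    return {n: found.get(n, n) for n in names}


def per_name_lookup(conn, names):
    # One SELECT per name (the shape the batch primitive replaces).
    out = {}
    for n in names:
        row = conn.execute(
            "SELECT canonical_name FROM alias WHERE alias_name = ?", (n,)
        ).fetchone()
        out[n] = row[0] if row else n
    return out


def count_selects(conn, fn):
    """Run ``fn`` and return (result, number of SELECT statements traced)."""
    statements = []
    conn.set_trace_callback(statements.append)
    try:
        result = fn()
    finally:
        conn.set_trace_callback(None)  # stop tracing
    selects = [s for s in statements if s.lstrip().upper().startswith("SELECT")]
    return result, len(selects)
```

With a name list of a few hundred entries, both lookups return the same mapping while the traced SELECT count differs (one for the batch shape, one per name otherwise). The list is kept small here because older SQLite builds cap the number of bound parameters per statement.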
