Skip to content

Commit 9d4b4c7

Browse files
DvirDukhanCopilot
andcommitted
perf(mcp): cache search_code corpus per (graph, project)
search_code's ranker (_hybrid_components) ran three full-graph Cypher reads and rebuilt the BM25 corpus in Python on every call. Measured latency scales ~linearly with graph size (77ms @2.3K nodes, 320ms @12K, ~1s extrapolated to a 41K-symbol repo), so on large repos that read + rebuild dominated harness wall-time for the agent's most-called tool. Split the ranker into a query-independent corpus (_build_corpus: the three reads + tokenization + centrality) and a cheap per-query overlay (_overlay: name_exact, representative symbol, path overlap, BM25, min-max). Cache the corpus per (graph_name, project); a warm call drops from ~300ms to ~23ms on the 12K graph (~13x), with byte-identical results. Invalidation is two-layered: - explicit reset_corpus_cache() from index_repo after a (re)index (covers the in-process writer); - a ~1ms (node_count, edge_count, commit) signature probe on every hit (FalkorDB count() is O(1) metadata; commit is a single Redis HGET), covering cross-process mutation such as the web API's /api/switch_commit which rewrites the Redis commit marker even when counts are preserved. A corpus whose graph mutates mid-build, or that races an explicit reset, is detected (pre/post signature + per-graph generation check) and served without being cached, so a torn snapshot never persists. Concurrent cold builds are collapsed to one via a per-key asyncio.Lock; the cache is LRU bounded by CODE_GRAPH_SEARCH_CACHE_MAX (default 8). Adds tests/mcp/test_search_cache.py covering warm reuse, query-independent sharing, signature invalidation, explicit reset, reset-all, concurrent build-once, and the torn-snapshot guard. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 9e05b4b commit 9d4b4c7

2 files changed

Lines changed: 499 additions & 27 deletions

File tree

api/mcp/tools/structural.py

Lines changed: 237 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import math
2323
import os
2424
import re
25-
from collections import Counter, defaultdict
25+
from collections import Counter, OrderedDict, defaultdict
26+
from dataclasses import dataclass, field
2627
from pathlib import Path
2728
from typing import Any, Optional
2829

@@ -165,26 +166,192 @@ async def _hybrid_rank(g, query: str, project: Optional[str]) -> list[dict[str,
165166

166167

167168
async def _hybrid_components(g, query: str, project: Optional[str]):
168-
"""Fetch graph data and build per-file, weight-independent components.
169+
"""Build the per-file, weight-independent components for ``query``.
169170
170171
Returns ``(files, comps, rep, abs_of, file_id_of)`` where ``comps[f]`` holds
171172
the min-max-normalized ``name``/``path``/``bm25``/``cent`` scores plus the
172173
``pen`` penalty, and ``file_id_of[f]`` is the File node id (handle the agent
173174
feeds to ``get_file_neighbors``). Separated from weighting so weight sweeps
174175
reuse the exact same normalized inputs as the live ranker.
176+
177+
The expensive, query-INDEPENDENT half (three full-graph reads + tokenization)
178+
is built once by :func:`_build_corpus` and cached per ``(graph, project)``;
179+
only the cheap per-query overlay (:func:`_overlay`) runs on every call. See
180+
the corpus-cache block below for the latency rationale and invalidation.
181+
"""
182+
corpus = await _get_corpus(g, project)
183+
return _overlay(corpus, query)
184+
185+
186+
# ---------------------------------------------------------------------------
187+
# Corpus cache (search_code hot path)
188+
# ---------------------------------------------------------------------------
189+
# ``search_code`` is the agent's most-called tool. Its ranker used to run three
190+
# full-graph Cypher reads (every File, every Function/Class incl. ``n.doc``,
191+
# every cross-file edge) and rebuild the BM25 corpus in Python on EVERY call.
192+
# Measured latency scales ~linearly with graph size (77ms @2.3K nodes, 320ms
193+
# @12K, ~1s extrapolated to a 41K-symbol repo), so on large repos that read +
194+
# rebuild dominated harness wall-time (PR #701 review).
195+
#
196+
# Everything :func:`_build_corpus` produces is query-INDEPENDENT, so we cache it
197+
# per ``(graph_name, project)`` and let :func:`_overlay` apply the cheap
198+
# per-query scoring. Invalidation is two-layered:
199+
# * Explicit: ``reset_corpus_cache(graph_name)`` from ``index_repo`` after a
200+
# (re)index -- covers the in-process writer.
201+
# * Implicit: a ``(node_count, edge_count, commit)`` signature probe on every
202+
# hit (~1ms; FalkorDB ``count()`` is O(1) metadata, the commit marker is a
203+
# single Redis HGET). Covers cross-process mutation -- notably the web API's
204+
# ``/api/switch_commit``, which rewrites the Redis commit marker even when
205+
# node/edge counts are preserved. A rebuild whose graph mutates mid-build,
206+
# or that races an explicit reset, is detected (pre/post sig + generation
207+
# check) and served WITHOUT being cached, so a torn snapshot never persists.
208+
209+
210+
@dataclass
211+
class _Corpus:
212+
"""Cached, query-independent search corpus for one ``(graph, project)``."""
213+
214+
files: list[str]
215+
abs_of: dict[str, str]
216+
file_id_of: dict[str, Any]
217+
pathtok: dict[str, list[str]]
218+
bodytok: dict[str, list[str]]
219+
# name-bearing symbols per file: (name, name_lower, src_start, src_end).
220+
sym_by_file: dict[str, list[tuple[str, str, Any, Any]]]
221+
n_cent: dict[str, float] # min-max centrality (query-independent)
222+
sig: tuple = field(default_factory=tuple)
223+
224+
225+
_CORPUS_CACHE: "OrderedDict[tuple, _Corpus]" = OrderedDict()
226+
_CORPUS_LOCKS: dict[tuple, asyncio.Lock] = {}
227+
_CORPUS_GEN: dict[str, int] = defaultdict(int)
228+
_CORPUS_META_LOCK = asyncio.Lock() # guards lazy per-key lock creation
229+
_REDIS_MARKER_CLIENT: Any = None # lazily created, reused (connection pool)
230+
231+
232+
def _corpus_cache_max() -> int:
233+
"""Max number of corpora to retain (LRU). Each holds a full token corpus."""
234+
try:
235+
return max(1, int(os.getenv("CODE_GRAPH_SEARCH_CACHE_MAX", "8")))
236+
except ValueError:
237+
return 8
238+
239+
240+
def _commit_marker(project: Optional[str], branch: Optional[str]) -> Optional[str]:
241+
"""Best-effort, quiet read of the recorded commit for ``(project, branch)``.
242+
243+
Folded into the corpus signature so a cross-process ``switch_commit`` /
244+
reindex (which rewrites the Redis commit marker) invalidates the cache even
245+
when node/edge counts happen to be preserved. Any failure -> ``None`` (the
246+
signature degrades to counts-only and never raises into ``search_code``).
247+
Reads Redis directly rather than via ``get_repo_commit`` so a missing marker
248+
(non-git folder) doesn't log a warning on every probe.
249+
"""
250+
global _REDIS_MARKER_CLIENT
251+
if not project:
252+
return None
253+
try:
254+
from api.info import _repo_info_key, get_redis_connection
255+
256+
if _REDIS_MARKER_CLIENT is None:
257+
_REDIS_MARKER_CLIENT = get_redis_connection()
258+
return _REDIS_MARKER_CLIENT.hget(_repo_info_key(project, branch), "commit")
259+
except Exception:
260+
return None
261+
262+
263+
async def _graph_sig(g, project: Optional[str]) -> tuple:
264+
"""Cheap (~1ms) cache-validity signature for the graph behind ``g``.
265+
266+
On any probe failure returns a never-equal sentinel so the caller rebuilds
267+
(degrading to the always-fresh status quo) rather than serving a stale hit.
268+
"""
269+
try:
270+
n = (await g._query("MATCH (n) RETURN count(n)")).result_set[0][0]
271+
e = (await g._query("MATCH ()-[r]->() RETURN count(r)")).result_set[0][0]
272+
except Exception:
273+
return (None, None, object())
274+
commit = _commit_marker(project, getattr(g, "branch", None))
275+
return (int(n or 0), int(e or 0), commit)
276+
277+
278+
async def _corpus_lock(key: tuple) -> asyncio.Lock:
279+
async with _CORPUS_META_LOCK:
280+
lock = _CORPUS_LOCKS.get(key)
281+
if lock is None:
282+
lock = asyncio.Lock()
283+
_CORPUS_LOCKS[key] = lock
284+
return lock
285+
286+
287+
def reset_corpus_cache(graph_name: Optional[str] = None) -> None:
288+
"""Invalidate cached search corpora.
289+
290+
Called by ``index_repo`` after a (re)index so the next ``search_code``
291+
rebuilds. ``graph_name`` ``None`` clears everything; otherwise clears
292+
entries for that graph identity. Bumps a per-graph generation counter so a
293+
rebuild that started before the reset won't store its now-stale corpus.
294+
"""
295+
if graph_name is None:
296+
_CORPUS_CACHE.clear()
297+
for gname in list(_CORPUS_GEN.keys()):
298+
_CORPUS_GEN[gname] += 1
299+
return
300+
for key in [k for k in _CORPUS_CACHE if k[0] == graph_name]:
301+
del _CORPUS_CACHE[key]
302+
_CORPUS_GEN[graph_name] += 1
303+
304+
305+
async def _get_corpus(g, project: Optional[str]) -> _Corpus:
306+
"""Return a cached corpus for ``(g, project)`` or build (and maybe cache) one."""
307+
key = (g.name, project)
308+
sig = await _graph_sig(g, project)
309+
ent = _CORPUS_CACHE.get(key)
310+
if ent is not None and ent.sig == sig:
311+
_CORPUS_CACHE.move_to_end(key)
312+
return ent
313+
314+
lock = await _corpus_lock(key)
315+
async with lock:
316+
# Re-check: another coroutine may have rebuilt while we waited.
317+
sig_before = await _graph_sig(g, project)
318+
ent = _CORPUS_CACHE.get(key)
319+
if ent is not None and ent.sig == sig_before:
320+
_CORPUS_CACHE.move_to_end(key)
321+
return ent
322+
323+
gen_before = _CORPUS_GEN[g.name]
324+
corpus = await _build_corpus(g, project)
325+
sig_after = await _graph_sig(g, project)
326+
corpus.sig = sig_after
327+
328+
# Only cache a corpus that is internally consistent (the graph did not
329+
# change across the reads) and was not invalidated by a reset mid-build.
330+
stable = sig_after == sig_before and _CORPUS_GEN[g.name] == gen_before
331+
if stable:
332+
_CORPUS_CACHE[key] = corpus
333+
_CORPUS_CACHE.move_to_end(key)
334+
while len(_CORPUS_CACHE) > _corpus_cache_max():
335+
_CORPUS_CACHE.popitem(last=False)
336+
return corpus
337+
338+
339+
async def _build_corpus(g, project: Optional[str]) -> _Corpus:
340+
"""Run the three graph reads + tokenization (the query-independent half).
341+
342+
Mirrors the original ``_hybrid_components`` read/build logic exactly, minus
343+
the query-dependent ``name_exact``/``rep`` selection (moved to
344+
:func:`_overlay`). ``bodytok`` (path + symbol-name subtokens + capped doc
345+
tokens) and ``centrality`` are query-independent and built identically here.
175346
"""
176347
def rel(p: Optional[str]) -> str:
177348
return _relativize(p, project) if p else ""
178349

179-
qtok = set(_tokenize(query))
180-
qids = _issue_identifiers(query)
181-
182350
pathtok: dict[str, list[str]] = {}
183351
bodytok: dict[str, list[str]] = defaultdict(list)
184352
abs_of: dict[str, str] = {}
185353
file_id_of: dict[str, Any] = {}
186-
name_exact: dict[str, float] = defaultdict(float)
187-
rep: dict[str, dict[str, Any]] = {}
354+
sym_by_file: dict[str, list[tuple[str, str, Any, Any]]] = defaultdict(list)
188355

189356
files_res = await g._query("MATCH (f:File) RETURN f.path, ID(f)")
190357
for row in files_res.result_set:
@@ -214,19 +381,9 @@ def rel(p: Optional[str]) -> str:
214381
continue
215382
if name:
216383
bodytok[rp].extend(_subtokens(name))
217-
is_exact = name.lower() in qids
218-
if is_exact:
219-
name_exact[rp] += 1.0
220-
# Representative symbol for the file's snippet: prefer one whose name
221-
# exactly matches a query identifier, otherwise the lowest-``src_start``
222-
# symbol. Fully deterministic regardless of result-set order via a
223-
# stable sort key (exact first, then src_start, then name, then
224-
# src_end) so ties / missing ``src_start`` never depend on row order.
225-
cand = {"name": name, "src_start": start, "src_end": end,
226-
"exact": is_exact}
227-
cur = rep.get(rp)
228-
if cur is None or _rep_key(cand) < _rep_key(cur):
229-
rep[rp] = cand
384+
# Record name-bearing symbols so _overlay can recompute name_exact
385+
# and the representative symbol against the (query-dependent) ids.
386+
sym_by_file[rp].append((name, name.lower(), start, end))
230387
if doc and body_used[rp] < _HYBRID_BODY_TOKEN_CAP:
231388
toks = _tokenize(doc)[: _HYBRID_BODY_TOKEN_CAP - body_used[rp]]
232389
bodytok[rp].extend(toks)
@@ -242,15 +399,63 @@ def rel(p: Optional[str]) -> str:
242399
centrality[rel(bpath)] += math.log1p(int(deg or 0))
243400

244401
files = sorted(abs_of)
402+
n_cent = (
403+
_minmax({f: centrality.get(f, 0.0) for f in files}) if files else {}
404+
)
405+
406+
return _Corpus(
407+
files=files,
408+
abs_of=dict(abs_of),
409+
file_id_of=dict(file_id_of),
410+
pathtok=dict(pathtok),
411+
bodytok=dict(bodytok),
412+
sym_by_file=dict(sym_by_file),
413+
n_cent=n_cent,
414+
)
415+
416+
417+
def _overlay(corpus: _Corpus, query: str):
418+
"""Apply the cheap, query-dependent scoring over a cached corpus.
419+
420+
Reproduces the original ``_hybrid_components`` output exactly: name-exact
421+
counts and the representative symbol come from ``corpus.sym_by_file`` (the
422+
``_rep_key`` ordering is row-order independent), BM25 and path overlap run
423+
over the cached corpus, and the ``n_name`` normalization preserves the
424+
original ``name_exact if name_exact else zeros`` quirk.
425+
"""
426+
files = corpus.files
245427
if not files:
246428
return [], {}, {}, {}, {}
247429

248-
path_overlap = {f: float(len(qtok & set(pathtok.get(f, [])))) for f in files}
249-
raw_bm25 = _bm25(qtok, files, bodytok)
430+
qtok = set(_tokenize(query))
431+
qids = _issue_identifiers(query)
432+
433+
name_exact: dict[str, float] = defaultdict(float)
434+
rep: dict[str, dict[str, Any]] = {}
435+
for f, syms in corpus.sym_by_file.items():
436+
best: Optional[dict[str, Any]] = None
437+
cnt = 0.0
438+
for (name, name_lower, start, end) in syms:
439+
is_exact = name_lower in qids
440+
if is_exact:
441+
cnt += 1.0
442+
cand = {"name": name, "src_start": start, "src_end": end,
443+
"exact": is_exact}
444+
if best is None or _rep_key(cand) < _rep_key(best):
445+
best = cand
446+
if cnt:
447+
name_exact[f] = cnt
448+
if best is not None:
449+
rep[f] = best
450+
451+
path_overlap = {
452+
f: float(len(qtok & set(corpus.pathtok.get(f, [])))) for f in files
453+
}
454+
raw_bm25 = _bm25(qtok, files, corpus.bodytok)
250455
n_name = _minmax(name_exact if name_exact else {f: 0.0 for f in files})
251456
n_path = _minmax(path_overlap)
252457
n_bm25 = _minmax(raw_bm25)
253-
n_cent = _minmax({f: centrality.get(f, 0.0) for f in files})
458+
n_cent = corpus.n_cent
254459

255460
comps: dict[str, dict[str, float]] = {}
256461
for f in files:
@@ -261,15 +466,15 @@ def rel(p: Optional[str]) -> str:
261466
"cent": n_cent.get(f, 0.0),
262467
"pen": _HYBRID_W_PEN if _PENALTY_RE.search(f) else 0.0,
263468
# Raw (un-normalized) query-dependent signal. A file with zero
264-
# lexical overlap (name/path/body) is not relevant to the query
265-
# only query-independent centrality could rank it so search_code
469+
# lexical overlap (name/path/body) is not relevant to the query --
470+
# only query-independent centrality could rank it -- so search_code
266471
# drops it rather than returning noise for an unmatched query.
267472
"lex": (name_exact.get(f, 0.0)
268473
+ path_overlap.get(f, 0.0)
269474
+ raw_bm25.get(f, 0.0)),
270475
}
271476

272-
return files, comps, rep, abs_of, file_id_of
477+
return files, comps, rep, corpus.abs_of, corpus.file_id_of
273478

274479

275480
def _hybrid_score(
@@ -488,7 +693,12 @@ def _payload(project) -> dict[str, Any]:
488693
"mode": "full",
489694
}
490695

491-
return await loop.run_in_executor(None, _do_index)
696+
result = await loop.run_in_executor(None, _do_index)
697+
# A (re)index invalidates any cached search corpus for this graph so the
698+
# next search_code rebuilds against the new contents (covers the in-process
699+
# writer; cross-process mutation is caught by the signature probe).
700+
reset_corpus_cache(result.get("graph_name"))
701+
return result
492702

493703

494704
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)