Skip to content

Commit d39b0cb

Browse files
committed
fix: MCP recall daemon performance — wire fast param + choose_pool (PR #22)
- Thread the 'fast' recall parameter through the full MCP→daemon→worker stack (recall_worker, worker_pool, daemon_proxy, pool_adapter, tools_core, tools_v3). Engine.recall(fast=True) shipped in v3.4.40 but was unreachable from MCP tools until now.
- Wire 'session_id' through pool_recall kwargs (it was silently dropped).
- Replace the session_init AutoRecall double-call with a single pool_recall (fast=True) — removes one redundant recall round-trip per session.
- Switch tools_core.recall and tools_v3.recall_trace from the hardcoded WorkerPool.shared() to choose_pool() — enables daemon-first routing (a single shared ONNX process instead of N local subprocesses).
- Make FTS triggers idempotent with IF NOT EXISTS (race safety on repeated schema init).
- Clean up dead code: remove the unused `engine = get_engine()` call in observe and the unused `pid = engine.profile_id` assignment in close_session; remove unused imports.

Author: VikingOwl91
1 parent 058e61c commit d39b0cb

14 files changed

Lines changed: 266 additions & 292 deletions

src/superlocalmemory/core/recall_worker.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@ def _get_engine():
5959
return _engine
6060

6161

62-
def _handle_recall(query: str, limit: int, session_id: str = "") -> dict:
62+
def _handle_recall(
63+
query: str, limit: int, session_id: str = "", fast: bool = False,
64+
) -> dict:
6365
engine = _get_engine()
6466
response = engine.recall(
65-
query, limit=limit, session_id=session_id or None,
67+
query, limit=limit, session_id=session_id or None, fast=bool(fast),
6668
)
6769

6870
# Batch-fetch original memory text for all results
@@ -290,7 +292,7 @@ def _worker_main() -> None:
290292
if cmd == "recall":
291293
result = _handle_recall(
292294
req.get("query", ""), req.get("limit", 10),
293-
req.get("session_id", ""),
295+
req.get("session_id", ""), bool(req.get("fast", False)),
294296
)
295297
_respond(result)
296298
elif cmd == "store":

src/superlocalmemory/core/worker_pool.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def shared(cls) -> WorkerPool:
6767

6868
def recall(
6969
self, query: str, limit: int = 10, session_id: str = "",
70+
fast: bool = False,
7071
) -> dict:
7172
"""Run recall in worker subprocess. Returns result dict.
7273
@@ -77,7 +78,7 @@ def recall(
7778
"""
7879
return self._send({
7980
"cmd": "recall", "query": query, "limit": limit,
80-
"session_id": session_id or "",
81+
"session_id": session_id or "", "fast": bool(fast),
8182
})
8283

8384
def store(self, content: str, metadata: dict | None = None) -> dict:

src/superlocalmemory/mcp/_daemon_proxy.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,14 @@ def _url(self, path: str) -> str:
4545

4646
def recall(
4747
self, query: str, limit: int = 10, session_id: str = "",
48+
fast: bool = False,
4849
) -> dict[str, Any]:
49-
params = urllib.parse.urlencode(
50-
{"q": query, "limit": limit, "session_id": session_id or ""}
51-
)
50+
params = urllib.parse.urlencode({
51+
"q": query,
52+
"limit": limit,
53+
"session_id": session_id or "",
54+
"fast": "true" if fast else "false",
55+
})
5256
try:
5357
with urllib.request.urlopen(
5458
self._url(f"/recall?{params}"), timeout=self._timeout,

src/superlocalmemory/mcp/_pool_adapter.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,17 @@ def _unwrap_error(raw: Any, op: str) -> None:
7474
raise PoolError(f"pool.{op} failed: {reason}")
7575

7676

77-
def pool_recall(query: str, limit: int = 10, **_: Any) -> PoolRecallResponse:
77+
def pool_recall(query: str, limit: int = 10, **kwargs: Any) -> PoolRecallResponse:
7878
"""Call pool.recall and reshape its dict into a typed response.
7979
8080
Raises :class:`PoolError` on worker death or any non-ok envelope.
8181
"""
82-
raw = _pool().recall(query=query, limit=limit)
82+
raw = _pool().recall(
83+
query=query,
84+
limit=limit,
85+
session_id=str(kwargs.get("session_id") or ""),
86+
fast=bool(kwargs.get("fast", False)),
87+
)
8388
_unwrap_error(raw, "recall")
8489
items = raw.get("results", []) if isinstance(raw, dict) else []
8590
results = [

src/superlocalmemory/mcp/tools_active.py

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ async def session_init(
9393
The AI should call this automatically before any other work.
9494
"""
9595
try:
96-
from superlocalmemory.hooks.auto_recall import AutoRecall
9796
from superlocalmemory.hooks.rules_engine import RulesEngine
9897
from superlocalmemory.mcp._pool_adapter import pool_recall
9998

@@ -104,21 +103,37 @@ async def session_init(
104103
return {"success": True, "context": "", "memories": [], "message": "Auto-recall disabled"}
105104

106105
recall_config = rules.get_recall_config()
107-
auto = AutoRecall(
108-
recall_fn=pool_recall,
109-
config={
110-
"enabled": True,
111-
"max_memories_injected": max_results,
112-
"relevance_threshold": recall_config.get("relevance_threshold", 0.3),
113-
},
114-
)
115-
116-
# Get formatted context for system prompt injection
117-
context = auto.get_session_context(project_path=project_path, query=query)
118-
119-
# Get structured results for tool response
120-
search_query = query or f"project context {project_path}" if project_path else "recent important decisions"
121-
memories = auto.get_query_context(search_query)
106+
relevance_threshold = recall_config.get("relevance_threshold", 0.3)
107+
if query:
108+
search_query = query
109+
elif project_path:
110+
search_query = f"project context {project_path}"
111+
else:
112+
search_query = "recent important decisions"
113+
114+
response = pool_recall(search_query, limit=max_results, fast=True)
115+
relevant = [
116+
r for r in response.results
117+
if r.score >= relevance_threshold
118+
]
119+
120+
# Build both return shapes from one recall. Calling recall twice
121+
# doubles session startup latency and can return duplicate snippets.
122+
context = ""
123+
if relevant:
124+
lines = ["# Relevant Memory Context", ""]
125+
for r in relevant[:max_results]:
126+
lines.append(f"- {r.fact.content[:200]}")
127+
context = "\n".join(lines)
128+
129+
memories = [
130+
{
131+
"fact_id": r.fact.fact_id,
132+
"content": r.fact.content[:300],
133+
"score": round(r.score, 3),
134+
}
135+
for r in relevant[:max_results]
136+
]
122137

123138
# Get learning status
124139
pid = engine.profile_id
@@ -184,7 +199,6 @@ async def observe(
184199
from superlocalmemory.hooks.rules_engine import RulesEngine
185200
from superlocalmemory.mcp._pool_adapter import pool_store
186201

187-
engine = get_engine()
188202
rules = RulesEngine()
189203

190204
auto = AutoCapture(
@@ -305,7 +319,6 @@ async def close_session(session_id: str = "") -> dict:
305319
"""
306320
try:
307321
engine = get_engine()
308-
pid = engine.profile_id
309322
sid = session_id or getattr(engine, '_last_session_id', '')
310323
if not sid:
311324
return {"success": False, "error": "No session_id provided"}

src/superlocalmemory/mcp/tools_core.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@
1313

1414
from __future__ import annotations
1515

16-
import json
1716
import logging
1817
from pathlib import Path
19-
from typing import Any, Callable
18+
from typing import Callable
2019

2120
from mcp.types import ToolAnnotations
2221

@@ -111,7 +110,6 @@ async def remember(
111110
Extracts atomic facts, resolves entities, builds graph edges,
112111
and indexes for 4-channel retrieval.
113112
"""
114-
import asyncio
115113
try:
116114
# v3.4.32: Store-first pattern. Write to pending.db and return
117115
# immediately. The daemon's pending-materializer thread drains
@@ -141,7 +139,7 @@ async def remember(
141139
@server.tool(annotations=ToolAnnotations(readOnlyHint=True))
142140
async def recall(
143141
query: str, limit: int = 10, agent_id: str = "mcp_client",
144-
session_id: str = "",
142+
session_id: str = "", fast: bool = False,
145143
) -> dict:
146144
"""Search memories by semantic query with 4-channel retrieval, RRF fusion, and reranking.
147145
@@ -153,8 +151,8 @@ async def recall(
153151
"""
154152
import asyncio
155153
try:
156-
from superlocalmemory.core.worker_pool import WorkerPool
157-
pool = WorkerPool.shared()
154+
from superlocalmemory.mcp._daemon_proxy import choose_pool
155+
pool = choose_pool()
158156
# S9-DASH-10: priority for session_id, so engagement
159157
# signals land on the right pending_outcome:
160158
# 1. Explicit ``session_id`` tool-call argument.
@@ -199,6 +197,7 @@ async def recall(
199197
# block behind a single threading.Lock. See worker_pool.py.
200198
result = await asyncio.to_thread(
201199
pool.recall, query, limit=limit, session_id=effective_sid,
200+
fast=bool(fast),
202201
)
203202
if result.get("ok"):
204203
# Record implicit feedback: every returned result is a recall_hit

src/superlocalmemory/mcp/tools_v3.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,8 +285,8 @@ async def recall_trace(query: str, limit: int = 10) -> dict:
285285
limit: Maximum results (default 10).
286286
"""
287287
try:
288-
from superlocalmemory.core.worker_pool import WorkerPool
289-
raw = WorkerPool.shared().recall(query=query, limit=limit)
288+
from superlocalmemory.mcp._daemon_proxy import choose_pool
289+
raw = choose_pool().recall(query=query, limit=limit)
290290
items = raw.get("results", []) if isinstance(raw, dict) else []
291291
results = []
292292
for item in items[:limit]:

src/superlocalmemory/storage/schema.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,23 +252,23 @@ def _set_pragmas(conn: sqlite3.Connection) -> None:
252252
-- left by V2 migration.
253253
254254
-- INSERT trigger
255-
CREATE TRIGGER atomic_facts_fts_insert
255+
CREATE TRIGGER IF NOT EXISTS atomic_facts_fts_insert
256256
AFTER INSERT ON atomic_facts
257257
BEGIN
258258
INSERT INTO atomic_facts_fts (rowid, fact_id, content)
259259
VALUES (NEW.rowid, NEW.fact_id, NEW.content);
260260
END;
261261
262262
-- DELETE trigger
263-
CREATE TRIGGER atomic_facts_fts_delete
263+
CREATE TRIGGER IF NOT EXISTS atomic_facts_fts_delete
264264
AFTER DELETE ON atomic_facts
265265
BEGIN
266266
INSERT INTO atomic_facts_fts (atomic_facts_fts, rowid, fact_id, content)
267267
VALUES ('delete', OLD.rowid, OLD.fact_id, OLD.content);
268268
END;
269269
270270
-- UPDATE trigger
271-
CREATE TRIGGER atomic_facts_fts_update
271+
CREATE TRIGGER IF NOT EXISTS atomic_facts_fts_update
272272
AFTER UPDATE OF content ON atomic_facts
273273
BEGIN
274274
INSERT INTO atomic_facts_fts (atomic_facts_fts, rowid, fact_id, content)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright (c) 2026 Varun Pratap Bhardwaj / Qualixar
2+
# Licensed under AGPL-3.0-or-later - see LICENSE file
3+
# Part of SuperLocalMemory V3 | https://qualixar.com | https://varunpratap.com
4+
5+
"""WorkerPool recall request shaping."""
6+
7+
from __future__ import annotations
8+
9+
10+
def test_worker_pool_recall_forwards_fast_flag(monkeypatch):
    """Check that ``WorkerPool.recall`` hands ``_send`` a payload carrying ``fast``.

    ``_send`` is swapped for a stub that records the payload it receives and
    returns a canned ``{"ok": True}`` reply; the test then compares both the
    reply and the recorded payload against the expected dicts.
    """
    from superlocalmemory.core.worker_pool import WorkerPool

    recorded_payload = {}

    def _record_and_reply(request):
        # Merge the outgoing request into the capture dict so it can be
        # inspected after recall() returns.
        recorded_payload.update(request)
        return {"ok": True}

    worker_pool = WorkerPool()
    monkeypatch.setattr(worker_pool, "_send", _record_and_reply)

    reply = worker_pool.recall("q", limit=3, session_id="s-1", fast=True)
    expected_payload = {
        "cmd": "recall",
        "query": "q",
        "limit": 3,
        "session_id": "s-1",
        "fast": True,
    }

    assert reply == {"ok": True}
    assert recorded_payload == expected_payload

tests/test_mcp/test_mcp_daemon_proxy.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
from __future__ import annotations
99

1010
import json
11-
from types import SimpleNamespace
1211

1312
import pytest
1413

15-
from superlocalmemory.mcp._daemon_proxy import DaemonPoolProxy, choose_pool
14+
from superlocalmemory.mcp._daemon_proxy import DaemonPoolProxy
1615
from superlocalmemory.mcp._pool_adapter import (
1716
PoolError, pool_recall, pool_store,
1817
)
@@ -21,7 +20,7 @@
2120
class TestPoolErrorSurfacing:
2221
def test_pool_recall_raises_on_ok_false(self, monkeypatch):
2322
class _Dead:
24-
def recall(self, query, limit=10, session_id=""):
23+
def recall(self, query, limit=10, session_id="", fast=False):
2524
return {"ok": False, "error": "worker died"}
2625
from superlocalmemory.mcp import _pool_adapter
2726
monkeypatch.setattr(_pool_adapter, "_pool", lambda: _Dead())
@@ -41,7 +40,7 @@ def store(self, content, metadata=None):
4140

4241
def test_pool_recall_success_does_not_raise(self, monkeypatch):
4342
class _Ok:
44-
def recall(self, query, limit=10, session_id=""):
43+
def recall(self, query, limit=10, session_id="", fast=False):
4544
return {"ok": True, "results": [], "query_type": "x"}
4645
from superlocalmemory.mcp import _pool_adapter
4746
monkeypatch.setattr(_pool_adapter, "_pool", lambda: _Ok())
@@ -73,6 +72,23 @@ def _fake_urlopen(req, timeout=30):
7372
assert "limit=3" in captured["url"]
7473
assert "session_id=s-1" in captured["url"]
7574

75+
def test_recall_forwards_fast_flag(self, monkeypatch):
    """Check that ``DaemonPoolProxy.recall(fast=True)`` puts ``fast=true`` in the URL.

    ``urlopen`` on the proxy module is replaced with a stub that records the
    requested URL and answers with a canned JSON body.
    """
    import superlocalmemory.mcp._daemon_proxy as proxy_module

    seen = {}
    canned_body = json.dumps(
        {"ok": True, "results": [], "query_type": "semantic"}
    ).encode()

    def _stub_urlopen(req, timeout=30):
        # Use the object's ``full_url`` when it has one; otherwise keep the
        # argument itself as the URL.
        seen["url"] = getattr(req, "full_url", req)
        return _FakeResp(canned_body)

    monkeypatch.setattr(proxy_module.urllib.request, "urlopen", _stub_urlopen)

    result = DaemonPoolProxy(port=9999).recall("fast path", fast=True)

    assert result["ok"] is True
    assert "fast=true" in seen["url"]
91+
7692
def test_store_forwards_http_post(self, monkeypatch):
7793
captured = {}
7894

0 commit comments

Comments
 (0)