Commit a454bf8

Skiipy11 and claude committed
feat: multi-path retrieval with RRF fusion (v2.3)
Search now runs 3 retrieval paths in parallel and merges with Reciprocal Rank Fusion — dramatically improving recall for exact names, technical terms, and entity-connected memories.

New retrieval paths:
- BM25 keyword search (Postgres tsvector/GIN or SQLite FTS5)
- BFS spreading activation through entity relationship graph
- Results merged with RRF: score(d) = sum(1/(k+rank)) across paths

New files:
- api/src/services/rrf.js — RRF fusion algorithm (13 unit tests)
- api/src/services/keyword-search.js — BM25/FTS keyword search service
- api/src/services/graph-search.js — BFS entity graph retrieval
- api/scripts/backfill-keyword-index.js — migration for existing memories
- api/tests/rrf.test.js — comprehensive RRF test suite

Modified:
- GET /memory/search runs all 3 paths via Promise.all, fuses with RRF
- POST /memory indexes content for keyword search on write
- DELETE and supersede paths deactivate keyword index entries
- Postgres: memory_search table + tsvector trigger + co-occurrence indexes
- SQLite: FTS5 virtual table for keyword search fallback
- Qdrant: getPoints() batch retrieval for RRF payload hydration
- Stats endpoint shows retrieval path status and keyword index count
- MCP brain_search description updated, format=full shows retrieval_sources

Feature-flagged: MULTI_PATH_SEARCH=true (default on)
Configurable: RRF_K, GRAPH_SEARCH_MAX_DEPTH, GRAPH_SEARCH_DECAY

Inspired by vectorize-io/hindsight's 4-way parallel search architecture.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 87af269 commit a454bf8
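
The commit message gives the fusion rule as score(d) = sum(1/(k+rank)) across paths. The contents of `api/src/services/rrf.js` are not part of this page, so the following is only a minimal sketch of that formula. The function name `rrfSketch`, the 1-based rank, and the `{ id, score, sources }` output shape are assumptions for illustration.

```javascript
// Hypothetical sketch of Reciprocal Rank Fusion; NOT the contents of rrf.js.
// Each ranked list is an array of { id, source }, best match first.
function rrfSketch(rankedLists, k = 60) {
  const byId = new Map();
  for (const list of rankedLists) {
    list.forEach((item, index) => {
      const rank = index + 1; // assumption: ranks are 1-based
      const entry = byId.get(item.id) || { id: item.id, score: 0, sources: [] };
      entry.score += 1 / (k + rank);
      if (!entry.sources.includes(item.source)) entry.sources.push(item.source);
      byId.set(item.id, entry);
    });
  }
  // Highest fused score first
  return [...byId.values()].sort((a, b) => b.score - a.score);
}

const fused = rrfSketch([
  [{ id: 'a', source: 'vector' }, { id: 'b', source: 'vector' }],
  [{ id: 'b', source: 'keyword' }, { id: 'c', source: 'keyword' }],
]);
console.log(fused.map(f => f.id)); // [ 'b', 'a', 'c' ]
```

Memory `b` is ranked second by the vector path and first by the keyword path, so it collects 1/62 + 1/61 and overtakes `a`, which only appears once at rank 1 (1/61).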

15 files changed

Lines changed: 873 additions & 23 deletions

.env.example

Lines changed: 9 additions & 0 deletions
@@ -74,6 +74,15 @@ DECAY_FACTOR=0.98
 # WEBHOOK_URLS=https://n8n.example.com/webhook/brain # Comma-separated webhook URLs for memory events
 # WEBHOOK_EVENTS=store,supersede,delete # Which events trigger notifications (default: all)
 
+# --- Multi-Path Retrieval ---
+# Enables parallel vector + keyword (BM25) + graph (entity BFS) search, merged with Reciprocal Rank Fusion.
+# Requires: structured store (postgres recommended for full BM25, sqlite for FTS5 fallback)
+# MULTI_PATH_SEARCH=true # Enable/disable multi-path retrieval (default: true)
+# RRF_K=60 # RRF smoothing constant. Range 50-100. Higher = more equal weighting (default: 60)
+# GRAPH_SEARCH_MAX_DEPTH=2 # Max BFS hops through entity graph (default: 2)
+# GRAPH_SEARCH_DECAY=0.8 # Activation decay per hop (default: 0.8)
+# GRAPH_SEARCH_CAUSAL_BOOST=2.0 # Boost for typed entity relationships (uses, works_on, etc.) vs co_occurrence (default: 2.0)
+
 # --- MCP Server Timeouts ---
 # BRAIN_MCP_TIMEOUT=15000 # Default timeout for MCP→API calls in ms (default: 15000)
 # BRAIN_MCP_CONSOLIDATION_TIMEOUT=120000 # Timeout for sync consolidation calls in ms (default: 120000)
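
To make `GRAPH_SEARCH_MAX_DEPTH` and `GRAPH_SEARCH_DECAY` concrete, here is a rough sketch of BFS spreading activation over an adjacency list. `api/src/services/graph-search.js` is not shown on this page, so the function name `spreadActivation`, the graph shape, and the rule that a node keeps its first (shortest-path) activation are all assumptions. `GRAPH_SEARCH_CAUSAL_BOOST` is left out because the diff does not show how it is applied.

```javascript
// Hypothetical sketch; NOT the contents of graph-search.js.
// `graph` maps an entity name to the names of its neighbours.
function spreadActivation(graph, seeds, { maxDepth = 2, decay = 0.8 } = {}) {
  const activation = new Map(seeds.map(seed => [seed, 1.0]));
  let frontier = [...seeds];
  for (let depth = 1; depth <= maxDepth; depth++) {
    const next = [];
    for (const node of frontier) {
      for (const neighbour of graph[node] || []) {
        if (activation.has(neighbour)) continue; // already reached by a shorter path
        activation.set(neighbour, activation.get(node) * decay); // one hop = one decay step
        next.push(neighbour);
      }
    }
    frontier = next;
  }
  return activation;
}

// Entity names below are made up for the example.
const graph = {
  qdrant: ['postgres'],
  postgres: ['n8n'],
  n8n: ['baserow'],
};
const activation = spreadActivation(graph, ['qdrant']);
console.log([...activation.keys()]); // [ 'qdrant', 'postgres', 'n8n' ]
```

With the defaults, an entity one hop from the seed gets 0.8, an entity two hops away gets roughly 0.64, and anything three hops away is never visited.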

README.md

Lines changed: 5 additions & 1 deletion
@@ -30,7 +30,11 @@
 
 Born from a production setup where [OpenClaw](https://github.com/openclaw/openclaw) agents, Claude Code, and n8n workflows needed to share memory across separate machines. Nothing existed that did this well, so we built it.
 
-### What's New in v2.2
+### What's New in v2.3
+
+- **Multi-Path Retrieval with RRF Fusion** — Search now runs three retrieval paths in parallel: vector (semantic similarity), keyword (BM25 full-text via Postgres tsvector or SQLite FTS5), and graph (BFS spreading activation through entity relationships). Results are merged using Reciprocal Rank Fusion (RRF). Exact names and technical terms now surface reliably even when embeddings miss them. Entity relationships that were previously stored but unused are now a first-class retrieval signal. Feature-flagged via `MULTI_PATH_SEARCH=true` (default on). Use `format=full` in `brain_search` to see which paths contributed to each result. Includes a backfill script (`scripts/backfill-keyword-index.js`) for existing memories.
+
+### What Was New in v2.2
 
 - **Noise-Free Entity Extraction** — v2.2 filters out CSS properties, code identifiers, shell commands, sentence fragments, French prose, and generic phrases. Pattern-based filtering with 50+ generic noun/adjective blocklists. Includes a retroactive cleanup script (`scripts/cleanup-garbage-entities.js`) to purge existing noise.
 - **Per-Client Knowledge Base** — Fingerprint-based client identification with accent normalization. One tool call (`brain_client`) returns everything known about a client: brand, strategy, meetings, content, technical details, relationships. Fuzzy name resolution ("JL" resolves to "jetloans").

api/scripts/backfill-keyword-index.js

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+#!/usr/bin/env node
+/**
+ * Backfill keyword search index from existing Qdrant memories.
+ * Scrolls all active points and inserts their text into the memory_search table
+ * for BM25/full-text keyword retrieval.
+ *
+ * Usage: node api/scripts/backfill-keyword-index.js
+ * Requires: .env configured with QDRANT_URL, STRUCTURED_STORE (postgres|sqlite)
+ */
+
+try { await import('dotenv/config'); } catch (e) { /* dotenv not needed in Docker */ }
+import { initQdrant, scrollPoints } from '../src/services/qdrant.js';
+import { initStore, _getStoreInstance, getBackendType } from '../src/services/stores/interface.js';
+import { initKeywordSearch, indexMemory, isKeywordSearchAvailable } from '../src/services/keyword-search.js';
+import { initEmbeddings } from '../src/services/embedders/interface.js';
+
+async function backfill() {
+  console.log('[backfill] Starting keyword index backfill...');
+
+  // Initialize services
+  await initEmbeddings();
+  await initQdrant();
+  await initStore();
+  initKeywordSearch(_getStoreInstance(), getBackendType());
+
+  if (!isKeywordSearchAvailable()) {
+    console.error('[backfill] Keyword search not available. Need postgres or sqlite backend.');
+    process.exit(1);
+  }
+
+  let processed = 0;
+  let indexed = 0;
+  let skipped = 0;
+  let errors = 0;
+  let offset = null;
+
+  while (true) {
+    const result = await scrollPoints({ active: true }, 100, offset);
+    const points = result.points || [];
+
+    if (points.length === 0) break;
+
+    for (const point of points) {
+      processed++;
+      const p = point.payload;
+
+      if (!p || !p.text) {
+        skipped++;
+        continue;
+      }
+
+      try {
+        await indexMemory(point.id, p.text, {
+          client_id: p.client_id || 'global',
+          source_agent: p.source_agent || null,
+          type: p.type || null,
+        });
+        indexed++;
+      } catch (e) {
+        errors++;
+        if (errors <= 5) {
+          console.error(`[backfill] Error indexing ${point.id}: ${e.message}`);
+        }
+      }
+
+      if (processed % 100 === 0) {
+        console.log(`[backfill] Progress: ${processed} processed, ${indexed} indexed, ${skipped} skipped, ${errors} errors`);
+      }
+    }
+
+    offset = result.next_page_offset;
+    if (!offset) break;
+  }
+
+  console.log(`[backfill] Complete: ${processed} processed, ${indexed} indexed, ${skipped} skipped, ${errors} errors`);
+  process.exit(0);
+}
+
+backfill().catch(err => {
+  console.error('[backfill] Fatal error:', err);
+  process.exit(1);
+});
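
The backfill loop above is a cursor-pagination pattern: fetch a page, process it, then continue from `next_page_offset` until it is empty. The sketch below isolates that pattern as an async generator so it can be exercised without Qdrant. `scrollAll`, `fakeFetch`, and the two in-memory pages are hypothetical; only the `{ points, next_page_offset }` shape is taken from the script.

```javascript
// Generic cursor-pagination sketch. `fetchPage(offset)` is a hypothetical
// stand-in for scrollPoints and must resolve to { points, next_page_offset }.
async function* scrollAll(fetchPage) {
  let offset = null;
  while (true) {
    const { points = [], next_page_offset: next = null } = await fetchPage(offset);
    yield* points;
    if (points.length === 0 || !next) return; // same stop conditions as the script
    offset = next;
  }
}

// In-memory fake with two pages, used only to exercise the loop.
const pages = {
  start: { points: [1, 2], next_page_offset: 'p2' },
  p2: { points: [3], next_page_offset: null },
};
const fakeFetch = async (offset) => pages[offset ?? 'start'];

async function collect() {
  const seen = [];
  for await (const point of scrollAll(fakeFetch)) seen.push(point);
  return seen;
}

collect().then(seen => console.log(seen)); // [ 1, 2, 3 ]
```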

api/src/index.js

Lines changed: 5 additions & 1 deletion
@@ -13,7 +13,8 @@ import { exportRouter } from './routes/export.js';
 import { graphRouter } from './routes/graph.js';
 import { initQdrant, ensureEntityIndex } from './services/qdrant.js';
 import { initEmbeddings } from './services/embedders/interface.js';
-import { initStore, isEntityStoreAvailable, loadAllAliases } from './services/stores/interface.js';
+import { initStore, isEntityStoreAvailable, loadAllAliases, _getStoreInstance, getBackendType } from './services/stores/interface.js';
+import { initKeywordSearch } from './services/keyword-search.js';
 import { initClientResolver } from './services/client-resolver.js';
 import { initLLM } from './services/llm/interface.js';
 import { runConsolidation } from './services/consolidation.js';
@@ -69,6 +70,9 @@ async function start() {
   // Initialize structured storage backend
   await initStore();
 
+  // Initialize keyword search (BM25 via Postgres tsvector or SQLite FTS5)
+  initKeywordSearch(_getStoreInstance(), getBackendType());
+
   // Initialize client fingerprint resolver (Baserow → fuzzy matcher)
   await initClientResolver();
 

api/src/routes/memory.js

Lines changed: 130 additions & 19 deletions
@@ -3,7 +3,7 @@ import crypto from 'crypto';
 import { embed } from '../services/embedders/interface.js';
 import {
   upsertPoint, searchPoints, updatePointPayload,
-  findByPayload, computeEffectiveConfidence, getPoint,
+  findByPayload, computeEffectiveConfidence, getPoint, getPoints,
 } from '../services/qdrant.js';
 import {
   createEvent, upsertFact, upsertStatus, listEvents, listFacts, listStatuses, isStoreAvailable,
@@ -13,6 +13,11 @@ import { scrubCredentials, scrubObject } from '../services/scrub.js';
 import { extractEntities, linkExtractedEntities } from '../services/entities.js';
 import { validateMemoryInput, MAX_OBSERVED_BY } from '../middleware/validate.js';
 import { dispatchNotification } from '../services/notifications.js';
+import { isKeywordSearchAvailable, indexMemory, deactivateMemory, keywordSearch } from '../services/keyword-search.js';
+import { isGraphSearchAvailable, graphSearch } from '../services/graph-search.js';
+import { reciprocalRankFusion } from '../services/rrf.js';
+
+const MULTI_PATH_SEARCH = process.env.MULTI_PATH_SEARCH !== 'false'; // default: true
 import { getClientResolver } from '../services/client-resolver.js';
 
 export const memoryRouter = Router();
@@ -127,6 +132,7 @@ memoryRouter.post('/', async (req, res) => {
           superseded_by: pointId,
           superseded_at: now,
         });
+        deactivateMemory(matches[0].id).catch(() => {});
         dispatchNotification('memory_superseded', { id: matches[0].id, ...matches[0].payload });
       }
     } else if (type === 'status' && req.body.subject) {
@@ -139,6 +145,7 @@ memoryRouter.post('/', async (req, res) => {
           superseded_by: pointId,
           superseded_at: now,
         });
+        deactivateMemory(matches[0].id).catch(() => {});
         dispatchNotification('memory_superseded', { id: matches[0].id, ...matches[0].payload });
       }
     }
@@ -183,6 +190,15 @@ memoryRouter.post('/', async (req, res) => {
     const vector = await embed(cleanContent, 'store');
     await upsertPoint(pointId, vector, payload);
 
+    // Index in keyword search (fire-and-forget)
+    if (isKeywordSearchAvailable()) {
+      indexMemory(pointId, cleanContent, {
+        client_id: client_id || 'global',
+        source_agent,
+        type,
+      }).catch(e => console.error('[memory:keyword-index]', e.message));
+    }
+
     // Dispatch webhook notification for new memory
     dispatchNotification('memory_stored', { id: pointId, ...payload });
 
@@ -248,24 +264,24 @@ memoryRouter.post('/', async (req, res) => {
   }
 });
 
-// GET /memory/search — Semantic search via Qdrant
+// GET /memory/search — Multi-path retrieval with RRF fusion
+// Paths: vector (semantic), keyword (BM25), graph (entity BFS)
 memoryRouter.get('/search', async (req, res) => {
   try {
     const { q, type, source_agent, client_id, category, limit, include_superseded, entity, format } = req.query;
     const isCompact = format === 'compact';
+    const isFull = format === 'full';
+    const maxResults = Math.min(parseInt(limit) || 10, 100);
 
     if (!q) {
       return res.status(400).json({ error: 'Missing required query parameter: q' });
     }
 
-    const vector = await embed(q, 'search');
-
     const filter = {};
     if (type) filter.type = type;
     if (source_agent) filter.source_agent = source_agent;
     if (client_id) filter.client_id = client_id;
     if (category) filter.category = category;
-    // By default, only return active memories (not superseded)
     if (include_superseded !== 'true') filter.active = true;
 
     // Entity filter — resolve alias to canonical name, then filter via Qdrant payload
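
The search route derives three limits from the `limit` query parameter: `maxResults` here, plus `fetchLimit` and the graph limit in the next hunk. The sketch below gathers those expressions into one place to show how they interact. The helper name `searchLimits` is hypothetical, and an explicit radix is passed to `parseInt`, which the route itself does not do.

```javascript
// Hypothetical helper that mirrors the limit arithmetic in GET /memory/search.
function searchLimits(limitParam, useMultiPath) {
  const maxResults = Math.min(parseInt(limitParam, 10) || 10, 100);
  // Multi-path over-fetches per path so fusion has candidates to choose from
  const fetchLimit = useMultiPath ? Math.min(maxResults * 2, 50) : maxResults;
  const graphLimit = Math.min(maxResults, 20);
  return { maxResults, fetchLimit, graphLimit };
}

console.log(searchLimits(undefined, true)); // { maxResults: 10, fetchLimit: 20, graphLimit: 10 }
console.log(searchLimits('100', true));     // { maxResults: 100, fetchLimit: 50, graphLimit: 20 }
```

Note the second case: with `limit=100` and multi-path enabled, each of the vector and keyword paths is capped at 50 candidates, so a query where only the vector path returns hits can yield at most 50 results.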
@@ -281,25 +297,99 @@ memoryRouter.get('/search', async (req, res) => {
       nestedFilters.push({ arrayField: 'entities', key: 'name', value: entityName });
     }
 
-    const rawResults = await searchPoints(vector, filter, Math.min(parseInt(limit) || 10, 100), nestedFilters);
+    // --- Multi-path retrieval ---
+    const useMultiPath = MULTI_PATH_SEARCH && !entity; // entity filter is Qdrant-only
+    const fetchLimit = useMultiPath ? Math.min(maxResults * 2, 50) : maxResults;
+
+    // Always run vector search
+    const vectorPromise = embed(q, 'search').then(vector =>
+      searchPoints(vector, filter, fetchLimit, nestedFilters)
+    );
+
+    // Run keyword + graph in parallel (only if multi-path enabled)
+    const keywordPromise = (useMultiPath && isKeywordSearchAvailable())
+      ? keywordSearch(q, filter, fetchLimit).catch(e => {
+          console.error('[memory:keyword-search]', e.message);
+          return [];
+        })
+      : Promise.resolve([]);
+
+    const graphPromise = (useMultiPath && isGraphSearchAvailable())
+      ? graphSearch(q, filter, Math.min(maxResults, 20)).catch(e => {
+          console.error('[memory:graph-search]', e.message);
+          return [];
+        })
+      : Promise.resolve([]);
+
+    const [vectorResults, keywordResults, graphResults] = await Promise.all([
+      vectorPromise, keywordPromise, graphPromise,
+    ]);
+
+    // --- Build result set ---
+    let finalResults;
+    const retrievalSources = {};
+
+    if (useMultiPath && (keywordResults.length > 0 || graphResults.length > 0)) {
+      // Build ranked lists for RRF
+      const rankedLists = [
+        vectorResults.map(r => ({ id: r.id, source: 'vector' })),
+      ];
+      if (keywordResults.length > 0) {
+        rankedLists.push(keywordResults.map(r => ({ id: r.memory_id, source: 'keyword' })));
+      }
+      if (graphResults.length > 0) {
+        rankedLists.push(graphResults.map(r => ({ id: r.memory_id, source: 'graph' })));
+      }
+
+      const fused = reciprocalRankFusion(rankedLists);
+      const topFused = fused.slice(0, maxResults);
+
+      // Track which sources contributed to each result
+      for (const f of topFused) {
+        retrievalSources[f.id] = f.sources;
+      }
+
+      // Build payload map from vector results (already have full payloads)
+      const payloadMap = new Map();
+      for (const r of vectorResults) {
+        payloadMap.set(r.id, { id: r.id, score: r.score, payload: r.payload });
+      }
+
+      // Fetch payloads for keyword/graph hits not in vector results
+      const missingIds = topFused.map(f => f.id).filter(id => !payloadMap.has(id));
+      if (missingIds.length > 0) {
+        try {
+          const fetched = await getPoints(missingIds);
+          for (const pt of fetched) {
+            payloadMap.set(pt.id, { id: pt.id, score: 0, payload: pt.payload });
+          }
+        } catch (e) {
+          console.error('[memory:search] Batch fetch failed:', e.message);
+        }
+      }
+
+      // Assemble results in RRF order
+      finalResults = topFused
+        .map(f => payloadMap.get(f.id))
+        .filter(Boolean);
+    } else {
+      // Single-path: vector only
+      finalResults = vectorResults.slice(0, maxResults);
+    }
 
     // Apply confidence decay + access-weighted ranking
-    // Memories that get used more are more valuable — self-curating brain
     const COMPACT_MAX = 200;
-    const results = rawResults.map(r => {
+    const results = finalResults.map(r => {
       const effectiveConfidence = computeEffectiveConfidence(r.payload);
       const p = r.payload;
-
-      // Access boost: log(access_count + 1) gives diminishing returns
-      // 0 accesses = 1.0x, 1 = 1.3x, 5 = 1.8x, 20 = 2.3x, 100 = 2.7x
       const accessBoost = 1 + (0.3 * Math.log2((p.access_count || 0) + 1));
-      const effectiveScore = +(r.score * effectiveConfidence * accessBoost).toFixed(4);
+      const effectiveScore = +(((r.score || 0.5) * effectiveConfidence * accessBoost)).toFixed(4);
 
       if (isCompact) {
         const text = p.text || '';
         return {
           id: r.id,
-          score: +r.score.toFixed(4),
+          score: +(r.score || 0).toFixed(4),
           effective_score: effectiveScore,
           type: p.type,
           content: text.length > COMPACT_MAX ? text.slice(0, COMPACT_MAX) + '...' : text,
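
The hunk above treats the vector path as required and the keyword and graph paths as optional: each optional promise gets its own `.catch` that logs and resolves to `[]`, so `Promise.all` only rejects if the vector path fails. A stripped-down sketch of that pattern follows. `withFallback`, `runPaths`, and the stub path functions are hypothetical names used only for illustration.

```javascript
// Sketch of fault-isolated parallel retrieval with hypothetical stub paths.
const withFallback = (label, promise) =>
  promise.catch(err => {
    console.error(`[${label}]`, err.message);
    return []; // a failed optional path contributes nothing
  });

async function runPaths({ vector, keyword, graph }) {
  const [vectorResults, keywordResults, graphResults] = await Promise.all([
    vector(),                           // required: a rejection here fails the whole call
    withFallback('keyword', keyword()), // optional
    withFallback('graph', graph()),     // optional
  ]);
  return { vectorResults, keywordResults, graphResults };
}

// Stubs: the keyword path fails, the other two succeed.
const demo = runPaths({
  vector: async () => [{ id: 'm1' }],
  keyword: async () => { throw new Error('index offline'); },
  graph: async () => [{ memory_id: 'm2' }],
});
demo.then(r => console.log(r.keywordResults.length)); // 0
```

The design choice is that a broken keyword index or entity store degrades search to fewer paths instead of turning every query into a 500.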
@@ -310,22 +400,28 @@ memoryRouter.get('/search', async (req, res) => {
         };
       }
 
-      return {
+      const base = {
         id: r.id,
-        score: r.score,
+        score: r.score || 0,
         confidence: effectiveConfidence,
         effective_score: effectiveScore,
         ...p,
       };
+
+      // In full format, show which retrieval paths contributed
+      if (isFull && retrievalSources[r.id]) {
+        base.retrieval_sources = retrievalSources[r.id];
+      }
+
+      return base;
     });
 
-    // Re-sort by effective_score (now includes access weight)
+    // Re-sort by effective_score
     results.sort((a, b) => b.effective_score - a.effective_score);
 
     // Async: increment access_count and update last_accessed_at for returned results
     const pointIds = results.map(r => r.id);
     if (pointIds.length > 0) {
-      // Fire and forget — don't slow down the response
       Promise.resolve().then(async () => {
         try {
           const now = new Date().toISOString();
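
As the diff reads, results are assembled in RRF order and then re-sorted by `effective_score`, so the final order depends on the scoring arithmetic. The sketch below reproduces that arithmetic in isolation. The function name `effectiveScore` is hypothetical, and `confidence` is passed in directly because `computeEffectiveConfidence` is not shown on this page.

```javascript
// Mirrors the effective-score expression from the search route (illustrative only).
function effectiveScore(score, confidence, accessCount) {
  const accessBoost = 1 + 0.3 * Math.log2((accessCount || 0) + 1);
  // Hydrated keyword/graph hits carry score 0, which falls back to 0.5 here
  return +((score || 0.5) * confidence * accessBoost).toFixed(4);
}

console.log(effectiveScore(0.8, 1, 0)); // 0.8  (vector hit, never accessed)
console.log(effectiveScore(0.8, 1, 3)); // 1.28 (boost = 1 + 0.3 * log2(4) = 1.6)
console.log(effectiveScore(0, 1, 1));   // 0.65 (keyword-only hit: 0.5 * 1.3)
```

A memory found only by the keyword or graph path is therefore ranked as if its similarity were 0.5, regardless of how high RRF placed it.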
@@ -341,11 +437,25 @@ memoryRouter.get('/search', async (req, res) => {
       });
     }
 
-    res.json({
+    const response = {
       query: q,
       count: results.length,
       results,
-    });
+    };
+
+    // In full format, add retrieval metadata
+    if (isFull && useMultiPath) {
+      response.retrieval = {
+        multi_path: true,
+        paths: {
+          vector: vectorResults.length,
+          keyword: keywordResults.length,
+          graph: graphResults.length,
+        },
+      };
+    }
+
+    res.json(response);
   } catch (err) {
     console.error('[memory:search] Error:', err.message);
     res.status(500).json({ error: err.message });
@@ -426,6 +536,7 @@ memoryRouter.delete('/:id', async (req, res) => {
       deletion_reason: reason || null,
     });
 
+    deactivateMemory(id).catch(() => {});
     dispatchNotification('memory_deleted', { id, ...point.payload });
 
     console.log(`[memory:delete] Memory ${id} soft-deleted by ${req.authenticatedAgent || 'admin'}${reason ? ': ' + reason : ''}`);
