Skip to content

Commit 2e689c0

Browse files
nihalnihalaniclaude
andcommitted
feat(nodes): add hybrid search node combining BM25 keyword and vector similarity search
Split from rocketride-org#514 per reviewer request. Addresses review feedback: - RuntimeError on uninitialized engine (not silent return) - Deep copy question to prevent fan-out mutation corruption - Structured answer output instead of concatenated text blob Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7a00a2b commit 2e689c0

7 files changed

Lines changed: 1266 additions & 0 deletions

File tree

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# =============================================================================
2+
# MIT License
3+
# Copyright (c) 2026 Aparavi Software AG
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in
13+
# all copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
# SOFTWARE.
22+
# =============================================================================
23+
24+
# ------------------------------------------------------------------------------
25+
# This class controls the data shared between all threads for the task
26+
# ------------------------------------------------------------------------------
27+
import os
28+
from rocketlib import IGlobalBase, OPEN_MODE, warning
29+
from ai.common.config import Config
30+
31+
from .hybrid_search import HybridSearchEngine
32+
33+
34+
class IGlobal(IGlobalBase):
35+
engine: HybridSearchEngine | None = None
36+
top_k: int = 10
37+
rrf_k: int = 60
38+
39+
def validateConfig(self):
40+
"""Validate that the rank_bm25 dependency is available."""
41+
try:
42+
from depends import depends
43+
44+
requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
45+
depends(requirements)
46+
except Exception as e:
47+
warning(str(e))
48+
49+
def beginGlobal(self):
50+
# Are we in config mode or some other mode?
51+
if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
52+
# We are going to get a call to configureService but
53+
# we don't actually need to load the engine for that
54+
pass
55+
else:
56+
# Get this node's config
57+
config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
58+
59+
# Read parameters from config
60+
alpha = float(config.get('alpha', 0.5))
61+
self.top_k = int(config.get('top_k', 10))
62+
self.rrf_k = int(config.get('rrf_k', 60))
63+
64+
# Validate alpha range
65+
alpha = max(0.0, min(1.0, alpha))
66+
67+
# Create the hybrid search engine
68+
self.engine = HybridSearchEngine(alpha=alpha)
69+
70+
def endGlobal(self):
71+
# Release the engine
72+
self.engine = None
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# =============================================================================
2+
# MIT License
3+
# Copyright (c) 2026 Aparavi Software AG
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in
13+
# all copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
# SOFTWARE.
22+
# =============================================================================
23+
24+
# ------------------------------------------------------------------------------
25+
# This class controls the data for each thread of the task
26+
# ------------------------------------------------------------------------------
27+
import copy
28+
from typing import List
29+
30+
from rocketlib import IInstanceBase, debug
31+
from ai.common.schema import Doc, Question, Answer
32+
33+
from .IGlobal import IGlobal
34+
35+
36+
class IInstance(IInstanceBase):
37+
"""Instance that performs hybrid search (vector + BM25) over question documents."""
38+
39+
IGlobal: IGlobal
40+
41+
def writeQuestions(self, question: Question):
42+
"""
43+
Perform hybrid search over the question's documents.
44+
45+
1. Extract query text and documents (with vector scores) from the question.
46+
2. Run BM25 keyword search on document texts.
47+
3. Merge vector + BM25 results via Reciprocal Rank Fusion.
48+
4. Emit reranked documents to the output.
49+
"""
50+
if self.IGlobal.engine is None:
51+
raise RuntimeError('Hybrid search engine not initialized')
52+
53+
# Deep copy to prevent mutation corruption in fan-out branches
54+
question = copy.deepcopy(question)
55+
56+
# Extract query text from the first question
57+
query_text = ''
58+
if question.questions:
59+
query_text = question.questions[0].text or ''
60+
if not query_text:
61+
debug('No query text found in question; skipping hybrid search')
62+
return
63+
64+
# Extract documents and their vector scores
65+
docs = question.documents or []
66+
if not docs:
67+
debug('No documents found in question; skipping hybrid search')
68+
return
69+
70+
# Build document dicts for the search engine
71+
doc_dicts: List[dict] = []
72+
vector_scores: List[float] = []
73+
for i, doc in enumerate(docs):
74+
doc_dict = {
75+
'id': str(i),
76+
'text': doc.page_content or '',
77+
'original_index': i,
78+
}
79+
doc_dicts.append(doc_dict)
80+
# Use the document's score as the vector score (from upstream vector DB)
81+
vector_scores.append(float(doc.score) if doc.score is not None else 0.0)
82+
83+
# Run hybrid search
84+
results = self.IGlobal.engine.search(
85+
query=query_text,
86+
documents=doc_dicts,
87+
vector_scores=vector_scores,
88+
top_k=self.IGlobal.top_k,
89+
rrf_k=self.IGlobal.rrf_k,
90+
)
91+
92+
# Map results back to Doc objects, preserving original metadata
93+
reranked_docs: List[Doc] = []
94+
for result in results:
95+
orig_idx = result.get('original_index')
96+
if orig_idx is not None and 0 <= orig_idx < len(docs):
97+
reranked_doc = copy.deepcopy(docs[orig_idx])
98+
# Update score with RRF score
99+
rrf_score = result.get('rrf_score')
100+
if rrf_score is not None:
101+
reranked_doc.score = rrf_score
102+
reranked_docs.append(reranked_doc)
103+
104+
# Update the question with reranked documents
105+
question.documents = reranked_docs
106+
107+
# Emit reranked documents
108+
if reranked_docs and self.instance.hasListener('documents'):
109+
debug(f'Hybrid search emitting {len(reranked_docs)} reranked documents')
110+
self.instance.writeDocuments(reranked_docs)
111+
112+
# Emit structured answer if listener exists
113+
if reranked_docs and self.instance.hasListener('answers'):
114+
context_parts = []
115+
for i, doc in enumerate(reranked_docs):
116+
score = doc.metadata.get('hybrid_score', 'N/A') if doc.metadata else 'N/A'
117+
snippet = (doc.page_content or '')[:500]
118+
context_parts.append(f'[Document {i + 1}] (score: {score})\n{snippet}')
119+
answer_text = f'Hybrid search returned {len(reranked_docs)} results:\n\n' + '\n\n'.join(context_parts)
120+
ans = Answer()
121+
ans.setAnswer(answer_text)
122+
self.instance.writeAnswers(ans)

nodes/src/nodes/search_hybrid/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)