-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(nodes): add hybrid search node with BM25 keyword and vector similarity fusion #602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # This class controls the data shared between all threads for the task | ||
| # ------------------------------------------------------------------------------ | ||
| import os | ||
| from rocketlib import IGlobalBase, OPEN_MODE, warning | ||
| from ai.common.config import Config | ||
|
|
||
| from .hybrid_search import HybridSearchEngine | ||
|
|
||
|
|
||
| class IGlobal(IGlobalBase): | ||
| engine: HybridSearchEngine | None = None | ||
| top_k: int = 10 | ||
| rrf_k: int = 60 | ||
|
|
||
| def validateConfig(self): | ||
| """Validate that the rank_bm25 dependency is available.""" | ||
| try: | ||
| from depends import depends | ||
|
|
||
| requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt' | ||
| depends(requirements) | ||
| except Exception as e: | ||
| warning(str(e)) | ||
|
|
||
| def beginGlobal(self): | ||
| # Are we in config mode or some other mode? | ||
| if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: | ||
| # We are going to get a call to configureService but | ||
| # we don't actually need to load the engine for that | ||
| pass | ||
| else: | ||
| # Get this node's config | ||
| config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig) | ||
|
|
||
| # Read parameters from config | ||
| alpha = float(config.get('alpha', 0.5)) | ||
| self.top_k = int(config.get('top_k', 10)) | ||
| self.rrf_k = int(config.get('rrf_k', 60)) | ||
|
|
||
| # Validate alpha range | ||
| alpha = max(0.0, min(1.0, alpha)) | ||
|
|
||
| # Create the hybrid search engine | ||
| self.engine = HybridSearchEngine(alpha=alpha) | ||
|
|
||
| def endGlobal(self): | ||
| # Release the engine | ||
| self.engine = None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # This class controls the data for each thread of the task | ||
| # ------------------------------------------------------------------------------ | ||
| import copy | ||
| from typing import List | ||
|
|
||
| from rocketlib import IInstanceBase, debug | ||
| from ai.common.schema import Doc, Question, Answer | ||
|
|
||
| from .IGlobal import IGlobal | ||
|
|
||
|
|
||
| class IInstance(IInstanceBase): | ||
| """Instance that performs hybrid search (vector + BM25) over question documents.""" | ||
|
|
||
| IGlobal: IGlobal | ||
|
|
||
| def writeQuestions(self, question: Question): | ||
| """ | ||
| Perform hybrid search over the question's documents. | ||
|
|
||
| 1. Extract query text and documents (with vector scores) from the question. | ||
| 2. Run BM25 keyword search on document texts. | ||
| 3. Merge vector + BM25 results via Reciprocal Rank Fusion. | ||
| 4. Emit reranked documents to the output. | ||
| """ | ||
| if self.IGlobal.engine is None: | ||
| raise RuntimeError('Hybrid search engine not initialized') | ||
|
|
||
| # Deep copy to prevent mutation corruption in fan-out branches | ||
| question = copy.deepcopy(question) | ||
|
|
||
| # Extract query text from the first question | ||
| query_text = '' | ||
| if question.questions: | ||
| query_text = question.questions[0].text or '' | ||
| if not query_text: | ||
| debug('No query text found in question; skipping hybrid search') | ||
| return | ||
|
|
||
| # Extract documents and their vector scores | ||
| docs = question.documents or [] | ||
| if not docs: | ||
| debug('No documents found in question; skipping hybrid search') | ||
| return | ||
|
|
||
| # Build document dicts for the search engine | ||
| doc_dicts: List[dict] = [] | ||
| vector_scores: List[float] = [] | ||
| for i, doc in enumerate(docs): | ||
| doc_dict = { | ||
| 'id': str(i), | ||
| 'text': doc.page_content or '', | ||
| 'original_index': i, | ||
| } | ||
| doc_dicts.append(doc_dict) | ||
| # Use the document's score as the vector score (from upstream vector DB) | ||
| vector_scores.append(float(doc.score) if doc.score is not None else 0.0) | ||
|
|
||
| # Run hybrid search | ||
| results = self.IGlobal.engine.search( | ||
| query=query_text, | ||
| documents=doc_dicts, | ||
| vector_scores=vector_scores, | ||
| top_k=self.IGlobal.top_k, | ||
| rrf_k=self.IGlobal.rrf_k, | ||
| ) | ||
|
|
||
| # Map results back to Doc objects, preserving original metadata | ||
| reranked_docs: List[Doc] = [] | ||
| for result in results: | ||
| orig_idx = result.get('original_index') | ||
| if orig_idx is not None and 0 <= orig_idx < len(docs): | ||
| reranked_doc = copy.deepcopy(docs[orig_idx]) | ||
| # Update score with RRF score | ||
| rrf_score = result.get('rrf_score') | ||
| if rrf_score is not None: | ||
| reranked_doc.score = rrf_score | ||
| reranked_docs.append(reranked_doc) | ||
|
Comment on lines
+94
to
+102
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Copy the actual ranking score onto the emitted In BM25-only mode ( 🛠️ Proposed fix- rrf_score = result.get('rrf_score')
- if rrf_score is not None:
- reranked_doc.score = rrf_score
+ score = result.get('rrf_score')
+ if score is None:
+ score = result.get('bm25_score')
+ if score is None:
+ score = result.get('vector_score')
+ if score is not None:
+ reranked_doc.score = score🤖 Prompt for AI Agents |
||
|
|
||
| # Update the question with reranked documents | ||
| question.documents = reranked_docs | ||
|
|
||
| # Emit reranked documents | ||
| if reranked_docs and self.instance.hasListener('documents'): | ||
| debug(f'Hybrid search emitting {len(reranked_docs)} reranked documents') | ||
| self.instance.writeDocuments(reranked_docs) | ||
|
|
||
| # Emit structured answer if listener exists | ||
| if reranked_docs and self.instance.hasListener('answers'): | ||
| context_parts = [] | ||
| for i, doc in enumerate(reranked_docs): | ||
| score = doc.score if doc.score is not None else 'N/A' | ||
| snippet = (doc.page_content or '')[:500] | ||
| context_parts.append(f'[Document {i + 1}] (score: {score})\n{snippet}') | ||
| answer_text = f'Hybrid search returned {len(reranked_docs)} results:\n\n' + '\n\n'.join(context_parts) | ||
| ans = Answer() | ||
| ans.setAnswer(answer_text) | ||
| self.instance.writeAnswers(ans) | ||
|
Comment on lines
+113
to
+122
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The answer payload still drops source attribution. Each block is only labeled |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: rocketride-org/rocketride-server
Length of output: 50388
Move dependency resolution to
beginGlobal()to match the standard pattern used across the codebase.The
depends(requirements)call is currently isolated invalidateConfig(), but runtime execution ofbeginGlobal()does not load dependencies. This pattern deviates from virtually all other nodes in the codebase (vectorizer, question, prompt, summarization, text_output, and 60+ additional nodes), which consistently load dependencies in the non-CONFIG branch ofbeginGlobal(). If the framework does not guaranteevalidateConfig()executes beforebeginGlobal(), the lazy import ofrank_bm25inhybrid_search.pywill fail when requests reach the engine.Move the
depends(requirements)call into theelsebranch ofbeginGlobal()(the execution path for non-CONFIG mode) to ensure dependencies are available before engine initialization:Suggested pattern (already used by sibling nodes):
🤖 Prompt for AI Agents