Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions nodes/src/nodes/search_hybrid/IGlobal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

# ------------------------------------------------------------------------------
# This class controls the data shared between all threads for the task
# ------------------------------------------------------------------------------
import os
from rocketlib import IGlobalBase, OPEN_MODE, warning
from ai.common.config import Config

from .hybrid_search import HybridSearchEngine


class IGlobal(IGlobalBase):
engine: HybridSearchEngine | None = None
top_k: int = 10
rrf_k: int = 60

def validateConfig(self):
"""Validate that the rank_bm25 dependency is available."""
try:
from depends import depends

requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
depends(requirements)
Comment on lines +39 to +45
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf '== search_hybrid IGlobal ==\n'
sed -n '1,140p' nodes/src/nodes/search_hybrid/IGlobal.py

printf '\n== sibling nodes resolving requirements in beginGlobal ==\n'
rg -n -C2 "depends\(requirements\)|from depends import depends" nodes/src/nodes --type py

printf '\n== framework callers of validateConfig ==\n'
rg -n -C2 "\bvalidateConfig\s*\(" --type py

printf '\n== framework callers of beginGlobal ==\n'
rg -n -C2 "\bbeginGlobal\s*\(" --type py

Repository: rocketride-org/rocketride-server

Length of output: 50388


Move dependency resolution to beginGlobal() to match the standard pattern used across the codebase.

The depends(requirements) call is currently isolated in validateConfig(), but runtime execution of beginGlobal() does not load dependencies. This pattern deviates from virtually all other nodes in the codebase (vectorizer, question, prompt, summarization, text_output, and 60+ additional nodes), which consistently load dependencies in the non-CONFIG branch of beginGlobal(). If the framework does not guarantee validateConfig() executes before beginGlobal(), the lazy import of rank_bm25 in hybrid_search.py will fail when requests reach the engine.

Move the depends(requirements) call into the else branch of beginGlobal() (the execution path for non-CONFIG mode) to ensure dependencies are available before engine initialization:

Suggested pattern (already used by sibling nodes):
def beginGlobal(self):
    if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
        pass
    else:
        from depends import depends
        requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
        depends(requirements)
        
        # existing config/engine init code
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/search_hybrid/IGlobal.py` around lines 39 - 45, The
depends(requirements) call in validateConfig should be moved into the non-CONFIG
branch of beginGlobal to match other nodes: remove the dependency resolution
from validateConfig, and in beginGlobal's else branch import depends (from
depends import depends), compute requirements using
os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt' and call
depends(requirements) before any engine/ hybrid initialization (so lazy imports
in hybrid_search.py succeed at runtime); keep beginGlobal's CONFIG branch
unchanged.

except Exception as e:
warning(str(e))

def beginGlobal(self):
# Are we in config mode or some other mode?
if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
# We are going to get a call to configureService but
# we don't actually need to load the engine for that
pass
else:
# Get this node's config
config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)

# Read parameters from config
alpha = float(config.get('alpha', 0.5))
self.top_k = int(config.get('top_k', 10))
self.rrf_k = int(config.get('rrf_k', 60))

# Validate alpha range
alpha = max(0.0, min(1.0, alpha))

# Create the hybrid search engine
self.engine = HybridSearchEngine(alpha=alpha)

def endGlobal(self):
# Release the engine
self.engine = None
122 changes: 122 additions & 0 deletions nodes/src/nodes/search_hybrid/IInstance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

# ------------------------------------------------------------------------------
# This class controls the data for each thread of the task
# ------------------------------------------------------------------------------
import copy
from typing import List

from rocketlib import IInstanceBase, debug
from ai.common.schema import Doc, Question, Answer

from .IGlobal import IGlobal


class IInstance(IInstanceBase):
"""Instance that performs hybrid search (vector + BM25) over question documents."""

IGlobal: IGlobal

def writeQuestions(self, question: Question):
"""
Perform hybrid search over the question's documents.

1. Extract query text and documents (with vector scores) from the question.
2. Run BM25 keyword search on document texts.
3. Merge vector + BM25 results via Reciprocal Rank Fusion.
4. Emit reranked documents to the output.
"""
if self.IGlobal.engine is None:
raise RuntimeError('Hybrid search engine not initialized')

# Deep copy to prevent mutation corruption in fan-out branches
question = copy.deepcopy(question)

# Extract query text from the first question
query_text = ''
if question.questions:
query_text = question.questions[0].text or ''
if not query_text:
debug('No query text found in question; skipping hybrid search')
return

# Extract documents and their vector scores
docs = question.documents or []
if not docs:
debug('No documents found in question; skipping hybrid search')
return

# Build document dicts for the search engine
doc_dicts: List[dict] = []
vector_scores: List[float] = []
for i, doc in enumerate(docs):
doc_dict = {
'id': str(i),
'text': doc.page_content or '',
'original_index': i,
}
doc_dicts.append(doc_dict)
# Use the document's score as the vector score (from upstream vector DB)
vector_scores.append(float(doc.score) if doc.score is not None else 0.0)

# Run hybrid search
results = self.IGlobal.engine.search(
query=query_text,
documents=doc_dicts,
vector_scores=vector_scores,
top_k=self.IGlobal.top_k,
rrf_k=self.IGlobal.rrf_k,
)

# Map results back to Doc objects, preserving original metadata
reranked_docs: List[Doc] = []
for result in results:
orig_idx = result.get('original_index')
if orig_idx is not None and 0 <= orig_idx < len(docs):
reranked_doc = copy.deepcopy(docs[orig_idx])
# Update score with RRF score
rrf_score = result.get('rrf_score')
if rrf_score is not None:
reranked_doc.score = rrf_score
reranked_docs.append(reranked_doc)
Comment on lines +94 to +102
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Copy the actual ranking score onto the emitted Doc.

In BM25-only mode (alpha == 0) or any non-fused fallback, result carries bm25_score/vector_score but no rrf_score. This branch leaves reranked_doc.score at the upstream value, so the documents and answers lanes can report a score that did not determine the ranking.

🛠️ Proposed fix
-                rrf_score = result.get('rrf_score')
-                if rrf_score is not None:
-                    reranked_doc.score = rrf_score
+                score = result.get('rrf_score')
+                if score is None:
+                    score = result.get('bm25_score')
+                if score is None:
+                    score = result.get('vector_score')
+                if score is not None:
+                    reranked_doc.score = score
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/search_hybrid/IInstance.py` around lines 94 - 102, The loop
that builds reranked_docs currently only sets reranked_doc.score when rrf_score
exists, leaving the emitted Doc with its upstream score in BM25-only or
vector-only fallbacks; update the logic in the results iteration (the block
handling result, reranked_doc, rrf_score) to set reranked_doc.score to the
actual ranking score: use rrf_score if present, otherwise fall back to
result.get('bm25_score') then result.get('vector_score') (and only if none exist
keep the original docs[orig_idx].score), and then append to reranked_docs so
documents/answers report the score that determined ranking.


# Update the question with reranked documents
question.documents = reranked_docs

# Emit reranked documents
if reranked_docs and self.instance.hasListener('documents'):
debug(f'Hybrid search emitting {len(reranked_docs)} reranked documents')
self.instance.writeDocuments(reranked_docs)

# Emit structured answer if listener exists
if reranked_docs and self.instance.hasListener('answers'):
context_parts = []
for i, doc in enumerate(reranked_docs):
score = doc.score if doc.score is not None else 'N/A'
snippet = (doc.page_content or '')[:500]
context_parts.append(f'[Document {i + 1}] (score: {score})\n{snippet}')
answer_text = f'Hybrid search returned {len(reranked_docs)} results:\n\n' + '\n\n'.join(context_parts)
ans = Answer()
ans.setAnswer(answer_text)
self.instance.writeAnswers(ans)
Comment on lines +113 to +122
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The answer payload still drops source attribution.

Each block is only labeled [Document N], so any upstream source/title/url metadata disappears from the answer lane. That misses the PR goal of preserving source attribution and makes the emitted answer much harder to audit.

Empty file.
Loading
Loading