From 8c7a59db19a77be442593875153db6bfc76d6290 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 29 Apr 2026 15:19:06 +0200 Subject: [PATCH 1/5] Add rrf to MultiRetriever and create a util method for reuse --- .../components/joiners/document_joiner.py | 27 +-------- .../components/retrievers/multi_retriever.py | 41 +++++++++---- haystack/utils/misc.py | 36 ++++++++++++ .../multi-retriever-rrf-effc174d0a440788.yaml | 8 +++ .../joiners/test_document_joiner.py | 40 ------------- .../retrievers/test_multi_retriever.py | 43 +++++++++++--- test/utils/test_misc.py | 57 ++++++++++++++++++- 7 files changed, 167 insertions(+), 85 deletions(-) create mode 100644 releasenotes/notes/multi-retriever-rrf-effc174d0a440788.yaml diff --git a/haystack/components/joiners/document_joiner.py b/haystack/components/joiners/document_joiner.py index 0460b360d6..3c31f8ba14 100644 --- a/haystack/components/joiners/document_joiner.py +++ b/haystack/components/joiners/document_joiner.py @@ -11,6 +11,7 @@ from haystack import Document, component, default_from_dict, default_to_dict, logging from haystack.core.component.types import Variadic +from haystack.utils.misc import _reciprocal_rank_fusion logger = logging.getLogger(__name__) @@ -196,32 +197,8 @@ def _merge(self, document_lists: list[list[Document]]) -> list[Document]: def _reciprocal_rank_fusion(self, document_lists: list[list[Document]]) -> list[Document]: """ Merge multiple lists of Documents and assign scores based on reciprocal rank fusion. - - The constant k is set to 61 (60 was suggested by the original paper, - plus 1 as python lists are 0-based and the paper used 1-based ranking). """ - # This check prevents a division by zero when no documents are passed - if not document_lists: - return [] - - k = 61 - - scores_map: dict = defaultdict(int) - documents_map = {} - weights = self.weights if self.weights else [1 / len(document_lists)] * len(document_lists) - - # Calculate weighted reciprocal rank fusion score - for documents, weight in zip(document_lists, weights, strict=True): - for rank, doc in enumerate(documents): - scores_map[doc.id] += (weight * len(document_lists)) / (k + rank) - documents_map[doc.id] = doc - - # Normalize scores. Note: len(results) / k is the maximum possible score, - # achieved by being ranked first in all doc lists with non-zero weight. - for _id in scores_map: - scores_map[_id] /= len(document_lists) / k - - return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()] + return _reciprocal_rank_fusion(document_lists, weights=self.weights) @staticmethod def _distribution_based_rank_fusion(document_lists: list[list[Document]]) -> list[Document]: diff --git a/haystack/components/retrievers/multi_retriever.py b/haystack/components/retrievers/multi_retriever.py index 975f6170f1..8dc58e8a30 100644 --- a/haystack/components/retrievers/multi_retriever.py +++ b/haystack/components/retrievers/multi_retriever.py @@ -4,14 +4,15 @@ import asyncio from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Any +from math import inf +from typing import Any, Literal from haystack import component, default_from_dict, default_to_dict from haystack.components.retrievers.types.protocol import TextRetriever from haystack.core.serialization import component_from_dict, component_to_dict, import_class_by_name from haystack.dataclasses import Document from haystack.utils.experimental import _experimental -from haystack.utils.misc import _deduplicate_documents +from haystack.utils.misc import _deduplicate_documents, _reciprocal_rank_fusion @_experimental @@ -82,6 +83,7 @@ def __init__( filters: dict[str, Any] | None = None, top_k: int = 10, max_workers: int = 4, + join_mode: Literal["concatenate", "reciprocal_rank_fusion"] = "reciprocal_rank_fusion", ) -> None: """ Create the MultiRetriever component. @@ -95,13 +97,31 @@ def __init__( The maximum number of documents to return per retriever. :param max_workers: The maximum number of threads to use for parallel retrieval. + :param join_mode: + How to merge results from multiple retrievers. Available modes: + - `concatenate`: Combines all results into a single list and deduplicates. + - `reciprocal_rank_fusion`: Deduplicates and assigns scores based on reciprocal rank fusion. """ self.retrievers = retrievers self.filters = filters self.top_k = top_k self.max_workers = max_workers + self.join_mode = join_mode self._is_warmed_up = False + def _merge_results(self, document_lists: list[list[Document]]) -> list[Document]: + """ + Merge per-retriever result lists according to `join_mode`. + + In `concatenate` mode, all lists are flattened and deduplicated. + In `reciprocal_rank_fusion` mode, results are deduplicated and re-scored using RRF, then returned in + descending score order. + """ + if self.join_mode == "reciprocal_rank_fusion": + documents = _reciprocal_rank_fusion(document_lists) + return sorted(documents, key=lambda d: d.score if d.score is not None else -inf, reverse=True) + return _deduplicate_documents([doc for docs in document_lists for doc in docs]) + def _resolve_retrievers(self, active_retrievers: list[str] | None) -> dict[str, TextRetriever]: """ Returns the subset of retrievers to run based on the active_retrievers list. @@ -171,7 +191,7 @@ def run( retrievers_to_run = self._resolve_retrievers(active_retrievers) - all_documents: list[Document] = [] + results_by_name: dict[str, list[Document]] = {} with ThreadPoolExecutor(max_workers=self.max_workers) as executor: future_to_name = { executor.submit(retriever.run, query=query, filters=resolved_filters, top_k=resolved_top_k): name @@ -180,11 +200,12 @@ def run( for future in as_completed(future_to_name): name = future_to_name[future] try: - all_documents.extend(future.result().get("documents", [])) + results_by_name[name] = future.result().get("documents", []) except Exception as e: raise RuntimeError(f"Retriever '{name}' failed: {e}") from e - return {"documents": _deduplicate_documents(all_documents)} + document_lists = [results_by_name[name] for name in retrievers_to_run] + return {"documents": self._merge_results(document_lists)} @component.output_types(documents=list[Document]) async def run_async( @@ -238,13 +259,8 @@ async def _run_one(name: str, retriever: TextRetriever) -> list[Document]: except Exception as e: raise RuntimeError(f"Retriever '{name}' failed: {e}") from e - results = await asyncio.gather(*[_run_one(name, r) for name, r in retrievers_to_run.items()]) - - all_documents: list[Document] = [] - for docs in results: - all_documents.extend(docs) - - return {"documents": _deduplicate_documents(all_documents)} + document_lists = list(await asyncio.gather(*[_run_one(name, r) for name, r in retrievers_to_run.items()])) + return {"documents": self._merge_results(document_lists)} def to_dict(self) -> dict[str, Any]: """ @@ -259,6 +275,7 @@ def to_dict(self) -> dict[str, Any]: filters=self.filters, top_k=self.top_k, max_workers=self.max_workers, + join_mode=self.join_mode, ) @classmethod diff --git a/haystack/utils/misc.py b/haystack/utils/misc.py index d043bb9d62..8073ca4d54 100644 --- a/haystack/utils/misc.py +++ b/haystack/utils/misc.py @@ -5,6 +5,8 @@ import json import mimetypes import tempfile +from collections import defaultdict +from dataclasses import replace from math import inf from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, overload @@ -145,6 +147,40 @@ def _deduplicate_documents(documents: list["Document"]) -> list["Document"]: return list(highest_scoring_docs.values()) +def _reciprocal_rank_fusion( + document_lists: list[list["Document"]], weights: list[float] | None = None +) -> list["Document"]: + """ + Merge multiple ranked lists of Documents using Reciprocal Rank Fusion, deduplicating across lists. + + See the original paper: https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf + + The constant k is set to 61 (60 was suggested by the original paper, + plus 1 as python lists are 0-based and the paper used 1-based ranking). + + :param document_lists: A list of ranked document lists to fuse. + :param weights: Optional per-list weights. Defaults to equal weights. + :returns: Deduplicated list of documents with updated RRF scores. + """ + if not document_lists: + return [] + + k = 61 + scores_map: dict = defaultdict(int) + documents_map: dict = {} + resolved_weights = weights if weights else [1 / len(document_lists)] * len(document_lists) + + for documents, weight in zip(document_lists, resolved_weights, strict=True): + for rank, doc in enumerate(documents): + scores_map[doc.id] += (weight * len(document_lists)) / (k + rank) + documents_map[doc.id] = doc + + for _id in scores_map: + scores_map[_id] /= len(document_lists) / k + + return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()] + + @overload def _parse_dict_from_json( text: str, expected_keys: list[str] | None = ..., raise_on_failure: Literal[True] = ... diff --git a/releasenotes/notes/multi-retriever-rrf-effc174d0a440788.yaml b/releasenotes/notes/multi-retriever-rrf-effc174d0a440788.yaml new file mode 100644 index 0000000000..72b9b566db --- /dev/null +++ b/releasenotes/notes/multi-retriever-rrf-effc174d0a440788.yaml @@ -0,0 +1,8 @@ +--- +enhancements: + - | + Add ``join_mode`` parameter to the experimental ``MultiRetriever`` component, supporting + ``"reciprocal_rank_fusion"`` (default) and ``"concatenate"``. Reciprocal Rank Fusion merges + the ranked result lists from all retrievers into a single deduplicated list ordered by RRF score. + The underlying RRF logic is extracted into a shared utility ``_reciprocal_rank_fusion`` in + ``haystack.utils.misc``, which is now also used by ``DocumentJoiner``. diff --git a/test/components/joiners/test_document_joiner.py b/test/components/joiners/test_document_joiner.py index 9886c25837..75abf2450d 100644 --- a/test/components/joiners/test_document_joiner.py +++ b/test/components/joiners/test_document_joiner.py @@ -270,43 +270,3 @@ def test_output_documents_not_sorted_by_score(self): documents_2 = [Document(content="d", score=0.2)] output = joiner.run([documents_1, documents_2]) assert output["documents"] == documents_1 + documents_2 - - def test_test_score_norm_with_rrf(self): - """ - Verifies reciprocal rank fusion (RRF) of the DocumentJoiner component with various weight configurations. - It creates a set of documents, forms them into two lists, and then applies multiple DocumentJoiner - instances with distinct weights to these lists. The test checks if the resulting - joined documents are correctly sorted in descending order by score, ensuring the RRF ranking works as - expected under different weighting scenarios. - """ - num_docs = 6 - docs = [] - - for i in range(num_docs): - docs.append(Document(content=f"doc{i}")) - - docs_2 = [docs[0], docs[4], docs[2], docs[5], docs[1]] - document_lists = [docs, docs_2] - - joiner_1 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[0.5, 0.5]) - - joiner_2 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[7, 7]) - - joiner_3 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[0.7, 0.3]) - - joiner_4 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[0.6, 0.4]) - - joiner_5 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[1, 0]) - - joiners = [joiner_1, joiner_2, joiner_3, joiner_4, joiner_5] - - for joiner in joiners: - join_results = joiner.run(documents=document_lists) - is_sorted = all( - join_results["documents"][i].score >= join_results["documents"][i + 1].score - for i in range(len(join_results["documents"]) - 1) - ) - - assert is_sorted, ( - "Documents are not sorted in descending order by score, there is an issue with rff ranking" - ) diff --git a/test/components/retrievers/test_multi_retriever.py b/test/components/retrievers/test_multi_retriever.py index 43af0a15bf..7f45e8fd3e 100644 --- a/test/components/retrievers/test_multi_retriever.py +++ b/test/components/retrievers/test_multi_retriever.py @@ -103,14 +103,32 @@ def test_init_default_parameters(self): assert retriever.filters is None assert retriever.top_k == 10 assert retriever.max_workers == 4 + assert retriever.join_mode == "reciprocal_rank_fusion" def test_init_custom_parameters(self): retrievers = {"mock": MockRetriever()} - retriever = MultiRetriever(retrievers=retrievers, filters={"field": "meta.category"}, top_k=5, max_workers=2) + retriever = MultiRetriever( + retrievers=retrievers, filters={"field": "meta.category"}, top_k=5, max_workers=2, join_mode="concatenate" + ) assert retriever.retrievers == retrievers assert retriever.filters == {"field": "meta.category"} assert retriever.top_k == 5 assert retriever.max_workers == 2 + assert retriever.join_mode == "concatenate" + + def test_run_rrf_assigns_scores_and_sorts(self, sample_documents): + docs_a = [sample_documents[0], sample_documents[1], sample_documents[2]] + docs_b = [sample_documents[2], sample_documents[0], sample_documents[3]] + retriever = MultiRetriever( + retrievers={"a": MockRetriever(docs_a), "b": MockRetriever(docs_b)}, join_mode="reciprocal_rank_fusion" + ) + result = retriever.run(query="energy") + assert all(doc.score is not None for doc in result["documents"]) + scores = [doc.score for doc in result["documents"]] + assert scores == sorted(scores, reverse=True) + # doc1 ranked 1st in a and 2nd in b, doc3 ranked 3rd in a and 1st in b — doc1 should beat doc3 + ids = [doc.id for doc in result["documents"]] + assert ids.index("doc1") < ids.index("doc3") def test_run_with_empty_document_store(self): retriever = MultiRetriever(retrievers={"mock": MockRetriever()}) @@ -223,6 +241,7 @@ def test_to_dict(self): "filters": None, "top_k": 5, "max_workers": 2, + "join_mode": "reciprocal_rank_fusion", }, } @@ -255,6 +274,7 @@ def test_from_dict(self): "filters": None, "top_k": 5, "max_workers": 2, + "join_mode": "concatenate", }, } result = MultiRetriever.from_dict(data) @@ -264,6 +284,7 @@ def test_from_dict(self): assert isinstance(result.retrievers["bm25"], InMemoryBM25Retriever) assert result.top_k == 5 assert result.max_workers == 2 + assert result.join_mode == "concatenate" def test_from_dict_with_no_retrievers(self): data = { @@ -290,7 +311,6 @@ def test_from_dict_with_unknown_retriever_type_raises(self): MultiRetriever.from_dict(data) @pytest.mark.integration - @pytest.mark.slow def test_run_with_filters(self, del_hf_env_vars, bm25_retriever, embedding_retriever): retriever = MultiRetriever(retrievers={"bm25": bm25_retriever, "embedding": embedding_retriever}) result = retriever.run(query="energy", filters={"field": "meta.category", "operator": "==", "value": "solar"}) @@ -298,14 +318,12 @@ def test_run_with_filters(self, del_hf_env_vars, bm25_retriever, embedding_retri assert result["documents"][0].meta["category"] == "solar" @pytest.mark.integration - @pytest.mark.slow def test_run_with_top_k(self, del_hf_env_vars, bm25_retriever, embedding_retriever): retriever = MultiRetriever(retrievers={"bm25": bm25_retriever, "embedding": embedding_retriever}) result = retriever.run(query="energy", top_k=2) assert len(result["documents"]) == 2 @pytest.mark.integration - @pytest.mark.slow def test_run_with_active_retrievers_integration(self, del_hf_env_vars, bm25_retriever, embedding_retriever): retriever = MultiRetriever(retrievers={"bm25": bm25_retriever, "embedding": embedding_retriever}) result_bm25_active = retriever.run(query="energy", active_retrievers=["bm25"]) @@ -345,6 +363,20 @@ async def test_run_async_deduplicates_results(self, sample_documents): assert len(result["documents"]) == 2 assert [doc.id for doc in result["documents"]].count("doc1") == 1 + @pytest.mark.asyncio + async def test_run_async_rrf_assigns_scores_and_sorts(self, sample_documents): + docs_a = [sample_documents[0], sample_documents[1], sample_documents[2]] + docs_b = [sample_documents[2], sample_documents[0], sample_documents[3]] + retriever = MultiRetriever( + retrievers={"a": MockRetriever(docs_a), "b": MockRetriever(docs_b)}, join_mode="reciprocal_rank_fusion" + ) + result = await retriever.run_async(query="energy") + assert all(doc.score is not None for doc in result["documents"]) + scores = [doc.score for doc in result["documents"]] + assert scores == sorted(scores, reverse=True) + ids = [doc.id for doc in result["documents"]] + assert ids.index("doc1") < ids.index("doc3") + @pytest.mark.asyncio async def test_run_async_resolves_filters_and_top_k(self): received: dict = {} @@ -414,7 +446,6 @@ async def run_async(self, query: str, filters: dict[str, Any] | None = None, top assert result["documents"][0].id == "async1" @pytest.mark.integration - @pytest.mark.slow @pytest.mark.asyncio async def test_run_async_with_filters(self, del_hf_env_vars, bm25_retriever, embedding_retriever): retriever = MultiRetriever(retrievers={"bm25": bm25_retriever, "embedding": embedding_retriever}) @@ -425,7 +456,6 @@ async def test_run_async_with_filters(self, del_hf_env_vars, bm25_retriever, emb assert result["documents"][0].meta["category"] == "solar" @pytest.mark.integration - @pytest.mark.slow @pytest.mark.asyncio async def test_run_async_with_top_k(self, del_hf_env_vars, bm25_retriever, embedding_retriever): retriever = MultiRetriever(retrievers={"bm25": bm25_retriever, "embedding": embedding_retriever}) @@ -433,7 +463,6 @@ async def test_run_async_with_top_k(self, del_hf_env_vars, bm25_retriever, embed assert len(result["documents"]) == 2 @pytest.mark.integration - @pytest.mark.slow @pytest.mark.asyncio async def test_run_async_with_active_retrievers_integration( self, del_hf_env_vars, bm25_retriever, embedding_retriever diff --git a/test/utils/test_misc.py b/test/utils/test_misc.py index 45782f3b7d..49a196c1b1 100644 --- a/test/utils/test_misc.py +++ b/test/utils/test_misc.py @@ -8,7 +8,12 @@ import pytest from haystack import Document -from haystack.utils.misc import _deduplicate_documents, _normalize_metadata_field_name, _parse_dict_from_json +from haystack.utils.misc import ( + _deduplicate_documents, + _normalize_metadata_field_name, + _parse_dict_from_json, + _reciprocal_rank_fusion, +) class TestNormalizeMetadataFieldName: @@ -50,6 +55,56 @@ def test_deduplicate_documents_keeps_first_when_scores_missing(self): assert result[0].content == "first" +class TestReciprocalRankFusion: + def test_empty_input_returns_empty(self): + assert _reciprocal_rank_fusion([]) == [] + + def test_single_list_assigns_scores(self): + docs = [Document(id="a"), Document(id="b"), Document(id="c")] + result = _reciprocal_rank_fusion([docs]) + assert len(result) == 3 + assert all(doc.score is not None for doc in result) + + def test_scores_decrease_with_rank(self): + docs = [Document(id="a"), Document(id="b"), Document(id="c")] + result = _reciprocal_rank_fusion([docs]) + by_id = {doc.id: doc.score for doc in result} + assert by_id["a"] > by_id["b"] > by_id["c"] + + def test_deduplicates_across_lists(self): + docs_a = [Document(id="a"), Document(id="b")] + docs_b = [Document(id="b"), Document(id="c")] + result = _reciprocal_rank_fusion([docs_a, docs_b]) + assert len(result) == 3 + assert {doc.id for doc in result} == {"a", "b", "c"} + + def test_higher_ranked_doc_gets_higher_score(self): + docs_a = [Document(id="a"), Document(id="b"), Document(id="c")] + docs_b = [Document(id="c"), Document(id="a"), Document(id="d")] + result = _reciprocal_rank_fusion([docs_a, docs_b]) + by_id = {doc.id: doc.score for doc in result} + # "a" is ranked 1st and 2nd; "c" is ranked 3rd and 1st — "a" should win + assert by_id["a"] > by_id["c"] + + def test_equal_weights_by_default(self): + docs_a = [Document(id="x")] + docs_b = [Document(id="x")] + result_default = _reciprocal_rank_fusion([docs_a, docs_b]) + result_explicit = _reciprocal_rank_fusion([docs_a, docs_b], weights=[0.5, 0.5]) + assert result_default[0].score == pytest.approx(result_explicit[0].score) + + def test_weights_influence_scores(self): + docs_a = [Document(id="a"), Document(id="b")] + docs_b = [Document(id="b"), Document(id="a")] + result_equal = _reciprocal_rank_fusion([docs_a, docs_b]) + result_weighted = _reciprocal_rank_fusion([docs_a, docs_b], weights=[0.9, 0.1]) + equal_by_id = {doc.id: doc.score for doc in result_equal} + weighted_by_id = {doc.id: doc.score for doc in result_weighted} + # with equal weights both docs score the same; with heavy weight on list_a, "a" should outscore "b" + assert equal_by_id["a"] == pytest.approx(equal_by_id["b"]) + assert weighted_by_id["a"] > weighted_by_id["b"] + + class TestJsonParsing: @pytest.fixture def mock_logger(self): From 3e9ba003e171abec41f53138e1e2f3c37e1f23ba Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 29 Apr 2026 15:20:32 +0200 Subject: [PATCH 2/5] docstrings --- haystack/components/retrievers/multi_retriever.py | 5 ++--- haystack/utils/misc.py | 7 ++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/haystack/components/retrievers/multi_retriever.py b/haystack/components/retrievers/multi_retriever.py index 8dc58e8a30..911fa47cb0 100644 --- a/haystack/components/retrievers/multi_retriever.py +++ b/haystack/components/retrievers/multi_retriever.py @@ -113,9 +113,8 @@ def _merge_results(self, document_lists: list[list[Document]]) -> list[Document] """ Merge per-retriever result lists according to `join_mode`. - In `concatenate` mode, all lists are flattened and deduplicated. - In `reciprocal_rank_fusion` mode, results are deduplicated and re-scored using RRF, then returned in - descending score order. + In `concatenate` mode, all lists are flattened and deduplicated. In `reciprocal_rank_fusion` mode, results + are deduplicated and re-scored using RRF, then returned in descending score order. """ if self.join_mode == "reciprocal_rank_fusion": documents = _reciprocal_rank_fusion(document_lists) diff --git a/haystack/utils/misc.py b/haystack/utils/misc.py index 8073ca4d54..068dbb6adc 100644 --- a/haystack/utils/misc.py +++ b/haystack/utils/misc.py @@ -155,12 +155,13 @@ def _reciprocal_rank_fusion( See the original paper: https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf - The constant k is set to 61 (60 was suggested by the original paper, - plus 1 as python lists are 0-based and the paper used 1-based ranking). + The constant k is set to 61 (60 was suggested by the original paper, plus 1 as python lists are 0-based and the + paper used 1-based ranking). :param document_lists: A list of ranked document lists to fuse. :param weights: Optional per-list weights. Defaults to equal weights. - :returns: Deduplicated list of documents with updated RRF scores. + :returns: + Deduplicated list of documents with updated RRF scores. """ if not document_lists: return [] From 109ef457a59f5dddda48f49304808998c6ef6f3a Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 29 Apr 2026 15:28:06 +0200 Subject: [PATCH 3/5] Update docs to include join_mode --- .../retrievers/multiretriever.mdx | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/docs-website/docs/pipeline-components/retrievers/multiretriever.mdx b/docs-website/docs/pipeline-components/retrievers/multiretriever.mdx index f2d84fc22a..c1244fc041 100644 --- a/docs-website/docs/pipeline-components/retrievers/multiretriever.mdx +++ b/docs-website/docs/pipeline-components/retrievers/multiretriever.mdx @@ -2,12 +2,12 @@ title: "MultiRetriever" id: multiretriever slug: "/multiretriever" -description: "Runs multiple text retrievers in parallel and combines their deduplicated results." +description: "Runs multiple text retrievers in parallel and combines their results using reciprocal rank fusion or deduplication." --- # MultiRetriever -Runs multiple text retrievers in parallel and combines their deduplicated results. +Runs multiple text retrievers in parallel and combines their results using reciprocal rank fusion or deduplication. :::warning[Experimental] @@ -21,8 +21,9 @@ Runs multiple text retrievers in parallel and combines their deduplicated result | --- | --- | | **Most common position in a pipeline** | After query input, before a [`ChatPromptBuilder`](../builders/chatpromptbuilder.mdx) in RAG pipelines | | **Mandatory init variables** | `retrievers`: A dictionary mapping names to text retrievers (implementing the `TextRetriever` protocol) | +| **Optional init variables** | `join_mode`: `"reciprocal_rank_fusion"` (default) or `"concatenate"` | | **Mandatory run variables** | `query`: A query string | -| **Output variables** | `documents`: A deduplicated list of retrieved documents | +| **Output variables** | `documents`: A merged list of retrieved documents | | **API reference** | [Retrievers](/reference/retrievers-api) | | **GitHub link** | https://github.com/deepset-ai/haystack/blob/main/haystack/components/retrievers/multi_retriever.py | | **Package name** | `haystack-ai` | @@ -31,20 +32,27 @@ Runs multiple text retrievers in parallel and combines their deduplicated result ## Overview -`MultiRetriever` composes any number of text retrievers into a single component. All retrievers are queried in parallel using a thread pool, and their results are deduplicated before being returned. +`MultiRetriever` composes any number of text retrievers into a single component. All retrievers are queried in parallel using a thread pool, and their results are merged before being returned. The component: - Queries all retrievers concurrently for better performance -- Automatically deduplicates results across retrievers +- Merges results across retrievers using the configured `join_mode` - Supports selectively enabling retrievers at runtime via `active_retrievers` All retrievers passed to `MultiRetriever` must implement the `TextRetriever` protocol — their `run` method must accept a text `query`, `filters`, and `top_k`. Use [`TextEmbeddingRetriever`](textembeddingretriever.mdx) to wrap an embedding-based retriever so it can be used with this component. +### Join modes + +The `join_mode` parameter controls how results from multiple retrievers are merged: + +- **`reciprocal_rank_fusion`** (default): Assigns scores based on each document's rank across retrieval lists using the [Reciprocal Rank Fusion](https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf) algorithm. Documents appearing highly ranked in multiple lists receive higher scores. Results are deduplicated and returned in descending score order. This is the recommended mode when combining retrievers with incomparable scores, such as BM25 and embedding retrievers. +- **`concatenate`**: Combines all results into a single list and deduplicates. + ## Usage ### On its own -This example sets up a `MultiRetriever` combining a BM25 retriever and an embedding-based retriever (wrapped with `TextEmbeddingRetriever`). Both are queried in parallel and the deduplicated results are returned. +This example sets up a `MultiRetriever` combining a BM25 retriever and an embedding-based retriever (wrapped with `TextEmbeddingRetriever`). Both are queried in parallel and the results are merged using reciprocal rank fusion. ```python from haystack import Document From 978aab11726abcde755ad608a7a0227b0bdc7ec9 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 29 Apr 2026 16:33:45 +0200 Subject: [PATCH 4/5] fix tests --- test/components/retrievers/test_multi_retriever.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/components/retrievers/test_multi_retriever.py b/test/components/retrievers/test_multi_retriever.py index 7f45e8fd3e..6dcd2518c4 100644 --- a/test/components/retrievers/test_multi_retriever.py +++ b/test/components/retrievers/test_multi_retriever.py @@ -328,7 +328,8 @@ def test_run_with_active_retrievers_integration(self, del_hf_env_vars, bm25_retr retriever = MultiRetriever(retrievers={"bm25": bm25_retriever, "embedding": embedding_retriever}) result_bm25_active = retriever.run(query="energy", active_retrievers=["bm25"]) result_bm25 = bm25_retriever.run(query="energy") - assert result_bm25_active == result_bm25 + # Scores differ because MultiRetriever applies join_mode processing (e.g. RRF) even for a single retriever. + assert [doc.id for doc in result_bm25_active["documents"]] == [doc.id for doc in result_bm25["documents"]] class TestMultiRetrieverAsync: @@ -470,7 +471,8 @@ async def test_run_async_with_active_retrievers_integration( retriever = MultiRetriever(retrievers={"bm25": bm25_retriever, "embedding": embedding_retriever}) result_bm25_active = await retriever.run_async(query="energy", active_retrievers=["bm25"]) result_bm25 = await bm25_retriever.run_async(query="energy") - assert result_bm25_active == result_bm25 + # Scores differ because MultiRetriever applies join_mode processing (e.g. RRF) even for a single retriever. + assert [doc.id for doc in result_bm25_active["documents"]] == [doc.id for doc in result_bm25["documents"]] class TestMultiRetrieverExperimental: From 08002123f367e06acf401acc2a1c5f7dfebe6a00 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Thu, 30 Apr 2026 13:06:57 +0200 Subject: [PATCH 5/5] Prevent name clash --- haystack/components/joiners/document_joiner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/haystack/components/joiners/document_joiner.py b/haystack/components/joiners/document_joiner.py index 3c31f8ba14..8a77b8d3d0 100644 --- a/haystack/components/joiners/document_joiner.py +++ b/haystack/components/joiners/document_joiner.py @@ -118,7 +118,7 @@ def __init__( join_mode_functions = { JoinMode.CONCATENATE: DocumentJoiner._concatenate, JoinMode.MERGE: self._merge, - JoinMode.RECIPROCAL_RANK_FUSION: self._reciprocal_rank_fusion, + JoinMode.RECIPROCAL_RANK_FUSION: self._rrf, JoinMode.DISTRIBUTION_BASED_RANK_FUSION: DocumentJoiner._distribution_based_rank_fusion, } self.join_mode_function = join_mode_functions[join_mode] @@ -194,7 +194,7 @@ def _merge(self, document_lists: list[list[Document]]) -> list[Document]: return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()] - def _reciprocal_rank_fusion(self, document_lists: list[list[Document]]) -> list[Document]: + def _rrf(self, document_lists: list[list[Document]]) -> list[Document]: """ Merge multiple lists of Documents and assign scores based on reciprocal rank fusion. """