@@ -2,12 +2,12 @@
title: "MultiRetriever"
id: multiretriever
slug: "/multiretriever"
-description: "Runs multiple text retrievers in parallel and combines their deduplicated results."
+description: "Runs multiple text retrievers in parallel and combines their results using reciprocal rank fusion or deduplication."
---

# MultiRetriever

-Runs multiple text retrievers in parallel and combines their deduplicated results.
+Runs multiple text retrievers in parallel and combines their results using reciprocal rank fusion or deduplication.

:::warning[Experimental]

@@ -21,8 +21,9 @@ Runs multiple text retrievers in parallel and combines their deduplicated result
| --- | --- |
| **Most common position in a pipeline** | After query input, before a [`ChatPromptBuilder`](../builders/chatpromptbuilder.mdx) in RAG pipelines |
| **Mandatory init variables** | `retrievers`: A dictionary mapping names to text retrievers (implementing the `TextRetriever` protocol) |
+| **Optional init variables** | `join_mode`: `"reciprocal_rank_fusion"` (default) or `"concatenate"` |
| **Mandatory run variables** | `query`: A query string |
-| **Output variables** | `documents`: A deduplicated list of retrieved documents |
+| **Output variables** | `documents`: A merged list of retrieved documents |
| **API reference** | [Retrievers](/reference/retrievers-api) |
| **GitHub link** | https://github.com/deepset-ai/haystack/blob/main/haystack/components/retrievers/multi_retriever.py |
| **Package name** | `haystack-ai` |
@@ -31,20 +32,27 @@ Runs multiple text retrievers in parallel and combines their deduplicated result

## Overview

-`MultiRetriever` composes any number of text retrievers into a single component. All retrievers are queried in parallel using a thread pool, and their results are deduplicated before being returned.
+`MultiRetriever` composes any number of text retrievers into a single component. All retrievers are queried in parallel using a thread pool, and their results are merged before being returned.

The component:
- Queries all retrievers concurrently for better performance
-- Automatically deduplicates results across retrievers
+- Merges results across retrievers using the configured `join_mode`
- Supports selectively enabling retrievers at runtime via `active_retrievers`

All retrievers passed to `MultiRetriever` must implement the `TextRetriever` protocol — their `run` method must accept a text `query`, `filters`, and `top_k`. Use [`TextEmbeddingRetriever`](textembeddingretriever.mdx) to wrap an embedding-based retriever so it can be used with this component.

+### Join modes
+
+The `join_mode` parameter controls how results from multiple retrievers are merged:
+
+- **`reciprocal_rank_fusion`** (default): Assigns scores based on each document's rank across retrieval lists using the [Reciprocal Rank Fusion](https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf) algorithm. Documents appearing highly ranked in multiple lists receive higher scores. Results are deduplicated and returned in descending score order. This is the recommended mode when combining retrievers with incomparable scores, such as BM25 and embedding retrievers.
+- **`concatenate`**: Combines all results into a single list and deduplicates.
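The RRF scoring described above can be sketched in plain Python. This is a simplified, equal-weight illustration of the formula with k = 61, mirroring but not reproducing the library code:

```python
from collections import defaultdict


def rrf_scores(ranked_lists: list[list[str]], k: int = 61) -> dict[str, float]:
    """Fuse ranked lists of document ids with equal-weight reciprocal rank fusion."""
    if not ranked_lists:
        return {}
    weight = 1 / len(ranked_lists)  # equal weight per list
    scores: dict[str, float] = defaultdict(float)
    for docs in ranked_lists:
        for rank, doc_id in enumerate(docs):
            # Rank is 0-based here, hence k = 61 rather than the paper's 60.
            scores[doc_id] += (weight * len(ranked_lists)) / (k + rank)
    # Normalize so that ranking first in every list yields a score of 1.0.
    for doc_id in scores:
        scores[doc_id] /= len(ranked_lists) / k
    return dict(scores)


bm25_hits = ["a", "b", "c"]
embedding_hits = ["b", "a", "d"]
fused = rrf_scores([bm25_hits, embedding_hits])
print(sorted(fused, key=fused.get, reverse=True))  # → ['a', 'b', 'c', 'd']
```

Documents "a" and "b" appear near the top of both lists and therefore outrank "c" and "d", even though the underlying BM25 and embedding scores were never compared directly.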

## Usage

### On its own

-This example sets up a `MultiRetriever` combining a BM25 retriever and an embedding-based retriever (wrapped with `TextEmbeddingRetriever`). Both are queried in parallel and the deduplicated results are returned.
+This example sets up a `MultiRetriever` combining a BM25 retriever and an embedding-based retriever (wrapped with `TextEmbeddingRetriever`). Both are queried in parallel and the results are merged using reciprocal rank fusion.

```python
from haystack import Document
# ... (rest of the example is collapsed in the diff view)
```
31 changes: 4 additions & 27 deletions haystack/components/joiners/document_joiner.py
@@ -11,6 +11,7 @@

from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.core.component.types import Variadic
+from haystack.utils.misc import _reciprocal_rank_fusion

logger = logging.getLogger(__name__)

@@ -117,7 +118,7 @@ def __init__(
join_mode_functions = {
JoinMode.CONCATENATE: DocumentJoiner._concatenate,
JoinMode.MERGE: self._merge,
-JoinMode.RECIPROCAL_RANK_FUSION: self._reciprocal_rank_fusion,
+JoinMode.RECIPROCAL_RANK_FUSION: self._rrf,
JoinMode.DISTRIBUTION_BASED_RANK_FUSION: DocumentJoiner._distribution_based_rank_fusion,
}
self.join_mode_function = join_mode_functions[join_mode]
@@ -193,35 +194,11 @@ def _merge(self, document_lists: list[list[Document]]) -> list[Document]:

return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()]

-def _reciprocal_rank_fusion(self, document_lists: list[list[Document]]) -> list[Document]:
+def _rrf(self, document_lists: list[list[Document]]) -> list[Document]:
"""
Merge multiple lists of Documents and assign scores based on reciprocal rank fusion.
-
-The constant k is set to 61 (60 was suggested by the original paper,
-plus 1 as python lists are 0-based and the paper used 1-based ranking).
"""
-# This check prevents a division by zero when no documents are passed
-if not document_lists:
-return []
-
-k = 61
-
-scores_map: dict = defaultdict(int)
-documents_map = {}
-weights = self.weights if self.weights else [1 / len(document_lists)] * len(document_lists)
-
-# Calculate weighted reciprocal rank fusion score
-for documents, weight in zip(document_lists, weights, strict=True):
-for rank, doc in enumerate(documents):
-scores_map[doc.id] += (weight * len(document_lists)) / (k + rank)
-documents_map[doc.id] = doc
-
-# Normalize scores. Note: len(results) / k is the maximum possible score,
-# achieved by being ranked first in all doc lists with non-zero weight.
-for _id in scores_map:
-scores_map[_id] /= len(document_lists) / k
-
-return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()]
+return _reciprocal_rank_fusion(document_lists, weights=self.weights)

@staticmethod
def _distribution_based_rank_fusion(document_lists: list[list[Document]]) -> list[Document]:
40 changes: 28 additions & 12 deletions haystack/components/retrievers/multi_retriever.py
@@ -4,14 +4,15 @@

import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
-from typing import Any
+from math import inf
+from typing import Any, Literal

from haystack import component, default_from_dict, default_to_dict
from haystack.components.retrievers.types.protocol import TextRetriever
from haystack.core.serialization import component_from_dict, component_to_dict, import_class_by_name
from haystack.dataclasses import Document
from haystack.utils.experimental import _experimental
-from haystack.utils.misc import _deduplicate_documents
+from haystack.utils.misc import _deduplicate_documents, _reciprocal_rank_fusion


@_experimental
@@ -82,6 +83,7 @@ def __init__(
filters: dict[str, Any] | None = None,
top_k: int = 10,
max_workers: int = 4,
+join_mode: Literal["concatenate", "reciprocal_rank_fusion"] = "reciprocal_rank_fusion",
) -> None:
"""
Create the MultiRetriever component.
@@ -95,13 +97,30 @@
The maximum number of documents to return per retriever.
:param max_workers:
The maximum number of threads to use for parallel retrieval.
+:param join_mode:
+How to merge results from multiple retrievers. Available modes:
+- `concatenate`: Combines all results into a single list and deduplicates.
+- `reciprocal_rank_fusion`: Deduplicates and assigns scores based on reciprocal rank fusion.
"""
self.retrievers = retrievers
self.filters = filters
self.top_k = top_k
self.max_workers = max_workers
+self.join_mode = join_mode
self._is_warmed_up = False

+def _merge_results(self, document_lists: list[list[Document]]) -> list[Document]:
+"""
+Merge per-retriever result lists according to `join_mode`.
+
+In `concatenate` mode, all lists are flattened and deduplicated. In `reciprocal_rank_fusion` mode, results
+are deduplicated and re-scored using RRF, then returned in descending score order.
+"""
+if self.join_mode == "reciprocal_rank_fusion":
+documents = _reciprocal_rank_fusion(document_lists)
+return sorted(documents, key=lambda d: d.score if d.score is not None else -inf, reverse=True)
+return _deduplicate_documents([doc for docs in document_lists for doc in docs])

def _resolve_retrievers(self, active_retrievers: list[str] | None) -> dict[str, TextRetriever]:
"""
Returns the subset of retrievers to run based on the active_retrievers list.
@@ -171,7 +190,7 @@ def run(

retrievers_to_run = self._resolve_retrievers(active_retrievers)

-all_documents: list[Document] = []
+results_by_name: dict[str, list[Document]] = {}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_name = {
executor.submit(retriever.run, query=query, filters=resolved_filters, top_k=resolved_top_k): name
@@ -180,11 +199,12 @@
for future in as_completed(future_to_name):
name = future_to_name[future]
try:
-all_documents.extend(future.result().get("documents", []))
+results_by_name[name] = future.result().get("documents", [])
except Exception as e:
raise RuntimeError(f"Retriever '{name}' failed: {e}") from e

-return {"documents": _deduplicate_documents(all_documents)}
+document_lists = [results_by_name[name] for name in retrievers_to_run]
+return {"documents": self._merge_results(document_lists)}

@component.output_types(documents=list[Document])
async def run_async(
@@ -238,13 +258,8 @@ async def _run_one(name: str, retriever: TextRetriever) -> list[Document]:
except Exception as e:
raise RuntimeError(f"Retriever '{name}' failed: {e}") from e

-results = await asyncio.gather(*[_run_one(name, r) for name, r in retrievers_to_run.items()])
-
-all_documents: list[Document] = []
-for docs in results:
-all_documents.extend(docs)
-
-return {"documents": _deduplicate_documents(all_documents)}
+document_lists = list(await asyncio.gather(*[_run_one(name, r) for name, r in retrievers_to_run.items()]))
+return {"documents": self._merge_results(document_lists)}

def to_dict(self) -> dict[str, Any]:
"""
@@ -259,6 +274,7 @@ def to_dict(self) -> dict[str, Any]:
filters=self.filters,
top_k=self.top_k,
max_workers=self.max_workers,
+join_mode=self.join_mode,
)

@classmethod
37 changes: 37 additions & 0 deletions haystack/utils/misc.py
@@ -5,6 +5,8 @@
import json
import mimetypes
import tempfile
+from collections import defaultdict
+from dataclasses import replace
from math import inf
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, overload
@@ -145,6 +147,41 @@ def _deduplicate_documents(documents: list["Document"]) -> list["Document"]:
return list(highest_scoring_docs.values())


+def _reciprocal_rank_fusion(
+document_lists: list[list["Document"]], weights: list[float] | None = None
+) -> list["Document"]:
+"""
+Merge multiple ranked lists of Documents using Reciprocal Rank Fusion, deduplicating across lists.
+
+See the original paper: https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf
+
+The constant k is set to 61 (60 was suggested by the original paper, plus 1 as Python lists are 0-based and the
+paper used 1-based ranking).
+
+:param document_lists: A list of ranked document lists to fuse.
+:param weights: Optional per-list weights. Defaults to equal weights.
+:returns:
+Deduplicated list of documents with updated RRF scores.
+"""
+if not document_lists:
+return []
+
+k = 61
+scores_map: dict = defaultdict(int)
+documents_map: dict = {}
+resolved_weights = weights if weights else [1 / len(document_lists)] * len(document_lists)
+
+for documents, weight in zip(document_lists, resolved_weights, strict=True):
+for rank, doc in enumerate(documents):
+scores_map[doc.id] += (weight * len(document_lists)) / (k + rank)
+documents_map[doc.id] = doc
+
+for _id in scores_map:
+scores_map[_id] /= len(document_lists) / k
+
+return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()]


@overload
def _parse_dict_from_json(
text: str, expected_keys: list[str] | None = ..., raise_on_failure: Literal[True] = ...
@@ -0,0 +1,8 @@
+---
+enhancements:
+- |
+Add ``join_mode`` parameter to the experimental ``MultiRetriever`` component, supporting
+``"reciprocal_rank_fusion"`` (default) and ``"concatenate"``. Reciprocal Rank Fusion merges
+the ranked result lists from all retrievers into a single deduplicated list ordered by RRF score.
+The underlying RRF logic is extracted into a shared utility ``_reciprocal_rank_fusion`` in
+``haystack.utils.misc``, which is now also used by ``DocumentJoiner``.
40 changes: 0 additions & 40 deletions test/components/joiners/test_document_joiner.py
@@ -270,43 +270,3 @@ def test_output_documents_not_sorted_by_score(self):
documents_2 = [Document(content="d", score=0.2)]
output = joiner.run([documents_1, documents_2])
assert output["documents"] == documents_1 + documents_2

-def test_test_score_norm_with_rrf(self):
-"""
-Verifies reciprocal rank fusion (RRF) of the DocumentJoiner component with various weight configurations.
-It creates a set of documents, forms them into two lists, and then applies multiple DocumentJoiner
-instances with distinct weights to these lists. The test checks if the resulting
-joined documents are correctly sorted in descending order by score, ensuring the RRF ranking works as
-expected under different weighting scenarios.
-"""
-num_docs = 6
-docs = []
-
-for i in range(num_docs):
-docs.append(Document(content=f"doc{i}"))
-
-docs_2 = [docs[0], docs[4], docs[2], docs[5], docs[1]]
-document_lists = [docs, docs_2]
-
-joiner_1 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[0.5, 0.5])
-
-joiner_2 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[7, 7])
-
-joiner_3 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[0.7, 0.3])
-
-joiner_4 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[0.6, 0.4])
-
-joiner_5 = DocumentJoiner(join_mode="reciprocal_rank_fusion", weights=[1, 0])
-
-joiners = [joiner_1, joiner_2, joiner_3, joiner_4, joiner_5]
-
-for joiner in joiners:
-join_results = joiner.run(documents=document_lists)
-is_sorted = all(
-join_results["documents"][i].score >= join_results["documents"][i + 1].score
-for i in range(len(join_results["documents"]) - 1)
-)
-
-assert is_sorted, (
-"Documents are not sorted in descending order by score, there is an issue with rff ranking"
-)