| title | Joiners |
|---|---|
| id | joiners-api |
| description | Components that join lists of different objects |
| slug | /joiners-api |
Bases: Enum
Enum for AnswerJoiner join modes.
from_str(string: str) -> JoinMode
Convert a string to a JoinMode enum.
Merges multiple lists of Answer objects into a single list.
Use this component to combine answers from different Generators into a single list.
Currently, the component supports only one join mode: CONCATENATE.
This mode concatenates multiple lists of answers into a single list.
In this example, AnswerJoiner merges answers from two different Generators:
from haystack.components.builders import AnswerBuilder
from haystack.components.joiners import AnswerJoiner
from haystack.core.pipeline import Pipeline
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
query = "What's Natural Language Processing?"
messages = [ChatMessage.from_system("You are a helpful, respectful and honest assistant. Be super concise."),
            ChatMessage.from_user(query)]
pipe = Pipeline()
pipe.add_component("llm_1", OpenAIChatGenerator())
pipe.add_component("llm_2", OpenAIChatGenerator())
pipe.add_component("aba", AnswerBuilder())
pipe.add_component("abb", AnswerBuilder())
pipe.add_component("joiner", AnswerJoiner())
pipe.connect("llm_1.replies", "aba")
pipe.connect("llm_2.replies", "abb")
pipe.connect("aba.answers", "joiner")
pipe.connect("abb.answers", "joiner")
results = pipe.run(data={"llm_1": {"messages": messages},
                         "llm_2": {"messages": messages},
                         "aba": {"query": query},
                         "abb": {"query": query}})
__init__(
join_mode: str | JoinMode = JoinMode.CONCATENATE,
top_k: int | None = None,
sort_by_score: bool = False,
) -> None
Creates an AnswerJoiner component.
Parameters:
- join_mode (str | JoinMode) – Specifies the join mode to use. Available modes:
  - concatenate: Concatenates multiple lists of Answers into a single list.
- top_k (int | None) – The maximum number of Answers to return.
- sort_by_score (bool) – If True, sorts the Answers by score in descending order. An Answer with no score is handled as if its score were -infinity.
run(
answers: Variadic[list[AnswerType]], top_k: int | None = None
) -> dict[str, Any]
Joins multiple lists of Answers into a single list depending on the join_mode parameter.
Parameters:
- answers (Variadic[list[AnswerType]]) – Nested list of Answers to be merged.
- top_k (int | None) – The maximum number of Answers to return. Overrides the instance's top_k if provided.
Returns:
dict[str, Any] – A dictionary with the following keys:
- answers: Merged list of Answers
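The interaction of concatenation, sort_by_score, and top_k can be illustrated with a minimal plain-Python sketch. It is not the component's implementation: SketchAnswer and join_answers are hypothetical names introduced here, and the order of steps (concatenate, then sort, then truncate) is an assumption drawn from the parameter descriptions above.

```python
from dataclasses import dataclass
from itertools import chain
from typing import Optional


@dataclass
class SketchAnswer:
    """Hypothetical stand-in for an Answer; only the fields needed here."""
    data: str
    score: Optional[float] = None


def join_answers(answer_lists, top_k=None, sort_by_score=False):
    # CONCATENATE: chain the input lists in the order they were given.
    joined = list(chain.from_iterable(answer_lists))
    if sort_by_score:
        # Answers without a score sort last, as if their score were -infinity.
        joined.sort(
            key=lambda a: a.score if a.score is not None else float("-inf"),
            reverse=True,
        )
    # Truncate only when a limit was given.
    if top_k is not None:
        joined = joined[:top_k]
    return {"answers": joined}


from_llm_1 = [SketchAnswer("NLP is a field of AI.", score=0.4)]
from_llm_2 = [
    SketchAnswer("NLP studies language."),
    SketchAnswer("NLP = language + computers.", score=0.9),
]

result = join_answers([from_llm_1, from_llm_2], top_k=2, sort_by_score=True)
print([a.data for a in result["answers"]])
# ['NLP = language + computers.', 'NLP is a field of AI.']
```

Without sort_by_score, the same call would simply return the first two answers in input order.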
to_dict() -> dict[str, Any]
Serializes the component to a dictionary.
Returns:
dict[str, Any] – Dictionary with serialized data.
from_dict(data: dict[str, Any]) -> AnswerJoiner
Deserializes the component from a dictionary.
Parameters:
- data (dict[str, Any]) – The dictionary to deserialize from.
Returns:
AnswerJoiner – The deserialized component.
A component that merges multiple input branches of a pipeline into a single output stream.
BranchJoiner receives multiple inputs of the same data type and forwards the first received value
to its output. This is useful for scenarios where multiple branches need to converge before proceeding.
- Loop Handling: BranchJoiner helps close loops in pipelines. For example, if a pipeline component validates or modifies incoming data and produces an error-handling branch, BranchJoiner can merge both branches and send (or resend in the case of a loop) the data to the component that evaluates errors. See "Usage example" below.
- Decision-Based Merging: BranchJoiner reconciles branches coming from Router components (such as ConditionalRouter, TextLanguageRouter). Suppose a TextLanguageRouter directs user queries to different Retrievers based on the detected language. Each Retriever processes its assigned query and passes the results to BranchJoiner, which consolidates them into a single output before passing them to the next component, such as a PromptBuilder.
import json
from haystack import Pipeline
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage
# Define a schema for validation
person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}
# Initialize a pipeline
pipe = Pipeline()
# Add components to the pipeline
pipe.add_component("joiner", BranchJoiner(list[ChatMessage]))
pipe.add_component("generator", OpenAIChatGenerator(model="gpt-4.1-mini"))
pipe.add_component("validator", JsonSchemaValidator(json_schema=person_schema))
# And connect them
pipe.connect("joiner", "generator")
pipe.connect("generator.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")
result = pipe.run(
    data={
        "generator": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
        "joiner": {"value": [ChatMessage.from_user("Create json from Peter Parker")]},
    }
)
print(json.loads(result["validator"]["validated"][0].text))
>> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
>> 'Superhero', 'age': 23, 'location': 'New York City'}
Note that BranchJoiner can manage only one data type at a time. In this case, BranchJoiner is created for
passing list[ChatMessage]. This determines both the type of data that BranchJoiner receives from the upstream
connected components and the type of data that it sends through its output.
In the code example, BranchJoiner receives a looped-back list[ChatMessage] from the JsonSchemaValidator and
sends it down to the OpenAIChatGenerator for re-generation. A pipeline can have multiple loopback connections.
In this instance there is only one downstream component (the OpenAIChatGenerator), but the pipeline could
have more than one.
__init__(type_: type) -> None
Creates a BranchJoiner component.
Parameters:
- type_ (type) – The expected data type of inputs and outputs.
to_dict() -> dict[str, Any]
Serializes the component into a dictionary.
Returns:
dict[str, Any] – Dictionary with serialized data.
from_dict(data: dict[str, Any]) -> BranchJoiner
Deserializes a BranchJoiner instance from a dictionary.
Parameters:
- data (dict[str, Any]) – The dictionary containing serialized component data.
Returns:
BranchJoiner – A deserialized BranchJoiner instance.
run(**kwargs: Any) -> dict[str, Any]
Executes the BranchJoiner, selecting the first available input value and passing it downstream.
Parameters:
- **kwargs (Any) – The input data. Must be of the type declared by type_ during initialization.
Returns:
dict[str, Any] – A dictionary with a single key, 'value', containing the first input received.
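The "first available input" behavior and its role in closing a loop can be modeled in plain Python. This is a simplified sketch, not the component itself: branch_join, generate, and validate are hypothetical helpers, and treating None as "branch produced nothing" is an assumption made for illustration. In a real pipeline the scheduler decides which branch delivers a value on each pass.

```python
def branch_join(*branch_values):
    """Forward the first branch value that is available (not None)."""
    for value in branch_values:
        if value is not None:
            return {"value": value}
    raise ValueError("no branch produced a value")


def generate(prompt, attempt):
    # Hypothetical generator: only produces valid output from the second attempt on.
    return prompt.upper() if attempt > 0 else prompt


def validate(text):
    # Hypothetical validator: accepts only upper-case text.
    return text.isupper()


initial, retry = "peter parker", None
for attempt in range(3):
    # Both the initial input and the loop-back branch enter through one joiner.
    current = branch_join(retry, initial)["value"]
    output = generate(current, attempt)
    if validate(output):
        break
    retry = output  # the error branch feeds the joiner on the next iteration

print(output)  # PETER PARKER
```

The point of the sketch is that the generator has a single entry point regardless of whether the data comes from the user or from the validation error branch.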
Bases: Enum
Enum for join mode.
from_str(string: str) -> JoinMode
Convert a string to a JoinMode enum.
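A string-to-enum conversion of this kind can be sketched as follows. JoinModeSketch is a hypothetical stand-in that uses the four mode names this page lists for DocumentJoiner. Raising ValueError for an unknown string, and the wording of the message, are assumptions rather than documented behavior.

```python
from enum import Enum


class JoinModeSketch(Enum):
    """Hypothetical mirror of the join modes named on this page."""
    CONCATENATE = "concatenate"
    MERGE = "merge"
    RECIPROCAL_RANK_FUSION = "reciprocal_rank_fusion"
    DISTRIBUTION_BASED_RANK_FUSION = "distribution_based_rank_fusion"

    @staticmethod
    def from_str(string: str) -> "JoinModeSketch":
        # Map each enum value (the string form) back to its member.
        lookup = {mode.value: mode for mode in JoinModeSketch}
        mode = lookup.get(string)
        if mode is None:
            # Assumed behavior: reject unknown names with a descriptive error.
            raise ValueError(f"Unknown join mode '{string}'. Supported modes: {list(lookup)}")
        return mode


print(JoinModeSketch.from_str("merge"))  # JoinModeSketch.MERGE
```

Accepting either a string or an enum member in a constructor, as the join_mode parameters on this page do, typically means calling such a helper only when the argument is a str.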
Joins multiple lists of documents into a single list.
It supports different join modes:
- concatenate: Keeps the highest-scored document in case of duplicates.
- merge: Calculates a weighted sum of scores for duplicates and merges them.
- reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
- distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.
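To make the reciprocal_rank_fusion mode more concrete, here is a minimal sketch of the general technique on plain lists of document ids. It is the textbook formulation (each document receives the sum of 1 / (k + rank) over all lists), not necessarily the exact formula or constants DocumentJoiner uses. The function name and the value k=60 are assumptions.

```python
from collections import defaultdict


def reciprocal_rank_fusion(ranked_lists, k=60):
    """Fuse ranked lists of document ids; k=60 is a commonly used constant, assumed here."""
    scores = defaultdict(float)
    for ranking in ranked_lists:
        # Ranks start at 1; a higher position contributes a larger share.
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first.
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


bm25_ranking = ["paris", "berlin", "london"]
embedding_ranking = ["london", "paris", "berlin"]

fused = reciprocal_rank_fusion([bm25_ranking, embedding_ranking])
print([doc_id for doc_id, _ in fused])  # ['paris', 'london', 'berlin']
```

Because only ranks matter, this approach works even when the Retrievers produce scores on incompatible scales.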
from haystack import Pipeline, Document
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
document_store = InMemoryDocumentStore()
docs = [Document(content="Paris"), Document(content="Berlin"), Document(content="London")]
embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
docs_embeddings = embedder.run(docs)
document_store.write_documents(docs_embeddings['documents'])
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
name="text_embedder",
)
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
p.run(data={"query": query, "text": query, "top_k": 1})
__init__(
join_mode: str | JoinMode = JoinMode.CONCATENATE,
weights: list[float] | None = None,
top_k: int | None = None,
sort_by_score: bool = True,
) -> None
Creates a DocumentJoiner component.
Parameters:
- join_mode (str | JoinMode) – Specifies the join mode to use. Available modes:
  - concatenate: Keeps the highest-scored document in case of duplicates.
  - merge: Calculates a weighted sum of scores for duplicates and merges them.
  - reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
  - distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.
- weights (list[float] | None) – Assign importance to each list of documents to influence how they're joined. This parameter is ignored for concatenate or distribution_based_rank_fusion join modes. The number of weights must match the number of input lists.
- top_k (int | None) – The maximum number of documents to return.
- sort_by_score (bool) – If True, sorts the documents by score in descending order. If a document has no score, it is handled as if its score is -infinity.
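The effect of weights in the merge mode can be sketched as a weighted sum of scores per document. This is an illustration under stated assumptions, not the component's code: weighted_merge is a hypothetical helper, documents are reduced to (id, score) pairs, and defaulting to equal weights when none are given is an assumption.

```python
from collections import defaultdict


def weighted_merge(scored_lists, weights=None):
    """Sum weight * score per document id across lists (illustrative only)."""
    if weights is None:
        # Assumed default: every input list counts equally.
        weights = [1.0 / len(scored_lists)] * len(scored_lists)
    if len(weights) != len(scored_lists):
        raise ValueError("need exactly one weight per input list")
    merged = defaultdict(float)
    for weight, scored in zip(weights, scored_lists):
        for doc_id, score in scored:
            merged[doc_id] += weight * score
    # Highest combined score first.
    return sorted(merged.items(), key=lambda item: item[1], reverse=True)


bm25 = [("paris", 0.8), ("berlin", 0.4)]
dense = [("paris", 0.5), ("london", 0.75)]

merged = weighted_merge([bm25, dense], weights=[0.5, 0.5])
print([doc_id for doc_id, _ in merged])  # ['paris', 'london', 'berlin']
```

A document found by both Retrievers accumulates contributions from each list, which is why "paris" ranks first here.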
run(
documents: Variadic[list[Document]], top_k: int | None = None
) -> dict[str, Any]
Joins multiple lists of Documents into a single list depending on the join_mode parameter.
Parameters:
- documents (Variadic[list[Document]]) – List of lists of documents to be merged.
- top_k (int | None) – The maximum number of documents to return. Overrides the instance's top_k if provided.
Returns:
dict[str, Any] – A dictionary with the following keys:
- documents: Merged list of Documents
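For the default concatenate mode, the combination of duplicate handling, sorting, and top_k can be sketched in plain Python. The helper names are hypothetical, documents are reduced to (id, score) pairs, and identifying duplicates by id is an assumption made for this illustration.

```python
NEG_INF = float("-inf")


def sort_key(score):
    # A missing score is treated as -infinity.
    return score if score is not None else NEG_INF


def concatenate_keep_best(doc_lists, top_k=None, sort_by_score=True):
    best = {}
    for docs in doc_lists:
        for doc_id, score in docs:
            # On duplicates, keep the entry with the highest score.
            if doc_id not in best or sort_key(score) > sort_key(best[doc_id]):
                best[doc_id] = score
    joined = list(best.items())
    if sort_by_score:
        joined.sort(key=lambda item: sort_key(item[1]), reverse=True)
    if top_k is not None:
        joined = joined[:top_k]
    return {"documents": joined}


bm25 = [("paris", 0.9), ("berlin", None)]
dense = [("paris", 0.7), ("london", 0.8)]

out = concatenate_keep_best([bm25, dense], top_k=2)
print(out["documents"])  # [('paris', 0.9), ('london', 0.8)]
```

The unscored "berlin" entry sorts last and is therefore the one dropped by top_k=2.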
to_dict() -> dict[str, Any]
Serializes the component to a dictionary.
Returns:
dict[str, Any] – Dictionary with serialized data.
from_dict(data: dict[str, Any]) -> DocumentJoiner
Deserializes the component from a dictionary.
Parameters:
- data (dict[str, Any]) – The dictionary to deserialize from.
Returns:
DocumentJoiner – The deserialized component.
A component that joins multiple lists into a single flat list.
The ListJoiner receives multiple lists of the same type and concatenates them into a single flat list. The output order respects the pipeline's execution sequence, with earlier inputs being added first.
Usage example:
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack import Pipeline
from haystack.components.joiners import ListJoiner
user_message = [ChatMessage.from_user("Give a brief answer to the following question: {{query}}")]
feedback_prompt = """
You are given a question and an answer.
Your task is to provide a score and a brief feedback on the answer.
Question: {{query}}
Answer: {{response}}
"""
feedback_message = [ChatMessage.from_system(feedback_prompt)]
prompt_builder = ChatPromptBuilder(template=user_message)
feedback_prompt_builder = ChatPromptBuilder(template=feedback_message)
llm = OpenAIChatGenerator()
feedback_llm = OpenAIChatGenerator()
pipe = Pipeline()
pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("llm", llm)
pipe.add_component("feedback_prompt_builder", feedback_prompt_builder)
pipe.add_component("feedback_llm", feedback_llm)
pipe.add_component("list_joiner", ListJoiner(list[ChatMessage]))
pipe.connect("prompt_builder.prompt", "llm.messages")
pipe.connect("prompt_builder.prompt", "list_joiner")
pipe.connect("llm.replies", "list_joiner")
pipe.connect("llm.replies", "feedback_prompt_builder.response")
pipe.connect("feedback_prompt_builder.prompt", "feedback_llm.messages")
pipe.connect("feedback_llm.replies", "list_joiner")
query = "What is nuclear physics?"
ans = pipe.run(data={"prompt_builder": {"template_variables": {"query": query}},
                     "feedback_prompt_builder": {"template_variables": {"query": query}}})
print(ans["list_joiner"]["values"])
__init__(list_type_: type | None = None) -> None
Creates a ListJoiner component.
Parameters:
- list_type_ (type | None) – The expected type of the lists this component will join (e.g., list[ChatMessage]). If specified, all input lists must conform to this type. If None, the component defaults to handling lists of any type, including mixed types.
to_dict() -> dict[str, Any]
Serializes the component to a dictionary.
Returns:
dict[str, Any] – Dictionary with serialized data.
from_dict(data: dict[str, Any]) -> ListJoiner
Deserializes the component from a dictionary.
Parameters:
- data (dict[str, Any]) – Dictionary to deserialize from.
Returns:
ListJoiner – Deserialized component.
run(values: Variadic[list[Any]]) -> dict[str, list[Any]]
Joins multiple lists into a single flat list.
Parameters:
- values (Variadic[list[Any]]) – The lists to be joined.
Returns:
dict[str, list[Any]] – Dictionary with 'values' key containing the joined list.
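The flattening step itself amounts to chaining the input lists in arrival order. A minimal plain-Python sketch follows. join_lists is a hypothetical helper, and the strings stand in for the ChatMessage objects of the usage example.

```python
from itertools import chain


def join_lists(*lists):
    """Concatenate the given lists into one flat list, earlier inputs first."""
    return {"values": list(chain.from_iterable(lists))}


prompt = ["user: What is nuclear physics?"]
reply = ["assistant: The study of atomic nuclei."]
feedback = ["assistant: Score 8/10, accurate but brief."]

joined = join_lists(prompt, reply, feedback)
print(len(joined["values"]))  # 3
```

Only one level of nesting is removed: a list that itself contains lists would keep its inner lists as elements.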
Component that joins strings from different components into a list of strings.
from haystack.components.joiners import StringJoiner
from haystack.components.builders import PromptBuilder
from haystack.core.pipeline import Pipeline
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
string_1 = "What's Natural Language Processing?"
string_2 = "What is life?"
pipeline = Pipeline()
pipeline.add_component("prompt_builder_1", PromptBuilder("Builder 1: {{query}}"))
pipeline.add_component("prompt_builder_2", PromptBuilder("Builder 2: {{query}}"))
pipeline.add_component("string_joiner", StringJoiner())
pipeline.connect("prompt_builder_1.prompt", "string_joiner.strings")
pipeline.connect("prompt_builder_2.prompt", "string_joiner.strings")
print(pipeline.run(data={"prompt_builder_1": {"query": string_1}, "prompt_builder_2": {"query": string_2}}))
>> {"string_joiner": {"strings": ["Builder 1: What's Natural Language Processing?", "Builder 2: What is life?"]}}
run(strings: Variadic[str]) -> dict[str, list[str]]
Joins strings into a list of strings.
Parameters:
- strings (Variadic[str]) – Strings from different components.
Returns:
dict[str, list[str]] – A dictionary with the following keys:
- strings: Merged list of strings