| title | MCP |
|---|---|
| id | integrations-mcp |
| description | MCP integration for Haystack |
| slug | /integrations-mcp |
Thread-safe event loop executor for running async code from sync contexts.
get_instance() -> AsyncExecutorGet or create the global singleton executor instance.
__init__() -> NoneInitialize a dedicated event loop
run(coro: Coroutine[Any, Any, Any], timeout: float | None = None) -> AnyRun a coroutine in the event loop.
Parameters:
- coro (
Coroutine[Any, Any, Any]) – Coroutine to execute - timeout (
float | None) – Optional timeout in seconds
Returns:
Any– Result of the coroutine
Raises:
TimeoutError– If execution exceeds timeout
get_loop() -> asyncio.AbstractEventLoopGet the event loop.
Returns:
AbstractEventLoop– The event loop
run_background(
coro_factory: Callable[[asyncio.Event], Coroutine[Any, Any, Any]],
timeout: float | None = None,
) -> tuple[concurrent.futures.Future[Any], asyncio.Event]Schedule coro_factory to run in the executor's event loop without blocking the caller thread.
The factory receives an :class:asyncio.Event that can be used to cooperatively shut
the coroutine down. The method returns both the concurrent future (to observe
completion or failure) and the created stop_event so that callers can signal termination.
Parameters:
- coro_factory (
Callable\[[Event], Coroutine[Any, Any, Any]]) – A callable receiving the stop_event and returning the coroutine to execute. - timeout (
float | None) – Optional timeout while waiting for the stop_event to be created.
Returns:
tuple[Future[Any], Event]– Tuple(future, stop_event).
shutdown(timeout: float = 2) -> NoneShut down the background event loop and thread.
Parameters:
- timeout (
float) – Timeout in seconds for shutting down the event loop
Bases: Exception
Base class for MCP-related errors.
__init__(message: str) -> NoneInitialize the MCPError.
Parameters:
- message (
str) – Descriptive error message
Bases: MCPError
Error connecting to MCP server.
__init__(
message: str,
server_info: MCPServerInfo | None = None,
operation: str | None = None,
) -> NoneInitialize the MCPConnectionError.
Parameters:
- message (
str) – Descriptive error message - server_info (
MCPServerInfo | None) – Server connection information that was used - operation (
str | None) – Name of the operation that was being attempted
Bases: MCPError
Error when a tool is not found on the server.
__init__(
message: str, tool_name: str, available_tools: list[str] | None = None
) -> NoneInitialize the MCPToolNotFoundError.
Parameters:
- message (
str) – Descriptive error message - tool_name (
str) – Name of the tool that was requested but not found - available_tools (
list[str] | None) – List of available tool names, if known
Bases: ToolInvocationError
Error during tool invocation.
__init__(
message: str, tool_name: str, tool_args: dict[str, Any] | None = None
) -> NoneInitialize the MCPInvocationError.
Parameters:
- message (
str) – Descriptive error message - tool_name (
str) – Name of the tool that was being invoked - tool_args (
dict[str, Any] | None) – Arguments that were passed to the tool
Bases: ABC
Abstract base class for MCP clients.
This class defines the common interface and shared functionality for all MCP clients, regardless of the transport mechanism used.
connect() -> list[types.Tool]Connect to an MCP server.
Returns:
list[Tool]– List of available tools on the server
Raises:
MCPConnectionError– If connection to the server fails
call_tool(tool_name: str, tool_args: dict[str, Any]) -> strCall a tool on the connected MCP server.
Parameters:
- tool_name (
str) – Name of the tool to call - tool_args (
dict[str, Any]) – Arguments to pass to the tool
Returns:
str– JSON string representation of the tool invocation result
Raises:
MCPConnectionError– If not connected to an MCP serverMCPInvocationError– If the tool invocation fails
aclose() -> NoneClose the connection and clean up resources.
This method ensures all resources are properly released, even if errors occur.
Bases: MCPClient
MCP client that connects to servers using stdio transport.
__init__(
command: str,
args: list[str] | None = None,
env: dict[str, str | Secret] | None = None,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
) -> NoneInitialize a stdio MCP client.
Parameters:
- command (
str) – Command to run (e.g., "python", "node") - args (
list[str] | None) – Arguments to pass to the command - env (
dict[str, str | Secret] | None) – Environment variables for the command - max_retries (
int) – Maximum number of reconnection attempts - base_delay (
float) – Base delay for exponential backoff in seconds
connect() -> list[types.Tool]Connect to an MCP server using stdio transport.
Returns:
list[Tool]– List of available tools on the server
Raises:
MCPConnectionError– If connection to the server fails
Bases: MCPClient
MCP client that connects to servers using SSE transport.
__init__(
server_info: SSEServerInfo,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
) -> NoneInitialize an SSE MCP client using server configuration.
Parameters:
- server_info (
SSEServerInfo) – Configuration object containing URL, token, timeout, etc. - max_retries (
int) – Maximum number of reconnection attempts - base_delay (
float) – Base delay for exponential backoff in seconds
connect() -> list[types.Tool]Connect to an MCP server using SSE transport.
Note: If both custom headers and token are provided, custom headers take precedence.
Returns:
list[Tool]– List of available tools on the server
Raises:
MCPConnectionError– If connection to the server fails
Bases: MCPClient
MCP client that connects to servers using streamable HTTP transport.
__init__(
server_info: StreamableHttpServerInfo,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
) -> NoneInitialize a streamable HTTP MCP client using server configuration.
Parameters:
- server_info (
StreamableHttpServerInfo) – Configuration object containing URL, token, timeout, etc. - max_retries (
int) – Maximum number of reconnection attempts - base_delay (
float) – Base delay for exponential backoff in seconds
connect() -> list[types.Tool]Connect to an MCP server using streamable HTTP transport.
Note: If both custom headers and token are provided, custom headers take precedence.
Returns:
list[Tool]– List of available tools on the server
Raises:
MCPConnectionError– If connection to the server fails
Bases: ABC
Abstract base class for MCP server connection parameters.
This class defines the common interface for all MCP server connection types.
create_client() -> MCPClientCreate an appropriate MCP client for this server info.
Returns:
MCPClient– An instance of MCPClient configured with this server info
to_dict() -> dict[str, Any]Serialize this server info to a dictionary.
Returns:
dict[str, Any]– Dictionary representation of this server info
from_dict(data: dict[str, Any]) -> MCPServerInfoDeserialize server info from a dictionary.
Parameters:
- data (
dict[str, Any]) – Dictionary containing serialized server info
Returns:
MCPServerInfo– Instance of the appropriate server info class
Bases: MCPServerInfo
Data class that encapsulates SSE MCP server connection parameters.
For authentication tokens containing sensitive data, you can use Secret objects for secure handling and serialization:
server_info = SSEServerInfo(
url="https://my-mcp-server.com",
token=Secret.from_env_var("API_KEY"),
)For custom headers (e.g., non-standard authentication):
# Single custom header with Secret
server_info = SSEServerInfo(
url="https://my-mcp-server.com",
headers={"X-API-Key": Secret.from_env_var("API_KEY")},
)
# Multiple headers (mix of Secret and plain strings)
server_info = SSEServerInfo(
url="https://my-mcp-server.com",
headers={
"X-API-Key": Secret.from_env_var("API_KEY"),
"X-Client-ID": "my-client-id",
},
)Parameters:
- url (
str | None) – Full URL of the MCP server (including /sse endpoint) - base_url (
str | None) – Base URL of the MCP server (deprecated, use url instead) - token (
str | Secret | None) – Authentication token for the server (optional, generates "Authorization: Bearer<token>" header) - headers (
dict[str, str | Secret] | None) – Custom HTTP headers (optional, takes precedence over token parameter if provided) - timeout (
int) – Connection timeout in seconds
create_client() -> MCPClientCreate an SSE MCP client.
Returns:
MCPClient– Configured MCPClient instance
Bases: MCPServerInfo
Data class that encapsulates streamable HTTP MCP server connection parameters.
For authentication tokens containing sensitive data, you can use Secret objects for secure handling and serialization:
server_info = StreamableHttpServerInfo(
url="https://my-mcp-server.com",
token=Secret.from_env_var("API_KEY"),
)For custom headers (e.g., non-standard authentication):
# Single custom header with Secret
server_info = StreamableHttpServerInfo(
url="https://my-mcp-server.com",
headers={"X-API-Key": Secret.from_env_var("API_KEY")},
)
# Multiple headers (mix of Secret and plain strings)
server_info = StreamableHttpServerInfo(
url="https://my-mcp-server.com",
headers={
"X-API-Key": Secret.from_env_var("API_KEY"),
"X-Client-ID": "my-client-id",
},
)Parameters:
- url (
str) – Full URL of the MCP server (streamable HTTP endpoint) - token (
str | Secret | None) – Authentication token for the server (optional, generates "Authorization: Bearer<token>" header) - headers (
dict[str, str | Secret] | None) – Custom HTTP headers (optional, takes precedence over token parameter if provided) - timeout (
int) – Connection timeout in seconds
create_client() -> MCPClientCreate a streamable HTTP MCP client.
Returns:
MCPClient– Configured StreamableHttpClient instance
Bases: MCPServerInfo
Data class that encapsulates stdio MCP server connection parameters.
Parameters:
- command (
str) – Command to run (e.g., "python", "node") - args (
list[str] | None) – Arguments to pass to the command - env (
dict[str, str | Secret] | None) – Environment variables for the command
For environment variables containing sensitive data, you can use Secret objects for secure handling and serialization:
server_info = StdioServerInfo(
command="uv",
args=["run", "my-mcp-server"],
env={
"WORKSPACE_PATH": "/path/to/workspace", # Plain string
"API_KEY": Secret.from_env_var("API_KEY"), # Secret object
}
)Secret objects will be properly serialized and deserialized without exposing the secret value, while plain strings will be preserved as-is. Use Secret objects for sensitive data that needs to be handled securely.
create_client() -> MCPClientCreate a stdio MCP client.
Returns:
MCPClient– Configured StdioMCPClient instance
Bases: Tool
A Tool that represents a single tool from an MCP server.
This implementation uses the official MCP SDK for protocol handling while maintaining compatibility with the Haystack tool ecosystem.
Response handling:
- Text and image content are supported and returned as JSON strings
- The JSON contains the structured response from the MCP server
- Use json.loads() to parse the response into a dictionary
State-mapping support:
- MCPTool supports state-mapping parameters (
outputs_to_string,inputs_from_state,outputs_to_state) - These enable integration with Agent state for automatic parameter injection and output handling
- See the
__init__method documentation for details on each parameter
Example using Streamable HTTP:
import json
from haystack_integrations.tools.mcp import MCPTool, StreamableHttpServerInfo
# Create tool instance
tool = MCPTool(
name="multiply",
server_info=StreamableHttpServerInfo(url="http://localhost:8000/mcp")
)
# Use the tool and parse result
result_json = tool.invoke(a=5, b=3)
result = json.loads(result_json)Example using SSE (deprecated):
import json
from haystack.tools import MCPTool, SSEServerInfo
# Create tool instance
tool = MCPTool(
name="add",
server_info=SSEServerInfo(url="http://localhost:8000/sse")
)
# Use the tool and parse result
result_json = tool.invoke(a=5, b=3)
result = json.loads(result_json)Example using stdio:
import json
from haystack.tools import MCPTool, StdioServerInfo
# Create tool instance
tool = MCPTool(
name="get_current_time",
server_info=StdioServerInfo(command="python", args=["path/to/server.py"])
)
# Use the tool and parse result
result_json = tool.invoke(timezone="America/New_York")
result = json.loads(result_json)__init__(
name: str,
server_info: MCPServerInfo,
description: str | None = None,
connection_timeout: int = 30,
invocation_timeout: int = 30,
eager_connect: bool = False,
outputs_to_string: dict[str, Any] | None = None,
inputs_from_state: dict[str, str] | None = None,
outputs_to_state: dict[str, dict[str, Any]] | None = None,
) -> NoneInitialize the MCP tool.
Parameters:
- name (
str) – Name of the tool to use - server_info (
MCPServerInfo) – Server connection information - description (
str | None) – Custom description (if None, server description will be used) - connection_timeout (
int) – Timeout in seconds for server connection - invocation_timeout (
int) – Default timeout in seconds for tool invocations - eager_connect (
bool) – If True, connect to server during initialization. If False (default), defer connection until warm_up or first tool use, whichever comes first. - outputs_to_string (
dict[str, Any] | None) – Optional dictionary defining how tool outputs should be converted into a string. If the source is provided only the specified output key is sent to the handler. If the source is omitted the whole tool result is sent to the handler. Example:{"source": "docs", "handler": my_custom_function} - inputs_from_state (
dict[str, str] | None) – Optional dictionary mapping state keys to tool parameter names. Example:{"repository": "repo"}maps state's "repository" to tool's "repo" parameter. - outputs_to_state (
dict[str, dict[str, Any]] | None) – Optional dictionary defining how tool outputs map to keys within state as well as optional handlers. If the source is provided only the specified output key is sent to the handler. Example with source:{"documents": {"source": "docs", "handler": custom_handler}}Example without source:{"documents": {"handler": custom_handler}}
Raises:
MCPConnectionError– If connection to the server failsMCPToolNotFoundError– If no tools are available or the requested tool is not foundTimeoutError– If connection times out
ainvoke(**kwargs: Any) -> str | dict[str, Any]Asynchronous tool invocation.
Parameters:
- kwargs (
Any) – Arguments to pass to the tool
Returns:
str | dict[str, Any]– JSON string or dictionary representation of the tool invocation result. Returns a dictionary when outputs_to_state is configured to enable state updates.
Raises:
MCPInvocationError– If the tool invocation failsTimeoutError– If the operation times out
warm_up() -> NoneConnect and fetch the tool schema if eager_connect is turned off.
to_dict() -> dict[str, Any]Serializes the MCPTool to a dictionary.
The serialization preserves all information needed to recreate the tool, including server connection parameters, timeout settings, and state-mapping parameters. Note that the active connection is not maintained.
Returns:
dict[str, Any]– Dictionary with serialized data in the format:{"type": fully_qualified_class_name, "data": {parameters}}
from_dict(data: dict[str, Any]) -> ToolDeserializes the MCPTool from a dictionary.
This method reconstructs an MCPTool instance from a serialized dictionary, including recreating the server_info object and state-mapping parameters. A new connection will be established to the MCP server during initialization.
Parameters:
- data (
dict[str, Any]) – Dictionary containing serialized tool data
Returns:
Tool– A fully initialized MCPTool instance
Raises:
Exception– if connection fails
close() -> NoneClose the tool synchronously.
Bases: Toolset
A Toolset that connects to an MCP (Model Context Protocol) server and provides access to its tools.
MCPToolset dynamically discovers and loads all tools from any MCP-compliant server, supporting both network-based streaming connections (Streamable HTTP, SSE) and local process-based stdio connections. This dual connectivity allows for integrating with both remote and local MCP servers.
Example using MCPToolset in a Haystack Pipeline:
# Prerequisites:
# 1. pip install uvx mcp-server-time # Install required MCP server and tools
# 2. export OPENAI_API_KEY="your-api-key" # Set up your OpenAI API key
import os
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.tools import ToolInvoker
from haystack.dataclasses import ChatMessage
from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo
# Create server info for the time service (can also use SSEServerInfo for remote servers)
server_info = StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"])
# Create the toolset - this will automatically discover all available tools
# You can optionally specify which tools to include
mcp_toolset = MCPToolset(
server_info=server_info,
tool_names=["get_current_time"] # Only include the get_current_time tool
)
# Create a pipeline with the toolset
pipeline = Pipeline()
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", tools=mcp_toolset))
pipeline.add_component("tool_invoker", ToolInvoker(tools=mcp_toolset))
pipeline.add_component(
"adapter",
OutputAdapter(
template="{{ initial_msg + initial_tool_messages + tool_messages }}",
output_type=list[ChatMessage],
unsafe=True,
),
)
pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipeline.connect("llm.replies", "tool_invoker.messages")
pipeline.connect("llm.replies", "adapter.initial_tool_messages")
pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages")
pipeline.connect("adapter.output", "response_llm.messages")
# Run the pipeline with a user question
user_input = "What is the time in New York? Be brief."
user_input_msg = ChatMessage.from_user(text=user_input)
result = pipeline.run({"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}})
print(result["response_llm"]["replies"][0].text)You can also use the toolset via Streamable HTTP to talk to remote servers:
from haystack_integrations.tools.mcp import MCPToolset, StreamableHttpServerInfo
# Create the toolset with streamable HTTP connection
toolset = MCPToolset(
server_info=StreamableHttpServerInfo(url="http://localhost:8000/mcp"),
tool_names=["multiply"] # Optional: only include specific tools
)
# Use the toolset as shown in the pipeline example aboveExample with state configuration for Agent integration:
from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo
# Create the toolset with per-tool state configuration
# This enables tools to read from and write to the Agent's State
toolset = MCPToolset(
server_info=StdioServerInfo(command="uvx", args=["mcp-server-git"]),
tool_names=["git_status", "git_diff", "git_log"],
# Maps the state key "repository" to the tool parameter "repo_path" for each tool
inputs_from_state={
"git_status": {"repository": "repo_path"},
"git_diff": {"repository": "repo_path"},
"git_log": {"repository": "repo_path"},
},
# Map tool outputs to state keys for each tool
outputs_to_state={
"git_status": {"status_result": {"source": "status"}}, # Extract "status" from output
"git_diff": {"diff_result": {}}, # use full output with default handling
},
)Example using SSE (deprecated):
from haystack_integrations.tools.mcp import MCPToolset, SSEServerInfo
from haystack.components.tools import ToolInvoker
# Create the toolset with an SSE connection
sse_toolset = MCPToolset(
server_info=SSEServerInfo(url="http://some-remote-server.com:8000/sse"),
tool_names=["add", "subtract"] # Only include specific tools
)
# Use the toolset as shown in the pipeline example above__init__(
server_info: MCPServerInfo,
tool_names: list[str] | None = None,
connection_timeout: float = 30.0,
invocation_timeout: float = 30.0,
eager_connect: bool = False,
inputs_from_state: dict[str, dict[str, str]] | None = None,
outputs_to_state: dict[str, dict[str, dict[str, Any]]] | None = None,
outputs_to_string: dict[str, dict[str, Any]] | None = None,
) -> NoneInitialize the MCP toolset.
Parameters:
- server_info (
MCPServerInfo) – Connection information for the MCP server - tool_names (
list[str] | None) – Optional list of tool names to include. If provided, only tools with matching names will be added to the toolset. - connection_timeout (
float) – Timeout in seconds for server connection - invocation_timeout (
float) – Default timeout in seconds for tool invocations - eager_connect (
bool) – If True, connect to server and load tools during initialization. If False (default), defer connection to warm_up. - inputs_from_state (
dict[str, dict[str, str]] | None) – Optional dictionary mapping tool names to their inputs_from_state config. Each config maps state keys to tool parameter names. Tool names should match available tools from the server; a warning is logged for unknown tools. Note: With Haystack >= 2.22.0, parameter names are validated; ValueError is raised for invalid parameters. With earlier versions, invalid parameters fail at runtime. Example:{"git_status": {"repository": "repo_path"}} - outputs_to_state (
dict[str, dict[str, dict[str, Any]]] | None) – Optional dictionary mapping tool names to their outputs_to_state config. Each config defines how tool outputs map to state keys with optional handlers. Tool names should match available tools from the server; a warning is logged for unknown tools. Example:{"git_status": {"status_result": {"source": "status"}}} - outputs_to_string (
dict[str, dict[str, Any]] | None) – Optional dictionary mapping tool names to their outputs_to_string config. Each config defines how tool outputs are converted to strings. Tool names should match available tools from the server; a warning is logged for unknown tools. Example:{"git_diff": {"source": "diff", "handler": format_diff}}
Raises:
MCPToolNotFoundError– If any of the specified tool names are not found on the serverValueError– If parameter names in inputs_from_state are invalid (Haystack >= 2.22.0 only)
warm_up() -> NoneConnect and load tools when eager_connect is turned off.
This method is automatically called by ToolInvoker.warm_up() and Pipeline.warm_up().
You can also call it directly before using the toolset to ensure all tool schemas
are available without performing a real invocation.
to_dict() -> dict[str, Any]Serialize the MCPToolset to a dictionary.
Returns:
dict[str, Any]– A dictionary representation of the MCPToolset
from_dict(data: dict[str, Any]) -> MCPToolsetDeserialize an MCPToolset from a dictionary.
Parameters:
- data (
dict[str, Any]) – Dictionary representation of the MCPToolset
Returns:
MCPToolset– A new MCPToolset instance
close() -> NoneClose the underlying MCP client safely.