Skip to content

Commit 8a192dc

Browse files
authored
chore: (mcp) Remove ToolInvoker from tests, docstrings and examples (#3474)
1 parent c48fcbe commit 8a192dc

11 files changed

Lines changed: 328 additions & 593 deletions

File tree

.github/workflows/mcp.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,13 @@ jobs:
7575
with:
7676
python-version: ${{ matrix.python-version }}
7777

78-
- name: Install Hatch
78+
- name: Install Hatch and uv
7979
run: |
8080
python -m pip install --upgrade pip
8181
pip install hatch --uploaded-prior-to=P1D
82+
# uv ships the `uvx` launcher used by the stdio MCP server integration tests
83+
# (e.g. `uvx mcp-server-time`). Installed on every OS so those tests run everywhere.
84+
pip install uv
8285
8386
- name: Set up Docker
8487
if: runner.os == 'Linux'

integrations/mcp/examples/time_pipeline.py

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@
1111
import logging
1212

1313
from haystack import Pipeline
14-
from haystack.components.converters import OutputAdapter
14+
from haystack.components.agents import Agent
1515
from haystack.components.generators.chat import OpenAIChatGenerator
16-
from haystack.components.tools import ToolInvoker
1716
from haystack.dataclasses import ChatMessage
1817

1918
from haystack_integrations.tools.mcp.mcp_tool import MCPTool, StdioServerInfo
@@ -38,28 +37,16 @@ def main():
3837
server_info=StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"]),
3938
)
4039
pipeline = Pipeline()
41-
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", tools=[time_tool]))
42-
pipeline.add_component("tool_invoker", ToolInvoker(tools=[time_tool]))
4340
pipeline.add_component(
44-
"adapter",
45-
OutputAdapter(
46-
template="{{ initial_msg + initial_tool_messages + tool_messages }}",
47-
output_type=list[ChatMessage],
48-
unsafe=True,
49-
),
41+
"agent", Agent(chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"), tools=[time_tool])
5042
)
51-
pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
52-
pipeline.connect("llm.replies", "tool_invoker.messages")
53-
pipeline.connect("llm.replies", "adapter.initial_tool_messages")
54-
pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages")
55-
pipeline.connect("adapter.output", "response_llm.messages")
5643

5744
user_input = "What is the time in New York? Be brief." # can be any city
5845
user_input_msg = ChatMessage.from_user(text=user_input)
5946

60-
result = pipeline.run({"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}})
47+
result = pipeline.run({"agent": {"messages": [user_input_msg]}})
6148

62-
print(result["response_llm"]["replies"][0].text)
49+
print(result["agent"]["messages"][-1].text)
6350
finally:
6451
if time_tool:
6552
time_tool.close()

integrations/mcp/examples/time_pipeline_toolset.py

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@
1111
import os
1212

1313
from haystack import Pipeline
14-
from haystack.components.converters import OutputAdapter
14+
from haystack.components.agents import Agent
1515
from haystack.components.generators.chat import OpenAIChatGenerator
16-
from haystack.components.tools import ToolInvoker
1716
from haystack.dataclasses import ChatMessage
1817

1918
from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo
@@ -35,28 +34,16 @@ def main():
3534
print("For now, demonstrating direct tool usage:")
3635

3736
pipeline = Pipeline()
38-
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4.1-mini", tools=mcp_toolset))
39-
pipeline.add_component("tool_invoker", ToolInvoker(tools=mcp_toolset))
4037
pipeline.add_component(
41-
"adapter",
42-
OutputAdapter(
43-
template="{{ initial_msg + initial_tool_messages + tool_messages }}",
44-
output_type=list[ChatMessage],
45-
unsafe=True,
46-
),
38+
"agent", Agent(chat_generator=OpenAIChatGenerator(model="gpt-4.1-mini"), tools=mcp_toolset)
4739
)
48-
pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4.1-mini"))
49-
pipeline.connect("llm.replies", "tool_invoker.messages")
50-
pipeline.connect("llm.replies", "adapter.initial_tool_messages")
51-
pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages")
52-
pipeline.connect("adapter.output", "response_llm.messages")
5340

5441
user_input = "What is the time in New York? Be brief." # can be any city
5542
user_input_msg = ChatMessage.from_user(text=user_input)
5643

57-
result = pipeline.run({"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}})
44+
result = pipeline.run({"agent": {"messages": [user_input_msg]}})
5845

59-
print(result["response_llm"]["replies"][0].text)
46+
print(result["agent"]["messages"][-1].text)
6047

6148
finally:
6249
if mcp_toolset:

integrations/mcp/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ classifiers = [
3030
]
3131
dependencies = [
3232
"mcp>=1.8.0",
33-
"haystack-ai>=2.19.0",
33+
"haystack-ai>=2.23.0",
3434
"exceptiongroup", # Backport of ExceptionGroup for Python < 3.11
3535
"httpx" # HTTP client library used for SSE connections
3636
]

integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def run_background(
151151
152152
:param coro_factory: A callable receiving the stop_event and returning the coroutine to execute.
153153
:param timeout: Optional timeout while waiting for the stop_event to be created.
154-
:returns: Tuple ``(future, stop_event)``.
154+
:returns: Tuple `(future, stop_event)`.
155155
"""
156156
# A promise that will be fulfilled from inside the coroutine_with_stop_event coroutine once the
157157
# stop_event is created *inside* the target event loop to ensure it is bound to the
@@ -1107,7 +1107,7 @@ async def invoke() -> Any:
11071107
logger.debug(f"TOOL: Invoke complete for '{self.name}', result type: {type(result)}")
11081108

11091109
# Parse JSON to dict only when outputs_to_state is configured.
1110-
# ToolInvoker requires dict for _merge_tool_outputs(); ToolCallResult.result expects str otherwise.
1110+
# State output handlers require a dict; ToolCallResult.result expects str otherwise.
11111111
if self.outputs_to_state:
11121112
return _extract_first_text_element(result)
11131113

@@ -1138,7 +1138,7 @@ async def ainvoke(self, **kwargs: Any) -> str | dict[str, Any]:
11381138
result = await asyncio.wait_for(client.call_tool(self.name, kwargs), timeout=self._invocation_timeout)
11391139

11401140
# Parse JSON to dict only when outputs_to_state is configured.
1141-
# ToolInvoker requires dict for _merge_tool_outputs(); ToolCallResult.result expects str otherwise.
1141+
# State output handlers require a dict; ToolCallResult.result expects str otherwise.
11421142
if self.outputs_to_state:
11431143
return _extract_first_text_element(result)
11441144

integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py

Lines changed: 64 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,18 @@
1212
from haystack import logging
1313
from haystack.core.serialization import generate_qualified_class_name, import_class_by_name
1414
from haystack.tools import Tool, Toolset
15-
from haystack.utils.callable_serialization import deserialize_callable, serialize_callable
15+
from haystack.tools.tool import (
16+
_deserialize_outputs_to_state as _hs_deserialize_outputs_to_state,
17+
)
18+
from haystack.tools.tool import (
19+
_deserialize_outputs_to_string as _hs_deserialize_outputs_to_string,
20+
)
21+
from haystack.tools.tool import (
22+
_serialize_outputs_to_state as _hs_serialize_outputs_to_state,
23+
)
24+
from haystack.tools.tool import (
25+
_serialize_outputs_to_string as _hs_serialize_outputs_to_string,
26+
)
1627

1728
from .mcp_tool import (
1829
AsyncExecutor,
@@ -29,86 +40,62 @@
2940
logger = logging.getLogger(__name__)
3041

3142

32-
def _serialize_state_config(config: dict[str, dict[str, Any]] | None) -> dict[str, dict[str, Any]] | None:
43+
def _serialize_outputs_to_state(config: dict[str, dict[str, Any]] | None) -> dict[str, dict[str, Any]] | None:
3344
"""
34-
Serialize a state configuration dictionary, converting any callable handlers to their string representation.
35-
36-
Works for both outputs_to_state (tool_name -> {state_key -> {source, handler}})
37-
and outputs_to_string (tool_name -> {source, handler}).
45+
Serialize a per-tool `outputs_to_state` mapping (tool_name -> {state_key -> {source, handler}}).
3846
39-
Note: The keys "source" and "handler" are reserved and used internally to distinguish between
40-
outputs_to_string format and outputs_to_state format. Do not use these as state keys in
41-
outputs_to_state configurations.
47+
Callable handlers are converted to their string representation via Haystack's serialization helper.
4248
43-
:param config: The state configuration dictionary to serialize
44-
:returns: The serialized configuration dictionary, or None if empty
49+
:param config: The per-tool `outputs_to_state` mapping to serialize
50+
:returns: The serialized mapping, or None if empty
4551
"""
4652
if not config:
4753
return None
54+
serialized = {
55+
name: _hs_serialize_outputs_to_state(tool_config) for name, tool_config in config.items() if tool_config
56+
}
57+
return serialized or None
4858

49-
serialized = {}
50-
for tool_name, tool_config in config.items():
51-
if not tool_config:
52-
continue
53-
54-
# Check if this is outputs_to_string format (flat with optional source/handler)
55-
# or outputs_to_state format (nested with state keys)
56-
if "source" in tool_config or "handler" in tool_config:
57-
# outputs_to_string format: {source?, handler?}
58-
serialized_tool_config = tool_config.copy()
59-
if "handler" in tool_config and callable(tool_config["handler"]):
60-
serialized_tool_config["handler"] = serialize_callable(tool_config["handler"])
61-
serialized[tool_name] = serialized_tool_config
62-
else:
63-
# outputs_to_state format: {state_key -> {source?, handler?}}
64-
serialized_tool_config = {}
65-
for state_key, state_config in tool_config.items():
66-
serialized_state_config = state_config.copy()
67-
if "handler" in state_config and callable(state_config["handler"]):
68-
serialized_state_config["handler"] = serialize_callable(state_config["handler"])
69-
serialized_tool_config[state_key] = serialized_state_config
70-
serialized[tool_name] = serialized_tool_config
7159

72-
return serialized if serialized else None
60+
def _serialize_outputs_to_string(config: dict[str, dict[str, Any]] | None) -> dict[str, dict[str, Any]] | None:
61+
"""
62+
Serialize a per-tool `outputs_to_string` mapping (tool_name -> {source?, handler?}).
7363
64+
Callable handlers are converted to their string representation via Haystack's serialization helper.
7465
75-
def _deserialize_state_config(config: dict[str, dict[str, Any]] | None) -> dict[str, dict[str, Any]]:
66+
:param config: The per-tool `outputs_to_string` mapping to serialize
67+
:returns: The serialized mapping, or None if empty
7668
"""
77-
Deserialize a state configuration dictionary, converting any serialized handlers back to callables.
69+
if not config:
70+
return None
71+
serialized = {
72+
name: _hs_serialize_outputs_to_string(tool_config) for name, tool_config in config.items() if tool_config
73+
}
74+
return serialized or None
7875

79-
Works for both outputs_to_state (tool_name -> {state_key -> {source, handler}})
80-
and outputs_to_string (tool_name -> {source, handler}).
8176

82-
:param config: The state configuration dictionary to deserialize
83-
:returns: The deserialized configuration dictionary
77+
def _deserialize_outputs_to_state(config: dict[str, dict[str, Any]] | None) -> dict[str, dict[str, Any]]:
78+
"""
79+
Deserialize a per-tool `outputs_to_state` mapping, restoring string handlers back to callables.
80+
81+
:param config: The per-tool `outputs_to_state` mapping to deserialize
82+
:returns: The deserialized mapping
8483
"""
8584
if not config:
8685
return {}
86+
return {name: _hs_deserialize_outputs_to_state(tool_config) for name, tool_config in config.items() if tool_config}
8787

88-
deserialized = {}
89-
for tool_name, tool_config in config.items():
90-
if not tool_config:
91-
continue
92-
93-
# Check if this is outputs_to_string format (flat with optional source/handler)
94-
# or outputs_to_state format (nested with state keys)
95-
if "source" in tool_config or "handler" in tool_config:
96-
# outputs_to_string format: {source?, handler?}
97-
deserialized_tool_config = tool_config.copy()
98-
if "handler" in tool_config and isinstance(tool_config["handler"], str):
99-
deserialized_tool_config["handler"] = deserialize_callable(tool_config["handler"])
100-
deserialized[tool_name] = deserialized_tool_config
101-
else:
102-
# outputs_to_state format: {state_key -> {source?, handler?}}
103-
deserialized_tool_config = {}
104-
for state_key, state_config in tool_config.items():
105-
deserialized_state_config = state_config.copy()
106-
if "handler" in state_config and isinstance(state_config["handler"], str):
107-
deserialized_state_config["handler"] = deserialize_callable(state_config["handler"])
108-
deserialized_tool_config[state_key] = deserialized_state_config
109-
deserialized[tool_name] = deserialized_tool_config
11088

111-
return deserialized
89+
def _deserialize_outputs_to_string(config: dict[str, dict[str, Any]] | None) -> dict[str, dict[str, Any]]:
90+
"""
91+
Deserialize a per-tool `outputs_to_string` mapping, restoring string handlers back to callables.
92+
93+
:param config: The per-tool `outputs_to_string` mapping to deserialize
94+
:returns: The deserialized mapping
95+
"""
96+
if not config:
97+
return {}
98+
return {name: _hs_deserialize_outputs_to_string(tool_config) for name, tool_config in config.items() if tool_config}
11299

113100

114101
class MCPToolset(Toolset):
@@ -126,11 +113,9 @@ class MCPToolset(Toolset):
126113
# 1. pip install uv mcp-server-time # Install required MCP server and tools (uv provides the `uvx` launcher)
127114
# 2. export OPENAI_API_KEY="your-api-key" # Set up your OpenAI API key
128115
129-
import os
130116
from haystack import Pipeline
131-
from haystack.components.converters import OutputAdapter
117+
from haystack.components.agents import Agent
132118
from haystack.components.generators.chat import OpenAIChatGenerator
133-
from haystack.components.tools import ToolInvoker
134119
from haystack.dataclasses import ChatMessage
135120
from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo
136121
@@ -144,30 +129,18 @@ class MCPToolset(Toolset):
144129
tool_names=["get_current_time"] # Only include the get_current_time tool
145130
)
146131
147-
# Create a pipeline with the toolset
132+
# Create a pipeline with an Agent that owns the tool-calling loop.
133+
# The Agent passes the toolset to the chat generator, executes any requested
134+
# tool calls, and continues until a final answer is produced.
148135
pipeline = Pipeline()
149-
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", tools=mcp_toolset))
150-
pipeline.add_component("tool_invoker", ToolInvoker(tools=mcp_toolset))
151-
pipeline.add_component(
152-
"adapter",
153-
OutputAdapter(
154-
template="{{ initial_msg + initial_tool_messages + tool_messages }}",
155-
output_type=list[ChatMessage],
156-
unsafe=True,
157-
),
158-
)
159-
pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
160-
pipeline.connect("llm.replies", "tool_invoker.messages")
161-
pipeline.connect("llm.replies", "adapter.initial_tool_messages")
162-
pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages")
163-
pipeline.connect("adapter.output", "response_llm.messages")
136+
pipeline.add_component("agent", Agent(chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"), tools=mcp_toolset))
164137
165138
# Run the pipeline with a user question
166139
user_input = "What is the time in New York? Be brief."
167140
user_input_msg = ChatMessage.from_user(text=user_input)
168141
169-
result = pipeline.run({"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}})
170-
print(result["response_llm"]["replies"][0].text)
142+
result = pipeline.run({"agent": {"messages": [user_input_msg]}})
143+
print(result["agent"]["messages"][-1].text)
171144
```
172145
173146
You can also use the toolset via Streamable HTTP to talk to remote servers:
@@ -209,7 +182,6 @@ class MCPToolset(Toolset):
209182
Example using SSE (deprecated):
210183
```python
211184
from haystack_integrations.tools.mcp import MCPToolset, SSEServerInfo
212-
from haystack.components.tools import ToolInvoker
213185
214186
# Create the toolset with an SSE connection
215187
sse_toolset = MCPToolset(
@@ -291,7 +263,7 @@ def warm_up(self) -> None:
291263
"""
292264
Connect and load tools when eager_connect is turned off.
293265
294-
This method is automatically called by ``ToolInvoker.warm_up()`` and ``Pipeline.warm_up()``.
266+
This method is automatically called by `Agent.warm_up()` and `Pipeline.warm_up()`.
295267
You can also call it directly before using the toolset to ensure all tool schemas
296268
are available without performing a real invocation.
297269
"""
@@ -341,12 +313,12 @@ def invoke_tool(**kwargs: Any) -> Any:
341313
mcp_client.call_tool(tool_name, kwargs), timeout=tool_timeout
342314
)
343315
# Parse JSON to dict only when outputs_to_state is configured.
344-
# ToolInvoker requires dict for _merge_tool_outputs(); ToolCallResult.result expects str otherwise.
316+
# State output handlers require a dict; ToolCallResult.result expects str otherwise.
345317
if outputs_to_state:
346318
parsed = json.loads(result)
347319

348320
# Per MCP spec, content[] may contain TextContent, ImageContent, AudioContent, etc.
349-
# Parse only first TextContent block (ToolInvoker requires dict, not list).
321+
# Parse only first TextContent block (state output handlers require a dict, not a list).
350322
content = parsed.get("content", [])
351323
for block in content:
352324
if isinstance(block, dict) and block.get("type") == "text":
@@ -494,8 +466,8 @@ def to_dict(self) -> dict[str, Any]:
494466
"invocation_timeout": self.invocation_timeout,
495467
"eager_connect": self.eager_connect,
496468
"inputs_from_state": self.inputs_from_state if self.inputs_from_state else None,
497-
"outputs_to_state": _serialize_state_config(self.outputs_to_state),
498-
"outputs_to_string": _serialize_state_config(self.outputs_to_string),
469+
"outputs_to_state": _serialize_outputs_to_state(self.outputs_to_state),
470+
"outputs_to_string": _serialize_outputs_to_string(self.outputs_to_string),
499471
},
500472
}
501473

@@ -516,8 +488,8 @@ def from_dict(cls, data: dict[str, Any]) -> "MCPToolset":
516488

517489
# Deserialize state configuration parameters
518490
inputs_from_state = inner_data.get("inputs_from_state")
519-
outputs_to_state = _deserialize_state_config(inner_data.get("outputs_to_state"))
520-
outputs_to_string = _deserialize_state_config(inner_data.get("outputs_to_string"))
491+
outputs_to_state = _deserialize_outputs_to_state(inner_data.get("outputs_to_state"))
492+
outputs_to_string = _deserialize_outputs_to_string(inner_data.get("outputs_to_string"))
521493

522494
# Create a new MCPToolset instance
523495
return cls(

0 commit comments

Comments
 (0)