Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import concurrent.futures
import json
import threading
import warnings
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -1048,12 +1049,13 @@ def _connect_and_initialize(self, tool_name: str) -> types.Tool:

return tool

def _invoke_tool(self, **kwargs: Any) -> str:
def _invoke_tool(self, **kwargs: Any) -> str | dict[str, Any]:
"""
Synchronous tool invocation.

:param kwargs: Arguments to pass to the tool
:returns: JSON string representation of the tool invocation result
:returns: JSON string or dictionary representation of the tool invocation result.
Returns a dictionary when outputs_to_state is configured to enable state updates.
"""
logger.debug(f"TOOL: Invoking tool '{self.name}' with args: {kwargs}")
try:
Expand All @@ -1070,6 +1072,26 @@ async def invoke():
logger.debug(f"TOOL: About to run invoke for '{self.name}'")
result = AsyncExecutor.get_instance().run(invoke(), timeout=self._invocation_timeout)
logger.debug(f"TOOL: Invoke complete for '{self.name}', result type: {type(result)}")

# Parse JSON to dict only when outputs_to_state is configured.
# ToolInvoker requires dict for _merge_tool_outputs(); ToolCallResult.result expects str otherwise.
if self.outputs_to_state:
parsed = json.loads(result)

# Per MCP spec, content[] may contain TextContent, ImageContent, AudioContent, etc.
# Parse only first TextContent block (ToolInvoker requires dict, not list).
content = parsed.get("content", [])
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError):
return text

# No TextContent found, return full parsed response as fallback
return parsed

return result
except (MCPError, TimeoutError) as e:
logger.debug(f"TOOL: Known error during invoke of '{self.name}': {e!s}")
Expand All @@ -1081,19 +1103,41 @@ async def invoke():
message = f"Failed to invoke tool '{self.name}' with args: {kwargs} , got error: {e!s}"
raise MCPInvocationError(message, self.name, kwargs) from e

async def ainvoke(self, **kwargs: Any) -> str:
async def ainvoke(self, **kwargs: Any) -> str | dict[str, Any]:
"""
Asynchronous tool invocation.

:param kwargs: Arguments to pass to the tool
:returns: JSON string representation of the tool invocation result
:returns: JSON string or dictionary representation of the tool invocation result.
Returns a dictionary when outputs_to_state is configured to enable state updates.
:raises MCPInvocationError: If the tool invocation fails
:raises TimeoutError: If the operation times out
"""
try:
self.warm_up()
client = cast(MCPClient, self._client)
return await asyncio.wait_for(client.call_tool(self.name, kwargs), timeout=self._invocation_timeout)
result = await asyncio.wait_for(client.call_tool(self.name, kwargs), timeout=self._invocation_timeout)

# Parse JSON to dict only when outputs_to_state is configured.
# ToolInvoker requires dict for _merge_tool_outputs(); ToolCallResult.result expects str otherwise.
if self.outputs_to_state:
parsed = json.loads(result)

# Per MCP spec, content[] may contain TextContent, ImageContent, AudioContent, etc.
# Parse only first TextContent block (ToolInvoker requires dict, not list).
content = parsed.get("content", [])
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError):
return text

# No TextContent found, return full parsed response as fallback
return parsed

return result
except asyncio.TimeoutError as e:
message = f"Tool invocation timed out after {self._invocation_timeout} seconds"
raise TimeoutError(message) from e
Expand Down
Loading