async def call_tool_as_task(
    self,
    name: str,
    arguments: dict[str, Any] | None = None,
) -> CreateTaskResult:
    """Start an MCP tool call as a task (2025-11-25) and return its handle.

    A ``tools/call`` request is built with ``params.task`` set to an empty
    ``TaskMetadata`` and sent raw through ``session.send_request``, asking for
    the response to be parsed as a ``CreateTaskResult`` (a task handle) rather
    than as a tool result.

    Args:
        name: The name of the tool to call.
        arguments: Optional arguments to pass to the tool.

    Returns:
        The ``CreateTaskResult`` returned by the server.
    """

    def send_task_request(session):
        # The request is constructed inside the callable, so it is rebuilt
        # every time ``_execute_with_retry`` invokes it.
        params = CallToolRequestParams(
            name=name,
            arguments=arguments,
            task=TaskMetadata(),
        )
        request = ClientRequest(CallToolRequest(params=params))
        # Return the awaitable un-awaited, exactly as a lambda would.
        return session.send_request(request, CreateTaskResult)

    operation_label = f"call_tool_as_task({name})"
    return await self._execute_with_retry(send_task_request, operation_label)
# ``_meta`` keys that AgentHubService stamps on a task result to mark it as a UiPath job.
# The MCP client reads them to drive that job as a suspendable child job.
_UIPATH_SOURCE_META_KEY = "uipath.com/source"
_UIPATH_JOB_KEY_META_KEY = "uipath.com/jobKey"
_UIPATH_FOLDER_KEY_META_KEY = "uipath.com/folderKey"
_UIPATH_ORCHESTRATOR_SOURCE = "orchestrator"


def _execution_from_server_tool(tool: Any) -> AgentMcpToolExecution | None:
    """Convert a server tool's ``execution.taskSupport`` into the snapshot model.

    Used in dynamic mode, where tools are described by the MCP server.

    Args:
        tool: The server-side tool description; only its ``execution``
            attribute is read.

    Returns:
        An ``AgentMcpToolExecution`` carrying the advertised task support, or
        ``None`` when the tool has no (truthy) ``execution`` or no
        ``taskSupport`` value.
    """
    server_execution = getattr(tool, "execution", None)
    if not server_execution:
        return None

    advertised = getattr(server_execution, "taskSupport", None)
    if advertised is None:
        return None

    # Accept both enum members (use their ``.value``) and plain values.
    raw_value = getattr(advertised, "value", advertised)
    return AgentMcpToolExecution(task_support=McpToolTaskSupport(raw_value))


def _is_task_augmentable(mcp_tool: AgentMcpTool) -> bool:
    """Tell whether the tool advertises MCP task support.

    Args:
        mcp_tool: The tool snapshot to inspect.

    Returns:
        ``True`` when ``execution.task_support`` is ``OPTIONAL`` or
        ``REQUIRED``; ``False`` when ``execution`` is absent or carries any
        other value.
    """
    tool_execution = getattr(mcp_tool, "execution", None)
    if tool_execution is None:
        return False

    augmentable_levels = (
        McpToolTaskSupport.OPTIONAL,
        McpToolTaskSupport.REQUIRED,
    )
    return tool_execution.task_support in augmentable_levels
async def _invoke_mcp_tool_as_job(
    mcp_tool: AgentMcpTool,
    mcpClient: McpClient,
    arguments: dict[str, Any],
) -> Any:
    """Call a task-supporting MCP tool and suspend the agent job until the child completes.

    The task-augmented ``tools/call`` returns a ``CreateTaskResult`` whose ``_meta`` marks it
    as a UiPath Orchestrator job. The function then interrupts with a ``WaitJobRaw`` built
    from the job and folder keys found in ``_meta``; after the interrupt resolves, the
    resumed job's output is extracted and returned.

    Args:
        mcp_tool: The MCP tool being invoked.
        mcpClient: The client used to start the task.
        arguments: The tool-call arguments.

    Returns:
        The completed child job's output, parsed as JSON when possible and returned as the
        raw string otherwise. When the resumed job is faulted, the job's ``info`` (or
        ``"Unknown error"``) is returned as a string instead.

    Raises:
        AgentRuntimeError: If the returned task is not marked as a UiPath Orchestrator job,
            or if it is marked as one but carries no job key to wait on.
    """

    @durable_interrupt
    async def start_mcp_job():
        create_result = await mcpClient.call_tool_as_task(
            mcp_tool.name, arguments=arguments
        )
        meta = create_result.meta or {}
        if meta.get(_UIPATH_SOURCE_META_KEY) != _UIPATH_ORCHESTRATOR_SOURCE:
            raise AgentRuntimeError(
                code=AgentRuntimeErrorCode.UNEXPECTED_ERROR,
                title=f"Tool '{mcp_tool.name}' did not start a UiPath job",
                detail=(
                    "The MCP server returned a task that is not a UiPath Orchestrator job "
                    "(missing the uipath.com/source marker), which is not supported."
                ),
            )

        # The wait is keyed on the job key, so a missing or empty key would suspend the
        # agent on a trigger that can never resolve. Fail fast instead.
        job_key = meta.get(_UIPATH_JOB_KEY_META_KEY)
        if not job_key:
            raise AgentRuntimeError(
                code=AgentRuntimeErrorCode.UNEXPECTED_ERROR,
                title=f"Tool '{mcp_tool.name}' did not return a UiPath job key",
                detail=(
                    "The MCP server marked the task as a UiPath Orchestrator job but did not "
                    "provide the uipath.com/jobKey marker, so the job cannot be awaited."
                ),
            )
        folder_key = meta.get(_UIPATH_FOLDER_KEY_META_KEY)

        return WaitJobRaw(
            # The resume trigger keys off the job's GUID key (item_key = job.key) and re-fetches the
            # job on resume; the numeric id is required by the model but unused here, hence the 0.
            job=Job(
                id=0,
                key=job_key,
                folder_key=folder_key,
            ),
            process_folder_key=folder_key,
        )

    # First run: starts the job and suspends. On resume: returns the resolved Job.
    job = await start_mcp_job()

    # NOTE(review): assumes JobState.FAULTED compares equal to the lower-cased state
    # string - confirm against the JobState definition.
    if (getattr(job, "state", None) or "").lower() == JobState.FAULTED:
        return str(getattr(job, "info", None) or "Unknown error")

    output_str = await UiPath().jobs.extract_output_async(job)
    if not output_str:
        # Nothing to parse; hand back the empty/None output unchanged.
        return output_str
    try:
        return json.loads(output_str)
    except (json.JSONDecodeError, TypeError):
        # Not JSON: return the raw output as-is.
        return output_str
+""" + +import datetime +from unittest.mock import AsyncMock, MagicMock, patch + +from mcp.types import CreateTaskResult, Tool, ToolExecution +from uipath.agent.models.agent import AgentMcpTool, McpToolTaskSupport +from uipath.platform.common import WaitJobRaw + +from uipath_langchain.agent.tools.mcp.mcp_client import McpClient +from uipath_langchain.agent.tools.mcp.mcp_tool import ( + _execution_from_server_tool, + _is_task_augmentable, + build_mcp_tool, +) + + +def _mcp_tool(task_support: str | None) -> AgentMcpTool: + data: dict = { + "name": "invoke-process", + "description": "Run a process", + "inputSchema": {"type": "object", "properties": {}}, + } + if task_support is not None: + data["execution"] = {"taskSupport": task_support} + return AgentMcpTool.model_validate(data) + + +def _create_task_result(source: str = "orchestrator") -> CreateTaskResult: + now = datetime.datetime.now(datetime.timezone.utc).isoformat() + return CreateTaskResult.model_validate( + { + "task": { + "taskId": "job-key-1", + "status": "working", + "createdAt": now, + "lastUpdatedAt": now, + "ttl": 86_400_000, + }, + "_meta": { + "uipath.com/source": source, + "uipath.com/jobKey": "job-key-1", + "uipath.com/folderKey": "folder-key-1", + }, + } + ) + + +class TestTaskAugmentableDetection: + def test_optional_and_required_are_augmentable(self) -> None: + assert _is_task_augmentable(_mcp_tool("optional")) is True + assert _is_task_augmentable(_mcp_tool("required")) is True + + def test_forbidden_and_missing_are_not(self) -> None: + assert _is_task_augmentable(_mcp_tool("forbidden")) is False + assert _is_task_augmentable(_mcp_tool(None)) is False + + def test_execution_mapped_from_server_tool(self) -> None: + tool = Tool( + name="p", + description="d", + inputSchema={"type": "object", "properties": {}}, + execution=ToolExecution(taskSupport="optional"), + ) + execution = _execution_from_server_tool(tool) + assert execution is not None + assert execution.task_support == 
McpToolTaskSupport.OPTIONAL + + def test_execution_none_when_server_tool_has_no_execution(self) -> None: + tool = Tool( + name="p", + description="d", + inputSchema={"type": "object", "properties": {}}, + ) + assert _execution_from_server_tool(tool) is None + + +class TestCallToolAsTask: + async def test_sends_task_augmented_request(self) -> None: + client = McpClient(config=MagicMock()) + session = MagicMock() + create_result = _create_task_result() + session.send_request = AsyncMock(return_value=create_result) + client._ensure_session = AsyncMock(return_value=session) # type: ignore[method-assign] + client._client_initialized = True + + result = await client.call_tool_as_task("invoke-process", arguments={"a": 1}) + + assert result is create_result + sent_request = session.send_request.call_args[0][0] + call_tool_request = sent_request.root + assert call_tool_request.params.name == "invoke-process" + assert call_tool_request.params.task is not None + + +class TestSuspendOnUiPathTask: + @patch("uipath_langchain._utils.durable_interrupt.decorator.interrupt") + @patch("uipath_langchain.agent.tools.mcp.mcp_tool.UiPath") + async def test_task_tool_starts_job_and_suspends_with_waitjob( + self, mock_uipath: MagicMock, mock_interrupt: MagicMock + ) -> None: + mcp_client = MagicMock() + mcp_client.call_tool_as_task = AsyncMock(return_value=_create_task_result()) + + resumed_job = MagicMock() + resumed_job.state = "successful" + mock_interrupt.return_value = resumed_job + + sdk = MagicMock() + sdk.jobs.extract_output_async = AsyncMock(return_value='{"out": 1}') + mock_uipath.return_value = sdk + + tool_fn = build_mcp_tool(_mcp_tool("optional"), mcp_client) + result = await tool_fn(invoiceId="INV-1") + + mcp_client.call_tool_as_task.assert_awaited_once() + # Suspended on a WaitJobRaw carrying the job + folder keys read from _meta. 
+ wait = mock_interrupt.call_args[0][0] + assert isinstance(wait, WaitJobRaw) + assert str(wait.job.key) == "job-key-1" + assert str(wait.process_folder_key) == "folder-key-1" + # Resume returns the child job's output. + assert result == {"out": 1} + + @patch("uipath_langchain._utils.durable_interrupt.decorator.interrupt") + async def test_non_task_tool_calls_synchronously( + self, mock_interrupt: MagicMock + ) -> None: + mcp_client = MagicMock() + sync_result = MagicMock() + sync_result.content = "sync-result" + mcp_client.call_tool = AsyncMock(return_value=sync_result) + + tool_fn = build_mcp_tool(_mcp_tool(None), mcp_client) + result = await tool_fn(x=1) + + mcp_client.call_tool.assert_awaited_once() + mock_interrupt.assert_not_called() + assert result == "sync-result"