Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions src/uipath_langchain/agent/tools/client_side_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""Factory for creating client-side tools that execute on the client SDK."""

import inspect
import json
from logging import getLogger
from typing import Annotated, Any

from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId, StructuredTool
from uipath.agent.models.agent import AgentClientSideToolResourceConfig
from uipath.eval.mocks import mockable

from uipath_langchain._utils.durable_interrupt import durable_interrupt
from uipath_langchain.agent.react.jsonschema_pydantic_converter import (
create_model as create_model_from_schema,
)

from .utils import sanitize_tool_name

logger = getLogger(__name__)

CLIENT_SIDE_TOOL_MARKER = "uipath_client_tool"


def create_client_side_tool(
resource: AgentClientSideToolResourceConfig,
) -> StructuredTool:
"""Create a client-side tool that pauses the graph and waits for the client to execute it.

The tool uses @durable_interrupt to suspend the graph. The client SDK receives
an executingToolCall event, runs its registered handler, and sends endToolCall
back through CAS. The bridge routes that endToolCall to wait_for_resume(),
which unblocks the graph with the client's result.
"""
tool_name = sanitize_tool_name(resource.name)
input_model = create_model_from_schema(resource.input_schema)

async def client_side_tool_fn(
*, tool_call_id: Annotated[str, InjectedToolCallId], **kwargs: Any
) -> Any:
@mockable(
name=resource.name,
description=resource.description,
input_schema=input_model.model_json_schema(),
output_schema=(resource.output_schema or {}),
example_calls=getattr(resource.properties, 'example_calls', None),
)
@durable_interrupt
async def wait_for_client_execution() -> dict[str, Any]:
return {
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"input": kwargs,
"is_execution_phase": True,
}

# First run: raises GraphInterrupt with the tool call info.
# On resume: returns the client's result (output, isError, etc.)
# During evals: @mockable intercepts and returns simulated response.
result = await wait_for_client_execution()

# The resume value from the bridge is the endToolCall payload
output = result.get("output")
is_error = result.get("is_error", False)

content = str(output) if output is not None else ""
if isinstance(output, dict):
content = json.dumps(output)

return ToolMessage(
content=content,
tool_call_id=tool_call_id,
status="error" if is_error else "success",
response_metadata={CLIENT_SIDE_TOOL_MARKER: True},
)

# Patch signature so LangChain injects tool_call_id at runtime
original_sig = inspect.signature(client_side_tool_fn)
params = [p for p in original_sig.parameters.values() if p.name != "kwargs"] + [
inspect.Parameter("kwargs", inspect.Parameter.VAR_KEYWORD, annotation=Any),
]
client_side_tool_fn.__signature__ = original_sig.replace(parameters=params)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm a bit confused what this is for


tool = StructuredTool(
name=tool_name,
description=resource.description or f"Client-side tool: {tool_name}",
args_schema=input_model,
coroutine=client_side_tool_fn,
metadata={
CLIENT_SIDE_TOOL_MARKER: True,
"output_schema": resource.output_schema,
},
)

return tool
5 changes: 5 additions & 0 deletions src/uipath_langchain/agent/tools/tool_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from uipath.agent.models.agent import (
AgentClientSideToolResourceConfig,
AgentContextResourceConfig,
AgentEscalationResourceConfig,
AgentIntegrationToolResourceConfig,
Expand All @@ -18,6 +19,7 @@

from uipath_langchain.chat.hitl import REQUIRE_CONVERSATIONAL_CONFIRMATION

from .client_side_tool import create_client_side_tool
from .context_tool import create_context_tool
from .escalation_tool import create_escalation_tool
from .extraction_tool import create_ixp_extraction_tool
Expand Down Expand Up @@ -120,4 +122,7 @@ async def _build_tool_for_resource(
elif isinstance(resource, AgentIxpVsEscalationResourceConfig):
return create_ixp_escalation_tool(resource)

elif isinstance(resource, AgentClientSideToolResourceConfig):
return create_client_side_tool(resource)

Comment on lines +125 to +127
return None
6 changes: 6 additions & 0 deletions src/uipath_langchain/chat/hitl.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,18 @@ def request_approval(
"""
tool_call_id: str = tool_args.pop("tool_call_id")

# If this is a server-side tool (not client-side), execution follows immediately
# after confirmation — mark this as the execution trigger so the bridge emits
# executingToolCall. For client-side tools, the execution interrupt sets this instead.
is_execution_trigger = not (tool.metadata or {}).get("uipath_client_tool", False)
Comment on lines +129 to +132

@durable_interrupt
def ask_confirmation():
return {
"tool_call_id": tool_call_id,
"tool_name": tool.name,
"input": tool_args,
"is_execution_phase": is_execution_trigger,
}

response = ask_confirmation()
Expand Down
57 changes: 46 additions & 11 deletions src/uipath_langchain/runtime/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
UiPathConversationContentPartEndEvent,
UiPathConversationContentPartEvent,
UiPathConversationContentPartStartEvent,
UiPathConversationExecutingToolCallEvent,
UiPathConversationMessage,
UiPathConversationMessageData,
UiPathConversationMessageEndEvent,
Expand Down Expand Up @@ -60,6 +61,7 @@ def __init__(self, runtime_id: str, storage: UiPathRuntimeStorageProtocol | None
self.storage = storage
self.current_message: AIMessageChunk | AIMessage
self.tools_requiring_confirmation: dict[str, Any] = {}
self.client_side_tools: dict[str, Any] = {} # {tool_name: output_schema}
self.seen_message_ids: set[str] = set()
self._storage_lock = asyncio.Lock()
self._citation_stream_processor = CitationStreamProcessor()
Expand Down Expand Up @@ -436,15 +438,39 @@ async def map_current_message_to_start_tool_call_events(self):
tool_name in self.tools_requiring_confirmation
)
input_schema = self.tools_requiring_confirmation.get(tool_name)
is_client_side = tool_name in self.client_side_tools
output_schema = (
self.client_side_tools.get(tool_name)
if is_client_side
else None
)
events.append(
self.map_tool_call_to_tool_call_start_event(
self.current_message.id,
tool_call,
require_confirmation=require_confirmation or None,
input_schema=input_schema,
is_client_side_tool=is_client_side or None,
output_schema=output_schema,
)
)

# Emit executingToolCall from MessageMapper since there's no durable interrupt
# to trigger it from the runtime loop.
if not require_confirmation and not is_client_side:
events.append(
UiPathConversationMessageEvent(
Comment on lines +458 to +462
message_id=self.current_message.id,
tool_call=UiPathConversationToolCallEvent(
tool_call_id=tool_call["id"],
executing=UiPathConversationExecutingToolCallEvent(
tool_name=tool_call["name"],
input=tool_call["args"],
),
),
)
)

if self.storage is not None:
await self.storage.set_value(
self.runtime_id,
Expand Down Expand Up @@ -476,19 +502,24 @@ async def map_tool_message_to_events(
# Keep as string if not valid JSON
pass

events = [
UiPathConversationMessageEvent(
message_id=message_id,
tool_call=UiPathConversationToolCallEvent(
tool_call_id=message.tool_call_id,
end=UiPathConversationToolCallEndEvent(
timestamp=self.get_timestamp(),
output=content_value,
is_error=message.status == "error",
# Suppress endToolCall for client-side tools — the client already has the result (it produced it).
is_client_side = message.response_metadata.get("uipath_client_tool", False)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use CLIENT_SIDE_TOOL_MARKER

events: list[UiPathConversationMessageEvent] = []

if not is_client_side:
events.append(
Comment on lines +505 to +510
UiPathConversationMessageEvent(
message_id=message_id,
tool_call=UiPathConversationToolCallEvent(
tool_call_id=message.tool_call_id,
end=UiPathConversationToolCallEndEvent(
timestamp=self.get_timestamp(),
output=content_value,
is_error=message.status == "error",
),
),
),
)
)
]

if is_last_tool_call:
events.append(self.map_to_message_end_event(message_id))
Expand Down Expand Up @@ -546,6 +577,8 @@ def map_tool_call_to_tool_call_start_event(
*,
require_confirmation: bool | None = None,
input_schema: Any | None = None,
is_client_side_tool: bool | None = None,
output_schema: Any | None = None,
) -> UiPathConversationMessageEvent:
return UiPathConversationMessageEvent(
message_id=message_id,
Expand All @@ -557,6 +590,8 @@ def map_tool_call_to_tool_call_start_event(
input=tool_call["args"],
require_confirmation=require_confirmation,
input_schema=input_schema,
is_client_side_tool=is_client_side_tool,
output_schema=output_schema,
),
),
)
Expand Down
30 changes: 30 additions & 0 deletions src/uipath_langchain/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from uipath.runtime.schema import UiPathRuntimeSchema

from uipath_langchain.agent.tools.client_side_tool import CLIENT_SIDE_TOOL_MARKER
from uipath_langchain.agent.tools.tool_node import RunnableCallableWithTool
from uipath_langchain.chat.hitl import get_confirmation_schema
from uipath_langchain.runtime.errors import LangGraphErrorCode, LangGraphRuntimeError
Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(
self.callbacks: list[BaseCallbackHandler] = callbacks or []
self.chat = UiPathChatMessagesMapper(self.runtime_id, storage)
self.chat.tools_requiring_confirmation = self._get_tool_confirmation_info()
self.chat.client_side_tools = self._get_client_side_tools()
self._middleware_node_names: set[str] = self._detect_middleware_nodes()
Comment on lines 71 to 73

async def execute(
Expand Down Expand Up @@ -522,6 +524,34 @@ def _get_tool_confirmation_info(self) -> dict[str, Any]:

return schemas

def _get_client_side_tools(self) -> dict[str, Any]:
"""Build {tool_name: output_schema} for client-side tools from compiled graph nodes."""

tools: dict[str, Any] = {}
for node_name, node_spec in self.graph.nodes.items():
bound = getattr(node_spec, "bound", None)
if bound is None:
continue

tool = getattr(bound, "tool", None)
if tool is not None:
metadata = getattr(tool, "metadata", None) or {}
if metadata.get(CLIENT_SIDE_TOOL_MARKER):
name = getattr(tool, "name", node_name)
tools[name] = metadata.get("output_schema")
continue

tools_by_name = getattr(bound, "tools_by_name", None)
if isinstance(tools_by_name, dict):
for name, tool in tools_by_name.items():
metadata = getattr(tool, "metadata", None) or {}
if metadata.get(CLIENT_SIDE_TOOL_MARKER):
tools[str(getattr(tool, "name", name))] = metadata.get(
"output_schema"
)

return tools

def _is_middleware_node(self, node_name: str) -> bool:
"""Check if a node name represents a middleware node."""
return node_name in self._middleware_node_names
Expand Down
Loading