Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-langchain"
version = "0.8.24"
version = "0.8.25"
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Durable interrupt package for side-effect-safe interrupt/resume in LangGraph."""

from .decorator import _durable_state, durable_interrupt
from .decorator import (
_durable_state,
durable_interrupt,
)
from .skip_interrupt import SkipInterruptValue

__all__ = [
Expand Down
11 changes: 11 additions & 0 deletions src/uipath_langchain/agent/tools/tool_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
LowCodeAgentDefinition,
)

from uipath_langchain.chat.hitl import REQUIRE_CONVERSATIONAL_CONFIRMATION

from .context_tool import create_context_tool
from .escalation_tool import create_escalation_tool
from .extraction_tool import create_ixp_extraction_tool
Expand Down Expand Up @@ -54,6 +56,15 @@ async def create_tools_from_resources(
else:
tools.append(tool)

if agent.is_conversational:
props = getattr(resource, "properties", None)
if props and getattr(
props, REQUIRE_CONVERSATIONAL_CONFIRMATION, False
):
if tool.metadata is None:
tool.metadata = {}
tool.metadata[REQUIRE_CONVERSATIONAL_CONFIRMATION] = True

Comment thread
andreitava-uip marked this conversation as resolved.
return tools


Expand Down
32 changes: 30 additions & 2 deletions src/uipath_langchain/agent/tools/tool_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
extract_current_tool_call_index,
find_latest_ai_message,
)
from uipath_langchain.chat.hitl import request_conversational_tool_confirmation

# the type safety can be improved with generics
ToolWrapperReturnType = dict[str, Any] | Command[Any] | None
Expand Down Expand Up @@ -80,6 +81,15 @@ def _func(self, state: AgentGraphState) -> OutputType:
if call is None:
return None

# prompt user for approval if tool requires confirmation
conversational_confirmation = request_conversational_tool_confirmation(
call, self.tool
)
if conversational_confirmation:
if conversational_confirmation.cancelled:
# tool confirmation rejected
return self._process_result(call, conversational_confirmation.cancelled)

try:
if self.wrapper:
inputs = self._prepare_wrapper_inputs(
Expand All @@ -88,7 +98,11 @@ def _func(self, state: AgentGraphState) -> OutputType:
result = self.wrapper(*inputs)
else:
result = self.tool.invoke(call)
return self._process_result(call, result)
output = self._process_result(call, result)
if conversational_confirmation:
# HITL approved - apply confirmation metadata to tool result message
conversational_confirmation.annotate_result(output)
return output
except GraphBubbleUp:
# LangGraph uses exceptions for interrupt control flow — re-raise so
# handle_tool_errors doesn't swallow expected interrupts as errors.
Expand All @@ -104,15 +118,29 @@ async def _afunc(self, state: AgentGraphState) -> OutputType:
if call is None:
return None

# prompt user for approval if tool requires confirmation
conversational_confirmation = request_conversational_tool_confirmation(
call, self.tool
)
if conversational_confirmation:
if conversational_confirmation.cancelled:
# tool confirmation rejected
return self._process_result(call, conversational_confirmation.cancelled)

try:
if self.awrapper:
inputs = self._prepare_wrapper_inputs(
self.awrapper, self.tool, call, state
)

result = await self.awrapper(*inputs)
else:
result = await self.tool.ainvoke(call)
return self._process_result(call, result)
output = self._process_result(call, result)
if conversational_confirmation:
# HITL approved - apply confirmation metadata to tool result message
conversational_confirmation.annotate_result(output)
return output
except GraphBubbleUp:
# LangGraph uses exceptions for interrupt control flow — re-raise so
# handle_tool_errors doesn't swallow expected interrupts as errors.
Expand Down
112 changes: 102 additions & 10 deletions src/uipath_langchain/chat/hitl.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,64 @@
import functools
import inspect
import json
from inspect import Parameter
from typing import Annotated, Any, Callable
from typing import Annotated, Any, Callable, NamedTuple

from langchain_core.messages.tool import ToolCall, ToolMessage
from langchain_core.tools import BaseTool, InjectedToolCallId
from langchain_core.tools import tool as langchain_tool
from langgraph.types import interrupt
from uipath.core.chat import (
UiPathConversationToolCallConfirmationValue,
)

_CANCELLED_MESSAGE = "Cancelled by user"
CANCELLED_MESSAGE = "Cancelled by user"

CONVERSATIONAL_APPROVED_TOOL_ARGS = "conversational_approved_tool_args"
REQUIRE_CONVERSATIONAL_CONFIRMATION = "require_conversational_confirmation"


class ConfirmationResult(NamedTuple):
"""Result of a tool confirmation check."""

cancelled: ToolMessage | None # ToolMessage if cancelled, None if approved
args_modified: bool
approved_args: dict[str, Any] | None = None

def annotate_result(self, output: dict[str, Any] | Any) -> None:
"""Apply confirmation metadata to a tool result message."""
Comment thread
JoshParkSJ marked this conversation as resolved.
msg = None
if isinstance(output, dict):
messages = output.get("messages")
if messages:
msg = messages[0]
else:
# Tools with @durable_interrupt return a Command whose messages
# are nested under output.update["messages"].
update = getattr(output, "update", None)
if isinstance(update, dict):
messages = update.get("messages")
if messages:
msg = messages[0]
if msg is None:
return
if self.approved_args is not None:
msg.response_metadata[CONVERSATIONAL_APPROVED_TOOL_ARGS] = (
self.approved_args
)
if self.args_modified:
try:
result_value = json.loads(msg.content)
except (json.JSONDecodeError, TypeError):
result_value = msg.content
msg.content = json.dumps(
{
"meta": {
"args_modified_by_user": True,
"executed_args": self.approved_args,
},
"result": result_value,
}
)


def _patch_span_input(approved_args: dict[str, Any]) -> None:
Expand Down Expand Up @@ -53,7 +101,7 @@ def _patch_span_input(approved_args: dict[str, Any]) -> None:
pass


def _request_approval(
def request_approval(
tool_args: dict[str, Any],
tool: BaseTool,
) -> dict[str, Any] | None:
Expand All @@ -70,14 +118,20 @@ def _request_approval(
if tool_call_schema is not None:
input_schema = tool_call_schema.model_json_schema()

response = interrupt(
UiPathConversationToolCallConfirmationValue(
# Lazy import to avoid circular dependency:
# hitl -> agent.tools.durable_interrupt -> agent.tools -> tool_node -> hitl
from uipath_langchain.agent.tools.durable_interrupt import durable_interrupt
Comment thread
JoshParkSJ marked this conversation as resolved.
Outdated

@durable_interrupt
def ask_confirmation():
return UiPathConversationToolCallConfirmationValue(
tool_call_id=tool_call_id,
tool_name=tool.name,
input_schema=input_schema,
input_value=tool_args,
)
)

response = ask_confirmation()

# The resume payload from CAS has shape:
# {"type": "uipath_cas_tool_call_confirmation",
Expand All @@ -89,9 +143,46 @@ def _request_approval(
if not confirmation.get("approved", True):
return None

return confirmation.get("input") or tool_args
return (
confirmation.get("input")
if confirmation.get("input") is not None
else tool_args
)


# for conversational low code agents
def request_conversational_tool_confirmation(
    call: ToolCall, tool: BaseTool
) -> ConfirmationResult | None:
    """Check whether a tool requires user confirmation and request approval.

    Returns None when *tool* does not opt in via the
    REQUIRE_CONVERSATIONAL_CONFIRMATION metadata flag. Otherwise returns a
    ConfirmationResult: on rejection, its ``cancelled`` field carries a
    synthetic ToolMessage so the graph can continue without executing the
    tool; on approval, ``call["args"]`` is mutated in place with the
    approved values.
    """
    if not (tool.metadata and tool.metadata.get(REQUIRE_CONVERSATIONAL_CONFIRMATION)):
        return None

    original_args = call["args"]
    # NOTE(review): when the confirmation payload carries no "input",
    # request_approval falls back to the dict passed here, which includes the
    # injected "tool_call_id" key — verify this cannot spuriously mark the
    # args as modified or leak "tool_call_id" into the tool invocation.
    approved_args = request_approval(
        {**original_args, "tool_call_id": call["id"]}, tool
    )
    if approved_args is None:
        # Rejected: synthesize the tool's result message instead of running it.
        cancelled_msg = ToolMessage(
            content=json.dumps({"meta": CANCELLED_MESSAGE}),
            name=call["name"],
            tool_call_id=call["id"],
        )
        cancelled_msg.response_metadata[CONVERSATIONAL_APPROVED_TOOL_ARGS] = (
            original_args
        )
        return ConfirmationResult(cancelled=cancelled_msg, args_modified=False)

    # Mutate call args so the tool executes with the approved values
    call["args"] = approved_args
    return ConfirmationResult(
        cancelled=None,
        args_modified=approved_args != original_args,
        approved_args=approved_args,
    )


# for conversational coded agents
def requires_approval(
func: Callable[..., Any] | None = None,
*,
Expand All @@ -107,9 +198,10 @@ def decorator(fn: Callable[..., Any]) -> BaseTool:
# wrap the tool/function
@functools.wraps(fn)
def wrapper(**tool_args: Any) -> Any:
approved_args = _request_approval(tool_args, _created_tool[0])
approved_args = request_approval(tool_args, _created_tool[0])
if approved_args is None:
return _CANCELLED_MESSAGE
return json.dumps({"meta": CANCELLED_MESSAGE})

_patch_span_input(approved_args)
return fn(**approved_args)

Expand Down
17 changes: 12 additions & 5 deletions src/uipath_langchain/runtime/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, runtime_id: str, storage: UiPathRuntimeStorageProtocol | None
"""Initialize the mapper with empty state."""
self.runtime_id = runtime_id
self.storage = storage
self.tool_names_requiring_confirmation: set[str] = set()
self.current_message: AIMessageChunk
self.seen_message_ids: set[str] = set()
self._storage_lock = asyncio.Lock()
Expand Down Expand Up @@ -389,11 +390,17 @@ async def map_current_message_to_start_tool_call_events(self):
tool_call_id_to_message_id_map[tool_call_id] = (
self.current_message.id
)
events.append(
self.map_tool_call_to_tool_call_start_event(
self.current_message.id, tool_call

# if tool requires confirmation, we skip start tool call
if (
tool_call["name"]
not in self.tool_names_requiring_confirmation
):
events.append(
self.map_tool_call_to_tool_call_start_event(
self.current_message.id, tool_call
)
)
)

if self.storage is not None:
await self.storage.set_value(
Expand Down Expand Up @@ -665,7 +672,7 @@ def _map_langchain_ai_message_to_uipath_message_data(
role="assistant",
content_parts=content_parts,
tool_calls=uipath_tool_calls,
interrupts=[], # TODO: Interrupts
interrupts=[],
)


Expand Down
16 changes: 16 additions & 0 deletions src/uipath_langchain/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from uipath.runtime.schema import UiPathRuntimeSchema

from uipath_langchain.chat.hitl import REQUIRE_CONVERSATIONAL_CONFIRMATION
from uipath_langchain.runtime.errors import LangGraphErrorCode, LangGraphRuntimeError
from uipath_langchain.runtime.messages import UiPathChatMessagesMapper
from uipath_langchain.runtime.schema import get_entrypoints_schema, get_graph_schema
Expand Down Expand Up @@ -64,6 +65,9 @@ def __init__(
self.entrypoint: str | None = entrypoint
self.callbacks: list[BaseCallbackHandler] = callbacks or []
self.chat = UiPathChatMessagesMapper(self.runtime_id, storage)
self.chat.tool_names_requiring_confirmation = (
self._get_tool_names_requiring_confirmation()
)
self._middleware_node_names: set[str] = self._detect_middleware_nodes()

async def execute(
Expand Down Expand Up @@ -486,6 +490,18 @@ def _detect_middleware_nodes(self) -> set[str]:

return middleware_nodes

def _get_tool_names_requiring_confirmation(self) -> set[str]:
    """Collect names of graph tools whose metadata opts into conversational confirmation."""
    confirm_names: set[str] = set()
    for node_name, node_spec in self.graph.nodes.items():
        # langgraph's processing node.bound -> runnable.tool -> baseTool (if tool node)
        bound_runnable = getattr(node_spec, "bound", None)
        candidate_tool = getattr(bound_runnable, "tool", None)
        if candidate_tool is None:
            continue
        tool_metadata = getattr(candidate_tool, "metadata", None) or {}
        if tool_metadata.get(REQUIRE_CONVERSATIONAL_CONFIRMATION):
            # Fall back to the node name when the tool exposes no name.
            confirm_names.add(getattr(candidate_tool, "name", node_name))
    return confirm_names

def _is_middleware_node(self, node_name: str) -> bool:
"""Check if a node name represents a middleware node."""
return node_name in self._middleware_node_names
Expand Down
Loading
Loading