Skip to content

Commit 22e6296

Browse files
authored
Merge pull request #984 from UiPath/feature/add-chat-agent-runtime
feat: add chat protocol implementation with socket io
2 parents 6b9652a + da986de commit 22e6296

File tree

4 files changed

+345
-40
lines changed

4 files changed

+345
-40
lines changed

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[project]
22
name = "uipath"
3-
version = "2.2.21"
3+
version = "2.2.22"
44
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"
77
dependencies = [
8-
"uipath-runtime>=0.2.0, <0.3.0",
8+
"uipath-runtime>=0.2.2, <0.3.0",
99
"click>=8.3.1",
1010
"httpx>=0.28.1",
1111
"pyjwt>=2.10.1",
@@ -15,9 +15,9 @@ dependencies = [
1515
"rich>=14.2.0",
1616
"truststore>=0.10.1",
1717
"mockito>=1.5.4",
18-
"hydra-core>=1.3.2",
1918
"pydantic-function-models>=0.1.11",
2019
"pysignalr==1.3.0",
20+
"python-socketio>=5.15.0, <6.0.0",
2121
"coverage>=7.8.2",
2222
"mermaid-builder==0.0.3",
2323
]

src/uipath/_cli/_chat/_protocol.py

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
"""Chat bridge implementations for conversational agents."""
2+
3+
import asyncio
4+
import logging
5+
import os
6+
from typing import Any
7+
from urllib.parse import urlparse
8+
9+
import socketio # type: ignore[import-untyped]
10+
from socketio import AsyncClient
11+
from uipath.core.chat import (
12+
UiPathConversationEvent,
13+
UiPathConversationExchangeEndEvent,
14+
UiPathConversationExchangeEvent,
15+
)
16+
from uipath.runtime.chat import UiPathChatProtocol
17+
from uipath.runtime.context import UiPathRuntimeContext
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class WebSocketChatBridge:
23+
"""WebSocket-based chat bridge for streaming conversational events to CAS.
24+
25+
Implements UiPathChatBridgeProtocol using python-socketio library.
26+
"""
27+
28+
def __init__(
29+
self,
30+
websocket_url: str,
31+
conversation_id: str,
32+
exchange_id: str,
33+
headers: dict[str, str],
34+
auth: dict[str, Any] | None = None,
35+
):
36+
"""Initialize the WebSocket chat bridge.
37+
38+
Args:
39+
websocket_url: The WebSocket server URL to connect to
40+
conversation_id: The conversation ID for this session
41+
exchange_id: The exchange ID for this session
42+
headers: HTTP headers to send during connection
43+
auth: Optional authentication data to send during connection
44+
"""
45+
self.websocket_url = websocket_url
46+
self.conversation_id = conversation_id
47+
self.exchange_id = exchange_id
48+
self.auth = auth
49+
self.headers = headers
50+
self._client: AsyncClient | None = None
51+
self._connected_event = asyncio.Event()
52+
53+
async def connect(self, timeout: float = 10.0) -> None:
54+
"""Establish WebSocket connection to the server.
55+
56+
Args:
57+
timeout: Connection timeout in seconds (default: 10.0)
58+
59+
Raises:
60+
RuntimeError: If connection fails or times out
61+
62+
Example:
63+
```python
64+
manager = WebSocketManager("http://localhost:3000")
65+
await manager.connect()
66+
```
67+
"""
68+
if self._client is not None:
69+
logger.warning("WebSocket client already connected")
70+
return
71+
72+
# Create new SocketIO client
73+
self._client = socketio.AsyncClient(
74+
logger=logger,
75+
engineio_logger=logger,
76+
)
77+
78+
# Register connection event handlers
79+
self._client.on("connect", self._handle_connect)
80+
self._client.on("disconnect", self._handle_disconnect)
81+
self._client.on("connect_error", self._handle_connect_error)
82+
83+
# Clear connection event
84+
self._connected_event.clear()
85+
86+
try:
87+
# Attempt to connect with timeout
88+
logger.info(f"Connecting to WebSocket server: {self.websocket_url}")
89+
90+
await asyncio.wait_for(
91+
self._client.connect(
92+
url=self.websocket_url,
93+
headers=self.headers,
94+
auth=self.auth,
95+
transports=["websocket"],
96+
),
97+
timeout=timeout,
98+
)
99+
100+
# Wait for connection confirmation
101+
await asyncio.wait_for(self._connected_event.wait(), timeout=timeout)
102+
103+
logger.info("WebSocket connection established successfully")
104+
105+
except asyncio.TimeoutError as e:
106+
error_message = (
107+
f"Failed to connect to WebSocket server within {timeout}s timeout"
108+
)
109+
logger.error(error_message)
110+
await self._cleanup_client()
111+
raise RuntimeError(error_message) from e
112+
113+
except Exception as e:
114+
error_message = f"Failed to connect to WebSocket server: {e}"
115+
logger.error(error_message)
116+
await self._cleanup_client()
117+
raise RuntimeError(error_message) from e
118+
119+
async def disconnect(self) -> None:
120+
"""Close the WebSocket connection gracefully.
121+
122+
Sends an exchange end event before disconnecting to signal that the
123+
exchange is complete. Uses stored conversation/exchange IDs.
124+
"""
125+
if self._client is None:
126+
logger.warning("WebSocket client not connected")
127+
return
128+
129+
# Send exchange end event using stored IDs
130+
if self._client and self._connected_event.is_set():
131+
try:
132+
end_event = UiPathConversationEvent(
133+
conversation_id=self.conversation_id,
134+
exchange=UiPathConversationExchangeEvent(
135+
exchange_id=self.exchange_id,
136+
end=UiPathConversationExchangeEndEvent(),
137+
),
138+
)
139+
event_data = end_event.model_dump(
140+
mode="json", exclude_none=True, by_alias=True
141+
)
142+
await self._client.emit("ConversationEvent", event_data)
143+
logger.info("Exchange end event sent")
144+
except Exception as e:
145+
logger.warning(f"Error sending exchange end event: {e}")
146+
147+
try:
148+
logger.info("Disconnecting from WebSocket server")
149+
await self._client.disconnect()
150+
logger.info("WebSocket disconnected successfully")
151+
except Exception as e:
152+
logger.error(f"Error during WebSocket disconnect: {e}")
153+
finally:
154+
await self._cleanup_client()
155+
156+
async def emit_message_event(self, message_event: Any) -> None:
157+
"""Wrap and send a message event to the WebSocket server.
158+
159+
Args:
160+
message_event: UiPathConversationMessageEvent to wrap and send
161+
162+
Raises:
163+
RuntimeError: If client is not connected
164+
"""
165+
if self._client is None:
166+
raise RuntimeError("WebSocket client not connected. Call connect() first.")
167+
168+
if not self._connected_event.is_set():
169+
raise RuntimeError("WebSocket client not in connected state")
170+
171+
try:
172+
# Wrap message event with conversation/exchange IDs
173+
wrapped_event = UiPathConversationEvent(
174+
conversation_id=self.conversation_id,
175+
exchange=UiPathConversationExchangeEvent(
176+
exchange_id=self.exchange_id,
177+
message=message_event,
178+
),
179+
)
180+
181+
event_data = wrapped_event.model_dump(
182+
mode="json", exclude_none=True, by_alias=True
183+
)
184+
185+
logger.debug("Sending conversation event to WebSocket")
186+
await self._client.emit("ConversationEvent", event_data)
187+
logger.debug("Conversation event sent successfully")
188+
189+
except Exception as e:
190+
logger.error(f"Error sending conversation event to WebSocket: {e}")
191+
raise RuntimeError(f"Failed to send conversation event: {e}") from e
192+
193+
@property
194+
def is_connected(self) -> bool:
195+
"""Check if the WebSocket is currently connected.
196+
197+
Returns:
198+
True if connected, False otherwise
199+
"""
200+
return self._client is not None and self._connected_event.is_set()
201+
202+
async def _handle_connect(self) -> None:
203+
"""Handle successful connection event."""
204+
logger.info("WebSocket connection established")
205+
self._connected_event.set()
206+
207+
async def _handle_disconnect(self) -> None:
208+
"""Handle disconnection event."""
209+
logger.info("WebSocket connection closed")
210+
self._connected_event.clear()
211+
212+
async def _handle_connect_error(self, data: Any) -> None:
213+
"""Handle connection error event."""
214+
logger.error(f"WebSocket connection error: {data}")
215+
216+
async def _cleanup_client(self) -> None:
217+
"""Clean up client resources."""
218+
self._connected_event.clear()
219+
self._client = None
220+
221+
222+
def get_chat_bridge(
223+
context: UiPathRuntimeContext,
224+
conversation_id: str,
225+
exchange_id: str,
226+
) -> UiPathChatProtocol:
227+
"""Factory to get WebSocket chat bridge for conversational agents.
228+
229+
Args:
230+
context: The runtime context containing environment configuration
231+
conversation_id: The conversation ID for this session
232+
exchange_id: The exchange ID for this session
233+
234+
Returns:
235+
WebSocketChatBridge instance configured for CAS
236+
237+
Raises:
238+
RuntimeError: If UIPATH_URL is not set or invalid
239+
240+
Example:
241+
```python
242+
bridge = get_chat_bridge(context, "conv-123", "exch-456")
243+
await bridge.connect()
244+
await bridge.emit_message_event(message_event)
245+
await bridge.disconnect(conversation_id, exchange_id)
246+
```
247+
"""
248+
# Extract host from UIPATH_URL
249+
base_url = os.environ.get("UIPATH_URL")
250+
if not base_url:
251+
raise RuntimeError(
252+
"UIPATH_URL environment variable required for conversational mode"
253+
)
254+
255+
parsed = urlparse(base_url)
256+
if not parsed.netloc:
257+
raise RuntimeError(f"Invalid UIPATH_URL format: {base_url}")
258+
259+
host = parsed.netloc
260+
261+
# Construct WebSocket URL for CAS
262+
websocket_url = f"wss://{host}/autopilotforeveryone_/websocket_/socket.io?conversationId={conversation_id}"
263+
264+
# Build headers from context
265+
headers = {
266+
"Authorization": f"Bearer {os.environ.get('UIPATH_ACCESS_TOKEN', '')}",
267+
"X-UiPath-Internal-TenantId": context.tenant_id
268+
or os.environ.get("UIPATH_TENANT_ID", ""),
269+
"X-UiPath-Internal-AccountId": context.org_id
270+
or os.environ.get("UIPATH_ORGANIZATION_ID", ""),
271+
"X-UiPath-ConversationId": conversation_id,
272+
}
273+
274+
return WebSocketChatBridge(
275+
websocket_url=websocket_url,
276+
conversation_id=conversation_id,
277+
exchange_id=exchange_id,
278+
headers=headers,
279+
)

src/uipath/_cli/cli_run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ async def execute() -> None:
162162
runtime = await factory.new_runtime(
163163
entrypoint, ctx.job_id or "default"
164164
)
165+
165166
if ctx.job_id:
166167
trace_manager.add_span_exporter(LlmOpsHttpExporter())
167168
ctx.result = await execute_runtime(ctx, runtime)

0 commit comments

Comments
 (0)