66import uuid
77import warnings
88from os import environ as env
9- from typing import Any , Optional
9+ from typing import Any , Dict , Optional
1010
1111import httpx
1212from langchain_core .tracers .base import AsyncBaseTracer
1313from langchain_core .tracers .schemas import Run
1414from pydantic import PydanticDeprecationWarning
15+ from uipath_sdk ._cli ._runtime ._contracts import UiPathTraceContext
1516
17+ from ._events import CustomTraceEvents , FunctionCallEventData
1618from ._utils import _simple_serialize_defaults
1719
1820logger = logging .getLogger (__name__ )
@@ -25,78 +27,96 @@ class Status:
2527
2628
2729class AsyncUiPathTracer (AsyncBaseTracer ):
28- def __init__ (self , client = None , ** kwargs ):
30+ def __init__ (
31+ self ,
32+ context : Optional [UiPathTraceContext ] = None ,
33+ client : Optional [httpx .AsyncClient ] = None ,
34+ ** kwargs ,
35+ ):
2936 super ().__init__ (** kwargs )
3037
3138 self .client = client or httpx .AsyncClient ()
3239 self .retries = 3
3340 self .log_queue : queue .Queue [dict [str , Any ]] = queue .Queue ()
3441
42+ self .context = context or UiPathTraceContext ()
43+
3544 llm_ops_pattern = self ._get_base_url () + "{orgId}/llmops_"
36- self .orgId = env .get (
37- "UIPATH_ORGANIZATION_ID" , "00000000-0000-0000-0000-000000000000"
38- )
39- self .tenantId = env .get (
40- "UIPATH_TENANT_ID" , "00000000-0000-0000-0000-000000000000"
41- )
42- self .url = llm_ops_pattern .format (orgId = self .orgId ).rstrip ("/" )
4345
44- self .auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
46+ self .url = llm_ops_pattern .format (orgId = self .context .org_id ).rstrip ("/" )
47+
48+ auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
4549 "UIPATH_ACCESS_TOKEN"
4650 )
4751
48- self .jobKey = env .get ("UIPATH_JOB_KEY" )
49- self .folderKey = env .get ("UIPATH_FOLDER_KEY" )
50- self .processKey = env .get ("UIPATH_PROCESS_UUID" )
51- self .parent_span_id = env .get ("UIPATH_PARENT_SPAN_ID" )
52-
53- self .referenceId = self .jobKey or str (uuid .uuid4 ())
54-
55- self .headers = {
56- "Authorization" : f"Bearer { self .auth_token } " ,
57- }
52+ self .headers = {"Authorization" : f"Bearer { auth_token } " }
5853
5954 self .running = True
6055 self .worker_task = asyncio .create_task (self ._worker ())
56+ self .function_call_run_map : Dict [str , Run ] = {}
57+
58+ async def on_custom_event (
59+ self ,
60+ name : str ,
61+ data : Any ,
62+ * ,
63+ run_id : uuid .UUID ,
64+ tags = None ,
65+ metadata = None ,
66+ ** kwargs : Any ,
67+ ) -> None :
68+ if name == CustomTraceEvents .UIPATH_TRACE_FUNCTION_CALL :
69+ # only handle the function call event
70+
71+ if not isinstance (data , FunctionCallEventData ):
72+ logger .warning (
73+ f"Received unexpected data type for function call event: { type (data )} "
74+ )
75+ return
6176
62- def _get_base_url ( self ) -> str :
63- uipath_url = (
64- env . get ( "UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
65- )
66- uipath_url = uipath_url . rstrip ( "/" )
77+ if data . event_type == "call" :
78+ run = self . run_map [ str ( run_id )]
79+ child_run = run . create_child (
80+ name = data . function_name , run_type = data . run_type , tags = data . tags
81+ )
6782
68- # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
69- parts = uipath_url . split ( "//" )
83+ if data . metadata is not None :
84+ run . add_metadata ( data . metadata )
7085
71- # after splitting by //, the base URL will be at index 1 along with the rest,
72- # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
73- base_url_parts = parts [1 ].split ("/" )
86+ call_uuid = data .call_uuid
87+ self .function_call_run_map [call_uuid ] = child_run
7488
75- # combine scheme and netloc to get the base URL
76- base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
89+ self ._send_span (run )
7790
78- return base_url
91+ if data .event_type == "completion" :
92+ call_uuid = data .call_uuid
93+ previous_run = self .function_call_run_map .pop (call_uuid , None )
94+
95+ if previous_run :
96+ previous_run .end (self ._safe_dict_dump (data .output ), data .error )
97+ self ._send_span (previous_run )
7998
8099 async def init_trace (self , run_name , trace_id = None ) -> None :
81- trace_id_env = env .get ("UIPATH_TRACE_ID" )
100+ if self .context .trace_id :
101+ # trace id already set no need to do anything
102+ return
82103
83- if trace_id_env :
84- self .trace_parent = trace_id_env
85- else :
86- await self .start_trace (run_name , trace_id )
104+ # no trace id, start a new trace
105+ await self .start_trace (run_name , trace_id )
87106
88107 async def start_trace (self , run_name , trace_id = None ) -> None :
89- self .trace_parent = trace_id or str (uuid .uuid4 ())
90- run_name = run_name or f"Job Run: { self .trace_parent } "
108+ self .context .trace_id = str (uuid .uuid4 ())
109+
110+ run_name = run_name or f"Job Run: { self .context .trace_id } "
91111 trace_data = {
92- "id" : self .trace_parent ,
112+ "id" : self .context . trace_id ,
93113 "name" : re .sub (
94114 "[!@#$<>\.]" , "" , run_name
95115 ), # if we use these characters the Agents UI throws some error (but llmops backend seems fine)
96- "referenceId" : self .referenceId ,
116+ "referenceId" : self .context . reference_id ,
97117 "attributes" : "{}" ,
98- "organizationId" : self .orgId ,
99- "tenantId" : self .tenantId ,
118+ "organizationId" : self .context . org_id ,
119+ "tenantId" : self .context . tenant_id ,
100120 }
101121
102122 for attempt in range (self .retries ):
@@ -174,9 +194,9 @@ async def _worker(self):
174194
175195 async def _persist_run (self , run : Run ) -> None :
176196 # Determine if this is a start or end trace based on whether end_time is set
177- await self ._send_span (run )
197+ self ._send_span (run )
178198
179- async def _send_span (self , run : Run ) -> None :
199+ def _send_span (self , run : Run ) -> None :
180200 """Send span data for a run to the API"""
181201 run_id = str (run .id )
182202
@@ -191,27 +211,27 @@ async def _send_span(self, run: Run) -> None:
191211 parent_id = (
192212 str (run .parent_run_id )
193213 if run .parent_run_id is not None
194- else self .parent_span_id
214+ else self .context . parent_span_id
195215 )
196- attributes = self ._safe_json_dump (self ._run_to_dict (run ))
216+ attributes = self ._safe_jsons_dump (self ._run_to_dict (run ))
197217 status = self ._determine_status (run .error )
198218
199219 span_data = {
200220 "id" : run_id ,
201221 "parentId" : parent_id ,
202- "traceId" : self .trace_parent ,
222+ "traceId" : self .context . trace_id ,
203223 "name" : run .name ,
204224 "startTime" : start_time ,
205225 "endTime" : end_time ,
206- "referenceId" : self .referenceId ,
226+ "referenceId" : self .context . reference_id ,
207227 "attributes" : attributes ,
208- "organizationId" : self .orgId ,
209- "tenantId" : self .tenantId ,
228+ "organizationId" : self .context . org_id ,
229+ "tenantId" : self .context . tenant_id ,
210230 "spanType" : "LangGraphRun" ,
211231 "status" : status ,
212- "jobKey" : self .jobKey ,
213- "folderKey" : self .folderKey ,
214- "processKey" : self .processKey ,
232+ "jobKey" : self .context . job_id ,
233+ "folderKey" : self .context . folder_key ,
234+ "processKey" : self .context . folder_key ,
215235 }
216236
217237 self .log_queue .put (span_data )
@@ -235,14 +255,23 @@ def _determine_status(self, error: Optional[str]):
235255
236256 return Status .SUCCESS
237257
238- def _safe_json_dump (self , obj ) -> str :
258+ def _safe_jsons_dump (self , obj ) -> str :
239259 try :
240260 json_str = json .dumps (obj , default = _simple_serialize_defaults )
241261 return json_str
242262 except Exception as e :
243- logger .warning (e )
263+ logger .warning (f"Error serializing object to JSON: { e } " )
244264 return "{ }"
245265
266+ def _safe_dict_dump (self , obj ) -> Dict [str , Any ]:
267+ try :
268+ serialized = json .loads (json .dumps (obj , default = _simple_serialize_defaults ))
269+ return serialized
270+ except Exception as e :
271+ # Last resort - string representation
272+ logger .warning (f"Error serializing object to JSON: { e } " )
273+ return str (obj )
274+
246275 def _run_to_dict (self , run : Run ):
247276 with warnings .catch_warnings ():
248277 warnings .simplefilter ("ignore" , category = PydanticDeprecationWarning )
@@ -252,3 +281,21 @@ def _run_to_dict(self, run: Run):
252281 "inputs" : run .inputs .copy () if run .inputs is not None else None ,
253282 "outputs" : run .outputs .copy () if run .outputs is not None else None ,
254283 }
284+
285+ def _get_base_url (self ) -> str :
286+ uipath_url = (
287+ env .get ("UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
288+ )
289+ uipath_url = uipath_url .rstrip ("/" )
290+
291+ # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
292+ parts = uipath_url .split ("//" )
293+
294+ # after splitting by //, the base URL will be at index 1 along with the rest,
295+ # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
296+ base_url_parts = parts [1 ].split ("/" )
297+
298+ # combine scheme and netloc to get the base URL
299+ base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
300+
301+ return base_url
0 commit comments