66import uuid
77import warnings
88from os import environ as env
9- from typing import Any , Optional
9+ from typing import Any , Dict , Optional
1010
1111import httpx
1212from langchain_core .tracers .base import AsyncBaseTracer
1313from langchain_core .tracers .schemas import Run
1414from pydantic import PydanticDeprecationWarning
15+ from uipath_sdk ._cli ._runtime ._contracts import UiPathTraceContext
1516
17+ from ._events import CustomTraceEvents , FunctionCallEventData
1618from ._utils import _simple_serialize_defaults
1719
1820logger = logging .getLogger (__name__ )
@@ -25,78 +27,98 @@ class Status:
2527
2628
2729class AsyncUiPathTracer (AsyncBaseTracer ):
28- def __init__ (self , client = None , ** kwargs ):
30+ def __init__ (
31+ self ,
32+ context : Optional [UiPathTraceContext ] = None ,
33+ client : Optional [httpx .AsyncClient ] = None ,
34+ ** kwargs ,
35+ ):
2936 super ().__init__ (** kwargs )
3037
3138 self .client = client or httpx .AsyncClient ()
3239 self .retries = 3
3340 self .log_queue : queue .Queue [dict [str , Any ]] = queue .Queue ()
3441
42+ self .context = context or UiPathTraceContext ()
43+
3544 llm_ops_pattern = self ._get_base_url () + "{orgId}/llmops_"
36- self .orgId = env .get (
37- "UIPATH_ORGANIZATION_ID" , "00000000-0000-0000-0000-000000000000"
38- )
39- self .tenantId = env .get (
40- "UIPATH_TENANT_ID" , "00000000-0000-0000-0000-000000000000"
41- )
42- self .url = llm_ops_pattern .format (orgId = self .orgId ).rstrip ("/" )
4345
44- self .auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
46+ self .url = llm_ops_pattern .format (orgId = self .context .org_id ).rstrip ("/" )
47+
48+ auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
4549 "UIPATH_ACCESS_TOKEN"
4650 )
4751
48- self .jobKey = env .get ("UIPATH_JOB_KEY" )
49- self .folderKey = env .get ("UIPATH_FOLDER_KEY" )
50- self .processKey = env .get ("UIPATH_PROCESS_UUID" )
51- self .parent_span_id = env .get ("UIPATH_PARENT_SPAN_ID" )
52-
53- self .referenceId = self .jobKey or str (uuid .uuid4 ())
54-
55- self .headers = {
56- "Authorization" : f"Bearer { self .auth_token } " ,
57- }
52+ self .headers = {"Authorization" : f"Bearer { auth_token } " }
5853
5954 self .running = True
6055 self .worker_task = asyncio .create_task (self ._worker ())
56+ self .function_call_run_map : Dict [str , Run ] = {}
57+
58+ async def on_custom_event (
59+ self ,
60+ name : str ,
61+ data : Any ,
62+ * ,
63+ run_id : uuid .UUID ,
64+ tags = None ,
65+ metadata = None ,
66+ ** kwargs : Any ,
67+ ) -> None :
68+ if name == CustomTraceEvents .UIPATH_TRACE_FUNCTION_CALL :
69+ # only handle the function call event
70+
71+ if not isinstance (data , FunctionCallEventData ):
72+ logger .warning (
73+ f"Received unexpected data type for function call event: { type (data )} "
74+ )
75+ return
6176
62- def _get_base_url ( self ) -> str :
63- uipath_url = (
64- env . get ( "UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
65- )
66- uipath_url = uipath_url . rstrip ( "/" )
77+ if data . event_type == "call" :
78+ run = self . run_map [ str ( run_id )]
79+ child_run = run . create_child (
80+ name = data . function_name , run_type = data . run_type , tags = data . tags
81+ )
6782
68- # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
69- parts = uipath_url . split ( "//" )
83+ if data . metadata is not None :
84+ run . add_metadata ( data . metadata )
7085
71- # after splitting by //, the base URL will be at index 1 along with the rest,
72- # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
73- base_url_parts = parts [1 ].split ("/" )
86+ call_uuid = data .call_uuid
87+ self .function_call_run_map [call_uuid ] = child_run
7488
75- # combine scheme and netloc to get the base URL
76- base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
89+ self ._send_span (run )
7790
78- return base_url
91+ if data .event_type == "completion" :
92+ call_uuid = data .call_uuid
93+ previous_run = self .function_call_run_map .pop (call_uuid , None )
94+
95+ if previous_run :
96+ previous_run .end (
97+ outputs = self ._safe_dict_dump (data .output ), error = data .error
98+ )
99+ self ._send_span (previous_run )
79100
80101 async def init_trace (self , run_name , trace_id = None ) -> None :
81- trace_id_env = env .get ("UIPATH_TRACE_ID" )
102+ if self .context .trace_id :
103+ # trace id already set no need to do anything
104+ return
82105
83- if trace_id_env :
84- self .trace_parent = trace_id_env
85- else :
86- await self .start_trace (run_name , trace_id )
106+ # no trace id, start a new trace
107+ await self .start_trace (run_name , trace_id )
87108
88109 async def start_trace (self , run_name , trace_id = None ) -> None :
89- self .trace_parent = trace_id or str (uuid .uuid4 ())
90- run_name = run_name or f"Job Run: { self .trace_parent } "
110+ self .context .trace_id = str (uuid .uuid4 ())
111+
112+ run_name = run_name or f"Job Run: { self .context .trace_id } "
91113 trace_data = {
92- "id" : self .trace_parent ,
114+ "id" : self .context . trace_id ,
93115 "name" : re .sub (
94116 "[!@#$<>\.]" , "" , run_name
95117 ), # if we use these characters the Agents UI throws some error (but llmops backend seems fine)
96- "referenceId" : self .referenceId ,
118+ "referenceId" : self .context . reference_id ,
97119 "attributes" : "{}" ,
98- "organizationId" : self .orgId ,
99- "tenantId" : self .tenantId ,
120+ "organizationId" : self .context . org_id ,
121+ "tenantId" : self .context . tenant_id ,
100122 }
101123
102124 for attempt in range (self .retries ):
@@ -174,9 +196,9 @@ async def _worker(self):
174196
175197 async def _persist_run (self , run : Run ) -> None :
176198 # Determine if this is a start or end trace based on whether end_time is set
177- await self ._send_span (run )
199+ self ._send_span (run )
178200
179- async def _send_span (self , run : Run ) -> None :
201+ def _send_span (self , run : Run ) -> None :
180202 """Send span data for a run to the API"""
181203 run_id = str (run .id )
182204
@@ -191,27 +213,27 @@ async def _send_span(self, run: Run) -> None:
191213 parent_id = (
192214 str (run .parent_run_id )
193215 if run .parent_run_id is not None
194- else self .parent_span_id
216+ else self .context . parent_span_id
195217 )
196- attributes = self ._safe_json_dump (self ._run_to_dict (run ))
218+ attributes = self ._safe_jsons_dump (self ._run_to_dict (run ))
197219 status = self ._determine_status (run .error )
198220
199221 span_data = {
200222 "id" : run_id ,
201223 "parentId" : parent_id ,
202- "traceId" : self .trace_parent ,
224+ "traceId" : self .context . trace_id ,
203225 "name" : run .name ,
204226 "startTime" : start_time ,
205227 "endTime" : end_time ,
206- "referenceId" : self .referenceId ,
228+ "referenceId" : self .context . reference_id ,
207229 "attributes" : attributes ,
208- "organizationId" : self .orgId ,
209- "tenantId" : self .tenantId ,
230+ "organizationId" : self .context . org_id ,
231+ "tenantId" : self .context . tenant_id ,
210232 "spanType" : "LangGraphRun" ,
211233 "status" : status ,
212- "jobKey" : self .jobKey ,
213- "folderKey" : self .folderKey ,
214- "processKey" : self .processKey ,
234+ "jobKey" : self .context . job_id ,
235+ "folderKey" : self .context . folder_key ,
236+ "processKey" : self .context . folder_key ,
215237 }
216238
217239 self .log_queue .put (span_data )
@@ -235,14 +257,23 @@ def _determine_status(self, error: Optional[str]):
235257
236258 return Status .SUCCESS
237259
238- def _safe_json_dump (self , obj ) -> str :
260+ def _safe_jsons_dump (self , obj ) -> str :
239261 try :
240262 json_str = json .dumps (obj , default = _simple_serialize_defaults )
241263 return json_str
242264 except Exception as e :
243- logger .warning (e )
265+ logger .warning (f"Error serializing object to JSON: { e } " )
244266 return "{ }"
245267
268+ def _safe_dict_dump (self , obj ) -> Dict [str , Any ]:
269+ try :
270+ serialized = json .loads (json .dumps (obj , default = _simple_serialize_defaults ))
271+ return serialized
272+ except Exception as e :
273+ # Last resort - string representation
274+ logger .warning (f"Error serializing object to JSON: { e } " )
275+ return {"raw" : str (obj )}
276+
246277 def _run_to_dict (self , run : Run ):
247278 with warnings .catch_warnings ():
248279 warnings .simplefilter ("ignore" , category = PydanticDeprecationWarning )
@@ -252,3 +283,21 @@ def _run_to_dict(self, run: Run):
252283 "inputs" : run .inputs .copy () if run .inputs is not None else None ,
253284 "outputs" : run .outputs .copy () if run .outputs is not None else None ,
254285 }
286+
287+ def _get_base_url (self ) -> str :
288+ uipath_url = (
289+ env .get ("UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
290+ )
291+ uipath_url = uipath_url .rstrip ("/" )
292+
293+ # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
294+ parts = uipath_url .split ("//" )
295+
296+ # after splitting by //, the base URL will be at index 1 along with the rest,
297+ # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
298+ base_url_parts = parts [1 ].split ("/" )
299+
300+ # combine scheme and netloc to get the base URL
301+ base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
302+
303+ return base_url
0 commit comments