66import uuid
77import warnings
88from os import environ as env
9- from typing import Any , Optional
9+ from typing import Any , Dict , Optional
1010
1111import httpx
1212from langchain_core .tracers .base import AsyncBaseTracer
1313from langchain_core .tracers .schemas import Run
1414from pydantic import PydanticDeprecationWarning
15+ from uipath_sdk ._cli ._runtime ._contracts import UiPathTraceContext
1516
17+ from ._events import CustomTraceEvents , FunctionCallEventData
1618from ._utils import _simple_serialize_defaults
1719
1820logger = logging .getLogger (__name__ )
@@ -25,78 +27,94 @@ class Status:
2527
2628
2729class AsyncUiPathTracer (AsyncBaseTracer ):
28- def __init__ (self , client = None , ** kwargs ):
30+ def __init__ (
31+ self ,
32+ context : Optional [UiPathTraceContext ] = None ,
33+ client : Optional [httpx .AsyncClient ] = None ,
34+ ** kwargs ,
35+ ):
2936 super ().__init__ (** kwargs )
3037
3138 self .client = client or httpx .AsyncClient ()
3239 self .retries = 3
3340 self .log_queue : queue .Queue [dict [str , Any ]] = queue .Queue ()
3441
42+ self .context = context or UiPathTraceContext ()
43+
3544 llm_ops_pattern = self ._get_base_url () + "{orgId}/llmops_"
36- self .orgId = env .get (
37- "UIPATH_ORGANIZATION_ID" , "00000000-0000-0000-0000-000000000000"
38- )
39- self .tenantId = env .get (
40- "UIPATH_TENANT_ID" , "00000000-0000-0000-0000-000000000000"
41- )
42- self .url = llm_ops_pattern .format (orgId = self .orgId ).rstrip ("/" )
4345
44- self .auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
46+ self .url = llm_ops_pattern .format (orgId = self .context .org_id ).rstrip ("/" )
47+
48+ auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
4549 "UIPATH_ACCESS_TOKEN"
4650 )
4751
48- self .jobKey = env .get ("UIPATH_JOB_KEY" )
49- self .folderKey = env .get ("UIPATH_FOLDER_KEY" )
50- self .processKey = env .get ("UIPATH_PROCESS_UUID" )
51- self .parent_span_id = env .get ("UIPATH_PARENT_SPAN_ID" )
52-
53- self .referenceId = self .jobKey or str (uuid .uuid4 ())
54-
55- self .headers = {
56- "Authorization" : f"Bearer { self .auth_token } " ,
57- }
52+ self .headers = {"Authorization" : f"Bearer { auth_token } " }
5853
5954 self .running = True
6055 self .worker_task = asyncio .create_task (self ._worker ())
56+ self .function_call_run_map : Dict [str , Run ] = {}
57+
58+ async def on_custom_event (
59+ self ,
60+ name : str ,
61+ data : dict ,
62+ * ,
63+ run_id : uuid .UUID ,
64+ tags = None ,
65+ metadata = None ,
66+ ** kwargs : Any ,
67+ ) -> None :
68+ if name == CustomTraceEvents .UIPATH_TRACE_FUNCTION_CALL :
69+ # only handle the function call event
70+
71+ if not isinstance (data , FunctionCallEventData ):
72+ logger .warning (
73+ f"Received unexpected data type for function call event: { type (data )} "
74+ )
75+ return
6176
62- def _get_base_url (self ) -> str :
63- uipath_url = (
64- env .get ("UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
65- )
66- uipath_url = uipath_url .rstrip ("/" )
77+ if data .event_type == "call" :
78+ run = self .run_map [str (run_id )]
79+ child_run = run .create_child (
80+ name = data .function_name , run_type = data .run_type , tags = data .tags
81+ )
82+ run .add_metadata (data .metadata )
6783
68- # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
69- parts = uipath_url . split ( "//" )
84+ call_uuid = data . call_uuid
85+ self . function_call_run_map [ call_uuid ] = child_run
7086
71- # after splitting by //, the base URL will be at index 1 along with the rest,
72- # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
73- base_url_parts = parts [1 ].split ("/" )
87+ self ._send_span (run )
7488
75- # combine scheme and netloc to get the base URL
76- base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
89+ if data .event_type == "completion" :
90+ call_uuid = data .call_uuid
91+ run = self .function_call_run_map .pop (call_uuid , None )
7792
78- return base_url
93+ if run :
94+ run .end (self ._safe_dict_dump (data .result ), data .error )
95+ self ._send_span (run )
7996
8097 async def init_trace (self , run_name , trace_id = None ) -> None :
81- trace_id_env = env .get ("UIPATH_TRACE_ID" )
98+ if self .context .trace_id :
99+ # trace id already set no need to do anything
100+ return
82101
83- if trace_id_env :
84- self .trace_parent = trace_id_env
85- else :
86- await self .start_trace (run_name , trace_id )
102+ # no trace id, start a new trace
103+ await self .start_trace (run_name , trace_id )
87104
88105 async def start_trace (self , run_name , trace_id = None ) -> None :
89- self .trace_parent = trace_id or str (uuid .uuid4 ())
90- run_name = run_name or f"Job Run: { self .trace_parent } "
106+ self .context .trace_id = str (uuid .uuid4 ())
107+
108+ run_name = run_name or f"Job Run: { self .context .trace_id } "
91109 trace_data = {
92- "id" : self .trace_parent ,
110+ "id" : self .context . trace_id ,
93111 "name" : re .sub (
94112 "[!@#$<>\.]" , "" , run_name
95113 ), # if we use these characters the Agents UI throws some error (but llmops backend seems fine)
96- "referenceId" : self .referenceId ,
114+ "referenceId" : self .context . reference_id ,
97115 "attributes" : "{}" ,
98- "organizationId" : self .orgId ,
99- "tenantId" : self .tenantId ,
116+ "organizationId" : self .context . org_id ,
117+ "tenantId" : self .context . tenant_id ,
100118 }
101119
102120 for attempt in range (self .retries ):
@@ -174,9 +192,9 @@ async def _worker(self):
174192
175193 async def _persist_run (self , run : Run ) -> None :
176194 # Determine if this is a start or end trace based on whether end_time is set
177- await self ._send_span (run )
195+ self ._send_span (run )
178196
179- async def _send_span (self , run : Run ) -> None :
197+ def _send_span (self , run : Run ) -> None :
180198 """Send span data for a run to the API"""
181199 run_id = str (run .id )
182200
@@ -191,27 +209,27 @@ async def _send_span(self, run: Run) -> None:
191209 parent_id = (
192210 str (run .parent_run_id )
193211 if run .parent_run_id is not None
194- else self .parent_span_id
212+ else self .context . parent_span_id
195213 )
196- attributes = self ._safe_json_dump (self ._run_to_dict (run ))
214+ attributes = self ._safe_jsons_dump (self ._run_to_dict (run ))
197215 status = self ._determine_status (run .error )
198216
199217 span_data = {
200218 "id" : run_id ,
201219 "parentId" : parent_id ,
202- "traceId" : self .trace_parent ,
220+ "traceId" : self .context . trace_id ,
203221 "name" : run .name ,
204222 "startTime" : start_time ,
205223 "endTime" : end_time ,
206- "referenceId" : self .referenceId ,
224+ "referenceId" : self .context . reference_id ,
207225 "attributes" : attributes ,
208- "organizationId" : self .orgId ,
209- "tenantId" : self .tenantId ,
226+ "organizationId" : self .context . org_id ,
227+ "tenantId" : self .context . tenant_id ,
210228 "spanType" : "LangGraphRun" ,
211229 "status" : status ,
212- "jobKey" : self .jobKey ,
213- "folderKey" : self .folderKey ,
214- "processKey" : self .processKey ,
230+ "jobKey" : self .context . job_id ,
231+ "folderKey" : self .context . folder_key ,
232+ "processKey" : self .context . folder_key ,
215233 }
216234
217235 self .log_queue .put (span_data )
@@ -235,14 +253,23 @@ def _determine_status(self, error: Optional[str]):
235253
236254 return Status .SUCCESS
237255
238- def _safe_json_dump (self , obj ) -> str :
256+ def _safe_jsons_dump (self , obj ) -> str :
239257 try :
240258 json_str = json .dumps (obj , default = _simple_serialize_defaults )
241259 return json_str
242260 except Exception as e :
243- logger .warning (e )
261+ logger .warning (f"Error serializing object to JSON: { e } " )
244262 return "{ }"
245263
264+ def _safe_dict_dump (self , obj ) -> dict :
265+ try :
266+ serialized = json .loads (json .dumps (obj , default = _simple_serialize_defaults ))
267+ return serialized
268+ except Exception as e :
269+ # Last resort - string representation
270+ logger .warning (f"Error serializing object to JSON: { e } " )
271+ return str (obj )
272+
246273 def _run_to_dict (self , run : Run ):
247274 with warnings .catch_warnings ():
248275 warnings .simplefilter ("ignore" , category = PydanticDeprecationWarning )
@@ -252,3 +279,21 @@ def _run_to_dict(self, run: Run):
252279 "inputs" : run .inputs .copy () if run .inputs is not None else None ,
253280 "outputs" : run .outputs .copy () if run .outputs is not None else None ,
254281 }
282+
283+ def _get_base_url (self ) -> str :
284+ uipath_url = (
285+ env .get ("UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
286+ )
287+ uipath_url = uipath_url .rstrip ("/" )
288+
289+ # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
290+ parts = uipath_url .split ("//" )
291+
292+ # after splitting by //, the base URL will be at index 1 along with the rest,
293+ # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
294+ base_url_parts = parts [1 ].split ("/" )
295+
296+ # combine scheme and netloc to get the base URL
297+ base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
298+
299+ return base_url
0 commit comments