66import uuid
77import warnings
88from os import environ as env
9- from typing import Any , Optional
9+ from typing import Any , Dict , Optional
1010
1111import httpx
1212from langchain_core .tracers .base import AsyncBaseTracer
1313from langchain_core .tracers .schemas import Run
1414from pydantic import PydanticDeprecationWarning
15+ from uipath_sdk ._cli ._runtime ._contracts import UiPathTraceContext
1516
17+ from ._events import CustomTraceEvents , FunctionCallEventData
1618from ._utils import _simple_serialize_defaults
1719
1820logger = logging .getLogger (__name__ )
@@ -25,78 +27,87 @@ class Status:
2527
2628
2729class AsyncUiPathTracer (AsyncBaseTracer ):
28- def __init__ (self , client = None , ** kwargs ):
30+ def __init__ (self , context : UiPathTraceContext = None , client = None , ** kwargs ):
2931 super ().__init__ (** kwargs )
3032
3133 self .client = client or httpx .AsyncClient ()
3234 self .retries = 3
3335 self .log_queue : queue .Queue [dict [str , Any ]] = queue .Queue ()
3436
37+ self .context = context or UiPathTraceContext ()
38+
3539 llm_ops_pattern = self ._get_base_url () + "{orgId}/llmops_"
36- self .orgId = env .get (
37- "UIPATH_ORGANIZATION_ID" , "00000000-0000-0000-0000-000000000000"
38- )
39- self .tenantId = env .get (
40- "UIPATH_TENANT_ID" , "00000000-0000-0000-0000-000000000000"
41- )
42- self .url = llm_ops_pattern .format (orgId = self .orgId ).rstrip ("/" )
4340
44- self .auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
41+ self .url = llm_ops_pattern .format (orgId = self .context .org_id ).rstrip ("/" )
42+
43+ auth_token = env .get ("UNATTENDED_USER_ACCESS_TOKEN" ) or env .get (
4544 "UIPATH_ACCESS_TOKEN"
4645 )
4746
48- self .jobKey = env .get ("UIPATH_JOB_KEY" )
49- self .folderKey = env .get ("UIPATH_FOLDER_KEY" )
50- self .processKey = env .get ("UIPATH_PROCESS_UUID" )
51- self .parent_span_id = env .get ("UIPATH_PARENT_SPAN_ID" )
52-
53- self .referenceId = self .jobKey or str (uuid .uuid4 ())
54-
55- self .headers = {
56- "Authorization" : f"Bearer { self .auth_token } " ,
57- }
47+ self .headers = {"Authorization" : f"Bearer { auth_token } " }
5848
5949 self .running = True
6050 self .worker_task = asyncio .create_task (self ._worker ())
51+ self .function_call_run_map : Dict [str , Run ] = {}
52+
53+ async def on_custom_event (
54+ self ,
55+ name : str ,
56+ data : dict ,
57+ * ,
58+ run_id : uuid .UUID ,
59+ tags = None ,
60+ metadata = None ,
61+ ** kwargs : Any ,
62+ ) -> None :
63+ if name == CustomTraceEvents .UIPATH_TRACE_FUNCTION_CALL :
64+ # only handle the function call event
65+
66+ if not isinstance (data , FunctionCallEventData ):
67+ logger .warning (
68+ f"Received unexpected data type for function call event: { type (data )} "
69+ )
70+ return
6171
62- def _get_base_url (self ) -> str :
63- uipath_url = (
64- env .get ("UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
65- )
66- uipath_url = uipath_url .rstrip ("/" )
72+ if data .event_type == "call" :
73+ run = self .run_map [str (run_id )]
74+ child_run = run .create_child (name = data .function_name , run_type = data .run_type , tags = data .tags )
75+ run .add_metadata (data .metadata )
6776
68- # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
69- parts = uipath_url . split ( "//" )
77+ call_uuid = data . call_uuid
78+ self . function_call_run_map [ call_uuid ] = child_run
7079
71- # after splitting by //, the base URL will be at index 1 along with the rest,
72- # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
73- base_url_parts = parts [1 ].split ("/" )
80+ self ._send_span (run )
7481
75- # combine scheme and netloc to get the base URL
76- base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
82+ if data .event_type == "completion" :
83+ call_uuid = data .call_uuid
84+ run = self .function_call_run_map .pop (call_uuid , None )
7785
78- return base_url
86+ if run :
87+ run .end (self ._safe_dict_dump (data .result ), data .error )
88+ self ._send_span (run )
7989
8090 async def init_trace (self , run_name , trace_id = None ) -> None :
81- trace_id_env = env .get ("UIPATH_TRACE_ID" )
91+ if self .context .trace_id :
92+ # trace id already set no need to do anything
93+ return
8294
83- if trace_id_env :
84- self .trace_parent = trace_id_env
85- else :
86- await self .start_trace (run_name , trace_id )
95+ # no trace id, start a new trace
96+ await self .start_trace (run_name , trace_id )
8797
8898 async def start_trace (self , run_name , trace_id = None ) -> None :
89- self .trace_parent = trace_id or str (uuid .uuid4 ())
90- run_name = run_name or f"Job Run: { self .trace_parent } "
99+ self .context .trace_id = str (uuid .uuid4 ())
100+
101+ run_name = run_name or f"Job Run: { self .context .trace_id } "
91102 trace_data = {
92- "id" : self .trace_parent ,
103+ "id" : self .context . trace_id ,
93104 "name" : re .sub (
94105 "[!@#$<>\.]" , "" , run_name
95106 ), # if we use these characters the Agents UI throws some error (but llmops backend seems fine)
96- "referenceId" : self .referenceId ,
107+ "referenceId" : self .context . reference_id ,
97108 "attributes" : "{}" ,
98- "organizationId" : self .orgId ,
99- "tenantId" : self .tenantId ,
109+ "organizationId" : self .context . org_id ,
110+ "tenantId" : self .context . tenant_id ,
100111 }
101112
102113 for attempt in range (self .retries ):
@@ -174,9 +185,9 @@ async def _worker(self):
174185
175186 async def _persist_run (self , run : Run ) -> None :
176187 # Determine if this is a start or end trace based on whether end_time is set
177- await self ._send_span (run )
188+ self ._send_span (run )
178189
179- async def _send_span (self , run : Run ) -> None :
190+ def _send_span (self , run : Run ) -> None :
180191 """Send span data for a run to the API"""
181192 run_id = str (run .id )
182193
@@ -191,27 +202,27 @@ async def _send_span(self, run: Run) -> None:
191202 parent_id = (
192203 str (run .parent_run_id )
193204 if run .parent_run_id is not None
194- else self .parent_span_id
205+ else self .context . parent_span_id
195206 )
196- attributes = self ._safe_json_dump (self ._run_to_dict (run ))
207+ attributes = self ._safe_jsons_dump (self ._run_to_dict (run ))
197208 status = self ._determine_status (run .error )
198209
199210 span_data = {
200211 "id" : run_id ,
201212 "parentId" : parent_id ,
202- "traceId" : self .trace_parent ,
213+ "traceId" : self .context . trace_id ,
203214 "name" : run .name ,
204215 "startTime" : start_time ,
205216 "endTime" : end_time ,
206- "referenceId" : self .referenceId ,
217+ "referenceId" : self .context . reference_id ,
207218 "attributes" : attributes ,
208- "organizationId" : self .orgId ,
209- "tenantId" : self .tenantId ,
219+ "organizationId" : self .context . org_id ,
220+ "tenantId" : self .context . tenant_id ,
210221 "spanType" : "LangGraphRun" ,
211222 "status" : status ,
212- "jobKey" : self .jobKey ,
213- "folderKey" : self .folderKey ,
214- "processKey" : self .processKey ,
223+ "jobKey" : self .context . job_id ,
224+ "folderKey" : self .context . folder_key ,
225+ "processKey" : self .context . folder_key ,
215226 }
216227
217228 self .log_queue .put (span_data )
@@ -235,13 +246,22 @@ def _determine_status(self, error: Optional[str]):
235246
236247 return Status .SUCCESS
237248
238- def _safe_json_dump (self , obj ) -> str :
249+ def _safe_jsons_dump (self , obj ) -> str :
239250 try :
240251 json_str = json .dumps (obj , default = _simple_serialize_defaults )
241252 return json_str
242253 except Exception as e :
243- logger .warning (e )
254+ logger .warning (f"Error serializing object to JSON: { e } " )
244255 return "{ }"
256+
257+ def _safe_dict_dump (self , obj ) -> dict :
258+ try :
259+ serialized = json .loads (json .dumps (obj , default = _simple_serialize_defaults ))
260+ return serialized
261+ except Exception as e :
262+ # Last resort - string representation
263+ logger .warning (f"Error serializing object to JSON: { e } " )
264+ return str (obj )
245265
246266 def _run_to_dict (self , run : Run ):
247267 with warnings .catch_warnings ():
@@ -252,3 +272,21 @@ def _run_to_dict(self, run: Run):
252272 "inputs" : run .inputs .copy () if run .inputs is not None else None ,
253273 "outputs" : run .outputs .copy () if run .outputs is not None else None ,
254274 }
275+
276+ def _get_base_url (self ) -> str :
277+ uipath_url = (
278+ env .get ("UIPATH_URL" ) or "https://cloud.uipath.com/dummyOrg/dummyTennant/"
279+ )
280+ uipath_url = uipath_url .rstrip ("/" )
281+
282+ # split by "//" to get ['', 'https:', 'alpha.uipath.com/ada/byoa']
283+ parts = uipath_url .split ("//" )
284+
285+ # after splitting by //, the base URL will be at index 1 along with the rest,
286+ # hence split it again using "/" to get ['https:', 'alpha.uipath.com', 'ada', 'byoa']
287+ base_url_parts = parts [1 ].split ("/" )
288+
289+ # combine scheme and netloc to get the base URL
290+ base_url = parts [0 ] + "//" + base_url_parts [0 ] + "/"
291+
292+ return base_url
0 commit comments