11"""Base runtime class and async context manager implementation."""
22
3- import json
43import logging
5- import os
64from abc import ABC , abstractmethod
7- from typing import AsyncGenerator
5+ from typing import (
6+ Any ,
7+ AsyncGenerator ,
8+ Generic ,
9+ List ,
10+ Optional ,
11+ TypeVar ,
12+ )
813
9- from uipath .runtime .context import UiPathRuntimeContext
10- from uipath .runtime .errors import (
11- UiPathErrorCategory ,
12- UiPathErrorCode ,
13- UiPathErrorContract ,
14- UiPathRuntimeError ,
14+ from opentelemetry import trace
15+ from opentelemetry .instrumentation .instrumentor import ( # type: ignore[attr-defined]
16+ BaseInstrumentor ,
1517)
18+ from opentelemetry .sdk .trace import ReadableSpan , SpanProcessor , TracerProvider
19+ from opentelemetry .sdk .trace .export import SpanExporter
20+ from typing_extensions import override
21+
22+ from uipath .runtime .context import UiPathRuntimeContext
1623from uipath .runtime .events import (
1724 UiPathRuntimeEvent ,
1825)
19- from uipath .runtime .logging ._interceptor import UiPathRuntimeLogsInterceptor
20- from uipath .runtime .result import UiPathRuntimeResult , UiPathRuntimeStatus
2126from uipath .runtime .schema import (
2227 UiPathRuntimeSchema ,
2328)
29+ from uipath .runtime .tracing import (
30+ UiPathExecutionBatchTraceProcessor ,
31+ UiPathExecutionSimpleTraceProcessor ,
32+ UiPathRuntimeExecutionSpanExporter ,
33+ )
2434
2535logger = logging .getLogger (__name__ )
2636
@@ -37,7 +47,7 @@ class UiPathBaseRuntime(ABC):
3747 This allows using the class with 'async with' statements.
3848 """
3949
40- def __init__ (self , context : UiPathRuntimeContext ):
50+ def __init__ (self , context : Optional [ UiPathRuntimeContext ] = None ):
4151 """Initialize the runtime with the provided context."""
4252 self .context = context
4353
@@ -48,70 +58,10 @@ async def get_schema(self) -> UiPathRuntimeSchema:
4858 """
4959 raise NotImplementedError ()
5060
51- async def __aenter__ (self ):
52- """Async enter method called when entering the 'async with' block.
53-
54- Initializes and prepares the runtime environment.
55-
56- Returns:
57- The runtime instance
58- """
59- # Read the input from file if provided
60- if self .context .input_file :
61- _ , file_extension = os .path .splitext (self .context .input_file )
62- if file_extension != ".json" :
63- raise UiPathRuntimeError (
64- code = UiPathErrorCode .INVALID_INPUT_FILE_EXTENSION ,
65- title = "Invalid Input File Extension" ,
66- detail = "The provided input file must be in JSON format." ,
67- )
68- with open (self .context .input_file ) as f :
69- self .context .input = f .read ()
70-
71- try :
72- if isinstance (self .context .input , str ):
73- if self .context .input .strip ():
74- self .context .input = json .loads (self .context .input )
75- else :
76- self .context .input = {}
77- elif self .context .input is None :
78- self .context .input = {}
79- # else: leave it as-is (already a dict, list, bool, etc.)
80- except json .JSONDecodeError as e :
81- raise UiPathRuntimeError (
82- UiPathErrorCode .INPUT_INVALID_JSON ,
83- "Invalid JSON input" ,
84- f"The input data is not valid JSON: { str (e )} " ,
85- UiPathErrorCategory .USER ,
86- ) from e
87-
88- await self .validate ()
89-
90- # Intercept all stdout/stderr/logs
91- # Write to file (runtime), stdout (debug) or log handler (if provided)
92- self .logs_interceptor = UiPathRuntimeLogsInterceptor (
93- min_level = self .context .logs_min_level ,
94- dir = self .context .runtime_dir ,
95- file = self .context .logs_file ,
96- job_id = self .context .job_id ,
97- execution_id = self .context .execution_id ,
98- log_handler = self .context .log_handler ,
99- )
100- self .logs_interceptor .setup ()
101-
102- return self
103-
10461 @abstractmethod
105- async def execute (self ) -> UiPathRuntimeResult :
106- """Execute with the provided context.
107-
108- Returns:
109- Dictionary with execution results
110-
111- Raises:
112- RuntimeError: If execution fails
113- """
114- pass
62+ async def execute (self , input : dict [str , Any ]) -> Any :
63+ """Produce the agent output."""
64+ raise NotImplementedError ()
11565
11666 async def stream (
11767 self ,
@@ -154,101 +104,122 @@ async def stream(
154104 # Without it, the function wouldn't match the AsyncGenerator return type
155105 yield
156106
157- @abstractmethod
158- async def validate (self ):
159- """Validate runtime inputs."""
160- pass
107+ def get_instrumentor (self ) -> Optional [BaseInstrumentor ]:
108+ """Get instrumentor for this runtime. If no instrumentor is available, return None."""
109+ return None
161110
162111 @abstractmethod
163112 async def cleanup (self ):
164113 """Cleaup runtime resources."""
165114 pass
166115
167- async def __aexit__ (self , exc_type , exc_val , exc_tb ):
168- """Async exit method called when exiting the 'async with' block.
169116
170- Cleans up resources and handles any exceptions.
117+ T = TypeVar ("T" , bound = UiPathBaseRuntime )
118+
119+
120+ class TraceManager :
121+ """Trace manager."""
171122
172- Always writes output file regardless of whether execution was successful,
173- suspended, or encountered an error.
123+ def __init__ (self ):
124+ """Initialize a trace manager."""
125+ self .tracer_provider : TracerProvider = TracerProvider ()
126+ trace .set_tracer_provider (self .tracer_provider )
127+ self .tracer_span_processors : List [SpanProcessor ] = []
128+ self .execution_span_exporter = UiPathRuntimeExecutionSpanExporter ()
129+ self .add_span_exporter (self .execution_span_exporter )
130+
131+ def add_span_exporter (
132+ self ,
133+ span_exporter : SpanExporter ,
134+ batch : bool = True ,
135+ ) -> "TraceManager" :
136+ """Add a span processor to the tracer provider."""
137+ span_processor : SpanProcessor
138+ if batch :
139+ span_processor = UiPathExecutionBatchTraceProcessor (span_exporter )
140+ else :
141+ span_processor = UiPathExecutionSimpleTraceProcessor (span_exporter )
142+ self .tracer_span_processors .append (span_processor )
143+ self .tracer_provider .add_span_processor (span_processor )
144+ return self
145+
146+ def get_execution_spans (
147+ self ,
148+ execution_id : str ,
149+ ) -> List [ReadableSpan ]:
150+ """Retrieve spans for a given execution id."""
151+ return self .execution_span_exporter .get_spans (execution_id )
152+
153+ def flush_spans (self ) -> None :
154+ """Flush all span processors."""
155+ for span_processor in self .tracer_span_processors :
156+ span_processor .force_flush ()
157+
158+
159+ class UiPathTracedRuntime (UiPathBaseRuntime , Generic [T ]):
160+ """Handles runtime execution with tracing/telemetry."""
161+
162+ def __init__ (self , delegate : T , trace_manager : TraceManager ):
163+ """Initialize the executor."""
164+ self .delegate = delegate
165+ self .trace_manager = trace_manager
166+
167+ async def execute (
168+ self ,
169+ input : dict [str , Any ],
170+ root_span : str = "root" ,
171+ execution_id : Optional [str ] = None ,
172+ ) -> Any :
173+ """Execute runtime with context."""
174+ instrumentor = self .delegate .get_instrumentor ()
175+ if instrumentor is not None :
176+ instrumentor .instrument (tracer_provider = self .trace_manager .tracer_provider )
177+ try :
178+ tracer = trace .get_tracer ("uipath-runtime" )
179+ span_attributes : dict [str , Any ] = {}
180+ if execution_id :
181+ span_attributes ["execution.id" ] = execution_id
182+ with tracer .start_as_current_span (root_span , attributes = span_attributes ):
183+ return await self .delegate .execute (input )
184+ finally :
185+ self .trace_manager .flush_spans ()
186+ if instrumentor is not None :
187+ instrumentor .uninstrument ()
188+
189+ @override
190+ async def stream (
191+ self ,
192+ root_span : str = "root" ,
193+ execution_id : Optional [str ] = None ,
194+ ) -> AsyncGenerator [UiPathRuntimeEvent , None ]:
195+ """Stream runtime execution with context.
196+
197+ Args:
198+ runtime: The runtime instance
199+ context: The runtime context
200+
201+ Yields:
202+ UiPathRuntimeEvent instances during execution and final UiPathRuntimeResult
203+
204+ Raises:
205+ UiPathRuntimeStreamNotSupportedError: If the runtime doesn't support streaming
174206 """
207+ instrumentor = self .delegate .get_instrumentor ()
208+ if instrumentor is not None :
209+ instrumentor .instrument (tracer_provider = self .trace_manager .tracer_provider )
175210 try :
176- if self .context .result is None :
177- execution_result = UiPathRuntimeResult ()
178- else :
179- execution_result = self .context .result
180-
181- if exc_type :
182- # Create error info from exception
183- if isinstance (exc_val , UiPathRuntimeError ):
184- error_info = exc_val .error_info
185- else :
186- # Generic error
187- error_info = UiPathErrorContract (
188- code = f"ERROR_{ exc_type .__name__ } " ,
189- title = f"Runtime error: { exc_type .__name__ } " ,
190- detail = str (exc_val ),
191- category = UiPathErrorCategory .UNKNOWN ,
192- )
193-
194- execution_result .status = UiPathRuntimeStatus .FAULTED
195- execution_result .error = error_info
196-
197- content = execution_result .to_dict ()
198-
199- # Always write output file at runtime, except for inner runtimes
200- # Inner runtimes have execution_id
201- if self .context .job_id and not self .context .execution_id :
202- with open (self .context .result_file_path , "w" ) as f :
203- json .dump (content , f , indent = 2 , default = str )
204-
205- # Write the execution output to file if requested
206- if self .context .output_file :
207- with open (self .context .output_file , "w" ) as f :
208- f .write (content .get ("output" , "{}" ))
209-
210- # Don't suppress exceptions
211- return False
212-
213- except Exception as e :
214- logger .error (f"Error during runtime shutdown: { str (e )} " )
215-
216- # Create a fallback error result if we fail during cleanup
217- if not isinstance (e , UiPathRuntimeError ):
218- error_info = UiPathErrorContract (
219- code = "RUNTIME_SHUTDOWN_ERROR" ,
220- title = "Runtime shutdown failed" ,
221- detail = f"Error: { str (e )} " ,
222- category = UiPathErrorCategory .SYSTEM ,
223- )
224- else :
225- error_info = e .error_info
226-
227- # Last-ditch effort to write error output
228- try :
229- error_result = UiPathRuntimeResult (
230- status = UiPathRuntimeStatus .FAULTED , error = error_info
231- )
232- error_result_content = error_result .to_dict ()
233- if self .context .job_id :
234- with open (self .context .result_file_path , "w" ) as f :
235- json .dump (error_result_content , f , indent = 2 , default = str )
236- except Exception as write_error :
237- logger .error (f"Failed to write error output file: { str (write_error )} " )
238- raise
239-
240- # Re-raise as RuntimeError if it's not already a UiPathRuntimeError
241- if not isinstance (e , UiPathRuntimeError ):
242- raise RuntimeError (
243- error_info .code ,
244- error_info .title ,
245- error_info .detail ,
246- error_info .category ,
247- ) from e
248- raise
211+ tracer = trace .get_tracer ("uipath-runtime" )
212+ span_attributes : dict [str , Any ] = {}
213+ if execution_id :
214+ span_attributes ["execution.id" ] = "exec-a"
215+ with tracer .start_as_current_span (root_span , attributes = span_attributes ):
216+ async for event in self .delegate .stream ():
217+ yield event
249218 finally :
250- # Restore original logging
251- if hasattr ( self , "logs_interceptor" ) :
252- self . logs_interceptor . teardown ()
219+ self . trace_manager . flush_spans ()
220+ if instrumentor is not None :
221+ instrumentor . uninstrument ()
253222
254- await self .cleanup ()
223+ def cleanup (self ) -> None :
224+ """Close runtime resources."""
225+ pass
0 commit comments