Skip to content

Commit 40fe6b3

Browse files
akshaylivecristipufu
authored andcommitted
refactor(RuntimeFactory): refactor runtime and factory
This is done in order to follow single responsibility principles (SRP).
1 parent 58e1959 commit 40fe6b3

File tree

9 files changed

+647
-964
lines changed

9 files changed

+647
-964
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ dependencies = [
88
"opentelemetry-sdk>=1.38.0",
99
"opentelemetry-instrumentation>=0.59b0",
1010
"pydantic>=2.12.3",
11-
"uipath-core>=0.0.1",
11+
"uipath-core>=0.0.3",
1212
]
1313
classifiers = [
1414
"Intended Audience :: Developers",

src/uipath/runtime/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from uipath.runtime.base import UiPathBaseRuntime, UiPathStreamNotSupportedError
44
from uipath.runtime.context import UiPathRuntimeContext
55
from uipath.runtime.events import UiPathRuntimeEvent
6-
from uipath.runtime.factory import UiPathRuntimeExecutor, UiPathRuntimeFactory
6+
from uipath.runtime.factory import UiPathRuntimeFactory
77
from uipath.runtime.result import (
88
UiPathApiTrigger,
99
UiPathBreakpointResult,
@@ -17,7 +17,6 @@
1717
"UiPathRuntimeContext",
1818
"UiPathBaseRuntime",
1919
"UiPathRuntimeFactory",
20-
"UiPathRuntimeExecutor",
2120
"UiPathRuntimeResult",
2221
"UiPathRuntimeStatus",
2322
"UiPathRuntimeEvent",

src/uipath/runtime/base.py

Lines changed: 91 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
11
"""Base runtime class and async context manager implementation."""
22

3-
import json
43
import logging
5-
import os
64
from abc import ABC, abstractmethod
7-
from typing import AsyncGenerator
5+
from typing import (
6+
Any,
7+
AsyncGenerator,
8+
Generic,
9+
Optional,
10+
TypeVar,
11+
)
12+
13+
from opentelemetry import trace
14+
from typing_extensions import override
15+
from uipath.core import UiPathTraceManager
816

917
from uipath.runtime.context import UiPathRuntimeContext
10-
from uipath.runtime.errors import (
11-
UiPathErrorCategory,
12-
UiPathErrorCode,
13-
UiPathErrorContract,
14-
UiPathRuntimeError,
15-
)
1618
from uipath.runtime.events import (
1719
UiPathRuntimeEvent,
1820
)
21+
from uipath.runtime.logging import UiPathRuntimeExecutionLogHandler
1922
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
20-
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
2123
from uipath.runtime.schema import (
2224
UiPathRuntimeSchema,
2325
)
@@ -37,7 +39,7 @@ class UiPathBaseRuntime(ABC):
3739
This allows using the class with 'async with' statements.
3840
"""
3941

40-
def __init__(self, context: UiPathRuntimeContext):
42+
def __init__(self, context: Optional[UiPathRuntimeContext] = None):
4143
"""Initialize the runtime with the provided context."""
4244
self.context = context
4345

@@ -48,70 +50,10 @@ async def get_schema(self) -> UiPathRuntimeSchema:
4850
"""
4951
raise NotImplementedError()
5052

51-
async def __aenter__(self):
52-
"""Async enter method called when entering the 'async with' block.
53-
54-
Initializes and prepares the runtime environment.
55-
56-
Returns:
57-
The runtime instance
58-
"""
59-
# Read the input from file if provided
60-
if self.context.input_file:
61-
_, file_extension = os.path.splitext(self.context.input_file)
62-
if file_extension != ".json":
63-
raise UiPathRuntimeError(
64-
code=UiPathErrorCode.INVALID_INPUT_FILE_EXTENSION,
65-
title="Invalid Input File Extension",
66-
detail="The provided input file must be in JSON format.",
67-
)
68-
with open(self.context.input_file) as f:
69-
self.context.input = f.read()
70-
71-
try:
72-
if isinstance(self.context.input, str):
73-
if self.context.input.strip():
74-
self.context.input = json.loads(self.context.input)
75-
else:
76-
self.context.input = {}
77-
elif self.context.input is None:
78-
self.context.input = {}
79-
# else: leave it as-is (already a dict, list, bool, etc.)
80-
except json.JSONDecodeError as e:
81-
raise UiPathRuntimeError(
82-
UiPathErrorCode.INPUT_INVALID_JSON,
83-
"Invalid JSON input",
84-
f"The input data is not valid JSON: {str(e)}",
85-
UiPathErrorCategory.USER,
86-
) from e
87-
88-
await self.validate()
89-
90-
# Intercept all stdout/stderr/logs
91-
# Write to file (runtime), stdout (debug) or log handler (if provided)
92-
self.logs_interceptor = UiPathRuntimeLogsInterceptor(
93-
min_level=self.context.logs_min_level,
94-
dir=self.context.runtime_dir,
95-
file=self.context.logs_file,
96-
job_id=self.context.job_id,
97-
execution_id=self.context.execution_id,
98-
log_handler=self.context.log_handler,
99-
)
100-
self.logs_interceptor.setup()
101-
102-
return self
103-
10453
@abstractmethod
105-
async def execute(self) -> UiPathRuntimeResult:
106-
"""Execute with the provided context.
107-
108-
Returns:
109-
Dictionary with execution results
110-
111-
Raises:
112-
RuntimeError: If execution fails
113-
"""
114-
pass
54+
async def execute(self, input: dict[str, Any]) -> Any:
55+
"""Produce the agent output."""
56+
raise NotImplementedError()
11557

11658
async def stream(
11759
self,
@@ -154,101 +96,89 @@ async def stream(
15496
# Without it, the function wouldn't match the AsyncGenerator return type
15597
yield
15698

157-
@abstractmethod
158-
async def validate(self):
159-
"""Validate runtime inputs."""
160-
pass
161-
16299
@abstractmethod
163100
async def cleanup(self):
164101
"""Cleaup runtime resources."""
165102
pass
166103

167-
async def __aexit__(self, exc_type, exc_val, exc_tb):
168-
"""Async exit method called when exiting the 'async with' block.
169104

170-
Cleans up resources and handles any exceptions.
105+
T = TypeVar("T", bound=UiPathBaseRuntime)
106+
107+
108+
class UiPathExecutionRuntime(UiPathBaseRuntime, Generic[T]):
109+
"""Handles runtime execution with tracing/telemetry."""
110+
111+
def __init__(
112+
self,
113+
delegate: T,
114+
trace_manager: UiPathTraceManager,
115+
root_span: str = "root",
116+
execution_id: Optional[str] = None,
117+
):
118+
"""Initialize the executor."""
119+
self.delegate = delegate
120+
self.trace_manager = trace_manager
121+
self.root_span = root_span
122+
self.execution_id = execution_id
123+
self.log_handler: Optional[UiPathRuntimeExecutionLogHandler]
124+
if execution_id is not None:
125+
self.log_handler = UiPathRuntimeExecutionLogHandler(execution_id)
126+
else:
127+
self.log_handler = None
128+
129+
async def execute(
130+
self,
131+
input: dict[str, Any],
132+
) -> Any:
133+
"""Execute runtime with context."""
134+
if self.log_handler:
135+
log_interceptor = UiPathRuntimeLogsInterceptor(
136+
execution_id=self.execution_id, log_handler=self.log_handler
137+
)
138+
log_interceptor.setup()
171139

172-
Always writes output file regardless of whether execution was successful,
173-
suspended, or encountered an error.
174-
"""
175140
try:
176-
if self.context.result is None:
177-
execution_result = UiPathRuntimeResult()
178-
else:
179-
execution_result = self.context.result
180-
181-
if exc_type:
182-
# Create error info from exception
183-
if isinstance(exc_val, UiPathRuntimeError):
184-
error_info = exc_val.error_info
185-
else:
186-
# Generic error
187-
error_info = UiPathErrorContract(
188-
code=f"ERROR_{exc_type.__name__}",
189-
title=f"Runtime error: {exc_type.__name__}",
190-
detail=str(exc_val),
191-
category=UiPathErrorCategory.UNKNOWN,
192-
)
193-
194-
execution_result.status = UiPathRuntimeStatus.FAULTED
195-
execution_result.error = error_info
196-
197-
content = execution_result.to_dict()
198-
199-
# Always write output file at runtime, except for inner runtimes
200-
# Inner runtimes have execution_id
201-
if self.context.job_id and not self.context.execution_id:
202-
with open(self.context.result_file_path, "w") as f:
203-
json.dump(content, f, indent=2, default=str)
204-
205-
# Write the execution output to file if requested
206-
if self.context.output_file:
207-
with open(self.context.output_file, "w") as f:
208-
f.write(content.get("output", "{}"))
209-
210-
# Don't suppress exceptions
211-
return False
212-
213-
except Exception as e:
214-
logger.error(f"Error during runtime shutdown: {str(e)}")
215-
216-
# Create a fallback error result if we fail during cleanup
217-
if not isinstance(e, UiPathRuntimeError):
218-
error_info = UiPathErrorContract(
219-
code="RUNTIME_SHUTDOWN_ERROR",
220-
title="Runtime shutdown failed",
221-
detail=f"Error: {str(e)}",
222-
category=UiPathErrorCategory.SYSTEM,
223-
)
141+
if self.execution_id:
142+
with self.trace_manager.start_execution_span(
143+
self.root_span, execution_id=self.execution_id
144+
):
145+
return await self.delegate.execute(input)
224146
else:
225-
error_info = e.error_info
226-
227-
# Last-ditch effort to write error output
228-
try:
229-
error_result = UiPathRuntimeResult(
230-
status=UiPathRuntimeStatus.FAULTED, error=error_info
231-
)
232-
error_result_content = error_result.to_dict()
233-
if self.context.job_id:
234-
with open(self.context.result_file_path, "w") as f:
235-
json.dump(error_result_content, f, indent=2, default=str)
236-
except Exception as write_error:
237-
logger.error(f"Failed to write error output file: {str(write_error)}")
238-
raise
239-
240-
# Re-raise as RuntimeError if it's not already a UiPathRuntimeError
241-
if not isinstance(e, UiPathRuntimeError):
242-
raise RuntimeError(
243-
error_info.code,
244-
error_info.title,
245-
error_info.detail,
246-
error_info.category,
247-
) from e
248-
raise
147+
return await self.delegate.execute(input)
148+
finally:
149+
self.trace_manager.flush_spans()
150+
if self.log_handler:
151+
log_interceptor.teardown()
152+
153+
@override
154+
async def stream(
155+
self,
156+
root_span: str = "root",
157+
execution_id: Optional[str] = None,
158+
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
159+
"""Stream runtime execution with context.
160+
161+
Args:
162+
runtime: The runtime instance
163+
context: The runtime context
164+
165+
Yields:
166+
UiPathRuntimeEvent instances during execution and final UiPathRuntimeResult
167+
168+
Raises:
169+
UiPathRuntimeStreamNotSupportedError: If the runtime doesn't support streaming
170+
"""
171+
try:
172+
tracer = trace.get_tracer("uipath-runtime")
173+
span_attributes: dict[str, Any] = {}
174+
if execution_id:
175+
span_attributes["execution.id"] = "exec-a"
176+
with tracer.start_as_current_span(root_span, attributes=span_attributes):
177+
async for event in self.delegate.stream():
178+
yield event
249179
finally:
250-
# Restore original logging
251-
if hasattr(self, "logs_interceptor"):
252-
self.logs_interceptor.teardown()
180+
self.trace_manager.flush_spans()
253181

254-
await self.cleanup()
182+
def cleanup(self) -> None:
183+
"""Close runtime resources."""
184+
pass

0 commit comments

Comments
 (0)