Skip to content

Commit 73007ed

Browse files
akshaylivecristipufu
authored andcommitted
refactor(RuntimeFactory): refactor runtime and factory
This is done in order to follow single responsibility principles (SRP).
1 parent 58e1959 commit 73007ed

File tree

9 files changed

+661
-963
lines changed

9 files changed

+661
-963
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ dependencies = [
88
"opentelemetry-sdk>=1.38.0",
99
"opentelemetry-instrumentation>=0.59b0",
1010
"pydantic>=2.12.3",
11-
"uipath-core>=0.0.1",
11+
"uipath-core>=0.0.3",
1212
]
1313
classifiers = [
1414
"Intended Audience :: Developers",

src/uipath/runtime/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from uipath.runtime.base import UiPathBaseRuntime, UiPathStreamNotSupportedError
44
from uipath.runtime.context import UiPathRuntimeContext
55
from uipath.runtime.events import UiPathRuntimeEvent
6-
from uipath.runtime.factory import UiPathRuntimeExecutor, UiPathRuntimeFactory
6+
from uipath.runtime.factory import UiPathRuntimeFactory
77
from uipath.runtime.result import (
88
UiPathApiTrigger,
99
UiPathBreakpointResult,
@@ -17,7 +17,6 @@
1717
"UiPathRuntimeContext",
1818
"UiPathBaseRuntime",
1919
"UiPathRuntimeFactory",
20-
"UiPathRuntimeExecutor",
2120
"UiPathRuntimeResult",
2221
"UiPathRuntimeStatus",
2322
"UiPathRuntimeEvent",

src/uipath/runtime/base.py

Lines changed: 105 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,28 @@
11
"""Base runtime class and async context manager implementation."""
22

3-
import json
43
import logging
5-
import os
64
from abc import ABC, abstractmethod
7-
from typing import AsyncGenerator
5+
from typing import (
6+
Any,
7+
AsyncGenerator,
8+
Generic,
9+
Optional,
10+
TypeVar,
11+
)
812

9-
from uipath.runtime.context import UiPathRuntimeContext
10-
from uipath.runtime.errors import (
11-
UiPathErrorCategory,
12-
UiPathErrorCode,
13-
UiPathErrorContract,
14-
UiPathRuntimeError,
13+
from opentelemetry import trace
14+
from opentelemetry.instrumentation.instrumentor import ( # type: ignore[attr-defined]
15+
BaseInstrumentor,
1516
)
17+
from typing_extensions import override
18+
from uipath.core import UiPathTraceManager
19+
20+
from uipath.runtime.context import UiPathRuntimeContext
1621
from uipath.runtime.events import (
1722
UiPathRuntimeEvent,
1823
)
24+
from uipath.runtime.logging import UiPathRuntimeExecutionLogHandler
1925
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
20-
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
2126
from uipath.runtime.schema import (
2227
UiPathRuntimeSchema,
2328
)
@@ -37,7 +42,7 @@ class UiPathBaseRuntime(ABC):
3742
This allows using the class with 'async with' statements.
3843
"""
3944

40-
def __init__(self, context: UiPathRuntimeContext):
45+
def __init__(self, context: Optional[UiPathRuntimeContext] = None):
4146
"""Initialize the runtime with the provided context."""
4247
self.context = context
4348

@@ -48,70 +53,10 @@ async def get_schema(self) -> UiPathRuntimeSchema:
4853
"""
4954
raise NotImplementedError()
5055

51-
async def __aenter__(self):
52-
"""Async enter method called when entering the 'async with' block.
53-
54-
Initializes and prepares the runtime environment.
55-
56-
Returns:
57-
The runtime instance
58-
"""
59-
# Read the input from file if provided
60-
if self.context.input_file:
61-
_, file_extension = os.path.splitext(self.context.input_file)
62-
if file_extension != ".json":
63-
raise UiPathRuntimeError(
64-
code=UiPathErrorCode.INVALID_INPUT_FILE_EXTENSION,
65-
title="Invalid Input File Extension",
66-
detail="The provided input file must be in JSON format.",
67-
)
68-
with open(self.context.input_file) as f:
69-
self.context.input = f.read()
70-
71-
try:
72-
if isinstance(self.context.input, str):
73-
if self.context.input.strip():
74-
self.context.input = json.loads(self.context.input)
75-
else:
76-
self.context.input = {}
77-
elif self.context.input is None:
78-
self.context.input = {}
79-
# else: leave it as-is (already a dict, list, bool, etc.)
80-
except json.JSONDecodeError as e:
81-
raise UiPathRuntimeError(
82-
UiPathErrorCode.INPUT_INVALID_JSON,
83-
"Invalid JSON input",
84-
f"The input data is not valid JSON: {str(e)}",
85-
UiPathErrorCategory.USER,
86-
) from e
87-
88-
await self.validate()
89-
90-
# Intercept all stdout/stderr/logs
91-
# Write to file (runtime), stdout (debug) or log handler (if provided)
92-
self.logs_interceptor = UiPathRuntimeLogsInterceptor(
93-
min_level=self.context.logs_min_level,
94-
dir=self.context.runtime_dir,
95-
file=self.context.logs_file,
96-
job_id=self.context.job_id,
97-
execution_id=self.context.execution_id,
98-
log_handler=self.context.log_handler,
99-
)
100-
self.logs_interceptor.setup()
101-
102-
return self
103-
10456
@abstractmethod
105-
async def execute(self) -> UiPathRuntimeResult:
106-
"""Execute with the provided context.
107-
108-
Returns:
109-
Dictionary with execution results
110-
111-
Raises:
112-
RuntimeError: If execution fails
113-
"""
114-
pass
57+
async def execute(self, input: dict[str, Any]) -> Any:
58+
"""Produce the agent output."""
59+
raise NotImplementedError()
11560

11661
async def stream(
11762
self,
@@ -154,101 +99,101 @@ async def stream(
15499
# Without it, the function wouldn't match the AsyncGenerator return type
155100
yield
156101

157-
@abstractmethod
158-
async def validate(self):
159-
"""Validate runtime inputs."""
160-
pass
102+
def get_instrumentor(self) -> Optional[BaseInstrumentor]:
103+
"""Get instrumentor for this runtime. If no instrumentor is available, return None."""
104+
return None
161105

162106
@abstractmethod
163107
async def cleanup(self):
164108
"""Cleaup runtime resources."""
165109
pass
166110

167-
async def __aexit__(self, exc_type, exc_val, exc_tb):
168-
"""Async exit method called when exiting the 'async with' block.
169111

170-
Cleans up resources and handles any exceptions.
112+
T = TypeVar("T", bound=UiPathBaseRuntime)
171113

172-
Always writes output file regardless of whether execution was successful,
173-
suspended, or encountered an error.
174-
"""
114+
115+
class UiPathExecutionRuntime(UiPathBaseRuntime, Generic[T]):
116+
"""Handles runtime execution with tracing/telemetry."""
117+
118+
def __init__(
119+
self,
120+
delegate: T,
121+
trace_manager: UiPathTraceManager,
122+
root_span: str = "root",
123+
execution_id: Optional[str] = None,
124+
):
125+
"""Initialize the executor."""
126+
self.delegate = delegate
127+
self.trace_manager = trace_manager
128+
self.root_span = root_span
129+
self.execution_id = execution_id
130+
self.log_handler: Optional[UiPathRuntimeExecutionLogHandler]
131+
if execution_id is not None:
132+
self.log_handler = UiPathRuntimeExecutionLogHandler(execution_id)
133+
else:
134+
self.log_handler = None
135+
136+
async def execute(
137+
self,
138+
input: dict[str, Any],
139+
) -> Any:
140+
"""Execute runtime with context."""
141+
instrumentor = self.delegate.get_instrumentor()
142+
log_interceptor = UiPathRuntimeLogsInterceptor(
143+
execution_id=self.execution_id, log_handler=self.log_handler
144+
)
145+
log_interceptor.setup()
146+
147+
if instrumentor is not None:
148+
instrumentor.instrument(tracer_provider=self.trace_manager.tracer_provider)
175149
try:
176-
if self.context.result is None:
177-
execution_result = UiPathRuntimeResult()
178-
else:
179-
execution_result = self.context.result
180-
181-
if exc_type:
182-
# Create error info from exception
183-
if isinstance(exc_val, UiPathRuntimeError):
184-
error_info = exc_val.error_info
185-
else:
186-
# Generic error
187-
error_info = UiPathErrorContract(
188-
code=f"ERROR_{exc_type.__name__}",
189-
title=f"Runtime error: {exc_type.__name__}",
190-
detail=str(exc_val),
191-
category=UiPathErrorCategory.UNKNOWN,
192-
)
193-
194-
execution_result.status = UiPathRuntimeStatus.FAULTED
195-
execution_result.error = error_info
196-
197-
content = execution_result.to_dict()
198-
199-
# Always write output file at runtime, except for inner runtimes
200-
# Inner runtimes have execution_id
201-
if self.context.job_id and not self.context.execution_id:
202-
with open(self.context.result_file_path, "w") as f:
203-
json.dump(content, f, indent=2, default=str)
204-
205-
# Write the execution output to file if requested
206-
if self.context.output_file:
207-
with open(self.context.output_file, "w") as f:
208-
f.write(content.get("output", "{}"))
209-
210-
# Don't suppress exceptions
211-
return False
212-
213-
except Exception as e:
214-
logger.error(f"Error during runtime shutdown: {str(e)}")
215-
216-
# Create a fallback error result if we fail during cleanup
217-
if not isinstance(e, UiPathRuntimeError):
218-
error_info = UiPathErrorContract(
219-
code="RUNTIME_SHUTDOWN_ERROR",
220-
title="Runtime shutdown failed",
221-
detail=f"Error: {str(e)}",
222-
category=UiPathErrorCategory.SYSTEM,
223-
)
150+
if self.execution_id:
151+
with self.trace_manager.start_execution_span(
152+
self.root_span, execution_id=self.execution_id
153+
):
154+
return await self.delegate.execute(input)
224155
else:
225-
error_info = e.error_info
226-
227-
# Last-ditch effort to write error output
228-
try:
229-
error_result = UiPathRuntimeResult(
230-
status=UiPathRuntimeStatus.FAULTED, error=error_info
231-
)
232-
error_result_content = error_result.to_dict()
233-
if self.context.job_id:
234-
with open(self.context.result_file_path, "w") as f:
235-
json.dump(error_result_content, f, indent=2, default=str)
236-
except Exception as write_error:
237-
logger.error(f"Failed to write error output file: {str(write_error)}")
238-
raise
239-
240-
# Re-raise as RuntimeError if it's not already a UiPathRuntimeError
241-
if not isinstance(e, UiPathRuntimeError):
242-
raise RuntimeError(
243-
error_info.code,
244-
error_info.title,
245-
error_info.detail,
246-
error_info.category,
247-
) from e
248-
raise
156+
return await self.delegate.execute(input)
249157
finally:
250-
# Restore original logging
251-
if hasattr(self, "logs_interceptor"):
252-
self.logs_interceptor.teardown()
158+
self.trace_manager.flush_spans()
159+
if instrumentor is not None:
160+
instrumentor.uninstrument()
161+
log_interceptor.teardown()
253162

254-
await self.cleanup()
163+
@override
164+
async def stream(
165+
self,
166+
root_span: str = "root",
167+
execution_id: Optional[str] = None,
168+
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
169+
"""Stream runtime execution with context.
170+
171+
Args:
172+
runtime: The runtime instance
173+
context: The runtime context
174+
175+
Yields:
176+
UiPathRuntimeEvent instances during execution and final UiPathRuntimeResult
177+
178+
Raises:
179+
UiPathRuntimeStreamNotSupportedError: If the runtime doesn't support streaming
180+
"""
181+
instrumentor = self.delegate.get_instrumentor()
182+
if instrumentor is not None:
183+
instrumentor.instrument(tracer_provider=self.trace_manager.tracer_provider)
184+
try:
185+
tracer = trace.get_tracer("uipath-runtime")
186+
span_attributes: dict[str, Any] = {}
187+
if execution_id:
188+
span_attributes["execution.id"] = "exec-a"
189+
with tracer.start_as_current_span(root_span, attributes=span_attributes):
190+
async for event in self.delegate.stream():
191+
yield event
192+
finally:
193+
self.trace_manager.flush_spans()
194+
if instrumentor is not None:
195+
instrumentor.uninstrument()
196+
197+
def cleanup(self) -> None:
198+
"""Close runtime resources."""
199+
pass

0 commit comments

Comments
 (0)