Skip to content

Commit b18ff9f

Browse files
committed
fix: runtime context as context manager
1 parent 40fe6b3 commit b18ff9f

File tree

4 files changed

+200
-52
lines changed

4 files changed

+200
-52
lines changed

src/uipath/runtime/base.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
TypeVar,
1111
)
1212

13-
from opentelemetry import trace
1413
from typing_extensions import override
1514
from uipath.core import UiPathTraceManager
1615

@@ -20,6 +19,7 @@
2019
)
2120
from uipath.runtime.logging import UiPathRuntimeExecutionLogHandler
2221
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
22+
from uipath.runtime.result import UiPathRuntimeResult
2323
from uipath.runtime.schema import (
2424
UiPathRuntimeSchema,
2525
)
@@ -51,12 +51,13 @@ async def get_schema(self) -> UiPathRuntimeSchema:
5151
raise NotImplementedError()
5252

5353
@abstractmethod
54-
async def execute(self, input: dict[str, Any]) -> Any:
54+
async def execute(self, input: dict[str, Any]) -> UiPathRuntimeResult:
5555
"""Produce the agent output."""
5656
raise NotImplementedError()
5757

5858
async def stream(
5959
self,
60+
input: dict[str, Any],
6061
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
6162
"""Stream execution events in real-time.
6263
@@ -120,16 +121,14 @@ def __init__(
120121
self.trace_manager = trace_manager
121122
self.root_span = root_span
122123
self.execution_id = execution_id
123-
self.log_handler: Optional[UiPathRuntimeExecutionLogHandler]
124+
self.log_handler: Optional[UiPathRuntimeExecutionLogHandler] = None
124125
if execution_id is not None:
125126
self.log_handler = UiPathRuntimeExecutionLogHandler(execution_id)
126-
else:
127-
self.log_handler = None
128127

129128
async def execute(
130129
self,
131130
input: dict[str, Any],
132-
) -> Any:
131+
) -> UiPathRuntimeResult:
133132
"""Execute runtime with context."""
134133
if self.log_handler:
135134
log_interceptor = UiPathRuntimeLogsInterceptor(
@@ -153,8 +152,7 @@ async def execute(
153152
@override
154153
async def stream(
155154
self,
156-
root_span: str = "root",
157-
execution_id: Optional[str] = None,
155+
input: dict[str, Any],
158156
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
159157
"""Stream runtime execution with context.
160158
@@ -166,18 +164,24 @@ async def stream(
166164
UiPathRuntimeEvent instances during execution and final UiPathRuntimeResult
167165
168166
Raises:
169-
UiPathRuntimeStreamNotSupportedError: If the runtime doesn't support streaming
167+
UiPathStreamNotSupportedError: If the runtime doesn't support streaming
170168
"""
169+
if self.log_handler:
170+
log_interceptor = UiPathRuntimeLogsInterceptor(
171+
execution_id=self.execution_id, log_handler=self.log_handler
172+
)
173+
log_interceptor.setup()
171174
try:
172-
tracer = trace.get_tracer("uipath-runtime")
173-
span_attributes: dict[str, Any] = {}
174-
if execution_id:
175-
span_attributes["execution.id"] = "exec-a"
176-
with tracer.start_as_current_span(root_span, attributes=span_attributes):
177-
async for event in self.delegate.stream():
178-
yield event
175+
if self.execution_id:
176+
with self.trace_manager.start_execution_span(
177+
self.root_span, execution_id=self.execution_id
178+
):
179+
async for event in self.delegate.stream(input):
180+
yield event
179181
finally:
180182
self.trace_manager.flush_spans()
183+
if self.log_handler:
184+
log_interceptor.teardown()
181185

182186
def cleanup(self) -> None:
183187
"""Close runtime resources."""

src/uipath/runtime/context.py

Lines changed: 146 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@
1616
from pydantic import BaseModel
1717
from uipath.core.tracing.context import UiPathTraceContext
1818

19-
from uipath.runtime.result import UiPathRuntimeResult
19+
from uipath.runtime.errors import (
20+
UiPathErrorCategory,
21+
UiPathErrorCode,
22+
UiPathErrorContract,
23+
UiPathRuntimeError,
24+
)
25+
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
26+
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
27+
28+
logger = logging.getLogger(__name__)
2029

2130
C = TypeVar("C", bound="UiPathRuntimeContext")
2231

@@ -28,7 +37,6 @@ class UiPathRuntimeContext(BaseModel):
2837
input: Optional[Any] = None
2938
resume: bool = False
3039
job_id: Optional[str] = None
31-
execution_id: Optional[str] = None
3240
trace_context: Optional[UiPathTraceContext] = None
3341
config_path: str = "uipath.json"
3442
runtime_dir: Optional[str] = "__uipath"
@@ -38,13 +46,148 @@ class UiPathRuntimeContext(BaseModel):
3846
output_file: Optional[str] = None
3947
trace_file: Optional[str] = None
4048
logs_file: Optional[str] = "execution.log"
41-
log_handler: Optional[logging.Handler] = None
4249
logs_min_level: Optional[str] = "INFO"
4350
breakpoints: Optional[List[str] | Literal["*"]] = None
4451
result: Optional[UiPathRuntimeResult] = None
4552

4653
model_config = {"arbitrary_types_allowed": True, "extra": "allow"}
4754

55+
def __enter__(self):
56+
"""Async enter method called when entering the 'async with' block.
57+
58+
Initializes and prepares the runtime contextual environment.
59+
60+
Returns:
61+
The runtime context instance
62+
"""
63+
# Read the input from file if provided
64+
if self.input_file:
65+
_, file_extension = os.path.splitext(self.input_file)
66+
if file_extension != ".json":
67+
raise UiPathRuntimeError(
68+
code=UiPathErrorCode.INVALID_INPUT_FILE_EXTENSION,
69+
title="Invalid Input File Extension",
70+
detail="The provided input file must be in JSON format.",
71+
)
72+
with open(self.input_file) as f:
73+
self.input = f.read()
74+
75+
try:
76+
if isinstance(self.input, str):
77+
if self.input.strip():
78+
self.input = json.loads(self.input)
79+
else:
80+
self.input = {}
81+
elif self.input is None:
82+
self.input = {}
83+
# else: leave it as-is (already a dict, list, bool, etc.)
84+
except json.JSONDecodeError as e:
85+
raise UiPathRuntimeError(
86+
UiPathErrorCode.INPUT_INVALID_JSON,
87+
"Invalid JSON input",
88+
f"The input data is not valid JSON: {str(e)}",
89+
UiPathErrorCategory.USER,
90+
) from e
91+
92+
# Intercept all stdout/stderr/logs
93+
# Write to file (runtime), stdout (debug) or log handler (if provided)
94+
self.logs_interceptor = UiPathRuntimeLogsInterceptor(
95+
min_level=self.logs_min_level,
96+
dir=self.runtime_dir,
97+
file=self.logs_file,
98+
job_id=self.job_id,
99+
)
100+
self.logs_interceptor.setup()
101+
102+
return self
103+
104+
def __exit__(self, exc_type, exc_val, exc_tb):
105+
"""Async exit method called when exiting the 'async with' block.
106+
107+
Cleans up resources and handles any exceptions.
108+
109+
Always writes output file regardless of whether execution was successful,
110+
suspended, or encountered an error.
111+
"""
112+
try:
113+
if self.result is None:
114+
execution_result = UiPathRuntimeResult()
115+
else:
116+
execution_result = self.result
117+
118+
if exc_type:
119+
# Create error info from exception
120+
if isinstance(exc_val, UiPathRuntimeError):
121+
error_info = exc_val.error_info
122+
else:
123+
# Generic error
124+
error_info = UiPathErrorContract(
125+
code=f"ERROR_{exc_type.__name__}",
126+
title=f"Runtime error: {exc_type.__name__}",
127+
detail=str(exc_val),
128+
category=UiPathErrorCategory.UNKNOWN,
129+
)
130+
131+
execution_result.status = UiPathRuntimeStatus.FAULTED
132+
execution_result.error = error_info
133+
134+
content = execution_result.to_dict()
135+
136+
# Always write output file at runtime, except for inner runtimes
137+
# Inner runtimes have execution_id
138+
if self.job_id:
139+
with open(self.result_file_path, "w") as f:
140+
json.dump(content, f, indent=2, default=str)
141+
142+
# Write the execution output to file if requested
143+
if self.output_file:
144+
with open(self.output_file, "w") as f:
145+
f.write(content.get("output", "{}"))
146+
147+
# Don't suppress exceptions
148+
return False
149+
150+
except Exception as e:
151+
logger.error(f"Error during runtime shutdown: {str(e)}")
152+
153+
# Create a fallback error result if we fail during cleanup
154+
if not isinstance(e, UiPathRuntimeError):
155+
error_info = UiPathErrorContract(
156+
code="RUNTIME_SHUTDOWN_ERROR",
157+
title="Runtime shutdown failed",
158+
detail=f"Error: {str(e)}",
159+
category=UiPathErrorCategory.SYSTEM,
160+
)
161+
else:
162+
error_info = e.error_info
163+
164+
# Last-ditch effort to write error output
165+
try:
166+
error_result = UiPathRuntimeResult(
167+
status=UiPathRuntimeStatus.FAULTED, error=error_info
168+
)
169+
error_result_content = error_result.to_dict()
170+
if self.job_id:
171+
with open(self.result_file_path, "w") as f:
172+
json.dump(error_result_content, f, indent=2, default=str)
173+
except Exception as write_error:
174+
logger.error(f"Failed to write error output file: {str(write_error)}")
175+
raise
176+
177+
# Re-raise as RuntimeError if it's not already a UiPathRuntimeError
178+
if not isinstance(e, UiPathRuntimeError):
179+
raise RuntimeError(
180+
error_info.code,
181+
error_info.title,
182+
error_info.detail,
183+
error_info.category,
184+
) from e
185+
raise
186+
finally:
187+
# Restore original logging
188+
if hasattr(self, "logs_interceptor"):
189+
self.logs_interceptor.teardown()
190+
48191
@cached_property
49192
def result_file_path(self) -> str:
50193
"""Get the full path to the result file."""

src/uipath/runtime/debug/runtime.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Debug runtime implementation."""
22

33
import logging
4-
from typing import Generic, TypeVar
4+
from typing import Any, Generic, TypeVar
55

66
from uipath.runtime import (
77
UiPathBaseRuntime,
@@ -36,7 +36,7 @@ def __init__(
3636
self.delegate: T = delegate
3737
self.debug_bridge: UiPathDebugBridge = debug_bridge
3838

39-
async def execute(self) -> UiPathRuntimeResult:
39+
async def execute(self, input: dict[str, Any]) -> UiPathRuntimeResult:
4040
"""Execute the workflow with debug support."""
4141
try:
4242
await self.debug_bridge.connect()
@@ -46,14 +46,14 @@ async def execute(self) -> UiPathRuntimeResult:
4646
result: UiPathRuntimeResult
4747
# Try to stream events from inner runtime
4848
try:
49-
result = await self._stream_and_debug(self.delegate)
49+
result = await self._stream_and_debug(self.delegate, input)
5050
except UiPathStreamNotSupportedError:
5151
# Fallback to regular execute if streaming not supported
5252
logger.debug(
5353
f"Runtime {self.delegate.__class__.__name__} does not support "
5454
"streaming, falling back to execute()"
5555
)
56-
result = await self.delegate.execute()
56+
result = await self.delegate.execute(input)
5757

5858
await self.debug_bridge.emit_execution_completed(result)
5959

@@ -71,7 +71,9 @@ async def execute(self) -> UiPathRuntimeResult:
7171
)
7272
raise
7373

74-
async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult:
74+
async def _stream_and_debug(
75+
self, inner_runtime: T, input: dict[str, Any]
76+
) -> UiPathRuntimeResult:
7577
"""Stream events from inner runtime and handle debug interactions."""
7678
final_result: UiPathRuntimeResult
7779
execution_completed = False
@@ -82,9 +84,10 @@ async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult:
8284
# Keep streaming until execution completes (not just paused at breakpoint)
8385
while not execution_completed:
8486
# Update breakpoints from debug bridge
85-
inner_runtime.context.breakpoints = self.debug_bridge.get_breakpoints()
87+
if inner_runtime.context is not None:
88+
inner_runtime.context.breakpoints = self.debug_bridge.get_breakpoints()
8689
# Stream events from inner runtime
87-
async for event in inner_runtime.stream():
90+
async for event in inner_runtime.stream(input):
8891
# Handle final result
8992
if isinstance(event, UiPathRuntimeResult):
9093
final_result = event
@@ -96,7 +99,8 @@ async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult:
9699
await self.debug_bridge.emit_breakpoint_hit(event)
97100
await self.debug_bridge.wait_for_resume()
98101

99-
self.delegate.context.resume = True
102+
if inner_runtime.context is not None:
103+
inner_runtime.context.resume = True
100104

101105
except UiPathDebugQuitError:
102106
final_result = UiPathRuntimeResult(
@@ -115,10 +119,6 @@ async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult:
115119

116120
return final_result
117121

118-
async def validate(self) -> None:
119-
"""Validate runtime configuration."""
120-
await self.delegate.validate()
121-
122122
async def cleanup(self) -> None:
123123
"""Cleanup runtime resources."""
124124
try:

0 commit comments

Comments
 (0)