Skip to content

Commit ae5a2fe

Browse files
committed
fix: runtime context as context manager
1 parent 73007ed commit ae5a2fe

File tree

4 files changed

+189
-48
lines changed

4 files changed

+189
-48
lines changed

src/uipath/runtime/base.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
)
2424
from uipath.runtime.logging import UiPathRuntimeExecutionLogHandler
2525
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
26+
from uipath.runtime.result import UiPathRuntimeResult
2627
from uipath.runtime.schema import (
2728
UiPathRuntimeSchema,
2829
)
@@ -54,12 +55,13 @@ async def get_schema(self) -> UiPathRuntimeSchema:
5455
raise NotImplementedError()
5556

5657
@abstractmethod
57-
async def execute(self, input: dict[str, Any]) -> Any:
58+
async def execute(self, input: dict[str, Any]) -> UiPathRuntimeResult:
5859
"""Produce the agent output."""
5960
raise NotImplementedError()
6061

6162
async def stream(
6263
self,
64+
input: dict[str, Any],
6365
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
6466
"""Stream execution events in real-time.
6567
@@ -127,22 +129,21 @@ def __init__(
127129
self.trace_manager = trace_manager
128130
self.root_span = root_span
129131
self.execution_id = execution_id
130-
self.log_handler: Optional[UiPathRuntimeExecutionLogHandler]
132+
self.log_handler: Optional[UiPathRuntimeExecutionLogHandler] = None
131133
if execution_id is not None:
132134
self.log_handler = UiPathRuntimeExecutionLogHandler(execution_id)
133-
else:
134-
self.log_handler = None
135135

136136
async def execute(
137137
self,
138138
input: dict[str, Any],
139-
) -> Any:
139+
) -> UiPathRuntimeResult:
140140
"""Execute runtime with context."""
141141
instrumentor = self.delegate.get_instrumentor()
142-
log_interceptor = UiPathRuntimeLogsInterceptor(
143-
execution_id=self.execution_id, log_handler=self.log_handler
144-
)
145-
log_interceptor.setup()
142+
if self.log_handler:
143+
log_interceptor = UiPathRuntimeLogsInterceptor(
144+
execution_id=self.execution_id, log_handler=self.log_handler
145+
)
146+
log_interceptor.setup()
146147

147148
if instrumentor is not None:
148149
instrumentor.instrument(tracer_provider=self.trace_manager.tracer_provider)
@@ -158,13 +159,13 @@ async def execute(
158159
self.trace_manager.flush_spans()
159160
if instrumentor is not None:
160161
instrumentor.uninstrument()
161-
log_interceptor.teardown()
162+
if self.log_handler:
163+
log_interceptor.teardown()
162164

163165
@override
164166
async def stream(
165167
self,
166-
root_span: str = "root",
167-
execution_id: Optional[str] = None,
168+
input: dict[str, Any],
168169
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
169170
"""Stream runtime execution with context.
170171
@@ -184,10 +185,12 @@ async def stream(
184185
try:
185186
tracer = trace.get_tracer("uipath-runtime")
186187
span_attributes: dict[str, Any] = {}
187-
if execution_id:
188+
if self.execution_id:
188189
span_attributes["execution.id"] = "exec-a"
189-
with tracer.start_as_current_span(root_span, attributes=span_attributes):
190-
async for event in self.delegate.stream():
190+
with tracer.start_as_current_span(
191+
self.root_span, attributes=span_attributes
192+
):
193+
async for event in self.delegate.stream(input):
191194
yield event
192195
finally:
193196
self.trace_manager.flush_spans()

src/uipath/runtime/context.py

Lines changed: 146 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@
1616
from pydantic import BaseModel
1717
from uipath.core.tracing.context import UiPathTraceContext
1818

19-
from uipath.runtime.result import UiPathRuntimeResult
19+
from uipath.runtime.errors import (
20+
UiPathErrorCategory,
21+
UiPathErrorCode,
22+
UiPathErrorContract,
23+
UiPathRuntimeError,
24+
)
25+
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
26+
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
27+
28+
logger = logging.getLogger(__name__)
2029

2130
C = TypeVar("C", bound="UiPathRuntimeContext")
2231

@@ -28,7 +37,6 @@ class UiPathRuntimeContext(BaseModel):
2837
input: Optional[Any] = None
2938
resume: bool = False
3039
job_id: Optional[str] = None
31-
execution_id: Optional[str] = None
3240
trace_context: Optional[UiPathTraceContext] = None
3341
config_path: str = "uipath.json"
3442
runtime_dir: Optional[str] = "__uipath"
@@ -38,13 +46,148 @@ class UiPathRuntimeContext(BaseModel):
3846
output_file: Optional[str] = None
3947
trace_file: Optional[str] = None
4048
logs_file: Optional[str] = "execution.log"
41-
log_handler: Optional[logging.Handler] = None
4249
logs_min_level: Optional[str] = "INFO"
4350
breakpoints: Optional[List[str] | Literal["*"]] = None
4451
result: Optional[UiPathRuntimeResult] = None
4552

4653
model_config = {"arbitrary_types_allowed": True, "extra": "allow"}
4754

55+
def __enter__(self):
56+
"""Async enter method called when entering the 'async with' block.
57+
58+
Initializes and prepares the runtime contextual environment.
59+
60+
Returns:
61+
The runtime context instance
62+
"""
63+
# Read the input from file if provided
64+
if self.input_file:
65+
_, file_extension = os.path.splitext(self.input_file)
66+
if file_extension != ".json":
67+
raise UiPathRuntimeError(
68+
code=UiPathErrorCode.INVALID_INPUT_FILE_EXTENSION,
69+
title="Invalid Input File Extension",
70+
detail="The provided input file must be in JSON format.",
71+
)
72+
with open(self.input_file) as f:
73+
self.input = f.read()
74+
75+
try:
76+
if isinstance(self.input, str):
77+
if self.input.strip():
78+
self.input = json.loads(self.input)
79+
else:
80+
self.input = {}
81+
elif self.input is None:
82+
self.input = {}
83+
# else: leave it as-is (already a dict, list, bool, etc.)
84+
except json.JSONDecodeError as e:
85+
raise UiPathRuntimeError(
86+
UiPathErrorCode.INPUT_INVALID_JSON,
87+
"Invalid JSON input",
88+
f"The input data is not valid JSON: {str(e)}",
89+
UiPathErrorCategory.USER,
90+
) from e
91+
92+
# Intercept all stdout/stderr/logs
93+
# Write to file (runtime), stdout (debug) or log handler (if provided)
94+
self.logs_interceptor = UiPathRuntimeLogsInterceptor(
95+
min_level=self.logs_min_level,
96+
dir=self.runtime_dir,
97+
file=self.logs_file,
98+
job_id=self.job_id,
99+
)
100+
self.logs_interceptor.setup()
101+
102+
return self
103+
104+
def __exit__(self, exc_type, exc_val, exc_tb):
105+
"""Async exit method called when exiting the 'async with' block.
106+
107+
Cleans up resources and handles any exceptions.
108+
109+
Always writes output file regardless of whether execution was successful,
110+
suspended, or encountered an error.
111+
"""
112+
try:
113+
if self.result is None:
114+
execution_result = UiPathRuntimeResult()
115+
else:
116+
execution_result = self.result
117+
118+
if exc_type:
119+
# Create error info from exception
120+
if isinstance(exc_val, UiPathRuntimeError):
121+
error_info = exc_val.error_info
122+
else:
123+
# Generic error
124+
error_info = UiPathErrorContract(
125+
code=f"ERROR_{exc_type.__name__}",
126+
title=f"Runtime error: {exc_type.__name__}",
127+
detail=str(exc_val),
128+
category=UiPathErrorCategory.UNKNOWN,
129+
)
130+
131+
execution_result.status = UiPathRuntimeStatus.FAULTED
132+
execution_result.error = error_info
133+
134+
content = execution_result.to_dict()
135+
136+
# Always write output file at runtime, except for inner runtimes
137+
# Inner runtimes have execution_id
138+
if self.job_id:
139+
with open(self.result_file_path, "w") as f:
140+
json.dump(content, f, indent=2, default=str)
141+
142+
# Write the execution output to file if requested
143+
if self.output_file:
144+
with open(self.output_file, "w") as f:
145+
f.write(content.get("output", "{}"))
146+
147+
# Don't suppress exceptions
148+
return False
149+
150+
except Exception as e:
151+
logger.error(f"Error during runtime shutdown: {str(e)}")
152+
153+
# Create a fallback error result if we fail during cleanup
154+
if not isinstance(e, UiPathRuntimeError):
155+
error_info = UiPathErrorContract(
156+
code="RUNTIME_SHUTDOWN_ERROR",
157+
title="Runtime shutdown failed",
158+
detail=f"Error: {str(e)}",
159+
category=UiPathErrorCategory.SYSTEM,
160+
)
161+
else:
162+
error_info = e.error_info
163+
164+
# Last-ditch effort to write error output
165+
try:
166+
error_result = UiPathRuntimeResult(
167+
status=UiPathRuntimeStatus.FAULTED, error=error_info
168+
)
169+
error_result_content = error_result.to_dict()
170+
if self.job_id:
171+
with open(self.result_file_path, "w") as f:
172+
json.dump(error_result_content, f, indent=2, default=str)
173+
except Exception as write_error:
174+
logger.error(f"Failed to write error output file: {str(write_error)}")
175+
raise
176+
177+
# Re-raise as RuntimeError if it's not already a UiPathRuntimeError
178+
if not isinstance(e, UiPathRuntimeError):
179+
raise RuntimeError(
180+
error_info.code,
181+
error_info.title,
182+
error_info.detail,
183+
error_info.category,
184+
) from e
185+
raise
186+
finally:
187+
# Restore original logging
188+
if hasattr(self, "logs_interceptor"):
189+
self.logs_interceptor.teardown()
190+
48191
@cached_property
49192
def result_file_path(self) -> str:
50193
"""Get the full path to the result file."""

src/uipath/runtime/debug/runtime.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Debug runtime implementation."""
22

33
import logging
4-
from typing import Generic, TypeVar
4+
from typing import Any, Generic, TypeVar
55

66
from uipath.runtime import (
77
UiPathBaseRuntime,
@@ -36,7 +36,7 @@ def __init__(
3636
self.delegate: T = delegate
3737
self.debug_bridge: UiPathDebugBridge = debug_bridge
3838

39-
async def execute(self) -> UiPathRuntimeResult:
39+
async def execute(self, input: dict[str, Any]) -> UiPathRuntimeResult:
4040
"""Execute the workflow with debug support."""
4141
try:
4242
await self.debug_bridge.connect()
@@ -46,14 +46,14 @@ async def execute(self) -> UiPathRuntimeResult:
4646
result: UiPathRuntimeResult
4747
# Try to stream events from inner runtime
4848
try:
49-
result = await self._stream_and_debug(self.delegate)
49+
result = await self._stream_and_debug(self.delegate, input)
5050
except UiPathStreamNotSupportedError:
5151
# Fallback to regular execute if streaming not supported
5252
logger.debug(
5353
f"Runtime {self.delegate.__class__.__name__} does not support "
5454
"streaming, falling back to execute()"
5555
)
56-
result = await self.delegate.execute()
56+
result = await self.delegate.execute(input)
5757

5858
await self.debug_bridge.emit_execution_completed(result)
5959

@@ -71,7 +71,9 @@ async def execute(self) -> UiPathRuntimeResult:
7171
)
7272
raise
7373

74-
async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult:
74+
async def _stream_and_debug(
75+
self, inner_runtime: T, input: dict[str, Any]
76+
) -> UiPathRuntimeResult:
7577
"""Stream events from inner runtime and handle debug interactions."""
7678
final_result: UiPathRuntimeResult
7779
execution_completed = False
@@ -84,7 +86,7 @@ async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult:
8486
# Update breakpoints from debug bridge
8587
inner_runtime.context.breakpoints = self.debug_bridge.get_breakpoints()
8688
# Stream events from inner runtime
87-
async for event in inner_runtime.stream():
89+
async for event in inner_runtime.stream(input):
8890
# Handle final result
8991
if isinstance(event, UiPathRuntimeResult):
9092
final_result = event
@@ -115,10 +117,6 @@ async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult:
115117

116118
return final_result
117119

118-
async def validate(self) -> None:
119-
"""Validate runtime configuration."""
120-
await self.delegate.validate()
121-
122120
async def cleanup(self) -> None:
123121
"""Cleanup runtime resources."""
124122
try:

0 commit comments

Comments
 (0)