-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathbase.py
More file actions
231 lines (188 loc) · 7.57 KB
/
Copy pathbase.py
File metadata and controls
231 lines (188 loc) · 7.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""Base runtime class and async context manager implementation."""
import logging
from typing import (
Any,
AsyncGenerator,
Literal,
Protocol,
)
from pydantic import BaseModel, Field
from uipath.core import UiPathTraceManager
from uipath.runtime.events import (
UiPathRuntimeEvent,
)
from uipath.runtime.logging import UiPathRuntimeExecutionLogHandler
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
from uipath.runtime.result import UiPathRuntimeResult
from uipath.runtime.schema import (
UiPathRuntimeSchema,
)
logger = logging.getLogger(__name__)
class UiPathStreamNotSupportedError(NotImplementedError):
"""Raised when a runtime does not support streaming."""
pass
class UiPathExecuteOptions(BaseModel):
"""Execution-time options controlling runtime behavior."""
resume: bool = Field(
default=False,
description="Indicates whether to resume a suspended execution.",
)
breakpoints: list[str] | Literal["*"] | None = Field(
default=None,
description="List of nodes or '*' to break on all steps.",
)
model_config = {"arbitrary_types_allowed": True, "extra": "allow"}
class UiPathStreamOptions(UiPathExecuteOptions):
"""Streaming-specific execution options."""
pass
class UiPathExecutableProtocol(Protocol):
"""UiPath execution interface."""
async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
"""Execute the runtime with the given input and options."""
...
class UiPathStreamableProtocol(Protocol):
"""UiPath streaming interface."""
async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream execution events in real-time.
Yields framework-agnostic BaseEvent instances during execution,
with the final event being UiPathRuntimeResult.
Yields:
UiPathRuntimeEvent subclasses: Framework-agnostic events (UiPathRuntimeMessageEvent,
UiPathRuntimeStateEvent, etc.)
Final yield: UiPathRuntimeResult (or its subclass UiPathBreakpointResult)
Raises:
UiPathRuntimeError: If execution fails
Example:
async for event in runtime.stream():
if isinstance(event, UiPathRuntimeResult):
# Last event - execution complete
print(f"Status: {event.status}")
break
elif isinstance(event, UiPathRuntimeMessageEvent):
# Handle message event
print(f"Message: {event.payload}")
elif isinstance(event, UiPathRuntimeStateEvent):
# Handle state update
print(f"State updated by: {event.node_name}")
"""
raise UiPathStreamNotSupportedError(
f"{self.__class__.__name__} does not implement streaming. "
"Use execute() instead."
)
yield
class UiPathSchemaProtocol(Protocol):
"""Contains runtime input and output schema."""
async def get_schema(self) -> UiPathRuntimeSchema:
"""Get schema for a runtime.
Returns: The runtime's schema (entrypoint type, input/output json schema).
"""
...
class UiPathDisposableProtocol(Protocol):
"""UiPath disposable interface."""
async def dispose(self) -> None:
"""Close and clean up resources."""
...
# Note: explicitly marking it as a protocol for mypy.
# https://mypy.readthedocs.io/en/stable/protocols.html#defining-subprotocols-and-subclassing-protocols
# Note that inheriting from an existing protocol does not automatically turn the subclass into a protocol
# – it just creates a regular (non-protocol) class or ABC that implements the given protocol (or protocols).
# The Protocol base class must always be explicitly present if you are defining a protocol.
class UiPathRuntimeProtocol(
UiPathExecutableProtocol,
UiPathStreamableProtocol,
UiPathSchemaProtocol,
UiPathDisposableProtocol,
Protocol,
):
"""UiPath Runtime Protocol."""
class UiPathExecutionRuntime:
"""Handles runtime execution with tracing/telemetry."""
def __init__(
self,
delegate: UiPathRuntimeProtocol,
trace_manager: UiPathTraceManager,
root_span: str = "root",
span_attributes: dict[str, str] | None = None,
log_handler: UiPathRuntimeExecutionLogHandler | None = None,
execution_id: str | None = None,
):
"""Initialize the executor."""
self.delegate = delegate
self.trace_manager = trace_manager
self.root_span = root_span
self.span_attributes = span_attributes
self.execution_id = execution_id
self.log_handler = log_handler
if execution_id is not None and log_handler is None:
self.log_handler = UiPathRuntimeExecutionLogHandler(execution_id)
async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
"""Execute runtime with context."""
if self.log_handler:
log_interceptor = UiPathRuntimeLogsInterceptor(
execution_id=self.execution_id, log_handler=self.log_handler
)
log_interceptor.setup()
try:
if self.execution_id:
with self.trace_manager.start_execution_span(
self.root_span,
execution_id=self.execution_id,
attributes=self.span_attributes,
):
return await self.delegate.execute(input, options=options)
else:
return await self.delegate.execute(input, options=options)
finally:
self.trace_manager.flush_spans()
if self.log_handler:
log_interceptor.teardown()
async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream runtime execution with context.
Args:
runtime: The runtime instance
context: The runtime context
Yields:
UiPathRuntimeEvent instances during execution and final UiPathRuntimeResult
Raises:
UiPathStreamNotSupportedError: If the runtime doesn't support streaming
"""
if self.log_handler:
log_interceptor = UiPathRuntimeLogsInterceptor(
execution_id=self.execution_id, log_handler=self.log_handler
)
log_interceptor.setup()
try:
if self.execution_id:
with self.trace_manager.start_execution_span(
self.root_span,
execution_id=self.execution_id,
attributes=self.span_attributes,
):
async for event in self.delegate.stream(input, options=options):
yield event
else:
async for event in self.delegate.stream(input, options=options):
yield event
finally:
self.trace_manager.flush_spans()
if self.log_handler:
log_interceptor.teardown()
async def get_schema(self) -> UiPathRuntimeSchema:
"""Passthrough schema for the delegate."""
return await self.delegate.get_schema()