-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_executor.py
More file actions
258 lines (207 loc) · 8.84 KB
/
test_executor.py
File metadata and controls
258 lines (207 loc) · 8.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""Simple test for runtime factory and executor span capture."""
import logging
import sys
from typing import Any, AsyncGenerator, TypeVar
import pytest
from opentelemetry import trace
from uipath.core import UiPathTraceManager
from uipath.runtime import (
UiPathExecuteOptions,
UiPathExecutionRuntime,
UiPathRuntimeEvent,
UiPathRuntimeProtocol,
)
from uipath.runtime.base import UiPathStreamOptions
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
from uipath.runtime.schema import UiPathRuntimeSchema
class BaseMockRuntime:
    """Shared scaffold for the mock runtimes defined in this test module.

    ``get_schema`` and ``stream`` are deliberately unimplemented and raise
    ``NotImplementedError``; ``dispose`` is a no-op. The concrete mocks in
    this file subclass it and add their own ``execute``.
    """

    async def get_schema(self) -> UiPathRuntimeSchema:
        """Always raise ``NotImplementedError``; the mocks expose no schema."""
        raise NotImplementedError

    async def stream(
        self,
        input: dict[str, Any] | None = None,
        options: UiPathStreamOptions | None = None,
    ) -> AsyncGenerator[UiPathRuntimeEvent, None]:
        """Async generator stub that raises ``NotImplementedError`` when iterated."""
        raise NotImplementedError
        # Unreachable, but the bare ``yield`` is what makes this function an
        # async generator rather than a plain coroutine.
        yield

    async def dispose(self) -> None:
        """Do nothing; the mock holds no resources to release."""
        return None
class MockRuntimeA(BaseMockRuntime):
    """Mock runtime whose result identifies it as runtime "A"."""

    async def execute(
        self,
        input: dict[str, Any] | None = None,
        options: UiPathExecuteOptions | None = None,
    ) -> UiPathRuntimeResult:
        """Print the received input and return a successful result tagged "A".

        ``options`` is accepted but not used. The ``print`` is intentional:
        the test in this file asserts on the captured "executing ..." line.
        """
        print(f"executing {input}")
        payload = {"runtime": "A"}
        return UiPathRuntimeResult(
            status=UiPathRuntimeStatus.SUCCESSFUL, output=payload
        )
class MockRuntimeB(BaseMockRuntime):
    """Mock runtime whose result identifies it as runtime "B"."""

    async def execute(
        self,
        input: dict[str, Any] | None = None,
        options: UiPathExecuteOptions | None = None,
    ) -> UiPathRuntimeResult:
        """Print the received input and return a successful result tagged "B".

        ``options`` is accepted but not used. The ``print`` is intentional:
        the test in this file asserts on the captured "executing ..." line.
        """
        print(f"executing {input}")
        payload = {"runtime": "B"}
        return UiPathRuntimeResult(
            status=UiPathRuntimeStatus.SUCCESSFUL, output=payload
        )
class MockRuntimeC(BaseMockRuntime):
    """Mock runtime "C" that opens its own OpenTelemetry spans while executing."""

    async def execute(
        self,
        input: dict[str, Any] | None = None,
        options: UiPathExecuteOptions | None = None,
    ) -> UiPathRuntimeResult:
        """Print the input, emit four custom spans, and return a successful result.

        Spans are created in this order: two flat spans opened and closed one
        after the other, then a parent span with one span nested inside it.
        ``options`` is accepted but not used.
        """
        print(f"executing {input}")

        tracer = trace.get_tracer("test-runtime-c")

        # Two back-to-back spans; each is closed before the next one opens,
        # so neither is the parent of the other.
        flat_spans = (
            ("custom-child-span", {"operation": "child", "step": "1"}),
            ("custom-sibling-span", {"operation": "sibling", "step": "2"}),
        )
        for span_name, span_attributes in flat_spans:
            with tracer.start_as_current_span(span_name, attributes=span_attributes):
                # Stand-in for real work.
                pass

        # A multi-item ``with`` enters left to right, so the second span is
        # started while the first is still the current span.
        with (
            tracer.start_as_current_span(
                "parent-operation", attributes={"operation": "parent"}
            ),
            tracer.start_as_current_span(
                "nested-child-operation", attributes={"operation": "nested"}
            ),
        ):
            pass

        payload = {"runtime": "C", "spans_created": 4}
        return UiPathRuntimeResult(
            status=UiPathRuntimeStatus.SUCCESSFUL,
            output=payload,
        )
# Type variable bounded to UiPathRuntimeProtocol implementations.
# NOTE(review): nothing in the visible part of this file references T —
# confirm whether it is still needed.
T = TypeVar("T", bound=UiPathRuntimeProtocol)
class UiPathTestRuntimeFactory:
    """Minimal test factory that builds instances of one fixed runtime class."""

    def __init__(self, runtime_class: type[UiPathRuntimeProtocol]):
        """Remember the runtime class that ``new_runtime`` will instantiate."""
        self.runtime_class = runtime_class

    async def new_runtime(
        self, entrypoint: str, runtime_id: str
    ) -> UiPathRuntimeProtocol:
        """Return a fresh instance of the stored class, built with no arguments.

        ``entrypoint`` and ``runtime_id`` are accepted but ignored.
        """
        runtime = self.runtime_class()
        return runtime
@pytest.fixture(autouse=True)
def _isolate_logging():
    """Snapshot global logging and stdio state, then put it back after the test.

    Captured before the test: the root logger's level and handler list, plus
    ``sys.stdout`` and ``sys.stderr``. After the test these are restored and
    ``logging.disable`` is reset to ``NOTSET``.
    """
    root_logger = logging.getLogger()
    saved_level = root_logger.level
    # Copy the list so handlers added during the test are not in the snapshot.
    saved_handlers = list(root_logger.handlers)
    saved_stdout, saved_stderr = sys.stdout, sys.stderr

    yield

    root_logger.setLevel(saved_level)
    root_logger.handlers = saved_handlers
    sys.stdout, sys.stderr = saved_stdout, saved_stderr
    logging.disable(logging.NOTSET)
@pytest.mark.asyncio
async def test_multiple_factories_same_executor(tmp_path):
    """Run three mock runtimes through one trace manager and check span/log capture.

    Runtimes A, B and C are each wrapped in a ``UiPathExecutionRuntime`` that
    shares a single ``UiPathTraceManager`` but has its own execution id. The
    test then verifies:

    * each execution returns the expected result,
    * spans are retrievable per execution id and carry that id as the
      ``execution.id`` attribute,
    * the custom spans emitted by runtime C have the expected parent links,
    * the first buffered log record of each execution is its ``print`` output.

    Args:
        tmp_path: pytest-provided temporary directory used for the log file.
    """
    # Set up a master interceptor so that sys.stdout is a LoggerWriter,
    # matching real production usage where UiPathRuntimeContext provides one.
    master = UiPathRuntimeLogsInterceptor(
        job_id="test-job", dir=str(tmp_path), file="test.log"
    )
    master.setup()
    try:
        trace_manager = UiPathTraceManager()

        # Create factories for different runtimes
        factory_a = UiPathTestRuntimeFactory(MockRuntimeA)
        factory_b = UiPathTestRuntimeFactory(MockRuntimeB)
        factory_c = UiPathTestRuntimeFactory(MockRuntimeC)

        # Execute runtime A
        runtime_a = await factory_a.new_runtime(entrypoint="", runtime_id="runtime-a")
        execution_runtime_a = UiPathExecutionRuntime(
            runtime_a, trace_manager, "runtime-a-span", execution_id="exec-a"
        )
        result_a = await execution_runtime_a.execute({"input": "a"})

        # Execute runtime B
        runtime_b = await factory_b.new_runtime(entrypoint="", runtime_id="runtime-b")
        execution_runtime_b = UiPathExecutionRuntime(
            runtime_b, trace_manager, "runtime-b-span", execution_id="exec-b"
        )
        result_b = await execution_runtime_b.execute({"input": "b"})

        # Execute runtime C with custom spans
        runtime_c = await factory_c.new_runtime(entrypoint="", runtime_id="runtime-c")
        execution_runtime_c = UiPathExecutionRuntime(
            runtime_c, trace_manager, "runtime-c-span", execution_id="exec-c"
        )
        result_c = await execution_runtime_c.execute({"input": "c"})

        # Verify results
        assert result_a.status == UiPathRuntimeStatus.SUCCESSFUL
        assert result_a.output == {"runtime": "A"}
        assert result_b.status == UiPathRuntimeStatus.SUCCESSFUL
        assert result_b.output == {"runtime": "B"}
        assert result_c.status == UiPathRuntimeStatus.SUCCESSFUL
        assert result_c.output == {"runtime": "C", "spans_created": 4}

        # Verify spans for execution A
        spans_a = trace_manager.get_execution_spans("exec-a")
        assert len(spans_a) > 0
        span_names_a = [s.name for s in spans_a]
        assert "runtime-a-span" in span_names_a

        # Verify spans for execution B
        spans_b = trace_manager.get_execution_spans("exec-b")
        assert len(spans_b) > 0
        span_names_b = [s.name for s in spans_b]
        assert "runtime-b-span" in span_names_b

        # Verify spans for execution C (should include custom spans)
        spans_c = trace_manager.get_execution_spans("exec-c")
        assert len(spans_c) > 0
        span_names_c = [s.name for s in spans_c]

        # Verify root span exists
        assert "runtime-c-span" in span_names_c

        # Verify custom child and sibling spans exist
        assert "custom-child-span" in span_names_c
        assert "custom-sibling-span" in span_names_c
        assert "parent-operation" in span_names_c
        assert "nested-child-operation" in span_names_c

        # Verify span hierarchy by checking parent relationships
        root_span_c = next(s for s in spans_c if s.name == "runtime-c-span")
        child_span = next(s for s in spans_c if s.name == "custom-child-span")
        sibling_span = next(s for s in spans_c if s.name == "custom-sibling-span")
        parent_op = next(s for s in spans_c if s.name == "parent-operation")
        nested_op = next(s for s in spans_c if s.name == "nested-child-operation")

        # Child and sibling should have root as parent
        assert child_span.parent is not None
        assert sibling_span.parent is not None
        assert child_span.parent.span_id == root_span_c.context.span_id
        assert sibling_span.parent.span_id == root_span_c.context.span_id

        # Nested operation should have parent operation as parent
        assert nested_op.parent is not None
        assert parent_op.parent is not None
        assert nested_op.parent.span_id == parent_op.context.span_id
        assert parent_op.parent.span_id == root_span_c.context.span_id

        # Verify spans are isolated by execution_id
        for spans, execution_id in (
            (spans_a, "exec-a"),
            (spans_b, "exec-b"),
            (spans_c, "exec-c"),
        ):
            for span in spans:
                assert span.attributes is not None
                assert span.attributes.get("execution.id") == execution_id

        # Verify logs are captured: the first buffered record of each
        # execution must be the line its mock runtime printed.
        for execution_runtime, expected_msg in (
            (execution_runtime_a, "executing {'input': 'a'}"),
            (execution_runtime_b, "executing {'input': 'b'}"),
            (execution_runtime_c, "executing {'input': 'c'}"),
        ):
            assert execution_runtime.log_handler
            assert len(execution_runtime.log_handler.buffer) > 0
            assert execution_runtime.log_handler.buffer[0].msg == expected_msg
    finally:
        # Always undo master.setup(), even when an assertion above fails;
        # otherwise the interceptor would stay installed after this test.
        master.teardown()