-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathv_next.py
More file actions
103 lines (80 loc) · 3.43 KB
/
Copy pathv_next.py
File metadata and controls
103 lines (80 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import abc
from contextvars import ContextVar
from typing import Generic, TypeVar, ContextManager
import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider, SpanProcessor, SynchronousMultiSpanProcessor, Span
from pydantic import BaseModel
from typing_extensions import override
I = TypeVar("I")
O = TypeVar("O")
logger = logging.getLogger(__name__)
class UiPathRuntime(abc.ABC, Generic[I, O]):
@abc.abstractmethod
def execute(self, input: I) -> O:
"""
Execute the agent.
"""
raise NotImplementedError()
class MyAgentInput(BaseModel):
query: str
class MyAgentOutput(BaseModel):
response: str
class MyAgent(UiPathRuntime[MyAgentInput, MyAgentOutput]):
@override
def execute(self, input: MyAgentInput) -> MyAgentOutput:
print("Running agent with input:", input)
return MyAgentOutput(response="AgentExecuted!")
class MultiSpanProcessor(SynchronousMultiSpanProcessor):
def remove_span_processor(self, span_processor: SpanProcessor) -> None:
with self._lock:
temp_list = list(self._span_processors)
temp_list.remove(span_processor)
self._span_processors = tuple(temp_list)
uipath_multi_span_processor_context = ContextVar("uipath_multi_span_processor")
class SpanProcessorContext(ContextManager):
def __init__(self, span_processor: SpanProcessor):
self.span_processor = span_processor
def __enter__(self):
uipath_multi_span_processor:MultiSpanProcessor = uipath_multi_span_processor_context.get()
if uipath_multi_span_processor:
uipath_multi_span_processor.add_span_processor(self.span_processor)
else:
print("WARNING: CALL INIT!")
def __exit__(self, exc_type, exc_val, exc_tb):
uipath_multi_span_processor = uipath_multi_span_processor_context.get()
if uipath_multi_span_processor:
uipath_multi_span_processor.remove_span_processor(self.span_processor)
self.span_processor.shutdown()
def add_span_processor(span_processor: SpanProcessor) -> SpanProcessorContext:
return SpanProcessorContext(span_processor)
def init():
# setup tracer
tracer_provider = TracerProvider()
span_processor = MultiSpanProcessor()
tracer_provider.add_span_processor(span_processor)
trace.set_tracer_provider(tracer_provider)
uipath_multi_span_processor_context.set(span_processor)
if __name__ == '__main__':
# in uipath base command (applicable to everything)
# invoke uipath_init() which sets up tracers and what not.
init()
# in CLI-run
entrypoint = "v_next.py:MyAgent"
input_json = {"query": "query!"}
# Note: input needs to be converted into correct object type.
input_object = MyAgentInput(**input_json)
# agent = RuntimeFactory.load(entrypoint) # this initializes the agent using contructor.
agent = MyAgent()
class ConsoleLoggerProcessor(SpanProcessor):
def on_start(self, span: Span, parent_context=None):
print("Started span input:", span)
# Note: eval can add UiPathRuntimeExecutionSpanExporter for evaluation purposes.
with add_span_processor(ConsoleLoggerProcessor()) as span_exporter:
tracer = trace.get_tracer("uipath-runtime")
with tracer.start_as_current_span(
name="root",
attributes={},
):
result = agent.execute(input_object)
print("Agent result:",result)