-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmanaged_agent_graph.py
More file actions
105 lines (86 loc) · 3.64 KB
/
managed_agent_graph.py
File metadata and controls
105 lines (86 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""ManagedAgentGraph — LaunchDarkly managed wrapper for agent graph execution."""
from typing import Dict
from ldai.agent_graph import AgentGraphDefinition
from ldai.providers import AgentGraphRunner
from ldai.providers.types import (
AgentGraphRunnerResult,
LDAIMetrics,
ManagedGraphResult,
)
from ldai.tracker import LDAIMetricSummary
class ManagedAgentGraph:
"""
LaunchDarkly managed wrapper for AI agent graph execution.
Holds an AgentGraphRunner and an AgentGraphDefinition. Delegates execution
to the runner, then drives all graph-level and per-node tracking from the
returned :class:`~ldai.providers.types.AgentGraphRunnerResult`.
Obtain an instance via ``LDAIClient.create_agent_graph()``.
"""
def __init__(
self,
graph: AgentGraphDefinition,
runner: AgentGraphRunner,
):
"""
Initialize ManagedAgentGraph.
:param graph: The AgentGraphDefinition used to drive graph-level and
per-node tracking from the runner result metrics.
:param runner: The AgentGraphRunner to delegate execution to
"""
self._graph = graph
self._runner = runner
async def run(self, input: str) -> ManagedGraphResult:
"""
Run the agent graph with the given input.
Delegates to the underlying AgentGraphRunner, then drives all
LaunchDarkly tracking from ``result.metrics``.
:param input: The user input prompt to process through the graph.
:return: ManagedGraphResult containing the content, metric summary,
and raw response.
"""
graph_tracker = self._graph.create_tracker()
result = await graph_tracker.track_graph_metrics_of_async(
lambda r: r.metrics,
lambda: self._runner.run(input),
)
summary = graph_tracker.get_summary()
summary.node_metrics = self._track_node_metrics(result.metrics.node_metrics)
return ManagedGraphResult(
content=result.content,
metrics=summary,
raw=result.raw,
evaluations=None,
)
def _track_node_metrics(
self, node_metrics: Dict[str, LDAIMetrics]
) -> Dict[str, LDAIMetricSummary]:
"""
Drive per-node LaunchDarkly tracking events and collect node metric summaries.
For each node key present in ``node_metrics``, obtains the node's
config tracker via the graph definition, fires tracking events, and
returns a map of node key to the tracker's metric summary.
"""
node_summaries: Dict[str, LDAIMetricSummary] = {}
for node_key, node_ldai_metrics in node_metrics.items():
node = self._graph.get_node(node_key)
if node is None:
continue
node_tracker = node.get_config().create_tracker()
if node_ldai_metrics.tokens is not None:
node_tracker.track_tokens(node_ldai_metrics.tokens)
if node_ldai_metrics.duration_ms is not None:
node_tracker.track_duration(node_ldai_metrics.duration_ms)
if node_ldai_metrics.tool_calls:
node_tracker.track_tool_calls(node_ldai_metrics.tool_calls)
if node_ldai_metrics.success:
node_tracker.track_success()
else:
node_tracker.track_error()
node_summaries[node_key] = node_tracker.get_summary()
return node_summaries
def get_agent_graph_runner(self) -> AgentGraphRunner:
"""
Return the underlying AgentGraphRunner for advanced use.
:return: The AgentGraphRunner instance
"""
return self._runner