Skip to content

Commit 63c97ce

Browse files
jsonbaileyclaude
andcommitted
feat: migrate ManagedAgentGraph to new Runner protocol, update AI providers
ManagedAgentGraph now expects AgentGraphRunnerResult (with GraphMetrics) from all runners and drives graph-level and per-node LD tracking from result.metrics. OpenAIAgentGraphRunner and LangGraphAgentGraphRunner are migrated to return AgentGraphRunnerResult; all direct tracker calls are removed from the runners. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 6dc30f0 commit 63c97ce

10 files changed

Lines changed: 387 additions & 264 deletions

File tree

packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
from ldai import log
99
from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode
10-
from ldai.providers import AgentGraphResult, AgentGraphRunner, ToolRegistry
11-
from ldai.providers.types import LDAIMetrics
10+
from ldai.providers import AgentGraphRunner, ToolRegistry
11+
from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics, LDAIMetrics
1212

1313
from ldai_langchain.langchain_helper import (
1414
build_structured_tools,
@@ -65,9 +65,10 @@ class LangGraphAgentGraphRunner(AgentGraphRunner):
6565
6666
AgentGraphRunner implementation for LangGraph.
6767
68-
Compiles and runs the agent graph with LangGraph and automatically records
69-
graph- and node-level AI metric data to the LaunchDarkly trackers on the
70-
graph definition and each node.
68+
Compiles and runs the agent graph with LangGraph and collects per-node AI
69+
metric data via a LangChain callback handler. All LaunchDarkly tracking is
70+
driven by the managed layer (``ManagedAgentGraph``) from the returned
71+
:class:`~ldai.providers.types.AgentGraphRunnerResult`.
7172
7273
Requires ``langgraph`` to be installed.
7374
"""
@@ -298,20 +299,21 @@ def route(state: WorkflowState) -> str:
298299
compiled = agent_builder.compile()
299300
return compiled, fn_name_to_config_key, node_keys
300301

301-
async def run(self, input: Any) -> AgentGraphResult:
302+
async def run(self, input: Any) -> AgentGraphRunnerResult:
302303
"""
303304
Run the agent graph with the given input.
304305
305306
Builds a LangGraph StateGraph from the AgentGraphDefinition, compiles
306307
it, and invokes it. Uses a LangChain callback handler to collect
307-
per-node metrics, then flushes them to LaunchDarkly trackers.
308+
per-node metrics, then returns them as an
309+
:class:`~ldai.providers.types.AgentGraphRunnerResult` with
310+
:class:`~ldai.providers.types.GraphMetrics`.
308311
309312
:param input: The string prompt to send to the agent graph
310-
:return: AgentGraphResult with the final output and metrics
313+
:return: AgentGraphRunnerResult with the final content and GraphMetrics
311314
"""
312315
pending_eval_tasks: Dict[str, List[asyncio.Task]] = {}
313316
token = _run_eval_tasks.set(pending_eval_tasks)
314-
tracker = self._graph.create_tracker()
315317
start_ns = time.perf_counter_ns()
316318

317319
try:
@@ -329,20 +331,26 @@ async def run(self, input: Any) -> AgentGraphResult:
329331
messages = result.get('messages', [])
330332
output = extract_last_message_content(messages)
331333

332-
# Flush per-node metrics to LD trackers; eval results are tracked
333-
# internally and intentionally not exposed on AgentGraphResult here
334-
# — judge dispatch is the managed layer's responsibility.
335-
await handler.flush(self._graph, pending_eval_tasks)
334+
# Await eval tasks collected during node traversal (judge results are
335+
# returned in node_metrics but not tracked here — the managed layer
336+
# is responsible for driving tracker calls).
337+
await handler.flush_eval_tasks(pending_eval_tasks)
336338

337-
tracker.track_path(handler.path)
338-
tracker.track_duration(duration)
339-
tracker.track_invocation_success()
340-
tracker.track_total_tokens(sum_token_usage_from_messages(messages))
339+
total_usage = sum_token_usage_from_messages(messages)
341340

342-
return AgentGraphResult(
343-
output=output,
341+
# Build per-node LDAIMetrics from the callback handler's collected data.
342+
node_metrics = self._build_node_metrics(handler)
343+
344+
return AgentGraphRunnerResult(
345+
content=output,
344346
raw=result,
345-
metrics=LDAIMetrics(success=True),
347+
metrics=GraphMetrics(
348+
success=True,
349+
path=handler.path,
350+
duration_ms=duration,
351+
usage=total_usage,
352+
node_metrics=node_metrics,
353+
),
346354
)
347355

348356
except Exception as exc:
@@ -354,12 +362,28 @@ async def run(self, input: Any) -> AgentGraphResult:
354362
else:
355363
log.warning(f'LangGraphAgentGraphRunner run failed: {exc}')
356364
duration = (time.perf_counter_ns() - start_ns) // 1_000_000
357-
tracker.track_duration(duration)
358-
tracker.track_invocation_failure()
359-
return AgentGraphResult(
360-
output='',
365+
return AgentGraphRunnerResult(
366+
content='',
361367
raw=None,
362-
metrics=LDAIMetrics(success=False),
368+
metrics=GraphMetrics(
369+
success=False,
370+
duration_ms=duration,
371+
),
363372
)
364373
finally:
365374
_run_eval_tasks.reset(token)
375+
376+
def _build_node_metrics(self, handler: LDMetricsCallbackHandler) -> Dict[str, LDAIMetrics]:
377+
"""Build per-node LDAIMetrics from callback handler collected data."""
378+
node_metrics: Dict[str, LDAIMetrics] = {}
379+
for node_key in handler.path:
380+
usage = handler.node_tokens.get(node_key)
381+
duration_ms = handler.node_durations_ms.get(node_key)
382+
tool_calls = handler.node_tool_calls.get(node_key) or []
383+
node_metrics[node_key] = LDAIMetrics(
384+
success=True,
385+
usage=usage,
386+
duration_ms=duration_ms,
387+
tool_calls=tool_calls if tool_calls else None,
388+
)
389+
return node_metrics

packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,35 @@ def on_tool_end(
189189
# Flush
190190
# ------------------------------------------------------------------
191191

192+
async def flush_eval_tasks(self, eval_tasks: Optional[Dict[str, List[Any]]] = None) -> None:
193+
"""
194+
Await all pending judge evaluation tasks collected during the graph run.
195+
196+
Tracker calls are driven by the managed layer (``ManagedAgentGraph``)
197+
from the returned ``GraphMetrics``. This method only ensures eval
198+
coroutines are awaited so they complete and do not leak.
199+
200+
:param eval_tasks: Dict mapping node key to a list of awaitables that
201+
return judge evaluation results.
202+
"""
203+
if not eval_tasks:
204+
return
205+
for node_key in self._path:
206+
for eval_task in eval_tasks.get(node_key, []):
207+
await eval_task
208+
192209
async def flush(
193210
self, graph: AgentGraphDefinition, eval_tasks=None
194211
) -> List[JudgeResult]:
195212
"""
196213
Emit all collected per-node metrics to the LaunchDarkly trackers.
197214
215+
.. deprecated::
216+
Use ``flush_eval_tasks()`` instead. Direct tracker calls from
217+
the runner are no longer supported; the managed layer
218+
(``ManagedAgentGraph``) drives all tracking from the returned
219+
``GraphMetrics``.
220+
198221
Call this once after the graph run completes.
199222
200223
:param graph: The AgentGraphDefinition whose nodes hold the LD config trackers.

packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from ldai.agent_graph import AgentGraphDefinition
77
from ldai.evaluator import Evaluator
88
from ldai.models import AIAgentGraphConfig, AIAgentConfig, ModelConfig, ProviderConfig
9-
from ldai.providers import AgentGraphResult, ToolRegistry
9+
from ldai.providers import AgentGraphRunnerResult, ToolRegistry
1010
from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner
1111
from ldai_langchain.langchain_runner_factory import LangChainRunnerFactory
1212

@@ -75,28 +75,36 @@ async def test_langgraph_runner_run_raises_when_langgraph_not_installed():
7575

7676
with patch.dict('sys.modules', {'langgraph': None, 'langgraph.graph': None}):
7777
result = await runner.run("test")
78-
assert isinstance(result, AgentGraphResult)
78+
assert isinstance(result, AgentGraphRunnerResult)
7979
assert result.metrics.success is False
8080

8181

8282
@pytest.mark.asyncio
8383
async def test_langgraph_runner_run_tracks_failure_on_exception():
8484
graph = _make_graph()
85-
tracker = graph.create_tracker()
8685
runner = LangGraphAgentGraphRunner(graph, {})
8786

8887
with patch.dict('sys.modules', {'langgraph': None, 'langgraph.graph': None}):
8988
result = await runner.run("fail")
9089

9190
assert result.metrics.success is False
92-
tracker.track_invocation_failure.assert_called_once()
93-
tracker.track_duration.assert_called_once()
91+
assert result.metrics.duration_ms is not None
92+
93+
# Runner no longer calls graph-level tracker directly.
94+
graph_tracker = graph.create_tracker()
95+
graph_tracker.track_invocation_failure.assert_not_called()
9496

9597

9698
@pytest.mark.asyncio
9799
async def test_langgraph_runner_run_success():
100+
"""
101+
Tests successful run by injecting a pre-compiled graph mock.
102+
103+
Bypasses ``_build_graph()`` (which requires a real LangChain/LangGraph
104+
installation) by setting ``_compiled``, ``_node_keys``, and
105+
``_fn_name_to_config_key`` directly on the runner before calling ``run()``.
106+
"""
98107
graph = _make_graph()
99-
tracker = graph.create_tracker()
100108

101109
mock_message = MagicMock()
102110
mock_message.content = "langgraph answer"
@@ -106,50 +114,33 @@ async def test_langgraph_runner_run_success():
106114
mock_compiled = MagicMock()
107115
mock_compiled.ainvoke = AsyncMock(return_value={'messages': [mock_message]})
108116

109-
mock_state_graph_instance = MagicMock()
110-
mock_state_graph_instance.add_node = MagicMock()
111-
mock_state_graph_instance.add_edge = MagicMock()
112-
mock_state_graph_instance.compile = MagicMock(return_value=mock_compiled)
113-
114-
mock_langgraph_graph = MagicMock()
115-
mock_langgraph_graph.END = 'END'
116-
mock_langgraph_graph.START = 'START'
117-
mock_langgraph_graph.StateGraph = MagicMock(return_value=mock_state_graph_instance)
118-
119-
mock_human_message = MagicMock()
120117
mock_lc_core_messages = MagicMock()
118+
mock_human_message = MagicMock()
121119
mock_lc_core_messages.HumanMessage = MagicMock(return_value=mock_human_message)
122-
mock_lc_core_messages.AnyMessage = MagicMock()
123-
124-
mock_model_response = MagicMock()
125-
mock_model_response.content = 'langgraph answer'
126-
mock_model_response.usage_metadata = None
127-
mock_model_response.response_metadata = None
128-
mock_model_response.tool_calls = None
129120

130-
mock_llm = MagicMock()
131-
mock_llm.ainvoke = AsyncMock(return_value=mock_model_response)
132-
133-
mock_init_model = MagicMock()
134-
mock_init_model.return_value = mock_llm
135-
mock_langchain_chat = MagicMock()
136-
mock_langchain_chat.init_chat_model = mock_init_model
121+
runner = LangGraphAgentGraphRunner(graph, {})
122+
# Inject pre-compiled state to avoid needing a real LangGraph/LangChain install.
123+
runner._compiled = mock_compiled
124+
runner._node_keys = {'root-agent'}
125+
runner._fn_name_to_config_key = {}
137126

138127
with patch.dict('sys.modules', {
139-
'langgraph': MagicMock(),
140-
'langgraph.graph': mock_langgraph_graph,
141-
'langchain_core': MagicMock(),
142128
'langchain_core.messages': mock_lc_core_messages,
143-
'langchain': MagicMock(),
144-
'langchain.chat_models': mock_langchain_chat,
145-
'typing_extensions': __import__('typing_extensions'),
146129
}):
147-
runner = LangGraphAgentGraphRunner(graph, {})
148130
result = await runner.run("find restaurants")
149131

150-
assert isinstance(result, AgentGraphResult)
151-
assert result.output == "langgraph answer"
132+
assert isinstance(result, AgentGraphRunnerResult)
133+
assert result.content == "langgraph answer"
152134
assert result.metrics.success is True
153-
tracker.track_path.assert_called_once_with([])
154-
tracker.track_invocation_success.assert_called_once()
155-
tracker.track_duration.assert_called_once()
135+
assert result.metrics.path == []
136+
assert result.metrics.duration_ms is not None
137+
138+
# Runner no longer calls graph-level tracker directly.
139+
graph_tracker = graph.create_tracker()
140+
graph_tracker.track_path.assert_not_called()
141+
graph_tracker.track_invocation_success.assert_not_called()
142+
graph_tracker.track_duration.assert_not_called()
143+
144+
# Runner no longer calls node-level trackers directly.
145+
node_factory = graph.get_node('root-agent').get_config().create_tracker
146+
node_factory.assert_not_called()

packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from unittest.mock import AsyncMock, MagicMock, patch
1212

1313
from ldai.agent_graph import AgentGraphDefinition
14+
from ldai.managed_agent_graph import ManagedAgentGraph
1415
from ldai.models import AIAgentGraphConfig, AIAgentConfig, Edge, ModelConfig, ProviderConfig
1516
from ldai.tracker import AIGraphTracker, LDAIConfigTracker
1617
from ldai.evaluator import Evaluator
@@ -226,16 +227,16 @@ async def test_tracks_node_and_graph_tokens_on_success():
226227
with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model',
227228
return_value=_mock_model(fake_response)):
228229
runner = LangGraphAgentGraphRunner(graph, {})
229-
result = await runner.run("What's the weather?")
230+
managed = ManagedAgentGraph(runner, graph=graph)
231+
result = await managed.run("What's the weather?")
230232

231233
assert result.metrics.success is True
232-
assert result.output == 'Sunny.'
234+
assert result.content == 'Sunny.'
233235

234236
# Manually simulate what the callback handler would collect and flush
235237
# (mock models don't fire LangChain callbacks, so we test flush directly)
236238
mock_ld_client2 = MagicMock()
237239
graph2 = _make_graph(mock_ld_client2)
238-
tracker2 = graph2.create_tracker()
239240

240241
handler = LDMetricsCallbackHandler({'root-agent'}, {})
241242
node_run_id = uuid4()
@@ -259,7 +260,7 @@ async def test_tracks_node_and_graph_tokens_on_success():
259260
assert ev2['$ld:ai:generation:success'][0][1] == 1
260261
assert '$ld:ai:duration:total' in ev2
261262

262-
# Graph-level events from the real run
263+
# Graph-level events from the real run (fired by ManagedAgentGraph)
263264
ev = _events(mock_ld_client)
264265
assert ev['$ld:ai:graph:total_tokens'][0][1] == 15
265266
assert ev['$ld:ai:graph:invocation_success'][0][1] == 1
@@ -277,7 +278,8 @@ async def test_tracks_execution_path():
277278
with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model',
278279
return_value=_mock_model(fake_response)):
279280
runner = LangGraphAgentGraphRunner(graph, {})
280-
await runner.run('hello')
281+
managed = ManagedAgentGraph(runner, graph=graph)
282+
await managed.run('hello')
281283

282284
ev = _events(mock_ld_client)
283285
path_data = ev['$ld:ai:graph:path'][0][0]
@@ -429,7 +431,8 @@ async def test_tracks_failure_and_latency_on_model_error():
429431
with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model',
430432
return_value=error_model):
431433
runner = LangGraphAgentGraphRunner(graph, {})
432-
result = await runner.run('fail')
434+
managed = ManagedAgentGraph(runner, graph=graph)
435+
result = await managed.run('fail')
433436

434437
assert result.metrics.success is False
435438

@@ -461,14 +464,14 @@ def model_factory(node_config, **kwargs):
461464
with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model',
462465
side_effect=model_factory):
463466
runner = LangGraphAgentGraphRunner(graph, {})
464-
result = await runner.run('hello')
467+
managed = ManagedAgentGraph(runner, graph=graph)
468+
result = await managed.run('hello')
465469

466470
assert result.metrics.success is True
467471

468472
# Simulate per-node token events via callback handler (mock models don't fire callbacks)
469473
mock_ld_client2 = MagicMock()
470474
graph2 = _make_two_node_graph(mock_ld_client2)
471-
tracker2 = graph2.create_tracker()
472475

473476
handler = LDMetricsCallbackHandler({'root-agent', 'child-agent'}, {})
474477

@@ -624,7 +627,7 @@ def model_factory(node_config, **kwargs):
624627
result = await runner.run('hello')
625628

626629
assert result.metrics.success is True
627-
assert 'Agent A' in result.output
630+
assert 'Agent A' in result.content
628631
# Agent B's model must never have been invoked — no fan-out
629632
agent_b_model.ainvoke.assert_not_called()
630633

@@ -752,7 +755,7 @@ def model_factory(node_config, **kwargs):
752755
result = await runner.run('Find info and route to the right agent.')
753756

754757
assert result.metrics.success is True
755-
assert 'Agent A' in result.output
758+
assert 'Agent A' in result.content
756759
# Orchestrator must have been called twice: once before tool result, once after
757760
assert orchestrator_model.ainvoke.call_count == 2
758761
# Agent B must never have been invoked

0 commit comments

Comments
 (0)