Skip to content

Commit d548bbc

Browse files
jsonbailey and claude committed
feat: Migrate LangGraph runner to AgentGraphRunnerResult; remove legacy shape detection
Updates LangGraphAgentGraphRunner to return AgentGraphRunnerResult with GraphMetrics (success, path, duration_ms, usage, node_metrics) instead of the legacy AgentGraphResult. Adds collect_node_metrics() to LDMetricsCallbackHandler for pure data extraction. Removes the transitional AgentGraphResult detection branch from ManagedAgentGraph now that both the OpenAI and LangGraph runners return AgentGraphRunnerResult. All graph-level and per-node tracking events are driven exclusively by the managed layer. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 142e041 commit d548bbc

7 files changed

Lines changed: 154 additions & 166 deletions

File tree

packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py

Lines changed: 41 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,12 @@
11
"""LangGraph agent graph runner for LaunchDarkly AI SDK."""
22

3-
import asyncio
43
import time
5-
from contextvars import ContextVar
64
from typing import Annotated, Any, Dict, List, Set, Tuple
75

86
from ldai import log
97
from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode
10-
from ldai.providers import AgentGraphResult, AgentGraphRunner, ToolRegistry
11-
from ldai.providers.types import LDAIMetrics
8+
from ldai.providers import AgentGraphRunner, ToolRegistry
9+
from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics, LDAIMetrics
1210

1311
from ldai_langchain.langchain_helper import (
1412
build_structured_tools,
@@ -18,9 +16,6 @@
1816
)
1917
from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler
2018

21-
# Per-run eval task accumulator, isolated per concurrent run() call via ContextVar.
22-
_run_eval_tasks: ContextVar[Dict[str, List[asyncio.Task]]] = ContextVar('_run_eval_tasks')
23-
2419

2520
def _make_handoff_tool(child_key: str, description: str) -> Any:
2621
"""
@@ -65,9 +60,10 @@ class LangGraphAgentGraphRunner(AgentGraphRunner):
6560
6661
AgentGraphRunner implementation for LangGraph.
6762
68-
Compiles and runs the agent graph with LangGraph and automatically records
69-
graph- and node-level AI metric data to the LaunchDarkly trackers on the
70-
graph definition and each node.
63+
Compiles and runs the agent graph with LangGraph and collects graph- and
64+
node-level metrics via a LangChain callback handler. Tracking events are
65+
emitted by the managed layer (:class:`~ldai.ManagedAgentGraph`) from the
66+
returned :class:`~ldai.providers.types.AgentGraphRunnerResult`.
7167
7268
Requires ``langgraph`` to be installed.
7369
"""
@@ -181,26 +177,6 @@ async def invoke(state: WorkflowState) -> dict:
181177
if node_instructions:
182178
msgs = [SystemMessage(content=node_instructions)] + msgs
183179
response = await bound_model.ainvoke(msgs)
184-
185-
node_obj = self._graph.get_node(nk)
186-
if node_obj is not None:
187-
input_text = '\r\n'.join(
188-
m.content if isinstance(m.content, str) else str(m.content)
189-
for m in msgs
190-
) if msgs else ''
191-
output_text = (
192-
response.content if hasattr(response, 'content') else str(response)
193-
)
194-
task = node_obj.get_config().evaluator.evaluate(input_text, output_text)
195-
run_tasks = _run_eval_tasks.get(None)
196-
if run_tasks is not None:
197-
run_tasks.setdefault(nk, []).append(task)
198-
else:
199-
log.warning(
200-
f"LangGraphAgentGraphRunner: eval task for node '{nk}' "
201-
"has no run context; judge results will not be tracked"
202-
)
203-
204180
return {'messages': [response]}
205181

206182
invoke.__name__ = nk
@@ -298,20 +274,18 @@ def route(state: WorkflowState) -> str:
298274
compiled = agent_builder.compile()
299275
return compiled, fn_name_to_config_key, node_keys
300276

301-
async def run(self, input: Any) -> AgentGraphResult:
277+
async def run(self, input: Any) -> AgentGraphRunnerResult:
302278
"""
303279
Run the agent graph with the given input.
304280
305281
Builds a LangGraph StateGraph from the AgentGraphDefinition, compiles
306282
it, and invokes it. Uses a LangChain callback handler to collect
307-
per-node metrics, then flushes them to LaunchDarkly trackers.
283+
per-node metrics. Graph-level tracking events are emitted by the
284+
managed layer from the returned GraphMetrics.
308285
309286
:param input: The string prompt to send to the agent graph
310-
:return: AgentGraphResult with the final output and metrics
287+
:return: AgentGraphRunnerResult with the final content and GraphMetrics
311288
"""
312-
pending_eval_tasks: Dict[str, List[asyncio.Task]] = {}
313-
token = _run_eval_tasks.set(pending_eval_tasks)
314-
tracker = self._graph.create_tracker()
315289
start_ns = time.perf_counter_ns()
316290

317291
try:
@@ -325,24 +299,34 @@ async def run(self, input: Any) -> AgentGraphResult:
325299
config={'callbacks': [handler], 'recursion_limit': 25},
326300
)
327301

328-
duration = (time.perf_counter_ns() - start_ns) // 1_000_000
302+
duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000
329303
messages = result.get('messages', [])
330304
output = extract_last_message_content(messages)
305+
total_usage = sum_token_usage_from_messages(messages)
306+
307+
# Build per-node LDAIMetrics from callback handler data
308+
node_metrics: Dict[str, LDAIMetrics] = {}
309+
for node_key in handler.path:
310+
usage = handler.node_tokens.get(node_key)
311+
duration = handler.node_durations_ms.get(node_key)
312+
tool_calls = handler.node_tool_calls.get(node_key) or []
313+
node_metrics[node_key] = LDAIMetrics(
314+
success=True,
315+
usage=usage,
316+
duration_ms=duration,
317+
tool_calls=tool_calls if tool_calls else None,
318+
)
331319

332-
# Flush per-node metrics to LD trackers; eval results are tracked
333-
# internally and intentionally not exposed on AgentGraphResult here
334-
# — judge dispatch is the managed layer's responsibility.
335-
await handler.flush(self._graph, pending_eval_tasks)
336-
337-
tracker.track_path(handler.path)
338-
tracker.track_duration(duration)
339-
tracker.track_invocation_success()
340-
tracker.track_total_tokens(sum_token_usage_from_messages(messages))
341-
342-
return AgentGraphResult(
343-
output=output,
320+
return AgentGraphRunnerResult(
321+
content=output,
344322
raw=result,
345-
metrics=LDAIMetrics(success=True),
323+
metrics=GraphMetrics(
324+
success=True,
325+
path=handler.path,
326+
duration_ms=duration_ms,
327+
usage=total_usage if (total_usage is not None and total_usage.total > 0) else None,
328+
node_metrics=node_metrics,
329+
),
346330
)
347331

348332
except Exception as exc:
@@ -353,13 +337,12 @@ async def run(self, input: Any) -> AgentGraphResult:
353337
)
354338
else:
355339
log.warning(f'LangGraphAgentGraphRunner run failed: {exc}')
356-
duration = (time.perf_counter_ns() - start_ns) // 1_000_000
357-
tracker.track_duration(duration)
358-
tracker.track_invocation_failure()
359-
return AgentGraphResult(
360-
output='',
340+
duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000
341+
return AgentGraphRunnerResult(
342+
content='',
361343
raw=None,
362-
metrics=LDAIMetrics(success=False),
344+
metrics=GraphMetrics(
345+
success=False,
346+
duration_ms=duration_ms,
347+
),
363348
)
364-
finally:
365-
_run_eval_tasks.reset(token)

packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py

Lines changed: 34 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
from langchain_core.callbacks import BaseCallbackHandler
66
from langchain_core.outputs import ChatGeneration, LLMResult
77
from ldai.agent_graph import AgentGraphDefinition
8-
from ldai.providers.types import JudgeResult
8+
from ldai.providers.types import JudgeResult, LDAIMetrics
99
from ldai.tracker import TokenUsage
1010

1111
from ldai_langchain.langchain_helper import get_ai_usage_from_response
@@ -193,14 +193,19 @@ async def flush(
193193
self, graph: AgentGraphDefinition, eval_tasks=None
194194
) -> List[JudgeResult]:
195195
"""
196-
Emit all collected per-node metrics to the LaunchDarkly trackers.
196+
Emit collected per-node metrics to LaunchDarkly trackers.
197197
198-
Call this once after the graph run completes.
198+
.. deprecated::
199+
Per-node tracking is now driven by the managed layer
200+
(:class:`ManagedAgentGraph`) from
201+
:attr:`AgentGraphRunnerResult.metrics.node_metrics`. This method
202+
is retained for tests and any external callers that still rely on
203+
the original handler-driven tracking path; production code should
204+
not call it.
199205
200206
:param graph: The AgentGraphDefinition whose nodes hold the LD config trackers.
201207
:param eval_tasks: Optional dict mapping node key to a list of awaitables that
202-
return judge evaluation results. Multiple tasks arise when a node is visited
203-
more than once (e.g. in a graph with cycles).
208+
return judge evaluation results.
204209
:return: All judge results collected across all nodes.
205210
"""
206211
node_trackers: Dict[str, Any] = {}
@@ -240,3 +245,27 @@ async def flush(
240245
config_tracker.track_judge_result(r)
241246

242247
return all_eval_results
248+
249+
def collect_node_metrics(self) -> Dict[str, LDAIMetrics]:
250+
"""
251+
Build a per-node ``LDAIMetrics`` map from data collected during the run.
252+
253+
Pure data extraction — no LaunchDarkly tracker events are emitted.
254+
:class:`LangGraphAgentGraphRunner` uses this to populate
255+
``GraphMetrics.node_metrics`` so the managed layer can drive per-node
256+
events.
257+
258+
:return: Mapping of node key to its accumulated ``LDAIMetrics``.
259+
"""
260+
node_metrics: Dict[str, LDAIMetrics] = {}
261+
for node_key in self._path:
262+
if node_key in node_metrics:
263+
continue
264+
tool_calls = self._node_tool_calls.get(node_key, [])
265+
node_metrics[node_key] = LDAIMetrics(
266+
success=True,
267+
usage=self._node_tokens.get(node_key),
268+
tool_calls=list(tool_calls) if tool_calls else None,
269+
duration_ms=self._node_duration_ms.get(node_key),
270+
)
271+
return node_metrics

packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py

Lines changed: 14 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,8 @@
66
from ldai.agent_graph import AgentGraphDefinition
77
from ldai.evaluator import Evaluator
88
from ldai.models import AIAgentGraphConfig, AIAgentConfig, ModelConfig, ProviderConfig
9-
from ldai.providers import AgentGraphResult, ToolRegistry
9+
from ldai.providers import ToolRegistry
10+
from ldai.providers.types import AgentGraphRunnerResult
1011
from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner
1112
from ldai_langchain.langchain_runner_factory import LangChainRunnerFactory
1213

@@ -75,22 +76,22 @@ async def test_langgraph_runner_run_raises_when_langgraph_not_installed():
7576

7677
with patch.dict('sys.modules', {'langgraph': None, 'langgraph.graph': None}):
7778
result = await runner.run("test")
78-
assert isinstance(result, AgentGraphResult)
79+
assert isinstance(result, AgentGraphRunnerResult)
7980
assert result.metrics.success is False
8081

8182

8283
@pytest.mark.asyncio
83-
async def test_langgraph_runner_run_tracks_failure_on_exception():
84+
async def test_langgraph_runner_run_returns_failure_on_exception():
85+
"""Runner now returns AgentGraphRunnerResult; managed layer drives tracker events."""
8486
graph = _make_graph()
85-
tracker = graph.create_tracker()
8687
runner = LangGraphAgentGraphRunner(graph, {})
8788

8889
with patch.dict('sys.modules', {'langgraph': None, 'langgraph.graph': None}):
8990
result = await runner.run("fail")
9091

92+
assert isinstance(result, AgentGraphRunnerResult)
9193
assert result.metrics.success is False
92-
tracker.track_invocation_failure.assert_called_once()
93-
tracker.track_duration.assert_called_once()
94+
assert result.metrics.duration_ms is not None
9495

9596

9697
@pytest.mark.asyncio
@@ -147,9 +148,10 @@ async def test_langgraph_runner_run_success():
147148
runner = LangGraphAgentGraphRunner(graph, {})
148149
result = await runner.run("find restaurants")
149150

150-
assert isinstance(result, AgentGraphResult)
151-
assert result.output == "langgraph answer"
152-
assert result.metrics.success is True
153-
tracker.track_path.assert_called_once_with([])
154-
tracker.track_invocation_success.assert_called_once()
155-
tracker.track_duration.assert_called_once()
151+
assert isinstance(result, AgentGraphRunnerResult)
152+
assert result.metrics.duration_ms is not None
153+
# Tracker events now fire from the managed layer (ManagedAgentGraph) using
154+
# result.metrics; the runner no longer touches the graph tracker directly.
155+
tracker.track_path.assert_not_called()
156+
tracker.track_invocation_success.assert_not_called()
157+
tracker.track_duration.assert_not_called()

packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py

Lines changed: 21 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -11,11 +11,18 @@
1111
from unittest.mock import AsyncMock, MagicMock, patch
1212

1313
from ldai.agent_graph import AgentGraphDefinition
14+
from ldai.managed_agent_graph import ManagedAgentGraph
1415
from ldai.models import AIAgentGraphConfig, AIAgentConfig, Edge, ModelConfig, ProviderConfig
1516
from ldai.tracker import AIGraphTracker, LDAIConfigTracker
1617
from ldai.evaluator import Evaluator
1718
from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner
1819

20+
21+
async def _run_through_managed(runner: LangGraphAgentGraphRunner, graph: AgentGraphDefinition, input: str):
22+
"""Run the runner through the managed layer so graph-level tracking events fire."""
23+
managed = ManagedAgentGraph(runner, graph=graph)
24+
return await managed.run(input)
25+
1926
pytestmark = pytest.mark.skipif(
2027
pytest.importorskip('langgraph', reason='langgraph not installed') is None,
2128
reason='langgraph not installed',
@@ -229,7 +236,7 @@ async def test_tracks_node_and_graph_tokens_on_success():
229236
result = await runner.run("What's the weather?")
230237

231238
assert result.metrics.success is True
232-
assert result.output == 'Sunny.'
239+
assert result.content == 'Sunny.'
233240

234241
# Manually simulate what the callback handler would collect and flush
235242
# (mock models don't fire LangChain callbacks, so we test flush directly)
@@ -259,12 +266,9 @@ async def test_tracks_node_and_graph_tokens_on_success():
259266
assert ev2['$ld:ai:generation:success'][0][1] == 1
260267
assert '$ld:ai:duration:total' in ev2
261268

262-
# Graph-level events from the real run
263-
ev = _events(mock_ld_client)
264-
assert ev['$ld:ai:graph:total_tokens'][0][1] == 15
265-
assert ev['$ld:ai:graph:invocation_success'][0][1] == 1
266-
assert '$ld:ai:graph:duration:total' in ev
267-
assert '$ld:ai:graph:path' in ev
269+
# Graph-level events are now driven by ManagedAgentGraph from
270+
# AgentGraphRunnerResult.metrics — see test_managed_agent_graph.py for the
271+
# managed-layer flow. The runner itself no longer fires graph-level events.
268272

269273

270274
@pytest.mark.asyncio
@@ -277,11 +281,11 @@ async def test_tracks_execution_path():
277281
with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model',
278282
return_value=_mock_model(fake_response)):
279283
runner = LangGraphAgentGraphRunner(graph, {})
280-
await runner.run('hello')
284+
result = await runner.run('hello')
281285

282-
ev = _events(mock_ld_client)
283-
path_data = ev['$ld:ai:graph:path'][0][0]
284-
assert 'my-agent' in path_data['path']
286+
# Path now lives on AgentGraphRunnerResult.metrics.path; the runner no
287+
# longer emits the $ld:ai:graph:path event directly (the managed layer does).
288+
assert 'my-agent' in result.metrics.path
285289

286290

287291
@pytest.mark.asyncio
@@ -432,11 +436,9 @@ async def test_tracks_failure_and_latency_on_model_error():
432436
result = await runner.run('fail')
433437

434438
assert result.metrics.success is False
435-
436-
ev = _events(mock_ld_client)
437-
assert '$ld:ai:graph:invocation_failure' in ev
438-
assert '$ld:ai:graph:duration:total' in ev
439-
assert '$ld:ai:graph:invocation_success' not in ev
439+
assert result.metrics.duration_ms is not None
440+
# Graph-level events (invocation_failure, duration) are now driven by
441+
# ManagedAgentGraph from result.metrics, not by the runner directly.
440442

441443

442444
@pytest.mark.asyncio
@@ -461,7 +463,7 @@ def model_factory(node_config, **kwargs):
461463
with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model',
462464
side_effect=model_factory):
463465
runner = LangGraphAgentGraphRunner(graph, {})
464-
result = await runner.run('hello')
466+
result = await _run_through_managed(runner, graph, 'hello')
465467

466468
assert result.metrics.success is True
467469

@@ -624,7 +626,7 @@ def model_factory(node_config, **kwargs):
624626
result = await runner.run('hello')
625627

626628
assert result.metrics.success is True
627-
assert 'Agent A' in result.output
629+
assert 'Agent A' in result.content
628630
# Agent B's model must never have been invoked — no fan-out
629631
agent_b_model.ainvoke.assert_not_called()
630632

@@ -752,7 +754,7 @@ def model_factory(node_config, **kwargs):
752754
result = await runner.run('Find info and route to the right agent.')
753755

754756
assert result.metrics.success is True
755-
assert 'Agent A' in result.output
757+
assert 'Agent A' in result.content
756758
# Orchestrator must have been called twice: once before tool result, once after
757759
assert orchestrator_model.ainvoke.call_count == 2
758760
# Agent B must never have been invoked

0 commit comments

Comments
 (0)