Skip to content

Commit ded2a2b

Browse files
jsonbailey and claude committed
feat: Add judge evaluation support to agent graphs
Implement spec AIRUNNER 2.1.3 and GRAPH 1.3.1. The agent graph runner now captures per-node input/output pairs on AgentGraphRunnerResult.eval_requests without dispatching any judges itself. ManagedAgentGraph consumes those requests to fire judge evaluations as a single background asyncio Task surfaced on ManagedGraphResult.evaluations.

- Add EvalRequest dataclass (node_key, input, output).
- AgentGraphRunnerResult.eval_requests is populated for nodes whose AIAgentConfig has a judge_configuration with at least one judge.
- ManagedGraphResult.evaluations is now always an asyncio Task; when no eval_requests exist it resolves immediately to an empty list.
- LangGraph runner emits one EvalRequest per node activation that is not a functional-tool-loop step. Responses whose only tool calls are handoff tools still emit. Per-run isolation: the eval_requests list is built locally in run() and passed through make_node_fn so concurrent calls do not share state.
- OpenAI runner extracts eval_requests from result.new_items, pairing each agent's final message with the prompt that triggered the activation (user input for the root, source agent's last message for downstream nodes via HandoffOutputItem).

Re-implements PR #142 (merged then reverted) without the in-runner evaluator dispatch or the ContextVar-based task accumulator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a3ac7a1 commit ded2a2b

9 files changed

Lines changed: 1006 additions & 40 deletions

File tree

packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py

Lines changed: 97 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
"""LangGraph agent graph runner for LaunchDarkly AI SDK."""
22

33
import time
4-
from typing import Annotated, Any, Dict, List, Set, Tuple
4+
from typing import Annotated, Any, Dict, FrozenSet, List, Set, Tuple
55

66
from ldai import log
77
from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode
88
from ldai.providers import AgentGraphRunner, ToolRegistry
9-
from ldai.providers.types import AgentGraphRunnerResult, AIGraphMetrics
9+
from ldai.providers.types import AgentGraphRunnerResult, AIGraphMetrics, EvalRequest
1010

1111
from ldai_langchain.langchain_helper import (
1212
build_structured_tools,
@@ -17,6 +17,62 @@
1717
from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler
1818

1919

20+
def _message_content_to_str(content: Any) -> str:
21+
"""Normalize a LangChain message ``content`` (string or list of parts) to a string."""
22+
if isinstance(content, str):
23+
return content
24+
if isinstance(content, list):
25+
parts: List[str] = []
26+
for item in content:
27+
if isinstance(item, str):
28+
parts.append(item)
29+
elif isinstance(item, dict):
30+
text = item.get('text')
31+
if isinstance(text, str):
32+
parts.append(text)
33+
return '\r\n'.join(parts)
34+
return str(content)
35+
36+
37+
def _maybe_record_eval_request(
    eval_requests: List[EvalRequest],
    node_key: str,
    msgs: List[Any],
    response: Any,
    handoff_tool_names: FrozenSet[str],
) -> None:
    """
    Record an :class:`EvalRequest` for ``node_key`` when ``response`` is the
    agent's final output for the current activation.

    Nothing is recorded when ``response`` carries at least one tool call
    whose name is not in ``handoff_tool_names`` (the agent is still inside a
    tool loop), or when the response has no non-whitespace content. A
    response whose tool calls are all handoff tools counts as the agent
    finishing with a transfer and is still recorded.

    :param eval_requests: List that receives the new request; mutated in place.
    :param node_key: Key of the node that produced ``response``.
    :param msgs: Messages that were sent to the model; their contents are
        joined with ``'\\r\\n'`` to form the recorded input.
    :param response: Model response; ``tool_calls`` and ``content`` are read
        via ``getattr`` so plain values are tolerated.
    :param handoff_tool_names: Names of tools that signal a handoff.
    """
    def _tool_name(call: Any) -> Any:
        # Tool calls may arrive as dicts or as objects exposing ``name``.
        if isinstance(call, dict):
            return call.get('name')
        return getattr(call, 'name', None)

    pending_calls = getattr(response, 'tool_calls', None) or []
    # Any non-handoff tool call means the agent has more work to do.
    if any(_tool_name(call) not in handoff_tool_names for call in pending_calls):
        return

    output_text = _message_content_to_str(getattr(response, 'content', response))
    if not output_text.strip():
        # Empty or whitespace-only output gives a judge nothing to evaluate.
        return

    prompt_parts = [
        _message_content_to_str(getattr(m, 'content', m)) for m in (msgs or [])
    ]
    eval_requests.append(
        EvalRequest(
            node_key=node_key,
            input='\r\n'.join(prompt_parts),
            output=output_text,
        )
    )
74+
75+
2076
def _make_handoff_tool(child_key: str, description: str) -> Any:
2177
"""
2278
Create a tool that transfers control to ``child_key``.
@@ -81,19 +137,10 @@ def __init__(
81137
"""
82138
self._graph = graph
83139
self._tools = tools
84-
self._compiled: Any = None
85-
self._fn_name_to_config_key: Dict[str, str] = {}
86-
self._node_keys: Set[str] = set()
87-
88-
def _ensure_compiled(self) -> None:
89-
"""Build and cache the compiled graph if not already done."""
90-
if self._compiled is None:
91-
compiled, fn_name_to_config_key, node_keys = self._build_graph()
92-
self._compiled = compiled
93-
self._fn_name_to_config_key = fn_name_to_config_key
94-
self._node_keys = node_keys
95-
96-
def _build_graph(self) -> Tuple[Any, Dict[str, str], Set[str]]:
140+
141+
def _build_graph(
142+
self, eval_requests: List[EvalRequest]
143+
) -> Tuple[Any, Dict[str, str], Set[str]]:
97144
"""
98145
Build and compile the LangGraph StateGraph from the AgentGraphDefinition.
99146
@@ -169,20 +216,46 @@ def handle_traversal(node: AgentGraphNode, ctx: dict) -> None:
169216
else:
170217
model = lc_model
171218

172-
def make_node_fn(bound_model: Any, node_instructions: Any, nk: str):
219+
# Names of the handoff tools attached to this node. Tool calls
220+
# against these are control-flow signals, not the agent doing work,
221+
# so they must not block emission of an EvalRequest.
222+
handoff_tool_names: FrozenSet[str] = frozenset(
223+
getattr(t, 'name', '') for t in handoff_fns
224+
)
225+
226+
# Whether this node has at least one judge configured. Nodes without
227+
# judges contribute zero EvalRequest entries.
228+
jc = getattr(node_config, 'judge_configuration', None)
229+
node_has_judges = bool(jc is not None and getattr(jc, 'judges', None))
230+
231+
def make_node_fn(
    bound_model: Any,
    node_instructions: Any,
    nk: str,
    ht_names: FrozenSet[str],
    emit_eval: bool,
):
    """
    Build the async callable registered as the graph node for ``nk``.

    All per-node values are passed in as arguments so each returned
    callable keeps its own copies; ``eval_requests`` is taken from the
    enclosing scope.

    :param bound_model: Model to invoke; when falsy the node is a no-op.
    :param node_instructions: Optional instructions, prepended as a
        ``SystemMessage`` when truthy.
    :param nk: Node key; also assigned as the callable's ``__name__``.
    :param ht_names: Names of the handoff tools attached to this node.
    :param emit_eval: Whether this node should record eval requests.
    :return: The async ``invoke`` callable.
    """
    async def invoke(state: WorkflowState) -> dict:
        if not bound_model:
            return {'messages': []}

        history = list(state['messages'])
        prompt = (
            [SystemMessage(content=node_instructions)] + history
            if node_instructions
            else history
        )
        response = await bound_model.ainvoke(prompt)

        if emit_eval:
            # The recorded input is the full prompt, system message included.
            _maybe_record_eval_request(
                eval_requests, nk, prompt, response, ht_names
            )

        return {'messages': [response]}

    invoke.__name__ = nk
    return invoke
184255

185-
invoke_fn = make_node_fn(model, instructions, node_key)
256+
invoke_fn = make_node_fn(
257+
model, instructions, node_key, handoff_tool_names, node_has_judges
258+
)
186259
agent_builder.add_node(node_key, invoke_fn)
187260

188261
if node_key == root_key:
@@ -287,14 +360,16 @@ async def run(self, input: str) -> AgentGraphRunnerResult:
287360
:return: AgentGraphRunnerResult with the final content and AIGraphMetrics
288361
"""
289362
start_ns = time.perf_counter_ns()
363+
# Per-run state — kept local so concurrent run() calls do not share it.
364+
eval_requests: List[EvalRequest] = []
290365

291366
try:
292367
from langchain_core.messages import HumanMessage
293368

294-
self._ensure_compiled()
295-
handler = LDMetricsCallbackHandler(self._node_keys, self._fn_name_to_config_key)
369+
compiled, fn_name_to_config_key, node_keys = self._build_graph(eval_requests)
370+
handler = LDMetricsCallbackHandler(node_keys, fn_name_to_config_key)
296371

297-
result = await self._compiled.ainvoke( # type: ignore[call-overload]
372+
result = await compiled.ainvoke( # type: ignore[call-overload]
298373
{'messages': [HumanMessage(content=input)]},
299374
config={'callbacks': [handler], 'recursion_limit': 25},
300375
)
@@ -316,6 +391,7 @@ async def run(self, input: str) -> AgentGraphRunnerResult:
316391
tokens=total_usage if (total_usage is not None and total_usage.total > 0) else None,
317392
node_metrics=node_metrics,
318393
),
394+
eval_requests=eval_requests if eval_requests else None,
319395
)
320396

321397
except Exception as exc:
@@ -334,4 +410,5 @@ async def run(self, input: str) -> AgentGraphRunnerResult:
334410
success=False,
335411
duration_ms=duration_ms,
336412
),
413+
eval_requests=eval_requests if eval_requests else None,
337414
)

0 commit comments

Comments
 (0)