Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,178 changes: 1,178 additions & 0 deletions packages/sdk/server-ai/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/sdk/server-ai/src/ldai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ async def create_agent_graph(
if not runner:
return None

return ManagedAgentGraph(runner)
return ManagedAgentGraph(graph, runner)

def agents(
self,
Expand Down
80 changes: 60 additions & 20 deletions packages/sdk/server-ai/src/ldai/managed_agent_graph.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,101 @@
"""ManagedAgentGraph — LaunchDarkly managed wrapper for agent graph execution."""

from typing import Any
from typing import Any, Dict

from ldai.providers import AgentGraphResult, AgentGraphRunner
from ldai.providers.types import GraphMetricSummary, ManagedGraphResult
from ldai.agent_graph import AgentGraphDefinition
from ldai.providers import AgentGraphRunner
from ldai.providers.types import (
AgentGraphRunnerResult,
LDAIMetrics,
ManagedGraphResult,
)
from ldai.tracker import LDAIMetricSummary


class ManagedAgentGraph:
"""
LaunchDarkly managed wrapper for AI agent graph execution.

Holds an AgentGraphRunner. Wraps the runner result in a
:class:`~ldai.providers.types.ManagedGraphResult` and builds a
:class:`~ldai.providers.types.GraphMetricSummary` from the runner's metrics.
Holds an AgentGraphRunner and an AgentGraphDefinition. Delegates execution
to the runner, then drives all graph-level and per-node tracking from the
returned :class:`~ldai.providers.types.AgentGraphRunnerResult`.

Obtain an instance via ``LDAIClient.create_agent_graph()``.
"""

def __init__(
self,
graph: AgentGraphDefinition,
runner: AgentGraphRunner,
):
"""
Initialize ManagedAgentGraph.

:param graph: The AgentGraphDefinition used to drive graph-level and
per-node tracking from the runner result metrics.
:param runner: The AgentGraphRunner to delegate execution to
"""
self._graph = graph
self._runner = runner

async def run(self, input: Any) -> ManagedGraphResult:
"""
Run the agent graph with the given input.

Delegates to the underlying AgentGraphRunner, then drives all
LaunchDarkly tracking from ``result.metrics``.

:param input: The input prompt or structured input for the graph
:return: ManagedGraphResult containing the content, metric summary, raw response,
and an optional evaluations task (currently always ``None`` for graphs —
per-graph evaluations will be added in a future PR).
:return: ManagedGraphResult containing the content, metric summary,
and raw response.
"""
result: AgentGraphResult = await self._runner.run(input)

# Build a GraphMetricSummary from the runner result's LDAIMetrics.
# path and node_metrics will be populated once graph runners are migrated
# to return AgentGraphRunnerResult with GraphMetrics (PR 11).
metrics = result.metrics
summary = GraphMetricSummary(
success=metrics.success,
usage=metrics.usage,
duration_ms=getattr(metrics, 'duration_ms', None),
graph_tracker = self._graph.create_tracker()
result = await graph_tracker.track_graph_metrics_of_async(
lambda r: r.metrics,
lambda: self._runner.run(input),
)

summary = graph_tracker.get_summary()
summary.node_metrics = self._track_node_metrics(result.metrics.node_metrics)

return ManagedGraphResult(
content=result.output,
content=result.content,
metrics=summary,
raw=result.raw,
evaluations=None,
)

def _track_node_metrics(
self, node_metrics: Dict[str, LDAIMetrics]
) -> Dict[str, LDAIMetricSummary]:
"""
Drive per-node LaunchDarkly tracking events and collect node metric summaries.

For each node key present in ``node_metrics``, obtains the node's
config tracker via the graph definition, fires tracking events, and
returns a map of node key to the tracker's metric summary.
"""
node_summaries: Dict[str, LDAIMetricSummary] = {}
for node_key, node_ldai_metrics in node_metrics.items():
node = self._graph.get_node(node_key)
if node is None:
continue
node_tracker = node.get_config().create_tracker()

if node_ldai_metrics.usage is not None:
node_tracker.track_tokens(node_ldai_metrics.usage)
if node_ldai_metrics.duration_ms is not None:
node_tracker.track_duration(node_ldai_metrics.duration_ms)
if node_ldai_metrics.tool_calls:
node_tracker.track_tool_calls(node_ldai_metrics.tool_calls)
if node_ldai_metrics.success:
node_tracker.track_success()
else:
node_tracker.track_error()

node_summaries[node_key] = node_tracker.get_summary()
return node_summaries

def get_agent_graph_runner(self) -> AgentGraphRunner:
"""
Return the underlying AgentGraphRunner for advanced use.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, Protocol, runtime_checkable

from ldai.providers.types import AgentGraphResult
from ldai.providers.types import AgentGraphRunnerResult


@runtime_checkable
Expand All @@ -18,11 +18,11 @@ class AgentGraphRunner(Protocol):
the caller just passes input.
"""

async def run(self, input: Any) -> AgentGraphResult:
async def run(self, input: Any) -> AgentGraphRunnerResult:
"""
Run the agent graph with the given input.

:param input: The input to the agent graph (string prompt or structured input)
:return: AgentGraphResult containing the output, raw response, and metrics
:return: AgentGraphRunnerResult containing the content, raw response, and GraphMetrics
"""
...
8 changes: 4 additions & 4 deletions packages/sdk/server-ai/src/ldai/providers/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ class GraphMetrics:
class GraphMetricSummary:
"""Contains a summary of metrics for an agent graph run."""

success: bool
"""Whether the graph run succeeded."""
success: Optional[bool] = None
"""Whether the graph run succeeded. Absent if invocation status has not been tracked."""

path: List[str] = field(default_factory=list)
"""Ordered list of node keys visited during the run."""
Expand All @@ -122,8 +122,8 @@ class GraphMetricSummary:
usage: Optional[TokenUsage] = None
"""Optional aggregate token usage information across all nodes in the graph run."""

node_metrics: Dict[str, LDAIMetrics] = field(default_factory=dict)
"""Per-node metrics keyed by node key."""
node_metrics: Dict[str, LDAIMetricSummary] = field(default_factory=dict)
"""Per-node metric summaries keyed by node key."""

resumption_token: Optional[str] = None
"""Optional resumption token from the graph tracker for cross-process resumption."""
Expand Down
132 changes: 130 additions & 2 deletions packages/sdk/server-ai/src/ldai/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ldai import log

if TYPE_CHECKING:
from ldai.providers.types import LDAIMetrics
from ldai.providers.types import GraphMetrics, GraphMetricSummary, LDAIMetrics


class FeedbackKind(Enum):
Expand Down Expand Up @@ -616,7 +616,11 @@ def _openai_to_token_usage(data: dict) -> TokenUsage:

class AIGraphTracker:
"""
Tracks graph-level, node-level, and edge-level metrics for AI agent graph operations.
Tracks graph-level metrics for AI agent graph operations.

Maintains an internal :class:`~ldai.providers.types.GraphMetricSummary`
that is updated as tracking methods are called. Retrieve it via
:meth:`get_summary`.
"""

def __init__(
Expand All @@ -642,11 +646,22 @@ def __init__(
self._version = version
self._context = context

from ldai.providers.types import GraphMetricSummary
self._summary = GraphMetricSummary()

@property
def graph_key(self) -> str:
"""Graph configuration key used in tracking payloads."""
return self._graph_key

def get_summary(self) -> GraphMetricSummary:
"""
Get the current summary of graph-level metrics.

:return: Summary of graph metrics tracked so far.
"""
return self._summary

def __get_track_data(self):
"""
Get tracking data for events.
Expand All @@ -664,6 +679,12 @@ def track_invocation_success(self) -> None:
"""
Track a successful graph invocation.
"""
if self._summary.success is not None:
log.warning(
"Invocation status has already been tracked for this graph execution. %s",
self.__get_track_data())
return
self._summary.success = True
self._ld_client.track(
"$ld:ai:graph:invocation_success",
self._context,
Expand All @@ -675,6 +696,12 @@ def track_invocation_failure(self) -> None:
"""
Track an unsuccessful graph invocation.
"""
if self._summary.success is not None:
log.warning(
"Invocation status has already been tracked for this graph execution. %s",
self.__get_track_data())
return
self._summary.success = False
self._ld_client.track(
"$ld:ai:graph:invocation_failure",
self._context,
Expand All @@ -688,6 +715,10 @@ def track_duration(self, duration: int) -> None:

:param duration: Duration in milliseconds.
"""
if self._summary.duration_ms is not None:
log.warning("Duration has already been tracked for this graph execution. %s", self.__get_track_data())
return
self._summary.duration_ms = duration
self._ld_client.track(
"$ld:ai:graph:duration:total",
self._context,
Expand All @@ -703,6 +734,10 @@ def track_total_tokens(self, tokens: Optional[TokenUsage] = None) -> None:
"""
if tokens is None or tokens.total <= 0:
return
if self._summary.usage is not None:
log.warning("Token usage has already been tracked for this graph execution. %s", self.__get_track_data())
return
self._summary.usage = tokens
self._ld_client.track(
"$ld:ai:graph:total_tokens",
self._context,
Expand All @@ -716,6 +751,10 @@ def track_path(self, path: List[str]) -> None:

:param path: An array of configuration keys representing the sequence of nodes executed during graph traversal.
"""
if self._summary.path:
log.warning("Path has already been tracked for this graph execution. %s", self.__get_track_data())
return
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
self._summary.path = list(path)
track_data = {**self.__get_track_data(), "path": path}
self._ld_client.track(
"$ld:ai:graph:path",
Expand Down Expand Up @@ -780,3 +819,92 @@ def track_handoff_failure(self, source_key: str, target_key: str) -> None:
track_data,
1,
)

def _track_from_graph_metrics(
self,
result: Any,
metrics_extractor: Callable[[Any], Optional[GraphMetrics]],
elapsed_ms: int,
) -> None:
metrics: Optional[GraphMetrics] = None
try:
metrics = metrics_extractor(result)
except Exception as exc:
log.warning("Failed to extract graph metrics: %s", exc)

if metrics is None:
self.track_duration(elapsed_ms)
return

self.track_duration(metrics.duration_ms if metrics.duration_ms is not None else elapsed_ms)
if metrics.success:
self.track_invocation_success()
else:
self.track_invocation_failure()
if metrics.path:
self.track_path(metrics.path)
if metrics.usage is not None:
self.track_total_tokens(metrics.usage)

def track_graph_metrics_of(
self,
metrics_extractor: Callable[[Any], Optional[GraphMetrics]],
func: Callable[[], Any],
) -> Any:
"""
Track graph-level metrics for a synchronous graph operation.

Times the operation, extracts :class:`~ldai.providers.types.GraphMetrics`
via the provided extractor, and fires graph-level tracking events
(path, duration, success/failure, total tokens).

If the extracted ``GraphMetrics`` has a non-``None`` ``duration_ms``,
that value is used instead of the wall-clock elapsed time.

Node-level metrics are not tracked by this method.

For async operations, use :meth:`track_graph_metrics_of_async`.

:param metrics_extractor: Function that extracts GraphMetrics from the result
:param func: Synchronous callable that runs the graph operation
:return: The result of the operation
"""
start_ns = time.perf_counter_ns()
try:
result = func()
except Exception as err:
duration = (time.perf_counter_ns() - start_ns) // 1_000_000
self.track_duration(duration)
self.track_invocation_failure()
raise err

elapsed_ms = (time.perf_counter_ns() - start_ns) // 1_000_000
self._track_from_graph_metrics(result, metrics_extractor, elapsed_ms)
return result

async def track_graph_metrics_of_async(
self,
metrics_extractor: Callable[[Any], Optional[GraphMetrics]],
func: Callable[[], Any],
) -> Any:
"""
Track graph-level metrics for an async graph operation (``func`` is awaited).

Same event semantics as :meth:`track_graph_metrics_of`.

:param metrics_extractor: Function that extracts GraphMetrics from the result
:param func: Async callable that runs the graph operation
:return: The result of the operation
"""
start_ns = time.perf_counter_ns()
try:
result = await func()
except Exception as err:
duration = (time.perf_counter_ns() - start_ns) // 1_000_000
self.track_duration(duration)
self.track_invocation_failure()
raise err

elapsed_ms = (time.perf_counter_ns() - start_ns) // 1_000_000
self._track_from_graph_metrics(result, metrics_extractor, elapsed_ms)
return result
Loading
Loading