Skip to content
Empty file.
112 changes: 112 additions & 0 deletions src/agentex/lib/core/observability/llm_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""OTel metrics for LLM calls.

Single source of truth for LLM-call instrumentation across all agentex code
paths — temporal+openai_agents streaming today, sync ACP and the Claude SDK
plugin in future PRs. Centralizing the instrument definitions here means
those follow-ups don't need to redefine the metric names, units, or
description strings; they import ``get_llm_metrics()`` and record values.

The meter is no-op when the application hasn't configured a ``MeterProvider``,
so importing this module is safe for runtimes that don't use OTel. Instruments
are created lazily on first ``get_llm_metrics()`` call so a ``MeterProvider``
configured *after* this module is imported still binds correctly.

Cardinality is bounded:
- All metrics carry only ``model`` (the LLM model name).
- ``requests`` additionally carries ``status``, drawn from a small fixed set
(see ``classify_status``).

Resource attributes (``service.name``, ``k8s.*``, etc.) come from the
application's OTel resource configuration and are added to every series
automatically.
"""

from __future__ import annotations

from typing import Optional

from opentelemetry import metrics


class LLMMetrics:
"""Lazily-created OTel instruments for LLM call telemetry."""

def __init__(self) -> None:
meter = metrics.get_meter("agentex.llm")
self.requests = meter.create_counter(
name="agentex.llm.requests",
unit="1",
description=(
"LLM call count tagged with status (success / rate_limit / "
"server_error / client_error / timeout / network_error / "
"other_error). Use to alert on 429s, 5xxs, etc."
),
)
self.ttft_ms = meter.create_histogram(
name="agentex.llm.ttft",
unit="ms",
description="Time from request submission to first content token (ms)",
)
# Note: TPS denominator is the model-generation window
# (last_token_time - first_token_time), not total stream wall time.
# This isolates raw model throughput from event-loop / tool-call latency.
self.tps = meter.create_histogram(
name="agentex.llm.tps",
unit="tokens/s",
description="Output tokens per second over the generation window",
)
self.input_tokens = meter.create_counter(
name="agentex.llm.input_tokens",
unit="tokens",
description="Total input tokens sent to the LLM",
)
self.output_tokens = meter.create_counter(
name="agentex.llm.output_tokens",
unit="tokens",
description="Total output tokens returned by the LLM",
)
self.cached_input_tokens = meter.create_counter(
name="agentex.llm.cached_input_tokens",
unit="tokens",
description="Subset of input tokens served from prompt cache",
)
self.reasoning_tokens = meter.create_counter(
name="agentex.llm.reasoning_tokens",
unit="tokens",
description="Output tokens spent on reasoning (subset of output_tokens)",
)


_llm_metrics: Optional[LLMMetrics] = None


def get_llm_metrics() -> LLMMetrics:
"""Return the LLM metrics singleton, creating it on first use."""
global _llm_metrics
if _llm_metrics is None:
_llm_metrics = LLMMetrics()
return _llm_metrics


def classify_status(exc: Optional[BaseException]) -> str:
"""Categorize an LLM call's outcome into a small fixed set of status labels.

A successful call returns ``"success"``. Exceptions are mapped by type name
so we don't depend on a specific provider SDK's exception class hierarchy:
OpenAI, Anthropic, and other providers all use names like ``RateLimitError``,
``APITimeoutError``, ``InternalServerError``, etc.
"""
if exc is None:
return "success"
name = type(exc).__name__
if "RateLimit" in name:
return "rate_limit"
if "Timeout" in name:
return "timeout"
if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway")):
return "server_error"
if "Connection" in name:
return "network_error"
if any(s in name for s in ("BadRequest", "Authentication", "Permission", "NotFound", "Conflict", "UnprocessableEntity")):
return "client_error"
return "other_error"
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Custom Temporal Model Provider with streaming support for OpenAI agents."""
from __future__ import annotations

import time
import uuid
from typing import Any, List, Union, Optional, override

Expand Down Expand Up @@ -31,6 +32,7 @@
# Re-export the canonical StreamingMode literal from the streaming service so
# all layers share a single definition.
from agentex.lib.core.services.adk.streaming import StreamingMode as StreamingMode
from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics

try:
from agents.tool import ShellTool # type: ignore[attr-defined]
Expand Down Expand Up @@ -78,6 +80,11 @@
logger = make_logger("agentex.temporal.streaming")


# LLM metrics live in agentex.lib.core.observability.llm_metrics so other
# code paths (sync ACP, Claude SDK plugin, future provider integrations)
# can share the same instrument definitions without redefining names.


def _serialize_item(item: Any) -> dict[str, Any]:
"""
Universal serializer for any item type from OpenAI Agents SDK.
Expand Down Expand Up @@ -592,7 +599,11 @@ async def get_response(
# endpoints recognize this parameter, so we don't auto-inject a default.
prompt_cache_key = extra_args.pop("prompt_cache_key", NOT_GIVEN)

# Create the response stream using Responses API
# Create the response stream using Responses API.
# Bookmark request start *before* the await so ttft captures the full
# user-perceived latency (HTTP round-trip + model TTFB), not just the
# post-connect event-loop delay.
stream_start_perf = time.perf_counter()
logger.debug(f"[TemporalStreamingModel] Creating response stream with Responses API")
stream = await self.client.responses.create( # type: ignore[call-overload]

Expand Down Expand Up @@ -642,6 +653,12 @@ async def get_response(
reasoning_summaries = []
reasoning_contents = []
event_count = 0
# ttft / tps instrumentation. ``stream_start_perf`` is set above,
# before the responses.create() await, so it captures the full
# request-to-first-token latency. ``first_token_at`` and
# ``last_token_at`` bracket the model-generation window for tps.
first_token_at: Optional[float] = None
Comment thread
greptile-apps[bot] marked this conversation as resolved.
last_token_at: Optional[float] = None

# We expect task_id to always be provided for streaming
if not task_id:
Expand All @@ -656,6 +673,20 @@ async def get_response(
# Log event type
logger.debug(f"[TemporalStreamingModel] Event {event_count}: {type(event).__name__}")

# Bookmark first/last token-producing events for ttft and tps.
# Includes function-call argument deltas so the generation window
# covers every event type whose tokens land in usage.output_tokens.
if isinstance(event, (
ResponseTextDeltaEvent,
ResponseReasoningTextDeltaEvent,
ResponseReasoningSummaryTextDeltaEvent,
ResponseFunctionCallArgumentsDeltaEvent,
)):
now_perf = time.perf_counter()
if first_token_at is None:
first_token_at = now_perf
last_token_at = now_perf

# Handle different event types using isinstance for type safety
if isinstance(event, ResponseOutputItemAddedEvent):
# New output item (reasoning, function call, or message)
Expand Down Expand Up @@ -983,6 +1014,33 @@ async def get_response(

span.output = output_data

# Emit LLM metrics derived from the captured stream. The meter is a
# no-op if the application hasn't configured a MeterProvider, so this
# is safe to do unconditionally. We only emit ttft / tps when their
# input data is actually meaningful (got a content delta, got tokens).
m = get_llm_metrics()
metric_attrs = {"model": self.model_name}
m.requests.add(1, {**metric_attrs, "status": "success"})
m.input_tokens.add(usage.input_tokens or 0, metric_attrs)
m.output_tokens.add(usage.output_tokens or 0, metric_attrs)
m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs)
m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs)
if first_token_at is not None:
m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs)
# tps denominator is the generation window (first→last delta), not
# total stream wall time — see LLMMetrics for rationale. Single-token
# responses (where first_token_at == last_token_at, e.g. a one-token
# tool-result acknowledgement) collapse the window to 0 and are
# intentionally skipped — TPS is undefined in that case.
if (
first_token_at is not None
and last_token_at is not None
and last_token_at > first_token_at
and (usage.output_tokens or 0) > 0
):
generation_window_s = last_token_at - first_token_at
m.tps.record(usage.output_tokens / generation_window_s, metric_attrs)

# Return the response. response_id is the server-issued id from
# ResponseCompletedEvent.response.id, or None when the stream ended
# without a completed event (error path) — matching the documented
Expand All @@ -998,6 +1056,12 @@ async def get_response(

except Exception as e:
logger.error(f"Error using Responses API: {e}")
# Emit a request-counter event so 429s, 5xxs, timeouts, etc. are
# observable on the SDK side. Status histograms / token counters
# only fire on successful completion above.
get_llm_metrics().requests.add(
1, {"model": self.model_name, "status": classify_status(e)}
)
raise
Comment thread
greptile-apps[bot] marked this conversation as resolved.

# The _get_response_with_responses_api method has been merged into get_response above
Expand Down
Loading