Skip to content

Commit 421bf6a

Browse files
committed
feat(streaming): emit OTel metrics for ttft, tps, and per-call token counts
Adds six metrics to TemporalStreamingModel.get_response so applications that configure an OTel MeterProvider can see streaming-call behavior without per-app instrumentation:

- agentex.llm.ttft (histogram, ms): time from streaming-request start to first content delta. Captured on the first ResponseTextDeltaEvent / ResponseReasoningTextDeltaEvent / ResponseReasoningSummaryTextDeltaEvent.
- agentex.llm.tps (histogram, tokens/s): output_tokens / stream_duration. Use 1/tps for time-per-output-token (tpot).
- agentex.llm.input_tokens / output_tokens / cached_input_tokens / reasoning_tokens (counters): pulled from the captured Responses API Usage at end-of-stream. Cache hit rate is computed at query time as rate(cached_input_tokens) / rate(input_tokens).

Why

- The data was already captured (line 854: captured_usage = response.usage) but never emitted as metrics. Apps could only see total LLM call duration, not the meaningful breakdowns.
- Doing this in the SDK rather than each app means every consumer of TemporalStreamingModel gets the metrics for free.
- Cardinality is bounded — only `model` is a metric attribute. Resource attributes (service.name, k8s.*, etc.) come from the application's configured OTel resource, so cross-app comparisons work cleanly in Mimir/Prometheus.

The meter is a no-op when no MeterProvider is configured, so this is safe for apps that don't run with OTel.
1 parent cf249b9 commit 421bf6a

1 file changed

Lines changed: 65 additions & 0 deletions

File tree

src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Custom Temporal Model Provider with streaming support for OpenAI agents."""
22
from __future__ import annotations
33

4+
import time
45
import uuid
56
from typing import Any, List, Union, Optional, override
67

@@ -26,6 +27,7 @@
2627
CodeInterpreterTool,
2728
ImageGenerationTool,
2829
)
30+
from opentelemetry import metrics
2931
from agents.computer import Computer, AsyncComputer
3032

3133
# Re-export the canonical StreamingMode literal from the streaming service so
@@ -78,6 +80,44 @@
7880
logger = make_logger("agentex.temporal.streaming")
7981

8082

83+
# OTel metrics for LLM streaming behavior. The meter resolves to whatever
84+
# MeterProvider the application has configured (no-op if none). All metrics
85+
# carry only a ``model`` attribute to keep cardinality bounded; resource
86+
# attributes (service.name, k8s.*, etc.) are added by the application's OTel
87+
# resource configuration.
88+
_meter = metrics.get_meter("agentex.openai_agents.streaming")
89+
_ttft_ms = _meter.create_histogram(
90+
name="agentex.llm.ttft",
91+
unit="ms",
92+
description="Time from streaming-request start to first content token (ms)",
93+
)
94+
_tps = _meter.create_histogram(
95+
name="agentex.llm.tps",
96+
unit="tokens/s",
97+
description="Output tokens per second across the streaming response",
98+
)
99+
_input_tokens = _meter.create_counter(
100+
name="agentex.llm.input_tokens",
101+
unit="tokens",
102+
description="Total input tokens sent to the LLM",
103+
)
104+
_output_tokens = _meter.create_counter(
105+
name="agentex.llm.output_tokens",
106+
unit="tokens",
107+
description="Total output tokens returned by the LLM",
108+
)
109+
_cached_input_tokens = _meter.create_counter(
110+
name="agentex.llm.cached_input_tokens",
111+
unit="tokens",
112+
description="Subset of input tokens served from prompt cache",
113+
)
114+
_reasoning_tokens = _meter.create_counter(
115+
name="agentex.llm.reasoning_tokens",
116+
unit="tokens",
117+
description="Output tokens spent on reasoning (subset of output_tokens)",
118+
)
119+
120+
81121
def _serialize_item(item: Any) -> dict[str, Any]:
82122
"""
83123
Universal serializer for any item type from OpenAI Agents SDK.
@@ -642,6 +682,12 @@ async def get_response(
642682
reasoning_summaries = []
643683
reasoning_contents = []
644684
event_count = 0
685+
# Monotonic-clock instrumentation for ttft / tps (tpot is derived as 1/tps). ``stream_start_perf``
686+
# bookmarks just before the event loop so the timer captures only the
687+
# streaming portion, not request setup. ``first_token_at`` is set on
688+
# the first content delta (text, reasoning text, or reasoning summary).
689+
stream_start_perf = time.perf_counter()
690+
first_token_at: Optional[float] = None
645691

646692
# We expect task_id to always be provided for streaming
647693
if not task_id:
@@ -721,6 +767,10 @@ async def get_response(
721767
# Handle text streaming
722768
delta = getattr(event, 'delta', '')
723769

770+
# First content-bearing event in this stream — bookmark for ttft.
771+
if first_token_at is None:
772+
first_token_at = time.perf_counter()
773+
724774
if isinstance(event, ResponseReasoningSummaryTextDeltaEvent) and reasoning_context:
725775
# Stream reasoning summary deltas - these are the actual reasoning tokens!
726776
try:
@@ -983,6 +1033,21 @@ async def get_response(
9831033

9841034
span.output = output_data
9851035

1036+
# Emit LLM metrics derived from the captured stream. The meter is a
1037+
# no-op if the application hasn't configured a MeterProvider, so this
1038+
# is safe to do unconditionally. We only emit ttft / tps when their
1039+
# input data is actually meaningful (got a content delta, got tokens).
1040+
metric_attrs = {"model": self.model_name}
1041+
stream_duration_s = time.perf_counter() - stream_start_perf
1042+
_input_tokens.add(usage.input_tokens or 0, metric_attrs)
1043+
_output_tokens.add(usage.output_tokens or 0, metric_attrs)
1044+
_cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs)
1045+
_reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs)
1046+
if first_token_at is not None:
1047+
_ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs)
1048+
if (usage.output_tokens or 0) > 0 and stream_duration_s > 0:
1049+
_tps.record(usage.output_tokens / stream_duration_s, metric_attrs)
1050+
9861051
# Return the response. response_id is the server-issued id from
9871052
# ResponseCompletedEvent.response.id, or None when the stream ended
9881053
# without a completed event (error path) — matching the documented

0 commit comments

Comments
 (0)