Skip to content

Commit 45733c9

Browse files
committed
review: address greptile feedback on llm metrics
Three changes from PR #347 review: 1. Move stream_start_perf above responses.create() so ttft captures the full request-to-first-token latency (HTTP round-trip + model TTFB), not just post-connect event-loop delay. Drop the duplicate assignment that was being overwritten after the await. 2. Track last_token_at alongside first_token_at and use the generation window (first→last delta) as the tps denominator. Total stream duration was inflated by tool-call argument deltas, reasoning events, and stream_update awaits — making tps under-report the model's actual generation speed for agentic responses. 3. Lazy-create the OTel instruments inside _StreamingMetrics rather than at import time, so a MeterProvider configured *after* this module is imported still binds correctly. Apps that bootstrap OTel in a startup hook (common with lazy-init patterns) would otherwise silently send to the no-op provider.
1 parent 421bf6a commit 45733c9

1 file changed

Lines changed: 88 additions & 52 deletions

File tree

src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py

Lines changed: 88 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -80,42 +80,61 @@
8080
logger = make_logger("agentex.temporal.streaming")
8181

8282

83-
# OTel metrics for LLM streaming behavior. The meter resolves to whatever
84-
# MeterProvider the application has configured (no-op if none). All metrics
85-
# carry only a ``model`` attribute to keep cardinality bounded; resource
86-
# attributes (service.name, k8s.*, etc.) are added by the application's OTel
87-
# resource configuration.
88-
_meter = metrics.get_meter("agentex.openai_agents.streaming")
89-
_ttft_ms = _meter.create_histogram(
90-
name="agentex.llm.ttft",
91-
unit="ms",
92-
description="Time from streaming-request start to first content token (ms)",
93-
)
94-
_tps = _meter.create_histogram(
95-
name="agentex.llm.tps",
96-
unit="tokens/s",
97-
description="Output tokens per second across the streaming response",
98-
)
99-
_input_tokens = _meter.create_counter(
100-
name="agentex.llm.input_tokens",
101-
unit="tokens",
102-
description="Total input tokens sent to the LLM",
103-
)
104-
_output_tokens = _meter.create_counter(
105-
name="agentex.llm.output_tokens",
106-
unit="tokens",
107-
description="Total output tokens returned by the LLM",
108-
)
109-
_cached_input_tokens = _meter.create_counter(
110-
name="agentex.llm.cached_input_tokens",
111-
unit="tokens",
112-
description="Subset of input tokens served from prompt cache",
113-
)
114-
_reasoning_tokens = _meter.create_counter(
115-
name="agentex.llm.reasoning_tokens",
116-
unit="tokens",
117-
description="Output tokens spent on reasoning (subset of output_tokens)",
118-
)
83+
# OTel metrics for LLM streaming behavior. Instruments are created lazily on
84+
# first use so the meter resolves to whatever MeterProvider the application
85+
# eventually configures, even if that happens after this module is imported.
86+
# All metrics carry only a ``model`` attribute to keep cardinality bounded;
87+
# resource attributes (service.name, k8s.*, etc.) come from the application's
88+
# OTel resource configuration.
89+
class _StreamingMetrics:
90+
"""Lazily-created OTel instruments for streaming LLM telemetry."""
91+
92+
def __init__(self) -> None:
93+
meter = metrics.get_meter("agentex.openai_agents.streaming")
94+
self.ttft_ms = meter.create_histogram(
95+
name="agentex.llm.ttft",
96+
unit="ms",
97+
description="Time from request submission to first content token (ms)",
98+
)
99+
# Note: TPS denominator is the model-generation window
100+
# (last_token_time - first_token_time), not total stream wall time.
101+
# This isolates raw model throughput from event-loop / tool-call latency.
102+
self.tps = meter.create_histogram(
103+
name="agentex.llm.tps",
104+
unit="tokens/s",
105+
description="Output tokens per second over the generation window",
106+
)
107+
self.input_tokens = meter.create_counter(
108+
name="agentex.llm.input_tokens",
109+
unit="tokens",
110+
description="Total input tokens sent to the LLM",
111+
)
112+
self.output_tokens = meter.create_counter(
113+
name="agentex.llm.output_tokens",
114+
unit="tokens",
115+
description="Total output tokens returned by the LLM",
116+
)
117+
self.cached_input_tokens = meter.create_counter(
118+
name="agentex.llm.cached_input_tokens",
119+
unit="tokens",
120+
description="Subset of input tokens served from prompt cache",
121+
)
122+
self.reasoning_tokens = meter.create_counter(
123+
name="agentex.llm.reasoning_tokens",
124+
unit="tokens",
125+
description="Output tokens spent on reasoning (subset of output_tokens)",
126+
)
127+
128+
129+
_streaming_metrics: Optional[_StreamingMetrics] = None
130+
131+
132+
def _get_streaming_metrics() -> _StreamingMetrics:
133+
"""Return the streaming metrics singleton, creating it on first use."""
134+
global _streaming_metrics
135+
if _streaming_metrics is None:
136+
_streaming_metrics = _StreamingMetrics()
137+
return _streaming_metrics
119138

120139

121140
def _serialize_item(item: Any) -> dict[str, Any]:
@@ -632,7 +651,11 @@ async def get_response(
632651
# endpoints recognize this parameter, so we don't auto-inject a default.
633652
prompt_cache_key = extra_args.pop("prompt_cache_key", NOT_GIVEN)
634653

635-
# Create the response stream using Responses API
654+
# Create the response stream using Responses API.
655+
# Bookmark request start *before* the await so ttft captures the full
656+
# user-perceived latency (HTTP round-trip + model TTFB), not just the
657+
# post-connect event-loop delay.
658+
stream_start_perf = time.perf_counter()
636659
logger.debug(f"[TemporalStreamingModel] Creating response stream with Responses API")
637660
stream = await self.client.responses.create( # type: ignore[call-overload]
638661

@@ -682,12 +705,12 @@ async def get_response(
682705
reasoning_summaries = []
683706
reasoning_contents = []
684707
event_count = 0
685-
# Wall-clock instrumentation for ttft / tps / tpot. ``stream_start_perf``
686-
# bookmarks just before the event loop so the timer captures only the
687-
# streaming portion, not request setup. ``first_token_at`` is set on
688-
# the first content delta (text or reasoning summary).
689-
stream_start_perf = time.perf_counter()
708+
# ttft / tps instrumentation. ``stream_start_perf`` is set above,
709+
# before the responses.create() await, so it captures the full
710+
# request-to-first-token latency. ``first_token_at`` and
711+
# ``last_token_at`` bracket the model-generation window for tps.
690712
first_token_at: Optional[float] = None
713+
last_token_at: Optional[float] = None
691714

692715
# We expect task_id to always be provided for streaming
693716
if not task_id:
@@ -767,9 +790,14 @@ async def get_response(
767790
# Handle text streaming
768791
delta = getattr(event, 'delta', '')
769792

770-
# First content-bearing event in this stream — bookmark for ttft.
793+
# Bookmark first/last content-bearing events for ttft and tps.
794+
# last_token_at is updated on every delta so tps measures only
795+
# the model-generation window, not subsequent tool-call /
796+
# event-handler time.
797+
now_perf = time.perf_counter()
771798
if first_token_at is None:
772-
first_token_at = time.perf_counter()
799+
first_token_at = now_perf
800+
last_token_at = now_perf
773801

774802
if isinstance(event, ResponseReasoningSummaryTextDeltaEvent) and reasoning_context:
775803
# Stream reasoning summary deltas - these are the actual reasoning tokens!
@@ -1037,16 +1065,24 @@ async def get_response(
10371065
# no-op if the application hasn't configured a MeterProvider, so this
10381066
# is safe to do unconditionally. We only emit ttft / tps when their
10391067
# input data is actually meaningful (got a content delta, got tokens).
1068+
m = _get_streaming_metrics()
10401069
metric_attrs = {"model": self.model_name}
1041-
stream_duration_s = time.perf_counter() - stream_start_perf
1042-
_input_tokens.add(usage.input_tokens or 0, metric_attrs)
1043-
_output_tokens.add(usage.output_tokens or 0, metric_attrs)
1044-
_cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs)
1045-
_reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs)
1070+
m.input_tokens.add(usage.input_tokens or 0, metric_attrs)
1071+
m.output_tokens.add(usage.output_tokens or 0, metric_attrs)
1072+
m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs)
1073+
m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs)
10461074
if first_token_at is not None:
1047-
_ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs)
1048-
if (usage.output_tokens or 0) > 0 and stream_duration_s > 0:
1049-
_tps.record(usage.output_tokens / stream_duration_s, metric_attrs)
1075+
m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs)
1076+
# tps denominator is the generation window (first→last delta), not
1077+
# total stream wall time — see _StreamingMetrics for rationale.
1078+
if (
1079+
first_token_at is not None
1080+
and last_token_at is not None
1081+
and last_token_at > first_token_at
1082+
and (usage.output_tokens or 0) > 0
1083+
):
1084+
generation_window_s = last_token_at - first_token_at
1085+
m.tps.record(usage.output_tokens / generation_window_s, metric_attrs)
10501086

10511087
# Return the response. response_id is the server-issued id from
10521088
# ResponseCompletedEvent.response.id, or None when the stream ended

0 commit comments

Comments
 (0)