Skip to content
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Custom Temporal Model Provider with streaming support for OpenAI agents."""
from __future__ import annotations

import time
import uuid
from typing import Any, List, Union, Optional, override

Expand All @@ -26,6 +27,7 @@
CodeInterpreterTool,
ImageGenerationTool,
)
from opentelemetry import metrics
from agents.computer import Computer, AsyncComputer

# Re-export the canonical StreamingMode literal from the streaming service so
Expand Down Expand Up @@ -78,6 +80,44 @@
logger = make_logger("agentex.temporal.streaming")


# OTel metrics for LLM streaming behavior. The meter resolves to whatever
# MeterProvider the application has configured (no-op if none). All metrics
# carry only a ``model`` attribute to keep cardinality bounded; resource
# attributes (service.name, k8s.*, etc.) are added by the application's OTel
# resource configuration.
_meter = metrics.get_meter("agentex.openai_agents.streaming")
_ttft_ms = _meter.create_histogram(
name="agentex.llm.ttft",
unit="ms",
description="Time from streaming-request start to first content token (ms)",
)
_tps = _meter.create_histogram(
name="agentex.llm.tps",
unit="tokens/s",
description="Output tokens per second across the streaming response",
)
_input_tokens = _meter.create_counter(
name="agentex.llm.input_tokens",
unit="tokens",
description="Total input tokens sent to the LLM",
)
_output_tokens = _meter.create_counter(
name="agentex.llm.output_tokens",
unit="tokens",
description="Total output tokens returned by the LLM",
)
_cached_input_tokens = _meter.create_counter(
name="agentex.llm.cached_input_tokens",
unit="tokens",
description="Subset of input tokens served from prompt cache",
)
_reasoning_tokens = _meter.create_counter(
name="agentex.llm.reasoning_tokens",
unit="tokens",
description="Output tokens spent on reasoning (subset of output_tokens)",
)


Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated
def _serialize_item(item: Any) -> dict[str, Any]:
"""
Universal serializer for any item type from OpenAI Agents SDK.
Expand Down Expand Up @@ -642,6 +682,12 @@ async def get_response(
reasoning_summaries = []
reasoning_contents = []
event_count = 0
# Wall-clock instrumentation for ttft / tps / tpot. ``stream_start_perf``
# bookmarks just before the event loop so the timer captures only the
# streaming portion, not request setup. ``first_token_at`` is set on
# the first content delta (text or reasoning summary).
stream_start_perf = time.perf_counter()
first_token_at: Optional[float] = None
Comment thread
greptile-apps[bot] marked this conversation as resolved.

# We expect task_id to always be provided for streaming
if not task_id:
Expand Down Expand Up @@ -721,6 +767,10 @@ async def get_response(
# Handle text streaming
delta = getattr(event, 'delta', '')

# First content-bearing event in this stream — bookmark for ttft.
if first_token_at is None:
first_token_at = time.perf_counter()

if isinstance(event, ResponseReasoningSummaryTextDeltaEvent) and reasoning_context:
# Stream reasoning summary deltas - these are the actual reasoning tokens!
try:
Expand Down Expand Up @@ -983,6 +1033,21 @@ async def get_response(

span.output = output_data

# Emit LLM metrics derived from the captured stream. The meter is a
# no-op if the application hasn't configured a MeterProvider, so this
# is safe to do unconditionally. We only emit ttft / tps when their
# input data is actually meaningful (got a content delta, got tokens).
metric_attrs = {"model": self.model_name}
stream_duration_s = time.perf_counter() - stream_start_perf
_input_tokens.add(usage.input_tokens or 0, metric_attrs)
_output_tokens.add(usage.output_tokens or 0, metric_attrs)
_cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs)
_reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs)
if first_token_at is not None:
_ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs)
if (usage.output_tokens or 0) > 0 and stream_duration_s > 0:
_tps.record(usage.output_tokens / stream_duration_s, metric_attrs)
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated

# Return the response. response_id is the server-issued id from
# ResponseCompletedEvent.response.id, or None when the stream ended
# without a completed event (error path) — matching the documented
Expand Down
Loading