Skip to content

Commit 44ed765

Browse files
committed
feat(openai): add gen_ai.server.time_to_first_token metric for streaming
Implement the `gen_ai.server.time_to_first_token` histogram metric as defined in OpenTelemetry Semantic Conventions v1.38.0. This metric records the time from request start to first output token received during streaming chat completions.

Changes:
- Add `time_to_first_token_s` field to the `LLMInvocation` dataclass
- Add TTFT histogram creation to util-genai instruments and metrics
- Wire TTFT detection into `BaseStreamWrapper.process_chunk()`
- Record TTFT in both the legacy and the new handler instrumentation paths
- Add tests for sync/async streaming, non-streaming, and tool calls

Resolves #3932
1 parent 7f107df commit 44ed765

6 files changed

Lines changed: 304 additions & 3 deletions

File tree

instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,25 @@
3535
67108864,
3636
]
3737

38+
_GEN_AI_SERVER_TIME_TO_FIRST_TOKEN_BUCKETS = [
39+
0.001,
40+
0.005,
41+
0.01,
42+
0.02,
43+
0.04,
44+
0.06,
45+
0.08,
46+
0.1,
47+
0.25,
48+
0.5,
49+
0.75,
50+
1.0,
51+
2.5,
52+
5.0,
53+
7.5,
54+
10.0,
55+
]
56+
3857

3958
class Instruments:
4059
def __init__(self, meter: Meter):
@@ -50,3 +69,9 @@ def __init__(self, meter: Meter):
5069
unit="{token}",
5170
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
5271
)
72+
self.ttft_histogram: Histogram = meter.create_histogram(
73+
name=gen_ai_metrics.GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
74+
description="Time to generate first token for successful responses",
75+
unit="s",
76+
explicit_bucket_boundaries_advisory=_GEN_AI_SERVER_TIME_TO_FIRST_TOKEN_BUCKETS,
77+
)

instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ def traced_method(wrapped, instance, args, kwargs):
9090
parsed_result = result
9191
if is_streaming(kwargs):
9292
return LegacyChatStreamWrapper(
93-
parsed_result, span, logger, capture_content
93+
parsed_result, span, logger, capture_content,
94+
instruments=instruments,
95+
start_time=start,
96+
request_attributes=span_attributes,
9497
)
9598

9699
if span.is_recording():
@@ -195,7 +198,10 @@ async def traced_method(wrapped, instance, args, kwargs):
195198
parsed_result = result
196199
if is_streaming(kwargs):
197200
return LegacyChatStreamWrapper(
198-
parsed_result, span, logger, capture_content
201+
parsed_result, span, logger, capture_content,
202+
instruments=instruments,
203+
start_time=start,
204+
request_attributes=span_attributes,
199205
)
200206

201207
if span.is_recording():
@@ -631,6 +637,8 @@ def __init__(
631637
self.choice_buffers = []
632638
self._started = False
633639
self.capture_content = capture_content
640+
self._first_token_received = False
641+
self._first_token_time: Optional[float] = None
634642
self._setup()
635643

636644
def _setup(self):
@@ -752,8 +760,25 @@ def process_chunk(self, chunk):
752760
self.set_response_model(chunk)
753761
self.set_response_service_tier(chunk)
754762
self.build_streaming_response(chunk)
763+
self._detect_first_token(chunk)
755764
self.set_usage(chunk)
756765

766+
def _detect_first_token(self, chunk):
767+
if self._first_token_received:
768+
return
769+
if getattr(chunk, "choices", None) is None:
770+
return
771+
for choice in chunk.choices:
772+
if not choice.delta:
773+
continue
774+
if (
775+
choice.delta.content is not None
776+
or choice.delta.tool_calls is not None
777+
):
778+
self._first_token_received = True
779+
self._first_token_time = default_timer()
780+
return
781+
757782
def __getattr__(self, name):
758783
return getattr(self.stream, name)
759784

@@ -777,10 +802,16 @@ def __init__(
777802
span: Span,
778803
logger: Logger,
779804
capture_content: bool,
805+
instruments: Optional[Instruments] = None,
806+
start_time: Optional[float] = None,
807+
request_attributes: Optional[dict] = None,
780808
):
781809
super().__init__(stream, capture_content=capture_content)
782810
self.span = span
783811
self.logger = logger
812+
self._instruments = instruments
813+
self._start_time = start_time
814+
self._request_attributes = request_attributes or {}
784815

785816
def cleanup(self, error: Optional[BaseException] = None):
786817
if not self._started:
@@ -863,9 +894,43 @@ def cleanup(self, error: Optional[BaseException] = None):
863894
if error:
864895
handle_span_exception(self.span, error)
865896
else:
897+
self._record_ttft()
866898
self.span.end()
867899
self._started = False
868900

901+
def _record_ttft(self):
902+
if (
903+
self._instruments is None
904+
or self._start_time is None
905+
or self._first_token_time is None
906+
):
907+
return
908+
ttft = max(self._first_token_time - self._start_time, 0.0)
909+
common_attributes = {
910+
GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value,
911+
GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value,
912+
}
913+
if GenAIAttributes.GEN_AI_REQUEST_MODEL in self._request_attributes:
914+
common_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] = (
915+
self._request_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL]
916+
)
917+
if self.response_model:
918+
common_attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] = (
919+
self.response_model
920+
)
921+
if ServerAttributes.SERVER_ADDRESS in self._request_attributes:
922+
common_attributes[ServerAttributes.SERVER_ADDRESS] = (
923+
self._request_attributes[ServerAttributes.SERVER_ADDRESS]
924+
)
925+
if ServerAttributes.SERVER_PORT in self._request_attributes:
926+
common_attributes[ServerAttributes.SERVER_PORT] = (
927+
self._request_attributes[ServerAttributes.SERVER_PORT]
928+
)
929+
self._instruments.ttft_histogram.record(
930+
ttft,
931+
attributes=common_attributes,
932+
)
933+
869934

870935
class ChatStreamWrapper(BaseStreamWrapper):
871936
handler: TelemetryHandler
@@ -941,6 +1006,15 @@ def cleanup(self, error: Optional[BaseException] = None):
9411006
},
9421007
)
9431008

1009+
if (
1010+
self._first_token_time is not None
1011+
and self.invocation.monotonic_start_s is not None
1012+
):
1013+
self.invocation.time_to_first_token_s = max(
1014+
self._first_token_time - self.invocation.monotonic_start_s,
1015+
0.0,
1016+
)
1017+
9441018
self._set_output_messages()
9451019

9461020
if error:
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import pytest
2+
from tests.test_utils import DEFAULT_MODEL, USER_ONLY_PROMPT
3+
4+
from opentelemetry.semconv._incubating.attributes import (
5+
gen_ai_attributes as GenAIAttributes,
6+
)
7+
from opentelemetry.semconv._incubating.attributes import (
8+
server_attributes as ServerAttributes,
9+
)
10+
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
11+
from opentelemetry.util.genai.utils import is_experimental_mode
12+
13+
_TTFT_BUCKETS = (
14+
0.001,
15+
0.005,
16+
0.01,
17+
0.02,
18+
0.04,
19+
0.06,
20+
0.08,
21+
0.1,
22+
0.25,
23+
0.5,
24+
0.75,
25+
1.0,
26+
2.5,
27+
5.0,
28+
7.5,
29+
10.0,
30+
)
31+
32+
33+
def _get_ttft_metric(metric_reader):
34+
metrics = metric_reader.get_metrics_data().resource_metrics
35+
if not metrics:
36+
return None
37+
for scope_metrics in metrics[0].scope_metrics:
38+
for m in scope_metrics.metrics:
39+
if m.name == gen_ai_metrics.GEN_AI_SERVER_TIME_TO_FIRST_TOKEN:
40+
return m
41+
return None
42+
43+
44+
def test_streaming_chat_records_ttft_metric(
45+
metric_reader, openai_client, instrument_with_content, vcr
46+
):
47+
"""TTFT metric is recorded for streaming chat completions."""
48+
with vcr.use_cassette("test_chat_completion_streaming.yaml"):
49+
response = openai_client.chat.completions.create(
50+
model=DEFAULT_MODEL,
51+
messages=USER_ONLY_PROMPT,
52+
stream=True,
53+
stream_options={"include_usage": True},
54+
)
55+
for _ in response:
56+
pass
57+
58+
ttft_metric = _get_ttft_metric(metric_reader)
59+
assert ttft_metric is not None, (
60+
"gen_ai.server.time_to_first_token metric should be recorded for streaming"
61+
)
62+
63+
data_point = ttft_metric.data.data_points[0]
64+
assert data_point.sum >= 0
65+
assert data_point.count == 1
66+
assert data_point.explicit_bounds == _TTFT_BUCKETS
67+
68+
latest_experimental_enabled = is_experimental_mode()
69+
assert GenAIAttributes.GEN_AI_OPERATION_NAME in data_point.attributes
70+
assert (
71+
data_point.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME]
72+
== GenAIAttributes.GenAiOperationNameValues.CHAT.value
73+
)
74+
assert GenAIAttributes.GEN_AI_REQUEST_MODEL in data_point.attributes
75+
assert (
76+
data_point.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL]
77+
== "gpt-4o-mini"
78+
)
79+
assert ServerAttributes.SERVER_ADDRESS in data_point.attributes
80+
81+
82+
@pytest.mark.asyncio()
83+
async def test_async_streaming_chat_records_ttft_metric(
84+
metric_reader, async_openai_client, instrument_with_content, vcr
85+
):
86+
"""TTFT metric is recorded for async streaming chat completions."""
87+
with vcr.use_cassette("test_async_chat_completion_streaming.yaml"):
88+
response = await async_openai_client.chat.completions.create(
89+
model=DEFAULT_MODEL,
90+
messages=USER_ONLY_PROMPT,
91+
stream=True,
92+
stream_options={"include_usage": True},
93+
)
94+
async for _ in response:
95+
pass
96+
97+
ttft_metric = _get_ttft_metric(metric_reader)
98+
assert ttft_metric is not None, (
99+
"gen_ai.server.time_to_first_token metric should be recorded for async streaming"
100+
)
101+
102+
data_point = ttft_metric.data.data_points[0]
103+
assert data_point.sum >= 0
104+
assert data_point.count == 1
105+
assert data_point.explicit_bounds == _TTFT_BUCKETS
106+
107+
108+
def test_non_streaming_chat_does_not_record_ttft_metric(
109+
metric_reader, openai_client, instrument_with_content, vcr
110+
):
111+
"""TTFT metric should NOT be recorded for non-streaming requests."""
112+
with vcr.use_cassette("test_chat_completion_metrics.yaml"):
113+
openai_client.chat.completions.create(
114+
messages=USER_ONLY_PROMPT, model=DEFAULT_MODEL, stream=False
115+
)
116+
117+
ttft_metric = _get_ttft_metric(metric_reader)
118+
assert ttft_metric is None, (
119+
"gen_ai.server.time_to_first_token metric should not be recorded for non-streaming"
120+
)
121+
122+
123+
def test_streaming_tool_calls_records_ttft_metric(
124+
metric_reader, openai_client, instrument_with_content, vcr
125+
):
126+
"""TTFT metric is recorded for streaming responses with tool calls."""
127+
with vcr.use_cassette(
128+
"test_chat_completion_multiple_tools_streaming_with_content.yaml"
129+
):
130+
response = openai_client.chat.completions.create(
131+
model=DEFAULT_MODEL,
132+
messages=[{"role": "user", "content": "What's the weather?"}],
133+
stream=True,
134+
stream_options={"include_usage": True},
135+
tools=[
136+
{
137+
"type": "function",
138+
"function": {
139+
"name": "get_weather",
140+
"parameters": {
141+
"type": "object",
142+
"properties": {
143+
"location": {"type": "string"},
144+
},
145+
},
146+
},
147+
}
148+
],
149+
)
150+
for _ in response:
151+
pass
152+
153+
ttft_metric = _get_ttft_metric(metric_reader)
154+
assert ttft_metric is not None, (
155+
"gen_ai.server.time_to_first_token metric should be recorded for streaming tool calls"
156+
)
157+
158+
data_point = ttft_metric.data.data_points[0]
159+
assert data_point.sum >= 0
160+
assert data_point.count == 1

util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,25 @@
3535
67108864,
3636
]
3737

38+
_GEN_AI_SERVER_TIME_TO_FIRST_TOKEN_BUCKETS = [
39+
0.001,
40+
0.005,
41+
0.01,
42+
0.02,
43+
0.04,
44+
0.06,
45+
0.08,
46+
0.1,
47+
0.25,
48+
0.5,
49+
0.75,
50+
1.0,
51+
2.5,
52+
5.0,
53+
7.5,
54+
10.0,
55+
]
56+
3857

3958
def create_duration_histogram(meter: Meter) -> Histogram:
4059
return meter.create_histogram(
@@ -52,3 +71,12 @@ def create_token_histogram(meter: Meter) -> Histogram:
5271
unit="{token}",
5372
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
5473
)
74+
75+
76+
def create_ttft_histogram(meter: Meter) -> Histogram:
77+
return meter.create_histogram(
78+
name=gen_ai_metrics.GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
79+
description="Time to generate first token for successful responses",
80+
unit="s",
81+
explicit_bucket_boundaries_advisory=_GEN_AI_SERVER_TIME_TO_FIRST_TOKEN_BUCKETS,
82+
)

0 commit comments

Comments (0)