Skip to content

Commit acbf5c4

Browse files
committed
feat(openai): Add gen_ai.client.operation.time_to_first_chunk metric for streaming

Implement the gen_ai.client.operation.time_to_first_chunk histogram metric as defined in OpenTelemetry Semantic Conventions v1.38.0. This metric records the time (in seconds) from request start to the first output chunk received during streaming chat completions.

Changes:
- Add time_to_first_token_s field to LLMInvocation dataclass
- Add create_ttfc_histogram() factory with semconv-specified bucket boundaries
- InvocationMetricsRecorder now creates and records TTFC histogram
- First-token detection in stream wrappers for both new and legacy paths
- 4 test cases: sync streaming, async streaming, non-streaming exclusion, tool-call streaming

Fixes #3932
1 parent b8ca943 commit acbf5c4

7 files changed

Lines changed: 283 additions & 49 deletions

File tree

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,13 @@
11
from opentelemetry.metrics import Histogram, Meter
2-
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
3-
4-
_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
5-
0.01,
6-
0.02,
7-
0.04,
8-
0.08,
9-
0.16,
10-
0.32,
11-
0.64,
12-
1.28,
13-
2.56,
14-
5.12,
15-
10.24,
16-
20.48,
17-
40.96,
18-
81.92,
19-
]
20-
21-
_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [
22-
1,
23-
4,
24-
16,
25-
64,
26-
256,
27-
1024,
28-
4096,
29-
16384,
30-
65536,
31-
262144,
32-
1048576,
33-
4194304,
34-
16777216,
35-
67108864,
36-
]
2+
from opentelemetry.util.genai.instruments import (
3+
create_duration_histogram,
4+
create_token_histogram,
5+
create_ttfc_histogram,
6+
)
377

388

399
class Instruments:
4010
def __init__(self, meter: Meter):
41-
self.operation_duration_histogram: Histogram = meter.create_histogram(
42-
name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION,
43-
description="GenAI operation duration",
44-
unit="s",
45-
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
46-
)
47-
self.token_usage_histogram: Histogram = meter.create_histogram(
48-
name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE,
49-
description="Measures number of input and output tokens used",
50-
unit="{token}",
51-
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
52-
)
11+
self.operation_duration_histogram: Histogram = create_duration_histogram(meter)
12+
self.token_usage_histogram: Histogram = create_token_histogram(meter)
13+
self.ttfc_histogram: Histogram = create_ttfc_histogram(meter)

instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ def traced_method(wrapped, instance, args, kwargs):
9090
parsed_result = result
9191
if is_streaming(kwargs):
9292
return LegacyChatStreamWrapper(
93-
parsed_result, span, logger, capture_content
93+
parsed_result, span, logger, capture_content,
94+
instruments=instruments,
95+
start_time=start,
96+
request_attributes=span_attributes,
9497
)
9598

9699
if span.is_recording():
@@ -195,7 +198,10 @@ async def traced_method(wrapped, instance, args, kwargs):
195198
parsed_result = result
196199
if is_streaming(kwargs):
197200
return LegacyChatStreamWrapper(
198-
parsed_result, span, logger, capture_content
201+
parsed_result, span, logger, capture_content,
202+
instruments=instruments,
203+
start_time=start,
204+
request_attributes=span_attributes,
199205
)
200206

201207
if span.is_recording():
@@ -631,6 +637,8 @@ def __init__(
631637
self.choice_buffers = []
632638
self._started = False
633639
self.capture_content = capture_content
640+
self._first_token_received = False
641+
self._first_token_time: Optional[float] = None
634642
self._setup()
635643

636644
def _setup(self):
@@ -752,8 +760,25 @@ def process_chunk(self, chunk):
752760
self.set_response_model(chunk)
753761
self.set_response_service_tier(chunk)
754762
self.build_streaming_response(chunk)
763+
self._detect_first_token(chunk)
755764
self.set_usage(chunk)
756765

766+
def _detect_first_token(self, chunk):
767+
if self._first_token_received:
768+
return
769+
if getattr(chunk, "choices", None) is None:
770+
return
771+
for choice in chunk.choices:
772+
if not choice.delta:
773+
continue
774+
if (
775+
choice.delta.content is not None
776+
or choice.delta.tool_calls is not None
777+
):
778+
self._first_token_received = True
779+
self._first_token_time = default_timer()
780+
return
781+
757782
def __getattr__(self, name):
758783
return getattr(self.stream, name)
759784

@@ -777,10 +802,16 @@ def __init__(
777802
span: Span,
778803
logger: Logger,
779804
capture_content: bool,
805+
instruments: Optional[Instruments] = None,
806+
start_time: Optional[float] = None,
807+
request_attributes: Optional[dict] = None,
780808
):
781809
super().__init__(stream, capture_content=capture_content)
782810
self.span = span
783811
self.logger = logger
812+
self._instruments = instruments
813+
self._start_time = start_time
814+
self._request_attributes = request_attributes or {}
784815

785816
def cleanup(self, error: Optional[BaseException] = None):
786817
if not self._started:
@@ -863,9 +894,43 @@ def cleanup(self, error: Optional[BaseException] = None):
863894
if error:
864895
handle_span_exception(self.span, error)
865896
else:
897+
self._record_ttft()
866898
self.span.end()
867899
self._started = False
868900

901+
def _record_ttft(self):
902+
if (
903+
self._instruments is None
904+
or self._start_time is None
905+
or self._first_token_time is None
906+
):
907+
return
908+
ttft = max(self._first_token_time - self._start_time, 0.0)
909+
common_attributes = {
910+
GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value,
911+
GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value,
912+
}
913+
if GenAIAttributes.GEN_AI_REQUEST_MODEL in self._request_attributes:
914+
common_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] = (
915+
self._request_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL]
916+
)
917+
if self.response_model:
918+
common_attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] = (
919+
self.response_model
920+
)
921+
if ServerAttributes.SERVER_ADDRESS in self._request_attributes:
922+
common_attributes[ServerAttributes.SERVER_ADDRESS] = (
923+
self._request_attributes[ServerAttributes.SERVER_ADDRESS]
924+
)
925+
if ServerAttributes.SERVER_PORT in self._request_attributes:
926+
common_attributes[ServerAttributes.SERVER_PORT] = (
927+
self._request_attributes[ServerAttributes.SERVER_PORT]
928+
)
929+
self._instruments.ttfc_histogram.record(
930+
ttft,
931+
attributes=common_attributes,
932+
)
933+
869934

870935
class ChatStreamWrapper(BaseStreamWrapper):
871936
handler: TelemetryHandler
@@ -941,6 +1006,15 @@ def cleanup(self, error: Optional[BaseException] = None):
9411006
},
9421007
)
9431008

1009+
if (
1010+
self._first_token_time is not None
1011+
and self.invocation.monotonic_start_s is not None
1012+
):
1013+
self.invocation.time_to_first_token_s = max(
1014+
self._first_token_time - self.invocation.monotonic_start_s,
1015+
0.0,
1016+
)
1017+
9441018
self._set_output_messages()
9451019

9461020
if error:
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import pytest
2+
from tests.test_utils import DEFAULT_MODEL, USER_ONLY_PROMPT
3+
4+
from opentelemetry.semconv._incubating.attributes import (
5+
gen_ai_attributes as GenAIAttributes,
6+
)
7+
from opentelemetry.semconv._incubating.attributes import (
8+
server_attributes as ServerAttributes,
9+
)
10+
from opentelemetry.util.genai.instruments import (
11+
GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK,
12+
_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS,
13+
get_metric_data_points,
14+
)
15+
16+
17+
def test_streaming_chat_records_ttft_metric(
18+
metric_reader, openai_client, instrument_with_content, vcr
19+
):
20+
"""TTFT metric is recorded for streaming chat completions."""
21+
with vcr.use_cassette("test_chat_completion_streaming.yaml"):
22+
response = openai_client.chat.completions.create(
23+
model=DEFAULT_MODEL,
24+
messages=USER_ONLY_PROMPT,
25+
stream=True,
26+
stream_options={"include_usage": True},
27+
)
28+
for _ in response:
29+
pass
30+
31+
data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK)
32+
assert len(data_points) == 1, (
33+
"expected exactly one TTFC data point for streaming"
34+
)
35+
36+
data_point = data_points[0]
37+
assert data_point.sum >= 0
38+
assert data_point.count == 1
39+
assert data_point.explicit_bounds == tuple(_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS)
40+
41+
assert GenAIAttributes.GEN_AI_OPERATION_NAME in data_point.attributes
42+
assert (
43+
data_point.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME]
44+
== GenAIAttributes.GenAiOperationNameValues.CHAT.value
45+
)
46+
assert GenAIAttributes.GEN_AI_REQUEST_MODEL in data_point.attributes
47+
assert (
48+
data_point.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL]
49+
== "gpt-4o-mini"
50+
)
51+
assert ServerAttributes.SERVER_ADDRESS in data_point.attributes
52+
53+
54+
@pytest.mark.asyncio()
55+
async def test_async_streaming_chat_records_ttft_metric(
56+
metric_reader, async_openai_client, instrument_with_content, vcr
57+
):
58+
"""TTFT metric is recorded for async streaming chat completions."""
59+
with vcr.use_cassette("test_async_chat_completion_streaming.yaml"):
60+
response = await async_openai_client.chat.completions.create(
61+
model=DEFAULT_MODEL,
62+
messages=USER_ONLY_PROMPT,
63+
stream=True,
64+
stream_options={"include_usage": True},
65+
)
66+
async for _ in response:
67+
pass
68+
69+
data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK)
70+
assert len(data_points) == 1, (
71+
"expected exactly one TTFC data point for async streaming"
72+
)
73+
74+
data_point = data_points[0]
75+
assert data_point.sum >= 0
76+
assert data_point.count == 1
77+
assert data_point.explicit_bounds == tuple(_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS)
78+
79+
80+
def test_non_streaming_chat_does_not_record_ttft_metric(
81+
metric_reader, openai_client, instrument_with_content, vcr
82+
):
83+
"""TTFT metric should NOT be recorded for non-streaming requests."""
84+
with vcr.use_cassette("test_chat_completion_metrics.yaml"):
85+
openai_client.chat.completions.create(
86+
messages=USER_ONLY_PROMPT, model=DEFAULT_MODEL, stream=False
87+
)
88+
89+
data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK)
90+
assert len(data_points) == 0, (
91+
"gen_ai.client.operation.time_to_first_chunk metric should not be recorded for non-streaming"
92+
)
93+
94+
95+
def test_streaming_tool_calls_records_ttft_metric(
96+
metric_reader, openai_client, instrument_with_content, vcr
97+
):
98+
"""TTFT metric is recorded for streaming responses with tool calls."""
99+
with vcr.use_cassette(
100+
"test_chat_completion_multiple_tools_streaming_with_content.yaml"
101+
):
102+
response = openai_client.chat.completions.create(
103+
model=DEFAULT_MODEL,
104+
messages=[{"role": "user", "content": "What's the weather?"}],
105+
stream=True,
106+
stream_options={"include_usage": True},
107+
tools=[
108+
{
109+
"type": "function",
110+
"function": {
111+
"name": "get_weather",
112+
"parameters": {
113+
"type": "object",
114+
"properties": {
115+
"location": {"type": "string"},
116+
},
117+
},
118+
},
119+
}
120+
],
121+
)
122+
for _ in response:
123+
pass
124+
125+
data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK)
126+
assert len(data_points) == 1, (
127+
"expected exactly one TTFC data point for streaming tool calls"
128+
)
129+
130+
data_point = data_points[0]
131+
assert data_point.sum >= 0
132+
assert data_point.count == 1

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ def __init__( # pylint: disable=too-many-locals
113113
self.seed = seed
114114
self.server_address = server_address
115115
self.server_port = server_port
116+
self.time_to_first_token_s: float | None = None
117+
"""Time to first token in seconds (streaming responses only)."""
116118
self._start()
117119

118120
def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]:
@@ -283,6 +285,8 @@ class LLMInvocation:
283285
seed: int | None = None
284286
server_address: str | None = None
285287
server_port: int | None = None
288+
time_to_first_token_s: float | None = None
289+
"""Time to first token in seconds (streaming responses only)."""
286290

287291
_inference_invocation: InferenceInvocation | None = field(
288292
default=None, init=False, repr=False
@@ -347,6 +351,7 @@ def _sync_to_invocation(self) -> None:
347351
inv.server_port = self.server_port
348352
inv.attributes = self.attributes
349353
inv.metric_attributes = self.metric_attributes
354+
inv.time_to_first_token_s = self.time_to_first_token_s
350355

351356
@property
352357
def span(self) -> Span:
@@ -356,3 +361,10 @@ def span(self) -> Span:
356361
if self._inference_invocation is not None
357362
else INVALID_SPAN
358363
)
364+
365+
@property
366+
def monotonic_start_s(self) -> float | None:
367+
"""Monotonic start time, delegated from the underlying InferenceInvocation."""
368+
if self._inference_invocation is not None:
369+
return self._inference_invocation._monotonic_start_s
370+
return None

0 commit comments

Comments (0)