Skip to content

Commit 3c5e5ea

Browse files
authored
Merge branch 'main' into mike/towncrier
2 parents 7878405 + 287a65e commit 3c5e5ea

17 files changed

Lines changed: 1326 additions & 131 deletions

File tree

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ uv run tox -e typecheck
6565
- The monorepo uses `uv` workspaces.
6666
- `tox.ini` defines the test matrix - check it for available test environments.
6767
- Do not add `type: ignore` comments. If a type error arises, solve it properly or write a follow-up plan to address it in another PR.
68+
- Annotate function signatures (parameters and return types) and class attributes. Prefer `from __future__ import annotations` over runtime-quoted strings.
6869
- When a file uses `from __future__ import annotations`, do not quote type annotations just to
6970
avoid forward references. Keep quotes only for expressions still evaluated at runtime, such as
7071
`typing.cast(...)`, unless the referenced type is imported at runtime.

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
@AGENTS.md
2+

instrumentation-genai/AGENTS.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ This layer is responsible only for:
1919
Everything else (span creation, metric recording, event emission, context propagation)
2020
belongs in `util/opentelemetry-util-genai`.
2121

22+
For GenAI streaming wrappers, prefer the shared `SyncStreamWrapper` and `AsyncStreamWrapper`
23+
helpers from `opentelemetry.util.genai.stream` instead of reimplementing iteration,
24+
close/context-manager, and finalization behavior in provider packages.
25+
26+
Put provider-specific chunk parsing and telemetry finalization in private hook methods or a
27+
narrow mixin. Do not make async stream wrappers inherit from sync stream wrappers.
28+
2229
## 2. TelemetryHandler Initialization
2330

2431
Construct `TelemetryHandler` once inside `_instrument()`, passing all OTel providers and the

instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
- Refactor chat completion stream wrappers to use shared GenAI stream lifecycle helpers.
11+
([#4500](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4500))
1012
- Pass tool definitions from `tools` kwarg to `InferenceInvocation.tool_definitions`
1113
so `gen_ai.tool.definitions` span attribute is populated on chat completion spans
1214
([#4554](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4554))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright The OpenTelemetry Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from __future__ import annotations
5+
6+
from openai.types.chat.chat_completion_chunk import ChoiceDeltaToolCall
7+
8+
9+
class ToolCallBuffer:
10+
def __init__(
11+
self,
12+
index: int,
13+
tool_call_id: str | None,
14+
function_name: str | None,
15+
) -> None:
16+
self.index: int = index
17+
self.function_name: str | None = function_name
18+
self.tool_call_id: str | None = tool_call_id
19+
self.arguments: list[str] = []
20+
21+
def append_arguments(self, arguments: str | None) -> None:
22+
if arguments is not None:
23+
self.arguments.append(arguments)
24+
25+
26+
class ChoiceBuffer:
27+
def __init__(self, index: int) -> None:
28+
self.index: int = index
29+
self.finish_reason: str | None = None
30+
self.text_content: list[str] = []
31+
self.tool_calls_buffers: list[ToolCallBuffer | None] = []
32+
33+
def append_text_content(self, content: str) -> None:
34+
self.text_content.append(content)
35+
36+
def append_tool_call(self, tool_call: ChoiceDeltaToolCall) -> None:
37+
idx = tool_call.index
38+
for _ in range(len(self.tool_calls_buffers), idx + 1):
39+
self.tool_calls_buffers.append(None)
40+
41+
function = tool_call.function
42+
buffer = self.tool_calls_buffers[idx]
43+
if buffer is None:
44+
buffer = ToolCallBuffer(
45+
idx,
46+
tool_call.id,
47+
function.name if function else None,
48+
)
49+
self.tool_calls_buffers[idx] = buffer
50+
51+
if function:
52+
buffer.append_arguments(function.arguments)
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
# Copyright The OpenTelemetry Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from __future__ import annotations
5+
6+
import json
7+
from typing import Optional
8+
9+
from openai import AsyncStream, Stream
10+
from openai.types.chat import ChatCompletionChunk
11+
12+
from opentelemetry.semconv._incubating.attributes import (
13+
openai_attributes as OpenAIAttributes,
14+
)
15+
from opentelemetry.util.genai.invocation import InferenceInvocation
16+
from opentelemetry.util.genai.stream import (
17+
AsyncStreamWrapper,
18+
SyncStreamWrapper,
19+
)
20+
from opentelemetry.util.genai.types import (
21+
OutputMessage,
22+
Text,
23+
ToolCallRequest,
24+
)
25+
26+
from .chat_buffers import ChoiceBuffer
27+
28+
29+
class _ChatStreamMixin:
30+
"""Chat-specific hooks shared by sync and async stream wrappers."""
31+
32+
_self_invocation: InferenceInvocation
33+
_self_capture_content: bool
34+
_self_choice_buffers: list[ChoiceBuffer]
35+
_self_response_id: Optional[str]
36+
_self_response_model: Optional[str]
37+
_self_service_tier: Optional[str]
38+
_self_prompt_tokens: Optional[int]
39+
_self_completion_tokens: Optional[int]
40+
41+
def _set_response_model(self, chunk: ChatCompletionChunk) -> None:
42+
if self._self_response_model:
43+
return
44+
45+
if chunk.model:
46+
self._self_response_model = chunk.model
47+
48+
def _set_response_id(self, chunk: ChatCompletionChunk) -> None:
49+
if self._self_response_id:
50+
return
51+
52+
if chunk.id:
53+
self._self_response_id = chunk.id
54+
55+
def _set_response_service_tier(self, chunk: ChatCompletionChunk) -> None:
56+
if self._self_service_tier:
57+
return
58+
59+
service_tier = getattr(chunk, "service_tier", None)
60+
if service_tier:
61+
self._self_service_tier = service_tier
62+
63+
def _build_streaming_response(self, chunk: ChatCompletionChunk) -> None:
64+
if chunk.choices is None:
65+
return
66+
67+
for choice in chunk.choices:
68+
if not choice.delta:
69+
continue
70+
71+
for idx in range(len(self._self_choice_buffers), choice.index + 1):
72+
self._self_choice_buffers.append(ChoiceBuffer(idx))
73+
74+
if choice.finish_reason:
75+
self._self_choice_buffers[
76+
choice.index
77+
].finish_reason = choice.finish_reason
78+
79+
if choice.delta.content is not None:
80+
self._self_choice_buffers[choice.index].append_text_content(
81+
choice.delta.content
82+
)
83+
84+
if choice.delta.tool_calls is not None:
85+
for tool_call in choice.delta.tool_calls:
86+
self._self_choice_buffers[choice.index].append_tool_call(
87+
tool_call
88+
)
89+
90+
def _set_usage(self, chunk: ChatCompletionChunk) -> None:
91+
usage = getattr(chunk, "usage", None)
92+
if usage:
93+
self._self_completion_tokens = usage.completion_tokens
94+
self._self_prompt_tokens = usage.prompt_tokens
95+
96+
def _process_chunk(self, chunk: ChatCompletionChunk) -> None:
97+
self._set_response_id(chunk)
98+
self._set_response_model(chunk)
99+
self._set_response_service_tier(chunk)
100+
self._build_streaming_response(chunk)
101+
self._set_usage(chunk)
102+
103+
def _set_output_messages(self) -> None:
104+
if not self._self_capture_content: # optimization
105+
return
106+
output_messages = []
107+
for choice in self._self_choice_buffers:
108+
message = OutputMessage(
109+
role="assistant",
110+
finish_reason=choice.finish_reason or "error",
111+
parts=[],
112+
)
113+
if choice.text_content:
114+
message.parts.append(
115+
Text(content="".join(choice.text_content))
116+
)
117+
if choice.tool_calls_buffers:
118+
tool_calls = []
119+
for tool_call in filter(None, choice.tool_calls_buffers):
120+
arguments = None
121+
arguments_str = "".join(tool_call.arguments)
122+
if arguments_str:
123+
try:
124+
arguments = json.loads(arguments_str)
125+
except json.JSONDecodeError:
126+
arguments = arguments_str
127+
tool_call_part = ToolCallRequest(
128+
name=tool_call.function_name,
129+
id=tool_call.tool_call_id,
130+
arguments=arguments,
131+
)
132+
tool_calls.append(tool_call_part)
133+
message.parts.extend(tool_calls)
134+
output_messages.append(message)
135+
136+
self._self_invocation.output_messages = output_messages
137+
138+
def _on_stream_end(self) -> None:
139+
self._cleanup()
140+
141+
def _on_stream_error(self, error: BaseException) -> None:
142+
self._cleanup(error)
143+
144+
def parse(self) -> _ChatStreamMixin:
145+
"""Called when using with_raw_response with stream=True."""
146+
return self
147+
148+
def _cleanup(self, error: Optional[BaseException] = None) -> None:
149+
self._self_invocation.response_model_name = self._self_response_model
150+
self._self_invocation.response_id = self._self_response_id
151+
self._self_invocation.input_tokens = self._self_prompt_tokens
152+
self._self_invocation.output_tokens = self._self_completion_tokens
153+
finish_reasons = [
154+
choice.finish_reason
155+
for choice in self._self_choice_buffers
156+
if choice.finish_reason
157+
]
158+
if finish_reasons:
159+
self._self_invocation.finish_reasons = finish_reasons
160+
if self._self_service_tier:
161+
self._self_invocation.attributes.update(
162+
{
163+
OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER: self._self_service_tier
164+
},
165+
)
166+
167+
self._set_output_messages()
168+
169+
if error:
170+
self._self_invocation.fail(error)
171+
else:
172+
self._self_invocation.stop()
173+
174+
175+
class ChatStreamWrapper(
176+
_ChatStreamMixin,
177+
SyncStreamWrapper[ChatCompletionChunk],
178+
):
179+
def __init__(
180+
self,
181+
stream: Stream[ChatCompletionChunk],
182+
invocation: InferenceInvocation,
183+
capture_content: bool,
184+
) -> None:
185+
super().__init__(stream)
186+
self._self_invocation = invocation
187+
self._self_choice_buffers = []
188+
self._self_capture_content = capture_content
189+
self._self_response_id = None
190+
self._self_response_model = None
191+
self._self_service_tier = None
192+
self._self_prompt_tokens = None
193+
self._self_completion_tokens = None
194+
195+
196+
class AsyncChatStreamWrapper(
197+
_ChatStreamMixin,
198+
AsyncStreamWrapper[ChatCompletionChunk],
199+
):
200+
def __init__(
201+
self,
202+
stream: AsyncStream[ChatCompletionChunk],
203+
invocation: InferenceInvocation,
204+
capture_content: bool,
205+
) -> None:
206+
super().__init__(stream)
207+
self._self_invocation = invocation
208+
self._self_choice_buffers = []
209+
self._self_capture_content = capture_content
210+
self._self_response_id = None
211+
self._self_response_model = None
212+
self._self_service_tier = None
213+
self._self_prompt_tokens = None
214+
self._self_completion_tokens = None
215+
216+
217+
__all__ = [
218+
"AsyncChatStreamWrapper",
219+
"ChatStreamWrapper",
220+
]

0 commit comments

Comments
 (0)