Skip to content

Commit 04e067c

Browse files
polish: pr feedback.
1 parent 250a7bf commit 04e067c

4 files changed

Lines changed: 67 additions & 36 deletions

File tree

AGENTS.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ Apply to packages under `instrumentation/` and `instrumentation-genai/`.
7676
- When catching exceptions from the underlying library to record telemetry, always re-raise the
7777
original exception unmodified.
7878
- Do not raise new exceptions in instrumentation/telemetry code.
79+
- For GenAI streaming wrappers, prefer the shared `SyncStreamWrapper` and `AsyncStreamWrapper`
80+
helpers from `opentelemetry.util.genai._stream` instead of reimplementing iteration,
81+
close/context-manager, and finalization behavior in provider packages.
82+
- Put provider-specific chunk parsing and telemetry finalization in private hook methods or a
83+
narrow mixin. Do not make async stream wrappers inherit from sync stream wrappers.
7984

8085
### Semantic conventions
8186

CLAUDE.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,8 @@
11
@AGENTS.md
2+
3+
For GenAI streaming wrappers, use `SyncStreamWrapper` and
4+
`AsyncStreamWrapper` from `opentelemetry.util.genai._stream` instead of
5+
reimplementing iteration, close/context-manager, and finalization behavior in
6+
provider packages. Keep provider-specific chunk parsing and telemetry
7+
finalization in private hooks or a narrow mixin, and do not make async stream
8+
wrappers inherit from sync stream wrappers.

instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/chat_wrappers.py

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import json
1818
from typing import Any, Optional
1919

20-
from openai import Stream
20+
from openai import AsyncStream, Stream
2121

2222
from opentelemetry.semconv._incubating.attributes import (
2323
openai_attributes as OpenAIAttributes,
@@ -36,28 +36,19 @@
3636
from .chat_buffers import ChoiceBuffer
3737

3838

39-
class ChatStreamWrapper(SyncStreamWrapper[Any]):
39+
class _ChatStreamMixin:
40+
"""Chat-specific hooks shared by sync and async stream wrappers."""
41+
4042
invocation: InferenceInvocation
43+
capture_content: bool
44+
choice_buffers: list
4145
response_id: Optional[str] = None
4246
response_model: Optional[str] = None
4347
service_tier: Optional[str] = None
4448
finish_reasons: list = []
4549
prompt_tokens: Optional[int] = None
4650
completion_tokens: Optional[int] = None
4751

48-
def __init__(
49-
self,
50-
stream: Stream,
51-
invocation: InferenceInvocation,
52-
capture_content: bool,
53-
):
54-
SyncStreamWrapper.__init__(self, stream)
55-
self.stream = stream
56-
self.invocation = invocation
57-
self.choice_buffers = []
58-
self._started = True
59-
self.capture_content = capture_content
60-
6152
def _set_response_model(self, chunk):
6253
if self.response_model:
6354
return
@@ -155,19 +146,16 @@ def _set_output_messages(self):
155146
self.invocation.output_messages = output_messages
156147

157148
def _stop_stream(self) -> None:
158-
self.cleanup()
149+
self._cleanup()
159150

160151
def _fail_stream(self, error: BaseException) -> None:
161-
self.cleanup(error)
152+
self._cleanup(error)
162153

163154
def parse(self):
164155
"""Called when using with_raw_response with stream=True."""
165156
return self
166157

167-
def cleanup(self, error: Optional[BaseException] = None):
168-
if not self._started:
169-
return
170-
158+
def _cleanup(self, error: Optional[BaseException] = None) -> None:
171159
self.invocation.response_model_name = self.response_model
172160
self.invocation.response_id = self.response_id
173161
self.invocation.input_tokens = self.prompt_tokens
@@ -188,31 +176,39 @@ def cleanup(self, error: Optional[BaseException] = None):
188176
self.invocation.fail(error)
189177
else:
190178
self.invocation.stop()
191-
self._started = False
192179

193180

194-
class AsyncChatStreamWrapper(
195-
AsyncStreamWrapper[Any],
196-
ChatStreamWrapper,
181+
class ChatStreamWrapper(
182+
_ChatStreamMixin,
183+
SyncStreamWrapper[Any],
197184
):
198-
invocation: InferenceInvocation
199-
200185
def __init__(
201186
self,
202187
stream: Stream,
203188
invocation: InferenceInvocation,
204189
capture_content: bool,
205190
):
206-
ChatStreamWrapper.__init__(self, stream, invocation, capture_content)
191+
super().__init__(stream)
192+
self.invocation = invocation
193+
self.choice_buffers = []
194+
self.capture_content = capture_content
207195

208-
def _process_chunk(self, chunk):
209-
ChatStreamWrapper._process_chunk(self, chunk)
210196

211-
def _stop_stream(self) -> None:
212-
ChatStreamWrapper._stop_stream(self)
197+
class AsyncChatStreamWrapper(
198+
_ChatStreamMixin,
199+
AsyncStreamWrapper[Any],
200+
):
213201

214-
def _fail_stream(self, error: BaseException) -> None:
215-
ChatStreamWrapper._fail_stream(self, error)
202+
def __init__(
203+
self,
204+
stream: AsyncStream,
205+
invocation: InferenceInvocation,
206+
capture_content: bool,
207+
):
208+
super().__init__(stream)
209+
self.invocation = invocation
210+
self.choice_buffers = []
211+
self.capture_content = capture_content
216212

217213

218214
__all__ = [

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_stream.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,18 @@
2424

2525

2626
class SyncStreamWrapper(ABC, Generic[ChunkT]):
27-
"""Base class for synchronous instrumented stream wrappers."""
27+
"""Base class for synchronous instrumented stream wrappers.
28+
29+
Subclass this when wrapping a provider SDK stream that is consumed with
30+
normal iteration. The subclass should pass the SDK stream to
31+
``super().__init__(stream)`` and implement the three telemetry hooks:
32+
``_process_chunk`` for per-chunk state, ``_stop_stream`` for successful
33+
finalization, and ``_fail_stream`` for failure finalization.
34+
35+
Users should consume subclasses as normal streams, for example with
36+
``for chunk in wrapper`` or ``with wrapper``. The hook methods are called
37+
internally by the wrapper lifecycle and are not part of the public API.
38+
"""
2839

2940
def __init__(self, stream: Any):
3041
self.stream = stream
@@ -106,7 +117,19 @@ def _handle_process_chunk_error(_error: Exception) -> None:
106117

107118

108119
class AsyncStreamWrapper(ABC, Generic[ChunkT]):
109-
"""Base class for asynchronous instrumented stream wrappers."""
120+
"""Base class for asynchronous instrumented stream wrappers.
121+
122+
Subclass this when wrapping a provider SDK stream that is consumed with
123+
async iteration. The subclass should pass the SDK stream to
124+
``super().__init__(stream)`` and implement the three telemetry hooks:
125+
``_process_chunk`` for per-chunk state, ``_stop_stream`` for successful
126+
finalization, and ``_fail_stream`` for failure finalization.
127+
128+
Users should consume subclasses as normal async streams, for example with
129+
``async for chunk in wrapper`` or ``async with wrapper``. The hook methods
130+
remain synchronous telemetry hooks; async stream reads and close handling
131+
are owned by this base class.
132+
"""
110133

111134
def __init__(self, stream: Any):
112135
self.stream = stream

0 commit comments

Comments
 (0)