Skip to content

Commit 67b8249

Browse files
committed
Merge remote-tracking branch 'upstream/main' into genai-utils/agent-invocation
2 parents a310d6d + 196d088 commit 67b8249

12 files changed

Lines changed: 457 additions & 46 deletions

File tree

.github/workflows/stale.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,5 @@ jobs:
3333
This PR has been closed due to inactivity. Please reopen if you would
3434
like to continue working on it.
3535
exempt-pr-labels: "hold,WIP,blocked-by-spec,do not merge"
36+
# TODO: Revert back to default of 30 after we have cleared the backlog of stale PRs.
37+
operations-per-run: 500

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
1212
## Unreleased
1313

14+
### Added
15+
16+
- `opentelemetry-instrumentation-confluent-kafka`: Loosen confluent-kafka upper bound to <3.0.0
17+
([#4289](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4289))
18+
1419
### Fixed
1520

1621
- Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone

docs-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ botocore~=1.0
2727
boto3~=1.0
2828
cassandra-driver~=3.25
2929
celery>=4.0
30-
confluent-kafka>= 1.8.2,<= 2.13.0
30+
confluent-kafka>= 1.8.2,< 3.0.0
3131
elasticsearch>=6.0,<9.0
3232
flask~=2.0
3333
falcon~=2.0

instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
`OTEL_SEMCONV_STABILITY_OPT_IN` to `gen_ai_latest_experimental` to enable.
1414
Add dependency on `opentelemetry-util-genai` pypi package.
1515
([#3715](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3715))
16+
- Add wrappers for OpenAI Responses API streams and response stream managers
17+
([#4280](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4280))
1618

1719
## Version 2.3b0 (2025-12-24)
1820

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
"""Wrappers for OpenAI Responses API streams and stream managers."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from contextlib import ExitStack, contextmanager
7+
from types import TracebackType
8+
from typing import TYPE_CHECKING, Callable, Generator, Generic, TypeVar
9+
10+
from opentelemetry.util.genai.handler import TelemetryHandler
11+
from opentelemetry.util.genai.types import Error, LLMInvocation
12+
13+
# OpenAI Responses internals are version-gated (added in openai>=1.66.0), so
14+
# pylint may not resolve them in all lint environments even though we guard
15+
# runtime usage with ImportError fallbacks below.
16+
try:
17+
from openai.lib.streaming.responses._events import ( # pylint: disable=no-name-in-module
18+
ResponseCompletedEvent,
19+
)
20+
from openai.types.responses import ( # pylint: disable=no-name-in-module
21+
ResponseCreatedEvent,
22+
ResponseErrorEvent,
23+
ResponseFailedEvent,
24+
ResponseIncompleteEvent,
25+
ResponseInProgressEvent,
26+
)
27+
28+
_RESPONSE_EVENTS_WITH_RESPONSE = (
29+
ResponseCreatedEvent,
30+
ResponseInProgressEvent,
31+
ResponseFailedEvent,
32+
ResponseIncompleteEvent,
33+
ResponseCompletedEvent,
34+
)
35+
except ImportError:
36+
ResponseCompletedEvent = None
37+
ResponseCreatedEvent = None
38+
ResponseErrorEvent = None
39+
ResponseFailedEvent = None
40+
ResponseIncompleteEvent = None
41+
ResponseInProgressEvent = None
42+
_RESPONSE_EVENTS_WITH_RESPONSE = ()
43+
44+
try:
45+
from opentelemetry.instrumentation.openai_v2.response_extractors import ( # pylint: disable=no-name-in-module
46+
_set_invocation_response_attributes,
47+
)
48+
except ImportError:
49+
_set_invocation_response_attributes = None
50+
51+
if TYPE_CHECKING:
52+
from openai.lib.streaming.responses._events import ( # pylint: disable=no-name-in-module
53+
ResponseStreamEvent,
54+
)
55+
from openai.lib.streaming.responses._responses import (
56+
ResponseStream,
57+
ResponseStreamManager,
58+
) # pylint: disable=no-name-in-module
59+
from openai.types.responses import ( # pylint: disable=no-name-in-module
60+
ParsedResponse,
61+
Response,
62+
)
63+
64+
_logger = logging.getLogger(__name__)
65+
TextFormatT = TypeVar("TextFormatT")
66+
ResponseT = TypeVar("ResponseT")
67+
68+
69+
def _set_response_attributes(
70+
invocation: "LLMInvocation",
71+
result: "ParsedResponse[TextFormatT] | Response | None",
72+
capture_content: bool,
73+
) -> None:
74+
if _set_invocation_response_attributes is None:
75+
return
76+
_set_invocation_response_attributes(invocation, result, capture_content)
77+
78+
79+
class _ResponseProxy(Generic[ResponseT]):
80+
def __init__(self, response: ResponseT, finalize: Callable[[], None]):
81+
self._response = response
82+
self._finalize = finalize
83+
84+
def close(self) -> None:
85+
try:
86+
self._response.close()
87+
finally:
88+
self._finalize()
89+
90+
def __getattr__(self, name: str):
91+
return getattr(self._response, name)
92+
93+
94+
class ResponseStreamWrapper(Generic[TextFormatT]):
95+
"""Wrapper for OpenAI Responses API stream objects.
96+
97+
Wraps ResponseStream from the OpenAI SDK:
98+
https://github.com/openai/openai-python/blob/656e3cab4a18262a49b961d41293367e45ee71b9/src/openai/_streaming.py#L55
99+
"""
100+
101+
def __init__(
102+
self,
103+
stream: "ResponseStream[TextFormatT]",
104+
handler: TelemetryHandler,
105+
invocation: "LLMInvocation",
106+
capture_content: bool,
107+
):
108+
self.stream = stream
109+
self.handler = handler
110+
self.invocation = invocation
111+
self._capture_content = capture_content
112+
self._finalized = False
113+
114+
def __enter__(self) -> "ResponseStreamWrapper":
115+
return self
116+
117+
def __exit__(
118+
self,
119+
exc_type: type[BaseException] | None,
120+
exc_val: BaseException | None,
121+
exc_tb: TracebackType | None,
122+
) -> bool:
123+
try:
124+
if exc_type is not None:
125+
self._fail(
126+
str(exc_val), type(exc_val) if exc_val else Exception
127+
)
128+
finally:
129+
self.close()
130+
return False
131+
132+
def close(self) -> None:
133+
try:
134+
self.stream.close()
135+
finally:
136+
self._stop(None)
137+
138+
def __iter__(self) -> "ResponseStreamWrapper":
139+
return self
140+
141+
def __next__(self) -> "ResponseStreamEvent[TextFormatT]":
142+
try:
143+
event = next(self.stream)
144+
except StopIteration:
145+
self._stop(None)
146+
raise
147+
except Exception as error:
148+
self._fail(str(error), type(error))
149+
raise
150+
with self._safe_instrumentation("event processing"):
151+
self.process_event(event)
152+
return event
153+
154+
def get_final_response(self) -> "ParsedResponse[TextFormatT]":
155+
self.until_done()
156+
return self.stream.get_final_response()
157+
158+
def until_done(self) -> "ResponseStreamWrapper":
159+
for _ in self:
160+
pass
161+
return self
162+
163+
def parse(self) -> "ResponseStreamWrapper":
164+
raise NotImplementedError(
165+
"ResponseStreamWrapper.parse() is not implemented"
166+
)
167+
168+
# TODO: Replace __getattr__ passthrough with wrapt.ObjectProxy in a future
169+
# cleanup once wrapt 2 typing support is available (wrapt PR #3903).
170+
def __getattr__(self, name: str):
171+
return getattr(self.stream, name)
172+
173+
@property
174+
def response(self):
175+
response = self.stream.response
176+
if response is None:
177+
return None
178+
return _ResponseProxy(response, lambda: self._stop(None))
179+
180+
def _stop(
181+
self, result: "ParsedResponse[TextFormatT] | Response | None"
182+
) -> None:
183+
if self._finalized:
184+
return
185+
with self._safe_instrumentation("response attribute extraction"):
186+
_set_response_attributes(
187+
self.invocation, result, self._capture_content
188+
)
189+
with self._safe_instrumentation("stop_llm"):
190+
self.handler.stop_llm(self.invocation)
191+
self._finalized = True
192+
193+
def _fail(self, message: str, error_type: type[BaseException]) -> None:
194+
if self._finalized:
195+
return
196+
with self._safe_instrumentation("fail_llm"):
197+
self.handler.fail_llm(
198+
self.invocation, Error(message=message, type=error_type)
199+
)
200+
self._finalized = True
201+
202+
@staticmethod
203+
@contextmanager
204+
def _safe_instrumentation(context: str) -> Generator[None, None, None]:
205+
try:
206+
yield
207+
except Exception: # pylint: disable=broad-exception-caught
208+
_logger.debug(
209+
"OpenAI responses instrumentation error during %s",
210+
context,
211+
exc_info=True,
212+
stacklevel=2,
213+
)
214+
215+
def process_event(self, event: "ResponseStreamEvent[TextFormatT]") -> None:
216+
event_type = event.type
217+
response: "ParsedResponse[TextFormatT] | Response | None" = None
218+
219+
if isinstance(event, _RESPONSE_EVENTS_WITH_RESPONSE):
220+
response = event.response
221+
222+
if response and not self.invocation.request_model:
223+
model = response.model
224+
if model:
225+
self.invocation.request_model = model
226+
227+
if isinstance(event, ResponseCompletedEvent):
228+
self._stop(response)
229+
return
230+
231+
if isinstance(event, (ResponseFailedEvent, ResponseIncompleteEvent)):
232+
with self._safe_instrumentation("response attribute extraction"):
233+
_set_response_attributes(
234+
self.invocation, response, self._capture_content
235+
)
236+
self._fail(event_type, RuntimeError)
237+
return
238+
239+
if isinstance(event, ResponseErrorEvent):
240+
error_type = event.code or "response.error"
241+
message = event.message or error_type
242+
self._fail(message, RuntimeError)
243+
244+
245+
class ResponseStreamManagerWrapper(Generic[TextFormatT]):
246+
"""Wrapper for OpenAI Responses API stream managers.
247+
248+
Wraps ResponseStreamManager from the OpenAI SDK:
249+
https://github.com/openai/openai-python/blob/656e3cab4a18262a49b961d41293367e45ee71b9/src/openai/lib/streaming/responses/_responses.py#L95
250+
"""
251+
252+
def __init__(
253+
self,
254+
manager: "ResponseStreamManager[TextFormatT]",
255+
handler: TelemetryHandler,
256+
invocation: "LLMInvocation",
257+
capture_content: bool,
258+
):
259+
self._manager = manager
260+
self._handler = handler
261+
self._invocation = invocation
262+
self._capture_content = capture_content
263+
self._stream_wrapper: ResponseStreamWrapper[TextFormatT] | None = None
264+
265+
def __enter__(self) -> ResponseStreamWrapper[TextFormatT]:
266+
stream = self._manager.__enter__()
267+
self._stream_wrapper = ResponseStreamWrapper(
268+
stream,
269+
self._handler,
270+
self._invocation,
271+
self._capture_content,
272+
)
273+
return self._stream_wrapper
274+
275+
def __exit__(
276+
self,
277+
exc_type: type[BaseException] | None,
278+
exc_val: BaseException | None,
279+
exc_tb: TracebackType | None,
280+
) -> bool:
281+
suppressed = False
282+
stream_wrapper = self._stream_wrapper
283+
self._stream_wrapper = None
284+
with ExitStack() as cleanup:
285+
if stream_wrapper is not None:
286+
287+
def finalize_stream_wrapper() -> None:
288+
if suppressed:
289+
stream_wrapper.__exit__(None, None, None)
290+
else:
291+
stream_wrapper.__exit__(exc_type, exc_val, exc_tb)
292+
293+
cleanup.callback(finalize_stream_wrapper)
294+
suppressed = self._manager.__exit__(exc_type, exc_val, exc_tb)
295+
return suppressed
296+
297+
def parse(self) -> "ResponseStreamManagerWrapper[TextFormatT]":
298+
raise NotImplementedError(
299+
"ResponseStreamManagerWrapper.parse() is not implemented"
300+
)
301+
302+
# TODO: Replace __getattr__ passthrough with wrapt.ObjectProxy in a future
303+
# cleanup once wrapt 2 typing support is available (wrapt PR #3903).
304+
def __getattr__(self, name: str):
305+
return getattr(self._manager, name)

0 commit comments

Comments
 (0)