Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
cb0156f
wip: move the streams to a generic ABC wrapper.
eternalcuriouslearner Apr 29, 2026
11258d9
polish: add changelogs.
eternalcuriouslearner Apr 29, 2026
6f4e510
wip: precommit.
eternalcuriouslearner Apr 29, 2026
787ecbb
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner Apr 29, 2026
8a273c5
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner Apr 30, 2026
07b4a66
wip: fix the merge issues.
eternalcuriouslearner Apr 30, 2026
c496ecf
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
lzchen Apr 30, 2026
250a7bf
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner Apr 30, 2026
04e067c
polish: pr feedback.
eternalcuriouslearner May 1, 2026
2157d28
polish: fixing the precommit.
eternalcuriouslearner May 1, 2026
fc3bd4d
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 1, 2026
8a8b9c7
wip: renamed to stream.py and changed finish_reasons population style.
eternalcuriouslearner May 2, 2026
ae2663b
wip: implementing copilot suggestions.
eternalcuriouslearner May 2, 2026
a246ee1
wip: fixing the tox.
eternalcuriouslearner May 2, 2026
26743fb
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 2, 2026
5689654
polish: copilot feedback.
eternalcuriouslearner May 3, 2026
c43b094
polish: copilot suggestions.
eternalcuriouslearner May 3, 2026
c06a7e7
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 5, 2026
57e2957
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 5, 2026
1cf0924
Update CLAUDE.md
eternalcuriouslearner May 6, 2026
46540b3
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
lmolkova May 6, 2026
3b31fc1
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
lzchen May 6, 2026
d60659a
wip: shifting the instructions to right agents.md file.
eternalcuriouslearner May 10, 2026
c4c3e0c
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 10, 2026
7ce1427
wip: using objectproxy.
eternalcuriouslearner May 11, 2026
a133e8d
Merge branch 'feat/move-stream-to-genai-util-as-abc' of https://githu…
eternalcuriouslearner May 11, 2026
17cd07c
fixing pypy errors.
eternalcuriouslearner May 11, 2026
a3faa51
wip: renaming functions based on events.
eternalcuriouslearner May 11, 2026
8fb9c83
polish: making chat_buffers and chat_wrappers type safe.
eternalcuriouslearner May 11, 2026
4759d81
polish: removed get_attr guard.
eternalcuriouslearner May 12, 2026
bca90da
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 12, 2026
d7b2bdd
polish: add types to chat_wrappers.py.
eternalcuriouslearner May 12, 2026
53650d5
Merge branch 'feat/move-stream-to-genai-util-as-abc' of https://githu…
eternalcuriouslearner May 12, 2026
b7cb6db
wip: use get_attr for new fields to avoid failures.
eternalcuriouslearner May 12, 2026
9782774
polish: fix lint and spdx headers.
eternalcuriouslearner May 12, 2026
ae0e6d2
polish:fixing pre-commit
eternalcuriouslearner May 12, 2026
b259a1e
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
aabmass May 12, 2026
7f1d7bd
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
eternalcuriouslearner May 12, 2026
d8cb7cc
update oldest deps
lmolkova May 12, 2026
6916805
Merge branch 'main' into feat/move-stream-to-genai-util-as-abc
lmolkova May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ Apply to packages under `instrumentation/` and `instrumentation-genai/`.
- When catching exceptions from the underlying library to record telemetry, always re-raise the
original exception unmodified.
- Do not raise new exceptions in instrumentation/telemetry code.
- For GenAI streaming wrappers, prefer the shared `SyncStreamWrapper` and `AsyncStreamWrapper`
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated
helpers from `opentelemetry.util.genai._stream` instead of reimplementing iteration,
close/context-manager, and finalization behavior in provider packages.
- Put provider-specific chunk parsing and telemetry finalization in private hook methods or a
narrow mixin. Do not make async stream wrappers inherit from sync stream wrappers.

### Semantic conventions

Expand Down
7 changes: 7 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
@AGENTS.md

For GenAI streaming wrappers, use `SyncStreamWrapper` and
`AsyncStreamWrapper` from `opentelemetry.util.genai._stream` instead of
reimplementing iteration, close/context-manager, and finalization behavior in
provider packages. Keep provider-specific chunk parsing and telemetry
finalization in private hooks or a narrow mixin, and do not make async stream
wrappers inherit from sync stream wrappers.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Refactor chat completion stream wrappers to use shared GenAI stream lifecycle helpers.
([#4500](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4500))

- Migrate experimental path from deprecated `LLMInvocation` to `InferenceInvocation`,
using `handler.start_inference()` and `invocation.stop()`/`invocation.fail()` directly
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class ToolCallBuffer:
def __init__(self, index, tool_call_id, function_name):
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated
self.index = index
self.function_name = function_name
self.tool_call_id = tool_call_id
self.arguments = []

def append_arguments(self, arguments):
if arguments is not None:
Comment thread
aabmass marked this conversation as resolved.
self.arguments.append(arguments)


class ChoiceBuffer:
def __init__(self, index):
self.index = index
self.finish_reason = None
self.text_content = []
self.tool_calls_buffers = []

def append_text_content(self, content):
self.text_content.append(content)

def append_tool_call(self, tool_call):
idx = tool_call.index
for _ in range(len(self.tool_calls_buffers), idx + 1):
self.tool_calls_buffers.append(None)

function = tool_call.function
if not self.tool_calls_buffers[idx]:
self.tool_calls_buffers[idx] = ToolCallBuffer(
idx,
tool_call.id,
function.name if function else None,
)

if function:
self.tool_calls_buffers[idx].append_arguments(function.arguments)
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
from typing import Any, Optional

from openai import AsyncStream, Stream

from opentelemetry.semconv._incubating.attributes import (
openai_attributes as OpenAIAttributes,
)
from opentelemetry.util.genai._stream import (
AsyncStreamWrapper,
SyncStreamWrapper,
)
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated
from opentelemetry.util.genai.invocation import InferenceInvocation
from opentelemetry.util.genai.types import (
OutputMessage,
Text,
ToolCallRequest,
)

from .chat_buffers import ChoiceBuffer


class _ChatStreamMixin:
"""Chat-specific hooks shared by sync and async stream wrappers."""

invocation: InferenceInvocation
capture_content: bool
choice_buffers: list
response_id: Optional[str] = None
response_model: Optional[str] = None
service_tier: Optional[str] = None
finish_reasons: list = []
prompt_tokens: Optional[int] = None
completion_tokens: Optional[int] = None
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated

def _set_response_model(self, chunk):
if self.response_model:
return

if getattr(chunk, "model", None):
self.response_model = chunk.model
Comment thread
aabmass marked this conversation as resolved.
Outdated

def _set_response_id(self, chunk):
if self.response_id:
return

if getattr(chunk, "id", None):
self.response_id = chunk.id

def _set_response_service_tier(self, chunk):
if self.service_tier:
return

if getattr(chunk, "service_tier", None):
self.service_tier = chunk.service_tier

def _build_streaming_response(self, chunk):
if getattr(chunk, "choices", None) is None:
return

choices = chunk.choices
for choice in choices:
if not choice.delta:
continue

for idx in range(len(self.choice_buffers), choice.index + 1):
self.choice_buffers.append(ChoiceBuffer(idx))

if choice.finish_reason:
self.choice_buffers[
choice.index
].finish_reason = choice.finish_reason

if choice.delta.content is not None:
self.choice_buffers[choice.index].append_text_content(
choice.delta.content
)

if choice.delta.tool_calls is not None:
for tool_call in choice.delta.tool_calls:
self.choice_buffers[choice.index].append_tool_call(
tool_call
)

def _set_usage(self, chunk):
if getattr(chunk, "usage", None):
self.completion_tokens = chunk.usage.completion_tokens
self.prompt_tokens = chunk.usage.prompt_tokens

def _process_chunk(self, chunk):
self._set_response_id(chunk)
self._set_response_model(chunk)
self._set_response_service_tier(chunk)
self._build_streaming_response(chunk)
self._set_usage(chunk)

def _set_output_messages(self):
if not self.capture_content: # optimization
return
output_messages = []
for choice in self.choice_buffers:
message = OutputMessage(
role="assistant",
finish_reason=choice.finish_reason or "error",
parts=[],
)
if choice.text_content:
message.parts.append(
Text(content="".join(choice.text_content))
)
if choice.tool_calls_buffers:
tool_calls = []
for tool_call in choice.tool_calls_buffers:
arguments = None
arguments_str = "".join(tool_call.arguments)
if arguments_str:
try:
arguments = json.loads(arguments_str)
except json.JSONDecodeError:
arguments = arguments_str
Comment thread
lmolkova marked this conversation as resolved.
tool_call_part = ToolCallRequest(
name=tool_call.function_name,
id=tool_call.tool_call_id,
arguments=arguments,
)
tool_calls.append(tool_call_part)
message.parts.extend(tool_calls)
output_messages.append(message)

self.invocation.output_messages = output_messages

def _stop_stream(self) -> None:
self._cleanup()

def _fail_stream(self, error: BaseException) -> None:
self._cleanup(error)

def parse(self):
"""Called when using with_raw_response with stream=True."""
return self

def _cleanup(self, error: Optional[BaseException] = None) -> None:
self.invocation.response_model_name = self.response_model
self.invocation.response_id = self.response_id
self.invocation.input_tokens = self.prompt_tokens
self.invocation.output_tokens = self.completion_tokens
# TODO: Derive finish_reasons from choice_buffers so streaming
# invocations match non-streaming response finalization.
self.invocation.finish_reasons = self.finish_reasons
if self.service_tier:
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated
self.invocation.attributes.update(
{
OpenAIAttributes.OPENAI_RESPONSE_SERVICE_TIER: self.service_tier
},
)

self._set_output_messages()

if error:
self.invocation.fail(error)
else:
self.invocation.stop()


class ChatStreamWrapper(
_ChatStreamMixin,
SyncStreamWrapper[Any],
):
def __init__(
self,
stream: Stream,
invocation: InferenceInvocation,
capture_content: bool,
):
super().__init__(stream)
self.invocation = invocation
self.choice_buffers = []
self.capture_content = capture_content


class AsyncChatStreamWrapper(
_ChatStreamMixin,
AsyncStreamWrapper[Any],
):
def __init__(
self,
stream: AsyncStream,
invocation: InferenceInvocation,
capture_content: bool,
):
super().__init__(stream)
self.invocation = invocation
self.choice_buffers = []
self.capture_content = capture_content


__all__ = [
"AsyncChatStreamWrapper",
"ChatStreamWrapper",
]
Loading
Loading