Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4cbf426
wip: stream method instrumentation.
eternalcuriouslearner Apr 26, 2026
4cfa251
wip: cleaning up files for linting and type check.
eternalcuriouslearner Apr 26, 2026
5bfc606
Merge branch 'main' into feat/anthropic-messages-stream-method-instru…
eternalcuriouslearner Apr 28, 2026
5c90a95
polish: changelog.
eternalcuriouslearner Apr 29, 2026
9f30bc1
Merge branch 'main' into feat/anthropic-messages-stream-method-instru…
eternalcuriouslearner May 3, 2026
f7549cd
wip: removing unwanted imports.
eternalcuriouslearner May 3, 2026
a8a9785
polish: pr feedback.
eternalcuriouslearner May 3, 2026
b3b1c35
polish: correcting tox errors.
eternalcuriouslearner May 3, 2026
ce69875
polish: copilot comments.
eternalcuriouslearner May 3, 2026
e270875
wip: copilot feedback.
eternalcuriouslearner May 3, 2026
753840c
polish: fixing lint and precommit.
eternalcuriouslearner May 3, 2026
fbb367a
wip: refining the code to meet new handler methods.
eternalcuriouslearner May 4, 2026
5819605
Merge branch 'main' into feat/anthropic-messages-stream-method-instru…
eternalcuriouslearner May 5, 2026
6cc1ca6
polish: removing quotes around data types and refining the agents.md …
eternalcuriouslearner May 5, 2026
27e69f8
polish: fixing precommit.
eternalcuriouslearner May 5, 2026
e97cacf
Update instrumentation-genai/opentelemetry-instrumentation-anthropic/…
eternalcuriouslearner May 6, 2026
72108db
Update wrappers.py
eternalcuriouslearner May 6, 2026
89b211a
Merge branch 'feat/anthropic-messages-stream-method-instrumentation' …
eternalcuriouslearner May 6, 2026
d273c3a
polish: fixed indentation.
eternalcuriouslearner May 6, 2026
0019183
polish: fixing tests.
eternalcuriouslearner May 6, 2026
f31e362
polish: removing safe_instrumentation.
eternalcuriouslearner May 8, 2026
4f7890f
Merge branch 'main' into feat/anthropic-messages-stream-method-instru…
eternalcuriouslearner May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add instrumentation for Anthropic `Messages.stream()` helper method
([#4499](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4499))
- Add async Anthropic message stream wrappers and manager wrappers, with wrapper
tests ([#4346](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4346))
- `AsyncMessagesStreamWrapper` for async message stream telemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@
wrap_function_wrapper, # pyright: ignore[reportUnknownVariableType]
)

from opentelemetry.instrumentation.anthropic.package import _instruments
from opentelemetry.instrumentation.anthropic.patch import (
messages_create,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.util.genai.handler import TelemetryHandler

from .package import _instruments
from .patch import (
messages_create,
messages_stream,
)


class AnthropicInstrumentor(BaseInstrumentor):
"""An instrumentor for the Anthropic Python SDK.
Expand Down Expand Up @@ -99,12 +101,17 @@ def _instrument(self, **kwargs: Any) -> None:
logger_provider=logger_provider,
)

# Patch Messages.create
# Patch Messages.create and Messages.stream
wrap_function_wrapper(
"anthropic.resources.messages",
"Messages.create",
messages_create(handler),
)
wrap_function_wrapper(
"anthropic.resources.messages",
"Messages.stream",
messages_stream(handler),
)

def _uninstrument(self, **kwargs: Any) -> None:
"""Disable Anthropic instrumentation.
Expand All @@ -117,3 +124,7 @@ def _uninstrument(self, **kwargs: Any) -> None:
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
"create",
)
unwrap(
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
"stream",
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
from opentelemetry.semconv._incubating.attributes import (
server_attributes as ServerAttributes,
)
from opentelemetry.util.genai.invocation import InferenceInvocation
from opentelemetry.util.genai.types import (
InputMessage,
LLMInvocation,
MessagePart,
OutputMessage,
)
Expand Down Expand Up @@ -155,7 +155,7 @@ def get_output_messages_from_message(


def set_invocation_response_attributes(
invocation: LLMInvocation,
invocation: InferenceInvocation,
message: Message | None,
capture_content: bool,
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@
gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import (
Error,
LLMInvocation, # TODO: migrate to InferenceInvocation
)
from opentelemetry.util.genai.utils import (
should_capture_content_on_spans_in_experimental_mode,
)
from opentelemetry.util.genai.invocation import InferenceInvocation

from .messages_extractors import (
extract_params,
Expand All @@ -41,11 +35,15 @@
get_system_instruction,
)
from .wrappers import (
MessagesStreamManagerWrapper,
MessagesStreamWrapper,
MessageWrapper,
)

if TYPE_CHECKING:
from anthropic.lib.streaming._messages import ( # pylint: disable=no-name-in-module
MessageStreamManager,
)
from anthropic.resources.messages import Messages
from anthropic.types import RawMessageStreamEvent

Expand All @@ -65,7 +63,6 @@ def messages_create(
],
]:
"""Wrap the `create` method of the `Messages` class to trace it."""
capture_content = should_capture_content_on_spans_in_experimental_mode()

def traced_method(
wrapped: Callable[
Expand All @@ -83,49 +80,88 @@ def traced_method(
"AnthropicStream[RawMessageStreamEvent]",
MessagesStreamWrapper[None],
]:
params = extract_params(*args, **kwargs)
attributes = get_llm_request_attributes(params, instance)
request_model_attribute = attributes.get(
GenAIAttributes.GEN_AI_REQUEST_MODEL
invocation, capture_content = _create_invocation(
handler, instance, args, kwargs
)
request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model
)

invocation = LLMInvocation(
request_model=request_model,
provider=ANTHROPIC,
input_messages=get_input_messages(params.messages)
if capture_content
else [],
system_instruction=get_system_instruction(params.system)
if capture_content
else [],
attributes=attributes,
)

# Use manual lifecycle management for both streaming and non-streaming
handler.start_llm(invocation)
try:
result = wrapped(*args, **kwargs)
if isinstance(result, AnthropicStream):
return MessagesStreamWrapper(
result, handler, invocation, capture_content
result, invocation, capture_content
)

wrapper = MessageWrapper(result, capture_content)
wrapper.extract_into(invocation)
handler.stop_llm(invocation)
invocation.stop()
return wrapper.message
except Exception as exc:
handler.fail_llm(
invocation, Error(message=str(exc), type=type(exc))
)
invocation.fail(exc)
raise

return cast(
'Callable[..., Union["AnthropicMessage", "AnthropicStream[RawMessageStreamEvent]", MessagesStreamWrapper[None]]]',
traced_method,
)


def _create_invocation(
handler: TelemetryHandler,
instance: "Messages",
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> tuple[InferenceInvocation, bool]:
should_capture_content = cast(
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated
"Callable[[], bool]", getattr(handler, "should_capture_content")
)
capture_content = should_capture_content()
params = extract_params(*args, **kwargs)
attributes = get_llm_request_attributes(params, instance)
request_model_attribute = attributes.get(
GenAIAttributes.GEN_AI_REQUEST_MODEL
)
request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model
)

invocation = handler.start_inference(
provider=ANTHROPIC,
request_model=request_model,
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Comment thread
eternalcuriouslearner marked this conversation as resolved.
)
invocation.input_messages = (
get_input_messages(params.messages) if capture_content else []
)
invocation.system_instruction = (
get_system_instruction(params.system) if capture_content else []
)
invocation.attributes = attributes
return invocation, capture_content


def messages_stream(
handler: TelemetryHandler,
) -> Callable[..., MessagesStreamManagerWrapper[Any]]:
"""Wrap the sync `stream` method of the `Messages` class."""

Comment thread
eternalcuriouslearner marked this conversation as resolved.
def traced_method(
wrapped: Callable[..., "MessageStreamManager"],
instance: "Messages",
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> MessagesStreamManagerWrapper[Any]:
invocation, capture_content = _create_invocation(
handler, instance, args, kwargs
)

try:
return MessagesStreamManagerWrapper(
wrapped(*args, **kwargs), invocation, capture_content
)
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messages_stream() starts an InferenceInvocation (which immediately starts a span and attaches it to the current context) before the underlying MessageStreamManager is entered. If a caller stores the manager and enters it later—or never enters it—this leaves the span/context attached for longer than the actual request (or indefinitely), which can corrupt parent/child relationships and leak context.

Consider deferring handler.start_inference(...) until the manager wrapper’s __enter__ (e.g., pass the handler + extracted params or a factory into MessagesStreamManagerWrapper and create/start the invocation inside __enter__, failing/stopping it there as needed).

Copilot uses AI. Check for mistakes.
except Exception as exc:
invocation.fail(exc)
raise

return cast(
Comment thread
eternalcuriouslearner marked this conversation as resolved.
"Callable[..., MessagesStreamManagerWrapper[Any]]", traced_method
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import base64
import json
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast

from anthropic.types import (
InputJSONDelta,
Expand Down Expand Up @@ -160,12 +160,9 @@ def _convert_content_block_to_part(
id=block.tool_use_id,
)

# ContentBlockParam variants are TypedDicts (dicts at runtime);
# newer SDK versions may add Pydantic block types not handled above.
if isinstance(block, dict):
return _convert_dict_block_to_part(block)

return None
if not hasattr(block, "get"):
return None
return _convert_dict_block_to_part(cast("Mapping[str, Any]", block))
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Outdated


def convert_content_to_parts(
Expand Down
Loading
Loading