Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4cbf426
wip: stream method instrumentation.
eternalcuriouslearner Apr 26, 2026
4cfa251
wip: cleaning up files for linting and type check.
eternalcuriouslearner Apr 26, 2026
5bfc606
Merge branch 'main' into feat/anthropic-messages-stream-method-instru…
eternalcuriouslearner Apr 28, 2026
5c90a95
polish: changelog.
eternalcuriouslearner Apr 29, 2026
9f30bc1
Merge branch 'main' into feat/anthropic-messages-stream-method-instru…
eternalcuriouslearner May 3, 2026
f7549cd
wip: removing unwanted imports.
eternalcuriouslearner May 3, 2026
a8a9785
polish: pr feedback.
eternalcuriouslearner May 3, 2026
b3b1c35
polish: correcting tox errors.
eternalcuriouslearner May 3, 2026
ce69875
polish: copilot comments.
eternalcuriouslearner May 3, 2026
e270875
wip: copilot feedback.
eternalcuriouslearner May 3, 2026
753840c
polish: fixing lint and precommit.
eternalcuriouslearner May 3, 2026
fbb367a
wip: refining the code to meet new handler methods.
eternalcuriouslearner May 4, 2026
5819605
Merge branch 'main' into feat/anthropic-messages-stream-method-instru…
eternalcuriouslearner May 5, 2026
6cc1ca6
polish: removing quotes around data types and refining the agents.md …
eternalcuriouslearner May 5, 2026
27e69f8
polish: fixing precommit.
eternalcuriouslearner May 5, 2026
e97cacf
Update instrumentation-genai/opentelemetry-instrumentation-anthropic/…
eternalcuriouslearner May 6, 2026
72108db
Update wrappers.py
eternalcuriouslearner May 6, 2026
89b211a
Merge branch 'feat/anthropic-messages-stream-method-instrumentation' …
eternalcuriouslearner May 6, 2026
d273c3a
polish: fixed indentation.
eternalcuriouslearner May 6, 2026
0019183
polish: fixing tests.
eternalcuriouslearner May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ uv run tox -e typecheck
- The monorepo uses `uv` workspaces.
- `tox.ini` defines the test matrix - check it for available test environments.
- Do not add `type: ignore` comments. If a type error arises, solve it properly or write a follow-up plan to address it in another PR.
- When a file uses `from __future__ import annotations`, do not quote type annotations just to
avoid forward references. Keep quotes only for expressions still evaluated at runtime, such as
`typing.cast(...)`, unless the referenced type is imported at runtime.
- Whenever applicable, all code changes should have tests that actually validate the changes.

## Instrumentation rules
Expand Down
5 changes: 5 additions & 0 deletions instrumentation-genai/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ except Exception as exc:
raise
```

Content capture decisions must come from the shared handler, not from instrumentation-local
environment checks or duplicated helper logic. Evaluate the handler's content-capture API once
when creating wrappers (for example, `capture_content = handler.should_capture_content()`) and
pass that value through invocation/request helpers.

## 4. Semantic conventions

Attributes, spans, events, and metrics follow the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add instrumentation for Anthropic `Messages.stream()` helper method
([#4499](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4499))
- Add async Anthropic message stream wrappers and manager wrappers, with wrapper
tests ([#4346](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4346))
- `AsyncMessagesStreamWrapper` for async message stream telemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@
wrap_function_wrapper, # pyright: ignore[reportUnknownVariableType]
)

from opentelemetry.instrumentation.anthropic.package import _instruments
from opentelemetry.instrumentation.anthropic.patch import (
messages_create,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.util.genai.handler import TelemetryHandler

from .package import _instruments
from .patch import (
messages_create,
messages_stream,
)


class AnthropicInstrumentor(BaseInstrumentor):
"""An instrumentor for the Anthropic Python SDK.
Expand Down Expand Up @@ -99,12 +101,17 @@ def _instrument(self, **kwargs: Any) -> None:
logger_provider=logger_provider,
)

# Patch Messages.create
# Patch Messages.create and Messages.stream
wrap_function_wrapper(
"anthropic.resources.messages",
"Messages.create",
messages_create(handler),
)
wrap_function_wrapper(
"anthropic.resources.messages",
"Messages.stream",
messages_stream(handler),
)

def _uninstrument(self, **kwargs: Any) -> None:
"""Disable Anthropic instrumentation.
Expand All @@ -117,3 +124,7 @@ def _uninstrument(self, **kwargs: Any) -> None:
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
"create",
)
unwrap(
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
"stream",
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
from opentelemetry.semconv._incubating.attributes import (
server_attributes as ServerAttributes,
)
from opentelemetry.util.genai.invocation import InferenceInvocation
from opentelemetry.util.genai.types import (
InputMessage,
LLMInvocation,
MessagePart,
OutputMessage,
)
Expand Down Expand Up @@ -155,7 +155,7 @@ def get_output_messages_from_message(


def set_invocation_response_attributes(
invocation: LLMInvocation,
invocation: InferenceInvocation,
message: Message | None,
capture_content: bool,
) -> None:
Expand Down Expand Up @@ -220,18 +220,15 @@ def extract_params( # pylint: disable=too-many-locals
)


def _set_server_address_and_port(
def get_server_address_and_port(
client_instance: "Messages",
attributes: dict[str, AttributeValue | None],
) -> None:
) -> tuple[str | None, int | None]:
base_url = client_instance._client.base_url
host = base_url.host
if host:
attributes[ServerAttributes.SERVER_ADDRESS] = host

port = base_url.port
if port and port != 443 and port > 0:
attributes[ServerAttributes.SERVER_PORT] = port
return (
base_url.host or None,
port if port and port != 443 and port > 0 else None,
)


def get_llm_request_attributes(
Expand All @@ -247,5 +244,9 @@ def get_llm_request_attributes(
GenAIAttributes.GEN_AI_REQUEST_TOP_K: params.top_k,
GenAIAttributes.GEN_AI_REQUEST_STOP_SEQUENCES: params.stop_sequences,
}
_set_server_address_and_port(client_instance, attributes)
address, port = get_server_address_and_port(client_instance)
if address is not None:
attributes[ServerAttributes.SERVER_ADDRESS] = address
if port is not None:
attributes[ServerAttributes.SERVER_PORT] = port
return {k: v for k, v in attributes.items() if v is not None}
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,25 @@
gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import (
Error,
LLMInvocation, # TODO: migrate to InferenceInvocation
)
from opentelemetry.util.genai.utils import (
should_capture_content_on_spans_in_experimental_mode,
)
from opentelemetry.util.genai.invocation import InferenceInvocation

from .messages_extractors import (
extract_params,
get_input_messages,
get_llm_request_attributes,
get_server_address_and_port,
get_system_instruction,
)
from .wrappers import (
MessagesStreamManagerWrapper,
MessagesStreamWrapper,
MessageWrapper,
)

if TYPE_CHECKING:
from anthropic.lib.streaming._messages import ( # pylint: disable=no-name-in-module
MessageStreamManager,
)
from anthropic.resources.messages import Messages
from anthropic.types import RawMessageStreamEvent

Expand All @@ -59,73 +58,113 @@ def messages_create(
) -> Callable[
...,
Union[
"AnthropicMessage",
"AnthropicStream[RawMessageStreamEvent]",
AnthropicMessage,
AnthropicStream[RawMessageStreamEvent],
MessagesStreamWrapper[None],
],
]:
"""Wrap the `create` method of the `Messages` class to trace it."""
capture_content = should_capture_content_on_spans_in_experimental_mode()
capture_content = handler.should_capture_content()

def traced_method(
wrapped: Callable[
...,
Union[
"AnthropicMessage",
"AnthropicStream[RawMessageStreamEvent]",
AnthropicMessage,
AnthropicStream[RawMessageStreamEvent],
],
],
instance: "Messages",
instance: Messages,
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> Union[
"AnthropicMessage",
"AnthropicStream[RawMessageStreamEvent]",
AnthropicMessage,
AnthropicStream[RawMessageStreamEvent],
MessagesStreamWrapper[None],
]:
params = extract_params(*args, **kwargs)
attributes = get_llm_request_attributes(params, instance)
request_model_attribute = attributes.get(
GenAIAttributes.GEN_AI_REQUEST_MODEL
invocation = _create_invocation(
handler, instance, args, kwargs, capture_content
)
request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model
)

invocation = LLMInvocation(
request_model=request_model,
provider=ANTHROPIC,
input_messages=get_input_messages(params.messages)
if capture_content
else [],
system_instruction=get_system_instruction(params.system)
if capture_content
else [],
attributes=attributes,
)

# Use manual lifecycle management for both streaming and non-streaming
handler.start_llm(invocation)
try:
result = wrapped(*args, **kwargs)
if isinstance(result, AnthropicStream):
return MessagesStreamWrapper(
result, handler, invocation, capture_content
result, invocation, capture_content
)

wrapper = MessageWrapper(result, capture_content)
wrapper.extract_into(invocation)
handler.stop_llm(invocation)
invocation.stop()
return wrapper.message
except Exception as exc:
handler.fail_llm(
invocation, Error(message=str(exc), type=type(exc))
)
invocation.fail(exc)
raise

return cast(
'Callable[..., Union["AnthropicMessage", "AnthropicStream[RawMessageStreamEvent]", MessagesStreamWrapper[None]]]',
traced_method,
)


def _create_invocation(
handler: TelemetryHandler,
instance: Messages,
args: tuple[Any, ...],
kwargs: dict[str, Any],
capture_content: bool,
) -> InferenceInvocation:
params = extract_params(*args, **kwargs)
attributes = get_llm_request_attributes(params, instance)
request_model_attribute = attributes.get(
GenAIAttributes.GEN_AI_REQUEST_MODEL
)
request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model
)

server_address, server_port = get_server_address_and_port(instance)
invocation = handler.start_inference(
provider=ANTHROPIC,
request_model=request_model,
Comment thread
eternalcuriouslearner marked this conversation as resolved.
Comment thread
eternalcuriouslearner marked this conversation as resolved.
server_address=server_address,
server_port=server_port,
)
invocation.input_messages = (
get_input_messages(params.messages) if capture_content else []
)
invocation.system_instruction = (
get_system_instruction(params.system) if capture_content else []
)
invocation.attributes = attributes
return invocation


def messages_stream(
handler: TelemetryHandler,
) -> Callable[..., MessagesStreamManagerWrapper[Any]]:
"""Wrap the sync `stream` method of the `Messages` class."""
capture_content = handler.should_capture_content()

Comment thread
eternalcuriouslearner marked this conversation as resolved.
def traced_method(
wrapped: Callable[..., MessageStreamManager],
instance: Messages,
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> MessagesStreamManagerWrapper[Any]:
invocation = _create_invocation(
handler, instance, args, kwargs, capture_content
)

try:
return MessagesStreamManagerWrapper(
wrapped(*args, **kwargs), invocation, capture_content
)
except Exception as exc:
invocation.fail(exc)
raise

return cast(
Comment thread
eternalcuriouslearner marked this conversation as resolved.
"Callable[..., MessagesStreamManagerWrapper[Any]]", traced_method
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

import base64
import json
from collections.abc import Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast

from anthropic.types import (
InputJSONDelta,
Expand All @@ -43,7 +44,7 @@
)

if TYPE_CHECKING:
from collections.abc import Iterable, Mapping
from collections.abc import Iterable

from anthropic.types import (
ContentBlock,
Expand Down Expand Up @@ -160,12 +161,9 @@ def _convert_content_block_to_part(
id=block.tool_use_id,
)

# ContentBlockParam variants are TypedDicts (dicts at runtime);
# newer SDK versions may add Pydantic block types not handled above.
if isinstance(block, dict):
return _convert_dict_block_to_part(block)

return None
if not hasattr(block, "get"):
return None
return _convert_dict_block_to_part(cast(Mapping[str, Any], block))


def convert_content_to_parts(
Expand Down
Loading
Loading