Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add async support for Anthropic instrumentation with `AsyncMessages.create` and `AsyncMessages.stream`
([#4156](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4156))
- Initial implementation of Anthropic instrumentation
([#3978](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3978))
- Implement sync `Messages.create` instrumentation with GenAI semantic convention attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@
)

from opentelemetry.instrumentation.anthropic.package import _instruments
from opentelemetry.instrumentation.anthropic.patch import messages_create
from opentelemetry.instrumentation.anthropic.patch import (
async_messages_create,
async_messages_stream,
messages_create,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.util.genai.handler import TelemetryHandler
Expand Down Expand Up @@ -103,6 +107,20 @@ def _instrument(self, **kwargs: Any) -> None:
wrapper=messages_create(handler),
)

# Patch AsyncMessages.create
wrap_function_wrapper(
module="anthropic.resources.messages",
name="AsyncMessages.create",
wrapper=async_messages_create(handler),
)

# Patch AsyncMessages.stream
wrap_function_wrapper(
module="anthropic.resources.messages",
name="AsyncMessages.stream",
wrapper=async_messages_stream(handler),
)

def _uninstrument(self, **kwargs: Any) -> None:
"""Disable Anthropic instrumentation.

Expand All @@ -114,3 +132,11 @@ def _uninstrument(self, **kwargs: Any) -> None:
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
"create",
)
unwrap(
anthropic.resources.messages.AsyncMessages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
"create",
)
unwrap(
anthropic.resources.messages.AsyncMessages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unwraps Messages.create, AsyncMessages.create, and AsyncMessages.stream — but not Messages.stream. If #4155 lands first (which patches Messages.stream), whoever resolves the merge needs to add that unwrap too or it'll leak.

"stream",
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,30 @@

"""Patching functions for Anthropic instrumentation."""

from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Union

from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import LLMInvocation
from opentelemetry.util.genai.types import Error, LLMInvocation

from .utils import (
AsyncMessageStreamManagerWrapper,
AsyncStreamWrapper,
MessageWrapper,
extract_params,
get_llm_request_attributes,
)

if TYPE_CHECKING:
from anthropic.resources.messages import Messages
from anthropic.types import Message
from anthropic._streaming import AsyncStream
from anthropic.lib.streaming import AsyncMessageStreamManager
from anthropic.resources.messages import AsyncMessages, Messages
from anthropic.types import Message, RawMessageStreamEvent


ANTHROPIC = "anthropic"


def messages_create(
Expand All @@ -45,15 +53,18 @@ def traced_method(
) -> "Message":
params = extract_params(*args, **kwargs)
attributes = get_llm_request_attributes(params, instance)
request_model = str(
attributes.get(GenAIAttributes.GEN_AI_REQUEST_MODEL)
or params.model
or "unknown"
request_model_attribute = attributes.get(
GenAIAttributes.GEN_AI_REQUEST_MODEL
)
request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model or ""

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fallback changed from "unknown" to "" — this makes the span name "chat " (trailing space) when no model is found. Probably want to keep "unknown" or some non-empty fallback.

Also this same request_model_attribute / isinstance block is copy-pasted in async_messages_create (L162) and async_messages_stream (L105). Small helper would clean that up.

)

invocation = LLMInvocation(
request_model=request_model,
provider="anthropic",
provider=ANTHROPIC,
attributes=attributes,
)

Expand All @@ -76,3 +87,107 @@ def traced_method(
return result

return traced_method


def async_messages_stream(
handler: TelemetryHandler,
) -> Callable[..., "AsyncMessageStreamManager"]:
"""Wrap the `stream` method of the `AsyncMessages` class to trace it."""

def traced_method(
wrapped: Callable[..., "AsyncMessageStreamManager"],
instance: "AsyncMessages",
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> AsyncMessageStreamManagerWrapper:
params = extract_params(*args, **kwargs)
attributes = get_llm_request_attributes(params, instance) # type: ignore[arg-type]
request_model_attribute = attributes.get(
GenAIAttributes.GEN_AI_REQUEST_MODEL
)
request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model or ""
)

invocation = LLMInvocation(
request_model=request_model,
provider=ANTHROPIC,
attributes=attributes,
)

# Start the span before calling the wrapped method
handler.start_llm(invocation)
try:
result = wrapped(*args, **kwargs)
# Return wrapped AsyncMessageStreamManager
return AsyncMessageStreamManagerWrapper(
result, handler, invocation
)
except Exception as exc:
handler.fail_llm(
invocation, Error(message=str(exc), type=type(exc))
)
raise

return traced_method # type: ignore[return-value]


def async_messages_create(
handler: TelemetryHandler,
) -> Callable[
...,
Coroutine[
Any, Any, Union["Message", "AsyncStream[RawMessageStreamEvent]"]
],
]:
"""Wrap the `create` method of the `AsyncMessages` class to trace it."""

async def traced_method(
wrapped: Callable[
...,
Coroutine[
Any,
Any,
Union["Message", "AsyncStream[RawMessageStreamEvent]"],
],
],
instance: "AsyncMessages",
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> Union["Message", AsyncStreamWrapper]:
params = extract_params(*args, **kwargs)
attributes = get_llm_request_attributes(params, instance) # type: ignore[arg-type]
request_model_attribute = attributes.get(
GenAIAttributes.GEN_AI_REQUEST_MODEL
)
request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model or ""
)

invocation = LLMInvocation(
request_model=request_model,
provider=ANTHROPIC,
attributes=attributes,
)

is_streaming = kwargs.get("stream", False)

# Use manual lifecycle management for both streaming and non-streaming
handler.start_llm(invocation)
try:
result = await wrapped(*args, **kwargs)
if is_streaming:
return AsyncStreamWrapper(result, handler, invocation) # type: ignore[arg-type]
wrapper = MessageWrapper(result, handler, invocation) # type: ignore[arg-type]
return wrapper.message
except Exception as exc:
handler.fail_llm(
invocation, Error(message=str(exc), type=type(exc))
)
raise

return traced_method # type: ignore[return-value]
Loading