-
Notifications
You must be signed in to change notification settings - Fork 971
feat: Add async support for Anthropic instrumentation #4156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0371f53
3764396
f7a7a6b
6655d83
ca4f6a8
5148d03
5ecca77
b59a192
56d5728
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,22 +14,30 @@ | |
|
|
||
| """Patching functions for Anthropic instrumentation.""" | ||
|
|
||
| from typing import TYPE_CHECKING, Any, Callable | ||
| from typing import TYPE_CHECKING, Any, Callable, Coroutine, Union | ||
|
|
||
| from opentelemetry.semconv._incubating.attributes import ( | ||
| gen_ai_attributes as GenAIAttributes, | ||
| ) | ||
| from opentelemetry.util.genai.handler import TelemetryHandler | ||
| from opentelemetry.util.genai.types import LLMInvocation | ||
| from opentelemetry.util.genai.types import Error, LLMInvocation | ||
|
|
||
| from .utils import ( | ||
| AsyncMessageStreamManagerWrapper, | ||
| AsyncStreamWrapper, | ||
| MessageWrapper, | ||
| extract_params, | ||
| get_llm_request_attributes, | ||
| ) | ||
|
|
||
| if TYPE_CHECKING: | ||
| from anthropic.resources.messages import Messages | ||
| from anthropic.types import Message | ||
| from anthropic._streaming import AsyncStream | ||
| from anthropic.lib.streaming import AsyncMessageStreamManager | ||
| from anthropic.resources.messages import AsyncMessages, Messages | ||
| from anthropic.types import Message, RawMessageStreamEvent | ||
|
|
||
|
|
||
| ANTHROPIC = "anthropic" | ||
|
|
||
|
|
||
| def messages_create( | ||
|
|
@@ -45,15 +53,18 @@ def traced_method( | |
| ) -> "Message": | ||
| params = extract_params(*args, **kwargs) | ||
| attributes = get_llm_request_attributes(params, instance) | ||
| request_model = str( | ||
| attributes.get(GenAIAttributes.GEN_AI_REQUEST_MODEL) | ||
| or params.model | ||
| or "unknown" | ||
| request_model_attribute = attributes.get( | ||
| GenAIAttributes.GEN_AI_REQUEST_MODEL | ||
| ) | ||
| request_model = ( | ||
| request_model_attribute | ||
| if isinstance(request_model_attribute, str) | ||
| else params.model or "" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fallback changed from Also this same |
||
| ) | ||
|
|
||
| invocation = LLMInvocation( | ||
| request_model=request_model, | ||
| provider="anthropic", | ||
| provider=ANTHROPIC, | ||
| attributes=attributes, | ||
| ) | ||
|
|
||
|
|
@@ -76,3 +87,107 @@ def traced_method( | |
| return result | ||
|
|
||
| return traced_method | ||
|
|
||
|
|
||
| def async_messages_stream( | ||
| handler: TelemetryHandler, | ||
| ) -> Callable[..., "AsyncMessageStreamManager"]: | ||
| """Wrap the `stream` method of the `AsyncMessages` class to trace it.""" | ||
|
|
||
| def traced_method( | ||
| wrapped: Callable[..., "AsyncMessageStreamManager"], | ||
| instance: "AsyncMessages", | ||
| args: tuple[Any, ...], | ||
| kwargs: dict[str, Any], | ||
| ) -> AsyncMessageStreamManagerWrapper: | ||
| params = extract_params(*args, **kwargs) | ||
| attributes = get_llm_request_attributes(params, instance) # type: ignore[arg-type] | ||
| request_model_attribute = attributes.get( | ||
| GenAIAttributes.GEN_AI_REQUEST_MODEL | ||
| ) | ||
| request_model = ( | ||
| request_model_attribute | ||
| if isinstance(request_model_attribute, str) | ||
| else params.model or "" | ||
| ) | ||
|
|
||
| invocation = LLMInvocation( | ||
| request_model=request_model, | ||
| provider=ANTHROPIC, | ||
| attributes=attributes, | ||
| ) | ||
|
|
||
| # Start the span before calling the wrapped method | ||
| handler.start_llm(invocation) | ||
| try: | ||
| result = wrapped(*args, **kwargs) | ||
| # Return wrapped AsyncMessageStreamManager | ||
| return AsyncMessageStreamManagerWrapper( | ||
| result, handler, invocation | ||
| ) | ||
| except Exception as exc: | ||
| handler.fail_llm( | ||
| invocation, Error(message=str(exc), type=type(exc)) | ||
| ) | ||
| raise | ||
|
|
||
| return traced_method # type: ignore[return-value] | ||
|
|
||
|
|
||
| def async_messages_create( | ||
| handler: TelemetryHandler, | ||
| ) -> Callable[ | ||
| ..., | ||
| Coroutine[ | ||
| Any, Any, Union["Message", "AsyncStream[RawMessageStreamEvent]"] | ||
| ], | ||
| ]: | ||
| """Wrap the `create` method of the `AsyncMessages` class to trace it.""" | ||
|
|
||
| async def traced_method( | ||
| wrapped: Callable[ | ||
| ..., | ||
| Coroutine[ | ||
| Any, | ||
| Any, | ||
| Union["Message", "AsyncStream[RawMessageStreamEvent]"], | ||
| ], | ||
| ], | ||
| instance: "AsyncMessages", | ||
| args: tuple[Any, ...], | ||
| kwargs: dict[str, Any], | ||
| ) -> Union["Message", AsyncStreamWrapper]: | ||
| params = extract_params(*args, **kwargs) | ||
| attributes = get_llm_request_attributes(params, instance) # type: ignore[arg-type] | ||
| request_model_attribute = attributes.get( | ||
| GenAIAttributes.GEN_AI_REQUEST_MODEL | ||
| ) | ||
| request_model = ( | ||
| request_model_attribute | ||
| if isinstance(request_model_attribute, str) | ||
| else params.model or "" | ||
| ) | ||
|
|
||
| invocation = LLMInvocation( | ||
| request_model=request_model, | ||
| provider=ANTHROPIC, | ||
| attributes=attributes, | ||
| ) | ||
|
|
||
| is_streaming = kwargs.get("stream", False) | ||
|
|
||
| # Use manual lifecycle management for both streaming and non-streaming | ||
| handler.start_llm(invocation) | ||
| try: | ||
| result = await wrapped(*args, **kwargs) | ||
| if is_streaming: | ||
| return AsyncStreamWrapper(result, handler, invocation) # type: ignore[arg-type] | ||
| wrapper = MessageWrapper(result, handler, invocation) # type: ignore[arg-type] | ||
| return wrapper.message | ||
| except Exception as exc: | ||
| handler.fail_llm( | ||
| invocation, Error(message=str(exc), type=type(exc)) | ||
| ) | ||
| raise | ||
|
|
||
| return traced_method # type: ignore[return-value] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwraps
Messages.create,AsyncMessages.create, andAsyncMessages.stream— but notMessages.stream. If #4155 lands first (which patchesMessages.stream), whoever resolves the merge needs to add that unwrap too or it'll leak.