-
Notifications
You must be signed in to change notification settings - Fork 943
gen-ai instrumentation(feat): anthropic messages stream method instrumentation #4499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 11 commits
4cbf426
4cfa251
5bfc606
5c90a95
9f30bc1
f7549cd
a8a9785
b3b1c35
ce69875
e270875
753840c
fbb367a
5819605
6cc1ca6
27e69f8
e97cacf
72108db
89b211a
d273c3a
0019183
f31e362
4f7890f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,13 +26,7 @@ | |
| gen_ai_attributes as GenAIAttributes, | ||
| ) | ||
| from opentelemetry.util.genai.handler import TelemetryHandler | ||
| from opentelemetry.util.genai.types import ( | ||
| Error, | ||
| LLMInvocation, # TODO: migrate to InferenceInvocation | ||
| ) | ||
| from opentelemetry.util.genai.utils import ( | ||
| should_capture_content_on_spans_in_experimental_mode, | ||
| ) | ||
| from opentelemetry.util.genai.invocation import InferenceInvocation | ||
|
|
||
| from .messages_extractors import ( | ||
| extract_params, | ||
|
|
@@ -41,11 +35,15 @@ | |
| get_system_instruction, | ||
| ) | ||
| from .wrappers import ( | ||
| MessagesStreamManagerWrapper, | ||
| MessagesStreamWrapper, | ||
| MessageWrapper, | ||
| ) | ||
|
|
||
| if TYPE_CHECKING: | ||
| from anthropic.lib.streaming._messages import ( # pylint: disable=no-name-in-module | ||
| MessageStreamManager, | ||
| ) | ||
| from anthropic.resources.messages import Messages | ||
| from anthropic.types import RawMessageStreamEvent | ||
|
|
||
|
|
@@ -65,7 +63,6 @@ def messages_create( | |
| ], | ||
| ]: | ||
| """Wrap the `create` method of the `Messages` class to trace it.""" | ||
| capture_content = should_capture_content_on_spans_in_experimental_mode() | ||
|
|
||
| def traced_method( | ||
| wrapped: Callable[ | ||
|
|
@@ -83,49 +80,88 @@ def traced_method( | |
| "AnthropicStream[RawMessageStreamEvent]", | ||
| MessagesStreamWrapper[None], | ||
| ]: | ||
| params = extract_params(*args, **kwargs) | ||
| attributes = get_llm_request_attributes(params, instance) | ||
| request_model_attribute = attributes.get( | ||
| GenAIAttributes.GEN_AI_REQUEST_MODEL | ||
| invocation, capture_content = _create_invocation( | ||
| handler, instance, args, kwargs | ||
| ) | ||
| request_model = ( | ||
| request_model_attribute | ||
| if isinstance(request_model_attribute, str) | ||
| else params.model | ||
| ) | ||
|
|
||
| invocation = LLMInvocation( | ||
| request_model=request_model, | ||
| provider=ANTHROPIC, | ||
| input_messages=get_input_messages(params.messages) | ||
| if capture_content | ||
| else [], | ||
| system_instruction=get_system_instruction(params.system) | ||
| if capture_content | ||
| else [], | ||
| attributes=attributes, | ||
| ) | ||
|
|
||
| # Use manual lifecycle management for both streaming and non-streaming | ||
| handler.start_llm(invocation) | ||
| try: | ||
| result = wrapped(*args, **kwargs) | ||
| if isinstance(result, AnthropicStream): | ||
| return MessagesStreamWrapper( | ||
| result, handler, invocation, capture_content | ||
| result, invocation, capture_content | ||
| ) | ||
|
|
||
| wrapper = MessageWrapper(result, capture_content) | ||
| wrapper.extract_into(invocation) | ||
| handler.stop_llm(invocation) | ||
| invocation.stop() | ||
| return wrapper.message | ||
| except Exception as exc: | ||
| handler.fail_llm( | ||
| invocation, Error(message=str(exc), type=type(exc)) | ||
| ) | ||
| invocation.fail(exc) | ||
| raise | ||
|
|
||
| return cast( | ||
| 'Callable[..., Union["AnthropicMessage", "AnthropicStream[RawMessageStreamEvent]", MessagesStreamWrapper[None]]]', | ||
| traced_method, | ||
| ) | ||
|
|
||
|
|
||
| def _create_invocation( | ||
| handler: TelemetryHandler, | ||
| instance: "Messages", | ||
| args: tuple[Any, ...], | ||
| kwargs: dict[str, Any], | ||
| ) -> tuple[InferenceInvocation, bool]: | ||
| should_capture_content = cast( | ||
| "Callable[[], bool]", getattr(handler, "should_capture_content") | ||
| ) | ||
| capture_content = should_capture_content() | ||
| params = extract_params(*args, **kwargs) | ||
| attributes = get_llm_request_attributes(params, instance) | ||
| request_model_attribute = attributes.get( | ||
| GenAIAttributes.GEN_AI_REQUEST_MODEL | ||
| ) | ||
| request_model = ( | ||
| request_model_attribute | ||
| if isinstance(request_model_attribute, str) | ||
| else params.model | ||
| ) | ||
|
|
||
| invocation = handler.start_inference( | ||
| provider=ANTHROPIC, | ||
| request_model=request_model, | ||
|
eternalcuriouslearner marked this conversation as resolved.
eternalcuriouslearner marked this conversation as resolved.
|
||
| ) | ||
| invocation.input_messages = ( | ||
| get_input_messages(params.messages) if capture_content else [] | ||
| ) | ||
| invocation.system_instruction = ( | ||
| get_system_instruction(params.system) if capture_content else [] | ||
| ) | ||
| invocation.attributes = attributes | ||
| return invocation, capture_content | ||
|
|
||
|
|
||
| def messages_stream( | ||
| handler: TelemetryHandler, | ||
| ) -> Callable[..., MessagesStreamManagerWrapper[Any]]: | ||
| """Wrap the sync `stream` method of the `Messages` class.""" | ||
|
|
||
|
eternalcuriouslearner marked this conversation as resolved.
|
||
| def traced_method( | ||
| wrapped: Callable[..., "MessageStreamManager"], | ||
| instance: "Messages", | ||
| args: tuple[Any, ...], | ||
| kwargs: dict[str, Any], | ||
| ) -> MessagesStreamManagerWrapper[Any]: | ||
| invocation, capture_content = _create_invocation( | ||
| handler, instance, args, kwargs | ||
| ) | ||
|
|
||
| try: | ||
| return MessagesStreamManagerWrapper( | ||
| wrapped(*args, **kwargs), invocation, capture_content | ||
| ) | ||
|
||
| except Exception as exc: | ||
| invocation.fail(exc) | ||
| raise | ||
|
|
||
| return cast( | ||
|
eternalcuriouslearner marked this conversation as resolved.
|
||
| "Callable[..., MessagesStreamManagerWrapper[Any]]", traced_method | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.