feat: Add async support for Anthropic instrumentation #4156
- Add support for AsyncMessages.create() with streaming and non-streaming
- Add support for AsyncMessages.stream() method
- Add AsyncStreamWrapper for async streaming telemetry
- Add AsyncMessageStreamManagerWrapper for async stream context manager
- Add MessageWrapper for non-streaming response telemetry (shared)
- Rename MessageCreateParams to MessageRequestParams
- Add comprehensive tests for async functionality

- Add PR open-telemetry#4156 to CHANGELOG.md for async support
- Add type: ignore comments to fix pyright type errors
- Apply ruff formatting fixes

CI pyright version doesn't require these ignores.
- Introduce constants for provider name and cache token attributes.
- Normalize stop reasons for consistency in telemetry data.
- Enhance MessageWrapper and AsyncStreamWrapper to aggregate cache token fields into input tokens.
- Update tests to validate new token aggregation logic and ensure correct handling of stop reasons.

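The cache token aggregation mentioned in this commit could look roughly like the sketch below. It assumes the usage object exposes `input_tokens`, `cache_creation_input_tokens`, and `cache_read_input_tokens`, and that missing or `None` fields count as zero. The helper name `total_input_tokens` is hypothetical and not taken from the PR.

```python
from types import SimpleNamespace
from typing import Any

# Assumption: these are the usage fields that feed into the input token total.
_INPUT_TOKEN_FIELDS = (
    "input_tokens",
    "cache_creation_input_tokens",
    "cache_read_input_tokens",
)


def total_input_tokens(usage: Any) -> int:
    """Sum regular and cache-related input tokens; missing or None counts as 0."""
    total = 0
    for name in _INPUT_TOKEN_FIELDS:
        value = getattr(usage, name, None)
        if isinstance(value, int):
            total += value
    return total


# Stand-in for a response usage object (hypothetical values).
example_usage = SimpleNamespace(
    input_tokens=10,
    cache_creation_input_tokens=5,
    cache_read_input_tokens=None,
)
example_total = total_input_tokens(example_usage)
```

Keeping the individual cache fields as separate span attributes, as the later commits do, still lets consumers tell cached tokens apart from uncached ones.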
- Add assertions for cache creation and read input tokens in async message tests.
- Remove deprecated tests for MessageWrapper and AsyncStreamWrapper aggregation.
- Streamline test cases to focus on updated token usage tracking.

…sage tests
- Simplify the definition of cache read input tokens constant.
- Consolidate assertions in async message tests for better readability and maintainability.
- Ensure consistent formatting in token usage assertions across test cases.
howdy @anirudha, would love your feedback on this too.
Nice work; thanks Teja!! async wrappers look solid, and the _normalize_finish_reason + cache token tracking are good improvements over the base.
Couple things not tied to specific lines:
- Cassettes still leak anthropic-organization-id (same as #4155). scrub_response_headers in conftest is a no-op.
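One way the scrubbing hook could work, as a minimal sketch: it assumes the VCR-style response dict keeps headers under a `"headers"` key mapping header names to lists of values, and that header names may differ in case. The `SENSITIVE_RESPONSE_HEADERS` set is a hypothetical name, not something from the PR.

```python
import copy

# Assumption: only this header needs scrubbing; extend the set as needed.
SENSITIVE_RESPONSE_HEADERS = {"anthropic-organization-id"}


def scrub_response_headers(response: dict) -> dict:
    """Return a copy of a VCR-style response dict without sensitive headers."""
    scrubbed = copy.deepcopy(response)
    headers = scrubbed.get("headers", {})
    # Iterate over a snapshot of the keys because entries are deleted in the loop.
    for name in list(headers):
        if name.lower() in SENSITIVE_RESPONSE_HEADERS:
            del headers[name]
    return scrubbed


# Hypothetical recorded response, shaped like a cassette entry.
recorded = {
    "status": {"code": 200, "message": "OK"},
    "headers": {
        "Anthropic-Organization-Id": ["org-123"],
        "content-type": ["application/json"],
    },
    "body": {"string": b"{}"},
}
cleaned = scrub_response_headers(recorded)
```

With vcrpy, a function like this is usually registered through the `before_record_response` option, so the header never reaches the cassette file. Existing cassettes would still need to be re-recorded or edited by hand.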
    "create",
)
unwrap(
    anthropic.resources.messages.AsyncMessages,  # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
This unwraps Messages.create, AsyncMessages.create, and AsyncMessages.stream — but not Messages.stream. If #4155 lands first (which patches Messages.stream), whoever resolves the merge needs to add that unwrap too or it'll leak.
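One way to keep wrap and unwrap from drifting apart is to drive both from a single list of targets. The sketch below is standalone and uses stand-in classes plus `functools.wraps` instead of the PR's own wrap/unwrap helpers; `PATCH_TARGETS`, `instrument`, and `uninstrument` are hypothetical names.

```python
import functools


class Messages:  # stand-in for the sync resource class
    def create(self):
        return "create"

    def stream(self):
        return "stream"


class AsyncMessages:  # stand-in for the async resource class
    async def create(self):
        return "async create"

    def stream(self):
        return "async stream"


# Single source of truth: every patched method is listed exactly once.
PATCH_TARGETS = [
    (Messages, "create"),
    (Messages, "stream"),
    (AsyncMessages, "create"),
    (AsyncMessages, "stream"),
]


def _make_wrapper(original):
    @functools.wraps(original)  # sets wrapper.__wrapped__ = original
    def wrapper(*args, **kwargs):
        # Telemetry would start here in a real instrumentation.
        return original(*args, **kwargs)

    return wrapper


def instrument():
    for owner, name in PATCH_TARGETS:
        setattr(owner, name, _make_wrapper(getattr(owner, name)))


def uninstrument():
    for owner, name in PATCH_TARGETS:
        original = getattr(getattr(owner, name), "__wrapped__", None)
        if original is not None:
            setattr(owner, name, original)


originals = {(o, n): getattr(o, n) for o, n in PATCH_TARGETS}
instrument()
patched = all(getattr(o, n) is not originals[(o, n)] for o, n in PATCH_TARGETS)
uninstrument()
restored = all(getattr(o, n) is originals[(o, n)] for o, n in PATCH_TARGETS)
```

With this layout, adding Messages.stream to the list covers both directions, so a merge with #4155 cannot leave a patched method behind.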
    )
    raise

async def __aexit__(
AsyncStreamWrapper has self._finalized to prevent double stop_llm — nice. But AsyncMessageStreamManagerWrapper doesn't have one. Same risk if __aexit__ fires twice.
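A guard of the kind described could be sketched as follows. The classes here are stand-ins, not the PR's code: `FakeInvocationHandler` only counts `stop_llm` calls and `FakeStreamManager` mimics an async context manager.

```python
import asyncio


class FakeInvocationHandler:  # hypothetical stand-in for the telemetry handler
    def __init__(self):
        self.stop_calls = 0

    def stop_llm(self):
        self.stop_calls += 1


class FakeStreamManager:  # hypothetical stand-in for the wrapped manager
    async def __aenter__(self):
        return "stream"

    async def __aexit__(self, exc_type, exc, tb):
        return False


class GuardedStreamManagerWrapper:
    def __init__(self, manager, handler):
        self._manager = manager
        self._handler = handler
        self._finalized = False

    async def __aenter__(self):
        return await self._manager.__aenter__()

    async def __aexit__(self, exc_type, exc, tb):
        try:
            return await self._manager.__aexit__(exc_type, exc, tb)
        finally:
            self._finalize()

    def _finalize(self):
        # Idempotent: a second __aexit__ must not end the span again.
        if self._finalized:
            return
        self._finalized = True
        self._handler.stop_llm()


async def main():
    handler = FakeInvocationHandler()
    wrapper = GuardedStreamManagerWrapper(FakeStreamManager(), handler)
    async with wrapper:
        pass
    # Simulate __aexit__ firing a second time.
    await wrapper.__aexit__(None, None, None)
    return handler.stop_calls


stop_calls = asyncio.run(main())
```

Setting the flag before calling `stop_llm` means a failure inside the handler still cannot trigger a second attempt.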
    self._invocation.attributes[
        _GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS
    ] = cache_read_input_tokens
except Exception:  # pylint: disable=broad-exception-caught
If get_final_message() fails (e.g., stream not fully consumed), this swallows it and the span gets no response data with no indication why. A logger.debug here would save someone a debugging headache.
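The suggested logging could look like this minimal sketch. It takes a plain callable for simplicity, while the async stream's own `get_final_message()` would need to be awaited. The helper name `safe_final_message` is hypothetical.

```python
import logging

logger = logging.getLogger(__name__)


def safe_final_message(get_final_message):
    """Return the final message, or None if it cannot be read."""
    try:
        return get_final_message()
    except Exception:  # pylint: disable=broad-exception-caught
        # Telemetry must never break the caller, but leave a trace for debugging.
        logger.debug(
            "Could not read final message; span will lack response attributes",
            exc_info=True,
        )
        return None


def _unconsumed_stream():
    # Hypothetical failure, e.g. the stream was not fully consumed.
    raise RuntimeError("stream not fully consumed")


missing = safe_final_message(_unconsumed_stream)
present = safe_final_message(lambda: "final message")
```

Passing `exc_info=True` keeps the traceback in the debug log without raising it to the application.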
request_model = (
    request_model_attribute
    if isinstance(request_model_attribute, str)
    else params.model or ""
Fallback changed from "unknown" to "" — this makes the span name "chat " (trailing space) when no model is found. Probably want to keep "unknown" or some non-empty fallback.
Also this same request_model_attribute / isinstance block is copy-pasted in async_messages_create (L162) and async_messages_stream (L105). Small helper would clean that up.
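A shared helper along these lines would cover both points. This is a sketch only: `resolve_request_model` is a hypothetical name, and unlike the quoted block it also treats an empty string attribute as missing.

```python
from typing import Any, Optional


def resolve_request_model(
    attribute: Any,
    params_model: Optional[str],
    default: str = "unknown",
) -> str:
    """Pick the request model, never returning an empty string."""
    if isinstance(attribute, str) and attribute:
        return attribute
    return params_model or default


def span_name(operation: str, model: str) -> str:
    return f"{operation} {model}"


name_without_model = span_name("chat", resolve_request_model(None, None))
name_from_params = span_name("chat", resolve_request_model(None, "model-from-params"))
name_from_attribute = span_name("chat", resolve_request_model("model-from-attr", "ignored"))
```

The three patch functions could then call the helper once each instead of repeating the isinstance block.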
@anirudha Thanks for the review. I am closing this PR for now and I will reopen it after I complete sync instrumentation.
Description
This PR adds async support for the Anthropic instrumentation. It enables telemetry capture for:
- AsyncMessages.create() - Both streaming (stream=True) and non-streaming async requests
- AsyncMessages.stream() - The dedicated async streaming method that returns an AsyncMessageStreamManager

Key changes:
- AsyncStreamWrapper class to wrap AsyncStream[RawMessageStreamEvent] for async streaming telemetry
- AsyncMessageStreamManagerWrapper to wrap AsyncMessageStreamManager async context manager
- MessageWrapper for non-streaming response telemetry extraction (shared with sync)
- async_messages_create and async_messages_stream patch functions
- MessageCreateParams renamed to MessageRequestParams for broader API coverage

Fixes #3949
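To illustrate the general shape of an async stream wrapper, here is a simplified sketch. It is not the PR's AsyncStreamWrapper: the class name `AsyncStreamTelemetryWrapper`, the `on_finish` callback, and the event counting are all assumptions made for the example.

```python
import asyncio


class AsyncStreamTelemetryWrapper:
    """Forward events from an async iterator and report once when it ends."""

    def __init__(self, stream, on_finish):
        self._stream = stream
        self._on_finish = on_finish  # hypothetical callback: (event_count, error)
        self._events = 0
        self._finalized = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            event = await self._stream.__anext__()
        except StopAsyncIteration:
            self._finalize(error=None)
            raise
        except Exception as exc:
            self._finalize(error=exc)
            raise
        self._events += 1
        return event

    def _finalize(self, error):
        # Report at most once, whether the stream ended normally or failed.
        if self._finalized:
            return
        self._finalized = True
        self._on_finish(self._events, error)


async def fake_stream():
    # Stand-in for a stream of raw message events.
    for chunk in ("a", "b", "c"):
        yield chunk


async def main():
    reports = []
    wrapper = AsyncStreamTelemetryWrapper(
        fake_stream(), lambda count, err: reports.append((count, err))
    )
    received = [event async for event in wrapper]
    return received, reports


received, reports = asyncio.run(main())
```

The caller iterates the wrapper exactly as it would the underlying stream; telemetry is recorded as a side effect when iteration finishes or fails.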
Type of change
Please delete options that are not relevant.
How Has This Been Tested?
Added comprehensive tests for async functionality:
- test_async_messages_create_basic - Basic async create
- test_async_messages_create_with_all_params - Async create with parameters
- test_async_messages_create_token_usage - Token usage capture
- test_async_messages_create_stop_reason - Stop reason capture
- test_async_messages_create_api_error - API error handling
- test_async_messages_create_connection_error - Connection error handling
- test_async_messages_create_streaming - Async streaming with context manager
- test_async_messages_create_streaming_iteration - Async streaming direct iteration
- test_async_messages_create_streaming_connection_error - Streaming error handling
- test_async_messages_stream_basic - AsyncMessages.stream() method
- test_async_messages_stream_with_params - Stream with parameters
- test_async_messages_stream_token_usage - Stream token usage
- test_async_messages_stream_connection_error - Stream error handling

All tests use VCR cassettes for reproducible HTTP interaction replay.
Does This PR Require a Core Repo Change?
Checklist:
See contributing.md for styleguide, changelog guidelines, and more.