Skip to content

feat: Add async support for Anthropic instrumentation#4156

Closed
vasantteja wants to merge 9 commits into
open-telemetry:mainfrom
vasantteja:anthropic-async
Closed

feat: Add async support for Anthropic instrumentation#4156
vasantteja wants to merge 9 commits into
open-telemetry:mainfrom
vasantteja:anthropic-async

Conversation

@vasantteja

@vasantteja vasantteja commented Feb 1, 2026

Copy link
Copy Markdown
Contributor

Description

This PR adds async support for the Anthropic instrumentation. It enables telemetry capture for:

  1. AsyncMessages.create() - Both streaming (stream=True) and non-streaming async requests
  2. AsyncMessages.stream() - The dedicated async streaming method that returns an AsyncMessageStreamManager

Key changes:

  • Added AsyncStreamWrapper class to wrap AsyncStream[RawMessageStreamEvent] for async streaming telemetry
  • Added AsyncMessageStreamManagerWrapper to wrap AsyncMessageStreamManager async context manager
  • Added MessageWrapper for non-streaming response telemetry extraction (shared with sync)
  • Added async_messages_create and async_messages_stream patch functions
  • Renamed MessageCreateParams to MessageRequestParams for broader API coverage

Fixes #3949

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Added comprehensive tests for async functionality:

  • test_async_messages_create_basic - Basic async create
  • test_async_messages_create_with_all_params - Async create with parameters
  • test_async_messages_create_token_usage - Token usage capture
  • test_async_messages_create_stop_reason - Stop reason capture
  • test_async_messages_create_api_error - API error handling
  • test_async_messages_create_connection_error - Connection error handling
  • test_async_messages_create_streaming - Async streaming with context manager
  • test_async_messages_create_streaming_iteration - Async streaming direct iteration
  • test_async_messages_create_streaming_connection_error - Streaming error handling
  • test_async_messages_stream_basic - AsyncMessages.stream() method
  • test_async_messages_stream_with_params - Stream with parameters
  • test_async_messages_stream_token_usage - Stream token usage
  • test_async_messages_stream_connection_error - Stream error handling

All tests use VCR cassettes for reproducible HTTP interaction replay.

Does This PR Require a Core Repo Change?

  • Yes. - Link to PR:
  • No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

- Add support for AsyncMessages.create() with streaming and non-streaming
- Add support for AsyncMessages.stream() method
- Add AsyncStreamWrapper for async streaming telemetry
- Add AsyncMessageStreamManagerWrapper for async stream context manager
- Add MessageWrapper for non-streaming response telemetry (shared)
- Rename MessageCreateParams to MessageRequestParams
- Add comprehensive tests for async functionality
- Add PR open-telemetry#4156 to CHANGELOG.md for async support
- Add type: ignore comments to fix pyright type errors
- Apply ruff formatting fixes
CI pyright version doesn't require these ignores.
@vasantteja vasantteja removed their assignment Feb 3, 2026
- Introduce constants for provider name and cache token attributes.
- Normalize stop reasons for consistency in telemetry data.
- Enhance MessageWrapper and AsyncStreamWrapper to aggregate cache token fields into input tokens.
- Update tests to validate new token aggregation logic and ensure correct handling of stop reasons.
- Add assertions for cache creation and read input tokens in async message tests.
- Remove deprecated tests for MessageWrapper and AsyncStreamWrapper aggregation.
- Streamline test cases to focus on updated token usage tracking.
…sage tests

- Simplify the definition of cache read input tokens constant.
- Consolidate assertions in async message tests for better readability and maintainability.
- Ensure consistent formatting in token usage assertions across test cases.
@vasantteja

Copy link
Copy Markdown
Contributor Author

howdy @anirudha, would love your feedback on this too.

@vasantteja vasantteja removed their assignment Feb 9, 2026
@vasantteja vasantteja removed their assignment Feb 9, 2026
@vasantteja vasantteja removed their assignment Feb 11, 2026

@anirudha anirudha left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work; thanks Teja!! async wrappers look solid, and the _normalize_finish_reason + cache token tracking are good improvements over the base.

Couple things not tied to specific lines:

  • Cassettes still leak anthropic-organization-id (same as #4155). scrub_response_headers in conftest is a no-op.

"create",
)
unwrap(
anthropic.resources.messages.AsyncMessages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unwraps Messages.create, AsyncMessages.create, and AsyncMessages.stream — but not Messages.stream. If #4155 lands first (which patches Messages.stream), whoever resolves the merge needs to add that unwrap too or it'll leak.

)
raise

async def __aexit__(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncStreamWrapper has self._finalized to prevent double stop_llm — nice. But AsyncMessageStreamManagerWrapper doesn't have one. Same risk if __aexit__ fires twice.

self._invocation.attributes[
_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS
] = cache_read_input_tokens
except Exception: # pylint: disable=broad-exception-caught

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If get_final_message() fails (e.g., stream not fully consumed), this swallows it and the span gets no response data with no indication why. A logger.debug here would save someone a debugging headache.

request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model or ""

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fallback changed from "unknown" to "" — this makes the span name "chat " (trailing space) when no model is found. Probably want to keep "unknown" or some non-empty fallback.

Also this same request_model_attribute / isinstance block is copy-pasted in async_messages_create (L162) and async_messages_stream (L105). Small helper would clean that up.

@vasantteja vasantteja removed their assignment Feb 16, 2026
@vasantteja

Copy link
Copy Markdown
Contributor Author

@anirudha Thanks for the review. I am closing this PR for now and I will reopen it after I complete sync instrumentation.

@vasantteja vasantteja closed this Feb 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add OpenTelemetry instrumentation for the Anthropic Claude Python SDK

7 participants