feat: Add instrumentation for AsyncMessages.stream, Messages.parse and AsyncMessages.parse methods.#191
Conversation
…d AsyncMessages.parse methods.
There was a problem hiding this comment.
Pull request overview
This PR extends the Anthropic GenAI instrumentation to cover additional stable messages APIs (structured-output parse and async stream) while continuing to route telemetry through opentelemetry-util-genai, and adds VCR-backed tests/cassettes validating the new coverage.
Changes:
- Add instrumentation hooks for
AsyncMessages.stream, plus optional wrapping forMessages.parse/AsyncMessages.parsewhen supported by the installed Anthropic SDK. - Update async stream manager wrapping to create the
InferenceInvocationlazily on__aenter__, matching the sync stream-manager behavior. - Add VCR-backed tests and cassettes for parse + async streaming success, content capture, API error, mid-stream error, early-close, and user-exception scenarios.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| instrumentation/opentelemetry-instrumentation-genai-anthropic/src/opentelemetry/instrumentation/genai/anthropic/init.py | Patches additional Anthropic SDK methods (AsyncMessages.stream, conditional *.parse) and updates unpatching accordingly. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/src/opentelemetry/instrumentation/genai/anthropic/patch.py | Adds a new wrapper factory for AsyncMessages.stream returning an async stream-manager wrapper with lazy invocation creation. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/src/opentelemetry/instrumentation/genai/anthropic/wrappers.py | Updates async stream-manager wrapper to accept an invocation factory and create the invocation in __aenter__. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/test_async_messages.py | Adds VCR-backed tests for AsyncMessages.stream telemetry, content capture, and error/early-close/user-exception paths. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/test_instrumentor.py | Adds unit-style monkeypatch tests asserting parse and async stream are instrumented and emit chat spans. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/test_parse_messages.py | Adds VCR-backed tests for Messages.parse / AsyncMessages.parse including content capture and API error behavior. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_sync_messages_parse_basic.yaml | Recorded cassette for sync Messages.parse basic structured-output test. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_sync_messages_parse_captures_content.yaml | Recorded cassette for sync Messages.parse with content capture assertions. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_sync_messages_parse_api_error.yaml | Recorded cassette for sync Messages.parse API error path. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_parse_basic.yaml | Recorded cassette for async AsyncMessages.parse basic structured-output test. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_parse_captures_content.yaml | Recorded cassette for async AsyncMessages.parse with content capture assertions. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_parse_api_error.yaml | Recorded cassette for async AsyncMessages.parse API error path. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_stream.yaml | Recorded cassette for async AsyncMessages.stream happy path. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_stream_captures_content.yaml | Recorded cassette for async AsyncMessages.stream with content capture assertions. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_stream_delegates_response_attribute.yaml | Recorded cassette ensuring wrapped stream exposes response attributes. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_stream_api_error.yaml | Recorded cassette for async AsyncMessages.stream API error path. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_stream_interrupted_mid_iteration.yaml | Recorded cassette used by test that injects a mid-iteration exception. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_stream_closed_early_by_caller.yaml | Recorded cassette for early-close path. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/tests/cassettes/test_async_messages_stream_user_exception.yaml | Recorded cassette for caller-raised exception inside the stream context. |
| instrumentation/opentelemetry-instrumentation-genai-anthropic/.changelog/+messages-parse-async-stream.added | Towncrier fragment describing the new instrumentation coverage. |
| if callable(invocation): | ||
| invocation_factory = invocation | ||
| else: | ||
| def invocation_factory() -> InferenceInvocation: | ||
| return invocation |
| def _is_parse_supported() -> bool: | ||
| """Check if the parse() method is available on the Messages class. | ||
|
|
||
| Messages.parse() for structured outputs was added in a newer anthropic | ||
| SDK release; create() and stream() are always present. | ||
| """ | ||
| try: | ||
| from anthropic.resources.messages import ( # pylint: disable=import-outside-toplevel # noqa: PLC0415 | ||
| Messages, | ||
| ) | ||
|
|
||
| return hasattr(Messages, "parse") | ||
| except ImportError: | ||
| return False |
| assert span.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME] == "chat" | ||
| assert span.attributes[GenAIAttributes.GEN_AI_PROVIDER_NAME] == "anthropic" | ||
| assert span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == model |
| assert span.name == f"chat {model}" | ||
| assert span.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME] == "chat" | ||
| assert span.attributes[GenAIAttributes.GEN_AI_PROVIDER_NAME] == "anthropic" | ||
| assert span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == model |
| @@ -0,0 +1 @@ | |||
| Add Anthropic instrumentation for Messages.parse, AsyncMessages.parse, and AsyncMessages.stream. | |||
|
This PR has review comments. Review suggestions, whether from maintainers or automated reviewers, aren't always correct or required. Please evaluate each comment on its merits, then make sure each thread has a clear outcome. For example, link to the commit if you applied a suggestion, explain why it wasn't applied, or ask a follow-up question. Automation flags a PR for human review once every review thread has a reply or is marked as resolved. Status across open PRs is visible on the pull request dashboard. |
Description
This PR brings our native Anthropic SDK instrumentation closer to parity with OpenInference’s Anthropic SDK coverage by adding instrumentation for the remaining stable Messages APIs.
Specifically, it adds coverage for:
It keeps the existing util-genai based implementation and does not migrate to OpenInference internals.
Fixes # (NA)
Type of change
Please delete options that are not relevant.
How has this been tested?
Please describe the tests that you ran to verify your changes. Provide
instructions so we can reproduce. List any relevant details for your test
configuration.
Checklist
See CONTRIBUTING.md
for the style guide, changelog guidance, and more.