diff --git a/AGENTS.md b/AGENTS.md index 09ab2575fb..a281fff511 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -65,6 +65,9 @@ uv run tox -e typecheck - The monorepo uses `uv` workspaces. - `tox.ini` defines the test matrix - check it for available test environments. - Do not add `type: ignore` comments. If a type error arises, solve it properly or write a follow-up plan to address it in another PR. +- When a file uses `from __future__ import annotations`, do not quote type annotations just to + avoid forward references. Keep quotes only for expressions still evaluated at runtime, such as + `typing.cast(...)`, unless the referenced type is imported at runtime. - Whenever applicable, all code changes should have tests that actually validate the changes. ## Instrumentation rules diff --git a/instrumentation-genai/AGENTS.md b/instrumentation-genai/AGENTS.md index 5cf82d3b14..786b973854 100644 --- a/instrumentation-genai/AGENTS.md +++ b/instrumentation-genai/AGENTS.md @@ -61,6 +61,11 @@ except Exception as exc: raise ``` +Content capture decisions must come from the shared handler, not from instrumentation-local +environment checks or duplicated helper logic. Evaluate the handler's content-capture API once +when creating wrappers (for example, `capture_content = handler.should_capture_content()`) and +pass that value through invocation/request helpers. + ## 4. 
Semantic conventions Attributes, spans, events, and metrics follow the diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-anthropic/CHANGELOG.md index aa5813ad3c..31759ba0b9 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Add instrumentation for Anthropic `Messages.stream()` helper method + ([#4499](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4499)) - Add async Anthropic message stream wrappers and manager wrappers, with wrapper tests ([#4346](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4346)) - `AsyncMessagesStreamWrapper` for async message stream telemetry diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/__init__.py index 7195dc7787..19a34d16d4 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/__init__.py @@ -53,14 +53,16 @@ wrap_function_wrapper, # pyright: ignore[reportUnknownVariableType] ) -from opentelemetry.instrumentation.anthropic.package import _instruments -from opentelemetry.instrumentation.anthropic.patch import ( - messages_create, -) from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap from opentelemetry.util.genai.handler import TelemetryHandler +from .package import _instruments +from .patch import ( + messages_create, + 
messages_stream, +) + class AnthropicInstrumentor(BaseInstrumentor): """An instrumentor for the Anthropic Python SDK. @@ -99,12 +101,17 @@ def _instrument(self, **kwargs: Any) -> None: logger_provider=logger_provider, ) - # Patch Messages.create + # Patch Messages.create and Messages.stream wrap_function_wrapper( "anthropic.resources.messages", "Messages.create", messages_create(handler), ) + wrap_function_wrapper( + "anthropic.resources.messages", + "Messages.stream", + messages_stream(handler), + ) def _uninstrument(self, **kwargs: Any) -> None: """Disable Anthropic instrumentation. @@ -117,3 +124,7 @@ def _uninstrument(self, **kwargs: Any) -> None: anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType] "create", ) + unwrap( + anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType] + "stream", + ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/messages_extractors.py b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/messages_extractors.py index 8c3dcf26ec..32d83ae21d 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/messages_extractors.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/messages_extractors.py @@ -27,9 +27,9 @@ from opentelemetry.semconv._incubating.attributes import ( server_attributes as ServerAttributes, ) +from opentelemetry.util.genai.invocation import InferenceInvocation from opentelemetry.util.genai.types import ( InputMessage, - LLMInvocation, MessagePart, OutputMessage, ) @@ -155,7 +155,7 @@ def get_output_messages_from_message( def set_invocation_response_attributes( - invocation: LLMInvocation, + invocation: 
InferenceInvocation, message: Message | None, capture_content: bool, ) -> None: @@ -220,18 +220,16 @@ ) -def _set_server_address_and_port( +def get_server_address_and_port( client_instance: "Messages", - attributes: dict[str, AttributeValue | None], -) -> None: +) -> tuple[str | None, int | None]: base_url = client_instance._client.base_url - host = base_url.host - if host: - attributes[ServerAttributes.SERVER_ADDRESS] = host - port = base_url.port - if port and port != 443 and port > 0: - attributes[ServerAttributes.SERVER_PORT] = port + port = base_url.port + return ( + base_url.host or None, + port if port and port != 443 and port > 0 else None, + ) def get_llm_request_attributes( @@ -247,5 +244,9 @@ GenAIAttributes.GEN_AI_REQUEST_TOP_K: params.top_k, GenAIAttributes.GEN_AI_REQUEST_STOP_SEQUENCES: params.stop_sequences, } - _set_server_address_and_port(client_instance, attributes) + address, port = get_server_address_and_port(client_instance) + if address is not None: + attributes[ServerAttributes.SERVER_ADDRESS] = address + if port is not None: + attributes[ServerAttributes.SERVER_PORT] = port return {k: v for k, v in attributes.items() if v is not None} diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py index 845e10de3f..363ad0d2bc 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py @@ -26,26 +26,25 @@ gen_ai_attributes as GenAIAttributes, ) from opentelemetry.util.genai.handler import TelemetryHandler -from opentelemetry.util.genai.types import ( - Error, - LLMInvocation, # TODO: migrate to InferenceInvocation -) 
-from opentelemetry.util.genai.utils import ( - should_capture_content_on_spans_in_experimental_mode, -) +from opentelemetry.util.genai.invocation import InferenceInvocation from .messages_extractors import ( extract_params, get_input_messages, get_llm_request_attributes, + get_server_address_and_port, get_system_instruction, ) from .wrappers import ( + MessagesStreamManagerWrapper, MessagesStreamWrapper, MessageWrapper, ) if TYPE_CHECKING: + from anthropic.lib.streaming._messages import ( # pylint: disable=no-name-in-module + MessageStreamManager, + ) from anthropic.resources.messages import Messages from anthropic.types import RawMessageStreamEvent @@ -59,73 +58,113 @@ def messages_create( ) -> Callable[ ..., Union[ - "AnthropicMessage", - "AnthropicStream[RawMessageStreamEvent]", + AnthropicMessage, + AnthropicStream[RawMessageStreamEvent], MessagesStreamWrapper[None], ], ]: """Wrap the `create` method of the `Messages` class to trace it.""" - capture_content = should_capture_content_on_spans_in_experimental_mode() + capture_content = handler.should_capture_content() def traced_method( wrapped: Callable[ ..., Union[ - "AnthropicMessage", - "AnthropicStream[RawMessageStreamEvent]", + AnthropicMessage, + AnthropicStream[RawMessageStreamEvent], ], ], - instance: "Messages", + instance: Messages, args: tuple[Any, ...], kwargs: dict[str, Any], ) -> Union[ - "AnthropicMessage", - "AnthropicStream[RawMessageStreamEvent]", + AnthropicMessage, + AnthropicStream[RawMessageStreamEvent], MessagesStreamWrapper[None], ]: - params = extract_params(*args, **kwargs) - attributes = get_llm_request_attributes(params, instance) - request_model_attribute = attributes.get( - GenAIAttributes.GEN_AI_REQUEST_MODEL + invocation = _create_invocation( + handler, instance, args, kwargs, capture_content ) - request_model = ( - request_model_attribute - if isinstance(request_model_attribute, str) - else params.model - ) - - invocation = LLMInvocation( - request_model=request_model, - 
provider=ANTHROPIC, - input_messages=get_input_messages(params.messages) - if capture_content - else [], - system_instruction=get_system_instruction(params.system) - if capture_content - else [], - attributes=attributes, - ) - - # Use manual lifecycle management for both streaming and non-streaming - handler.start_llm(invocation) try: result = wrapped(*args, **kwargs) if isinstance(result, AnthropicStream): return MessagesStreamWrapper( - result, handler, invocation, capture_content + result, invocation, capture_content ) wrapper = MessageWrapper(result, capture_content) wrapper.extract_into(invocation) - handler.stop_llm(invocation) + invocation.stop() return wrapper.message except Exception as exc: - handler.fail_llm( - invocation, Error(message=str(exc), type=type(exc)) - ) + invocation.fail(exc) raise return cast( 'Callable[..., Union["AnthropicMessage", "AnthropicStream[RawMessageStreamEvent]", MessagesStreamWrapper[None]]]', traced_method, ) + + +def _create_invocation( + handler: TelemetryHandler, + instance: Messages, + args: tuple[Any, ...], + kwargs: dict[str, Any], + capture_content: bool, +) -> InferenceInvocation: + params = extract_params(*args, **kwargs) + attributes = get_llm_request_attributes(params, instance) + request_model_attribute = attributes.get( + GenAIAttributes.GEN_AI_REQUEST_MODEL + ) + request_model = ( + request_model_attribute + if isinstance(request_model_attribute, str) + else params.model + ) + + server_address, server_port = get_server_address_and_port(instance) + invocation = handler.start_inference( + provider=ANTHROPIC, + request_model=request_model, + server_address=server_address, + server_port=server_port, + ) + invocation.input_messages = ( + get_input_messages(params.messages) if capture_content else [] + ) + invocation.system_instruction = ( + get_system_instruction(params.system) if capture_content else [] + ) + invocation.attributes = attributes + return invocation + + +def messages_stream( + handler: TelemetryHandler, 
+) -> Callable[..., MessagesStreamManagerWrapper[Any]]: + """Wrap the sync `stream` method of the `Messages` class.""" + capture_content = handler.should_capture_content() + + def traced_method( + wrapped: Callable[..., MessageStreamManager], + instance: Messages, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> MessagesStreamManagerWrapper[Any]: + invocation = _create_invocation( + handler, instance, args, kwargs, capture_content + ) + + try: + return MessagesStreamManagerWrapper( + wrapped(*args, **kwargs), invocation, capture_content + ) + except Exception as exc: + invocation.fail(exc) + raise + + return cast( + "Callable[..., MessagesStreamManagerWrapper[Any]]", traced_method + ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py index 7399b8831b..dbfe14ff6f 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py @@ -18,8 +18,9 @@ import base64 import json +from collections.abc import Mapping from dataclasses import dataclass -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from anthropic.types import ( InputJSONDelta, @@ -43,7 +44,7 @@ ) if TYPE_CHECKING: - from collections.abc import Iterable, Mapping + from collections.abc import Iterable from anthropic.types import ( ContentBlock, @@ -160,12 +161,9 @@ def _convert_content_block_to_part( id=block.tool_use_id, ) - # ContentBlockParam variants are TypedDicts (dicts at runtime); - # newer SDK versions may add Pydantic block types not handled above. 
- if isinstance(block, dict): - return _convert_dict_block_to_part(block) - - return None + if not hasattr(block, "get"): + return None + return _convert_dict_block_to_part(cast(Mapping[str, Any], block)) def convert_content_to_parts( diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py index 1a9d5319a5..93c9aafa2b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py @@ -15,7 +15,7 @@ from __future__ import annotations import logging -from contextlib import AsyncExitStack, ExitStack, contextmanager +from contextlib import contextmanager from types import TracebackType from typing import ( TYPE_CHECKING, @@ -28,12 +28,6 @@ cast, ) -from opentelemetry.util.genai.handler import TelemetryHandler -from opentelemetry.util.genai.types import ( - Error, - LLMInvocation, # TODO: migrate to InferenceInvocation -) - from .messages_extractors import set_invocation_response_attributes try: @@ -60,6 +54,8 @@ ) from anthropic.types.parsed_message import ParsedMessage + from opentelemetry.util.genai.invocation import InferenceInvocation + _logger = logging.getLogger(__name__) ResponseT = TypeVar("ResponseT") @@ -68,8 +64,8 @@ def _set_response_attributes( - invocation: LLMInvocation, - result: "Message | None", + invocation: InferenceInvocation, + result: Message | None, capture_content: bool, ) -> None: set_invocation_response_attributes(invocation, result, capture_content) @@ -112,7 +108,7 @@ def __init__(self, message: Message, capture_content: bool): self._message = message self._capture_content = capture_content - def extract_into(self, invocation: LLMInvocation) -> None: + def 
extract_into(self, invocation: InferenceInvocation) -> None: """Extract response data into the invocation.""" set_invocation_response_attributes( invocation, self._message, self._capture_content @@ -134,19 +130,17 @@ class MessagesStreamWrapper( def __init__( self, - stream: "Stream[RawMessageStreamEvent] | MessageStream[ResponseFormatT]", - handler: TelemetryHandler, - invocation: LLMInvocation, + stream: Stream[RawMessageStreamEvent] | MessageStream[ResponseFormatT], + invocation: InferenceInvocation, capture_content: bool, ): self.stream = stream - self.handler = handler self.invocation = invocation - self._message: "Message | ParsedMessage[ResponseFormatT] | None" = None + self._message: Message | ParsedMessage[ResponseFormatT] | None = None self._capture_content = capture_content self._finalized = False - def __enter__(self) -> "MessagesStreamWrapper[ResponseFormatT]": + def __enter__(self) -> MessagesStreamWrapper[ResponseFormatT]: return self def __exit__( @@ -156,10 +150,8 @@ def __exit__( exc_tb: TracebackType | None, ) -> bool: try: - if exc_type is not None: - self._fail( - str(exc_val), type(exc_val) if exc_val else Exception - ) + if exc_val is not None: + self._fail(exc_val) finally: self.close() return False @@ -167,22 +159,24 @@ def __exit__( def close(self) -> None: try: self.stream.close() - finally: - self._stop() + except Exception as exc: + self._fail(exc) + raise + self._stop() - def __iter__(self) -> "MessagesStreamWrapper[ResponseFormatT]": + def __iter__(self) -> MessagesStreamWrapper[ResponseFormatT]: return self def __next__( self, - ) -> "RawMessageStreamEvent | ParsedMessageStreamEvent[ResponseFormatT]": + ) -> RawMessageStreamEvent | ParsedMessageStreamEvent[ResponseFormatT]: try: chunk = next(self.stream) except StopIteration: self._stop() raise except Exception as exc: - self._fail(str(exc), type(exc)) + self._fail(exc) raise with self._safe_instrumentation("stream chunk processing"): self._process_chunk(chunk) @@ -202,17 +196,15 @@ 
def _stop(self) -> None: _set_response_attributes( self.invocation, self._message, self._capture_content ) - with self._safe_instrumentation("stop_llm"): - self.handler.stop_llm(self.invocation) + with self._safe_instrumentation("invocation stop"): + self.invocation.stop() self._finalized = True - def _fail(self, message: str, error_type: type[BaseException]) -> None: + def _fail(self, exc: BaseException) -> None: if self._finalized: return - with self._safe_instrumentation("fail_llm"): - self.handler.fail_llm( - self.invocation, Error(message=message, type=error_type) - ) + with self._safe_instrumentation("invocation fail"): + self.invocation.fail(exc) self._finalized = True @staticmethod @@ -231,7 +223,8 @@ def _safe_instrumentation( def _process_chunk( self, - chunk: "RawMessageStreamEvent | ParsedMessageStreamEvent[ResponseFormatT]", + chunk: RawMessageStreamEvent + | ParsedMessageStreamEvent[ResponseFormatT], ) -> None: """Accumulate a final message snapshot from a streaming chunk.""" snapshot = cast( @@ -256,21 +249,20 @@ class AsyncMessagesStreamWrapper(MessagesStreamWrapper[ResponseFormatT]): def __init__( self, - stream: "AsyncStream[RawMessageStreamEvent] | AsyncMessageStream[ResponseFormatT]", - handler: TelemetryHandler, - invocation: LLMInvocation, + stream: AsyncStream[RawMessageStreamEvent] + | AsyncMessageStream[ResponseFormatT], + invocation: InferenceInvocation, capture_content: bool, ): self.stream = stream - self.handler = handler self.invocation = invocation - self._message: "Message | ParsedMessage[ResponseFormatT] | None" = None + self._message: Message | ParsedMessage[ResponseFormatT] | None = None self._capture_content = capture_content self._finalized = False async def __aenter__( self, - ) -> "AsyncMessagesStreamWrapper[ResponseFormatT]": + ) -> AsyncMessagesStreamWrapper[ResponseFormatT]: return self async def __aexit__( @@ -280,10 +272,8 @@ async def __aexit__( exc_tb: TracebackType | None, ) -> bool: try: - if exc_type is not None: - 
self._fail( - str(exc_val), type(exc_val) if exc_val else Exception - ) + if exc_val is not None: + self._fail(exc_val) finally: await self.close() return False @@ -291,10 +281,12 @@ async def __aexit__( async def close(self) -> None: # type: ignore[override] try: await self.stream.close() - finally: - self._stop() + except Exception as exc: + self._fail(exc) + raise + self._stop() - def __aiter__(self) -> "AsyncMessagesStreamWrapper[ResponseFormatT]": + def __aiter__(self) -> AsyncMessagesStreamWrapper[ResponseFormatT]: return self @property @@ -303,14 +295,14 @@ def response(self) -> Any: async def __anext__( self, - ) -> "RawMessageStreamEvent | ParsedMessageStreamEvent[ResponseFormatT]": + ) -> RawMessageStreamEvent | ParsedMessageStreamEvent[ResponseFormatT]: try: chunk = await self.stream.__anext__() except StopAsyncIteration: self._stop() raise except Exception as exc: - self._fail(str(exc), type(exc)) + self._fail(exc) raise with self._safe_instrumentation("stream chunk processing"): self._process_chunk(chunk) @@ -322,13 +314,11 @@ class MessagesStreamManagerWrapper(Generic[ResponseFormatT]): def __init__( self, - manager: "MessageStreamManager[ResponseFormatT]", - handler: TelemetryHandler, - invocation: LLMInvocation, + manager: MessageStreamManager[ResponseFormatT], + invocation: InferenceInvocation, capture_content: bool, ): self._manager = manager - self._handler = handler self._invocation = invocation self._capture_content = capture_content self._stream_wrapper: MessagesStreamWrapper[ResponseFormatT] | None = ( @@ -336,10 +326,13 @@ def __init__( ) def __enter__(self) -> MessagesStreamWrapper[ResponseFormatT]: - stream = self._manager.__enter__() + try: + stream = self._manager.__enter__() + except Exception as exc: + self._invocation.fail(exc) + raise self._stream_wrapper = MessagesStreamWrapper( stream, - self._handler, self._invocation, self._capture_content, ) @@ -351,21 +344,22 @@ def __exit__( exc_val: BaseException | None, exc_tb: TracebackType 
| None, ) -> bool | None: - suppressed = False stream_wrapper = self._stream_wrapper self._stream_wrapper = None - with ExitStack() as cleanup: - if stream_wrapper is not None: - - def finalize_stream_wrapper() -> None: - if suppressed: - stream_wrapper.__exit__(None, None, None) - else: - stream_wrapper.__exit__(exc_type, exc_val, exc_tb) - - cleanup.callback(finalize_stream_wrapper) + try: suppressed = self._manager.__exit__(exc_type, exc_val, exc_tb) - return suppressed + except Exception as exc: + if stream_wrapper is not None: + stream_wrapper.__exit__(type(exc), exc, exc.__traceback__) + else: + self._invocation.fail(exc) + raise + if stream_wrapper is not None: + if suppressed: + stream_wrapper.__exit__(None, None, None) + else: + stream_wrapper.__exit__(exc_type, exc_val, exc_tb) + return suppressed def __getattr__(self, name: str) -> object: return getattr(self._manager, name) @@ -380,13 +374,11 @@ class AsyncMessagesStreamManagerWrapper(Generic[ResponseFormatT]): def __init__( self, - manager: "AsyncMessageStreamManager[ResponseFormatT]", - handler: TelemetryHandler, - invocation: LLMInvocation, + manager: AsyncMessageStreamManager[ResponseFormatT], + invocation: InferenceInvocation, capture_content: bool, ): self._manager = manager - self._handler = handler self._invocation = invocation self._capture_content = capture_content self._stream_wrapper: ( @@ -396,10 +388,13 @@ def __init__( async def __aenter__( self, ) -> AsyncMessagesStreamWrapper[ResponseFormatT]: - msg_stream = await self._manager.__aenter__() + try: + msg_stream = await self._manager.__aenter__() + except Exception as exc: + self._invocation.fail(exc) + raise self._stream_wrapper = AsyncMessagesStreamWrapper( msg_stream, - self._handler, self._invocation, self._capture_content, ) @@ -411,25 +406,26 @@ async def __aexit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool | None: - suppressed = False stream_wrapper = self._stream_wrapper self._stream_wrapper = None - 
async with AsyncExitStack() as cleanup: - if stream_wrapper is not None: - - async def finalize_stream_wrapper() -> None: - if suppressed: - await stream_wrapper.__aexit__(None, None, None) - else: - await stream_wrapper.__aexit__( - exc_type, exc_val, exc_tb - ) - - cleanup.push_async_callback(finalize_stream_wrapper) + try: suppressed = await self._manager.__aexit__( exc_type, exc_val, exc_tb ) - return suppressed + except Exception as exc: + if stream_wrapper is not None: + await stream_wrapper.__aexit__( + type(exc), exc, exc.__traceback__ + ) + else: + self._invocation.fail(exc) + raise + if stream_wrapper is not None: + if suppressed: + await stream_wrapper.__aexit__(None, None, None) + else: + await stream_wrapper.__aexit__(exc_type, exc_val, exc_tb) + return suppressed def __getattr__(self, name: str) -> object: return getattr(self._manager, name) diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream.yaml b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream.yaml new file mode 100644 index 0000000000..7b92e4e237 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream.yaml @@ -0,0 +1,141 @@ +interactions: +- request: + body: |- + { + "max_tokens": 100, + "messages": [ + { + "role": "user", + "content": "Say hello in one word." 
+ } + ], + "model": "claude-sonnet-4-20250514", + "stream": true + } + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '131' + Content-Type: + - application/json + Host: + - api.anthropic.com + User-Agent: + - Anthropic/Python 0.96.0 + X-Stainless-Arch: + - arm64 + X-Stainless-Async: + - 'false' + X-Stainless-Helper-Method: + - stream + X-Stainless-Lang: + - python + X-Stainless-OS: + - MacOS + X-Stainless-Package-Version: + - 0.96.0 + X-Stainless-Runtime: + - CPython + X-Stainless-Runtime-Version: + - 3.13.9 + X-Stainless-Stream-Helper: + - messages + anthropic-version: + - '2023-06-01' + x-api-key: + - test_anthropic_api_key + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: |+ + event: message_start + data: {"type":"message_start","message":{"model":"claude-sonnet-4-20250514","id":"msg_01FpWuSsvRgJp3eYbdHBinNp","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":2,"service_tier":"standard","inference_geo":"not_available"}} } + + event: content_block_start + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} } + + event: ping + data: {"type": "ping"} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello!"} } + + event: content_block_stop + data: {"type":"content_block_stop","index":0 } + + event: message_delta + data: 
{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null,"stop_details":null},"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":5} } + + event: message_stop + data: {"type":"message_stop" } + + headers: + CF-RAY: + - 9f27cb7e49a7c336-EWR + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Security-Policy: + - default-src 'none'; frame-ancestors 'none' + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sun, 26 Apr 2026 18:54:15 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-ratelimit-input-tokens-limit: + - '450000' + anthropic-ratelimit-input-tokens-remaining: + - '450000' + anthropic-ratelimit-input-tokens-reset: + - '2026-04-26T18:54:14Z' + anthropic-ratelimit-output-tokens-limit: + - '90000' + anthropic-ratelimit-output-tokens-remaining: + - '90000' + anthropic-ratelimit-output-tokens-reset: + - '2026-04-26T18:54:14Z' + anthropic-ratelimit-requests-limit: + - '1000' + anthropic-ratelimit-requests-remaining: + - '999' + anthropic-ratelimit-requests-reset: + - '2026-04-26T18:54:14Z' + anthropic-ratelimit-tokens-limit: + - '540000' + anthropic-ratelimit-tokens-remaining: + - '540000' + anthropic-ratelimit-tokens-reset: + - '2026-04-26T18:54:14Z' + cf-cache-status: + - DYNAMIC + request-id: + - req_011CaSta4VpT5jmAHFwkrWEx + set-cookie: + - _cfuvid=fBfT7HzkROiMbhYZZb.a20S2fBiaSuYdrEdkO.cQAAU-1777229654.7651122-1.0.1.1-MTBV4KVg94Wya_bPOQNKc7u6yC1bHmornZk3mDa_jpA; + HttpOnly; SameSite=None; Secure; Path=/; Domain=api.anthropic.com + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + x-envoy-upstream-service-time: + - '1072' + status: + code: 200 + message: OK +version: 1 diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_api_error.yaml 
b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_api_error.yaml new file mode 100644 index 0000000000..c7c9a50639 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_api_error.yaml @@ -0,0 +1,110 @@ +interactions: +- request: + body: |- + { + "max_tokens": 100, + "messages": [ + { + "role": "user", + "content": "Hello" + } + ], + "model": "invalid-model-name", + "stream": true + } + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '108' + Content-Type: + - application/json + Host: + - api.anthropic.com + User-Agent: + - Anthropic/Python 0.96.0 + X-Stainless-Arch: + - arm64 + X-Stainless-Async: + - 'false' + X-Stainless-Helper-Method: + - stream + X-Stainless-Lang: + - python + X-Stainless-OS: + - MacOS + X-Stainless-Package-Version: + - 0.96.0 + X-Stainless-Runtime: + - CPython + X-Stainless-Runtime-Version: + - 3.13.9 + X-Stainless-Stream-Helper: + - messages + anthropic-version: + - '2023-06-01' + x-api-key: + - test_anthropic_api_key + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: |- + { + "type": "error", + "error": { + "type": "not_found_error", + "message": "model: invalid-model-name" + }, + "request_id": "req_011CaStaVbvatZBsVSEdCNup" + } + headers: + CF-RAY: + - 9f27cba32ab3e8a3-EWR + Connection: + - keep-alive + Content-Security-Policy: + - default-src 'none'; frame-ancestors 'none' + Content-Type: + - application/json + Date: + - Sun, 26 Apr 2026 18:54:20 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + cf-cache-status: + - DYNAMIC + content-length: + - '133' + request-id: + - req_011CaStaVbvatZBsVSEdCNup + server-timing: + - x-originResponse;dur=24 + set-cookie: 
+ - _cfuvid=8TQIv3k3c.VCQiwMFKLGgBJhj79YyFhu9LbIIlsvbQA-1777229660.661682-1.0.1.1-uptE0mJmjr5xQDBoWJKx3DLKES6.BVQIhMDo9KANFKM; + HttpOnly; SameSite=None; Secure; Path=/; Domain=api.anthropic.com + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + vary: + - Accept-Encoding + x-envoy-upstream-service-time: + - '22' + x-should-retry: + - 'false' + status: + code: 404 + message: Not Found +version: 1 diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_captures_content.yaml b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_captures_content.yaml new file mode 100644 index 0000000000..c5cedb7fb0 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_captures_content.yaml @@ -0,0 +1,147 @@ +interactions: +- request: + body: |- + { + "max_tokens": 100, + "messages": [ + { + "role": "user", + "content": "Say hello in one word." 
+ } + ], + "model": "claude-sonnet-4-20250514", + "stream": true + } + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '131' + Content-Type: + - application/json + Host: + - api.anthropic.com + User-Agent: + - Anthropic/Python 0.96.0 + X-Stainless-Arch: + - arm64 + X-Stainless-Async: + - 'false' + X-Stainless-Helper-Method: + - stream + X-Stainless-Lang: + - python + X-Stainless-OS: + - MacOS + X-Stainless-Package-Version: + - 0.96.0 + X-Stainless-Runtime: + - CPython + X-Stainless-Runtime-Version: + - 3.13.9 + X-Stainless-Stream-Helper: + - messages + anthropic-version: + - '2023-06-01' + x-api-key: + - test_anthropic_api_key + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: |+ + event: message_start + data: {"type":"message_start","message":{"model":"claude-sonnet-4-20250514","id":"msg_01Bt5rjPN2WdS2JXoYHith46","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":2,"service_tier":"standard","inference_geo":"not_available"}} } + + event: content_block_start + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} } + + event: ping + data: {"type": "ping"} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello!"} } + + event: content_block_stop + data: {"type":"content_block_stop","index":0 } + + event: message_delta + data: 
{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null,"stop_details":null},"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":5} } + + event: message_stop + data: {"type":"message_stop" } + + headers: + CF-RAY: + - 9f27cb86aa326a4e-EWR + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Security-Policy: + - default-src 'none'; frame-ancestors 'none' + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sun, 26 Apr 2026 18:54:17 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-ratelimit-input-tokens-limit: + - '450000' + anthropic-ratelimit-input-tokens-remaining: + - '450000' + anthropic-ratelimit-input-tokens-reset: + - '2026-04-26T18:54:16Z' + anthropic-ratelimit-output-tokens-limit: + - '90000' + anthropic-ratelimit-output-tokens-remaining: + - '90000' + anthropic-ratelimit-output-tokens-reset: + - '2026-04-26T18:54:16Z' + anthropic-ratelimit-requests-limit: + - '1000' + anthropic-ratelimit-requests-remaining: + - '999' + anthropic-ratelimit-requests-reset: + - '2026-04-26T18:54:16Z' + anthropic-ratelimit-tokens-limit: + - '540000' + anthropic-ratelimit-tokens-remaining: + - '540000' + anthropic-ratelimit-tokens-reset: + - '2026-04-26T18:54:16Z' + cf-cache-status: + - DYNAMIC + content-length: + - '1175' + request-id: + - req_011CaStaA8ttNQuUt57xTZuL + server-timing: + - x-originResponse;dur=1160 + set-cookie: + - _cfuvid=MYGinn5q2GajEKA_1ZXEQSMPZFa_pITpAgkkbV8.mqM-1777229656.10707-1.0.1.1-7.CYqTxyXg1Q3PxRwisVM9dNE5a3H9KCY5kZiqcw9qk; + HttpOnly; SameSite=None; Secure; Path=/; Domain=api.anthropic.com + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + vary: + - Accept-Encoding + x-envoy-upstream-service-time: + - '1158' + status: + code: 200 + message: OK +version: 1 diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_closed_early_by_caller.yaml b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_closed_early_by_caller.yaml new file mode 100644 index 0000000000..0cb083b381 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_closed_early_by_caller.yaml @@ -0,0 +1,147 @@ +interactions: +- request: + body: |- + { + "max_tokens": 100, + "messages": [ + { + "role": "user", + "content": "Say hello in one word." + } + ], + "model": "claude-sonnet-4-20250514", + "stream": true + } + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '131' + Content-Type: + - application/json + Host: + - api.anthropic.com + User-Agent: + - Anthropic/Python 0.96.0 + X-Stainless-Arch: + - arm64 + X-Stainless-Async: + - 'false' + X-Stainless-Helper-Method: + - stream + X-Stainless-Lang: + - python + X-Stainless-OS: + - MacOS + X-Stainless-Package-Version: + - 0.96.0 + X-Stainless-Runtime: + - CPython + X-Stainless-Runtime-Version: + - 3.13.9 + X-Stainless-Stream-Helper: + - messages + anthropic-version: + - '2023-06-01' + x-api-key: + - test_anthropic_api_key + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: |+ + event: message_start + data: 
{"type":"message_start","message":{"model":"claude-sonnet-4-20250514","id":"msg_01Bso8Ps9vpszQBB5vFUaWQx","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":2,"service_tier":"standard","inference_geo":"not_available"}} } + + event: content_block_start + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} } + + event: ping + data: {"type": "ping"} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello!"}} + + event: content_block_stop + data: {"type":"content_block_stop","index":0 } + + event: message_delta + data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null,"stop_details":null},"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":5} } + + event: message_stop + data: {"type":"message_stop"} + + headers: + CF-RAY: + - 9f62cebe3f501906-EWR + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Security-Policy: + - default-src 'none'; frame-ancestors 'none' + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sun, 03 May 2026 22:47:28 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-ratelimit-input-tokens-limit: + - '450000' + anthropic-ratelimit-input-tokens-remaining: + - '450000' + anthropic-ratelimit-input-tokens-reset: + - '2026-05-03T22:47:27Z' + anthropic-ratelimit-output-tokens-limit: + - '90000' + anthropic-ratelimit-output-tokens-remaining: + - '90000' + anthropic-ratelimit-output-tokens-reset: + - '2026-05-03T22:47:27Z' + anthropic-ratelimit-requests-limit: + - '1000' + anthropic-ratelimit-requests-remaining: + - '999' + anthropic-ratelimit-requests-reset: + - 
'2026-05-03T22:47:27Z' + anthropic-ratelimit-tokens-limit: + - '540000' + anthropic-ratelimit-tokens-remaining: + - '540000' + anthropic-ratelimit-tokens-reset: + - '2026-05-03T22:47:27Z' + cf-cache-status: + - DYNAMIC + content-length: + - '1154' + request-id: + - req_011CagT1x9aQyaFmxJ5GNARp + set-cookie: + - _cfuvid=FJE3MQyoJt8lkF4.Kt2pCOiULwrAWJefeAJPW3dEadE-1777848447.712399-1.0.1.1-jh2AssDriF8MBsq43fuPcFLny3DvtSkWWHqOo0x8k18; + HttpOnly; SameSite=None; Secure; Path=/; Domain=api.anthropic.com + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + traceresponse: + - 00-ca1e689f10c6d029831c2bac29080b26-ec7932028676ce57-01 + vary: + - Accept-Encoding + x-envoy-upstream-service-time: + - '881' + status: + code: 200 + message: OK +version: 1 diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_delegates_response_attribute.yaml b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_delegates_response_attribute.yaml new file mode 100644 index 0000000000..a6bb0aaaaf --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_delegates_response_attribute.yaml @@ -0,0 +1,150 @@ +interactions: +- request: + body: |- + { + "max_tokens": 100, + "messages": [ + { + "role": "user", + "content": "Say hi." 
+ } + ], + "model": "claude-sonnet-4-20250514", + "stream": true + } + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '116' + Content-Type: + - application/json + Host: + - api.anthropic.com + User-Agent: + - Anthropic/Python 0.96.0 + X-Stainless-Arch: + - arm64 + X-Stainless-Async: + - 'false' + X-Stainless-Helper-Method: + - stream + X-Stainless-Lang: + - python + X-Stainless-OS: + - MacOS + X-Stainless-Package-Version: + - 0.96.0 + X-Stainless-Runtime: + - CPython + X-Stainless-Runtime-Version: + - 3.13.9 + X-Stainless-Stream-Helper: + - messages + anthropic-version: + - '2023-06-01' + x-api-key: + - test_anthropic_api_key + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: |+ + event: message_start + data: {"type":"message_start","message":{"model":"claude-sonnet-4-20250514","id":"msg_01Mx6ioJSjFRgxnu5wbxXTg6","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":10,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":1,"service_tier":"standard","inference_geo":"not_available"}} } + + event: content_block_start + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} } + + event: ping + data: {"type": "ping"} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hi"} } + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"! 
How are you doing today?"} } + + event: content_block_stop + data: {"type":"content_block_stop","index":0 } + + event: message_delta + data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null,"stop_details":null},"usage":{"input_tokens":10,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":11} } + + event: message_stop + data: {"type":"message_stop" } + + headers: + CF-RAY: + - 9f27cb90192a2142-EWR + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Security-Policy: + - default-src 'none'; frame-ancestors 'none' + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sun, 26 Apr 2026 18:54:18 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-ratelimit-input-tokens-limit: + - '450000' + anthropic-ratelimit-input-tokens-remaining: + - '450000' + anthropic-ratelimit-input-tokens-reset: + - '2026-04-26T18:54:17Z' + anthropic-ratelimit-output-tokens-limit: + - '90000' + anthropic-ratelimit-output-tokens-remaining: + - '90000' + anthropic-ratelimit-output-tokens-reset: + - '2026-04-26T18:54:17Z' + anthropic-ratelimit-requests-limit: + - '1000' + anthropic-ratelimit-requests-remaining: + - '999' + anthropic-ratelimit-requests-reset: + - '2026-04-26T18:54:17Z' + anthropic-ratelimit-tokens-limit: + - '540000' + anthropic-ratelimit-tokens-remaining: + - '540000' + anthropic-ratelimit-tokens-reset: + - '2026-04-26T18:54:17Z' + cf-cache-status: + - DYNAMIC + content-length: + - '1328' + request-id: + - req_011CaStaGaM4ju6Mv4ateGNY + server-timing: + - x-originResponse;dur=825 + set-cookie: + - _cfuvid=vZNYZnqh1dW.Xjsana2i.xalgEY5pro6FXuKaKY9sGI-1777229657.6146472-1.0.1.1-yjjv90xIDiOv1Wz7BgYKITwNZqlds2H57hBPL5y0KNc; + HttpOnly; SameSite=None; Secure; Path=/; Domain=api.anthropic.com + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + vary: + - Accept-Encoding + x-envoy-upstream-service-time: + - '823' + status: + code: 200 + 
message: OK +version: 1 diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_interrupted_mid_iteration.yaml b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_interrupted_mid_iteration.yaml new file mode 100644 index 0000000000..8b9ffe41ed --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_interrupted_mid_iteration.yaml @@ -0,0 +1,147 @@ +interactions: +- request: + body: |- + { + "max_tokens": 100, + "messages": [ + { + "role": "user", + "content": "Say hello in one word." + } + ], + "model": "claude-sonnet-4-20250514", + "stream": true + } + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '131' + Content-Type: + - application/json + Host: + - api.anthropic.com + User-Agent: + - Anthropic/Python 0.96.0 + X-Stainless-Arch: + - arm64 + X-Stainless-Async: + - 'false' + X-Stainless-Helper-Method: + - stream + X-Stainless-Lang: + - python + X-Stainless-OS: + - MacOS + X-Stainless-Package-Version: + - 0.96.0 + X-Stainless-Runtime: + - CPython + X-Stainless-Runtime-Version: + - 3.13.9 + X-Stainless-Stream-Helper: + - messages + anthropic-version: + - '2023-06-01' + x-api-key: + - test_anthropic_api_key + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: |+ + event: message_start + data: 
{"type":"message_start","message":{"model":"claude-sonnet-4-20250514","id":"msg_01ArZ4yKLwhfUF16CVjeZSWf","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":2,"service_tier":"standard","inference_geo":"not_available"}}} + + event: content_block_start + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} } + + event: ping + data: {"type": "ping"} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello!"} } + + event: content_block_stop + data: {"type":"content_block_stop","index":0 } + + event: message_delta + data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null,"stop_details":null},"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":5}} + + event: message_stop + data: {"type":"message_stop" } + + headers: + CF-RAY: + - 9f62ceb70f4658c1-EWR + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Security-Policy: + - default-src 'none'; frame-ancestors 'none' + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sun, 03 May 2026 22:47:27 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-ratelimit-input-tokens-limit: + - '450000' + anthropic-ratelimit-input-tokens-remaining: + - '450000' + anthropic-ratelimit-input-tokens-reset: + - '2026-05-03T22:47:26Z' + anthropic-ratelimit-output-tokens-limit: + - '90000' + anthropic-ratelimit-output-tokens-remaining: + - '90000' + anthropic-ratelimit-output-tokens-reset: + - '2026-05-03T22:47:26Z' + anthropic-ratelimit-requests-limit: + - '1000' + anthropic-ratelimit-requests-remaining: + - '999' + anthropic-ratelimit-requests-reset: + - 
'2026-05-03T22:47:26Z' + anthropic-ratelimit-tokens-limit: + - '540000' + anthropic-ratelimit-tokens-remaining: + - '540000' + anthropic-ratelimit-tokens-reset: + - '2026-05-03T22:47:26Z' + cf-cache-status: + - DYNAMIC + content-length: + - '1166' + request-id: + - req_011CagT1sFeNm1tViDNAWHeZ + set-cookie: + - _cfuvid=rj75w_GU5xV9cWP7nOfJwscHGqT_ppwRllzb6fSv0Fw-1777848446.5684419-1.0.1.1-f0Kmq9td0Tj1eY06Rwpq86V7Zainu268u8hWFi4QmtY; + HttpOnly; SameSite=None; Secure; Path=/; Domain=api.anthropic.com + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + traceresponse: + - 00-f6a597a36c8af29bd5bd3faa514a8df9-8263b57a1c59d381-01 + vary: + - Accept-Encoding + x-envoy-upstream-service-time: + - '871' + status: + code: 200 + message: OK +version: 1 diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_user_exception.yaml b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_user_exception.yaml new file mode 100644 index 0000000000..cd6855edf1 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/test_sync_messages_stream_user_exception.yaml @@ -0,0 +1,147 @@ +interactions: +- request: + body: |- + { + "max_tokens": 100, + "messages": [ + { + "role": "user", + "content": "Say hello in one word." 
+ } + ], + "model": "claude-sonnet-4-20250514", + "stream": true + } + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '131' + Content-Type: + - application/json + Host: + - api.anthropic.com + User-Agent: + - Anthropic/Python 0.96.0 + X-Stainless-Arch: + - arm64 + X-Stainless-Async: + - 'false' + X-Stainless-Helper-Method: + - stream + X-Stainless-Lang: + - python + X-Stainless-OS: + - MacOS + X-Stainless-Package-Version: + - 0.96.0 + X-Stainless-Runtime: + - CPython + X-Stainless-Runtime-Version: + - 3.13.9 + X-Stainless-Stream-Helper: + - messages + anthropic-version: + - '2023-06-01' + x-api-key: + - test_anthropic_api_key + x-stainless-read-timeout: + - '600' + x-stainless-retry-count: + - '0' + x-stainless-timeout: + - NOT_GIVEN + method: POST + uri: https://api.anthropic.com/v1/messages + response: + body: + string: |+ + event: message_start + data: {"type":"message_start","message":{"model":"claude-sonnet-4-20250514","id":"msg_01PH8Hd2d8JSSU614XXUdhtY","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":2,"service_tier":"standard","inference_geo":"not_available"}} } + + event: content_block_start + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} } + + event: ping + data: {"type": "ping"} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello."} } + + event: content_block_stop + data: {"type":"content_block_stop","index":0 } + + event: message_delta + data: 
{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null,"stop_details":null},"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":5} } + + event: message_stop + data: {"type":"message_stop" } + + headers: + CF-RAY: + - 9f27cc17f9b8729c-EWR + Cache-Control: + - no-cache + Connection: + - keep-alive + Content-Security-Policy: + - default-src 'none'; frame-ancestors 'none' + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Sun, 26 Apr 2026 18:54:40 GMT + Server: + - cloudflare + Transfer-Encoding: + - chunked + X-Robots-Tag: + - none + anthropic-ratelimit-input-tokens-limit: + - '450000' + anthropic-ratelimit-input-tokens-remaining: + - '450000' + anthropic-ratelimit-input-tokens-reset: + - '2026-04-26T18:54:39Z' + anthropic-ratelimit-output-tokens-limit: + - '90000' + anthropic-ratelimit-output-tokens-remaining: + - '90000' + anthropic-ratelimit-output-tokens-reset: + - '2026-04-26T18:54:39Z' + anthropic-ratelimit-requests-limit: + - '1000' + anthropic-ratelimit-requests-remaining: + - '999' + anthropic-ratelimit-requests-reset: + - '2026-04-26T18:54:39Z' + anthropic-ratelimit-tokens-limit: + - '540000' + anthropic-ratelimit-tokens-remaining: + - '540000' + anthropic-ratelimit-tokens-reset: + - '2026-04-26T18:54:39Z' + cf-cache-status: + - DYNAMIC + content-length: + - '1156' + request-id: + - req_011CaStbsWaJNdwuqs2rpEZu + server-timing: + - x-originResponse;dur=874 + set-cookie: + - _cfuvid=BaJxW_3cagsuNsmTXrSvzGLeKMfyRWc3_0Gowfc3JHk-1777229679.3546934-1.0.1.1-qSr2meNp_OQHOJ_LjwDvlghr6BtUGC6_SQHghI5eSjo; + HttpOnly; SameSite=None; Secure; Path=/; Domain=api.anthropic.com + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + vary: + - Accept-Encoding + x-envoy-upstream-service-time: + - '873' + status: + code: 200 + message: OK +version: 1 diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_async_wrappers.py 
b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_async_wrappers.py index 067ef0085e..f5b288fa50 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_async_wrappers.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_async_wrappers.py @@ -24,48 +24,36 @@ ) -def _noop_stop_llm(invocation): - del invocation - - -def _noop_fail_llm(invocation, error): - del invocation - del error - - -def _make_handler(): +def _make_invocation(): return SimpleNamespace( - stop_llm=_noop_stop_llm, - fail_llm=_noop_fail_llm, + attributes={}, + request_model=None, + stop=lambda: None, + fail=lambda error: None, ) -def _make_invocation(): - return SimpleNamespace(attributes={}, request_model=None) - - -def _make_stream_wrapper(stream, handler=None): +def _make_stream_wrapper(stream): return MessagesStreamWrapper( stream=stream, - handler=handler or _make_handler(), invocation=_make_invocation(), capture_content=False, ) -def _make_async_stream_wrapper(stream, handler=None): +def _make_async_stream_wrapper(stream): return AsyncMessagesStreamWrapper( stream=stream, - handler=handler or _make_handler(), invocation=_make_invocation(), capture_content=False, ) class _FakeSyncStream: - def __init__(self, *, events=None, error=None): + def __init__(self, *, events=None, error=None, close_error=None): self._events = list(events or []) self._error = error + self._close_error = close_error self.close_calls = 0 self.response = _FakeSyncResponse() @@ -81,12 +69,15 @@ def __next__(self): def close(self): self.close_calls += 1 + if self._close_error is not None: + raise self._close_error class _FakeAsyncStream: - def __init__(self, *, events=None, error=None): + def __init__(self, *, events=None, error=None, close_error=None): self._events = list(events or []) self._error = error + self._close_error = close_error self.close_calls = 0 self.final_message = SimpleNamespace(id="msg_final") self.response = 
_FakeAsyncResponse() @@ -100,19 +91,26 @@ async def __anext__(self): async def close(self): self.close_calls += 1 + if self._close_error is not None: + raise self._close_error async def get_final_message(self): return self.final_message class _FakeSyncManager: - def __init__(self, stream, suppressed=False, exit_error=None): + def __init__( + self, stream, suppressed=False, enter_error=None, exit_error=None + ): self._stream = stream self._suppressed = suppressed + self._enter_error = enter_error self._exit_error = exit_error self.exit_args = None def __enter__(self): + if self._enter_error is not None: + raise self._enter_error return self._stream def __exit__(self, exc_type, exc_val, exc_tb): @@ -123,13 +121,18 @@ def __exit__(self, exc_type, exc_val, exc_tb): class _FakeAsyncManager: - def __init__(self, stream, suppressed=False, exit_error=None): + def __init__( + self, stream, suppressed=False, enter_error=None, exit_error=None + ): self._stream = stream self._suppressed = suppressed + self._enter_error = enter_error self._exit_error = exit_error self.exit_args = None async def __aenter__(self): + if self._enter_error is not None: + raise self._enter_error return self._stream async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -200,9 +203,7 @@ def test_sync_stream_wrapper_exit_fails_and_closes_on_exception(): failures = [] wrapper._stop = lambda: stopped.append(True) - wrapper._fail = lambda message, error_type: failures.append( - (message, error_type) - ) + wrapper._fail = failures.append error = ValueError("boom") result = wrapper.__exit__(ValueError, error, None) @@ -210,7 +211,25 @@ def test_sync_stream_wrapper_exit_fails_and_closes_on_exception(): assert result is False assert stream.close_calls == 1 assert stopped == [True] - assert failures == [("boom", ValueError)] + assert failures == [error] + + +def test_sync_stream_wrapper_close_failure_fails_and_reraises(): + error = RuntimeError("close failed") + stream = _FakeSyncStream(close_error=error) + 
wrapper = _make_stream_wrapper(stream) + stopped = [] + failures = [] + + wrapper._stop = lambda: stopped.append(True) + wrapper._fail = failures.append + + with pytest.raises(RuntimeError, match="close failed"): + wrapper.close() + + assert stream.close_calls == 1 + assert failures == [error] + assert not stopped def test_sync_stream_wrapper_processes_events_and_stops_on_completion(): @@ -240,14 +259,12 @@ def test_sync_stream_wrapper_fails_and_reraises_stream_errors(): wrapper = _make_stream_wrapper(stream) failures = [] - wrapper._fail = lambda message, error_type: failures.append( - (message, error_type) - ) + wrapper._fail = failures.append with pytest.raises(ValueError, match="boom"): next(wrapper) - assert failures == [("boom", ValueError)] + assert failures == [error] def test_sync_stream_wrapper_getattr_passthrough(): @@ -274,7 +291,6 @@ def test_sync_manager_enter_constructs_stream_wrapper(): stream = _FakeSyncStream() wrapper = MessagesStreamManagerWrapper( manager=_FakeSyncManager(stream=stream), - handler=_make_handler(), invocation=_make_invocation(), capture_content=False, ) @@ -285,10 +301,30 @@ def test_sync_manager_enter_constructs_stream_wrapper(): assert wrapper._stream_wrapper is result +def test_sync_manager_enter_fails_invocation_when_manager_raises(): + error = RuntimeError("manager enter failure") + failures = [] + invocation = _make_invocation() + invocation.fail = failures.append + wrapper = MessagesStreamManagerWrapper( + manager=_FakeSyncManager( + stream=SimpleNamespace(), + enter_error=error, + ), + invocation=invocation, + capture_content=False, + ) + + with pytest.raises(RuntimeError, match="manager enter failure"): + with wrapper: + pass + + assert failures == [error] + + def test_sync_manager_exit_forwards_exception_to_stream_wrapper(): wrapper = MessagesStreamManagerWrapper( manager=_FakeSyncManager(stream=SimpleNamespace(), suppressed=False), - handler=SimpleNamespace(), invocation=_make_invocation(), capture_content=False, ) @@ 
-306,7 +342,6 @@ def test_sync_manager_exit_forwards_exception_to_stream_wrapper(): def test_sync_manager_exit_uses_none_exception_when_manager_suppresses(): wrapper = MessagesStreamManagerWrapper( manager=_FakeSyncManager(stream=SimpleNamespace(), suppressed=True), - handler=SimpleNamespace(), invocation=_make_invocation(), capture_content=False, ) @@ -329,7 +364,6 @@ def test_sync_manager_exit_still_finalizes_stream_wrapper_when_manager_raises(): suppressed=False, exit_error=manager_error, ), - handler=SimpleNamespace(), invocation=_make_invocation(), capture_content=False, ) @@ -341,7 +375,8 @@ def test_sync_manager_exit_still_finalizes_stream_wrapper_when_manager_raises(): wrapper.__exit__(ValueError, error, None) assert wrapper._manager.exit_args == (ValueError, error, None) - assert stream_wrapper.exit_args == (ValueError, error, None) + assert stream_wrapper.exit_args[:2] == (RuntimeError, manager_error) + assert stream_wrapper.exit_args[2] is not None @pytest.mark.asyncio @@ -367,9 +402,7 @@ async def test_async_stream_wrapper_exit_fails_and_closes_on_exception(): failures = [] wrapper._stop = lambda: stopped.append(True) - wrapper._fail = lambda message, error_type: failures.append( - (message, error_type) - ) + wrapper._fail = failures.append error = ValueError("boom") result = await wrapper.__aexit__(ValueError, error, None) @@ -377,7 +410,7 @@ async def test_async_stream_wrapper_exit_fails_and_closes_on_exception(): assert result is False assert stream.close_calls == 1 assert stopped == [True] - assert failures == [("boom", ValueError)] + assert failures == [error] @pytest.mark.asyncio @@ -394,6 +427,25 @@ async def test_async_stream_wrapper_close_uses_close_and_stops(): assert stopped == [True] +@pytest.mark.asyncio +async def test_async_stream_wrapper_close_failure_fails_and_reraises(): + error = RuntimeError("close failed") + stream = _FakeAsyncStream(close_error=error) + wrapper = _make_async_stream_wrapper(stream) + stopped = [] + failures = [] + + 
wrapper._stop = lambda: stopped.append(True) + wrapper._fail = failures.append + + with pytest.raises(RuntimeError, match="close failed"): + await wrapper.close() + + assert stream.close_calls == 1 + assert failures == [error] + assert not stopped + + @pytest.mark.asyncio async def test_async_stream_wrapper_processes_events_and_stops_on_completion(): event = SimpleNamespace(type="message_start") @@ -423,14 +475,12 @@ async def test_async_stream_wrapper_fails_and_reraises_stream_errors(): wrapper = _make_async_stream_wrapper(stream) failures = [] - wrapper._fail = lambda message, error_type: failures.append( - (message, error_type) - ) + wrapper._fail = failures.append with pytest.raises(ValueError, match="boom"): await anext(wrapper) - assert failures == [("boom", ValueError)] + assert failures == [error] @pytest.mark.asyncio @@ -463,7 +513,6 @@ async def test_async_manager_enter_constructs_async_stream_wrapper(): stream = _FakeAsyncStream() wrapper = AsyncMessagesStreamManagerWrapper( manager=_FakeAsyncManager(stream=stream), - handler=_make_handler(), invocation=_make_invocation(), capture_content=False, ) @@ -474,11 +523,32 @@ async def test_async_manager_enter_constructs_async_stream_wrapper(): assert wrapper._stream_wrapper is result +@pytest.mark.asyncio +async def test_async_manager_enter_fails_invocation_when_manager_raises(): + error = RuntimeError("manager enter failure") + failures = [] + invocation = _make_invocation() + invocation.fail = failures.append + wrapper = AsyncMessagesStreamManagerWrapper( + manager=_FakeAsyncManager( + stream=SimpleNamespace(), + enter_error=error, + ), + invocation=invocation, + capture_content=False, + ) + + with pytest.raises(RuntimeError, match="manager enter failure"): + async with wrapper: + pass + + assert failures == [error] + + @pytest.mark.asyncio async def test_async_manager_exit_forwards_exception_to_stream_wrapper(): wrapper = AsyncMessagesStreamManagerWrapper( manager=_FakeAsyncManager(stream=SimpleNamespace(), 
suppressed=False), - handler=SimpleNamespace(), invocation=_make_invocation(), capture_content=False, ) @@ -497,7 +567,6 @@ async def test_async_manager_exit_forwards_exception_to_stream_wrapper(): async def test_async_manager_exit_uses_none_exception_when_manager_suppresses(): wrapper = AsyncMessagesStreamManagerWrapper( manager=_FakeAsyncManager(stream=SimpleNamespace(), suppressed=True), - handler=SimpleNamespace(), invocation=_make_invocation(), capture_content=False, ) @@ -521,7 +590,6 @@ async def test_async_manager_exit_still_finalizes_stream_wrapper_when_manager_ra suppressed=False, exit_error=manager_error, ), - handler=SimpleNamespace(), invocation=_make_invocation(), capture_content=False, ) @@ -533,4 +601,5 @@ async def test_async_manager_exit_still_finalizes_stream_wrapper_when_manager_ra await wrapper.__aexit__(ValueError, error, None) assert wrapper._manager.exit_args == (ValueError, error, None) - assert stream_wrapper.exit_args == (ValueError, error, None) + assert stream_wrapper.exit_args[:2] == (RuntimeError, manager_error) + assert stream_wrapper.exit_args[2] is not None diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_sync_messages.py b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_sync_messages.py index a073f3b569..9bcf8bef91 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_sync_messages.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_sync_messages.py @@ -13,6 +13,7 @@ # limitations under the License. 
"""Tests for sync Messages.create instrumentation.""" +# pylint: disable=too-many-lines import inspect import json @@ -482,6 +483,242 @@ def test_sync_messages_create_streaming_captures_content( assert output_messages[0]["parts"] +@pytest.mark.vcr() +def test_sync_messages_stream( # pylint: disable=too-many-locals + request, span_exporter, anthropic_client, instrument_no_content +): + """Test Messages.stream produces correct span.""" + _skip_if_cassette_missing_and_no_real_key(request) + model = "claude-sonnet-4-20250514" + messages = [{"role": "user", "content": "Say hello in one word."}] + + response_id = None + response_model = None + stop_reason = None + input_tokens = None + output_tokens = None + + with anthropic_client.messages.stream( + model=model, + max_tokens=100, + messages=messages, + ) as stream: + for chunk in stream: + if chunk.type == "message_start": + message = getattr(chunk, "message", None) + if message: + response_id = getattr(message, "id", None) + response_model = getattr(message, "model", None) + usage = getattr(message, "usage", None) + if usage: + input_tokens = getattr(usage, "input_tokens", None) + elif chunk.type == "message_delta": + delta = getattr(chunk, "delta", None) + if delta: + stop_reason = getattr(delta, "stop_reason", None) + usage = getattr(chunk, "usage", None) + if usage: + output_tokens = getattr(usage, "output_tokens", None) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + assert_span_attributes( + spans[0], + request_model=model, + response_id=response_id, + response_model=response_model, + input_tokens=input_tokens, + output_tokens=output_tokens, + finish_reasons=[normalize_stop_reason(stop_reason)] + if stop_reason + else None, + ) + + +@pytest.mark.vcr() +def test_sync_messages_stream_captures_content( + request, span_exporter, anthropic_client, instrument_with_content +): + """Test content capture on Messages.stream.""" + _skip_if_cassette_missing_and_no_real_key(request) + model = 
"claude-sonnet-4-20250514" + messages = [{"role": "user", "content": "Say hello in one word."}] + + with anthropic_client.messages.stream( + model=model, + max_tokens=100, + messages=messages, + ) as stream: + for _ in stream: + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + + input_messages = _load_span_messages( + span, GenAIAttributes.GEN_AI_INPUT_MESSAGES + ) + output_messages = _load_span_messages( + span, GenAIAttributes.GEN_AI_OUTPUT_MESSAGES + ) + assert input_messages[0]["role"] == "user" + assert input_messages[0]["parts"][0]["type"] == "text" + assert output_messages[0]["role"] == "assistant" + assert output_messages[0]["parts"] + + +@pytest.mark.vcr() +def test_sync_messages_stream_delegates_response_attribute( + request, anthropic_client, instrument_no_content +): + """Messages.stream wrapper should expose attributes from the wrapped stream.""" + _skip_if_cassette_missing_and_no_real_key(request) + + with anthropic_client.messages.stream( + model="claude-sonnet-4-20250514", + max_tokens=100, + messages=[{"role": "user", "content": "Say hi."}], + ) as stream: + assert stream.response is not None + assert stream.response.status_code == 200 + assert stream.response.headers.get("request-id") is not None + + +def test_sync_messages_stream_connection_error( + span_exporter, instrument_no_content +): + """Test that connection errors from Messages.stream are handled correctly.""" + model = "claude-sonnet-4-20250514" + messages = [{"role": "user", "content": "Hello"}] + + client = Anthropic(base_url="http://localhost:9999") + + with pytest.raises(APIConnectionError): + with client.messages.stream( + model=model, + max_tokens=100, + messages=messages, + timeout=0.1, + ) as stream: + list(stream) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == model + assert ErrorAttributes.ERROR_TYPE in span.attributes + 
assert "APIConnectionError" in span.attributes[ErrorAttributes.ERROR_TYPE] + + +@pytest.mark.vcr() +def test_sync_messages_stream_api_error( + request, span_exporter, anthropic_client, instrument_no_content +): + """Test that API errors from Messages.stream are propagated.""" + _skip_if_cassette_missing_and_no_real_key(request) + model = "invalid-model-name" + messages = [{"role": "user", "content": "Hello"}] + + with pytest.raises(NotFoundError): + with anthropic_client.messages.stream( + model=model, + max_tokens=100, + messages=messages, + ) as stream: + list(stream) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == model + assert ErrorAttributes.ERROR_TYPE in span.attributes + assert "NotFoundError" in span.attributes[ErrorAttributes.ERROR_TYPE] + + +@pytest.mark.vcr() +def test_sync_messages_stream_interrupted_mid_iteration( + request, + span_exporter, + anthropic_client, + instrument_no_content, + monkeypatch, +): + """Mid-stream network errors from Messages.stream propagate and record error.""" + _skip_if_cassette_missing_and_no_real_key(request) + model = "claude-sonnet-4-20250514" + messages = [{"role": "user", "content": "Say hello in one word."}] + + class ErrorInjectingStreamDelegate: + def __init__(self, inner): + self._inner = inner + self._count = 0 + + def __iter__(self): + return self + + def __next__(self): + if self._count == 1: + raise ConnectionError("connection reset during stream") + self._count += 1 + return next(self._inner) + + def close(self): + return self._inner.close() + + def __getattr__(self, name): + return getattr(self._inner, name) + + with pytest.raises( + ConnectionError, match="connection reset during stream" + ): + with anthropic_client.messages.stream( + model=model, + max_tokens=100, + messages=messages, + ) as stream: + monkeypatch.setattr( + stream, + "stream", + ErrorInjectingStreamDelegate(stream.stream), + ) + for 
_ in stream: + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == model + assert span.attributes[ErrorAttributes.ERROR_TYPE] == "ConnectionError" + + +@pytest.mark.vcr() +def test_sync_messages_stream_closed_early_by_caller( + request, span_exporter, anthropic_client, instrument_no_content +): + """Caller-closing Messages.stream early finalizes the span without error.""" + _skip_if_cassette_missing_and_no_real_key(request) + model = "claude-sonnet-4-20250514" + messages = [{"role": "user", "content": "Say hello in one word."}] + + with anthropic_client.messages.stream( + model=model, + max_tokens=100, + messages=messages, + ) as stream: + next(stream) + stream.close() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == model + assert ErrorAttributes.ERROR_TYPE not in span.attributes + + @pytest.mark.vcr() def test_sync_messages_create_streaming_iteration( span_exporter, anthropic_client, instrument_no_content @@ -868,6 +1105,31 @@ def test_sync_messages_create_streaming_user_exception( assert span.attributes[ErrorAttributes.ERROR_TYPE] == "ValueError" +@pytest.mark.vcr() +def test_sync_messages_stream_user_exception( + request, span_exporter, anthropic_client, instrument_no_content +): + """Test that user raised exceptions from Messages.stream are propagated.""" + _skip_if_cassette_missing_and_no_real_key(request) + model = "claude-sonnet-4-20250514" + messages = [{"role": "user", "content": "Say hello in one word."}] + + with pytest.raises(ValueError, match="User raised exception"): + with anthropic_client.messages.stream( + model=model, + max_tokens=100, + messages=messages, + ) as stream: + for _ in stream: + raise ValueError("User raised exception") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert 
span.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] == model + assert span.attributes[ErrorAttributes.ERROR_TYPE] == "ValueError" + + @pytest.mark.vcr() def test_sync_messages_create_instrumentation_error_swallowed( span_exporter, anthropic_client, instrument_no_content, monkeypatch