Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 100 additions & 86 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic import AsyncStream
from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand Down Expand Up @@ -389,6 +392,98 @@
span.__exit__(None, None, None)


def _patch_streaming_response_iterator(
result: "AsyncStream[RawMessageStreamEvent]",
span: "sentry_sdk.tracing.Span",
integration: "AnthropicIntegration",
) -> None:
"""
Responsible for closing the `gen_ai.chat` span and setting attributes acquired during response consumption.
"""
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()


def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
integration = kwargs.pop("integration")
if integration is None:
Expand All @@ -415,6 +510,11 @@

result = yield f, args, kwargs

is_streaming_response = kwargs.get("stream", False)
if is_streaming_response:
_patch_streaming_response_iterator(result, span, integration)
return result

Check warning on line 516 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: find-bugs

Unprotected _iterator access can crash user applications when stream=True response lacks _iterator attribute

The refactored code accesses `result._iterator` directly (lines 403, 481) without verifying the attribute exists, and the call to `_patch_streaming_response_iterator` (line 515) is not wrapped in `capture_internal_exceptions()`. Previously, the streaming path was inside `capture_internal_exceptions()` with `hasattr(result, "_iterator")` check. If `stream=True` is passed but the response object doesn't have `_iterator` (e.g., API version mismatch or unexpected response type), an unhandled `AttributeError` will propagate to the user's application instead of being gracefully logged.

Check warning on line 516 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: code-review

Streaming response handling no longer protected by capture_internal_exceptions

The streaming path now checks `kwargs.get("stream", False)` and calls `_patch_streaming_response_iterator` before entering the `capture_internal_exceptions()` block. Previously, the streaming detection (`hasattr(result, "_iterator")`) was inside the exception handling block. If `stream=True` is passed but the result unexpectedly lacks `_iterator` (e.g., API behavior change, error responses), an `AttributeError` will propagate instead of being gracefully handled, potentially breaking the instrumented code path.
Comment thread
alexander-alderman-webb marked this conversation as resolved.
Outdated

with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +544,6 @@
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
Loading