Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 101 additions & 86 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic import AsyncStream
from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand Down Expand Up @@ -389,6 +392,99 @@
span.__exit__(None, None, None)


def _patch_streaming_response_iterator(
    result: "AsyncStream[RawMessageStreamEvent]",
    span: "sentry_sdk.tracing.Span",
) -> None:
    """
    Responsible for closing the `gen_ai.chat` span and setting attributes acquired during response consumption.

    Replaces `result._iterator` in place with a generator that forwards every
    event unchanged while feeding it to `_collect_ai_data`. Once the wrapped
    iterator is exhausted, the accumulated model, token usage and text are
    handed to `_set_output_data` with `finish_span=True`.

    Both plain and async generators are supported; the async wrapper is chosen
    when the existing `result._iterator` is an async generator.

    :param result: Streaming response whose `_iterator` attribute is swapped.
    :param span: The open span to annotate once the stream has been consumed.
    """
    # Function-scope import keeps this block self-contained.
    import inspect

    integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
    if integration is None:
        # The integration may no longer be registered on the active client by
        # the time this runs. `_set_output_data` needs a real integration
        # object, so leave the stream untouched and just close the span
        # instead of failing later while the caller consumes the stream.
        span.__exit__(None, None, None)
        return

    old_iterator = result._iterator

    def record_output(
        model: "Any", usage: "_RecordedUsage", content_blocks: "list[str]"
    ) -> None:
        """Write the data gathered from the stream to the span and finish it."""
        # Anthropic's input_tokens excludes cached/cache_write tokens.
        # Normalize to total input tokens for correct cost calculations.
        total_input = (
            usage.input_tokens
            + (usage.cache_read_input_tokens or 0)
            + (usage.cache_write_input_tokens or 0)
        )

        _set_output_data(
            span=span,
            integration=integration,
            model=model,
            input_tokens=total_input,
            output_tokens=usage.output_tokens,
            cache_read_input_tokens=usage.cache_read_input_tokens,
            cache_write_input_tokens=usage.cache_write_input_tokens,
            content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
            finish_span=True,
        )

    def new_iterator() -> "Iterator[MessageStreamEvent]":
        model = None
        usage = _RecordedUsage()
        content_blocks: "list[str]" = []

        for event in old_iterator:
            model, usage, content_blocks = _collect_ai_data(
                event,
                model,
                usage,
                content_blocks,
            )
            yield event

        # Only reached when the underlying stream is fully consumed.
        record_output(model, usage, content_blocks)

    async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
        model = None
        usage = _RecordedUsage()
        content_blocks: "list[str]" = []

        async for event in old_iterator:
            model, usage, content_blocks = _collect_ai_data(
                event,
                model,
                usage,
                content_blocks,
            )
            yield event

        # Only reached when the underlying stream is fully consumed.
        record_output(model, usage, content_blocks)

    # A real type predicate instead of comparing `str(type(...))` to a literal.
    if inspect.isasyncgen(old_iterator):
        result._iterator = new_iterator_async()
    else:
        result._iterator = new_iterator()


def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
integration = kwargs.pop("integration")
if integration is None:
Expand All @@ -415,6 +511,11 @@

result = yield f, args, kwargs

is_streaming_response = kwargs.get("stream", False)
if is_streaming_response:
_patch_streaming_response_iterator(result, span)

Check warning on line 516 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: code-review

[RK8-6U8] Missing null check for integration in streaming response handler could cause AttributeError (additional location)

The new `_patch_streaming_response_iterator` function fetches `integration` via `sentry_sdk.get_client().get_integration(AnthropicIntegration)` at line 402, but doesn't check if it's `None` before passing it to `_set_output_data`. If the client configuration changes or gets disabled between when the original call started and when the stream is consumed, `integration` could be `None`, causing an `AttributeError` when `_set_output_data` accesses `integration.include_prompts` at line 358.
return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +545,6 @@
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
Loading