Skip to content
Merged
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 232 additions & 87 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
except ImportError:
Omit = None

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages

if TYPE_CHECKING:
Expand All @@ -45,10 +46,12 @@
raise DidNotEnable("Anthropic not installed")

if TYPE_CHECKING:
from typing import Any, AsyncIterator, Iterator, List, Optional, Union
from typing import Any, AsyncIterator, Iterator, List, Optional, Union, Callable
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand All @@ -72,6 +75,127 @@
Messages.create = _wrap_message_create(Messages.create)
AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)

Stream.__iter__ = _wrap_stream_iter(Stream.__iter__)
AsyncStream.__aiter__ = _wrap_async_stream_aiter(AsyncStream.__aiter__)


def _wrap_stream_iter(
f: "Callable[..., Iterator[RawMessageStreamEvent]]",
) -> "Callable[..., Iterator[RawMessageStreamEvent]]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
"""

@wraps(f)
def _patched_iter(self: "Stream") -> "Iterator[RawMessageStreamEvent]":
if not hasattr(self, "_sentry_span"):
for event in f(self):
yield event
Comment thread
alexander-alderman-webb marked this conversation as resolved.
Outdated
return

Check failure on line 95 in sentry_sdk/integrations/anthropic.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Stream wrapper functions never execute span tracking due to missing _sentry_span attribute

The new `_wrap_stream_iter` and `_wrap_async_stream_aiter` functions check `hasattr(self, "_sentry_span")` (lines 92, 152) before executing span tracking logic. However, the `_sentry_span` and `_integration` attributes are never set on Stream/AsyncStream objects anywhere in the codebase. This causes the wrapper to always take the early-exit path (yielding events without tracking), rendering the span tracking code dead/unreachable.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated

model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in f(self):
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event
Comment thread
github-actions[bot] marked this conversation as resolved.
Outdated

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

span = self._sentry_span
integration = self._integration

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

return _patched_iter


def _wrap_async_stream_aiter(
f: "Callable[..., AsyncIterator[RawMessageStreamEvent]]",
) -> "Callable[..., AsyncIterator[RawMessageStreamEvent]]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
"""

@wraps(f)
async def _patched_aiter(
self: "AsyncStream",
) -> "AsyncIterator[RawMessageStreamEvent]":
if not hasattr(self, "_sentry_span"):
async for event in f(self):
yield event
return

model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in f(self):
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

span = self._sentry_span
integration = self._integration

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

return _patched_aiter


def _capture_exception(exc: "Any") -> None:
set_span_errored()
Expand Down Expand Up @@ -338,6 +462,101 @@
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))


def _wrap_synchronous_message_iterator(
iterator: "Iterator[RawMessageStreamEvent]",
span: "Span",
integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
"""

model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)


async def _wrap_asynchronous_message_iterator(
iterator: "Iterator[RawMessageStreamEvent]",
span: "Span",
integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":

Check warning on line 517 in sentry_sdk/integrations/anthropic.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Incorrect type hints in _wrap_asynchronous_message_iterator cause type checker failures

The function `_wrap_asynchronous_message_iterator` has incorrect type hints. The `iterator` parameter is typed as `Iterator[RawMessageStreamEvent]` but should be `AsyncIterator[RawMessageStreamEvent]` since it's used with `async for`. The return type is also `Iterator[RawMessageStreamEvent]` but should be `AsyncIterator[RawMessageStreamEvent]` since this is an async generator function. This will cause static type checkers to report errors and could lead to incorrect assumptions about the code's behavior.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
"""
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)


def _set_output_data(
span: "Span",
integration: "AnthropicIntegration",
Expand Down Expand Up @@ -415,6 +634,18 @@

result = yield f, args, kwargs

if isinstance(result, Stream):
result._iterator = _wrap_synchronous_message_iterator(
result._iterator, span, integration
)
return result

if isinstance(result, AsyncStream):
result._iterator = _wrap_asynchronous_message_iterator(
result._iterator, span, integration
)
return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +675,6 @@
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
Loading