Skip to content
Merged
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4cf7e5a
ref(anthropic): Factor out streamed result handling
alexander-alderman-webb Feb 27, 2026
61a4cd0
ref(anthropic): Skip accumulation logic for unexpected types in strea…
alexander-alderman-webb Feb 27, 2026
6cdab16
.
alexander-alderman-webb Feb 27, 2026
db5dbb9
.
alexander-alderman-webb Feb 27, 2026
091506c
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Feb 27, 2026
d47b64d
.
alexander-alderman-webb Feb 27, 2026
384351f
remove duplicate import
alexander-alderman-webb Feb 27, 2026
55d5c27
.
alexander-alderman-webb Feb 27, 2026
fd84837
simplify
alexander-alderman-webb Feb 27, 2026
53859e4
.
alexander-alderman-webb Mar 10, 2026
e2d6d78
remove unused import
alexander-alderman-webb Mar 10, 2026
a01f7c1
add docstring
alexander-alderman-webb Mar 10, 2026
7837439
add return
alexander-alderman-webb Mar 10, 2026
0e06f49
remove return statement
alexander-alderman-webb Mar 10, 2026
c396a32
merge
alexander-alderman-webb Mar 10, 2026
d4022c5
Merge branch 'master' into webb/anthropic/only-raw-message-stream-events
alexander-alderman-webb Mar 10, 2026
4e3d1db
Merge branch 'master' into webb/anthropic/separate-output-handling
alexander-alderman-webb Mar 10, 2026
531c6a8
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 10, 2026
917b8d4
.
alexander-alderman-webb Mar 11, 2026
aa6e58a
.
alexander-alderman-webb Mar 11, 2026
bd00e4e
.
alexander-alderman-webb Mar 11, 2026
a6da66e
merge
alexander-alderman-webb Mar 11, 2026
c988c26
fix type
alexander-alderman-webb Mar 11, 2026
3009ba6
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 11, 2026
ab5e3bb
Merge branch 'master' into webb/anthropic/separate-output-handling
alexander-alderman-webb Mar 12, 2026
571db4e
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 12, 2026
2d66d56
merge master
alexander-alderman-webb Mar 13, 2026
ad4b3c2
add docstring explaining type aliases
alexander-alderman-webb Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 147 additions & 86 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,18 @@
except ImportError:
Omit = None

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages

from anthropic.types import (
MessageStartEvent,
MessageDeltaEvent,
MessageStopEvent,
ContentBlockStartEvent,
ContentBlockDeltaEvent,
ContentBlockStopEvent,
)

if TYPE_CHECKING:
from anthropic.types import MessageStreamEvent, TextBlockParam
except ImportError:
Expand All @@ -49,6 +59,8 @@
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand Down Expand Up @@ -338,6 +350,129 @@ def _set_input_data(
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))


def _wrap_synchronous_message_iterator(
    iterator: "Iterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":
    """
    Wrap a synchronous Anthropic message stream, transparently re-yielding
    every event while accumulating model name, token usage, and text content.

    Sets the accumulated information on the AI Client Span once the stream is
    exhausted. Responsible for closing the AI Client Span.

    :param iterator: The SDK's raw event iterator being wrapped.
    :param span: The AI Client Span to annotate and finish.
    :param integration: The active Anthropic integration (controls PII options).
    """
    # Event types whose data we accumulate; any other event type is passed
    # through untouched. Hoisted out of the loop so the tuple is built once
    # instead of on every iteration.
    accumulated_event_types = (
        MessageStartEvent,
        MessageDeltaEvent,
        MessageStopEvent,
        ContentBlockStartEvent,
        ContentBlockDeltaEvent,
        ContentBlockStopEvent,
    )

    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    for event in iterator:
        if isinstance(event, accumulated_event_types):
            model, usage, content_blocks = _collect_ai_data(
                event, model, usage, content_blocks
            )
        yield event

    # Anthropic's input_tokens excludes cached/cache_write tokens.
    # Normalize to total input tokens for correct cost calculations.
    total_input = (
        usage.input_tokens
        + (usage.cache_read_input_tokens or 0)
        + (usage.cache_write_input_tokens or 0)
    )

    _set_output_data(
        span=span,
        integration=integration,
        model=model,
        input_tokens=total_input,
        output_tokens=usage.output_tokens,
        cache_read_input_tokens=usage.cache_read_input_tokens,
        cache_write_input_tokens=usage.cache_write_input_tokens,
        content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
        finish_span=True,
    )


async def _wrap_asynchronous_message_iterator(
    iterator: "AsyncIterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "AsyncIterator[RawMessageStreamEvent]":
    """
    Wrap an asynchronous Anthropic message stream, transparently re-yielding
    every event while accumulating model name, token usage, and text content.

    Sets the accumulated information on the AI Client Span once the stream is
    exhausted. Responsible for closing the AI Client Span.

    :param iterator: The SDK's raw async event iterator being wrapped.
    :param span: The AI Client Span to annotate and finish.
    :param integration: The active Anthropic integration (controls PII options).
    """
    # Event types whose data we accumulate; any other event type is passed
    # through untouched. Hoisted out of the loop so the tuple is built once
    # instead of on every iteration.
    accumulated_event_types = (
        MessageStartEvent,
        MessageDeltaEvent,
        MessageStopEvent,
        ContentBlockStartEvent,
        ContentBlockDeltaEvent,
        ContentBlockStopEvent,
    )

    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    async for event in iterator:
        if isinstance(event, accumulated_event_types):
            model, usage, content_blocks = _collect_ai_data(
                event, model, usage, content_blocks
            )
        yield event

    # Anthropic's input_tokens excludes cached/cache_write tokens.
    # Normalize to total input tokens for correct cost calculations.
    total_input = (
        usage.input_tokens
        + (usage.cache_read_input_tokens or 0)
        + (usage.cache_write_input_tokens or 0)
    )

    _set_output_data(
        span=span,
        integration=integration,
        model=model,
        input_tokens=total_input,
        output_tokens=usage.output_tokens,
        cache_read_input_tokens=usage.cache_read_input_tokens,
        cache_write_input_tokens=usage.cache_write_input_tokens,
        content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
        finish_span=True,
    )


def _set_output_data(
span: "Span",
integration: "AnthropicIntegration",
Expand Down Expand Up @@ -415,6 +550,18 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A

result = yield f, args, kwargs

if isinstance(result, Stream):
result._iterator = _wrap_synchronous_message_iterator(
result._iterator, span, integration
)
return result

if isinstance(result, AsyncStream):
result._iterator = _wrap_asynchronous_message_iterator(
result._iterator, span, integration
)
return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +591,6 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
Loading