Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 121 additions & 76 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
get_start_span_function,
transform_anthropic_content_part,
)
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.tracing_utils import set_span_errored
from sentry_sdk.utils import (
capture_internal_exceptions,
event_from_exception,
Expand Down Expand Up @@ -195,8 +194,6 @@ def setup_once() -> None:


def _capture_exception(exc: "Any") -> None:
set_span_errored()

event, hint = event_from_exception(
exc,
client_options=sentry_sdk.get_client().options,
Expand Down Expand Up @@ -600,7 +597,10 @@ def _set_output_data(
)


def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
def _sentry_patched_create_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
"""
Creates and manages an AI Client Span for both non-streaming and streaming calls.
"""
integration = kwargs.pop("integration")
if integration is None:
return f(*args, **kwargs)
Expand All @@ -624,7 +624,14 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A

_set_create_input_data(span, kwargs, integration)

result = yield f, args, kwargs
try:
result = f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
span.__exit__(*exc_info)
reraise(*exc_info)

if isinstance(result, Stream):
result._span = span
Expand All @@ -638,6 +645,82 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A

return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
input_tokens,
output_tokens,
cache_read_input_tokens,
cache_write_input_tokens,
) = _get_token_usage(result)

content_blocks = []
for content_block in result.content:
if hasattr(content_block, "to_dict"):
content_blocks.append(content_block.to_dict())
elif hasattr(content_block, "model_dump"):
content_blocks.append(content_block.model_dump())
elif hasattr(content_block, "text"):
content_blocks.append({"type": "text", "text": content_block.text})

_set_output_data(
span=span,
integration=integration,
model=getattr(result, "model", None),
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_input_tokens=cache_read_input_tokens,
cache_write_input_tokens=cache_write_input_tokens,
content_blocks=content_blocks,
response_id=getattr(result, "id", None),
finish_reason=getattr(result, "stop_reason", None),
)
span.__exit__(None, None, None)
else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)

return result


async def _sentry_patched_create_async(
f: "Any", *args: "Any", **kwargs: "Any"
) -> "Any":
"""
Creates and manages an AI Client Span for both non-streaming and streaming calls.
"""
integration = kwargs.pop("integration")
if integration is None:
return await f(*args, **kwargs)

if "messages" not in kwargs:
return await f(*args, **kwargs)

try:
iter(kwargs["messages"])
except TypeError:
return await f(*args, **kwargs)

model = kwargs.get("model", "")

span = get_start_span_function()(
op=OP.GEN_AI_CHAT,
name=f"chat {model}".strip(),
origin=AnthropicIntegration.origin,
)
span.__enter__()

_set_create_input_data(span, kwargs, integration)

try:
result = await f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
span.__exit__(*exc_info)
reraise(*exc_info)

if isinstance(result, AsyncStream):
result._span = span
result._integration = integration
Expand Down Expand Up @@ -689,41 +772,14 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A


def _wrap_message_create(f: "Any") -> "Any":
def _execute_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
gen = _sentry_patched_create_common(f, *args, **kwargs)

try:
f, args, kwargs = next(gen)
except StopIteration as e:
return e.value

try:
try:
result = f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
reraise(*exc_info)

return gen.send(result)
except StopIteration as e:
return e.value

@wraps(f)
def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
def _sentry_wrapped_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
kwargs["integration"] = integration

try:
return _execute_sync(f, *args, **kwargs)
finally:
span = sentry_sdk.get_current_span()
if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR:
with capture_internal_exceptions():
span.__exit__(None, None, None)
return _sentry_patched_create_sync(f, *args, **kwargs)

return _sentry_patched_create_sync
return _sentry_wrapped_create_sync


def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None:
Expand Down Expand Up @@ -810,41 +866,14 @@ def close(self: "Union[Stream, MessageStream]") -> None:


def _wrap_message_create_async(f: "Any") -> "Any":
async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
gen = _sentry_patched_create_common(f, *args, **kwargs)

try:
f, args, kwargs = next(gen)
except StopIteration as e:
return await e.value

try:
try:
result = await f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
reraise(*exc_info)

return gen.send(result)
except StopIteration as e:
return e.value

@wraps(f)
async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
async def _sentry_wrapped_create_async(*args: "Any", **kwargs: "Any") -> "Any":
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
kwargs["integration"] = integration

try:
return await _execute_async(f, *args, **kwargs)
finally:
span = sentry_sdk.get_current_span()
if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR:
with capture_internal_exceptions():
span.__exit__(None, None, None)
return await _sentry_patched_create_async(f, *args, **kwargs)

return _sentry_patched_create_async
return _sentry_wrapped_create_async


def _wrap_async_close(
Expand Down Expand Up @@ -892,22 +921,21 @@ def _wrap_message_stream_manager_enter(f: "Any") -> "Any":

@wraps(f)
def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
stream = f(self)
if not hasattr(self, "_max_tokens"):
return stream
return f(self)

integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)

if integration is None:
return stream
return f(self)

if self._messages is None:
return stream
return f(self)

try:
iter(self._messages)
except TypeError:
return stream
return f(self)

span = get_start_span_function()(
op=OP.GEN_AI_CHAT,
Expand All @@ -930,6 +958,15 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
tools=self._tools,
)

try:
stream = f(self)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
span.__exit__(*exc_info)
reraise(*exc_info)

stream._span = span
stream._integration = integration

Expand Down Expand Up @@ -979,22 +1016,21 @@ def _wrap_async_message_stream_manager_aenter(f: "Any") -> "Any":
async def _sentry_patched_aenter(
self: "AsyncMessageStreamManager",
) -> "AsyncMessageStream":
stream = await f(self)
if not hasattr(self, "_max_tokens"):
return stream
return await f(self)

integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)

if integration is None:
return stream
return await f(self)

if self._messages is None:
return stream
return await f(self)

try:
iter(self._messages)
except TypeError:
return stream
return await f(self)

span = get_start_span_function()(
op=OP.GEN_AI_CHAT,
Expand All @@ -1017,6 +1053,15 @@ async def _sentry_patched_aenter(
tools=self._tools,
)

try:
stream = await f(self)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
span.__exit__(*exc_info)
reraise(*exc_info)

stream._span = span
stream._integration = integration

Expand Down
2 changes: 0 additions & 2 deletions tests/integrations/anthropic/test_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2204,7 +2204,6 @@ def test_span_status_error(sentry_init, capture_events):
assert error["level"] == "error"
assert transaction["spans"][0]["status"] == "internal_error"
assert transaction["spans"][0]["tags"]["status"] == "internal_error"
assert transaction["contexts"]["trace"]["status"] == "internal_error"
assert transaction["spans"][0]["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic"
assert transaction["spans"][0]["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat"

Expand All @@ -2230,7 +2229,6 @@ async def test_span_status_error_async(sentry_init, capture_events):
assert error["level"] == "error"
assert transaction["spans"][0]["status"] == "internal_error"
assert transaction["spans"][0]["tags"]["status"] == "internal_error"
assert transaction["contexts"]["trace"]["status"] == "internal_error"
assert transaction["spans"][0]["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic"
assert transaction["spans"][0]["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat"

Expand Down
Loading