Skip to content
160 changes: 96 additions & 64 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
get_start_span_function,
transform_anthropic_content_part,
)
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.tracing_utils import set_span_errored
Expand Down Expand Up @@ -600,7 +600,10 @@ def _set_output_data(
)


def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
def _sentry_patched_create_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
"""
Creates and manages an AI Client Span for both non-streaming and streaming calls.
"""
integration = kwargs.pop("integration")
if integration is None:
return f(*args, **kwargs)
Expand All @@ -624,7 +627,14 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A

_set_create_input_data(span, kwargs, integration)

result = yield f, args, kwargs
try:
result = f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
span.__exit__(None, None, None)
Comment thread
alexander-alderman-webb marked this conversation as resolved.
Comment thread
alexander-alderman-webb marked this conversation as resolved.
reraise(*exc_info)

if isinstance(result, Stream):
result._span = span
Expand All @@ -638,6 +648,82 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A

return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
input_tokens,
output_tokens,
cache_read_input_tokens,
cache_write_input_tokens,
) = _get_token_usage(result)

content_blocks = []
for content_block in result.content:
if hasattr(content_block, "to_dict"):
content_blocks.append(content_block.to_dict())
elif hasattr(content_block, "model_dump"):
content_blocks.append(content_block.model_dump())
elif hasattr(content_block, "text"):
content_blocks.append({"type": "text", "text": content_block.text})

_set_output_data(
span=span,
integration=integration,
model=getattr(result, "model", None),
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_input_tokens=cache_read_input_tokens,
cache_write_input_tokens=cache_write_input_tokens,
content_blocks=content_blocks,
response_id=getattr(result, "id", None),
finish_reason=getattr(result, "stop_reason", None),
)
span.__exit__(None, None, None)
else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)

return result


async def _sentry_patched_create_async(
f: "Any", *args: "Any", **kwargs: "Any"
) -> "Any":
"""
Creates and manages an AI Client Span for both non-streaming and streaming calls.
"""
integration = kwargs.pop("integration")
if integration is None:
return await f(*args, **kwargs)

if "messages" not in kwargs:
return await f(*args, **kwargs)

try:
iter(kwargs["messages"])
except TypeError:
return await f(*args, **kwargs)

model = kwargs.get("model", "")

span = get_start_span_function()(
op=OP.GEN_AI_CHAT,
name=f"chat {model}".strip(),
origin=AnthropicIntegration.origin,
)
span.__enter__()

_set_create_input_data(span, kwargs, integration)

try:
result = await f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
span.__exit__(None, None, None)
reraise(*exc_info)

if isinstance(result, AsyncStream):
result._span = span
result._integration = integration
Expand Down Expand Up @@ -689,41 +775,14 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A


def _wrap_message_create(f: "Any") -> "Any":
def _execute_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
gen = _sentry_patched_create_common(f, *args, **kwargs)

try:
f, args, kwargs = next(gen)
except StopIteration as e:
return e.value

try:
try:
result = f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
reraise(*exc_info)

return gen.send(result)
except StopIteration as e:
return e.value

@wraps(f)
def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
def _sentry_wrapped_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
kwargs["integration"] = integration

try:
return _execute_sync(f, *args, **kwargs)
finally:
span = sentry_sdk.get_current_span()
if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR:
with capture_internal_exceptions():
span.__exit__(None, None, None)
return _sentry_patched_create_sync(f, *args, **kwargs)

return _sentry_patched_create_sync
return _sentry_wrapped_create_sync


def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None:
Expand Down Expand Up @@ -810,41 +869,14 @@ def close(self: "Union[Stream, MessageStream]") -> None:


def _wrap_message_create_async(f: "Any") -> "Any":
async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
gen = _sentry_patched_create_common(f, *args, **kwargs)

try:
f, args, kwargs = next(gen)
except StopIteration as e:
return await e.value

try:
try:
result = await f(*args, **kwargs)
except Exception as exc:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(exc)
reraise(*exc_info)

return gen.send(result)
except StopIteration as e:
return e.value

@wraps(f)
async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
async def _sentry_wrapped_create_async(*args: "Any", **kwargs: "Any") -> "Any":
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
kwargs["integration"] = integration

try:
return await _execute_async(f, *args, **kwargs)
finally:
span = sentry_sdk.get_current_span()
if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR:
with capture_internal_exceptions():
span.__exit__(None, None, None)

return _sentry_patched_create_async
return await _sentry_patched_create_async(f, *args, **kwargs)

return _sentry_wrapped_create_async


def _wrap_async_close(
Expand Down
Loading