Skip to content

Commit 553c1df

Browse files
merge
2 parents 121f09b + 3009ba6 commit 553c1df

File tree

1 file changed

+160
-159
lines changed

1 file changed

+160
-159
lines changed

sentry_sdk/integrations/anthropic.py

Lines changed: 160 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
raise DidNotEnable("Anthropic not installed")
5757

5858
if TYPE_CHECKING:
59-
from typing import Any, AsyncIterator, Iterator, List, Optional, Union, Callable
59+
from typing import Any, AsyncIterator, Iterator, List, Optional, Union
6060
from sentry_sdk.tracing import Span
6161
from sentry_sdk._types import TextPart
6262

@@ -99,112 +99,6 @@ def setup_once() -> None:
9999
)
100100

101101

102-
def _wrap_synchronous_message_iterator(
    iterator: "Iterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":
    """
    Sets information received while iterating the response stream on the AI Client Span.
    Responsible for closing the AI Client Span.
    """
    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    for event in iterator:
        # Pass through any event type we don't collect data from.
        if not isinstance(
            event,
            (
                MessageStartEvent,
                MessageDeltaEvent,
                MessageStopEvent,
                ContentBlockStartEvent,
                ContentBlockDeltaEvent,
                ContentBlockStopEvent,
            ),
        ):
            yield event
            continue

        (
            model,
            usage,
            content_blocks,
        ) = _collect_ai_data(
            event,
            model,
            usage,
            content_blocks,
        )
        yield event

    # Anthropic's input_tokens excludes cached/cache_write tokens.
    # Normalize to total input tokens for correct cost calculations.
    total_input = (
        usage.input_tokens
        + (usage.cache_read_input_tokens or 0)
        + (usage.cache_write_input_tokens or 0)
    )

    _set_output_data(
        span=span,
        integration=integration,
        model=model,
        input_tokens=total_input,
        output_tokens=usage.output_tokens,
        cache_read_input_tokens=usage.cache_read_input_tokens,
        cache_write_input_tokens=usage.cache_write_input_tokens,
        content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
        finish_span=True,
    )
153-
154-
155-
async def _wrap_asynchronous_message_iterator(
    iterator: "AsyncIterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "AsyncIterator[RawMessageStreamEvent]":
    """
    Sets information received while iterating the response stream on the AI Client Span.
    Responsible for closing the AI Client Span.
    """
    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    async for event in iterator:
        # Pass through any event type we don't collect data from.
        if not isinstance(
            event,
            (
                MessageStartEvent,
                MessageDeltaEvent,
                MessageStopEvent,
                ContentBlockStartEvent,
                ContentBlockDeltaEvent,
                ContentBlockStopEvent,
            ),
        ):
            yield event
            continue

        (
            model,
            usage,
            content_blocks,
        ) = _collect_ai_data(
            event,
            model,
            usage,
            content_blocks,
        )
        yield event

    # Anthropic's input_tokens excludes cached/cache_write tokens.
    # Normalize to total input tokens for correct cost calculations.
    total_input = (
        usage.input_tokens
        + (usage.cache_read_input_tokens or 0)
        + (usage.cache_write_input_tokens or 0)
    )

    _set_output_data(
        span=span,
        integration=integration,
        model=model,
        input_tokens=total_input,
        output_tokens=usage.output_tokens,
        cache_read_input_tokens=usage.cache_read_input_tokens,
        cache_write_input_tokens=usage.cache_write_input_tokens,
        content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
        finish_span=True,
    )
206-
207-
208102
def _capture_exception(exc: "Any") -> None:
209103
set_span_errored()
210104

@@ -517,6 +411,129 @@ def _set_stream_input_data(
517411
)
518412

519413

414+
def _wrap_synchronous_message_iterator(
    iterator: "Iterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":
    """
    Re-yields every event from the wrapped stream while recording model,
    usage, and text content on the AI Client Span. Responsible for
    closing the AI Client Span when the stream is exhausted.
    """
    tracked_event_types = (
        MessageStartEvent,
        MessageDeltaEvent,
        MessageStopEvent,
        ContentBlockStartEvent,
        ContentBlockDeltaEvent,
        ContentBlockStopEvent,
    )

    model = None
    usage = _RecordedUsage()
    text_chunks: "list[str]" = []

    for event in iterator:
        # Only the known message/content-block event types carry data we
        # record; everything else is forwarded untouched.
        if isinstance(event, tracked_event_types):
            model, usage, text_chunks = _collect_ai_data(
                event, model, usage, text_chunks
            )
        yield event

    # Anthropic's input_tokens excludes cached/cache_write tokens.
    # Normalize to total input tokens for correct cost calculations.
    total_input = usage.input_tokens
    total_input += usage.cache_read_input_tokens or 0
    total_input += usage.cache_write_input_tokens or 0

    _set_output_data(
        span=span,
        integration=integration,
        model=model,
        input_tokens=total_input,
        output_tokens=usage.output_tokens,
        cache_read_input_tokens=usage.cache_read_input_tokens,
        cache_write_input_tokens=usage.cache_write_input_tokens,
        content_blocks=[{"text": "".join(text_chunks), "type": "text"}],
        finish_span=True,
    )
474+
475+
476+
async def _wrap_asynchronous_message_iterator(
    iterator: "AsyncIterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "AsyncIterator[RawMessageStreamEvent]":
    """
    Re-yields every event from the wrapped async stream while recording
    model, usage, and text content on the AI Client Span. Responsible
    for closing the AI Client Span when the stream is exhausted.
    """
    tracked_event_types = (
        MessageStartEvent,
        MessageDeltaEvent,
        MessageStopEvent,
        ContentBlockStartEvent,
        ContentBlockDeltaEvent,
        ContentBlockStopEvent,
    )

    model = None
    usage = _RecordedUsage()
    text_chunks: "list[str]" = []

    async for event in iterator:
        # Only the known message/content-block event types carry data we
        # record; everything else is forwarded untouched.
        if isinstance(event, tracked_event_types):
            model, usage, text_chunks = _collect_ai_data(
                event, model, usage, text_chunks
            )
        yield event

    # Anthropic's input_tokens excludes cached/cache_write tokens.
    # Normalize to total input tokens for correct cost calculations.
    total_input = usage.input_tokens
    total_input += usage.cache_read_input_tokens or 0
    total_input += usage.cache_write_input_tokens or 0

    _set_output_data(
        span=span,
        integration=integration,
        model=model,
        input_tokens=total_input,
        output_tokens=usage.output_tokens,
        cache_read_input_tokens=usage.cache_read_input_tokens,
        cache_write_input_tokens=usage.cache_write_input_tokens,
        content_blocks=[{"text": "".join(text_chunks), "type": "text"}],
        finish_span=True,
    )
535+
536+
520537
def _set_output_data(
521538
span: "Span",
522539
integration: "AnthropicIntegration",
@@ -718,55 +735,6 @@ async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
718735
return _sentry_patched_create_async
719736

720737

721-
def _sentry_patched_stream_common(
    stream: "MessageStream",
    max_tokens: "int",
    messages: "Iterable[MessageParam]",
    model: "ModelParam",
    system: "Union[str, Iterable[TextBlockParam]]",
    temperature: "float",
    top_k: "int",
    top_p: "float",
    tools: "Iterable[ToolUnionParam]",
) -> "Optional[MessageStream]":
    """
    Open an AI Client Span for a streaming messages call and record the
    request data on it.

    Returns *stream* unchanged when instrumentation is skipped (the
    integration is disabled, *messages* is None, or *messages* is not
    iterable); otherwise returns None after starting the span.
    """
    integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)

    # Nothing to do when the Anthropic integration is not enabled.
    if integration is None:
        return stream

    if messages is None:
        return stream

    # Skip instrumentation entirely for non-iterable message payloads.
    try:
        iter(messages)
    except TypeError:
        return stream

    if model is None:
        model = ""

    span = get_start_span_function()(
        op=OP.GEN_AI_CHAT,
        name=f"chat {model}".strip(),
        origin=AnthropicIntegration.origin,
    )
    # NOTE(review): the span is entered but not exited here — presumably
    # closed later by the stream-iterator wrapper; confirm against callers.
    span.__enter__()

    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
    _set_stream_input_data(
        span,
        integration,
        max_tokens=max_tokens,
        messages=messages,
        model=model,
        system=system,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        tools=tools,
    )
    return None
768-
769-
770738
def _wrap_message_stream(f: "Any") -> "Any":
771739
"""
772740
Attaches user-provided arguments to the returned context manager.
@@ -802,17 +770,50 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
802770
if not hasattr(self, "_max_tokens"):
803771
return stream
804772

805-
_sentry_patched_stream_common(
806-
stream=stream,
773+
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
774+
775+
if integration is None:
776+
return stream
777+
778+
if self._messages is None:
779+
return stream
780+
781+
try:
782+
iter(self._messages)
783+
except TypeError:
784+
return stream
785+
786+
model = self._model
787+
if model is None:
788+
model = ""
789+
790+
span = get_start_span_function()(
791+
op=OP.GEN_AI_CHAT,
792+
name=f"chat {model}".strip(),
793+
origin=AnthropicIntegration.origin,
794+
)
795+
span.__enter__()
796+
797+
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
798+
_set_stream_input_data(
799+
span,
800+
integration,
807801
max_tokens=self._max_tokens,
808802
messages=self._messages,
809-
model=self._model,
803+
model=model,
810804
system=self._system,
811805
temperature=self._temperature,
812806
top_k=self._top_k,
813807
top_p=self._top_p,
814808
tools=self._tools,
815809
)
810+
811+
stream._iterator = _wrap_synchronous_message_iterator(
812+
iterator=stream._iterator,
813+
span=span,
814+
integration=integration,
815+
)
816+
816817
return stream
817818

818819
return _sentry_patched_enter

0 commit comments

Comments
 (0)