@@ -89,6 +89,44 @@ class _RecordedUsage:
8989 cache_read_input_tokens : "Optional[int]" = 0
9090
9191
92+ class _StreamSpanContext :
93+ def __init__ (
94+ self , stream : "Union[Stream, MessageStream, AsyncStream, AsyncMessageStream]"
95+ ) -> None :
96+ self ._stream = stream
97+
98+ def __enter__ (self ) -> "_StreamSpanContext" :
99+ return self
100+
101+ def __exit__ (
102+ self ,
103+ exc_type : "Optional[type[BaseException]]" ,
104+ exc_val : "Optional[BaseException]" ,
105+ exc_tb : "Optional[Any]" ,
106+ ) -> bool :
107+ with capture_internal_exceptions ():
108+ if not hasattr (self ._stream , "_span" ):
109+ return False
110+
111+ if not hasattr (self ._stream , "_model" ):
112+ self ._stream ._span .__exit__ (exc_type , exc_val , exc_tb )
113+ del self ._stream ._span
114+ return False
115+
116+ _set_streaming_output_data (
117+ span = self ._stream ._span ,
118+ integration = self ._stream ._integration ,
119+ model = self ._stream ._model ,
120+ usage = self ._stream ._usage ,
121+ content_blocks = self ._stream ._content_blocks ,
122+ response_id = self ._stream ._response_id ,
123+ finish_reason = self ._stream ._finish_reason ,
124+ )
125+
126+ self ._stream ._span .__exit__ (exc_type , exc_val , exc_tb )
127+ del self ._stream ._span
128+
129+
92130class AnthropicIntegration (Integration ):
93131 identifier = "anthropic"
94132 origin = f"auto.ai.{ identifier } "
@@ -446,7 +484,7 @@ def _wrap_synchronous_message_iterator(
446484 Sets information received while iterating the response stream on the AI Client Span.
447485 Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
448486 """
449- try :
487+ with _StreamSpanContext ( stream ) :
450488 for event in iterator :
451489 # Message and content types are aliases for corresponding Raw* types, introduced in
452490 # https://github.com/anthropics/anthropic-sdk-python/commit/bc9d11cd2addec6976c46db10b7c89a8c276101a
@@ -466,21 +504,6 @@ def _wrap_synchronous_message_iterator(
466504
467505 _accumulate_event_data (stream , event )
468506 yield event
469- finally :
470- exc_info = sys .exc_info ()
471- with capture_internal_exceptions ():
472- if hasattr (stream , "_span" ):
473- _set_streaming_output_data (
474- span = stream ._span ,
475- integration = stream ._integration ,
476- model = stream ._model ,
477- usage = stream ._usage ,
478- content_blocks = stream ._content_blocks ,
479- response_id = stream ._response_id ,
480- finish_reason = stream ._finish_reason ,
481- )
482- stream ._span .__exit__ (* exc_info )
483- del stream ._span
484507
485508
486509async def _wrap_asynchronous_message_iterator (
@@ -491,7 +514,7 @@ async def _wrap_asynchronous_message_iterator(
491514 Sets information received while iterating the response stream on the AI Client Span.
492515 Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
493516 """
494- try :
517+ with _StreamSpanContext ( stream ) :
495518 async for event in iterator :
496519 # Message and content types are aliases for corresponding Raw* types, introduced in
497520 # https://github.com/anthropics/anthropic-sdk-python/commit/bc9d11cd2addec6976c46db10b7c89a8c276101a
@@ -511,21 +534,6 @@ async def _wrap_asynchronous_message_iterator(
511534
512535 _accumulate_event_data (stream , event )
513536 yield event
514- finally :
515- exc_info = sys .exc_info ()
516- with capture_internal_exceptions ():
517- if hasattr (stream , "_span" ):
518- _set_streaming_output_data (
519- span = stream ._span ,
520- integration = stream ._integration ,
521- model = stream ._model ,
522- usage = stream ._usage ,
523- content_blocks = stream ._content_blocks ,
524- response_id = stream ._response_id ,
525- finish_reason = stream ._finish_reason ,
526- )
527- stream ._span .__exit__ (* exc_info )
528- del stream ._span
529537
530538
531539def _set_output_data (
@@ -784,28 +792,9 @@ def _wrap_close(
784792 """
785793
786794 def close (self : "Union[Stream, MessageStream]" ) -> None :
787- if not hasattr (self , "_span" ):
788- return f (self )
789-
790- if not hasattr (self , "_model" ):
791- self ._span .__exit__ (None , None , None )
792- del self ._span
795+ with _StreamSpanContext (self ):
793796 return f (self )
794797
795- _set_streaming_output_data (
796- span = self ._span ,
797- integration = self ._integration ,
798- model = self ._model ,
799- usage = self ._usage ,
800- content_blocks = self ._content_blocks ,
801- response_id = self ._response_id ,
802- finish_reason = self ._finish_reason ,
803- )
804- self ._span .__exit__ (None , None , None )
805- del self ._span
806-
807- return f (self )
808-
809798 return close
810799
811800
0 commit comments