@@ -499,18 +499,20 @@ def _converse_on_success(
499499 [stop_reason ],
500500 )
501501
502- event_logger = instrumentor_context .event_logger
503- choice = _Choice .from_converse (result , capture_content )
504- # this path is used by streaming apis, in that case we are already out of the span
505- # context so need to add the span context manually
506- span_ctx = span .get_span_context ()
507- event_logger .emit (
508- choice .to_choice_event (
509- trace_id = span_ctx .trace_id ,
510- span_id = span_ctx .span_id ,
511- trace_flags = span_ctx .trace_flags ,
502+ # In case of an early stream closure, the result may not contain outputs
503+ if self ._stream_has_output_content (result ):
504+ event_logger = instrumentor_context .event_logger
505+ choice = _Choice .from_converse (result , capture_content )
506+ # this path is used by streaming apis, in that case we are already out of the span
507+ # context so need to add the span context manually
508+ span_ctx = span .get_span_context ()
509+ event_logger .emit (
510+ choice .to_choice_event (
511+ trace_id = span_ctx .trace_id ,
512+ span_id = span_ctx .span_id ,
513+ trace_flags = span_ctx .trace_flags ,
514+ )
512515 )
513- )
514516
515517 metrics = instrumentor_context .metrics
516518 metrics_attributes = self ._extract_metrics_attributes ()
@@ -602,11 +604,14 @@ def _on_stream_error_callback(
602604 span : Span ,
603605 exception ,
604606 instrumentor_context : _BotocoreInstrumentorContext ,
607+ span_ended : bool ,
605608 ):
606609 span .set_status (Status (StatusCode .ERROR , str (exception )))
607610 if span .is_recording ():
608611 span .set_attribute (ERROR_TYPE , type (exception ).__qualname__ )
609- span .end ()
612+
613+ if not span_ended :
614+ span .end ()
610615
611616 metrics = instrumentor_context .metrics
612617 metrics_attributes = {
@@ -638,15 +643,17 @@ def on_success(
638643 result ["stream" ], EventStream
639644 ):
640645
641- def stream_done_callback (response ):
646+ def stream_done_callback (response , span_ended ):
642647 self ._converse_on_success (
643648 span , response , instrumentor_context , capture_content
644649 )
645- span .end ()
646650
647- def stream_error_callback (exception ):
651+ if not span_ended :
652+ span .end ()
653+
654+ def stream_error_callback (exception , span_ended ):
648655 self ._on_stream_error_callback (
649- span , exception , instrumentor_context
656+ span , exception , instrumentor_context , span_ended
650657 )
651658
652659 result ["stream" ] = ConverseStreamWrapper (
@@ -677,16 +684,17 @@ def stream_error_callback(exception):
677684 elif self ._call_context .operation == "InvokeModelWithResponseStream" :
678685 if "body" in result and isinstance (result ["body" ], EventStream ):
679686
680- def invoke_model_stream_done_callback (response ):
687+ def invoke_model_stream_done_callback (response , span_ended ):
681688 # the callback gets data formatted as the simpler converse API
682689 self ._converse_on_success (
683690 span , response , instrumentor_context , capture_content
684691 )
685- span .end ()
692+ if not span_ended :
693+ span .end ()
686694
687- def invoke_model_stream_error_callback (exception ):
695+ def invoke_model_stream_error_callback (exception , span_ended ):
688696 self ._on_stream_error_callback (
689- span , exception , instrumentor_context
697+ span , exception , instrumentor_context , span_ended
690698 )
691699
692700 result ["body" ] = InvokeModelWithResponseStreamWrapper (
@@ -781,9 +789,11 @@ def _handle_amazon_nova_response(
781789 GEN_AI_RESPONSE_FINISH_REASONS , [response_body ["stopReason" ]]
782790 )
783791
784- event_logger = instrumentor_context .event_logger
785- choice = _Choice .from_converse (response_body , capture_content )
786- event_logger .emit (choice .to_choice_event ())
792+ # In case of an early stream closure, the result may not contain outputs
793+ if self ._stream_has_output_content (response_body ):
794+ event_logger = instrumentor_context .event_logger
795+ choice = _Choice .from_converse (response_body , capture_content )
796+ event_logger .emit (choice .to_choice_event ())
787797
788798 metrics = instrumentor_context .metrics
789799 metrics_attributes = self ._extract_metrics_attributes ()
@@ -1004,3 +1014,8 @@ def on_error(
10041014 duration ,
10051015 attributes = metrics_attributes ,
10061016 )
1017+
1018+ def _stream_has_output_content (self , response_body : dict [str , Any ]):
1019+ return (
1020+ "output" in response_body and "message" in response_body ["output" ]
1021+ )
0 commit comments