1818import time
1919from typing import Any , Dict , List , Optional
2020
21+ from opentelemetry import baggage
2122from opentelemetry import context as otel_context
2223from opentelemetry .instrumentation .claude_agent_sdk .utils import (
2324 extract_usage_from_result_message ,
2728from opentelemetry .trace import set_span_in_context
2829from opentelemetry .util .genai .extended_handler import (
2930 ExtendedTelemetryHandler ,
30- get_extended_telemetry_handler ,
31+ )
32+ from opentelemetry .util .genai .extended_semconv .gen_ai_extended_attributes import (
33+ GEN_AI_SESSION_ID ,
34+ GEN_AI_USER_ID ,
3135)
3236from opentelemetry .util .genai .extended_types import (
3337 ExecuteToolInvocation ,
4650
4751logger = logging .getLogger (__name__ )
4852
49- # Storage for tool runs managed by client (created from response stream)
50- # Key: tool_use_id, Value: tool_invocation
51- _client_managed_runs : Dict [str , ExecuteToolInvocation ] = {}
53+
54+ def _current_baggage_value (key : str ) -> Optional [str ]:
55+ try :
56+ value = baggage .get_baggage (key )
57+ except Exception :
58+ return None
59+ if value is None :
60+ return None
61+ text = str (value ).strip ()
62+ return text or None
63+
64+
65+ def _entry_baggage_identity_attributes () -> Dict [str , str ]:
66+ attributes : Dict [str , str ] = {}
67+ session_id = _current_baggage_value (GEN_AI_SESSION_ID )
68+ user_id = _current_baggage_value (GEN_AI_USER_ID )
69+ if session_id :
70+ attributes [GEN_AI_SESSION_ID ] = session_id
71+ if user_id :
72+ attributes [GEN_AI_USER_ID ] = user_id
73+ return attributes
74+
75+
76+ def _apply_session_identity (
77+ invocation : Any , session_id : Optional [str ]
78+ ) -> None :
79+ """Apply Entry baggage identity first, then Claude's own session fallback."""
80+ entry_attributes = _entry_baggage_identity_attributes ()
81+ effective_session_id = (
82+ entry_attributes .get (GEN_AI_SESSION_ID ) or session_id
83+ )
84+
85+ if effective_session_id :
86+ if hasattr (invocation , "conversation_id" ):
87+ invocation .conversation_id = effective_session_id
88+ invocation .attributes [GEN_AI_SESSION_ID ] = effective_session_id
89+
90+ for key , value in entry_attributes .items ():
91+ invocation .attributes [key ] = value
5292
5393
54- def _clear_client_managed_runs () -> None :
94+ def _set_session_id (
95+ agent_invocation : InvokeAgentInvocation , session_id : Optional [str ]
96+ ) -> None :
97+ """Set Entry session id or Claude session id on an agent invocation."""
98+ _apply_session_identity (agent_invocation , session_id )
99+
100+
101+ def _set_llm_session_id (
102+ llm_invocation : LLMInvocation , session_id : Optional [str ]
103+ ) -> None :
104+ """Set Entry session id or Claude session id on an LLM invocation."""
105+ _apply_session_identity (llm_invocation , session_id )
106+
107+
108+ def _clear_client_managed_runs (
109+ handler : ExtendedTelemetryHandler ,
110+ client_managed_runs : Dict [str , ExecuteToolInvocation ],
111+ ) -> None :
55112 """Clear all client-managed tool runs.
56113
57114 This should be called when a conversation ends to avoid memory leaks
58115 and to clean up any orphaned tool runs.
59116 """
60- global _client_managed_runs
61-
62- try :
63- handler = get_extended_telemetry_handler ()
64- except Exception :
65- # If we can't get the handler (e.g., instrumentation not initialized),
66- # we still need to clear the tracking dictionary to prevent memory leaks.
67- _client_managed_runs .clear ()
68- return
69-
70117 # End any orphaned tool runs
71- for tool_use_id , tool_invocation in list (_client_managed_runs .items ()):
118+ for tool_use_id , tool_invocation in list (client_managed_runs .items ()):
72119 try :
73120 handler .fail_execute_tool (
74121 tool_invocation ,
@@ -83,7 +130,7 @@ def _clear_client_managed_runs() -> None:
83130 # Best effort cleanup: continue processing remaining tools.
84131 pass
85132
86- _client_managed_runs .clear ()
133+ client_managed_runs .clear ()
87134
88135
89136def _extract_message_parts (msg : Any ) -> List [Any ]:
@@ -112,6 +159,7 @@ def _create_tool_spans_from_message(
112159 handler : ExtendedTelemetryHandler ,
113160 agent_invocation : InvokeAgentInvocation ,
114161 active_task_stack : List [Any ],
162+ client_managed_runs : Dict [str , ExecuteToolInvocation ],
115163 exclude_tool_names : Optional [List [str ]] = None ,
116164) -> None :
117165 """Create tool execution spans from ToolUseBlocks in an AssistantMessage.
@@ -163,8 +211,11 @@ def _create_tool_spans_from_message(
163211 tool_call_arguments = tool_input ,
164212 tool_description = tool_name ,
165213 )
214+ _apply_session_identity (
215+ tool_invocation , agent_invocation .conversation_id
216+ )
166217 handler .start_execute_tool (tool_invocation )
167- _client_managed_runs [tool_use_id ] = tool_invocation
218+ client_managed_runs [tool_use_id ] = tool_invocation
168219
169220 # If this is a Task tool, create a SubAgent span under it
170221 # https://platform.claude.com/docs/en/agent-sdk/python#task
@@ -203,6 +254,10 @@ def _create_tool_spans_from_message(
203254 agent_description = task_description ,
204255 input_messages = input_messages ,
205256 )
257+ _set_session_id (
258+ subagent_invocation ,
259+ agent_invocation .conversation_id ,
260+ )
206261
207262 # Start SubAgent span
208263 handler .start_invoke_agent (subagent_invocation )
@@ -271,6 +326,7 @@ def _process_assistant_message(
271326 handler : ExtendedTelemetryHandler ,
272327 collected_messages : List [Dict [str , Any ]],
273328 active_task_stack : List [Any ],
329+ client_managed_runs : Dict [str , ExecuteToolInvocation ],
274330) -> None :
275331 """Process AssistantMessage: create LLM turn, extract parts, create tool spans."""
276332 parts = _extract_message_parts (msg )
@@ -353,7 +409,11 @@ def _process_assistant_message(
353409 turn_tracker .close_llm_turn ()
354410
355411 _create_tool_spans_from_message (
356- msg , handler , agent_invocation , active_task_stack
412+ msg ,
413+ handler ,
414+ agent_invocation ,
415+ active_task_stack ,
416+ client_managed_runs ,
357417 )
358418
359419
@@ -363,6 +423,7 @@ def _process_user_message(
363423 handler : ExtendedTelemetryHandler ,
364424 collected_messages : List [Dict [str , Any ]],
365425 active_task_stack : List [Any ],
426+ client_managed_runs : Dict [str , ExecuteToolInvocation ],
366427) -> None :
367428 """Process UserMessage: close tool spans, collect message content, mark next LLM start."""
368429 user_parts : List [MessagePart ] = []
@@ -376,8 +437,8 @@ def _process_user_message(
376437
377438 if block_type == "ToolResultBlock" :
378439 tool_use_id = getattr (block , "tool_use_id" , None )
379- if tool_use_id and tool_use_id in _client_managed_runs :
380- tool_invocation = _client_managed_runs .pop (tool_use_id )
440+ if tool_use_id and tool_use_id in client_managed_runs :
441+ tool_invocation = client_managed_runs .pop (tool_use_id )
381442
382443 # Set tool response
383444 tool_content = getattr (block , "content" , None )
@@ -533,7 +594,21 @@ def _process_system_message(
533594 if hasattr (msg , "data" ) and isinstance (msg .data , dict ):
534595 session_id = msg .data .get ("session_id" )
535596 if session_id :
536- agent_invocation .conversation_id = session_id
597+ _set_session_id (agent_invocation , session_id )
598+
599+
600+ def _process_stream_event_message (
601+ msg : Any ,
602+ agent_invocation : InvokeAgentInvocation ,
603+ ) -> None :
604+ """Process StreamEvent: extract session_id when streaming mode exposes it early."""
605+ session_id = getattr (msg , "session_id" , None )
606+ if not session_id :
607+ event = getattr (msg , "event" , None )
608+ if isinstance (event , dict ):
609+ session_id = event .get ("session_id" )
610+
611+ _set_session_id (agent_invocation , session_id )
537612
538613
539614def _process_result_message (
@@ -543,6 +618,8 @@ def _process_result_message(
543618) -> None :
544619 """Process ResultMessage: update session_id (fallback), token usage, and close any open LLM turn."""
545620
621+ _set_session_id (agent_invocation , getattr (msg , "session_id" , None ))
622+ turn_tracker .set_session_id (agent_invocation .conversation_id )
546623 _update_token_usage (agent_invocation , turn_tracker , msg )
547624
548625 if turn_tracker .current_llm_invocation :
@@ -554,6 +631,7 @@ async def _process_agent_invocation_stream(
554631 handler : ExtendedTelemetryHandler ,
555632 model : str ,
556633 prompt : str ,
634+ session_id : Optional [str ] = None ,
557635) -> Any :
558636 """Unified handler for processing agent invocation stream.
559637
@@ -564,18 +642,15 @@ async def _process_agent_invocation_stream(
564642 provider = infer_provider_from_base_url (),
565643 agent_name = "claude-agent" ,
566644 request_model = model ,
567- conversation_id = "" ,
645+ conversation_id = None ,
568646 input_messages = [
569647 InputMessage (role = "user" , parts = [Text (content = prompt )])
570648 ]
571649 if prompt
572650 else [],
573651 )
652+ _set_session_id (agent_invocation , session_id )
574653
575- # Attach empty context to clear any previous context, ensuring each query
576- # creates an independent root trace. This is important for scenarios where
577- # multiple queries are called in the same script - each should have its own trace_id.
578- empty_context_token = otel_context .attach (otel_context .Context ())
579654 handler .start_invoke_agent (agent_invocation )
580655
581656 query_start_time = time .time ()
@@ -589,13 +664,16 @@ async def _process_agent_invocation_stream(
589664 # When a Task tool is created, it's pushed here
590665 # When its ToolResultBlock is received, it's popped
591666 active_task_stack : List [Any ] = []
667+ client_managed_runs : Dict [str , ExecuteToolInvocation ] = {}
592668
593669 try :
594670 async for msg in wrapped_stream :
595671 msg_type = type (msg ).__name__
596672
597673 if msg_type == "SystemMessage" :
598674 _process_system_message (msg , agent_invocation )
675+ elif msg_type == "StreamEvent" :
676+ _process_stream_event_message (msg , agent_invocation )
599677 elif msg_type == "AssistantMessage" :
600678 _process_assistant_message (
601679 msg ,
@@ -606,6 +684,7 @@ async def _process_agent_invocation_stream(
606684 handler ,
607685 collected_messages ,
608686 active_task_stack ,
687+ client_managed_runs ,
609688 )
610689 elif msg_type == "UserMessage" :
611690 _process_user_message (
@@ -614,6 +693,7 @@ async def _process_agent_invocation_stream(
614693 handler ,
615694 collected_messages ,
616695 active_task_stack ,
696+ client_managed_runs ,
617697 )
618698 elif msg_type == "ResultMessage" :
619699 _process_result_message (msg , agent_invocation , turn_tracker )
@@ -648,11 +728,7 @@ async def _process_agent_invocation_stream(
648728 # Span closure failure should not break the application
649729 pass
650730
651- # Detach empty context token to restore the original context.
652- # Note: stop_invoke_agent/fail_invoke_agent already detached invocation.context_token,
653- # which restored to empty context. Now we detach empty_context_token to restore further.
654- otel_context .detach (empty_context_token )
655- _clear_client_managed_runs ()
731+ _clear_client_managed_runs (handler , client_managed_runs )
656732
657733
658734class AssistantTurnTracker :
@@ -728,8 +804,8 @@ def start_llm_turn(
728804 # Add conversation_id (session_id) to LLM span attributes
729805 # This is a custom extension beyond standard GenAI semantic conventions
730806 if agent_invocation and agent_invocation .conversation_id :
731- llm_invocation . attributes [ "gen_ai.conversation.id" ] = (
732- agent_invocation .conversation_id
807+ _set_llm_session_id (
808+ llm_invocation , agent_invocation .conversation_id
733809 )
734810
735811 self .handler .start_llm (llm_invocation )
@@ -774,6 +850,12 @@ def update_usage(
774850 if output_tokens is not None :
775851 target_invocation .output_tokens = output_tokens
776852
853+ def set_session_id (self , session_id : Optional [str ]) -> None :
854+ """Update the open LLM invocation with a late session id."""
855+ target_invocation = self .current_llm_invocation
856+ if target_invocation :
857+ _set_llm_session_id (target_invocation , session_id )
858+
777859 def close_llm_turn (self ) -> None :
778860 """Close the current LLM invocation span."""
779861 if self .current_llm_invocation :
@@ -798,6 +880,7 @@ def wrap_claude_client_init(wrapped, instance, args, kwargs, handler=None):
798880
799881 instance ._otel_handler = handler
800882 instance ._otel_prompt = None
883+ instance ._otel_session_id = None
801884
802885 return result
803886
@@ -808,6 +891,10 @@ def wrap_claude_client_query(wrapped, instance, args, kwargs, handler=None):
808891 instance ._otel_prompt = str (
809892 kwargs .get ("prompt" ) or (args [0 ] if args else "" )
810893 )
894+ session_id = kwargs .get ("session_id" )
895+ if session_id is None and len (args ) > 1 :
896+ session_id = args [1 ]
897+ instance ._otel_session_id = session_id
811898
812899 return wrapped (* args , ** kwargs )
813900
@@ -835,6 +922,7 @@ async def wrap_claude_client_receive_response(
835922 handler = handler ,
836923 model = model ,
837924 prompt = prompt ,
925+ session_id = getattr (instance , "_otel_session_id" , None ),
838926 ):
839927 yield msg
840928
@@ -852,11 +940,13 @@ async def wrap_query(wrapped, instance, args, kwargs, handler=None):
852940
853941 model = get_model_from_options_or_env (options )
854942 prompt_str = str (prompt ) if isinstance (prompt , str ) else ""
943+ session_id = getattr (options , "resume" , None ) if options else None
855944
856945 async for message in _process_agent_invocation_stream (
857946 wrapped (* args , ** kwargs ),
858947 handler = handler ,
859948 model = model ,
860949 prompt = prompt_str ,
950+ session_id = session_id ,
861951 ):
862952 yield message
0 commit comments