From 27a5cc9d0df239ad223b1fbffa5bd31fd91594a2 Mon Sep 17 00:00:00 2001 From: shuningc Date: Sun, 19 Apr 2026 17:24:05 -0700 Subject: [PATCH 1/2] Add streaming TTFT (Time To First Token) support for LlamaIndex instrumentation Introduces an EventHandler that listens to LlamaIndex instrumentation event system (LLMChatStartEvent/LLMChatInProgressEvent) to measure the time between an LLM request and the first streaming token. This metric is recorded as gen_ai.response.time_to_first_chunk on the LLM span. The implementation bridges two systems: - CallbackHandler (fires on_event_start/on_event_end with event_id) - EventHandler (fires per-token with span_id) A ContextVar correlates the callback event_id with the event span_id, and TTFTTracker calculates the delta. Co-Authored-By: Claude Opus 4.6 --- .../CHANGELOG.md | 9 + .../instrumentation/llamaindex/__init__.py | 41 ++- .../llamaindex/callback_handler.py | 20 +- .../llamaindex/event_handler.py | 179 ++++++++++++ .../llamaindex/invocation_manager.py | 31 +- .../tests/test_ttft.py | 270 ++++++++++++++++++ 6 files changed, 547 insertions(+), 3 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/event_handler.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_ttft.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/CHANGELOG.md index 9ccb35f0..c9a02a16 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Streaming TTFT (Time To First Token) support for LLM spans: + - `gen_ai.response.time_to_first_chunk` attribute measuring latency to first streaming token + - `gen_ai.request.stream` flag (true when streaming detected, false otherwise) +- TTFT tracking via LlamaIndex event system (`event_handler.py`): + - `TTFTTracker` class for recording start times and calculating TTFT + - `LlamaindexEventHandler` listening to `LLMChatStartEvent`/`LLMChatInProgressEvent` for per-chunk timing + - ContextVar correlation bridging callback handler (event_id) with event handler (span_id) + ### Fixed - Corrected retrieval span `gen_ai.operation.name` from `"retrieve"` to `"retrieval"` per OpenTelemetry semantic conventions. Removed explicit override in callback handler; now uses the `RetrievalInvocation` dataclass default from `util-genai`. diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py index f4619fe5..0b3c332a 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py @@ -4,6 +4,13 @@ from opentelemetry.instrumentation.llamaindex.callback_handler import ( LlamaindexCallbackHandler, ) +from opentelemetry.instrumentation.llamaindex.invocation_manager import ( + _InvocationManager, +) +from opentelemetry.instrumentation.llamaindex.event_handler import ( + LlamaindexEventHandler, + TTFTTracker, +) from opentelemetry.instrumentation.utils import unwrap from opentelemetry.instrumentation.llamaindex.workflow_instrumentation import ( wrap_agent_run, @@ -40,10 +47,29 @@ def _instrument(self, **kwargs): logger_provider=logger_provider, ) + # Create shared TTFT tracker and invocation manager + ttft_tracker = TTFTTracker() + invocation_manager = _InvocationManager() + invocation_manager.set_ttft_tracker(ttft_tracker) + llamaindexCallBackHandler = LlamaindexCallbackHandler( - telemetry_handler=self._telemetry_handler + telemetry_handler=self._telemetry_handler, + invocation_manager=invocation_manager, ) + # Create and register event handler for TTFT tracking + event_handler = LlamaindexEventHandler(ttft_tracker=ttft_tracker) + self._event_handler = event_handler + try: + from llama_index.core.instrumentation import get_dispatcher + + dispatcher = get_dispatcher() + dispatcher.add_event_handler(event_handler) + self._dispatcher = dispatcher + except Exception: + # Event system might not be available in older versions + self._dispatcher = None + wrap_function_wrapper( module="llama_index.core.callbacks.base", name="CallbackManager.__init__", @@ -90,6 +116,19 @@ def _instrument(self, **kwargs): def _uninstrument(self, **kwargs): unwrap("llama_index.core.callbacks.base", "CallbackManager.__init__") + # Clean up event handler registration + if ( + hasattr(self, "_dispatcher") + and self._dispatcher + and hasattr(self, "_event_handler") + ): + try: + # Note: LlamaIndex dispatcher may not have remove_event_handler + # In that case, the handler will be garbage collected when + # the instrumentor is destroyed + pass + except Exception: + pass class _BaseCallbackManagerInitWrapper: diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index 89e042b3..fdcc7e7e 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -19,6 +19,7 @@ from .invocation_manager import _InvocationManager from .vendor_detection import detect_vendor_from_class +from .event_handler import set_current_llm_event_id def _safe_str(value: Any) -> str: @@ -121,6 +122,7 @@ class LlamaindexCallbackHandler(BaseCallbackHandler): def __init__( self, telemetry_handler: Optional[TelemetryHandler] = None, + invocation_manager: Optional[_InvocationManager] = None, ) -> None: super().__init__( event_starts_to_ignore=[], @@ -128,7 +130,7 @@ def __init__( ) self._handler = telemetry_handler self._auto_workflow_ids: List[str] = [] # Track auto-created workflows (stack) - self._invocation_manager = _InvocationManager() + self._invocation_manager = invocation_manager or _InvocationManager() def start_trace(self, trace_id: Optional[str] = None) -> None: """Start a trace - required by BaseCallbackHandler.""" @@ -308,6 +310,9 @@ def _handle_llm_start( if not self._handler or not payload: return + # Set current event_id for TTFT correlation with EventHandler + set_current_llm_event_id(event_id) + # Extract model information and parameters from payload serialized = payload.get("serialized", {}) model_name = ( @@ -469,6 +474,19 @@ def _handle_llm_end( if llm_inv.output_tokens is None: llm_inv.output_tokens = _get_attr(usage, "output_tokens") + # Get TTFT from EventHandler via InvocationManager + ttft = self._invocation_manager.get_ttft_for_event(event_id) + if ttft is not None: + llm_inv.attributes["gen_ai.response.time_to_first_chunk"] = ttft + llm_inv.request_stream = True + else: + # Explicitly mark as non-streaming when no TTFT was recorded + if llm_inv.request_stream is None: + llm_inv.request_stream = False + + # Clear current event_id + set_current_llm_event_id(None) + # Stop the LLM invocation llm_inv = self._handler.stop_llm(llm_inv) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/event_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/event_handler.py new file mode 100644 index 00000000..5008e082 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/event_handler.py @@ -0,0 +1,179 @@ +""" +TTFT (Time To First Token) tracking for LlamaIndex using the instrumentation event system. + +This module provides: +- TTFTTracker: Records start times and calculates TTFT when first streaming token arrives +- LlamaindexEventHandler: Listens to LLM streaming events and populates TTFTTracker +- ContextVar correlation: Bridges callback handler (event_id) with event handler (span_id) +""" + +import time +from contextvars import ContextVar +from typing import Any, Dict, Optional + +from llama_index.core.instrumentation.events.llm import ( + LLMChatInProgressEvent, + LLMChatStartEvent, +) + +try: + from llama_index.core.instrumentation.event_handlers import BaseEventHandler +except ImportError: + # For versions of llama_index that don't have BaseEventHandler + BaseEventHandler = object # type: ignore + + +# ContextVar to store the current LLM event_id from callback handler +# This allows EventHandler to correlate its span_id with callback's event_id +_current_llm_event_id: ContextVar[Optional[str]] = ContextVar( + "_current_llm_event_id", default=None +) + + +def set_current_llm_event_id(event_id: Optional[str]) -> None: + """Set the current LLM event_id from callback handler.""" + _current_llm_event_id.set(event_id) + + +def get_current_llm_event_id() -> Optional[str]: + """Get the current LLM event_id from callback handler.""" + return _current_llm_event_id.get() + + +class TTFTTracker: + """Track Time To First Token for streaming LLM responses. + + This class: + - Records when an LLM call starts (by span_id from instrumentation events) + - Records when the first streaming token arrives + - Calculates TTFT = first_token_time - start_time + - Maps callback event_id to instrumentation span_id for cross-correlation + """ + + def __init__(self) -> None: + # span_id -> start_time (when LLM call started) + self._start_times: Dict[str, float] = {} + # span_id -> ttft (calculated time to first token) + self._ttft_values: Dict[str, float] = {} + # span_id -> bool (whether first token has been received) + self._first_token_received: Dict[str, bool] = {} + # event_id -> span_id (map callback event_id to instrumentation span_id) + self._event_span_map: Dict[str, str] = {} + + def record_start(self, span_id: str) -> None: + """Record the start time for an LLM call.""" + self._start_times[span_id] = time.perf_counter() + self._first_token_received[span_id] = False + + def record_first_token(self, span_id: str) -> Optional[float]: + """Record when the first token arrives and calculate TTFT. + + Returns TTFT in seconds if this is the first token, None otherwise. + """ + if span_id not in self._start_times: + return None + + if self._first_token_received.get(span_id, False): + # Already received first token + return None + + self._first_token_received[span_id] = True + ttft = time.perf_counter() - self._start_times[span_id] + self._ttft_values[span_id] = ttft + return ttft + + def get_ttft(self, span_id: str) -> Optional[float]: + """Get the TTFT for a span_id, if available.""" + return self._ttft_values.get(span_id) + + def is_streaming(self, span_id: str) -> bool: + """Check if streaming has started for this span.""" + return self._first_token_received.get(span_id, False) + + def associate_event_span(self, event_id: str, span_id: str) -> None: + """Associate a callback event_id with an instrumentation span_id.""" + self._event_span_map[event_id] = span_id + + def get_span_for_event(self, event_id: str) -> Optional[str]: + """Get the span_id associated with an event_id.""" + return self._event_span_map.get(event_id) + + def get_ttft_by_event(self, event_id: str) -> Optional[float]: + """Get TTFT using callback event_id.""" + span_id = self._event_span_map.get(event_id) + if span_id: + return self.get_ttft(span_id) + return None + + def is_streaming_by_event(self, event_id: str) -> bool: + """Check if streaming has started using callback event_id.""" + span_id = self._event_span_map.get(event_id) + if span_id: + return self.is_streaming(span_id) + return False + + def cleanup(self, span_id: str) -> None: + """Clean up tracking data for a completed span.""" + self._start_times.pop(span_id, None) + self._ttft_values.pop(span_id, None) + self._first_token_received.pop(span_id, None) + # Also clean up event mapping + event_ids_to_remove = [ + eid for eid, sid in self._event_span_map.items() if sid == span_id + ] + for event_id in event_ids_to_remove: + self._event_span_map.pop(event_id, None) + + def cleanup_by_event(self, event_id: str) -> None: + """Clean up tracking data using callback event_id.""" + span_id = self._event_span_map.pop(event_id, None) + if span_id: + self.cleanup(span_id) + + +class LlamaindexEventHandler(BaseEventHandler): + """Event handler that captures LLM streaming events for TTFT calculation. + + This handler: + 1. Listens for LLMChatStartEvent to record start time + 2. Listens for LLMChatInProgressEvent (first token) to calculate TTFT + 3. Associates callback event_id with instrumentation span_id via ContextVar + """ + + def __init__(self, ttft_tracker: TTFTTracker) -> None: + self._tracker = ttft_tracker + + @classmethod + def class_name(cls) -> str: + """Return the class name for LlamaIndex dispatcher.""" + return "LlamaindexTTFTEventHandler" + + def handle(self, event: Any, **kwargs: Any) -> None: + """Handle LlamaIndex instrumentation events.""" + if isinstance(event, LLMChatStartEvent): + self._handle_start(event) + elif isinstance(event, LLMChatInProgressEvent): + self._handle_progress(event) + + def _handle_start(self, event: LLMChatStartEvent) -> None: + """Handle LLM chat start event - record start time.""" + span_id = str(event.span_id) if hasattr(event, "span_id") else None + if not span_id: + return + + # Record start time + self._tracker.record_start(span_id) + + # Associate with callback event_id if available + event_id = get_current_llm_event_id() + if event_id: + self._tracker.associate_event_span(event_id, span_id) + + def _handle_progress(self, event: LLMChatInProgressEvent) -> None: + """Handle LLM chat in-progress event - record first token.""" + span_id = str(event.span_id) if hasattr(event, "span_id") else None + if not span_id: + return + + # Record first token (TTFTTracker handles deduplication) + self._tracker.record_first_token(span_id) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/invocation_manager.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/invocation_manager.py index 1f831be1..54171d90 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/invocation_manager.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/invocation_manager.py @@ -14,7 +14,7 @@ from contextvars import ContextVar, Token from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from opentelemetry.util.genai.types import ( AgentInvocation, @@ -25,6 +25,9 @@ ToolCall, ) +if TYPE_CHECKING: + from .event_handler import TTFTTracker + __all__ = ["_InvocationManager"] @@ -126,3 +129,29 @@ def get_current_agent_invocation(self) -> Optional[Any]: if not key: return None return self._agent_invocation_by_key.get(key) + + # ==================== TTFT Tracking Methods ==================== + + def set_ttft_tracker(self, tracker: "TTFTTracker") -> None: + """Set the TTFTTracker instance for TTFT correlation.""" + self._ttft_tracker = tracker + + def get_ttft_for_event(self, event_id: str) -> Optional[float]: + """Get TTFT for a callback event_id, if available.""" + tracker = getattr(self, "_ttft_tracker", None) + if tracker: + return tracker.get_ttft_by_event(event_id) + return None + + def is_streaming_event(self, event_id: str) -> bool: + """Check if streaming has started for a callback event_id.""" + tracker = getattr(self, "_ttft_tracker", None) + if tracker: + return tracker.is_streaming_by_event(event_id) + return False + + def cleanup_event_tracking(self, event_id: str) -> None: + """Clean up TTFT tracking data for an event_id.""" + tracker = getattr(self, "_ttft_tracker", None) + if tracker: + tracker.cleanup_by_event(event_id) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_ttft.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_ttft.py new file mode 100644 index 00000000..fa210a66 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_ttft.py @@ -0,0 +1,270 @@ +"""Test TTFT (Time To First Token) tracking for LlamaIndex instrumentation. + +Tests the TTFTTracker, LlamaindexEventHandler, and the correlation between +callback event_id and instrumentation span_id via ContextVar. +""" + +import time + +from opentelemetry.instrumentation.llamaindex.event_handler import ( + TTFTTracker, + LlamaindexEventHandler, + set_current_llm_event_id, + get_current_llm_event_id, +) +from opentelemetry.instrumentation.llamaindex.invocation_manager import ( + _InvocationManager, +) + + +# ==================== TTFTTracker Unit Tests ==================== + + +class TestTTFTTracker: + """Test TTFTTracker in isolation.""" + + def test_record_start_and_first_token(self): + tracker = TTFTTracker() + tracker.record_start("span-1") + time.sleep(0.01) # small delay to get measurable TTFT + ttft = tracker.record_first_token("span-1") + + assert ttft is not None + assert ttft > 0 + assert ttft < 1.0 # should be much less than 1 second + + def test_second_token_returns_none(self): + tracker = TTFTTracker() + tracker.record_start("span-1") + tracker.record_first_token("span-1") + # Second call should return None + result = tracker.record_first_token("span-1") + assert result is None + + def test_get_ttft(self): + tracker = TTFTTracker() + tracker.record_start("span-1") + tracker.record_first_token("span-1") + + ttft = tracker.get_ttft("span-1") + assert ttft is not None + assert ttft > 0 + + def test_get_ttft_no_token(self): + tracker = TTFTTracker() + tracker.record_start("span-1") + assert tracker.get_ttft("span-1") is None + + def test_get_ttft_unknown_span(self): + tracker = TTFTTracker() + assert tracker.get_ttft("nonexistent") is None + + def test_is_streaming(self): + tracker = TTFTTracker() + tracker.record_start("span-1") + assert not tracker.is_streaming("span-1") + + tracker.record_first_token("span-1") + assert tracker.is_streaming("span-1") + + def test_associate_event_span(self): + tracker = TTFTTracker() + tracker.associate_event_span("event-1", "span-1") + tracker.record_start("span-1") + tracker.record_first_token("span-1") + + ttft = tracker.get_ttft_by_event("event-1") + assert ttft is not None + assert ttft > 0 + + def test_is_streaming_by_event(self): + tracker = TTFTTracker() + tracker.associate_event_span("event-1", "span-1") + tracker.record_start("span-1") + + assert not tracker.is_streaming_by_event("event-1") + tracker.record_first_token("span-1") + assert tracker.is_streaming_by_event("event-1") + + def test_cleanup(self): + tracker = TTFTTracker() + tracker.associate_event_span("event-1", "span-1") + tracker.record_start("span-1") + tracker.record_first_token("span-1") + + # Verify data exists + assert tracker.get_ttft("span-1") is not None + assert tracker.get_ttft_by_event("event-1") is not None + + # Cleanup + tracker.cleanup("span-1") + + assert tracker.get_ttft("span-1") is None + assert tracker.get_ttft_by_event("event-1") is None + assert not tracker.is_streaming("span-1") + + def test_cleanup_by_event(self): + tracker = TTFTTracker() + tracker.associate_event_span("event-1", "span-1") + tracker.record_start("span-1") + tracker.record_first_token("span-1") + + tracker.cleanup_by_event("event-1") + + assert tracker.get_ttft("span-1") is None + assert tracker.get_ttft_by_event("event-1") is None + + def test_multiple_concurrent_spans(self): + tracker = TTFTTracker() + tracker.associate_event_span("event-1", "span-1") + tracker.associate_event_span("event-2", "span-2") + + tracker.record_start("span-1") + time.sleep(0.01) + tracker.record_start("span-2") + time.sleep(0.01) + + tracker.record_first_token("span-2") # span-2 gets token first + time.sleep(0.01) + tracker.record_first_token("span-1") # span-1 gets token later + + ttft1 = tracker.get_ttft_by_event("event-1") + ttft2 = tracker.get_ttft_by_event("event-2") + + assert ttft1 is not None + assert ttft2 is not None + # span-1 started earlier but got token later, so its TTFT should be larger + assert ttft1 > ttft2 + + +# ==================== ContextVar Correlation Tests ==================== + + +class TestContextVarCorrelation: + """Test the ContextVar-based event_id <-> span_id correlation.""" + + def test_set_and_get_event_id(self): + set_current_llm_event_id("evt-123") + assert get_current_llm_event_id() == "evt-123" + + set_current_llm_event_id(None) + assert get_current_llm_event_id() is None + + def test_event_handler_associates_on_start(self): + """When LLMChatStartEvent fires, EventHandler should associate + the current event_id with the event's span_id.""" + tracker = TTFTTracker() + handler = LlamaindexEventHandler(ttft_tracker=tracker) + + # Simulate: CallbackHandler sets event_id before LLM call + set_current_llm_event_id("callback-event-42") + + # Simulate: LLMChatStartEvent fires + from llama_index.core.instrumentation.events.llm import LLMChatStartEvent + + start_event = LLMChatStartEvent( + messages=[], + model_dict={}, + additional_kwargs={}, + span_id="llama-span-99", + ) + handler.handle(start_event) + + # Verify association + assert tracker._event_span_map["callback-event-42"] == "llama-span-99" + + # Verify start time recorded + assert "llama-span-99" in tracker._start_times + + # Clean up + set_current_llm_event_id(None) + + def test_end_to_end_ttft_flow(self): + """Full flow: CallbackHandler sets event_id -> EventHandler records TTFT + -> InvocationManager retrieves TTFT by event_id.""" + tracker = TTFTTracker() + handler = LlamaindexEventHandler(ttft_tracker=tracker) + inv_mgr = _InvocationManager() + inv_mgr.set_ttft_tracker(tracker) + + # Step 1: CallbackHandler._handle_llm_start sets event_id + set_current_llm_event_id("cb-event-1") + + # Step 2: LLMChatStartEvent fires (inside LlamaIndex LLM call) + from llama_index.core.instrumentation.events.llm import ( + LLMChatStartEvent, + LLMChatInProgressEvent, + ) + from llama_index.core.llms import ChatResponse, ChatMessage + + start_event = LLMChatStartEvent( + messages=[], + model_dict={}, + additional_kwargs={}, + span_id="internal-span-1", + ) + handler.handle(start_event) + + # Step 3: Simulate some processing time + time.sleep(0.02) + + # Step 4: LLMChatInProgressEvent fires (first streaming chunk) + progress_event = LLMChatInProgressEvent( + messages=[], + response=ChatResponse(message=ChatMessage(content="Hello")), + span_id="internal-span-1", + ) + handler.handle(progress_event) + + # Step 5: Second chunk - should NOT update TTFT + time.sleep(0.01) + handler.handle(progress_event) + + # Step 6: CallbackHandler._handle_llm_end retrieves TTFT + ttft = inv_mgr.get_ttft_for_event("cb-event-1") + assert ttft is not None + assert ttft >= 0.02 # at least the sleep time + assert ttft < 1.0 + + # Also check streaming flag + assert inv_mgr.is_streaming_event("cb-event-1") + + # Step 7: Cleanup + inv_mgr.cleanup_event_tracking("cb-event-1") + set_current_llm_event_id(None) + + assert inv_mgr.get_ttft_for_event("cb-event-1") is None + + def test_non_streaming_no_ttft(self): + """Non-streaming calls should not have TTFT.""" + tracker = TTFTTracker() + handler = LlamaindexEventHandler(ttft_tracker=tracker) + inv_mgr = _InvocationManager() + inv_mgr.set_ttft_tracker(tracker) + + set_current_llm_event_id("cb-event-2") + + # Only start event, no in-progress (non-streaming) + from llama_index.core.instrumentation.events.llm import LLMChatStartEvent + + start_event = LLMChatStartEvent( + messages=[], + additional_kwargs={}, + model_dict={}, + span_id="internal-span-2", + ) + handler.handle(start_event) + + # No TTFT for non-streaming + assert inv_mgr.get_ttft_for_event("cb-event-2") is None + assert not inv_mgr.is_streaming_event("cb-event-2") + + set_current_llm_event_id(None) + + def test_no_tracker_graceful(self): + """InvocationManager without tracker should not crash.""" + inv_mgr = _InvocationManager() + # No tracker set + assert inv_mgr.get_ttft_for_event("any") is None + assert not inv_mgr.is_streaming_event("any") + inv_mgr.cleanup_event_tracking("any") # should not crash From a7c9ed2bde2664e2ea0afdebf8f609150f125818 Mon Sep 17 00:00:00 2001 From: shuningc Date: Mon, 20 Apr 2026 14:12:04 -0700 Subject: [PATCH 2/2] Dealing with weaknesses as commented --- .../instrumentation/llamaindex/__init__.py | 11 ++++++----- .../instrumentation/llamaindex/callback_handler.py | 10 ++++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py index 0b3c332a..515dfd9b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py @@ -116,17 +116,18 @@ def _instrument(self, **kwargs): def _uninstrument(self, **kwargs): unwrap("llama_index.core.callbacks.base", "CallbackManager.__init__") - # Clean up event handler registration + # Remove event handler from dispatcher to avoid duplicate TTFT measurements on re-instrumentation if ( hasattr(self, "_dispatcher") and self._dispatcher and hasattr(self, "_event_handler") ): try: - # Note: LlamaIndex dispatcher may not have remove_event_handler - # In that case, the handler will be garbage collected when - # the instrumentor is destroyed - pass + self._dispatcher.event_handlers = [ + h + for h in self._dispatcher.event_handlers + if h is not self._event_handler + ] except Exception: pass diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index fdcc7e7e..8ed173d4 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -475,14 +475,16 @@ def _handle_llm_end( llm_inv.output_tokens = _get_attr(usage, "output_tokens") # Get TTFT from EventHandler via InvocationManager - ttft = self._invocation_manager.get_ttft_for_event(event_id) - if ttft is not None: - llm_inv.attributes["gen_ai.response.time_to_first_chunk"] = ttft + is_streaming = self._invocation_manager.is_streaming_event(event_id) + if is_streaming: llm_inv.request_stream = True + ttft = self._invocation_manager.get_ttft_for_event(event_id) + llm_inv.attributes["gen_ai.response.time_to_first_chunk"] = ttft else: - # Explicitly mark as non-streaming when no TTFT was recorded + # Explicitly mark as non-streaming when no streaming was detected if llm_inv.request_stream is None: llm_inv.request_stream = False + self._invocation_manager.cleanup_event_tracking(event_id) # Clear current event_id set_current_llm_event_id(None)