Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Streaming TTFT (Time To First Token) support for LLM spans:
- `gen_ai.response.time_to_first_chunk` attribute measuring latency to first streaming token
- `gen_ai.request.stream` flag (true when streaming detected, false otherwise)
- TTFT tracking via LlamaIndex event system (`event_handler.py`):
- `TTFTTracker` class for recording start times and calculating TTFT
- `LlamaindexEventHandler` listening to `LLMChatStartEvent`/`LLMChatInProgressEvent` for per-chunk timing
- ContextVar correlation bridging callback handler (event_id) with event handler (span_id)

### Fixed
- Corrected retrieval span `gen_ai.operation.name` from `"retrieve"` to `"retrieval"` per OpenTelemetry semantic conventions. Removed explicit override in callback handler; now uses the `RetrievalInvocation` dataclass default from `util-genai`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
from opentelemetry.instrumentation.llamaindex.callback_handler import (
LlamaindexCallbackHandler,
)
from opentelemetry.instrumentation.llamaindex.invocation_manager import (
_InvocationManager,
)
from opentelemetry.instrumentation.llamaindex.event_handler import (
LlamaindexEventHandler,
TTFTTracker,
)
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.llamaindex.workflow_instrumentation import (
wrap_agent_run,
Expand Down Expand Up @@ -40,10 +47,29 @@ def _instrument(self, **kwargs):
logger_provider=logger_provider,
)

# Create shared TTFT tracker and invocation manager
ttft_tracker = TTFTTracker()
invocation_manager = _InvocationManager()
invocation_manager.set_ttft_tracker(ttft_tracker)

llamaindexCallBackHandler = LlamaindexCallbackHandler(
telemetry_handler=self._telemetry_handler
telemetry_handler=self._telemetry_handler,
invocation_manager=invocation_manager,
)

# Create and register event handler for TTFT tracking
event_handler = LlamaindexEventHandler(ttft_tracker=ttft_tracker)
self._event_handler = event_handler
try:
from llama_index.core.instrumentation import get_dispatcher

dispatcher = get_dispatcher()
dispatcher.add_event_handler(event_handler)
self._dispatcher = dispatcher
except Exception:
# Event system might not be available in older versions
self._dispatcher = None

wrap_function_wrapper(
module="llama_index.core.callbacks.base",
name="CallbackManager.__init__",
Expand Down Expand Up @@ -90,6 +116,20 @@ def _instrument(self, **kwargs):

def _uninstrument(self, **kwargs):
unwrap("llama_index.core.callbacks.base", "CallbackManager.__init__")
# Remove event handler from dispatcher to avoid duplicate TTFT measurements on re-instrumentation
if (
hasattr(self, "_dispatcher")
and self._dispatcher
and hasattr(self, "_event_handler")
):
try:
self._dispatcher.event_handlers = [
h
for h in self._dispatcher.event_handlers
if h is not self._event_handler
]
except Exception:
pass
Comment thread
shuningc marked this conversation as resolved.


class _BaseCallbackManagerInitWrapper:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from .invocation_manager import _InvocationManager
from .vendor_detection import detect_vendor_from_class
from .event_handler import set_current_llm_event_id


def _safe_str(value: Any) -> str:
Expand Down Expand Up @@ -121,14 +122,15 @@ class LlamaindexCallbackHandler(BaseCallbackHandler):
def __init__(
self,
telemetry_handler: Optional[TelemetryHandler] = None,
invocation_manager: Optional[_InvocationManager] = None,
) -> None:
super().__init__(
event_starts_to_ignore=[],
event_ends_to_ignore=[],
)
self._handler = telemetry_handler
self._auto_workflow_ids: List[str] = [] # Track auto-created workflows (stack)
self._invocation_manager = _InvocationManager()
self._invocation_manager = invocation_manager or _InvocationManager()

def start_trace(self, trace_id: Optional[str] = None) -> None:
"""Start a trace - required by BaseCallbackHandler."""
Expand Down Expand Up @@ -308,6 +310,9 @@ def _handle_llm_start(
if not self._handler or not payload:
return

# Set current event_id for TTFT correlation with EventHandler
set_current_llm_event_id(event_id)

# Extract model information and parameters from payload
serialized = payload.get("serialized", {})
model_name = (
Expand Down Expand Up @@ -469,6 +474,21 @@ def _handle_llm_end(
if llm_inv.output_tokens is None:
llm_inv.output_tokens = _get_attr(usage, "output_tokens")

# Get TTFT from EventHandler via InvocationManager
is_streaming = self._invocation_manager.is_streaming_event(event_id)
if is_streaming:
llm_inv.request_stream = True
ttft = self._invocation_manager.get_ttft_for_event(event_id)
llm_inv.attributes["gen_ai.response.time_to_first_chunk"] = ttft
else:
# Explicitly mark as non-streaming when no streaming was detected
if llm_inv.request_stream is None:
llm_inv.request_stream = False
self._invocation_manager.cleanup_event_tracking(event_id)

# Clear current event_id
set_current_llm_event_id(None)

# Stop the LLM invocation
llm_inv = self._handler.stop_llm(llm_inv)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""
TTFT (Time To First Token) tracking for LlamaIndex using the instrumentation event system.

This module provides:
- TTFTTracker: Records start times and calculates TTFT when first streaming token arrives
- LlamaindexEventHandler: Listens to LLM streaming events and populates TTFTTracker
- ContextVar correlation: Bridges callback handler (event_id) with event handler (span_id)
"""

import time
from contextvars import ContextVar
from typing import Any, Dict, Optional

from llama_index.core.instrumentation.events.llm import (
LLMChatInProgressEvent,
LLMChatStartEvent,
)

try:
from llama_index.core.instrumentation.event_handlers import BaseEventHandler
except ImportError:
# For versions of llama_index that don't have BaseEventHandler
BaseEventHandler = object # type: ignore


# ContextVar to store the current LLM event_id from callback handler
# This allows EventHandler to correlate its span_id with callback's event_id
_current_llm_event_id: ContextVar[Optional[str]] = ContextVar(
"_current_llm_event_id", default=None
)


def set_current_llm_event_id(event_id: Optional[str]) -> None:
"""Set the current LLM event_id from callback handler."""
_current_llm_event_id.set(event_id)


def get_current_llm_event_id() -> Optional[str]:
"""Get the current LLM event_id from callback handler."""
return _current_llm_event_id.get()


class TTFTTracker:
"""Track Time To First Token for streaming LLM responses.

This class:
- Records when an LLM call starts (by span_id from instrumentation events)
- Records when the first streaming token arrives
- Calculates TTFT = first_token_time - start_time
- Maps callback event_id to instrumentation span_id for cross-correlation
"""

def __init__(self) -> None:
# span_id -> start_time (when LLM call started)
self._start_times: Dict[str, float] = {}
# span_id -> ttft (calculated time to first token)
self._ttft_values: Dict[str, float] = {}
# span_id -> bool (whether first token has been received)
self._first_token_received: Dict[str, bool] = {}
# event_id -> span_id (map callback event_id to instrumentation span_id)
self._event_span_map: Dict[str, str] = {}

def record_start(self, span_id: str) -> None:
"""Record the start time for an LLM call."""
self._start_times[span_id] = time.perf_counter()
self._first_token_received[span_id] = False

def record_first_token(self, span_id: str) -> Optional[float]:
"""Record when the first token arrives and calculate TTFT.

Returns TTFT in seconds if this is the first token, None otherwise.
"""
if span_id not in self._start_times:
return None

if self._first_token_received.get(span_id, False):
# Already received first token
return None

self._first_token_received[span_id] = True
ttft = time.perf_counter() - self._start_times[span_id]
self._ttft_values[span_id] = ttft
return ttft

def get_ttft(self, span_id: str) -> Optional[float]:
"""Get the TTFT for a span_id, if available."""
return self._ttft_values.get(span_id)

def is_streaming(self, span_id: str) -> bool:
"""Check if streaming has started for this span."""
return self._first_token_received.get(span_id, False)

def associate_event_span(self, event_id: str, span_id: str) -> None:
"""Associate a callback event_id with an instrumentation span_id."""
self._event_span_map[event_id] = span_id

def get_span_for_event(self, event_id: str) -> Optional[str]:
"""Get the span_id associated with an event_id."""
return self._event_span_map.get(event_id)

def get_ttft_by_event(self, event_id: str) -> Optional[float]:
"""Get TTFT using callback event_id."""
span_id = self._event_span_map.get(event_id)
if span_id:
return self.get_ttft(span_id)
return None

def is_streaming_by_event(self, event_id: str) -> bool:
"""Check if streaming has started using callback event_id."""
span_id = self._event_span_map.get(event_id)
if span_id:
return self.is_streaming(span_id)
return False

def cleanup(self, span_id: str) -> None:
"""Clean up tracking data for a completed span."""
self._start_times.pop(span_id, None)
self._ttft_values.pop(span_id, None)
self._first_token_received.pop(span_id, None)
# Also clean up event mapping
event_ids_to_remove = [
eid for eid, sid in self._event_span_map.items() if sid == span_id
]
for event_id in event_ids_to_remove:
self._event_span_map.pop(event_id, None)

def cleanup_by_event(self, event_id: str) -> None:
"""Clean up tracking data using callback event_id."""
span_id = self._event_span_map.pop(event_id, None)
if span_id:
self.cleanup(span_id)


class LlamaindexEventHandler(BaseEventHandler):
"""Event handler that captures LLM streaming events for TTFT calculation.

This handler:
1. Listens for LLMChatStartEvent to record start time
2. Listens for LLMChatInProgressEvent (first token) to calculate TTFT
3. Associates callback event_id with instrumentation span_id via ContextVar
"""

def __init__(self, ttft_tracker: TTFTTracker) -> None:
self._tracker = ttft_tracker

@classmethod
def class_name(cls) -> str:
"""Return the class name for LlamaIndex dispatcher."""
return "LlamaindexTTFTEventHandler"

def handle(self, event: Any, **kwargs: Any) -> None:
"""Handle LlamaIndex instrumentation events."""
if isinstance(event, LLMChatStartEvent):
self._handle_start(event)
elif isinstance(event, LLMChatInProgressEvent):
self._handle_progress(event)

def _handle_start(self, event: LLMChatStartEvent) -> None:
"""Handle LLM chat start event - record start time."""
span_id = str(event.span_id) if hasattr(event, "span_id") else None
if not span_id:
return

# Record start time
self._tracker.record_start(span_id)

# Associate with callback event_id if available
event_id = get_current_llm_event_id()
if event_id:
self._tracker.associate_event_span(event_id, span_id)

def _handle_progress(self, event: LLMChatInProgressEvent) -> None:
"""Handle LLM chat in-progress event - record first token."""
span_id = str(event.span_id) if hasattr(event, "span_id") else None
if not span_id:
return

# Record first token (TTFTTracker handles deduplication)
self._tracker.record_first_token(span_id)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from contextvars import ContextVar, Token
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from opentelemetry.util.genai.types import (
AgentInvocation,
Expand All @@ -25,6 +25,9 @@
ToolCall,
)

if TYPE_CHECKING:
from .event_handler import TTFTTracker

__all__ = ["_InvocationManager"]


Expand Down Expand Up @@ -126,3 +129,29 @@ def get_current_agent_invocation(self) -> Optional[Any]:
if not key:
return None
return self._agent_invocation_by_key.get(key)

# ==================== TTFT Tracking Methods ====================

def set_ttft_tracker(self, tracker: "TTFTTracker") -> None:
"""Set the TTFTTracker instance for TTFT correlation."""
self._ttft_tracker = tracker

def get_ttft_for_event(self, event_id: str) -> Optional[float]:
"""Get TTFT for a callback event_id, if available."""
tracker = getattr(self, "_ttft_tracker", None)
if tracker:
return tracker.get_ttft_by_event(event_id)
return None

def is_streaming_event(self, event_id: str) -> bool:
"""Check if streaming has started for a callback event_id."""
tracker = getattr(self, "_ttft_tracker", None)
if tracker:
return tracker.is_streaming_by_event(event_id)
return False

def cleanup_event_tracking(self, event_id: str) -> None:
"""Clean up TTFT tracking data for an event_id."""
tracker = getattr(self, "_ttft_tracker", None)
if tracker:
Comment thread
shuningc marked this conversation as resolved.
tracker.cleanup_by_event(event_id)
Loading
Loading