@@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- LLM span attributes for feature parity with LangChain instrumentation:
- `gen_ai.response.model` extracted from raw LLM response with fallback chain
- `gen_ai.response.finish_reasons` from response choices
- `gen_ai.request.max_tokens` from LLM metadata/Settings
- `gen_ai.request.stream` flag (true when streaming detected)
- `gen_ai.response.time_to_first_chunk` (TTFT) for streaming calls
- `gen_ai.tool.definitions` via agent context propagation (gated by `OTEL_INSTRUMENTATION_GENAI_CAPTURE_TOOL_DEFINITIONS`)
- TTFT tracking via LlamaIndex event system (`event_handler.py`):
- `TTFTTracker` class for recording start times and calculating TTFT
- `LlamaindexEventHandler` listening to `LLMChatInProgressEvent` for per-chunk timing
- ContextVar correlation bridging the callback handler and event handler (a minimal sketch of this pattern follows this list)
- `gen_ai.client.operation.time_to_first_chunk` histogram metric emission for streaming LLM calls
- Agent tool registration in `wrap_agent_run()` so tool definitions propagate across async boundaries
- `find_agent_with_tools()` fallback in the invocation manager for cases where ContextVar propagation does not reach the LLM callback
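
The ContextVar correlation called out above follows, in rough form, the pattern below. This is a minimal sketch rather than the shipped `event_handler.py`: the names `set_current_llm_event_id` and `TTFTTracker` mirror the PR, but the bodies and the use of `time.monotonic()` are illustrative assumptions.

```python
import time
from contextvars import ContextVar
from typing import Dict, Optional

# ContextVar set by the callback handler at LLM start so that the event
# handler (which observes per-chunk LLMChatInProgressEvent) can attribute
# the first chunk to the right invocation.
_current_llm_event_id: ContextVar[Optional[str]] = ContextVar(
    "current_llm_event_id", default=None
)


def set_current_llm_event_id(event_id: Optional[str]) -> None:
    _current_llm_event_id.set(event_id)


class TTFTTracker:
    """Records LLM start times and the delta to the first streamed chunk (sketch)."""

    def __init__(self) -> None:
        self._start_times: Dict[str, float] = {}
        self._ttft: Dict[str, float] = {}

    def record_start(self, event_id: str) -> None:
        self._start_times[event_id] = time.monotonic()

    def record_first_chunk(self) -> None:
        # Called from the event handler; correlates via the ContextVar above.
        event_id = _current_llm_event_id.get()
        if event_id and event_id in self._start_times and event_id not in self._ttft:
            self._ttft[event_id] = time.monotonic() - self._start_times[event_id]

    def get_ttft(self, event_id: str) -> Optional[float]:
        return self._ttft.get(event_id)
```

In the diff further down, the callback handler calls `set_current_llm_event_id(event_id)` in `_handle_llm_start` and reads the recorded value back in `_handle_llm_end` to populate `gen_ai.response.time_to_first_chunk`.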

## [0.1.1] - 2026-01-30

### Fixed
@@ -0,0 +1,30 @@
# LlamaIndex Example Environment Variables
# Copy this file to .env and fill in your values

# =============================================================================
# Option 1: Circuit (Internal LLM Gateway) - OAuth2 mode
# =============================================================================
LLM_TOKEN_URL=https://your-token-endpoint/oauth2/token
LLM_CLIENT_ID=your-client-id
LLM_CLIENT_SECRET=your-client-secret
LLM_BASE_URL=https://your-circuit-base-url
LLM_APP_KEY=your-app-key


# =============================================================================
# Common Settings
# =============================================================================
LLM_MODEL=gpt-5-nano

# =============================================================================
# Observability
# =============================================================================
OTEL_SERVICE_NAME=llamaindex-example
OTEL_RESOURCE_ATTRIBUTES=deployment.environment=demo
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
OTEL_LOGS_EXPORTER=otlp

# Message content capture
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true
OTEL_INSTRUMENTATION_GENAI_CAPTURE_TOOL_DEFINITIONS=true
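
A minimal sketch of how an application might consume this file, assuming `python-dotenv` for loading and an instrumentor entry point named `LlamaIndexInstrumentor`; the class name is an assumption for illustration, and only the variable names above come from the example file.

```python
# Illustrative only: the instrumentor class name is an assumption,
# as is loading the file with python-dotenv.
from dotenv import load_dotenv
from opentelemetry.instrumentation.llamaindex import LlamaIndexInstrumentor  # assumed entry point

load_dotenv()  # makes the OTEL_* and LLM_* values visible to the process

# Registers the callback handler and, where the LlamaIndex event system is
# available, the event handler used for time-to-first-chunk tracking.
LlamaIndexInstrumentor().instrument()
```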
@@ -4,6 +4,13 @@
from opentelemetry.instrumentation.llamaindex.callback_handler import (
LlamaindexCallbackHandler,
)
from opentelemetry.instrumentation.llamaindex.invocation_manager import (
_InvocationManager,
)
from opentelemetry.instrumentation.llamaindex.event_handler import (
LlamaindexEventHandler,
TTFTTracker,
)
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.llamaindex.workflow_instrumentation import (
wrap_agent_run,
@@ -40,10 +47,29 @@ def _instrument(self, **kwargs):
logger_provider=logger_provider,
)

# Create shared TTFT tracker and invocation manager
ttft_tracker = TTFTTracker()
invocation_manager = _InvocationManager()
invocation_manager.set_ttft_tracker(ttft_tracker)

llamaindexCallBackHandler = LlamaindexCallbackHandler(
telemetry_handler=self._telemetry_handler
telemetry_handler=self._telemetry_handler,
invocation_manager=invocation_manager,
)

# Create and register event handler for TTFT tracking
event_handler = LlamaindexEventHandler(ttft_tracker=ttft_tracker)
self._event_handler = event_handler
try:
from llama_index.core.instrumentation import get_dispatcher

dispatcher = get_dispatcher()
dispatcher.add_event_handler(event_handler)
self._dispatcher = dispatcher
except Exception:
# Event system might not be available in older versions
self._dispatcher = None

wrap_function_wrapper(
module="llama_index.core.callbacks.base",
name="CallbackManager.__init__",
@@ -90,6 +116,19 @@ def _instrument(self, **kwargs):

def _uninstrument(self, **kwargs):
unwrap("llama_index.core.callbacks.base", "CallbackManager.__init__")
# Clean up event handler registration
if (
hasattr(self, "_dispatcher")
and self._dispatcher
and hasattr(self, "_event_handler")
):
try:
# Note: LlamaIndex dispatcher may not have remove_event_handler
# In that case, the handler will be garbage collected when
# the instrumentor is destroyed
pass
except Exception:
pass


class _BaseCallbackManagerInitWrapper:
@@ -1,3 +1,4 @@
import json
from typing import Any, Dict, List, Optional

from llama_index.core.callbacks.base_handler import BaseCallbackHandler
@@ -16,9 +17,13 @@
Workflow,
ToolCall,
)
from opentelemetry.util.genai.utils import (
should_capture_tool_definitions as _should_capture_tool_definitions,
)

from .invocation_manager import _InvocationManager
from .vendor_detection import detect_vendor_from_class
from .event_handler import set_current_llm_event_id


def _safe_str(value: Any) -> str:
@@ -121,14 +126,15 @@ class LlamaindexCallbackHandler(BaseCallbackHandler):
def __init__(
self,
telemetry_handler: Optional[TelemetryHandler] = None,
invocation_manager: Optional[_InvocationManager] = None,
) -> None:
super().__init__(
event_starts_to_ignore=[],
event_ends_to_ignore=[],
)
self._handler = telemetry_handler
self._auto_workflow_ids: List[str] = [] # Track auto-created workflows (stack)
self._invocation_manager = _InvocationManager()
self._invocation_manager = invocation_manager or _InvocationManager()

def start_trace(self, trace_id: Optional[str] = None) -> None:
"""Start a trace - required by BaseCallbackHandler."""
@@ -308,15 +314,91 @@ def _handle_llm_start(
if not self._handler or not payload:
return

# Set current event_id for TTFT correlation with EventHandler
set_current_llm_event_id(event_id)

# Extract model information and parameters from payload
serialized = payload.get("serialized", {})
model_name = (
serialized.get("model") or serialized.get("model_name") or "unknown"
)

# Detect provider from class name
class_name = serialized.get("class_name", "")
provider = detect_vendor_from_class(class_name)

# Extract tool definitions if available (requires capture enabled)
tool_definitions = []
if _should_capture_tool_definitions():
# Check multiple locations where LlamaIndex might store tools
tools = (
serialized.get("tools")
or serialized.get("functions")
or payload.get("tools", [])
or payload.get("functions", [])
or serialized.get("additional_kwargs", {}).get("tools", [])
or serialized.get("additional_kwargs", {}).get("functions", [])
)

# Fallback: inherit tools from parent agent context (LlamaIndex stores
# tools on Agent, not in LLM callback payload like LangChain)
if not tools:
context_agent = self._invocation_manager.get_current_agent_invocation()
if context_agent and hasattr(context_agent, "_agent_tools"):
tools = getattr(context_agent, "_agent_tools", [])

# Second fallback: search for any agent with tools (ContextVar may not propagate)
if not tools:
agent_with_tools = self._invocation_manager.find_agent_with_tools()
if agent_with_tools:
tools = getattr(agent_with_tools, "_agent_tools", [])

if tools:
for tool in tools:
# LlamaIndex FunctionTool stores metadata in tool.metadata
metadata = getattr(tool, "metadata", None)
if metadata:
tool_name = getattr(metadata, "name", None)
tool_desc = getattr(metadata, "description", None)
else:
# Fallback for dict-like or other tool formats
tool_name = _get_attr(tool, "name") or _get_attr(
tool, "function_name"
)
tool_desc = _get_attr(tool, "description")

if tool_name:
tool_def = {"name": _safe_str(tool_name)}
if tool_desc:
tool_def["description"] = _safe_str(tool_desc)
tool_definitions.append(tool_def)

# Extract additional parameters if available
temperature = serialized.get("temperature")
max_tokens = serialized.get("max_tokens")
# Try multiple locations for max_tokens (CustomLLM may not serialize this)
max_tokens = (
serialized.get("max_tokens")
or serialized.get("num_output") # LlamaIndex metadata field
or payload.get("additional_kwargs", {}).get("max_tokens")
)
# Also check metadata.num_output from serialized (LlamaIndex stores it there)
if not max_tokens:
metadata = serialized.get("metadata", {})
if isinstance(metadata, dict):
max_tokens = metadata.get("num_output")
# Fallback: try to get from Settings.llm directly
if not max_tokens:
try:
from llama_index.core import Settings

llm = Settings.llm
if llm:
# Check LLM object's max_tokens or metadata.num_output
max_tokens = getattr(llm, "max_tokens", None)
if not max_tokens and hasattr(llm, "metadata"):
max_tokens = getattr(llm.metadata, "num_output", None)
except Exception:
pass
top_p = serialized.get("top_p")
frequency_penalty = serialized.get("frequency_penalty")
presence_penalty = serialized.get("presence_penalty")
@@ -341,6 +423,14 @@ def _handle_llm_start(
)
llm_inv.framework = "llamaindex"

# Set provider if detected
if provider:
llm_inv.provider = provider

# Set tool definitions if present (must be JSON string)
if tool_definitions:
llm_inv.tool_definitions = json.dumps(tool_definitions)

# Prefer explicit parent_id mapping; if it points to workflow, use active
# agent span only when that agent is a child of the resolved parent span.
parent_span = self._get_parent_span(parent_id, allow_fallback=False)
@@ -469,6 +559,60 @@ def _handle_llm_end(
if llm_inv.output_tokens is None:
llm_inv.output_tokens = _get_attr(usage, "output_tokens")

# Extract response model from raw response (check multiple locations)
response_model = None
if raw_response:
# Handle both dict and object raw_response
if isinstance(raw_response, dict):
response_model = raw_response.get("model") or raw_response.get(
"model_name"
)
else:
response_model = _get_attr(raw_response, "model") or _get_attr(
raw_response, "model_name"
)

# Fallback: check response message's additional_kwargs (LlamaIndex specific)
if not response_model:
message = _get_attr(response, "message")
if message:
additional_kwargs = _get_attr(message, "additional_kwargs")
if additional_kwargs:
response_model = _get_attr(additional_kwargs, "model")

if response_model:
llm_inv.response_model_name = _safe_str(response_model)

# Extract finish reasons from choices (separate from response_model)
if raw_response:
choices = _get_attr(raw_response, "choices", [])
if choices:
finish_reasons = []
for choice in choices:
finish_reason = _get_attr(choice, "finish_reason")
if finish_reason:
finish_reasons.append(_safe_str(finish_reason))
if finish_reasons:
llm_inv.response_finish_reasons = finish_reasons

# Fallback: use request model if response model not found
# This works even when response is None (e.g., LLM call errored)
if not llm_inv.response_model_name and llm_inv.request_model:
llm_inv.response_model_name = _safe_str(llm_inv.request_model)

# Get TTFT from EventHandler via InvocationManager
ttft = self._invocation_manager.get_ttft_for_event(event_id)
if ttft is not None:
llm_inv.attributes["gen_ai.response.time_to_first_chunk"] = ttft
llm_inv.request_stream = True
else:
# Explicitly mark as non-streaming when no TTFT was recorded
if llm_inv.request_stream is None:
llm_inv.request_stream = False

# Clear current event_id
set_current_llm_event_id(None)

# Stop the LLM invocation
llm_inv = self._handler.stop_llm(llm_inv)

@@ -603,6 +747,7 @@ def _handle_agent_step_start(
agent_type = None
agent_description = None
model_name = None
agent_tools: list[Any] = [] # Capture tools for propagation to child LLM calls

if step and hasattr(step, "step_state"):
# Try to get agent from step state
@@ -618,6 +763,9 @@
model_name = getattr(llm, "model", None) or getattr(
llm, "model_name", None
)
# Capture tools from agent for propagation to child LLM calls
if hasattr(agent, "tools"):
agent_tools = getattr(agent, "tools", [])

# Create AgentInvocation for the agent execution
agent_invocation = AgentInvocation(
@@ -644,6 +792,10 @@
if workflow_name:
agent_invocation.attributes["gen_ai.workflow.name"] = workflow_name

# Store tools for propagation to child LLM calls (internal attribute)
if agent_tools:
agent_invocation._agent_tools = agent_tools # type: ignore[attr-defined]

# Get parent span before starting the invocation
parent_span = self._get_parent_span(parent_id)
if parent_span: