Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- LLM span attributes for async agent flows:
- `gen_ai.response.model` extracted from raw LLM response with fallback to request model
- `gen_ai.response.finish_reasons` from response choices
- `gen_ai.request.max_tokens` with fallback chain (serialized -> metadata -> Settings.llm)
- `gen_ai.tool.definitions` via agent context propagation (gated by `OTEL_INSTRUMENTATION_GENAI_CAPTURE_TOOL_DEFINITIONS`)
- Provider detection from LLM class name
- Agent tool registration in `wrap_agent_run()` for tool definitions propagation across async boundaries
- `find_agent_with_tools()` fallback in invocation manager when ContextVar does not propagate across asyncio tasks

### Fixed
- Corrected retrieval span `gen_ai.operation.name` from `"retrieve"` to `"retrieval"` per OpenTelemetry semantic conventions. Removed explicit override in callback handler; now uses the `RetrievalInvocation` dataclass default from `util-genai`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from opentelemetry.instrumentation.llamaindex.callback_handler import (
LlamaindexCallbackHandler,
)
from opentelemetry.instrumentation.llamaindex.invocation_manager import (
_InvocationManager,
)
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.llamaindex.workflow_instrumentation import (
wrap_agent_run,
Expand Down Expand Up @@ -40,8 +43,12 @@ def _instrument(self, **kwargs):
logger_provider=logger_provider,
)

# Create shared invocation manager for callback handler and workflow instrumentation
invocation_manager = _InvocationManager()

llamaindexCallBackHandler = LlamaindexCallbackHandler(
telemetry_handler=self._telemetry_handler
telemetry_handler=self._telemetry_handler,
invocation_manager=invocation_manager,
)

wrap_function_wrapper(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from typing import Any, Dict, List, Optional

from llama_index.core.callbacks.base_handler import BaseCallbackHandler
Expand All @@ -16,6 +17,9 @@
Workflow,
ToolCall,
)
from opentelemetry.util.genai.utils import (
should_capture_tool_definitions as _should_capture_tool_definitions,
)

from .invocation_manager import _InvocationManager
from .vendor_detection import detect_vendor_from_class
Expand Down Expand Up @@ -121,14 +125,15 @@ class LlamaindexCallbackHandler(BaseCallbackHandler):
def __init__(
    self,
    telemetry_handler: Optional[TelemetryHandler] = None,
    invocation_manager: Optional[_InvocationManager] = None,
) -> None:
    """Initialize the LlamaIndex callback handler.

    Args:
        telemetry_handler: Handler used to emit GenAI telemetry spans.
            May be ``None``; stored as-is on ``self._handler``.
        invocation_manager: Shared ``_InvocationManager`` instance. Passing
            one lets this handler and the workflow instrumentation observe
            the same agent invocations; when ``None`` a private manager is
            created instead.
    """
    super().__init__(
        event_starts_to_ignore=[],
        event_ends_to_ignore=[],
    )
    self._handler = telemetry_handler
    # Stack of auto-created workflow ids (most recently created last).
    self._auto_workflow_ids: List[str] = []
    # Reuse the injected manager so state is shared across instrumentation
    # layers; fall back to a private instance only when none was provided.
    # (Fixes a dead store: the original constructed a throwaway manager
    # before immediately overwriting the attribute.)
    self._invocation_manager = invocation_manager or _InvocationManager()

def start_trace(self, trace_id: Optional[str] = None) -> None:
"""Start a trace - required by BaseCallbackHandler."""
Expand Down Expand Up @@ -314,9 +319,86 @@ def _handle_llm_start(
serialized.get("model") or serialized.get("model_name") or "unknown"
)

# Detect provider from class name
class_name = serialized.get("class_name", "")
provider = detect_vendor_from_class(class_name)

# Extract tool definitions if available (requires capture enabled)
tool_definitions = []
if _should_capture_tool_definitions():
# Check multiple locations where LlamaIndex might store tools
tools = (
serialized.get("tools")
or serialized.get("functions")
or payload.get("tools", [])
or payload.get("functions", [])
or serialized.get("additional_kwargs", {}).get("tools", [])
or serialized.get("additional_kwargs", {}).get("functions", [])
)

# Fallback: inherit tools from parent agent context (LlamaIndex stores
# tools on Agent, not in LLM callback payload like LangChain)
if not tools:
context_agent = self._invocation_manager.get_current_agent_invocation()
if context_agent and hasattr(context_agent, "_agent_tools"):
tools = getattr(context_agent, "_agent_tools", [])

# Second fallback: walk parent chain to find the nearest agent with tools
# (ContextVar may not propagate in all execution contexts)
if not tools:
nearest_agent = self._find_nearest_agent(parent_id)
if nearest_agent and hasattr(nearest_agent, "_agent_tools"):
tools = getattr(nearest_agent, "_agent_tools", [])

if tools:
for tool in tools:
# LlamaIndex FunctionTool stores metadata in tool.metadata
metadata = getattr(tool, "metadata", None)
if metadata:
tool_name = getattr(metadata, "name", None)
tool_desc = getattr(metadata, "description", None)
else:
# Fallback for dict-like or other tool formats
tool_name = _get_attr(tool, "name") or _get_attr(
tool, "function_name"
)
tool_desc = _get_attr(tool, "description")

if tool_name:
tool_def = {
"type": "function",
"name": _safe_str(tool_name),
}
if tool_desc:
Comment thread
shuningc marked this conversation as resolved.
tool_def["description"] = _safe_str(tool_desc)
tool_definitions.append(tool_def)

# Extract additional parameters if available
temperature = serialized.get("temperature")
max_tokens = serialized.get("max_tokens")
# Try multiple locations for max_tokens (CustomLLM may not serialize this)
max_tokens = (
serialized.get("max_tokens")
or serialized.get("num_output") # LlamaIndex metadata field
or payload.get("additional_kwargs", {}).get("max_tokens")
)
# Also check metadata.num_output from serialized (LlamaIndex stores it there)
if not max_tokens:
metadata = serialized.get("metadata", {})
if isinstance(metadata, dict):
max_tokens = metadata.get("num_output")
# Fallback: try to get from Settings.llm directly
if not max_tokens:
try:
from llama_index.core import Settings

llm = Settings.llm
if llm:
# Check LLM object's max_tokens or metadata.num_output
max_tokens = getattr(llm, "max_tokens", None)
if not max_tokens and hasattr(llm, "metadata"):
max_tokens = getattr(llm.metadata, "num_output", None)
except Exception:
pass
top_p = serialized.get("top_p")
frequency_penalty = serialized.get("frequency_penalty")
presence_penalty = serialized.get("presence_penalty")
Expand All @@ -341,6 +423,14 @@ def _handle_llm_start(
)
llm_inv.framework = "llamaindex"

# Set provider if detected
if provider:
llm_inv.provider = provider

# Set tool definitions if present (must be JSON string)
if tool_definitions:
llm_inv.tool_definitions = json.dumps(tool_definitions)

# Prefer explicit parent_id mapping; if it points to workflow, use active
# agent span only when that agent is a child of the resolved parent span.
parent_span = self._get_parent_span(parent_id, allow_fallback=False)
Expand Down Expand Up @@ -469,6 +559,47 @@ def _handle_llm_end(
if llm_inv.output_tokens is None:
llm_inv.output_tokens = _get_attr(usage, "output_tokens")

# Extract response model from raw response (check multiple locations)
response_model = None
if raw_response:
# Handle both dict and object raw_response
if isinstance(raw_response, dict):
response_model = raw_response.get("model") or raw_response.get(
"model_name"
)
else:
response_model = _get_attr(raw_response, "model") or _get_attr(
raw_response, "model_name"
)

# Fallback: check response message's additional_kwargs (LlamaIndex specific)
if not response_model:
message = _get_attr(response, "message")
if message:
additional_kwargs = _get_attr(message, "additional_kwargs")
if additional_kwargs:
response_model = _get_attr(additional_kwargs, "model")

if response_model:
llm_inv.response_model_name = _safe_str(response_model)

# Extract finish reasons from choices (separate from response_model)
if raw_response:
choices = _get_attr(raw_response, "choices", [])
if choices:
finish_reasons = []
for choice in choices:
finish_reason = _get_attr(choice, "finish_reason")
if finish_reason:
finish_reasons.append(_safe_str(finish_reason))
if finish_reasons:
llm_inv.response_finish_reasons = finish_reasons

# Fallback: use request model if response model not found
# This works even when response is None (e.g., LLM call errored)
if not llm_inv.response_model_name and llm_inv.request_model:
llm_inv.response_model_name = _safe_str(llm_inv.request_model)

# Stop the LLM invocation
llm_inv = self._handler.stop_llm(llm_inv)

Expand Down Expand Up @@ -603,6 +734,7 @@ def _handle_agent_step_start(
agent_type = None
agent_description = None
model_name = None
agent_tools: list[Any] = [] # Capture tools for propagation to child LLM calls

if step and hasattr(step, "step_state"):
# Try to get agent from step state
Expand All @@ -618,6 +750,9 @@ def _handle_agent_step_start(
model_name = getattr(llm, "model", None) or getattr(
llm, "model_name", None
)
# Capture tools from agent for propagation to child LLM calls
if hasattr(agent, "tools"):
agent_tools = getattr(agent, "tools", [])

# Create AgentInvocation for the agent execution
agent_invocation = AgentInvocation(
Expand All @@ -644,6 +779,10 @@ def _handle_agent_step_start(
if workflow_name:
agent_invocation.attributes["gen_ai.workflow.name"] = workflow_name

# Store tools for propagation to child LLM calls (internal attribute)
if agent_tools:
agent_invocation._agent_tools = agent_tools # type: ignore[attr-defined]

# Get parent span before starting the invocation
parent_span = self._get_parent_span(parent_id)
if parent_span:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ async def instrument_workflow_handler(
workflow_name = current_agent_attrs.get("gen_ai.workflow.name")
if workflow_name:
self._workflow_name = str(workflow_name)

# Agent is already registered in wrap_agent_run(), just track key for nested agents
context_key_token = None
if self._invocation_manager:
context_key_token = self._invocation_manager.set_current_agent_key(None)
Expand Down Expand Up @@ -342,6 +344,11 @@ def wrap_agent_run(wrapped, instance, args, kwargs):
if workflow_name:
current_agent.attributes["gen_ai.workflow.name"] = str(workflow_name)

# Capture tools from agent instance for propagation to child LLM spans
# This enables gen_ai.tool.definitions on LLM spans under this agent
if hasattr(instance, "tools"):
current_agent._agent_tools = getattr(instance, "tools", []) # type: ignore[attr-defined]

is_orchestrator_workflow = bool(
hasattr(instance, "agents")
and hasattr(instance, "root_agent")
Expand All @@ -354,6 +361,18 @@ def wrap_agent_run(wrapped, instance, args, kwargs):
if not is_orchestrator_workflow:
telemetry_handler.start_agent(current_agent)

# Register agent with invocation_manager AFTER start_agent (which sets span_id)
# and BEFORE wrapped() is called, so LLM callbacks can access _agent_tools
agent_key = None
if (
invocation_manager
and hasattr(current_agent, "span_id")
and current_agent.span_id
):
agent_key = f"{current_agent.span_id:016x}::{current_agent.agent_name}"
invocation_manager.register_agent_invocation(agent_key, current_agent)
invocation_manager.set_current_agent_key(agent_key)

# Call the original run() method to get the workflow handler
handler = wrapped(*args, **kwargs)

Expand Down
Loading
Loading