Skip to content

Commit 95ec5c9

Browse files
committed
add span normalizer for tool input.value and output.value
1 parent 3dfdf20 commit 95ec5c9

3 files changed

Lines changed: 94 additions & 2 deletions

File tree

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""OpenTelemetry SpanProcessor for normalizing LlamaIndex tool call attributes.
2+
3+
LlamaIndex wraps tool arguments in {"kwargs": {...}} which differs from other
4+
frameworks like LangChain that use flat {"arg": value} format. This processor
5+
normalizes the format at the span level before exporters or dev terminal read it.
6+
"""
7+
8+
import json
9+
import logging
10+
from typing import Any, Optional
11+
12+
from opentelemetry.context import Context
13+
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class AttributeNormalizingSpanProcessor(SpanProcessor):
19+
"""Normalizes LlamaIndex tool call attributes to match other frameworks.
20+
21+
Unwraps {"kwargs": {...}} to flat {...} format for consistency with LangChain.
22+
"""
23+
24+
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
25+
"""Called when span starts - no action needed."""
26+
pass
27+
28+
def on_end(self, span: ReadableSpan) -> None:
29+
"""Normalize tool call attributes before span is consumed by exporters/terminal."""
30+
if not span._attributes:
31+
return
32+
33+
try:
34+
# Get the mutable internal attributes dict
35+
attrs: dict = span._attributes # type: ignore[attr-defined]
36+
37+
# Normalize tool call attributes
38+
for key in ("input.value", "output.value"):
39+
if key in attrs:
40+
original = attrs[key]
41+
normalized = self._normalize_attribute(key, original)
42+
43+
if normalized != original:
44+
attrs[key] = normalized
45+
if logger.isEnabledFor(logging.DEBUG):
46+
logger.debug(
47+
f"Normalized {key} in span '{span.name}': "
48+
f"{original[:50]}... → {normalized[:50]}..."
49+
)
50+
51+
except Exception as e:
52+
# Don't crash span processing if normalization fails
53+
logger.debug(
54+
f"Failed to normalize span '{getattr(span, 'name', 'unknown')}': {e}"
55+
)
56+
57+
def _normalize_attribute(self, key: str, value: Any) -> str:
58+
"""Unwrap LlamaIndex's kwargs wrapper if present."""
59+
if isinstance(value, str):
60+
try:
61+
value = json.loads(value)
62+
except Exception:
63+
pass
64+
if isinstance(value, dict):
65+
if key == "input.value":
66+
if "kwargs" in value:
67+
value = json.dumps(value["kwargs"])
68+
elif key == "output.value":
69+
value = json.dumps(
70+
{
71+
"content": value.get("raw_output"),
72+
"status": not value.get("is_error", False),
73+
"tool_call_id": value.get("tool_call_id"),
74+
}
75+
)
76+
return str(value)
77+
78+
def shutdown(self) -> None:
79+
"""Called on processor shutdown - no cleanup needed."""
80+
pass
81+
82+
def force_flush(self, timeout_millis: int = 30000) -> bool:
83+
"""Force flush - always succeeds (nothing to flush)."""
84+
return True

src/uipath_llamaindex/_cli/cli_dev.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from ._runtime._context import UiPathLlamaIndexRuntimeContext
1414
from ._runtime._runtime import UiPathLlamaIndexRuntime
15+
from ._tracing._attribute_normalizer import AttributeNormalizingSpanProcessor
1516

1617
console = ConsoleLogger()
1718

@@ -24,6 +25,9 @@ def llamaindex_dev_middleware(interface: Optional[str]) -> MiddlewareResult:
2425
runtime_factory = UiPathRuntimeFactory(
2526
UiPathLlamaIndexRuntime, UiPathLlamaIndexRuntimeContext
2627
)
28+
runtime_factory.tracer_provider.add_span_processor(
29+
AttributeNormalizingSpanProcessor()
30+
)
2731
runtime_factory.add_instrumentor(LlamaIndexInstrumentor, get_current_span)
2832
app = UiPathDevTerminal(runtime_factory)
2933
asyncio.run(app.run_async())

src/uipath_llamaindex/_cli/cli_run.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import logging
33
from os import environ as env
4-
from typing import Optional
4+
from typing import Any, Optional
55

66
from openinference.instrumentation.llama_index import (
77
LlamaIndexInstrumentor,
@@ -13,14 +13,15 @@
1313
from ._runtime._context import UiPathLlamaIndexRuntimeContext
1414
from ._runtime._exception import UiPathLlamaIndexRuntimeError
1515
from ._runtime._runtime import UiPathLlamaIndexRuntime
16+
from ._tracing._attribute_normalizer import AttributeNormalizingSpanProcessor
1617
from ._tracing._oteladapter import LlamaIndexExporter
1718
from ._utils._config import LlamaIndexConfig
1819

1920
logger = logging.getLogger(__name__)
2021

2122

2223
def llamaindex_run_middleware(
23-
entrypoint: Optional[str], input: Optional[str], resume: bool, **kwargs
24+
entrypoint: Optional[str], input: Optional[str], resume: bool, **kwargs: Any
2425
) -> MiddlewareResult:
2526
"""Middleware to handle LlamaIndex agent execution"""
2627

@@ -69,6 +70,9 @@ async def execute():
6970

7071
if context.job_id:
7172
runtime_factory.add_span_exporter(LlamaIndexExporter())
73+
runtime_factory.tracer_provider.add_span_processor(
74+
AttributeNormalizingSpanProcessor()
75+
)
7276

7377
runtime_factory.add_instrumentor(LlamaIndexInstrumentor, get_current_span)
7478

0 commit comments

Comments
 (0)