"""Shared pytest fixtures for the LlamaIndex instrumentation tests."""

import os

import pytest

from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
    InMemorySpanExporter,
)
from opentelemetry.util.genai import handler as genai_handler

# Session-wide sinks: instrumentation happens exactly once per session, so a
# single exporter/reader pair collects telemetry for every test.
_session_span_exporter = InMemorySpanExporter()
_session_metric_reader = InMemoryMetricReader()


@pytest.fixture(autouse=True)
def environment():
    """Pin the GenAI env vars for each test and restore them afterwards.

    "none" disables deepeval evaluators so no real API calls happen in CI;
    "span_metric" enables both span and metric emitters.

    NOTE(review): this fixture no longer resets the telemetry-handler
    singleton per test (that setattr moved to ``_instrument_once``, which is
    session-scoped) — confirm a per-test reset is really not needed.
    """
    original_evals = os.environ.get("OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS")
    original_emitters = os.environ.get("OTEL_INSTRUMENTATION_GENAI_EMITTERS")
    os.environ["OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS"] = "none"
    os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric"

    yield

    # Restore the pre-test values, removing the keys if they were unset.
    if original_evals is None:
        os.environ.pop("OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS", None)
    else:
        os.environ["OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS"] = original_evals

    if original_emitters is None:
        os.environ.pop("OTEL_INSTRUMENTATION_GENAI_EMITTERS", None)
    else:
        os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = original_emitters


@pytest.fixture(scope="session", autouse=True)
def _instrument_once():
    """Instrument LlamaIndex once for the entire test session.

    Yields the instrumentor so dependent fixtures can expose it.
    """
    os.environ["OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS"] = "none"
    os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric"
    # Drop any cached telemetry handler so it is rebuilt against the
    # providers configured below.
    setattr(genai_handler.get_telemetry_handler, "_default_handler", None)

    tracer_provider = TracerProvider()
    tracer_provider.add_span_processor(SimpleSpanProcessor(_session_span_exporter))

    meter_provider = MeterProvider(metric_readers=[_session_metric_reader])

    instrumentor = LlamaindexInstrumentor()
    # Force re-instrumentation even if an earlier import left the flag set.
    instrumentor._is_instrumented_by_opentelemetry = False
    instrumentor.instrument(
        tracer_provider=tracer_provider,
        meter_provider=meter_provider,
    )

    yield instrumentor


@pytest.fixture
def span_exporter():
    """Provide the session span exporter, cleared before each test."""
    _session_span_exporter.clear()
    yield _session_span_exporter


@pytest.fixture
def metric_reader():
    """Provide the session metric reader.

    NOTE(review): InMemoryMetricReader has no per-test reset, so metrics
    accumulate across tests — asserting on cumulative values may be flaky.
    """
    yield _session_metric_reader


@pytest.fixture
def instrument(_instrument_once):
    """Marker fixture for tests that need instrumentation."""
    yield _instrument_once
"""Test embedding instrumentation for LlamaIndex."""

from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager
from llama_index.core.embeddings import MockEmbedding


def test_embedding_single_text(span_exporter, instrument):
    """Test single text embedding produces spans."""
    # MockEmbedding avoids real OpenAI calls; dim 8 keeps the test cheap.
    embed_model = MockEmbedding(embed_dim=8)
    Settings.embed_model = embed_model
    # Callback-based instrumentation requires a callback manager to exist.
    if Settings.callback_manager is None:
        Settings.callback_manager = CallbackManager()

    embedding = embed_model.get_text_embedding(
        "LlamaIndex is a data framework for LLM applications"
    )
    assert len(embedding) == 8

    spans = span_exporter.get_finished_spans()
    assert len(spans) >= 1


def test_embedding_batch(span_exporter, instrument):
    """Test batch embedding produces spans."""
    embed_model = MockEmbedding(embed_dim=8)
    Settings.embed_model = embed_model
    if Settings.callback_manager is None:
        Settings.callback_manager = CallbackManager()

    texts = [
        "Paris is the capital of France",
        "Berlin is the capital of Germany",
        # NOTE(review): the third entry was elided in the diff context
        # (len(embeddings) == 3 below) — confirm against the original file.
        "Tokyo is the capital of Japan",
    ]
    embeddings = embed_model.get_text_embedding_batch(texts)

    assert len(embeddings) == 3
    assert all(len(e) == 8 for e in embeddings)

    spans = span_exporter.get_finished_spans()
    assert len(spans) >= 1
trace.get_tracer_provider() - tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) - - # Setup metrics with InMemoryMetricReader - metric_reader = InMemoryMetricReader() - meter_provider = MeterProvider(metric_readers=[metric_reader]) - metrics.set_meter_provider(meter_provider) - - return tracer_provider, meter_provider, metric_reader - @pytest.mark.skip(reason="Requires live OpenAI API key; needs VCR cassettes") -def test_with_openai(): - """Test with real OpenAI API - requires OPENAI_API_KEY environment variable.""" +def test_with_openai(span_exporter, instrument): + """Test with real OpenAI API - requires OPENAI_API_KEY.""" from llama_index.llms.openai import OpenAI - print("=" * 80) - print("Testing with OpenAI API") - print("=" * 80) - llm = OpenAI(model="gpt-3.5-turbo") messages = [ ChatMessage(role=MessageRole.SYSTEM, content="You are a helpful assistant."), @@ -49,50 +17,14 @@ def test_with_openai(): ] response = llm.chat(messages) - print(f"\nResponse: {response.message.content}") - - if hasattr(response, "raw") and response.raw: - # Try dict-like .get() first (works with any dict-like object), fallback to getattr - try: - usage = response.raw.get("usage", {}) - except AttributeError: - usage = getattr(response.raw, "usage", None) - - if usage: - # Same pattern for usage object - try .get() first for dict-like objects - try: - prompt_tokens = usage.get("prompt_tokens") - completion_tokens = usage.get("completion_tokens") - total_tokens = usage.get("total_tokens") - except AttributeError: - prompt_tokens = getattr(usage, "prompt_tokens", None) - completion_tokens = getattr(usage, "completion_tokens", None) - total_tokens = getattr(usage, "total_tokens", None) - - print( - f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}" - ) - - print("=" * 80) - - -class MockLLMWithUsage(MockLLM): - """MockLLM that includes fake usage data for testing.""" + assert response.message.content - def 
_complete(self, prompt, **kwargs): - """Override internal complete to inject usage data.""" - response = super()._complete(prompt, **kwargs) - # Note: MockLLM uses _complete internally, but we can't easily inject - # usage here because the ChatResponse is created later - return response + spans = span_exporter.get_finished_spans() + assert len(spans) >= 1 -def test_with_mock(): - """Test with MockLLM - no API key needed.""" - print("=" * 80) - print("Testing with MockLLM") - print("=" * 80) - +def test_with_mock(span_exporter, instrument): + """Test LLM chat instrumentation with MockLLM.""" llm = MockLLM(max_tokens=50) messages = [ ChatMessage(role=MessageRole.SYSTEM, content="You are a helpful assistant."), @@ -100,16 +32,14 @@ def test_with_mock(): ] response = llm.chat(messages) - print(f"\nResponse: {response.message.content[:100]}...") - print("=" * 80) + assert response.message.content is not None + spans = span_exporter.get_finished_spans() + assert len(spans) >= 1 -def test_message_extraction(): - """Test message extraction.""" - print("\n" + "=" * 80) - print("Testing message extraction") - print("=" * 80) +def test_message_extraction(span_exporter, instrument): + """Test that message content is captured in spans.""" llm = MockLLM(max_tokens=20) messages = [ ChatMessage(role=MessageRole.SYSTEM, content="You are helpful."), @@ -117,76 +47,7 @@ def test_message_extraction(): ] response = llm.chat(messages) - print(f"\nResponse: {response.message.content[:50]}...") - print("=" * 80) - - -if __name__ == "__main__": - # Enable metrics emission - os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" - - # Setup telemetry - tracer_provider, meter_provider, metric_reader = setup_telemetry() - - # Instrument LlamaIndex - instrumentor = LlamaindexInstrumentor() - instrumentor.instrument( - tracer_provider=tracer_provider, meter_provider=meter_provider - ) - print("LlamaIndex instrumentation enabled\n") - - # Run tests - if 
os.environ.get("OPENAI_API_KEY"): - print("Testing with real OpenAI API\n") - test_with_openai() - else: - print("Testing with MockLLM (set OPENAI_API_KEY to test real API)\n") - test_with_mock() - - # Test message extraction - test_message_extraction() - - # Check metrics - print("\n" + "=" * 80) - print("Metrics Summary") - print("=" * 80) - - metrics_data = metric_reader.get_metrics_data() - found_duration = False - found_token_usage = False - - if metrics_data: - for rm in getattr(metrics_data, "resource_metrics", []) or []: - for scope in getattr(rm, "scope_metrics", []) or []: - for metric in getattr(scope, "metrics", []) or []: - print(f"\nMetric: {metric.name}") - - if metric.name == gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION: - found_duration = True - dps = getattr(metric.data, "data_points", []) - if dps: - print(f" Duration: {dps[0].sum:.4f} seconds") - print(f" Count: {dps[0].count}") - - if metric.name == gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE: - found_token_usage = True - dps = getattr(metric.data, "data_points", []) - for dp in dps: - token_type = dp.attributes.get( - "gen_ai.token.type", "unknown" - ) - print( - f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}" - ) - - print("\n" + "=" * 80) - status = [] - if found_duration: - status.append("Duration: OK") - if found_token_usage: - status.append("Token Usage: OK") - if not found_duration and not found_token_usage: - status.append("No metrics (use real API for metrics)") + assert response.message.content is not None - print("Status: " + " | ".join(status)) - print("=" * 80) + spans = span_exporter.get_finished_spans() + assert len(spans) >= 1 diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py index bdd118a6..4301135a 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py +++ 
"""
Test LlamaIndex RAG instrumentation without agents.

Validates that QUERY / RETRIEVE / SYNTHESIZE callback events
produce spans when using mock LLM and embeddings.
"""

from llama_index.core import Document, Settings, VectorStoreIndex
from llama_index.core.embeddings import MockEmbedding
from llama_index.core.llms.mock import MockLLM


def test_rag_without_agents(span_exporter, instrument):
    """A full RAG query over mock models must emit at least one span."""
    # Mock models: no network, deterministic output.
    Settings.llm = MockLLM(max_tokens=64)
    Settings.embed_model = MockEmbedding(embed_dim=8)

    # Build the tiny corpus from (text, source) pairs.
    corpus = [
        ("Paris is the capital of France.", "geography"),
        ("The Eiffel Tower is in Paris.", "landmarks"),
    ]
    documents = [
        Document(text=text, metadata={"source": source})
        for text, source in corpus
    ]

    index = VectorStoreIndex.from_documents(documents)
    engine = index.as_query_engine(similarity_top_k=2)
    result = engine.query("What is the capital of France?")
    assert result is not None

    # Instrumentation should have recorded the query/retrieve/synthesize work.
    finished = span_exporter.get_finished_spans()
    assert len(finished) >= 1
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    product = a * b
    return product


def add(a: int, b: int) -> int:
    """Add two numbers."""
    total = a + b
    return total
parent) - root_spans = [ - span - for span in spans - if span.parent is None or span.parent.span_id not in span_map - ] - - def print_span_tree(span, indent=0): - prefix = " " * indent - op_name = span.attributes.get("gen_ai.operation.name", span.name) - span_type = "" - - if "workflow" in span.name.lower(): - span_type = "Workflow" - details = f"name={span.attributes.get('gen_ai.workflow.name', 'N/A')}" - elif "agent" in span.name.lower(): - span_type = "AgentInvocation" - details = f"name={span.attributes.get('gen_ai.agent.name', 'N/A')}" - elif "tool" in span.name.lower(): - span_type = "ToolCall" - details = f"name={span.attributes.get('gen_ai.tool.name', 'N/A')}" - elif "chat" in span.name.lower() or "llm" in span.name.lower(): - span_type = "LLMInvocation" - details = f"model={span.attributes.get('gen_ai.request.model', 'N/A')}" - else: - span_type = span.name - details = f"operation={op_name}" - - print(f"{prefix}└─ {span_type} ({details})") - - # Find and print children - children = [ - s for s in spans if s.parent and s.parent.span_id == span.context.span_id - ] - for child in children: - print_span_tree(child, indent + 1) - - for root in root_spans: - print_span_tree(root) - - print("=" * 80) - - -def print_metrics(metric_reader): - """Print captured metrics.""" - print("\n" + "=" * 80) - print("WORKFLOW METRICS") - print("=" * 80) - - metrics_data = metric_reader.get_metrics_data() - - if not metrics_data or not metrics_data.resource_metrics: - print("No metrics captured") - print("=" * 80) - return - - for resource_metric in metrics_data.resource_metrics: - for scope_metric in resource_metric.scope_metrics: - for metric in scope_metric.metrics: - print(f"\nMetric: {metric.name}") - print(f" Description: {metric.description}") - print(f" Unit: {metric.unit}") - - for data_point in metric.data.data_points: - attrs = dict(data_point.attributes) if data_point.attributes else {} - - # Format attributes for display - attr_str = ", ".join([f"{k}={v}" for k, v 
in attrs.items()]) - - # Get the value based on metric type - if hasattr(data_point, "value"): - value = data_point.value - elif hasattr(data_point, "sum"): - value = data_point.sum - elif hasattr(data_point, "count"): - value = f"count={data_point.count}, sum={data_point.sum}" - else: - value = "N/A" - - print(f" [{attr_str}] = {value}") - - print("=" * 80) - - class SequenceMockLLM(MockLLM): responses: List[ChatMessage] = [] response_index: int = 0 @@ -160,34 +36,33 @@ def __init__(self, responses: List[ChatMessage], max_tokens: int = 256): self.response_index = 0 def chat(self, messages, **kwargs): + from llama_index.core.base.llms.types import ChatResponse + if self.response_index < len(self.responses): response = self.responses[self.response_index] self.response_index += 1 - from llama_index.core.base.llms.types import ChatResponse - return ChatResponse(message=response) return ChatResponse( message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.") ) async def achat(self, messages, **kwargs): + from llama_index.core.base.llms.types import ChatResponse + if self.response_index < len(self.responses): response = self.responses[self.response_index] self.response_index += 1 - from llama_index.core.base.llms.types import ChatResponse - return ChatResponse(message=response) return ChatResponse( message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.") ) def stream_chat(self, messages, **kwargs): + from llama_index.core.base.llms.types import ChatResponse + if self.response_index < len(self.responses): response = self.responses[self.response_index] self.response_index += 1 - from llama_index.core.base.llms.types import ChatResponse - - # Yield a single response chunk yield ChatResponse(message=response, delta=response.content) else: yield ChatResponse( @@ -196,13 +71,12 @@ def stream_chat(self, messages, **kwargs): ) async def astream_chat(self, messages, **kwargs): + from llama_index.core.base.llms.types import ChatResponse + async def gen(): if 
@pytest.mark.asyncio
async def test_workflow_agent(span_exporter, instrument):
    """Test ReActAgent workflow instrumentation with mock LLM."""
    # Scripted ReAct transcript: two tool calls, then a final answer.
    mock_responses = [
        ChatMessage(
            role=MessageRole.ASSISTANT,
            content=(
                "Thought: I need to multiply 5 by 3 first.\n"
                'Action: multiply\nAction Input: {"a": 5, "b": 3}'
            ),
        ),
        ChatMessage(
            role=MessageRole.ASSISTANT,
            content=(
                "Thought: The result is 15. Now add 2.\n"
                'Action: add\nAction Input: {"a": 15, "b": 2}'
            ),
        ),
        ChatMessage(
            role=MessageRole.ASSISTANT,
            content=("Thought: The final result is 17.\nAnswer: The result is 17."),
        ),
    ]
    Settings.llm = SequenceMockLLM(responses=mock_responses, max_tokens=256)

    multiply_tool = FunctionTool.from_defaults(fn=multiply)
    add_tool = FunctionTool.from_defaults(fn=add)

    agent = ReActAgent(
        tools=[multiply_tool, add_tool],
        llm=Settings.llm,
        verbose=False,
    )

    handler = agent.run(user_msg="Calculate 5 times 3, then add 2 to the result")
    result = await handler
    # Give the background instrumentation task time to flush its spans.
    await asyncio.sleep(0.5)

    assert result.response is not None

    spans = span_exporter.get_finished_spans()
    assert len(spans) >= 1
"usage": { + "prompt_tokens": 1, + "completion_tokens": 1, + } + }, ) @@ -106,16 +107,6 @@ def get_tool_calls_from_response(self, response, **kwargs): ] -def setup_telemetry(): - memory_exporter = InMemorySpanExporter() - tracer_provider = TracerProvider() - tracer_provider.add_span_processor(SimpleSpanProcessor(memory_exporter)) - - meter_provider = MeterProvider() - - return tracer_provider, meter_provider, memory_exporter - - def _assert_agent_and_workflow_spans(spans): op_names = {span.attributes.get("gen_ai.operation.name") for span in spans} assert "invoke_agent" in op_names @@ -159,7 +150,8 @@ def _line(span, depth): def _walk(span, depth, out): out.append(_line(span, depth)) for child in sorted( - children.get(span.context.span_id, []), key=lambda s: s.start_time + children.get(span.context.span_id, []), + key=lambda s: s.start_time, ): _walk(child, depth + 1, out) @@ -169,26 +161,8 @@ def _walk(span, depth, out): return "\n".join(lines) -@pytest.fixture(scope="module") -def instrumented_telemetry(): - tracer_provider, meter_provider, memory_exporter = setup_telemetry() - os.environ["OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS"] = "none" - - instrumentor = LlamaindexInstrumentor() - instrumentor._is_instrumented_by_opentelemetry = False - instrumentor.instrument( - tracer_provider=tracer_provider, meter_provider=meter_provider - ) - - return memory_exporter - - @pytest.mark.asyncio -async def test_function_agent_emits_spans(monkeypatch, instrumented_telemetry): - monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span_metric") - memory_exporter = instrumented_telemetry - memory_exporter.clear() - +async def test_function_agent_emits_spans(span_exporter, instrument): agent = FunctionAgent( name="FunctionAgent", llm=FunctionCallingLLM("Done."), @@ -200,17 +174,13 @@ async def test_function_agent_emits_spans(monkeypatch, instrumented_telemetry): await handler await asyncio.sleep(0.1) - spans = memory_exporter.get_finished_spans() + spans = 
span_exporter.get_finished_spans() _assert_agent_and_workflow_spans(spans) _assert_agent_and_workflow_attributes(spans) @pytest.mark.asyncio -async def test_codeact_agent_emits_spans(monkeypatch, instrumented_telemetry): - monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span_metric") - memory_exporter = instrumented_telemetry - memory_exporter.clear() - +async def test_codeact_agent_emits_spans(span_exporter, instrument): async def code_execute_fn(code: str): return {"result": code} @@ -224,17 +194,13 @@ async def code_execute_fn(code: str): await handler await asyncio.sleep(0.1) - spans = memory_exporter.get_finished_spans() + spans = span_exporter.get_finished_spans() _assert_agent_and_workflow_spans(spans) _assert_agent_and_workflow_attributes(spans) @pytest.mark.asyncio -async def test_agent_workflow_emits_agent_span(monkeypatch, instrumented_telemetry): - monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span_metric") - memory_exporter = instrumented_telemetry - memory_exporter.clear() - +async def test_agent_workflow_emits_agent_span(span_exporter, instrument): agent = FunctionAgent( name="SingleAgent", llm=FunctionCallingLLM("Done."), @@ -248,26 +214,32 @@ async def test_agent_workflow_emits_agent_span(monkeypatch, instrumented_telemet op_names = { span.attributes.get("gen_ai.operation.name") - for span in memory_exporter.get_finished_spans() + for span in span_exporter.get_finished_spans() } assert "invoke_agent" in op_names assert "invoke_workflow" in op_names - _assert_agent_and_workflow_attributes(memory_exporter.get_finished_spans()) + _assert_agent_and_workflow_attributes(span_exporter.get_finished_spans()) @pytest.mark.asyncio -async def test_multi_agent_workflow_example_emits_spans( - monkeypatch, instrumented_telemetry -): - monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span_metric") - memory_exporter = instrumented_telemetry - memory_exporter.clear() - +async def 
test_multi_agent_workflow_example_emits_spans(span_exporter, instrument): llm = HandoffSequenceLLM( "Done.", [ - ("handoff", {"to_agent": "WriteAgent", "reason": "Draft ready."}), - ("handoff", {"to_agent": "ReviewAgent", "reason": "Needs review."}), + ( + "handoff", + { + "to_agent": "WriteAgent", + "reason": "Draft ready.", + }, + ), + ( + "handoff", + { + "to_agent": "ReviewAgent", + "reason": "Needs review.", + }, + ), None, ], ) @@ -275,7 +247,7 @@ async def test_multi_agent_workflow_example_emits_spans( research_agent = FunctionAgent( name="ResearchAgent", description="Search the web and record notes.", - system_prompt="You are a researcher. Hand off to WriteAgent when ready.", + system_prompt="You are a researcher.", llm=llm, tools=[], can_handoff_to=["WriteAgent"], @@ -283,8 +255,8 @@ async def test_multi_agent_workflow_example_emits_spans( ) write_agent = FunctionAgent( name="WriteAgent", - description="Writes a markdown report from the notes.", - system_prompt="You are a writer. 
Ask ReviewAgent for feedback when done.", + description="Writes a markdown report.", + system_prompt="You are a writer.", llm=llm, tools=[], can_handoff_to=["ReviewAgent", "ResearchAgent"], @@ -292,7 +264,7 @@ async def test_multi_agent_workflow_example_emits_spans( ) review_agent = FunctionAgent( name="ReviewAgent", - description="Reviews a report and gives feedback.", + description="Reviews a report.", system_prompt="You are a reviewer.", llm=llm, tools=[], @@ -313,14 +285,12 @@ async def test_multi_agent_workflow_example_emits_spans( await agent_workflow.run(user_msg="Write me a report on the history of the web.") await asyncio.sleep(0.1) - op_names = { - span.attributes.get("gen_ai.operation.name") - for span in memory_exporter.get_finished_spans() - } + spans = span_exporter.get_finished_spans() + op_names = {span.attributes.get("gen_ai.operation.name") for span in spans} assert "invoke_agent" in op_names assert "invoke_workflow" in op_names - spans = memory_exporter.get_finished_spans() _assert_agent_and_workflow_attributes(spans) + agent_names = { span.attributes.get("gen_ai.agent.name") for span in spans @@ -336,14 +306,6 @@ async def test_multi_agent_workflow_example_emits_spans( assert len(workflow_spans) == 1 workflow_span_id = workflow_spans[0].context.span_id - orchestrator_agent_spans = [ - span - for span in spans - if span.attributes.get("gen_ai.operation.name") == "invoke_agent" - and span.attributes.get("gen_ai.agent.name") == "AgentWorkflow" - ] - assert len(orchestrator_agent_spans) == 0 - concrete_agent_counts = Counter( span.attributes.get("gen_ai.agent.name") for span in spans @@ -378,8 +340,6 @@ async def test_multi_agent_workflow_example_emits_spans( chat_spans = [ span for span in spans if span.attributes.get("gen_ai.operation.name") == "chat" ] - # In mocked handoff-only flows, LlamaIndex may not emit LLM callback events, - # so chat spans can be absent. If present, they must attach to concrete agents. 
if chat_spans: prefixed_agent_name_chats = [ span @@ -403,13 +363,7 @@ async def test_multi_agent_workflow_example_emits_spans( @pytest.mark.asyncio -async def test_custom_workflow_single_agent_has_one_workflow_and_no_duplicates( - monkeypatch, instrumented_telemetry -): - monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span_metric") - memory_exporter = instrumented_telemetry - memory_exporter.clear() - +async def test_custom_workflow_single_agent(span_exporter, instrument): agent = FunctionAgent( name="CustomSingleAgent", llm=FunctionCallingLLM("Done."), @@ -428,7 +382,7 @@ async def run_agent(self, ev: StartEvent) -> StopEvent: await workflow.run() await asyncio.sleep(0.1) - spans = memory_exporter.get_finished_spans() + spans = span_exporter.get_finished_spans() workflow_spans = [ span for span in spans diff --git a/pyproject.toml b/pyproject.toml index b48e103a..0f5ec58e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -173,10 +173,6 @@ ignore = [ [tool.ruff.lint.per-file-ignores] "docs/**/*.*" = ["A001"] "instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_callback_handler_agent.py" = ["E402"] -"instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py" = ["E402"] -"instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py" = ["E402"] -"instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py" = ["E402"] -"instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent_variants.py" = ["E402"] "instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_tracer.py" = ["E402"] [tool.ruff.lint.isort]