Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,28 @@

import pytest

from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from opentelemetry.util.genai import handler as genai_handler

# Session-wide in-memory sinks: the session-scoped instrumentation fixture
# wires these into the tracer/meter providers exactly once, and per-test
# fixtures hand them back out (clearing accumulated spans between tests).
_session_span_exporter = InMemorySpanExporter()
_session_metric_reader = InMemoryMetricReader()


@pytest.fixture(autouse=True)
def disable_deepeval():
"""Disable deepeval evaluators to prevent real API calls in CI."""
def environment():
"""Reset env and handler singleton for each test."""
original_evals = os.environ.get("OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS")
original_emitters = os.environ.get("OTEL_INSTRUMENTATION_GENAI_EMITTERS")

os.environ["OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS"] = "none"
setattr(genai_handler.get_telemetry_handler, "_default_handler", None)
os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric"

yield

Expand All @@ -22,4 +34,47 @@ def disable_deepeval():
else:
os.environ["OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS"] = original_evals

if original_emitters is None:
os.environ.pop("OTEL_INSTRUMENTATION_GENAI_EMITTERS", None)
else:
os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = original_emitters


@pytest.fixture(scope="session", autouse=True)
def _instrument_once():
    """Instrument LlamaIndex once for the entire test session.

    Disables the genai evaluators (no real API calls in CI), selects the
    span+metric emitters, resets the cached telemetry-handler singleton so
    it is rebuilt under these env vars, and wires the session-level
    in-memory exporter/reader into fresh providers.

    Yields:
        The active ``LlamaindexInstrumentor`` so dependent fixtures can
        reference it. Uninstruments on session teardown.
    """
    os.environ["OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS"] = "none"
    os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric"
    # Direct attribute assignment instead of setattr() with a constant name
    # (ruff B010); clears the cached default handler.
    genai_handler.get_telemetry_handler._default_handler = None

    tracer_provider = TracerProvider()
    tracer_provider.add_span_processor(
        SimpleSpanProcessor(_session_span_exporter)
    )

    meter_provider = MeterProvider(metric_readers=[_session_metric_reader])

    instrumentor = LlamaindexInstrumentor()
    # Force (re-)instrumentation even if an earlier import already
    # instrumented — NOTE(review): relies on a private flag; confirm
    # against the instrumentor API.
    instrumentor._is_instrumented_by_opentelemetry = False
    instrumentor.instrument(
        tracer_provider=tracer_provider,
        meter_provider=meter_provider,
    )

    yield instrumentor

    # Tear down at end of session so later sessions/imports start clean.
    instrumentor.uninstrument()


@pytest.fixture
def span_exporter():
    """Hand the session-wide span exporter to a test, emptied of any
    spans left over from previous tests."""
    _session_span_exporter.clear()
    return _session_span_exporter


@pytest.fixture
def metric_reader():
    """Expose the session-wide in-memory metric reader to a test."""
    return _session_metric_reader


@pytest.fixture
def instrument(_instrument_once):
    """Alias fixture: depend on this to state that a test requires the
    session-level LlamaIndex instrumentation to be active."""
    return _instrument_once
Original file line number Diff line number Diff line change
@@ -1,159 +1,43 @@
"""Test embedding instrumentation for LlamaIndex."""

import pytest

import os

from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager
from llama_index.embeddings.openai import OpenAIEmbedding

from opentelemetry import metrics, trace
from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleSpanProcessor,
)

pytestmark = pytest.mark.skip(
reason="Requires live OpenAI API key; needs VCR cassettes"
)

# Global setup - shared across tests
metric_reader = None
instrumentor = None


def setup_telemetry():
"""Setup OpenTelemetry with span and metric exporters (once)."""
global metric_reader, instrumentor

if metric_reader is not None:
return metric_reader

# Enable metrics
os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric"

# Setup tracing
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)

# Setup metrics with InMemoryMetricReader
metric_reader = InMemoryMetricReader()
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

# Enable instrumentation once
instrumentor = LlamaindexInstrumentor()
instrumentor.instrument(
tracer_provider=trace.get_tracer_provider(),
meter_provider=metrics.get_meter_provider(),
)

return metric_reader
from llama_index.core.embeddings import MockEmbedding


def test_embedding_single_text():
"""Test single text embedding instrumentation."""
print("\nTest: Single Text Embedding")
print("=" * 60)

metric_reader = setup_telemetry()

# Configure embedding model
embed_model = OpenAIEmbedding(
model="text-embedding-3-small",
api_key=os.environ.get("OPENAI_API_KEY"),
)
def test_embedding_single_text(span_exporter, instrument):
"""Test single text embedding produces spans."""
embed_model = MockEmbedding(embed_dim=8)
Settings.embed_model = embed_model

# Make sure callback manager is initialized
if Settings.callback_manager is None:
Settings.callback_manager = CallbackManager()

# Generate single embedding
text = "LlamaIndex is a data framework for LLM applications"
embedding = embed_model.get_text_embedding(text)

print(f"\nText: {text}")
print(f"Embedding dimension: {len(embedding)}")
print(f"First 5 values: {embedding[:5]}")

# Validate metrics
print("\nMetrics:")
metrics_data = metric_reader.get_metrics_data()
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
print(f"\nMetric: {metric.name}")
for data_point in metric.data.data_points:
if hasattr(data_point, "bucket_counts"):
# Histogram
print(f" Count: {sum(data_point.bucket_counts)}")
else:
# Counter
print(f" Value: {data_point.value}")

print("\nTest completed successfully")
embedding = embed_model.get_text_embedding(
"LlamaIndex is a data framework for LLM applications"
)

assert len(embedding) == 8

def test_embedding_batch():
"""Test batch embedding instrumentation."""
print("\nTest: Batch Embeddings")
print("=" * 60)
spans = span_exporter.get_finished_spans()
assert len(spans) >= 1

metric_reader = setup_telemetry()

# Configure embedding model
embed_model = OpenAIEmbedding(
model="text-embedding-3-small",
api_key=os.environ.get("OPENAI_API_KEY"),
)
def test_embedding_batch(span_exporter, instrument):
"""Test batch embedding produces spans."""
embed_model = MockEmbedding(embed_dim=8)
Settings.embed_model = embed_model

# Make sure callback manager is initialized
if Settings.callback_manager is None:
Settings.callback_manager = CallbackManager()

# Generate batch embeddings
texts = [
"Paris is the capital of France",
"Berlin is the capital of Germany",
"Rome is the capital of Italy",
]
embeddings = embed_model.get_text_embedding_batch(texts)

print(f"\nEmbedded {len(embeddings)} texts")
print(f"Dimension: {len(embeddings[0])}")

# Validate metrics
print("\nMetrics:")
metrics_data = metric_reader.get_metrics_data()
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
print(f"\nMetric: {metric.name}")
for data_point in metric.data.data_points:
if hasattr(data_point, "bucket_counts"):
# Histogram
print(f" Count: {sum(data_point.bucket_counts)}")
else:
# Counter
print(f" Value: {data_point.value}")

print("\nTest completed successfully")


if __name__ == "__main__":
test_embedding_single_text()
print("\n" + "=" * 60 + "\n")
test_embedding_batch()
assert len(embeddings) == 3
assert all(len(e) == 8 for e in embeddings)

# Cleanup
if instrumentor:
instrumentor.uninstrument()
spans = span_exporter.get_finished_spans()
assert len(spans) >= 1
Loading
Loading