Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added retrieval span support.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ classifiers = [
]
dependencies = [
"opentelemetry-instrumentation >= 0.62b0",
"opentelemetry-util-genai >= 1.0b0.dev",
"opentelemetry-util-genai >= 0.1b0",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Optional, cast
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult

Expand All @@ -26,6 +28,7 @@
from opentelemetry.util.genai.invocation import (
AgentInvocation,
InferenceInvocation,
RetrievalInvocation,
WorkflowInvocation,
)
from opentelemetry.util.genai.types import (
Expand Down Expand Up @@ -402,6 +405,78 @@ def on_llm_error(
if not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)

def on_retriever_start(
self,
serialized: dict[str, Any],
query: str,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[list[str]] = None,
metadata: Optional[dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
meta = metadata or {}
provider = meta.get("ls_vector_store_provider") or None
request_model = meta.get("ls_embedding_model") or None
retrieval = self._telemetry_handler.retrieval(
provider=provider, request_model=request_model
)
retrieval.query_text = query
self._invocation_manager.add_invocation_state(
run_id, parent_run_id, retrieval
)

def on_retriever_end(
self,
documents: Sequence[Document],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
invocation = self._invocation_manager.get_invocation(run_id=run_id)
if invocation is None or not isinstance(
invocation, RetrievalInvocation
):
self._invocation_manager.delete_invocation_state(run_id)
return

invocation.documents = [
{
"content": doc.page_content,
**({"id": doc.id} if doc.id is not None else {}),
**{
k: v
for k, v in cast(dict[str, Any], doc.metadata).items()
if v is not None
},
}
Comment on lines +445 to +454
for doc in documents
]
invocation.stop()
if not invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id)

def on_retriever_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
invocation = self._invocation_manager.get_invocation(run_id=run_id)
if invocation is None or not isinstance(
invocation, RetrievalInvocation
):
self._invocation_manager.delete_invocation_state(run_id)
return

invocation.fail(error)
if not invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)

def _find_nearest_agent(
self, run_id: Optional[UUID]
) -> Optional[AgentInvocation]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

"""Conformance scenario: langchain retrieval via VectorStoreRetriever."""

from __future__ import annotations

from typing import Any

from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever

from opentelemetry.instrumentation.genai.langchain import LangChainInstrumentor
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.test.weaver_live_check import LiveCheckReport
from opentelemetry.test_util_genai.conformance import (
ExpectedViolation,
Scenario,
)
from opentelemetry.test_util_genai.instrumentor import instrument


class _FakeRetriever(BaseRetriever):
"""In-memory retriever that returns fixed documents without network calls."""

def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> list[Document]:
return [
Document(
page_content="Paris is the capital of France.",
id="doc-1",
metadata={"source": "wiki"},
),
Document(
page_content="The Eiffel Tower is located in Paris.",
id="doc-2",
metadata={"source": "wiki"},
),
]

def _get_ls_params(self, **kwargs: Any) -> Any:
params = super()._get_ls_params(**kwargs)
params["ls_vector_store_provider"] = "FakeVectorStore"
return params


class RetrievalScenario(Scenario):
expected_spans = ("retrieval",)
expected_metrics = ("gen_ai.client.operation.duration",)
expected_violations = (
# LangChain's Document type has no relevance score field; the
# instrumentation cannot populate gen_ai.retrieval.documents[].score.
ExpectedViolation(
advice_id="genai_content_schema",
message_substring="score",
),
# _FakeRetriever is in-memory and has no backing server.
ExpectedViolation(
advice_id="genai_expected_attribute_missing",
message_substring="server.address",
),
)

def run(
self,
*,
tracer_provider: TracerProvider,
meter_provider: MeterProvider,
logger_provider: LoggerProvider,
vcr: Any,
) -> None:
with instrument(
LangChainInstrumentor(),
tracer_provider=tracer_provider,
logger_provider=logger_provider,
meter_provider=meter_provider,
semconv="gen_ai_latest_experimental",
content_capture="SPAN_ONLY",
):
retriever = _FakeRetriever()
# No VCR cassette needed — _FakeRetriever makes no network calls.
retriever.invoke("What is the capital of France?")

def validate(self, report: LiveCheckReport) -> None:
super().validate(report)
operations = [
attr["value"]
for entry in report["samples"]
if "span" in entry
for attr in entry["span"]["attributes"]
if attr["name"] == "gen_ai.operation.name"
]
assert "retrieval" in operations, (
f"Expected a retrieval span; saw operations {operations}"
)
Loading