-
Notifications
You must be signed in to change notification settings - Fork 32
Add retrieval support in langchain #124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 7 commits
1b29c37
b9ac7ac
85886d8
cb278af
bbc1ef4
33d919e
caa313d
58b9858
815589c
d5650d4
c8de07d
4e5f5f0
5a64181
15f621e
97ebb84
067aad7
6d7866d
aabcbcf
b2017f3
3ac5ed3
067c2fe
febe947
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Added retrieval span support. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,10 +3,12 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from typing import Any, Optional, cast | ||
| from uuid import UUID | ||
|
|
||
| from langchain_core.callbacks import BaseCallbackHandler | ||
| from langchain_core.documents import Document | ||
| from langchain_core.messages import BaseMessage | ||
| from langchain_core.outputs import LLMResult | ||
|
|
||
|
|
@@ -26,6 +28,7 @@ | |
| from opentelemetry.util.genai.invocation import ( | ||
| AgentInvocation, | ||
| InferenceInvocation, | ||
| RetrievalInvocation, | ||
| WorkflowInvocation, | ||
| ) | ||
| from opentelemetry.util.genai.types import ( | ||
|
|
@@ -402,6 +405,78 @@ def on_llm_error( | |
| if not llm_invocation.span.is_recording(): | ||
| self._invocation_manager.delete_invocation_state(run_id=run_id) | ||
|
|
||
| def on_retriever_start( | ||
| self, | ||
| serialized: dict[str, Any], | ||
| query: str, | ||
| *, | ||
| run_id: UUID, | ||
| parent_run_id: Optional[UUID] = None, | ||
| tags: Optional[list[str]] = None, | ||
| metadata: Optional[dict[str, Any]] = None, | ||
| **kwargs: Any, | ||
| ) -> Any: | ||
| meta = metadata or {} | ||
| provider = meta.get("ls_vector_store_provider") or None | ||
| request_model = meta.get("ls_embedding_model") or None | ||
| retrieval = self._telemetry_handler.retrieval( | ||
| provider=provider, request_model=request_model | ||
| ) | ||
| retrieval.query_text = query | ||
| self._invocation_manager.add_invocation_state( | ||
| run_id, parent_run_id, retrieval | ||
| ) | ||
|
|
||
| def on_retriever_end( | ||
| self, | ||
| documents: Sequence[Document], | ||
| *, | ||
| run_id: UUID, | ||
| parent_run_id: Optional[UUID] = None, | ||
| **kwargs: Any, | ||
| ) -> Any: | ||
| invocation = self._invocation_manager.get_invocation(run_id=run_id) | ||
| if invocation is None or not isinstance( | ||
| invocation, RetrievalInvocation | ||
| ): | ||
| self._invocation_manager.delete_invocation_state(run_id) | ||
| return | ||
|
|
||
| invocation.documents = [ | ||
| { | ||
| "content": doc.page_content, | ||
| **({"id": doc.id} if doc.id is not None else {}), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why doc_id falls back to an empty object? it should be a string. It seems it's required in semconv, but given it's optional in langchain, we should mark it optional in semconv. Could you please send an issue (or PR) to update it? And then just don't fallback to anything, leave it None
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| **{ | ||
| k: v | ||
| for k, v in cast(dict[str, Any], doc.metadata).items() | ||
| if v is not None | ||
| }, | ||
|
wrisa marked this conversation as resolved.
Outdated
|
||
| } | ||
|
wrisa marked this conversation as resolved.
|
||
| for doc in documents | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't ever populate score, but it should be available at least in some cases, e.g. https://reference.langchain.com/python/langchain-core/vectorstores/base/VectorStore/similarity_search_with_relevance_scores Is there a way to get it somehow? If it's not always available, we should also update semantic conventions to make it not required. Please create an issue (or PR) to make it optional. cc @JWinermaSplunk who added retrival span in semconv in case he has thoughts
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| ] | ||
| invocation.stop() | ||
| if not invocation.span.is_recording(): | ||
| self._invocation_manager.delete_invocation_state(run_id) | ||
|
|
||
| def on_retriever_error( | ||
| self, | ||
| error: BaseException, | ||
| *, | ||
| run_id: UUID, | ||
| parent_run_id: Optional[UUID] = None, | ||
| **kwargs: Any, | ||
| ) -> Any: | ||
| invocation = self._invocation_manager.get_invocation(run_id=run_id) | ||
| if invocation is None or not isinstance( | ||
| invocation, RetrievalInvocation | ||
| ): | ||
| self._invocation_manager.delete_invocation_state(run_id) | ||
| return | ||
|
|
||
| invocation.fail(error) | ||
| if not invocation.span.is_recording(): | ||
| self._invocation_manager.delete_invocation_state(run_id=run_id) | ||
|
|
||
| def _find_nearest_agent( | ||
| self, run_id: Optional[UUID] | ||
| ) -> Optional[AgentInvocation]: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| # Copyright The OpenTelemetry Authors | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| """Conformance scenario: langchain retrieval via VectorStoreRetriever.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import Any | ||
|
|
||
| from langchain_core.callbacks import CallbackManagerForRetrieverRun | ||
| from langchain_core.documents import Document | ||
| from langchain_core.retrievers import BaseRetriever | ||
|
|
||
| from opentelemetry.instrumentation.genai.langchain import LangChainInstrumentor | ||
| from opentelemetry.sdk._logs import LoggerProvider | ||
| from opentelemetry.sdk.metrics import MeterProvider | ||
| from opentelemetry.sdk.trace import TracerProvider | ||
| from opentelemetry.test.weaver_live_check import LiveCheckReport | ||
| from opentelemetry.test_util_genai.conformance import ( | ||
| ExpectedViolation, | ||
| Scenario, | ||
| ) | ||
| from opentelemetry.test_util_genai.instrumentor import instrument | ||
|
|
||
|
|
||
| class _FakeRetriever(BaseRetriever): | ||
| """In-memory retriever that returns fixed documents without network calls.""" | ||
|
|
||
| def _get_relevant_documents( | ||
| self, query: str, *, run_manager: CallbackManagerForRetrieverRun | ||
| ) -> list[Document]: | ||
| return [ | ||
| Document( | ||
| page_content="Paris is the capital of France.", | ||
| id="doc-1", | ||
| metadata={"source": "wiki"}, | ||
| ), | ||
| Document( | ||
| page_content="The Eiffel Tower is located in Paris.", | ||
| id="doc-2", | ||
| metadata={"source": "wiki"}, | ||
| ), | ||
| ] | ||
|
|
||
| def _get_ls_params(self, **kwargs: Any) -> Any: | ||
| params = super()._get_ls_params(**kwargs) | ||
| params["ls_vector_store_provider"] = "FakeVectorStore" | ||
| return params | ||
|
|
||
|
|
||
| class RetrievalScenario(Scenario): | ||
| expected_spans = ("retrieval",) | ||
| expected_metrics = ("gen_ai.client.operation.duration",) | ||
| expected_violations = ( | ||
| # LangChain's Document type has no relevance score field; the | ||
| # instrumentation cannot populate gen_ai.retrieval.documents[].score. | ||
| ExpectedViolation( | ||
| advice_id="genai_content_schema", | ||
| message_substring="score", | ||
| ), | ||
| # _FakeRetriever is in-memory and has no backing server. | ||
| ExpectedViolation( | ||
| advice_id="genai_expected_attribute_missing", | ||
| message_substring="server.address", | ||
| ), | ||
| ) | ||
|
|
||
| def run( | ||
| self, | ||
| *, | ||
| tracer_provider: TracerProvider, | ||
| meter_provider: MeterProvider, | ||
| logger_provider: LoggerProvider, | ||
| vcr: Any, | ||
| ) -> None: | ||
| with instrument( | ||
| LangChainInstrumentor(), | ||
| tracer_provider=tracer_provider, | ||
| logger_provider=logger_provider, | ||
| meter_provider=meter_provider, | ||
| semconv="gen_ai_latest_experimental", | ||
| content_capture="SPAN_ONLY", | ||
| ): | ||
| retriever = _FakeRetriever() | ||
| # No VCR cassette needed — _FakeRetriever makes no network calls. | ||
| retriever.invoke("What is the capital of France?") | ||
|
|
||
| def validate(self, report: LiveCheckReport) -> None: | ||
| super().validate(report) | ||
| operations = [ | ||
| attr["value"] | ||
| for entry in report["samples"] | ||
| if "span" in entry | ||
| for attr in entry["span"]["attributes"] | ||
| if attr["name"] == "gen_ai.operation.name" | ||
| ] | ||
| assert "retrieval" in operations, ( | ||
| f"Expected a retrieval span; saw operations {operations}" | ||
| ) |
Uh oh!
There was an error while loading. Please reload this page.