Skip to content

Commit 411beb7

Browse files
nagkumar91 and Copilot committed
Add GenAI memory span instrumentation
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 5560324 commit 411beb7

3 files changed

Lines changed: 340 additions & 1 deletion

File tree

instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
([#3889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3889))
1414
- Added log and metrics provider to langchain genai utils handler
1515
([#4214](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4214))
16+
- Added retriever memory search span/event instrumentation aligned with the
17+
GenAI memory semantic convention proposal.
18+
([#3250](https://github.com/open-telemetry/semantic-conventions/pull/3250))

instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py

Lines changed: 196 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from __future__ import annotations
1616

17-
from typing import Any, Optional
17+
from typing import Any, Optional, Sequence
1818
from uuid import UUID
1919

2020
from langchain_core.callbacks import BaseCallbackHandler
@@ -24,14 +24,60 @@
2424
from opentelemetry.instrumentation.langchain.invocation_manager import (
2525
_InvocationManager,
2626
)
27+
from opentelemetry.semconv._incubating.attributes import (
28+
gen_ai_attributes as GenAI,
29+
)
2730
from opentelemetry.util.genai.handler import TelemetryHandler
2831
from opentelemetry.util.genai.types import (
32+
ContentCapturingMode,
2933
Error,
3034
InputMessage,
3135
LLMInvocation,
3236
OutputMessage,
3337
Text,
3438
)
39+
from opentelemetry.util.genai.utils import (
40+
get_content_capturing_mode,
41+
is_experimental_mode,
42+
)
43+
44+
# Attribute keys for the GenAI memory semantic-convention proposal. Each key is
# taken from the semconv module when it defines the constant; otherwise the
# literal string given as the default is used.
GEN_AI_MEMORY_STORE_ID = getattr(
    GenAI,
    "GEN_AI_MEMORY_STORE_ID",
    "gen_ai.memory.store.id",
)
GEN_AI_MEMORY_STORE_NAME = getattr(
    GenAI,
    "GEN_AI_MEMORY_STORE_NAME",
    "gen_ai.memory.store.name",
)
GEN_AI_MEMORY_QUERY = getattr(
    GenAI,
    "GEN_AI_MEMORY_QUERY",
    "gen_ai.memory.query",
)
GEN_AI_MEMORY_SEARCH_RESULT_COUNT = getattr(
    GenAI,
    "GEN_AI_MEMORY_SEARCH_RESULT_COUNT",
    "gen_ai.memory.search.result.count",
)
GEN_AI_MEMORY_NAMESPACE = getattr(
    GenAI,
    "GEN_AI_MEMORY_NAMESPACE",
    "gen_ai.memory.namespace",
)

# Operation-name enum, or a bare object() when the semconv module lacks it so
# the member lookups below simply resolve to None.
_OPERATION_NAME_VALUES = getattr(GenAI, "GenAiOperationNameValues", object())

_SEARCH_MEMORY_MEMBER = getattr(_OPERATION_NAME_VALUES, "SEARCH_MEMORY", None)
if _SEARCH_MEMORY_MEMBER is None:
    SEARCH_MEMORY_OPERATION = "search_memory"
else:
    SEARCH_MEMORY_OPERATION = _SEARCH_MEMORY_MEMBER.value

_RETRIEVAL_MEMBER = getattr(_OPERATION_NAME_VALUES, "RETRIEVAL", None)
if _RETRIEVAL_MEMBER is None:
    RETRIEVAL_OPERATION = "retrieval"
else:
    RETRIEVAL_OPERATION = _RETRIEVAL_MEMBER.value
3581

3682

3783
class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler):
@@ -44,6 +90,62 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None:
4490
self._telemetry_handler = telemetry_handler
4591
self._invocation_manager = _InvocationManager()
4692

93+
@staticmethod
94+
def _resolve_retriever_store_name(
95+
serialized: dict[str, Any],
96+
metadata: Optional[dict[str, Any]],
97+
) -> Optional[str]:
98+
if metadata and metadata.get("memory_store_name"):
99+
return str(metadata["memory_store_name"])
100+
if metadata and metadata.get("ls_retriever_name"):
101+
return str(metadata["ls_retriever_name"])
102+
name = serialized.get("name")
103+
return str(name) if isinstance(name, str) and name else None
104+
105+
@staticmethod
106+
def _resolve_retriever_store_id(
107+
serialized: dict[str, Any],
108+
metadata: Optional[dict[str, Any]],
109+
) -> Optional[str]:
110+
if metadata and metadata.get("memory_store_id"):
111+
return str(metadata["memory_store_id"])
112+
113+
serialized_id = serialized.get("id")
114+
if isinstance(serialized_id, str) and serialized_id:
115+
return serialized_id
116+
if isinstance(serialized_id, list) and serialized_id:
117+
try:
118+
return ".".join(str(part) for part in serialized_id) # type: ignore[reportUnknownArgumentType, reportUnknownVariableType]
119+
except TypeError:
120+
return None
121+
return None
122+
123+
@staticmethod
def _should_capture_memory_query() -> bool:
    """Decide whether the retriever query text may be put on the span.

    Returns:
        ``True`` only when ``is_experimental_mode()`` is true and
        ``get_content_capturing_mode()`` yields ``SPAN_ONLY`` or
        ``SPAN_AND_EVENT``. A ``ValueError`` raised while reading the
        capturing mode results in ``False``.
    """
    if is_experimental_mode():
        try:
            capture_mode = get_content_capturing_mode()
        except ValueError:
            return False
        span_modes = (
            ContentCapturingMode.SPAN_ONLY,
            ContentCapturingMode.SPAN_AND_EVENT,
        )
        return capture_mode in span_modes
    return False
135+
136+
@staticmethod
137+
def _is_memory_retriever(
138+
metadata: Optional[dict[str, Any]],
139+
) -> bool:
140+
"""Detect if a retriever is a memory retriever based on metadata hints."""
141+
if not metadata:
142+
return False
143+
return bool(
144+
metadata.get("memory_store_name")
145+
or metadata.get("memory_store_id")
146+
or metadata.get("is_memory_retriever")
147+
)
148+
47149
def on_chat_model_start(
48150
self,
49151
serialized: dict[str, Any],
@@ -268,3 +370,96 @@ def on_llm_error(
268370
)
269371
if llm_invocation.span and not llm_invocation.span.is_recording():
270372
self._invocation_manager.delete_invocation_state(run_id=run_id)
373+
374+
def on_retriever_start(
    self,
    serialized: dict[str, Any],
    query: str,
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    **kwargs: Any,
) -> None:
    """Start an invocation for a retriever run and register it by run id.

    The operation name is ``SEARCH_MEMORY_OPERATION`` when the metadata
    marks the retriever as a memory retriever, otherwise
    ``RETRIEVAL_OPERATION``. Store name, store id, query text (only when
    query capture is allowed) and memory namespace are attached as
    attributes when available. ``tags`` and ``kwargs`` are accepted but not
    used here.
    """
    provider = (
        "unknown"
        if metadata is None
        else metadata.get("ls_provider", "unknown")
    )

    if self._is_memory_retriever(metadata):
        operation = SEARCH_MEMORY_OPERATION
    else:
        operation = RETRIEVAL_OPERATION

    attributes: dict[str, Any] = {}

    store_name = self._resolve_retriever_store_name(serialized, metadata)
    if store_name:
        attributes[GEN_AI_MEMORY_STORE_NAME] = store_name

    store_id = self._resolve_retriever_store_id(serialized, metadata)
    if store_id:
        attributes[GEN_AI_MEMORY_STORE_ID] = store_id

    # The query is content, so it is only recorded when capture is enabled.
    if query and self._should_capture_memory_query():
        attributes[GEN_AI_MEMORY_QUERY] = query

    namespace = metadata.get("memory_namespace") if metadata else None
    if namespace:
        attributes[GEN_AI_MEMORY_NAMESPACE] = namespace

    started = self._telemetry_handler.start_llm(
        invocation=LLMInvocation(
            request_model="",
            provider=provider,
            operation_name=operation,
            attributes=attributes,
        )
    )

    # Rename the span to include the store name when one was resolved.
    if started.span and store_name:
        started.span.update_name(f"{operation} {store_name}")

    self._invocation_manager.add_invocation_state(
        run_id=run_id,
        parent_run_id=parent_run_id,
        invocation=started,
    )
422+
423+
def on_retriever_end(
    self,
    documents: Sequence[Any],
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> None:
    """Finish the invocation tracked for ``run_id``.

    Records ``len(documents)`` under ``GEN_AI_MEMORY_SEARCH_RESULT_COUNT``,
    stops the invocation through the telemetry handler, and drops the stored
    state once the span is no longer recording. Does nothing when no
    ``LLMInvocation`` is registered for ``run_id``.
    """
    tracked = self._invocation_manager.get_invocation(run_id=run_id)
    # A missing entry (None) also fails this check.
    if not isinstance(tracked, LLMInvocation):
        return

    result_count = len(documents)
    tracked.attributes[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] = result_count

    finished = self._telemetry_handler.stop_llm(invocation=tracked)
    if finished.span and not finished.span.is_recording():
        self._invocation_manager.delete_invocation_state(run_id=run_id)
445+
446+
def on_retriever_error(
    self,
    error: BaseException,
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> None:
    """Mark the invocation tracked for ``run_id`` as failed.

    Wraps ``error`` in an ``Error`` (message from ``str(error)``, type from
    ``type(error)``), reports it through the telemetry handler, and drops the
    stored state once the span is no longer recording. Does nothing when no
    ``LLMInvocation`` is registered for ``run_id``.
    """
    tracked = self._invocation_manager.get_invocation(run_id=run_id)
    # A missing entry (None) also fails this check.
    if not isinstance(tracked, LLMInvocation):
        return

    failure = Error(message=str(error), type=type(error))
    failed = self._telemetry_handler.fail_llm(
        invocation=tracked,
        error=failure,
    )
    if failed.span and not failed.span.is_recording():
        self._invocation_manager.delete_invocation_state(run_id=run_id)
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
from __future__ import annotations
2+
3+
from unittest import mock
4+
from uuid import uuid4
5+
6+
from opentelemetry.instrumentation.langchain.callback_handler import (
7+
GEN_AI_MEMORY_QUERY,
8+
GEN_AI_MEMORY_SEARCH_RESULT_COUNT,
9+
GEN_AI_MEMORY_STORE_ID,
10+
GEN_AI_MEMORY_STORE_NAME,
11+
RETRIEVAL_OPERATION,
12+
SEARCH_MEMORY_OPERATION,
13+
OpenTelemetryLangChainCallbackHandler,
14+
)
15+
from opentelemetry.util.genai.types import ContentCapturingMode
16+
17+
18+
def _build_handler():
    """Create a callback handler wired to a mocked telemetry handler.

    ``start_llm`` attaches a mock span that reports ``is_recording() == True``
    to the invocation; ``stop_llm`` and ``fail_llm`` return the invocation
    they were given.

    Returns:
        A ``(callback_handler, telemetry_handler_mock)`` tuple.
    """

    def _start(invocation):
        recording_span = mock.Mock()
        recording_span.is_recording.return_value = True
        invocation.span = recording_span
        return invocation

    def _stop(invocation):
        return invocation

    def _fail(invocation, error):
        return invocation

    telemetry_handler = mock.Mock()
    telemetry_handler.start_llm.side_effect = _start
    telemetry_handler.stop_llm.side_effect = _stop
    telemetry_handler.fail_llm.side_effect = _fail

    callback_handler = OpenTelemetryLangChainCallbackHandler(telemetry_handler)
    return callback_handler, telemetry_handler
36+
37+
38+
def test_retriever_defaults_to_retrieval_without_memory_metadata(monkeypatch):
    """A retriever with no memory hints in metadata uses the retrieval op."""
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode",
        lambda: False,
    )
    handler, telemetry_handler = _build_handler()
    run_id = uuid4()

    handler.on_retriever_start(
        serialized={"name": "PineconeRetriever"},
        query="what is RAG?",
        run_id=run_id,
        metadata={"ls_provider": "pinecone"},
    )

    tracked = handler._invocation_manager.get_invocation(run_id)
    assert tracked is not None
    assert tracked.operation_name == RETRIEVAL_OPERATION
    telemetry_handler.start_llm.assert_called_once()
58+
59+
60+
def test_retriever_uses_search_memory_with_memory_metadata(monkeypatch):
    """Memory metadata switches the operation to 'search_memory'.

    With experimental mode on and SPAN_ONLY content capture, the started
    invocation must carry the store name, the dotted store id built from the
    serialized id list, the query text, and the memory namespace taken from
    the run metadata.
    """
    # Function-scope import: this test needs the namespace key, which the
    # module-level import list of this test file does not provide.
    from opentelemetry.instrumentation.langchain.callback_handler import (
        GEN_AI_MEMORY_NAMESPACE,
    )

    handler, telemetry_handler = _build_handler()
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode",
        lambda: True,
    )
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.get_content_capturing_mode",
        lambda: ContentCapturingMode.SPAN_ONLY,
    )

    run_id = uuid4()
    handler.on_retriever_start(
        serialized={
            "name": "SessionMemoryRetriever",
            "id": ["langchain", "retriever", "session"],
        },
        query="user preferences",
        run_id=run_id,
        metadata={
            "ls_provider": "openai",
            "memory_store_name": "SessionMemoryRetriever",
            "memory_namespace": "user-123",
        },
    )

    invocation = handler._invocation_manager.get_invocation(run_id)
    assert invocation is not None
    assert invocation.operation_name == SEARCH_MEMORY_OPERATION
    assert (
        invocation.attributes[GEN_AI_MEMORY_STORE_NAME]
        == "SessionMemoryRetriever"
    )
    assert (
        invocation.attributes[GEN_AI_MEMORY_STORE_ID]
        == "langchain.retriever.session"
    )
    assert invocation.attributes[GEN_AI_MEMORY_QUERY] == "user preferences"
    # The namespace is supplied in metadata above, so verify it is recorded.
    assert invocation.attributes[GEN_AI_MEMORY_NAMESPACE] == "user-123"
    telemetry_handler.start_llm.assert_called_once()
100+
101+
102+
def test_on_retriever_end_sets_search_result_count(monkeypatch):
    """Ending a retriever run records the number of returned documents."""
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode",
        lambda: False,
    )
    handler, telemetry_handler = _build_handler()
    run_id = uuid4()
    retrieved = [object(), object()]

    handler.on_retriever_start(
        serialized={"name": "MemoryRetriever"},
        query="q",
        run_id=run_id,
        metadata={"ls_provider": "openai"},
    )
    handler.on_retriever_end(documents=retrieved, run_id=run_id)

    telemetry_handler.stop_llm.assert_called_once()
    _, stop_kwargs = telemetry_handler.stop_llm.call_args
    stopped = stop_kwargs["invocation"]
    assert stopped.attributes[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 2
121+
122+
123+
def test_on_retriever_error_fails_invocation(monkeypatch):
    """A retriever error is reported through ``fail_llm`` exactly once."""
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode",
        lambda: False,
    )
    handler, telemetry_handler = _build_handler()
    run_id = uuid4()

    handler.on_retriever_start(
        serialized={"name": "VectorRetriever"},
        query="q",
        run_id=run_id,
        metadata={"ls_provider": "openai"},
    )
    handler.on_retriever_error(RuntimeError("retrieval failed"), run_id=run_id)

    telemetry_handler.fail_llm.assert_called_once()
    _, fail_kwargs = telemetry_handler.fail_llm.call_args
    failed = fail_kwargs["invocation"]
    assert failed.operation_name == RETRIEVAL_OPERATION

0 commit comments

Comments
 (0)