Skip to content

Commit eb996e0

Browse files
nagkumar91 and Copilot committed
feat(langchain): enhance span manager and add event emitter
Enhance the span lifecycle manager and add event emission for non-LLM GenAI operations:

- span_manager.py: Enhanced with ignored-run walkthrough (re-parenting children of skipped internal runs), per-thread agent stacks for hierarchy tracking, token usage accumulation and propagation to parent agent spans, and LangGraph Command(goto=...) transition support.
- event_emitter.py: Emits semantic-convention-aligned log-record events for tool, agent, and retriever spans. All event emission is gated behind the content policy. Uses LogRecord instances linked to the active span context.

Part 3 of a series breaking down #4389 into smaller reviewable PRs.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 07ab99d commit eb996e0

5 files changed

Lines changed: 1246 additions & 76 deletions

File tree

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Event emission for non-LLM GenAI operations in LangChain.
16+
17+
Emits semantic-convention-aligned log-record events for tool, agent, and
18+
retriever spans. LLM event emission is handled by the shared
19+
``TelemetryHandler`` and is **not** duplicated here.
20+
21+
All event emission is gated behind the content policy so that events are
22+
only produced when the user opts in via the
23+
``OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT`` /
24+
``OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT`` environment variables.
25+
"""
26+
27+
from __future__ import annotations
28+
29+
from typing import Any, Optional
30+
31+
from opentelemetry._logs import Logger, LoggerProvider, LogRecord, get_logger
32+
from opentelemetry.context import get_current
33+
from opentelemetry.instrumentation.langchain.content_recording import (
34+
get_content_policy,
35+
)
36+
from opentelemetry.instrumentation.langchain.version import __version__
37+
from opentelemetry.semconv.schemas import Schemas
38+
from opentelemetry.trace import Span
39+
from opentelemetry.trace.propagation import set_span_in_context
40+
41+
# Placeholder written into an event body in place of the real content
# (tool arguments/results, agent input/output, retriever query/documents)
# when events are emitted but the content policy's ``record_content`` flag
# is falsy.
_REDACTED = "[redacted]"
42+
43+
44+
class EventEmitter:
    """Log-record event emitter for non-LLM LangChain operations.

    Each public ``emit_*`` method builds a small dict body and sends it as
    a ``LogRecord`` whose context carries the supplied span, following the
    same pattern used by the OpenAI v2 instrumentor and the shared
    ``_maybe_emit_llm_event`` helper in ``span_utils``.

    Nothing is emitted unless the content policy reports
    ``should_emit_events``; when events are emitted but ``record_content``
    is falsy, content fields are replaced by the ``_REDACTED`` placeholder.
    """

    def __init__(
        self, logger_provider: Optional[LoggerProvider] = None
    ) -> None:
        """Obtain the logger used for every event.

        Args:
            logger_provider: Provider forwarded to ``get_logger``; ``None``
                is passed through unchanged.
        """
        self._logger: Logger = get_logger(
            __name__,
            __version__,
            logger_provider,
            schema_url=Schemas.V1_37_0.value,
        )

    # -- tool events ---------------------------------------------------

    def emit_tool_call_event(
        self,
        span: Span,
        tool_name: str,
        arguments: Optional[str] = None,
        tool_call_id: Optional[str] = None,
    ) -> None:
        """Emit ``gen_ai.tool.call`` for a tool invocation.

        Body keys: ``name``; ``id`` when *tool_call_id* is truthy;
        ``arguments`` when *arguments* is not ``None`` (possibly redacted).
        """
        if self._should_emit():
            payload = self._payload(
                tool_name, "arguments", arguments, tool_call_id
            )
            self._emit(span, "gen_ai.tool.call", payload)

    def emit_tool_result_event(
        self,
        span: Span,
        tool_name: str,
        result: Optional[str] = None,
        tool_call_id: Optional[str] = None,
    ) -> None:
        """Emit ``gen_ai.tool.result`` for a tool's return value.

        Body keys: ``name``; ``id`` when *tool_call_id* is truthy;
        ``result`` when *result* is not ``None`` (possibly redacted).
        """
        if self._should_emit():
            payload = self._payload(tool_name, "result", result, tool_call_id)
            self._emit(span, "gen_ai.tool.result", payload)

    # -- agent events --------------------------------------------------

    def emit_agent_start_event(
        self,
        span: Span,
        agent_name: str,
        input_messages: Optional[str] = None,
    ) -> None:
        """Emit ``gen_ai.agent.start`` for the beginning of an agent run.

        Body keys: ``name``; ``input`` when *input_messages* is not
        ``None`` (possibly redacted).
        """
        if self._should_emit():
            payload = self._payload(agent_name, "input", input_messages)
            self._emit(span, "gen_ai.agent.start", payload)

    def emit_agent_end_event(
        self,
        span: Span,
        agent_name: str,
        output_messages: Optional[str] = None,
    ) -> None:
        """Emit ``gen_ai.agent.end`` for the completion of an agent run.

        Body keys: ``name``; ``output`` when *output_messages* is not
        ``None`` (possibly redacted).
        """
        if self._should_emit():
            payload = self._payload(agent_name, "output", output_messages)
            self._emit(span, "gen_ai.agent.end", payload)

    # -- retriever events ----------------------------------------------

    def emit_retriever_query_event(
        self,
        span: Span,
        retriever_name: str,
        query: Optional[str] = None,
    ) -> None:
        """Emit ``gen_ai.retriever.query`` for a retriever query.

        Body keys: ``name``; ``query`` when *query* is not ``None``
        (possibly redacted).
        """
        if self._should_emit():
            payload = self._payload(retriever_name, "query", query)
            self._emit(span, "gen_ai.retriever.query", payload)

    def emit_retriever_result_event(
        self,
        span: Span,
        retriever_name: str,
        documents: Optional[str] = None,
    ) -> None:
        """Emit ``gen_ai.retriever.result`` for retrieved documents.

        Body keys: ``name``; ``documents`` when *documents* is not
        ``None`` (possibly redacted).
        """
        if self._should_emit():
            payload = self._payload(retriever_name, "documents", documents)
            self._emit(span, "gen_ai.retriever.result", payload)

    # -- internal helpers ----------------------------------------------

    @staticmethod
    def _should_emit() -> bool:
        """Return the content policy's ``should_emit_events`` flag."""
        policy = get_content_policy()
        return policy.should_emit_events

    @staticmethod
    def _payload(
        name: str,
        content_key: str,
        content: Optional[str],
        call_id: Optional[str] = None,
    ) -> dict[str, Any]:
        """Build an event body shared by all ``emit_*`` methods.

        Keys are inserted in the order ``name``, ``id``, *content_key*.
        ``id`` is added only for a truthy *call_id*; *content_key* is added
        only when *content* is not ``None``, holding the raw content if the
        policy's ``record_content`` is truthy and ``_REDACTED`` otherwise.
        """
        # The policy is looked up on every call, before the body is built,
        # even when there is no content to record.
        policy = get_content_policy()
        payload: dict[str, Any] = {"name": name}
        if call_id:
            payload["id"] = call_id
        if content is not None:
            if policy.record_content:
                payload[content_key] = content
            else:
                payload[content_key] = _REDACTED
        return payload

    def _emit(
        self,
        span: Span,
        event_name: str,
        body: dict[str, Any],
    ) -> None:
        """Wrap *body* in a ``LogRecord`` tied to *span* and emit it."""
        # Attach the span to a copy of the current context so the record
        # is linked to *span* rather than to whatever span is active.
        record = LogRecord(
            event_name=event_name,
            body=body,
            context=set_span_in_context(span, get_current()),
        )
        self._logger.emit(record)

0 commit comments

Comments
 (0)