Skip to content

Commit 06e438a

Browse files
committed
add invoke agent
1 parent 86d3a64 commit 06e438a

6 files changed

Lines changed: 244 additions & 630 deletions

File tree

instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py

Lines changed: 85 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@
2424
from opentelemetry.instrumentation.langchain.invocation_manager import (
2525
_InvocationManager,
2626
)
27+
from opentelemetry.instrumentation.langchain.operation_mapping import (
28+
OperationName,
29+
classify_chain_run,
30+
resolve_agent_name,
31+
)
2732
from opentelemetry.util.genai.handler import TelemetryHandler
2833
from opentelemetry.util.genai.invocation import (
34+
AgentInvocation,
2935
InferenceInvocation,
3036
WorkflowInvocation,
3137
)
@@ -35,11 +41,6 @@
3541
OutputMessage,
3642
Text,
3743
)
38-
from opentelemetry.instrumentation.langchain.operation_mapping import (
39-
OperationName,
40-
classify_chain_run,
41-
resolve_agent_name,
42-
)
4344

4445

4546
class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler):
@@ -72,21 +73,73 @@ def on_chain_start(
7273
workflow_name_override = (
7374
metadata.get("workflow_name") if metadata else None
7475
)
75-
wf = self._telemetry_handler.start_workflow(
76+
workflow = self._telemetry_handler.start_workflow(
7677
name=workflow_name_override or workflow_name
7778
)
78-
self._invocation_manager.add_invocation_state(run_id, None, wf)
79+
self._invocation_manager.add_invocation_state(
80+
run_id, parent_run_id, workflow
81+
)
7982
elif operation == OperationName.INVOKE_AGENT:
80-
agent_name = resolve_agent_name(serialized, metadata, kwargs)
81-
82-
if metadata:
83-
agent_id = metadata.get("agent_id")
84-
agent_desc = metadata.get("agent_description")
83+
# agent name passed by the user
84+
suggested_agent_name = resolve_agent_name(
85+
serialized, metadata, kwargs
86+
)
87+
# find if there is an agent already
88+
agent_invocation = self._find_nearest_agent(parent_run_id)
89+
agent_invocation_name = (
90+
agent_invocation.agent_name if agent_invocation else None
91+
)
92+
if suggested_agent_name:
93+
suggested_agent_name_lower = suggested_agent_name.lower()
94+
agent_invocation_name_lower = (
95+
agent_invocation_name.lower()
96+
if agent_invocation_name
97+
else None
98+
)
99+
if suggested_agent_name_lower != agent_invocation_name_lower:
100+
agent = self._telemetry_handler.start_invoke_local_agent(
101+
provider=metadata.get("ls_provider", "unknown")
102+
if metadata
103+
else "unknown",
104+
)
105+
agent.agent_name = suggested_agent_name
85106

86-
for key in ("thread_id", "session_id", "conversation_id"):
87-
conv_id = metadata.get(key)
107+
if metadata:
108+
agent.agent_id = metadata.get("agent_id")
109+
agent.agent_description = metadata.get(
110+
"agent_description"
111+
)
88112

89-
provider = metadata.get("ls_provider", "unknown")
113+
for key in (
114+
"thread_id",
115+
"session_id",
116+
"conversation_id",
117+
):
118+
conv_id = metadata.get(key)
119+
if conv_id:
120+
agent.conversation_id = conv_id
121+
break
122+
123+
self._invocation_manager.add_invocation_state(
124+
run_id, parent_run_id, agent
125+
)
126+
else:
127+
# We create invoke_agent span for the initial chain for agent. All follow-up chains invoked for agent invocation will not create agent span.
128+
self._invocation_manager.add_invocation_state(
129+
run_id, parent_run_id, None
130+
)
131+
else:
132+
# No agent name could be resolved; still register the run_id so that
133+
# parent-child traversal (e.g. _find_nearest_agent) is not broken for
134+
# any children of this node.
135+
self._invocation_manager.add_invocation_state(
136+
run_id, parent_run_id, None
137+
)
138+
else:
139+
# For unclassified chains, we still want to track them in the invocation manager to maintain the parent-child relationships, even though we won't create spans for them.
140+
self._invocation_manager.add_invocation_state(
141+
run_id, parent_run_id, None
142+
)
90143

91144
def on_chain_end(
92145
self,
@@ -98,9 +151,10 @@ def on_chain_end(
98151
) -> Any:
99152
invocation = self._invocation_manager.get_invocation(run_id=run_id)
100153
if invocation is None or not isinstance(
101-
invocation, WorkflowInvocation
154+
invocation, (WorkflowInvocation, AgentInvocation)
102155
):
103156
# If the invocation does not exist, we cannot set attributes or end it
157+
self._invocation_manager.delete_invocation_state(run_id)
104158
return
105159

106160
invocation.stop()
@@ -118,9 +172,10 @@ def on_chain_error(
118172
) -> Any:
119173
invocation = self._invocation_manager.get_invocation(run_id=run_id)
120174
if invocation is None or not isinstance(
121-
invocation, WorkflowInvocation
175+
invocation, (WorkflowInvocation, AgentInvocation)
122176
):
123177
# If the invocation does not exist, we cannot set attributes or end it
178+
self._invocation_manager.delete_invocation_state(run_id)
124179
return
125180

126181
invocation.fail(error)
@@ -349,3 +404,16 @@ def on_llm_error(
349404
llm_invocation.fail(error)
350405
if not llm_invocation.span.is_recording():
351406
self._invocation_manager.delete_invocation_state(run_id=run_id)
407+
408+
def _find_nearest_agent(
409+
self, run_id: Optional[UUID]
410+
) -> Optional[AgentInvocation]:
411+
current = run_id
412+
visited = set()
413+
while current is not None and current not in visited:
414+
visited.add(current)
415+
entity = self._invocation_manager.get_invocation(current)
416+
if isinstance(entity, AgentInvocation):
417+
return entity
418+
current = self._invocation_manager.get_parent_run_id(current)
419+
return None

instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
class _InvocationState:
2626
invocation: GenAIInvocation
2727
children: List[UUID] = field(default_factory=lambda: list())
28+
parent_run_id: Optional[UUID] = None
29+
ended: bool = False
2830

2931

3032
class _InvocationManager:
@@ -39,23 +41,45 @@ def add_invocation_state(
3941
self,
4042
run_id: UUID,
4143
parent_run_id: Optional[UUID],
42-
invocation: GenAIInvocation,
44+
invocation: Optional[GenAIInvocation],
4345
) -> None:
4446
invocation_state = _InvocationState(invocation=invocation)
45-
self._invocations[run_id] = invocation_state
4647

4748
if parent_run_id is not None and parent_run_id in self._invocations:
49+
invocation_state.parent_run_id = parent_run_id
50+
4851
parent_invocation_state = self._invocations[parent_run_id]
4952
parent_invocation_state.children.append(run_id)
5053

54+
self._invocations[run_id] = invocation_state
55+
5156
def get_invocation(self, run_id: UUID) -> Optional[GenAIInvocation]:
5257
invocation_state = self._invocations.get(run_id)
5358
return invocation_state.invocation if invocation_state else None
5459

60+
def get_parent_run_id(self, run_id: UUID) -> Optional[UUID]:
61+
invocation_state = self._invocations.get(run_id)
62+
return invocation_state.parent_run_id if invocation_state else None
63+
5564
def delete_invocation_state(self, run_id: UUID) -> None:
5665
invocation_state = self._invocations.get(run_id)
5766
if not invocation_state:
5867
return
59-
for child_id in list(invocation_state.children):
60-
self._invocations.pop(child_id, None)
68+
69+
invocation_state.ended = True
70+
71+
# Defer removal if any children are still live, so upward traversal
72+
# (e.g. _find_nearest_agent) can still walk through this node.
73+
if any(c in self._invocations for c in invocation_state.children):
74+
return
75+
6176
self._invocations.pop(run_id, None)
77+
78+
# Propagate cleanup upward: if the parent has already ended and has no
79+
# more live children, it can now be removed too.
80+
if invocation_state.parent_run_id:
81+
parent_state = self._invocations.get(
82+
invocation_state.parent_run_id
83+
)
84+
if parent_state is not None and parent_state.ended:
85+
self.delete_invocation_state(invocation_state.parent_run_id)

instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/operation_mapping.py

Lines changed: 13 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,7 @@
4747
class OperationName:
4848
"""Canonical GenAI semantic convention operation names."""
4949

50-
CHAT: str = GenAI.GenAiOperationNameValues.CHAT.value
51-
TEXT_COMPLETION: str = GenAI.GenAiOperationNameValues.TEXT_COMPLETION.value
5250
INVOKE_AGENT: str = GenAI.GenAiOperationNameValues.INVOKE_AGENT.value
53-
EXECUTE_TOOL: str = GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value
5451
# invoke_workflow is not yet in the semconv enum; use the expected
5552
# string value so the mapping is forward-compatible.
5653
INVOKE_WORKFLOW: str = "invoke_workflow"
@@ -101,7 +98,7 @@ def resolve_agent_name(
10198
if name:
10299
return str(name)
103100

104-
name = serialized.get("name")
101+
name = serialized.get("name") if serialized else None
105102
if name:
106103
return str(name)
107104

@@ -124,30 +121,6 @@ def _has_agent_signals(metadata: Optional[dict[str, Any]]) -> bool:
124121
)
125122

126123

127-
def _is_langgraph_agent_node(
128-
serialized: dict[str, Any],
129-
metadata: Optional[dict[str, Any]],
130-
kwargs: dict[str, Any],
131-
) -> bool:
132-
"""Detect a LangGraph agent node that is not a start/middleware node."""
133-
if not metadata:
134-
return False
135-
136-
node = metadata.get(LANGGRAPH_NODE_KEY)
137-
if not node:
138-
return False
139-
140-
# Exclude start and middleware nodes.
141-
if node == LANGGRAPH_START_NODE:
142-
return False
143-
144-
name = resolve_agent_name(serialized, metadata, kwargs)
145-
if name and name.startswith(MIDDLEWARE_PREFIX):
146-
return False
147-
148-
return True
149-
150-
151124
def _looks_like_workflow(
152125
serialized: dict[str, Any],
153126
metadata: Optional[dict[str, Any]],
@@ -162,13 +135,16 @@ def _looks_like_workflow(
162135
return True
163136

164137
# Heuristic: check for LangGraph identifier in the serialized repr.
165-
name = serialized.get("name", "")
166-
graph_id = (
167-
serialized.get("graph", {}).get("id", "")
168-
if isinstance(serialized.get("graph"), dict)
169-
else ""
170-
)
171-
return LANGGRAPH_IDENTIFIER in name or LANGGRAPH_IDENTIFIER in graph_id
138+
if serialized:
139+
name = serialized.get("name", "")
140+
graph_id = (
141+
serialized.get("graph", {}).get("id", "")
142+
if isinstance(serialized.get("graph"), dict)
143+
else ""
144+
)
145+
return LANGGRAPH_IDENTIFIER in name or LANGGRAPH_IDENTIFIER in graph_id
146+
147+
return True
172148

173149

174150
# ---------------------------------------------------------------------------
@@ -179,7 +155,6 @@ def _looks_like_workflow(
179155
def should_ignore_chain(
180156
metadata: Optional[dict[str, Any]],
181157
agent_name: Optional[str],
182-
parent_run_id: Optional[UUID],
183158
kwargs: dict[str, Any],
184159
) -> bool:
185160
"""Return True if the chain callback should be silently suppressed.
@@ -238,19 +213,16 @@ def classify_chain_run(
238213
agent_name = resolve_agent_name(serialized, metadata, kwargs)
239214

240215
# 1. Suppress known noise.
241-
if should_ignore_chain(metadata, agent_name, parent_run_id, kwargs):
216+
if should_ignore_chain(metadata, agent_name, kwargs):
242217
return None
243218

244219
# 2. Agent detection.
245220
if _has_agent_signals(metadata):
246221
return OperationName.INVOKE_AGENT
247222

248-
if _is_langgraph_agent_node(serialized, metadata, kwargs):
249-
return OperationName.INVOKE_AGENT
250-
251223
# 3. Workflow / orchestration detection.
252224
if _looks_like_workflow(serialized, metadata, parent_run_id):
253225
return OperationName.INVOKE_WORKFLOW
254226

255227
# 4. Default: suppress unclassified chains.
256-
return None
228+
return None

instrumentation-genai/opentelemetry-instrumentation-langchain/tests/conftest.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,10 @@ def deserialize(cassette_string):
208208
return yaml.load(cassette_string, Loader=yaml.Loader)
209209

210210

211-
@pytest.fixture(scope="module", autouse=True)
211+
@pytest.fixture(scope="function", autouse=True)
212212
def fixture_vcr(vcr):
213-
vcr.register_serializer("yaml", PrettyPrintJSONBody)
213+
if vcr is not None:
214+
vcr.register_serializer("yaml", PrettyPrintJSONBody)
214215
return vcr
215216

216217

0 commit comments

Comments
 (0)