Skip to content

Commit 3067e5f

Browse files
committed
input output messages and updated example
1 parent 79f6814 commit 3067e5f

6 files changed

Lines changed: 603 additions & 42 deletions

File tree

instrumentation-genai/opentelemetry-instrumentation-langchain/examples/agent/main.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010

1111
from uuid import uuid4
1212

13-
from langchain.agents import create_agent
1413
from langchain_core.messages import HumanMessage
1514
from langchain_core.tools import tool
1615
from langchain_openai import ChatOpenAI
16+
from langgraph.prebuilt import create_react_agent
1717

1818
from opentelemetry import _logs, metrics, trace
1919
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
@@ -80,9 +80,7 @@ def main():
8080
)
8181

8282
session_id = str(uuid4())
83-
agent = create_agent(
84-
llm, tools=[multiply, add], name="coordinator"
85-
).with_config(
83+
agent = create_react_agent(llm, tools=[multiply, add]).with_config(
8684
{
8785
"metadata": {
8886
"agent_name": "coordinator",

instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py

Lines changed: 65 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@
22
# SPDX-License-Identifier: Apache-2.0
33

44
"""
5-
LangGraph StateGraph example with an LLM node.
5+
LangGraph StateGraph example with two LLM nodes.
66
7-
Similar to the manual example (../manual/main.py) but uses LangGraph's StateGraph
8-
with a node that calls ChatOpenAI. OpenTelemetry LangChain instrumentation traces
9-
the LLM calls made from within the graph node.
7+
Graph topology:
8+
9+
START → researcher → summariser → END
10+
11+
Steps:
12+
1. *researcher* – gathers factual background on the user's question.
13+
2. *summariser* – condenses the researcher's output into a concise answer.
14+
15+
OpenTelemetry LangChain instrumentation traces both LLM calls.
1016
"""
1117

1218
from typing import Annotated
@@ -37,8 +43,9 @@
3743

3844
# Configure tracing
3945
trace.set_tracer_provider(TracerProvider())
40-
span_processor = BatchSpanProcessor(OTLPSpanExporter())
41-
trace.get_tracer_provider().add_span_processor(span_processor)
46+
trace.get_tracer_provider().add_span_processor(
47+
BatchSpanProcessor(OTLPSpanExporter())
48+
)
4249

4350
# Configure logging
4451
_logs.set_logger_provider(LoggerProvider())
@@ -49,66 +56,87 @@
4956
# Configure metrics
5057
metrics.set_meter_provider(
5158
MeterProvider(
52-
metric_readers=[
53-
PeriodicExportingMetricReader(
54-
OTLPMetricExporter(),
55-
),
56-
]
59+
metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter())]
5760
)
5861
)
5962

6063

6164
class GraphState(TypedDict):
62-
"""State for the graph; messages are accumulated with add_messages."""
65+
"""State shared across all graph nodes."""
6366

6467
messages: Annotated[list, add_messages]
68+
research: str
6569

6670

6771
def build_graph(llm: ChatOpenAI):
68-
"""Build a StateGraph with a single LLM node."""
69-
70-
def llm_node(state: GraphState) -> dict:
71-
"""Node that invokes the LLM with the current messages."""
72-
response = llm.invoke(state["messages"])
72+
"""Build a StateGraph with a researcher node and a summariser node."""
73+
74+
def researcher(state: GraphState) -> dict:
75+
"""Gather factual background on the last user message."""
76+
response = llm.invoke(
77+
[
78+
SystemMessage(
79+
content="You are a research assistant. Provide 2-3 factual sentences."
80+
),
81+
HumanMessage(content=state["messages"][-1].content),
82+
]
83+
)
84+
return {
85+
"research": response.content,
86+
"messages": [response],
87+
}
88+
89+
def summariser(state: GraphState) -> dict:
90+
"""Condense the researcher's output into one concise sentence."""
91+
response = llm.invoke(
92+
[
93+
SystemMessage(
94+
content="You are an expert summariser. Condense the text below into one clear sentence."
95+
),
96+
HumanMessage(content=state["research"]),
97+
]
98+
)
7399
return {"messages": [response]}
74100

75101
builder = StateGraph(GraphState)
76-
builder.add_node("llm", llm_node)
77-
builder.add_edge(START, "llm")
78-
builder.add_edge("llm", END)
102+
builder.add_node("researcher", researcher)
103+
builder.add_node("summariser", summariser)
104+
105+
builder.add_edge(START, "researcher")
106+
builder.add_edge("researcher", "summariser")
107+
builder.add_edge("summariser", END)
108+
79109
return builder.compile()
80110

81111

82112
def main():
83-
# Set up instrumentation (traces LLM calls from within graph nodes)
84113
LangChainInstrumentor().instrument()
85114

86-
# ChatOpenAI setup
87115
llm = ChatOpenAI(
88116
model="gpt-3.5-turbo",
89117
temperature=0.1,
90-
max_tokens=100,
91-
top_p=0.9,
92-
frequency_penalty=0.5,
93-
presence_penalty=0.5,
94-
stop=["\n", "Human:", "AI:"],
95-
seed=100,
118+
max_tokens=200,
119+
seed=42,
96120
)
97121

98122
graph = build_graph(llm)
99123

100-
initial_messages = [
101-
SystemMessage(content="You are a helpful assistant!"),
102-
HumanMessage(content="What is the capital of France?"),
103-
]
124+
question = "What is the capital of France?"
125+
print(f"Question: {question}\n")
126+
127+
result = graph.invoke(
128+
{
129+
"messages": [HumanMessage(content=question)],
130+
"research": "",
131+
}
132+
)
104133

105-
result = graph.invoke({"messages": initial_messages})
134+
print("Research output:")
135+
print(f" {result['research']}\n")
106136

107-
print("LangGraph output (messages):")
108-
for msg in result.get("messages", []):
109-
print(f" {type(msg).__name__}: {msg.content}")
137+
print("Final summary:")
138+
print(f" {result['messages'][-1].content}")
110139

111-
# Un-instrument after use
112140
LangChainInstrumentor().uninstrument()
113141

114142

instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ classifiers = [
2626
]
2727
dependencies = [
2828
"opentelemetry-instrumentation ~= 0.60b0",
29-
"opentelemetry-util-genai >= 0.4b0.dev",
29+
"opentelemetry-util-genai >= 0.4b0",
3030
]
3131

3232
[project.optional-dependencies]

instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
classify_chain_run,
1919
resolve_agent_name,
2020
)
21+
from opentelemetry.instrumentation.langchain.utils import (
22+
make_input_message,
23+
make_last_output_message,
24+
)
2125
from opentelemetry.util.genai.handler import TelemetryHandler
2226
from opentelemetry.util.genai.invocation import (
2327
AgentInvocation,
@@ -65,6 +69,7 @@ def on_chain_start(
6569
workflow = self._telemetry_handler.start_workflow(
6670
name=workflow_name_override or workflow_name
6771
)
72+
workflow.input_messages = make_input_message(inputs)
6873
self._invocation_manager.add_invocation_state(
6974
run_id, parent_run_id, workflow
7075
)
@@ -92,6 +97,7 @@ def on_chain_start(
9297
else "unknown",
9398
)
9499
agent.agent_name = suggested_agent_name
100+
agent.input_messages = make_input_message(inputs)
95101

96102
if metadata:
97103
agent.agent_id = metadata.get("agent_id")
@@ -146,6 +152,9 @@ def on_chain_end(
146152
self._invocation_manager.delete_invocation_state(run_id)
147153
return
148154

155+
if isinstance(invocation, (WorkflowInvocation, AgentInvocation)):
156+
invocation.output_messages = make_last_output_message(outputs)
157+
149158
invocation.stop()
150159

151160
if not invocation.span.is_recording():
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright The OpenTelemetry Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
import json
5+
from typing import Any, Optional
6+
7+
from langchain_core.messages import AIMessage
8+
9+
from opentelemetry.util.genai.types import (
10+
InputMessage,
11+
OutputMessage,
12+
Text,
13+
)
14+
15+
16+
def make_input_message(data: Any) -> list[InputMessage]:
17+
"""Create structured input message with full data as JSON."""
18+
if not isinstance(data, dict):
19+
return []
20+
input_messages: list[InputMessage] = []
21+
messages = data.get("messages")
22+
if messages is not None:
23+
for msg in messages:
24+
content = getattr(msg, "content", "")
25+
if content:
26+
input_message = InputMessage(
27+
role="user", parts=[Text(content)]
28+
)
29+
input_messages.append(input_message)
30+
return input_messages
31+
# Fallback: serialize non-message state fields as input.
32+
# Common in LangGraph where nodes use structured state fields
33+
# (e.g., user_query) rather than a message list.
34+
exclude_keys = {"messages", "intermediate_steps"}
35+
input_data = {
36+
k: v
37+
for k, v in data.items()
38+
if k not in exclude_keys and v is not None
39+
}
40+
if input_data:
41+
serialized = serialize(input_data)
42+
if serialized:
43+
return [InputMessage(role="user", parts=[Text(serialized)])]
44+
return input_messages
45+
46+
47+
def make_output_message(data: dict[str, Any]) -> list[OutputMessage]:
48+
"""Create structured output message with full data as JSON."""
49+
if not isinstance(data, dict):
50+
return []
51+
output_messages: list[OutputMessage] = []
52+
messages = data.get("messages")
53+
if messages is None:
54+
return []
55+
for msg in messages:
56+
content = getattr(msg, "content", "")
57+
if content:
58+
if isinstance(msg, AIMessage):
59+
output_message = OutputMessage(
60+
role="assistant",
61+
parts=[Text(msg.content)],
62+
finish_reason="stop",
63+
)
64+
output_messages.append(output_message)
65+
return output_messages
66+
67+
68+
def make_last_output_message(data: dict[str, Any]) -> list[OutputMessage]:
69+
"""Extract only the last AI message as the output.
70+
71+
For Workflow and AgentInvocation spans, the final AI message best represents
72+
the actual output. Intermediate AI messages (e.g., tool-call decisions) are
73+
already captured in child LLM invocation spans.
74+
"""
75+
all_messages = make_output_message(data)
76+
if all_messages:
77+
return [all_messages[-1]]
78+
return []
79+
80+
81+
def serialize(obj: Any) -> Optional[str]:
82+
"""Serialize object to JSON string.
83+
84+
Uses default=str to handle non-JSON-serializable objects (like LangChain
85+
message objects) by converting them to their string representation while
86+
keeping the overall structure as valid JSON.
87+
"""
88+
if obj is None:
89+
return None
90+
try:
91+
return json.dumps(obj, ensure_ascii=False, default=str)
92+
except (TypeError, ValueError):
93+
return None

0 commit comments

Comments
 (0)