-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathmain.py
More file actions
180 lines (132 loc) · 5.49 KB
/
main.py
File metadata and controls
180 lines (132 loc) · 5.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"""LangChain agent with live streaming to agentevals.
This example demonstrates streaming traces and logs from a LangChain agent
to the agentevals dev server for real-time evaluation and visualization.
Key integration points:
1. OpenTelemetry GenAI instrumentation captures LLM calls
2. Spans (metadata) and logs (message content) are streamed via WebSocket
3. Real-time UI shows conversation, tool calls, and token usage
Prerequisites:
1. Install dependencies:
$ pip install -r requirements.txt
2. Start agentevals dev server:
$ agentevals serve --dev --port 8001
3. (Optional) Start UI for visualization:
$ cd ui && npm run dev
4. Set OpenAI API key:
$ export OPENAI_API_KEY="your-key-here"
Usage:
$ python examples/langchain_agent/main.py
The example will run 3 test queries and stream all trace data to the dev server.
View live results at http://localhost:5173
"""
import asyncio
import os
import threading
from agent import create_dice_agent
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, ToolMessage
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.trace import TracerProvider
from agentevals.streaming.processor import (
AgentEvalsLogStreamingProcessor,
AgentEvalsStreamingProcessor,
)
load_dotenv(override=True)
def setup_otel_streaming(ws_url: str, session_id: str, eval_set_id: str | None = None):
"""Configure OpenTelemetry for streaming traces and logs to agentevals.
Critical configuration:
1. Set OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true to capture
message content in logs (required for conversation display)
2. Create both TracerProvider (spans) and LoggerProvider (logs)
3. Add streaming processors for both spans and logs
4. Instrument OpenAI SDK AFTER importing LangChain
Args:
ws_url: WebSocket URL of agentevals dev server
session_id: Unique session identifier
eval_set_id: Optional evaluation set ID for matching
Returns:
tuple: (tracer_provider, logger_provider, processor, event_loop)
"""
os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "true"
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
logger_provider = LoggerProvider()
set_logger_provider(logger_provider)
processor = AgentEvalsStreamingProcessor(
ws_url=ws_url,
session_id=session_id,
trace_id=os.urandom(16).hex(),
)
loop = asyncio.new_event_loop()
def run_loop_in_background():
asyncio.set_event_loop(loop)
loop.run_forever()
thread = threading.Thread(target=run_loop_in_background, daemon=True)
thread.start()
future = asyncio.run_coroutine_threadsafe(processor.connect(eval_set_id=eval_set_id), loop)
future.result()
tracer_provider.add_span_processor(processor)
log_processor = AgentEvalsLogStreamingProcessor(processor)
logger_provider.add_log_record_processor(log_processor)
OpenAIInstrumentor().instrument()
return tracer_provider, logger_provider, processor, loop
def main():
if not os.getenv("OPENAI_API_KEY"):
print("⚠️ OPENAI_API_KEY not set. Set it with:")
print(" export OPENAI_API_KEY='your-key-here'")
return
session_id = f"langchain-session-{os.urandom(4).hex()}"
print("Setting up OpenTelemetry streaming...")
tracer_provider, logger_provider, processor, loop = setup_otel_streaming(
ws_url="ws://localhost:8001/ws/traces",
session_id=session_id,
eval_set_id="langchain_agent_eval",
)
print("✓ Connected to agentevals dev server")
print(f" Session: {session_id}")
print(" View live: http://localhost:5173")
print()
print("🎲 LangChain Dice Agent - Live Dev Mode")
print("=" * 50)
print()
llm_with_tools, tools = create_dice_agent()
test_queries = [
"Hi! Can you help me?",
"Roll a 20-sided die for me",
"Is the number you rolled prime?",
]
messages = []
for i, query in enumerate(test_queries, 1):
print(f"\n[{i}/{len(test_queries)}] User: {query}")
messages.append(HumanMessage(content=query))
max_iterations = 5
for iteration in range(max_iterations):
response = llm_with_tools.invoke(messages)
messages.append(response)
if not response.tool_calls:
agent_response = response.content
print(f" Agent: {agent_response}")
break
for tool_call in response.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
selected_tool = {t.name: t for t in tools}.get(tool_name)
if selected_tool:
tool_result = selected_tool.invoke(tool_args)
messages.append(ToolMessage(content=str(tool_result), tool_call_id=tool_call["id"]))
else:
print(" Agent: [Max iterations reached]")
print()
print("✓ Agent execution complete")
print()
tracer_provider.force_flush()
logger_provider.force_flush()
print("✓ All traces and logs flushed to server")
future = asyncio.run_coroutine_threadsafe(processor.shutdown_async(), loop)
future.result()
print("✓ Session ended")
if __name__ == "__main__":
main()