Skip to content

Commit ab0cbf3

Browse files
committed
add test
1 parent 6e0695b commit ab0cbf3

File tree

1 file changed

+198
-0
lines changed

1 file changed

+198
-0
lines changed
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
# Test manual graph instrumentation using Python SDK.
2+
3+
import time
4+
5+
import pytest
6+
7+
from langfuse import Langfuse, observe
8+
from tests.api_wrapper import LangfuseAPI
9+
from tests.utils import create_uuid, get_api
10+
11+
12+
def test_observe_type_agent_instrumentation():
13+
"""Test @observe(type='AGENT') with the type-based approach.
14+
"""
15+
langfuse = Langfuse()
16+
api = get_api()
17+
18+
trace_name = f"type_based_graph_test_{create_uuid()}"
19+
20+
@observe(type="GENERATION")
21+
def start_agent():
22+
print("🔍 Executing start_agent function")
23+
time.sleep(0.1)
24+
return {"status": "started", "data": "initial_data"}
25+
26+
@observe(type="RETRIEVER")
27+
def process_agent():
28+
print("🔍 Executing process_agent function")
29+
time.sleep(0.1)
30+
return {"status": "processed", "data": "processed_data"}
31+
32+
@observe(type="TOOL")
33+
def tool_call():
34+
print("🔍 Executing tool_call function")
35+
time.sleep(0.1)
36+
return {"status": "intermediate", "data": "intermediate_data"}
37+
38+
@observe(type="GENERATION")
39+
def end_agent():
40+
print("🔍 Executing end_agent function")
41+
time.sleep(0.1)
42+
return {"status": "completed", "data": "final_data"}
43+
44+
# Run the workflow within a trace context
45+
with langfuse.start_as_current_span(
46+
name="agent_workflow", as_type="AGENT"
47+
) as root_span:
48+
langfuse.update_current_trace(name=trace_name)
49+
50+
start_result = start_agent()
51+
process_result = process_agent()
52+
tool_result = tool_call()
53+
end_result = end_agent()
54+
55+
workflow_result = {
56+
"start": start_result,
57+
"process": process_result,
58+
"tool": tool_result,
59+
"end": end_result,
60+
}
61+
62+
langfuse.flush()
63+
time.sleep(0.5)
64+
65+
traces = api.trace.list(limit=50)
66+
test_trace = None
67+
for i, trace_data in enumerate(traces.data):
68+
if trace_data.name == trace_name:
69+
test_trace = trace_data
70+
break
71+
72+
assert test_trace is not None, f"Could not find trace with name {trace_name}"
73+
74+
# Get the trace details including observations
75+
trace_details = api.trace.get(test_trace.id)
76+
all_observations = trace_details.observations
77+
78+
agent_observations = [
79+
obs
80+
for obs in all_observations
81+
if obs.type in ["AGENT", "TOOL", "RETRIEVER", "CHAIN", "EMBEDDING"]
82+
]
83+
84+
assert (
85+
len(agent_observations) == 4
86+
), f"Expected 4 observations, got {len(agent_observations)} out of {len(all_observations)} total observations"
87+
88+
# for agent_obs in agent_observations:
89+
# print(
90+
# f"{agent_obs.name} ({agent_obs.type}): {agent_obs.start_time} - {agent_obs.end_time}"
91+
# )
92+
93+
def test_observe_parallel_tool_execution():
94+
"""Test parallel tool execution where an agent starts multiple tools simultaneously.
95+
96+
Creates a graph structure:
97+
start_agent -> [tool_1, tool_2, tool_3] -> end_agent
98+
"""
99+
100+
langfuse = Langfuse()
101+
api = get_api()
102+
103+
trace_name = f"parallel_tools_test_{create_uuid()}"
104+
105+
@observe(type="AGENT")
106+
def start_agent():
107+
time.sleep(0.05)
108+
return {"status": "tools_initiated", "tool_count": 3}
109+
110+
@observe(type="TOOL")
111+
def search_tool():
112+
time.sleep(0.2)
113+
return {"tool": "search", "results": ["result1", "result2"]}
114+
115+
@observe(type="TOOL")
116+
def calculation_tool():
117+
time.sleep(0.15)
118+
return {"tool": "calc", "result": 42}
119+
120+
@observe(type="TOOL")
121+
def api_tool():
122+
time.sleep(0.1)
123+
return {"tool": "api", "data": {"status": "success"}}
124+
125+
@observe(type="AGENT")
126+
def end_agent():
127+
time.sleep(0.05)
128+
return {"status": "completed", "summary": "all_tools_processed"}
129+
130+
# Execute the parallel workflow
131+
with langfuse.start_as_current_span(
132+
name="parallel_workflow", as_type="SPAN"
133+
) as root_span:
134+
langfuse.update_current_trace(name=trace_name)
135+
start_result = start_agent()
136+
137+
# execute tools in parallel - but keep them in the same trace context
138+
# we simulate parallel execution with staggered starts
139+
140+
search_result = search_tool()
141+
time.sleep(0.01)
142+
calc_result = calculation_tool()
143+
time.sleep(0.01)
144+
api_result = api_tool()
145+
146+
tool_results = {
147+
"search": search_result,
148+
"calculation": calc_result,
149+
"api": api_result,
150+
}
151+
152+
end_result = end_agent()
153+
154+
workflow_result = {
155+
"start": start_result,
156+
"tools": tool_results,
157+
"end": end_result,
158+
}
159+
160+
langfuse.flush()
161+
time.sleep(0.5)
162+
163+
traces = api.trace.list(limit=50)
164+
test_trace = None
165+
166+
for i, trace_data in enumerate(traces.data):
167+
if trace_data.name == trace_name:
168+
test_trace = trace_data
169+
break
170+
171+
assert test_trace is not None, f"Could not find trace with name {trace_name}"
172+
173+
# Get trace details and filter observations
174+
trace_details = api.trace.get(test_trace.id)
175+
all_observations = trace_details.observations
176+
177+
graph_observations = [
178+
obs for obs in all_observations if obs.type in ["AGENT", "TOOL"]
179+
]
180+
181+
# Should have: start_agent (1) + 3 tools (3) + end_agent (1) = 5 total
182+
expected_count = 5
183+
assert (
184+
len(graph_observations) == expected_count
185+
), f"Expected {expected_count} graph observations, got {len(graph_observations)} out of {len(all_observations)} total"
186+
187+
# for obs in sorted(graph_observations, key=lambda x: x.start_time):
188+
# print(f" {obs.name} ({obs.type}): {obs.start_time} - {obs.end_time}")
189+
190+
agent_observations = [obs for obs in graph_observations if obs.type == "AGENT"]
191+
tool_observations = [obs for obs in graph_observations if obs.type == "TOOL"]
192+
193+
assert (
194+
len(agent_observations) == 2
195+
), f"Expected 2 AGENT observations, got {len(agent_observations)}"
196+
assert (
197+
len(tool_observations) == 3
198+
), f"Expected 3 TOOL observations, got {len(tool_observations)}"

0 commit comments

Comments
 (0)