-
-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathtest_agent_tools.py
More file actions
389 lines (290 loc) · 13.3 KB
/
Copy pathtest_agent_tools.py
File metadata and controls
389 lines (290 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
"""
Unit tests for robodm.agent.tools module.
"""
import base64
import io
from unittest.mock import MagicMock, Mock, patch
import numpy as np
import pytest
from PIL import Image
from robodm.agent.tools import ( # Legacy compatibility functions; New tool system
ImageAnalysisTool, ToolsManager, TrajectoryAnalysisTool,
VisionLanguageModelTool, analyze_image, analyze_trajectory, create_manager,
detect_scene_changes, extract_keyframes, get_registry)
class TestToolsManager:
    """Test cases for the new ToolsManager system.

    Covers default registration, configuration-driven disabling of tools,
    lookup of individual tool instances, and the namespace handed to code
    execution.
    """

    def test_tools_manager_init(self):
        """A default-constructed manager lists at least the ``robo2vlm`` tool."""
        manager = ToolsManager()

        # Should have tools registered
        tools = manager.list_tools()
        assert len(tools) > 0

        # Check that essential tools are available
        assert "robo2vlm" in tools
        # Other tools may not be available due to import mocking in the test
        # environment. This is acceptable as long as basic functionality works.

    def test_tools_manager_with_config(self):
        """Tools named in ``disabled_tools`` are absent from the enabled list."""
        config = {
            "tools": {
                "robo2vlm": {
                    "temperature": 0.05,
                    "max_tokens": 512,
                },
            },
            "disabled_tools": ["analyze_trajectory"],
        }
        manager = ToolsManager(config=config)
        enabled_tools = manager.list_tools(enabled_only=True)

        # Should not include the disabled tool, but keep the configured one
        assert "analyze_trajectory" not in enabled_tools
        assert "robo2vlm" in enabled_tools

    def test_get_tool_instance(self):
        """``get_tool`` returns a callable object for known tool names."""
        manager = ToolsManager()

        # Get VLM tool
        vlm_tool = manager.get_tool("robo2vlm")
        assert vlm_tool is not None
        # callable() is the reliable check; hasattr(x, "__call__") can be
        # fooled by instance attributes and is flagged by linters (B004).
        assert callable(vlm_tool)

        # Get image analysis tool
        img_tool = manager.get_tool("analyze_image")
        assert img_tool is not None
        assert callable(img_tool)

    def test_tools_namespace(self):
        """The tools namespace is a dict whose values are all callable."""
        manager = ToolsManager()
        namespace = manager.get_tools_namespace()

        assert isinstance(namespace, dict)
        assert "robo2vlm" in namespace
        # Note: other tools may not be available due to test environment
        # mocking, so only the tools actually present are checked.
        for tool in namespace.values():
            assert callable(tool)
class TestImageAnalysisTool:
    """Exercises ImageAnalysisTool and the legacy ``analyze_image`` function."""

    def test_image_analysis_tool_init(self):
        """Thresholds passed to the constructor are readable as attributes."""
        analyzer = ImageAnalysisTool(
            blur_threshold=80.0,
            brightness_threshold=0.25,
        )

        assert analyzer.blur_threshold == 80.0
        assert analyzer.brightness_threshold == 0.25
        assert analyzer.enabled is True

    def test_image_analysis_all_operations(self):
        """The ``"all"`` operation reports blur, brightness and features."""
        analyzer = ImageAnalysisTool()

        # Random RGB frame; only the structure of the report is checked.
        frame = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)
        report = analyzer(frame, "all")

        assert isinstance(report, dict)
        for section in ("blur", "brightness", "features"):
            assert section in report

        # Fields expected inside each section, checked in section order.
        expected_fields = (
            ("blur", ("is_blurry", "laplacian_variance", "threshold")),
            ("brightness", ("mean_brightness", "is_dark", "is_bright")),
            ("features", ("shape", "mean_rgb")),
        )
        for section, fields in expected_fields:
            for field in fields:
                assert field in report[section]

    def test_image_analysis_specific_operations(self):
        """Requesting a single operation yields only that section."""
        analyzer = ImageAnalysisTool()
        frame = np.random.randint(0, 255, (32, 32, 3), dtype=np.uint8)

        # Blur only
        blur_only = analyzer(frame, "blur")
        assert "blur" in blur_only
        assert "brightness" not in blur_only

        # Brightness only
        brightness_only = analyzer(frame, "brightness")
        assert "brightness" in brightness_only
        assert "blur" not in brightness_only

    def test_image_analysis_legacy_function(self):
        """The legacy ``analyze_image`` returns a dict with a known section."""
        frame = np.random.randint(0, 255, (32, 32, 3), dtype=np.uint8)
        report = analyze_image(frame, "all")

        assert isinstance(report, dict)
        known_sections = ("blur", "brightness", "features")
        assert any(section in report for section in known_sections)
class TestVisionLanguageModelTool:
    """Test cases for VisionLanguageModelTool.

    Every test that constructs the tool patches
    ``robodm.agent.tools.implementations.LLM`` with a mock, so the tool's
    backend is replaced by a test double.
    """

    @patch("robodm.agent.tools.implementations.LLM")
    def test_vlm_tool_init(self, mock_llm_class):
        """Constructor arguments are readable as attributes on the tool."""
        tool = VisionLanguageModelTool(model="test-model", temperature=0.05)

        assert tool.model == "test-model"
        assert tool.temperature == 0.05
        assert tool.enabled is True

    @patch("robodm.agent.tools.implementations.LLM")
    def test_vlm_tool_call(self, mock_llm_class):
        """Calling the tool returns the generated text and sends image + text."""
        # Mock VLM and response: generate() -> [output], output.outputs[0].text
        mock_vlm = Mock()
        mock_output = Mock()
        mock_output.outputs = [Mock()]
        mock_output.outputs[0].text = "Yes, there is occlusion in the image."
        mock_vlm.generate.return_value = [mock_output]
        mock_llm_class.return_value = mock_vlm

        tool = VisionLanguageModelTool()

        # Test data
        frame = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)
        prompt = "Is there occlusion in this image?"
        result = tool(frame, prompt)

        assert result == "Yes, there is occlusion in the image."
        mock_vlm.generate.assert_called_once()

        # Check that the generate call includes image and text.
        # call_args[0] is the positional-args tuple; its first element is the
        # list of prompts, and [0] of that is the first (only) prompt.
        call_args = mock_vlm.generate.call_args
        multimodal_prompt = call_args[0][0][0]
        assert len(multimodal_prompt) == 2  # image and text components
        assert multimodal_prompt[0]["type"] == "image_url"
        assert multimodal_prompt[1]["type"] == "text"
        assert multimodal_prompt[1]["text"] == prompt

    @patch("robodm.agent.tools.implementations.LLM")
    def test_vlm_tool_error_handling(self, mock_llm_class):
        """A failure inside generate() is reported in the returned string."""
        # Mock VLM to raise exception
        mock_vlm = Mock()
        mock_vlm.generate.side_effect = RuntimeError("VLM failed")
        mock_llm_class.return_value = mock_vlm

        tool = VisionLanguageModelTool()
        frame = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)
        prompt = "test prompt"
        result = tool(frame, prompt)

        assert "Error in robo2vlm" in result
        assert "VLM failed" in result

    def test_vlm_tool_metadata(self):
        """Class-level metadata names the tool and tags it as a vision tool."""
        metadata = VisionLanguageModelTool.get_metadata()

        assert metadata.name == "robo2vlm"
        assert "vision-language model" in metadata.description.lower()
        assert len(metadata.examples) > 0
        assert "vision" in metadata.tags

    @patch("robodm.agent.tools.implementations.LLM")
    def test_vlm_tool_validation(self, mock_llm_class):
        """Valid settings are accepted; out-of-range settings raise ValueError."""
        # Valid configuration: must construct without raising. LLM is patched
        # like the other constructing tests so this test stays self-contained,
        # and the result is checked instead of being left as an unused local.
        tool = VisionLanguageModelTool(temperature=0.1, max_tokens=256)
        assert tool.temperature == 0.1

        # Invalid temperature
        with pytest.raises(ValueError, match="Temperature must be between"):
            VisionLanguageModelTool(temperature=3.0)

        # Invalid max_tokens
        with pytest.raises(ValueError, match="max_tokens must be positive"):
            VisionLanguageModelTool(max_tokens=-1)
class TestTrajectoryAnalysisTool:
    """Test cases for TrajectoryAnalysisTool.

    Random inputs come from a locally seeded ``np.random.default_rng`` so each
    test sees the same data on every run and does not touch NumPy's global
    random state.
    """

    def test_trajectory_tool_init(self):
        """Constructor arguments are readable as attributes on the tool."""
        tool = TrajectoryAnalysisTool(
            anomaly_threshold=2.5,
            min_length=15,
            smoothing_window=7,
        )

        assert tool.anomaly_threshold == 2.5
        assert tool.min_length == 15
        assert tool.smoothing_window == 7
        assert tool.enabled is True

    def test_trajectory_statistics(self):
        """The ``"statistics"`` operation reports length and per-column stats."""
        tool = TrajectoryAnalysisTool()
        rng = np.random.default_rng(0)

        # Test data
        data = rng.standard_normal((20, 6))  # 20 timesteps, 6 joints
        result = tool(data, "statistics")

        assert isinstance(result, dict)
        for key in ("length", "mean", "std", "min", "max", "is_long_enough"):
            assert key in result
        assert result["length"] == 20
        assert len(result["mean"]) == 6  # 6 joints

    def test_trajectory_velocity(self):
        """The ``"velocity"`` operation returns first differences of the data."""
        tool = TrajectoryAnalysisTool()

        # Simple position data
        data = np.array([[0, 0], [1, 1], [2, 2], [3, 3]])  # Linear motion
        velocity = tool(data, "velocity")

        assert isinstance(velocity, np.ndarray)
        assert velocity.shape == (3, 2)  # N-1 timesteps
        # Should be constant velocity of [1, 1]
        assert np.allclose(velocity, [[1, 1], [1, 1], [1, 1]])

    def test_trajectory_anomaly_detection(self):
        """A single large outlier is reported among the anomaly indices."""
        tool = TrajectoryAnalysisTool(anomaly_threshold=2.0)
        rng = np.random.default_rng(0)

        # Create data with a clear anomaly inserted at index 25
        normal_data = rng.standard_normal((50, 3)) * 0.1  # Small variance
        anomaly_point = np.array([[10, 10, 10]])  # Clear outlier
        data = np.vstack([normal_data[:25], anomaly_point, normal_data[25:]])
        result = tool(data, "anomalies")

        assert isinstance(result, dict)
        assert "anomaly_indices" in result
        assert "anomaly_count" in result
        assert "anomaly_ratio" in result
        # Should detect the anomaly at index 25
        assert 25 in result["anomaly_indices"]
        assert result["anomaly_count"] >= 1

    def test_trajectory_smoothing(self):
        """Smoothing keeps the shape and does not increase the variance."""
        tool = TrajectoryAnalysisTool(smoothing_window=3)
        rng = np.random.default_rng(0)

        # Noisy signal: one period of a sine wave plus seeded Gaussian noise
        t = np.linspace(0, 1, 20)
        clean_signal = np.sin(2 * np.pi * t)
        noisy_signal = clean_signal + 0.1 * rng.standard_normal(20)
        data = noisy_signal.reshape(-1, 1)
        smoothed = tool(data, "smooth")

        assert isinstance(smoothed, np.ndarray)
        assert smoothed.shape == data.shape
        # Smoothed signal should have less variance
        assert np.var(smoothed) <= np.var(data)

    def test_trajectory_tool_metadata(self):
        """Class-level metadata names the tool and tags it as trajectory."""
        metadata = TrajectoryAnalysisTool.get_metadata()

        assert metadata.name == "analyze_trajectory"
        assert "trajectory" in metadata.description.lower()
        assert len(metadata.examples) > 0
        assert "trajectory" in metadata.tags

    def test_trajectory_legacy_function(self):
        """The legacy ``analyze_trajectory`` reports the trajectory length."""
        rng = np.random.default_rng(0)
        data = rng.standard_normal((15, 4))
        result = analyze_trajectory(data, "statistics")

        assert isinstance(result, dict)
        assert "length" in result
        assert result["length"] == 15
class TestTrajectoryUtilities:
    """Test cases for trajectory utility functions."""

    def test_extract_keyframes(self):
        """Five keyframes from 20 frames are sampled uniformly."""
        # Create sequence of images
        images = np.random.randint(0, 255, (20, 64, 64, 3), dtype=np.uint8)
        indices, keyframes = extract_keyframes(images, num_keyframes=5)

        assert len(indices) == 5
        assert keyframes.shape == (5, 64, 64, 3)
        # Uniform sampling. list() makes the comparison valid whether the
        # indices come back as a list or as a NumPy array; comparing an array
        # to a list with == yields an array whose truth value is ambiguous.
        assert list(indices) == [0, 4, 9, 14, 19]

    def test_extract_keyframes_short_sequence(self):
        """Requesting more keyframes than frames returns every frame."""
        images = np.random.randint(0, 255, (3, 32, 32, 3), dtype=np.uint8)
        indices, keyframes = extract_keyframes(images, num_keyframes=5)

        # Should return all frames when requested more than available
        assert len(indices) == 3
        assert keyframes.shape == (3, 32, 32, 3)

    def test_detect_scene_changes_with_vlm(self):
        """A scripted VLM callable yields exactly one scene change, at frame 2."""
        # Test the utility function
        images = np.random.randint(0, 255, (4, 64, 64, 3), dtype=np.uint8)

        # Mock VLM function; each call consumes the next scripted response.
        mock_vlm_func = Mock()
        mock_vlm_func.side_effect = [
            "Kitchen scene with table",  # Frame 0 description
            "Kitchen scene with table",  # Frame 1 description (similar)
            "yes",  # Similarity check frame 1 (similar -> no change)
            "Living room with sofa",  # Frame 2 description (different)
            "no",  # Similarity check frame 2 (different -> change)
            "Living room with sofa",  # Frame 3 description (similar)
            "yes",  # Similarity check frame 3 (similar -> no change)
        ]
        scene_changes = detect_scene_changes(images, mock_vlm_func)

        assert len(scene_changes) == 1
        assert scene_changes[0] == 2  # Scene change at frame 2
if __name__ == "__main__":
    # Allow running this module directly: hand this file to pytest with
    # verbose output.
    pytest_args = [__file__, "-v"]
    pytest.main(pytest_args)