-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathtest_rollout_control_plane_integration.py
More file actions
603 lines (521 loc) · 23.3 KB
/
Copy pathtest_rollout_control_plane_integration.py
File metadata and controls
603 lines (521 loc) · 23.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
"""
Test Rollout System with Control Plane Integration
This module tests the complete rollout system with control plane separation,
ensuring that:
1. Data plane (tool responses) contains only observations
2. Control plane (MCP resources) contains rewards/termination info
3. Trajectories capture both planes correctly
4. Termination decisions use control plane signals
5. Rollout system works end-to-end with separated architecture
This validates the complete implementation of the control plane separation
feature in the rollout execution pipeline.
"""
import json
import sys
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import eval_protocol as ep
# Add examples directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "examples" / "frozen_lake_mcp"))
from eval_protocol.mcp.execution.manager import ExecutionManager
from eval_protocol.mcp.session.manager import GeneralMCPVectorEnv
from eval_protocol.types import DatasetRow, MCPSession, MCPToolCall, Trajectory
class MockPolicy:
    """Mock policy for testing that returns predetermined actions.

    The policy replays a fixed script of actions, one per call, as
    ``lake_move`` tool calls. Once the script is exhausted it keeps
    answering with ``"right"``.

    Attributes are plain instance fields:
        actions: the scripted action strings.
        step_count: number of times the policy has been called.
        model_id: fixed identifier, ``"mock-model"``.
    """

    def __init__(self, actions=None):
        # A falsy ``actions`` (None or an empty list) selects the default script.
        self.actions = actions or ["right", "down", "right", "down"]
        self.step_count = 0
        self.model_id = "mock-model"

    async def __call__(self, tool_schema, env_index, conversation_history):
        """Return the next scripted action as a single tool call.

        Args:
            tool_schema: unused by the mock.
            env_index: unused by the mock.
            conversation_history: unused by the mock.

        Returns:
            A ``(tool_calls, None)`` tuple where ``tool_calls`` is a
            one-element list holding the ``lake_move`` call.
        """
        if self.step_count < len(self.actions):
            action = self.actions[self.step_count]
        else:
            action = "right"  # Default action once the script runs out

        tool_calls = [MCPToolCall(tool_name="lake_move", arguments={"action": action})]
        self.step_count += 1
        return tool_calls, None

    def add_tool_response(
        self,
        env_index,
        tool_call,
        response,
        conversation_history,
        reward=0.0,
        done=False,
        info=None,
    ):
        """Append an assistant tool-call message and its tool response.

        Two OpenAI-format messages are appended to ``conversation_history``
        in place: the assistant message carrying the tool call, then the
        ``tool`` message carrying ``response`` plus a ``control_plane_step``
        dict built from ``reward``, ``done`` and ``info``.

        Args:
            env_index: stored as the ``step`` field of the control plane record.
            tool_call: object exposing ``tool_call_id``, ``tool_name`` and
                ``arguments``.
            response: content of the tool message.
            conversation_history: list of message dicts, mutated in place.
            reward: reward recorded in the control plane record.
            done: termination flag recorded in the control plane record.
            info: optional dict; only its ``"control_plane"`` entry is kept.
        """
        # Compute the id once so both messages are guaranteed to share it.
        # The fallback is based on the history length before either append.
        call_id = tool_call.tool_call_id or f"call_{len(conversation_history)}"

        # Add assistant message with tool call
        conversation_history.append(
            {
                "role": "assistant",
                "content": None,
                "tool_calls": [
                    {
                        "id": call_id,
                        "type": "function",
                        "function": {
                            "name": tool_call.tool_name,
                            # OpenAI-format arguments are a JSON string, not a Python repr.
                            "arguments": json.dumps(tool_call.arguments),
                        },
                    }
                ],
            }
        )

        # Add tool response message
        conversation_history.append(
            {
                "role": "tool",
                "content": response,
                "tool_call_id": call_id,
                "control_plane_step": {
                    # NOTE(review): "step" is filled with env_index, not a step counter - confirm intended.
                    "step": env_index,
                    "reward": reward,
                    "terminated": done,
                    "info": info.get("control_plane", {}) if info else {},
                    "tool_calls": [f"{tool_call.tool_name}({tool_call.arguments})"],
                    "num_tool_calls": 1,
                },
            }
        )
class TestRolloutControlPlaneIntegration:
    """Test rollout system with control plane integration."""

    def setup_method(self):
        """Create a fresh ExecutionManager before every test."""
        self.execution_manager = ExecutionManager()

    @pytest.mark.asyncio
    async def test_rollout_with_control_plane_separation(self):
        """
        Test that rollout system properly handles control plane separation.

        This test validates:
        1. Tool responses contain only data plane info
        2. Control plane resources provide rewards/termination
        3. Trajectories capture both planes correctly
        4. Termination uses control plane signals
        """
        # Create mock sessions
        sessions = [
            MCPSession(
                session_id="test_session_1",
                base_url="http://localhost:8000",
                seed=42,
                model_id="test_model",
            )
        ]

        # Create dataset rows
        dataset_rows = [
            DatasetRow(
                id="test_row_1",
                seed=42,
                system_prompt="You are playing FrozenLake",
                user_prompt_template="Navigate to the goal",
                environment_context={"grid_type": "4x4"},
            )
        ]

        # Mock the vector environment to simulate control plane separation
        with (
            patch.object(GeneralMCPVectorEnv, "__init__", return_value=None),
            patch.object(GeneralMCPVectorEnv, "reset") as mock_reset,
            patch.object(GeneralMCPVectorEnv, "step") as mock_step,
            patch.object(GeneralMCPVectorEnv, "close") as mock_close,
            patch.object(GeneralMCPVectorEnv, "format_user_prompt") as mock_format_user_prompt,
        ):
            # Setup mock vector environment; __init__ is patched out, so the
            # attributes the rollout reads are assigned by hand.
            mock_env = GeneralMCPVectorEnv(sessions, dataset_rows)
            mock_env.sessions = sessions
            mock_env.dataset_rows = dataset_rows
            mock_env.n = 1
            mock_env.user_prompt_formatter = lambda template, obs, context: template
            mock_env.tool_schemas = [{"name": "lake_move", "description": "Move in FrozenLake"}]

            # Mock reset to return initial state
            mock_reset.return_value = (
                {"position": 0, "grid": "4x4 FrozenLake"},  # single observation
                [{"name": "lake_move", "description": "Move in FrozenLake"}],  # single tool schema
            )

            # Mock format_user_prompt to return template
            mock_format_user_prompt.return_value = "Navigate to the goal"

            # Mock step to simulate control plane separation
            step_responses = [
                # Step 1: Move right, no reward, not terminated
                (
                    {
                        "position": 1,
                        "grid": "4x4 FrozenLake",
                    },  # single observation (data plane only)
                    0.0,  # single reward (from control plane)
                    False,  # single done (from control plane)
                    {
                        "control_plane": {
                            "reward_source": "control_plane",
                            "status_source": "control_plane",
                        }
                    },  # single info
                ),
                # Step 2: Move down, no reward, not terminated
                (
                    {"position": 5, "grid": "4x4 FrozenLake"},
                    0.0,
                    False,
                    {
                        "control_plane": {
                            "reward_source": "control_plane",
                            "status_source": "control_plane",
                        }
                    },
                ),
                # Step 3: Move right, reach goal, reward, terminated
                (
                    {"position": 6, "grid": "4x4 FrozenLake"},
                    1.0,  # Success reward from control plane
                    True,  # Terminated from control plane
                    {
                        "control_plane": {
                            "reward_source": "control_plane",
                            "status_source": "control_plane",
                        }
                    },
                ),
            ]

            step_call_count = 0

            def mock_step_side_effect(env_index, tool_call):
                """Replay the scripted step responses, then report termination."""
                nonlocal step_call_count
                if step_call_count < len(step_responses):
                    result = step_responses[step_call_count]
                    step_call_count += 1
                    return result
                # Default to terminated if we run out of responses
                return (
                    {"position": 6, "grid": "4x4 FrozenLake"},
                    0.0,
                    True,
                    {
                        "control_plane": {
                            "reward_source": "control_plane",
                            "status_source": "control_plane",
                        }
                    },
                )

            mock_step.side_effect = mock_step_side_effect
            mock_close.return_value = None

            # Create mock policy
            policy = MockPolicy(["right", "down", "right"])

            # Execute rollout
            evaluation_rows = await self.execution_manager.execute_rollouts(mock_env, policy, steps=10)

            # Validate results
            assert len(evaluation_rows) == 1, "Should have one evaluation row"
            evaluation_row = evaluation_rows[0]

            # Extract trajectory information from messages' control_plane_step data
            messages_with_control_plane = [
                msg for msg in evaluation_row.messages if msg.control_plane_step is not None
            ]
            steps = len(messages_with_control_plane)
            total_reward = sum(msg.control_plane_step["reward"] for msg in messages_with_control_plane)
            terminated = any(msg.control_plane_step["terminated"] for msg in messages_with_control_plane)

            # Validate basic trajectory structure
            assert steps == 3, f"Expected 3 steps, got {steps}"
            assert total_reward == 1.0, f"Expected reward 1.0, got {total_reward}"
            assert terminated, "Trajectory should be terminated"

            # Validate that data plane and control plane are properly separated in messages
            # Tool responses should only contain observations, rewards/termination are in control_plane_step
            for msg in evaluation_row.messages:
                if msg.role == "tool":
                    # Tool responses should only contain data plane information
                    content = msg.content or ""
                    # The content should not directly contain rewards or termination (they're in control_plane_step)
                    assert (
                        "reward" not in content.lower() or "reward_source" in content.lower()
                    ), "Tool response should not directly contain reward"

            # Validate control plane information from messages
            rewards = [msg.control_plane_step["reward"] for msg in messages_with_control_plane]
            assert rewards == [0.0, 0.0, 1.0], "Rewards should match control plane"

            # Validate enhanced control plane tracking via messages
            assert len(messages_with_control_plane) == 3, "Should have 3 messages with control plane steps"
            for msg in messages_with_control_plane:
                cp_step = msg.control_plane_step
                assert "step" in cp_step, "Control plane step should have step number"
                assert "reward" in cp_step, "Control plane step should have reward"
                assert "terminated" in cp_step, "Control plane step should have terminated status"
                assert "info" in cp_step, "Control plane step should have control plane info"
                assert "tool_calls" in cp_step, "Control plane step should have tool calls"

            # Validate final step has termination
            final_msg = messages_with_control_plane[-1]
            final_cp_step = final_msg.control_plane_step
            assert final_cp_step["terminated"], "Final step should be terminated"
            assert final_cp_step["reward"] == 1.0, "Final step should have correct reward"
            assert final_cp_step["termination_reason"] == "control_plane_signal", "Should terminate via control plane"
            assert final_cp_step["step"] == 2, "Should record final step"

            # Validate policy interaction
            assert policy.step_count == 3, "Policy should have been called 3 times"

    @pytest.mark.asyncio
    async def test_rollout_trajectory_recording_with_control_plane(self):
        """
        Test that trajectory recording captures both data and control plane information.

        The trajectory is populated by hand here; no rollout is executed.
        """
        # Create a simple test scenario with manual trajectory construction
        session = MCPSession(
            session_id="test_session",
            base_url="http://localhost",
            seed=42,
            model_id="test_model",
        )

        # Create an empty trajectory; it is populated with control plane data below
        trajectory = Trajectory(
            session=session,
            observations=[],
            actions=[],
            rewards=[],
            terminated=False,
            total_reward=0.0,
            steps=0,
            duration=0.0,
            control_plane_steps=[],
            control_plane_summary={},
            termination_reason="",
            conversation_history=[],
            usage={},
        )

        # Simulate steps with control plane separation
        steps = [
            {
                "observation": {"position": 1, "grid": "4x4"},
                "action": "lake_move(right)",
                "reward": 0.0,
                "terminated": False,
                "control_plane_info": {"reward_source": "control_plane"},
            },
            {
                "observation": {"position": 15, "grid": "4x4"},
                "action": "lake_move(down)",
                "reward": 1.0,
                "terminated": True,
                "control_plane_info": {
                    "reward_source": "control_plane",
                    "status_source": "control_plane",
                },
            },
        ]

        # Simulate the rollout manager's trajectory building logic
        for i, step_data in enumerate(steps):
            # Data plane recording
            trajectory.observations.append(step_data["observation"])
            trajectory.actions.append(step_data["action"])
            trajectory.rewards.append(step_data["reward"])
            trajectory.total_reward += step_data["reward"]
            trajectory.steps += 1

            # Control plane recording
            control_plane_step = {
                "step": i,
                "reward": step_data["reward"],
                "terminated": step_data["terminated"],
                "info": step_data["control_plane_info"],
                "tool_call": step_data["action"],
            }
            trajectory.control_plane_steps.append(control_plane_step)

            if step_data["terminated"]:
                trajectory.terminated = True
                trajectory.control_plane_summary = {
                    "total_reward": trajectory.total_reward,
                    "termination_reason": "control_plane_signal",
                    "final_step": i,
                    "control_plane_source": step_data["control_plane_info"],
                }

        # Validate the trajectory structure
        assert len(trajectory.observations) == 2, "Should have 2 observations"
        assert len(trajectory.actions) == 2, "Should have 2 actions"
        assert len(trajectory.rewards) == 2, "Should have 2 rewards"
        assert len(trajectory.control_plane_steps) == 2, "Should have 2 control plane steps"

        # Validate data plane contains only observations
        for obs in trajectory.observations:
            assert "position" in obs, "Observation should contain position"
            assert "reward" not in obs, "Data plane should not contain reward"
            assert "terminated" not in obs, "Data plane should not contain termination"

        # Validate control plane contains rewards and termination info
        assert trajectory.rewards == [0.0, 1.0], "Control plane should have rewards"
        assert trajectory.total_reward == 1.0, "Control plane should track total reward"
        assert trajectory.terminated, "Control plane should handle termination"

        # Validate control plane summary
        assert trajectory.control_plane_summary["total_reward"] == 1.0
        assert trajectory.control_plane_summary["termination_reason"] == "control_plane_signal"
        assert trajectory.control_plane_summary["final_step"] == 1

    @pytest.mark.asyncio
    async def test_rollout_handles_control_plane_failure_gracefully(self):
        """
        Test that rollout system handles control plane failures gracefully.

        The mocked ``step`` returns an empty info dict (no ``control_plane``
        entry); the rollout is still expected to record one step.
        """
        # Create mock sessions
        sessions = [
            MCPSession(
                session_id="test_session",
                base_url="http://localhost",
                seed=42,
                model_id="test_model",
            )
        ]
        dataset_rows = [
            DatasetRow(
                id="test_row",
                seed=42,
                system_prompt="Test",
                user_prompt_template="Test",
                environment_context={},
            )
        ]

        with (
            patch.object(GeneralMCPVectorEnv, "__init__", return_value=None),
            patch.object(GeneralMCPVectorEnv, "reset") as mock_reset,
            patch.object(GeneralMCPVectorEnv, "step") as mock_step,
            patch.object(GeneralMCPVectorEnv, "close") as mock_close,
            patch.object(GeneralMCPVectorEnv, "format_user_prompt") as mock_format_user_prompt,
        ):
            mock_env = GeneralMCPVectorEnv(sessions, dataset_rows)
            mock_env.sessions = sessions
            mock_env.dataset_rows = dataset_rows
            mock_env.n = 1
            mock_env.user_prompt_formatter = lambda template, obs, context: template
            # Add tool_schemas attribute expected by execute_rollouts
            mock_env.tool_schemas = [{"name": "move", "description": "Move"}]

            # Mock reset
            mock_reset.return_value = (
                {"position": 0},  # single observation
                [{"name": "move", "description": "Move"}],  # single tool schema
            )

            # Mock step to simulate control plane failure (no control plane info)
            mock_step.return_value = (
                {"position": 1},  # single observation
                0.0,  # single reward (fallback)
                False,  # single done (fallback)
                {},  # single info (no control plane)
            )
            mock_close.return_value = None
            mock_format_user_prompt.return_value = "Test"

            # Execute rollout with control plane failure
            policy = MockPolicy(["right"])
            evaluation_rows = await self.execution_manager.execute_rollouts(mock_env, policy, steps=1)

            # Should still work, but without control plane info
            assert len(evaluation_rows) == 1
            evaluation_row = evaluation_rows[0]

            # Extract trajectory information from messages
            messages_with_control_plane = [
                msg for msg in evaluation_row.messages if msg.control_plane_step is not None
            ]
            steps = len(messages_with_control_plane)
            total_reward = sum(msg.control_plane_step["reward"] for msg in messages_with_control_plane)

            assert steps == 1
            assert total_reward == 0.0

            # Control plane steps should still be recorded (even if empty)
            assert len(messages_with_control_plane) == 1
            assert messages_with_control_plane[0].control_plane_step["info"] == {}

    @pytest.mark.asyncio
    async def test_rollout_creates_envs_from_url(self):
        """Ensure rollout can create environments automatically when given a URL."""
        dataset = [
            {
                "id": "row1",
                "system_prompt": "sys",
                "user_prompt_template": "tmpl",
                "environment_context": {"seed": 1},
            }
        ]
        policy = MockPolicy(["right"])

        with (
            patch("eval_protocol.mcp_env.make", new_callable=AsyncMock) as mock_make,
            patch("eval_protocol.mcp_env.ExecutionManager") as MockManager,
        ):
            mock_env = MagicMock()
            mock_make.return_value = mock_env
            manager_instance = MockManager.return_value
            manager_instance.execute_rollouts = AsyncMock(return_value=["ok"])

            result = await ep.rollout(
                "http://localhost:1234/mcp/",
                policy,
                dataset=dataset,
                model_id="test_model",
                steps=5,
            )

            mock_make.assert_called_once_with(
                "http://localhost:1234/mcp/",
                evaluation_rows=None,
                dataset=dataset,
                model_id="test_model",
            )
            # Positional arguments the test expects ep.rollout to forward to execute_rollouts.
            manager_instance.execute_rollouts.assert_called_once_with(
                mock_make.return_value,
                policy,
                5,
                None,
                8,
                None,
            )
            assert result == ["ok"]

    def test_control_plane_trajectory_serialization(self):
        """
        Test that trajectories with control plane information can be serialized.

        The trajectory is dumped to JSON inside a temporary directory and read
        back; the directory is removed even when an assertion fails.
        """
        # Create a trajectory with control plane data
        session = MCPSession(
            session_id="test",
            base_url="http://localhost",
            seed=42,
            model_id="test_model",
        )
        trajectory = Trajectory(
            session=session,
            observations=[{"position": 0}, {"position": 1}],
            actions=["move(right)"],
            rewards=[0.0],
            terminated=False,
            total_reward=0.0,
            steps=1,
            duration=1.0,
            control_plane_steps=[],
            control_plane_summary={},
            termination_reason="",
            conversation_history=[],
            usage={},
        )

        # Add control plane data
        trajectory.control_plane_steps = [
            {
                "step": 0,
                "reward": 0.0,
                "terminated": False,
                "info": {"reward_source": "control_plane"},
                "tool_call": "move(right)",
            }
        ]
        trajectory.control_plane_summary = {
            "total_reward": 0.0,
            "termination_reason": "control_plane_signal",
            "final_step": 0,
            "control_plane_source": {"reward_source": "control_plane"},
        }

        trajectory_dict = {
            "session_id": trajectory.session.session_id,
            "observations": trajectory.observations,
            "actions": trajectory.actions,
            "rewards": trajectory.rewards,
            "terminated": trajectory.terminated,
            "total_reward": trajectory.total_reward,
            "steps": trajectory.steps,
            "duration": trajectory.duration,
            "control_plane_steps": trajectory.control_plane_steps,
            "control_plane_summary": trajectory.control_plane_summary,
        }

        # TemporaryDirectory cleans up on exit, so a failing step below cannot
        # leave a stray file behind, and the file is closed before it is reopened.
        with tempfile.TemporaryDirectory() as tmp_dir:
            trajectory_path = Path(tmp_dir) / "trajectory.json"

            # Test serialization
            with open(trajectory_path, "w", encoding="utf-8") as write_f:
                json.dump(trajectory_dict, write_f)

            # Test deserialization
            with open(trajectory_path, "r", encoding="utf-8") as read_f:
                loaded_data = json.load(read_f)

        assert loaded_data["session_id"] == "test"
        assert len(loaded_data["control_plane_steps"]) == 1
        assert loaded_data["control_plane_summary"]["termination_reason"] == "control_plane_signal"
if __name__ == "__main__":
    # Propagate pytest's exit code so a failing run is visible to the shell/CI.
    raise SystemExit(pytest.main([__file__, "-v"]))