-
Notifications
You must be signed in to change notification settings - Fork 964
Expand file tree
/
Copy pathtest_process_parallel.py
More file actions
258 lines (205 loc) · 9.55 KB
/
test_process_parallel.py
File metadata and controls
258 lines (205 loc) · 9.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""
Tests for process-based parallel controller
"""
import asyncio
import os
import tempfile
import unittest
from unittest.mock import Mock, patch, MagicMock
import time
from concurrent.futures import Future
# Set dummy API key for testing.
# NOTE(review): this assignment is placed BEFORE the openevolve imports below,
# presumably because openevolve reads OPENAI_API_KEY at import time — do not
# reorder these lines without confirming.
os.environ["OPENAI_API_KEY"] = "test"
from openevolve.config import Config, DatabaseConfig, EvaluatorConfig, LLMConfig, PromptConfig
from openevolve.database import Program, ProgramDatabase
from openevolve.process_parallel import ProcessParallelController, SerializableResult
class TestProcessParallel(unittest.TestCase):
    """Tests for the process-based parallel controller."""

    def setUp(self):
        """Set up test environment: temp dir, config, evaluator file, seeded database."""
        self.test_dir = tempfile.mkdtemp()

        # Create test config
        self.config = Config()
        self.config.max_iterations = 10
        self.config.evaluator.parallel_evaluations = 2
        self.config.evaluator.timeout = 10
        self.config.database.num_islands = 2
        self.config.database.in_memory = True
        self.config.checkpoint_interval = 5

        # Create test evaluation file; the string must itself be valid Python
        # because the controller is given this file's path as the evaluator.
        self.eval_content = """
def evaluate(program_path):
    return {"score": 0.5, "performance": 0.6}
"""
        self.eval_file = os.path.join(self.test_dir, "evaluator.py")
        with open(self.eval_file, "w") as f:
            f.write(self.eval_content)

        # Create test database
        self.database = ProgramDatabase(self.config.database)

        # Add some test programs
        for i in range(3):
            program = Program(
                id=f"test_{i}",
                code=f"def func_{i}(): return {i}",
                language="python",
                metrics={"score": 0.5 + i * 0.1, "performance": 0.4 + i * 0.1},
                iteration_found=0,
            )
            self.database.add(program)

    def tearDown(self):
        """Clean up test environment (remove the temp dir, ignoring races)."""
        import shutil

        shutil.rmtree(self.test_dir, ignore_errors=True)

    def test_controller_initialization(self):
        """Test that controller initializes correctly"""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        # num_workers mirrors config.evaluator.parallel_evaluations (set to 2 in setUp)
        self.assertEqual(controller.num_workers, 2)
        self.assertIsNone(controller.executor)
        self.assertIsNotNone(controller.shutdown_event)

    def test_controller_start_stop(self):
        """Test starting and stopping the controller"""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        # Start controller
        controller.start()
        self.assertIsNotNone(controller.executor)

        # Stop controller
        controller.stop()
        self.assertIsNone(controller.executor)
        self.assertTrue(controller.shutdown_event.is_set())

    def test_database_snapshot_creation(self):
        """Test creating database snapshot for workers"""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)
        snapshot = controller._create_database_snapshot()

        # Verify snapshot structure
        self.assertIn("programs", snapshot)
        self.assertIn("islands", snapshot)
        self.assertIn("current_island", snapshot)
        self.assertIn("artifacts", snapshot)

        # Verify programs are serialized (3 programs seeded in setUp)
        self.assertEqual(len(snapshot["programs"]), 3)
        for pid, prog_dict in snapshot["programs"].items():
            self.assertIsInstance(prog_dict, dict)
            self.assertIn("id", prog_dict)
            self.assertIn("code", prog_dict)

    def test_run_evolution_basic(self):
        """Test basic evolution run"""

        async def run_test():
            controller = ProcessParallelController(self.config, self.eval_file, self.database)

            # Mock the executor to avoid actually spawning processes
            with patch.object(controller, "_submit_iteration") as mock_submit:
                # Create mock futures that complete immediately
                mock_future1 = MagicMock()
                mock_result1 = SerializableResult(
                    child_program_dict={
                        "id": "child_1",
                        "code": "def evolved(): return 1",
                        "language": "python",
                        "parent_id": "test_0",
                        "generation": 1,
                        "metrics": {"score": 0.7, "performance": 0.8},
                        "iteration_found": 1,
                        "metadata": {"changes": "test", "island": 0},
                    },
                    parent_id="test_0",
                    iteration_time=0.1,
                    iteration=1,
                )
                mock_future1.done.return_value = True
                mock_future1.result.return_value = mock_result1
                mock_future1.cancel.return_value = True
                mock_submit.return_value = mock_future1

                # Start controller
                controller.start()

                # Run evolution for 1 iteration
                result = await controller.run_evolution(
                    start_iteration=1, max_iterations=1, target_score=None
                )

                # Verify iteration was submitted with island_id
                mock_submit.assert_called_once_with(1, 0)

                # Verify program was added to database
                self.assertIn("child_1", self.database.programs)
                child = self.database.get("child_1")
                self.assertEqual(child.metrics["score"], 0.7)

        # Run the async test
        asyncio.run(run_test())

    def test_request_shutdown(self):
        """Test graceful shutdown request"""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        # Request shutdown
        controller.request_shutdown()

        # Verify shutdown event is set
        self.assertTrue(controller.shutdown_event.is_set())

    def test_hard_timeout_watchdog_requeues_stuck_future(self):
        """Test that watchdog cancels and requeues a stuck future."""

        async def run_test():
            controller = ProcessParallelController(self.config, self.eval_file, self.database)
            controller.executor = Mock()  # run_evolution requires a started executor
            # Tiny timeout so the watchdog fires immediately; one retry allowed.
            self.config.stuck_future_timeout = 0.01
            self.config.stuck_future_max_retries = 1

            # First submission never completes; watchdog should cancel it.
            stuck_future = MagicMock(spec=Future)
            stuck_future.done.return_value = False
            stuck_future.cancel.return_value = True

            # The requeued submission completes with a real result.
            retry_future = MagicMock(spec=Future)
            retry_future.done.return_value = True
            retry_future.result.return_value = SerializableResult(
                child_program_dict={
                    "id": "child_retry_success",
                    "code": "def evolved(): return 42",
                    "language": "python",
                    "parent_id": "test_0",
                    "generation": 1,
                    "metrics": {"score": 0.8, "performance": 0.9},
                    "iteration_found": 1,
                    "metadata": {"changes": "watchdog retry", "island": 0},
                },
                parent_id="test_0",
                iteration_time=0.1,
                iteration=1,
                target_island=0,
            )

            with patch.object(
                controller, "_submit_iteration", side_effect=[stuck_future, retry_future]
            ) as mock_submit:
                await controller.run_evolution(start_iteration=1, max_iterations=1, target_score=None)

            # Submitted twice: original attempt plus the watchdog's requeue.
            self.assertEqual(mock_submit.call_count, 2)
            mock_submit.assert_any_call(1, 0)
            self.assertTrue(stuck_future.cancel.called)
            self.assertIn("child_retry_success", self.database.programs)

        asyncio.run(run_test())

    def test_hard_timeout_watchdog_respects_retry_limit(self):
        """Test that watchdog stops requeueing after max retries are exhausted."""

        async def run_test():
            controller = ProcessParallelController(self.config, self.eval_file, self.database)
            controller.executor = Mock()  # run_evolution requires a started executor
            # Zero retries: the stuck future is cancelled but never requeued.
            self.config.stuck_future_timeout = 0.01
            self.config.stuck_future_max_retries = 0

            stuck_future = MagicMock(spec=Future)
            stuck_future.done.return_value = False
            stuck_future.cancel.return_value = True

            with patch.object(controller, "_submit_iteration", return_value=stuck_future) as mock_submit:
                await controller.run_evolution(start_iteration=1, max_iterations=1, target_score=None)

            self.assertEqual(mock_submit.call_count, 1)
            self.assertTrue(stuck_future.cancel.called)
            self.assertNotIn("child_retry_success", self.database.programs)

        asyncio.run(run_test())

    def test_serializable_result(self):
        """Test SerializableResult dataclass"""
        result = SerializableResult(
            child_program_dict={"id": "test", "code": "pass"},
            parent_id="parent",
            iteration_time=1.5,
            iteration=10,
            error=None,
        )

        # Verify attributes
        self.assertEqual(result.child_program_dict["id"], "test")
        self.assertEqual(result.parent_id, "parent")
        self.assertEqual(result.iteration_time, 1.5)
        self.assertEqual(result.iteration, 10)
        self.assertIsNone(result.error)

        # Test with error
        error_result = SerializableResult(error="Test error", iteration=5)
        self.assertEqual(error_result.error, "Test error")
        self.assertIsNone(error_result.child_program_dict)
# Allow running this test module directly: `python test_process_parallel.py`.
if __name__ == "__main__":
    unittest.main()