-
Notifications
You must be signed in to change notification settings - Fork 942
Expand file tree
/
Copy pathtest_process_parallel.py
More file actions
231 lines (186 loc) · 8.4 KB
/
test_process_parallel.py
File metadata and controls
231 lines (186 loc) · 8.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""
Tests for process-based parallel controller
"""
import asyncio
import os
import tempfile
import unittest
from unittest.mock import Mock, patch, MagicMock
import time
from concurrent.futures import Future
# Set dummy API key for testing
os.environ["OPENAI_API_KEY"] = "test"
from openevolve.config import Config, DatabaseConfig, EvaluatorConfig, LLMConfig, PromptConfig
from openevolve.database import Program, ProgramDatabase
from openevolve.process_parallel import ProcessParallelController, SerializableResult
class TestProcessParallel(unittest.TestCase):
    """Tests for the process-based parallel controller.

    Each test gets a fresh temporary directory holding a tiny evaluator
    script, plus an in-memory ``ProgramDatabase`` seeded with three programs
    (``test_0`` .. ``test_2``).
    """

    def setUp(self):
        """Build the config, evaluator file and seeded database for a test."""
        self.test_dir = tempfile.mkdtemp()

        # Create test config
        self.config = Config()
        self.config.max_iterations = 10
        self.config.evaluator.parallel_evaluations = 2
        self.config.evaluator.timeout = 10
        self.config.database.num_islands = 2
        self.config.database.in_memory = True
        self.config.checkpoint_interval = 5

        # Create test evaluation file.  The source is assembled from explicit
        # pieces so the indentation of the function body is unambiguous; the
        # written file must be importable Python.
        self.eval_content = (
            "\n"
            "def evaluate(program_path):\n"
            '    return {"score": 0.5, "performance": 0.6}\n'
        )
        self.eval_file = os.path.join(self.test_dir, "evaluator.py")
        with open(self.eval_file, "w") as f:
            f.write(self.eval_content)

        # Create test database
        self.database = ProgramDatabase(self.config.database)

        # Add some test programs with increasing metric values
        for i in range(3):
            program = Program(
                id=f"test_{i}",
                code=f"def func_{i}(): return {i}",
                language="python",
                metrics={"score": 0.5 + i * 0.1, "performance": 0.4 + i * 0.1},
                iteration_found=0,
            )
            self.database.add(program)

    def tearDown(self):
        """Remove the temporary directory created in ``setUp``."""
        import shutil

        # ignore_errors: cleanup is best-effort and must not mask a test result
        shutil.rmtree(self.test_dir, ignore_errors=True)

    def test_controller_initialization(self):
        """A new controller has 2 workers, no executor and a shutdown event."""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        # 2 matches config.evaluator.parallel_evaluations set in setUp
        self.assertEqual(controller.num_workers, 2)
        self.assertIsNone(controller.executor)
        self.assertIsNotNone(controller.shutdown_event)

    def test_controller_start_stop(self):
        """``start()`` creates an executor; ``stop()`` clears it and sets the event."""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        # Start controller
        controller.start()
        self.assertIsNotNone(controller.executor)

        # Stop controller
        controller.stop()
        self.assertIsNone(controller.executor)
        self.assertTrue(controller.shutdown_event.is_set())

    def test_database_snapshot_creation(self):
        """The worker snapshot has the expected keys and dict-serialized programs."""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        snapshot = controller._create_database_snapshot()

        # Verify snapshot structure
        for key in ("programs", "islands", "current_island", "artifacts"):
            self.assertIn(key, snapshot)

        # Verify programs are serialized: one dict per program seeded in setUp
        self.assertEqual(len(snapshot["programs"]), 3)
        for prog_dict in snapshot["programs"].values():
            self.assertIsInstance(prog_dict, dict)
            self.assertIn("id", prog_dict)
            self.assertIn("code", prog_dict)

    def test_run_evolution_basic(self):
        """A single evolution iteration stores the returned child program."""

        async def run_test():
            controller = ProcessParallelController(self.config, self.eval_file, self.database)

            # Patch _submit_iteration so the test controls the future that
            # run_evolution receives instead of submitting real work.
            with patch.object(controller, "_submit_iteration") as mock_submit:
                # Create a mock future that is already complete
                mock_result1 = SerializableResult(
                    child_program_dict={
                        "id": "child_1",
                        "code": "def evolved(): return 1",
                        "language": "python",
                        "parent_id": "test_0",
                        "generation": 1,
                        "metrics": {"score": 0.7, "performance": 0.8},
                        "iteration_found": 1,
                        "metadata": {"changes": "test", "island": 0},
                    },
                    parent_id="test_0",
                    iteration_time=0.1,
                    iteration=1,
                )
                mock_future1 = MagicMock()
                mock_future1.done.return_value = True
                mock_future1.result.return_value = mock_result1
                mock_future1.cancel.return_value = True
                mock_submit.return_value = mock_future1

                # Start controller, and always stop it again so the executor
                # created by start() does not outlive this test.
                controller.start()
                try:
                    # Run evolution for 1 iteration
                    await controller.run_evolution(
                        start_iteration=1, max_iterations=1, target_score=None
                    )
                finally:
                    controller.stop()

                # Verify exactly one submission with positional args (1, 0);
                # presumably (iteration, island_id) - confirm against
                # _submit_iteration's signature.
                mock_submit.assert_called_once_with(1, 0)

                # Verify program was added to database
                self.assertIn("child_1", self.database.programs)
                child = self.database.get("child_1")
                self.assertEqual(child.metrics["score"], 0.7)

        # Run the async test
        asyncio.run(run_test())

    def test_request_shutdown(self):
        """``request_shutdown()`` sets the controller's shutdown event."""
        controller = ProcessParallelController(self.config, self.eval_file, self.database)

        # Request shutdown
        controller.request_shutdown()

        # Verify shutdown event is set
        self.assertTrue(controller.shutdown_event.is_set())

    def test_target_score_fast_exit_cancels_pending_futures(self):
        """Test that reaching target score cancels pending futures immediately."""

        async def run_test():
            controller = ProcessParallelController(self.config, self.eval_file, self.database)
            controller.executor = Mock()  # run_evolution requires a started executor

            # First future: already finished, with a score above the target
            done_future = MagicMock(spec=Future)
            done_future.done.return_value = True
            done_future.result.return_value = SerializableResult(
                child_program_dict={
                    "id": "child_target_hit",
                    "code": "def evolved(): return 99",
                    "language": "python",
                    "parent_id": "test_0",
                    "generation": 1,
                    "metrics": {"combined_score": 0.95, "score": 0.95},
                    "iteration_found": 1,
                    "metadata": {"changes": "target score hit", "island": 0},
                },
                parent_id="test_0",
                iteration_time=0.1,
                iteration=1,
                target_island=0,
            )

            # Second future: still pending, expected to be cancelled
            pending_future = MagicMock(spec=Future)
            pending_future.done.return_value = False
            pending_future.cancel.return_value = True

            with patch.object(
                controller, "_submit_iteration", side_effect=[done_future, pending_future]
            ) as mock_submit:
                await controller.run_evolution(
                    start_iteration=1, max_iterations=2, target_score=0.9
                )

            self.assertEqual(mock_submit.call_count, 2)
            self.assertTrue(pending_future.cancel.called)
            self.assertIn("child_target_hit", self.database.programs)

        asyncio.run(run_test())

    def test_serializable_result(self):
        """``SerializableResult`` stores its fields for success and error cases."""
        result = SerializableResult(
            child_program_dict={"id": "test", "code": "pass"},
            parent_id="parent",
            iteration_time=1.5,
            iteration=10,
            error=None,
        )

        # Verify attributes
        self.assertEqual(result.child_program_dict["id"], "test")
        self.assertEqual(result.parent_id, "parent")
        self.assertEqual(result.iteration_time, 1.5)
        self.assertEqual(result.iteration, 10)
        self.assertIsNone(result.error)

        # Test with error: no child program dict is passed, and it reads back as None
        error_result = SerializableResult(error="Test error", iteration=5)
        self.assertEqual(error_result.error, "Test error")
        self.assertIsNone(error_result.child_program_dict)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()