Skip to content

Commit 9677b56

Browse files
committed
test: parallel evaluation unit and integration tests
- WorktreePool: lifecycle, acquire/release, file isolation
- async_execute_test_subprocess: stdout, stderr, timeout
- ParallelCandidateEvaluator: code replacement failure, behavioral mismatch with diffs, successful routing, concurrent evaluation
1 parent b2cf03d commit 9677b56

1 file changed

Lines changed: 344 additions & 0 deletions

File tree

tests/test_parallel_evaluator.py

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
"""Integration tests for the parallel candidate evaluation infrastructure."""
2+
3+
from __future__ import annotations
4+
5+
import subprocess
6+
import sys
7+
from pathlib import Path
8+
from unittest.mock import MagicMock, patch
9+
10+
import anyio
11+
import pytest
12+
13+
from codeflash.either import Failure, Success, is_successful
14+
from codeflash.languages.function_optimizer import CandidateNode
15+
from codeflash.optimization.parallel_evaluator import EvalFailure, ParallelCandidateEvaluator
16+
17+
18+
class TestWorktreePoolLifecycle:
    """Lifecycle checks for ``WorktreePool``, run against the codeflash checkout itself."""

    def test_creates_n_worktrees_and_cleans_up(self, tmp_path: Path) -> None:
        """The pool exposes ``pool_size`` slot directories while open and no slots once closed."""
        from codeflash.code_utils.worktree_pool import WorktreePool

        pool_size = 3
        base_dir = tmp_path.resolve() / "worktrees"

        # The pool needs a git root. We use the codeflash repo itself.
        repo_root = Path(__file__).resolve().parents[1]

        async def _run() -> None:
            # ``patch`` is the module-level import; no function-local re-import is needed.
            with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root):
                pool = WorktreePool(pool_size=pool_size, base_dir=base_dir)
                async with pool:
                    assert len(pool._slots) == pool_size
                    for slot in pool._slots:
                        assert slot.path.exists()
                        assert slot.path.is_dir()
                        assert (slot.path / ".codeflash_pool.pid").exists()

                # After cleanup, slots are cleared
                assert len(pool._slots) == 0

        anyio.run(_run)

    def test_acquire_release_round_trip(self, tmp_path: Path) -> None:
        """Two acquires yield distinct slots, and a released slot is the one handed out next."""
        from codeflash.code_utils.worktree_pool import WorktreePool

        pool_size = 2
        base_dir = tmp_path.resolve() / "worktrees"
        repo_root = Path(__file__).resolve().parents[1]

        async def _run() -> None:
            with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root):
                async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool:
                    slot1 = await pool.acquire()
                    slot2 = await pool.acquire()

                    # Both slots should be distinct
                    assert slot1.index != slot2.index
                    assert slot1.path != slot2.path

                    # Release one and re-acquire it; with slot2 still held,
                    # slot1 is the only slot that was returned to the pool.
                    await pool.release(slot1)
                    reacquired = await pool.acquire()
                    assert reacquired.index == slot1.index

                    await pool.release(slot2)
                    await pool.release(reacquired)

        anyio.run(_run)
73+
74+
75+
class TestWorktreeSlotFileIsolation:
    """Checks that a candidate written into one worktree slot stays confined to that slot."""

    def test_write_to_one_slot_does_not_affect_another(self, tmp_path: Path) -> None:
        """Writing through ``slot_a`` changes only ``slot_a``'s mirror of the file.

        The other slot's mirror and the file in the main tree must not contain the sentinel.
        """
        from codeflash.code_utils.worktree_pool import WorktreePool

        pool_size = 2
        base_dir = tmp_path.resolve() / "worktrees"
        repo_root = Path(__file__).resolve().parents[1]
        test_file = repo_root / "codeflash" / "__init__.py"

        async def _run() -> None:
            # ``patch`` is the module-level import; no function-local re-import is needed.
            with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root):
                async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool:
                    slot_a = await pool.acquire()
                    slot_b = await pool.acquire()

                    sentinel = "# SLOT_A_SENTINEL_CONTENT\n"
                    await slot_a.write_candidate(test_file, sentinel)

                    # slot_b's mirror of the same file should NOT contain the sentinel
                    mirrored_b = slot_b.mirror(test_file)
                    content_b = mirrored_b.read_text(encoding="utf-8")
                    assert sentinel not in content_b

                    # slot_a's mirror should contain it (and nothing else)
                    mirrored_a = slot_a.mirror(test_file)
                    content_a = mirrored_a.read_text(encoding="utf-8")
                    assert content_a == sentinel

                    # Main tree should be unaffected
                    main_content = test_file.read_text(encoding="utf-8")
                    assert sentinel not in main_content

                    await pool.release(slot_a)
                    await pool.release(slot_b)

        anyio.run(_run)
113+
114+
115+
class TestAsyncExecuteTestSubprocess:
    """Exercise ``async_execute_test_subprocess`` with short ``python -c`` child processes."""

    @staticmethod
    def _execute(snippet: str, timeout: int) -> subprocess.CompletedProcess[str]:
        """Run ``python -c <snippet>`` through the async runner, using this test directory as cwd."""
        from codeflash.languages.python.test_runner import async_execute_test_subprocess

        tests_dir = Path(__file__).resolve().parent

        async def _invoke() -> subprocess.CompletedProcess[str]:
            return await async_execute_test_subprocess(
                cmd_list=[sys.executable, "-c", snippet], cwd=tests_dir, env=None, timeout=timeout
            )

        return anyio.run(_invoke)

    def test_runs_simple_command(self) -> None:
        """A trivial print exits with status 0 and its text appears on stdout."""
        completed = self._execute("print('hello world')", timeout=30)
        assert completed.returncode == 0
        assert "hello world" in completed.stdout

    def test_captures_stderr(self) -> None:
        """Text written to the child's stderr is available on the result."""
        completed = self._execute("import sys; sys.stderr.write('err_msg\\n')", timeout=30)
        assert "err_msg" in completed.stderr

    def test_timeout_raises(self) -> None:
        """A child that sleeps past the timeout surfaces ``subprocess.TimeoutExpired``."""
        with pytest.raises(subprocess.TimeoutExpired):
            self._execute("import time; time.sleep(60)", timeout=1)
158+
159+
160+
class TestParallelCandidateEvaluator:
    """Unit tests for ``ParallelCandidateEvaluator`` using a mocked optimizer and patched phases."""

    def _make_candidate_node(self, opt_id: str = "cand_1") -> CandidateNode:
        """Build a ``CandidateNode`` wrapping a minimal one-function candidate with id ``opt_id``."""
        from codeflash.models.models import CodeString, CodeStringsMarkdown, OptimizedCandidate
        from codeflash.models.shared_types import OptimizedCandidateSource

        source_code = CodeStringsMarkdown(code_strings=[CodeString(code="def f(): pass", file_path=Path("test.py"))])
        candidate = OptimizedCandidate(
            source_code=source_code,
            explanation="test optimization",
            optimization_id=opt_id,
            source=OptimizedCandidateSource.OPTIMIZE,
        )
        return CandidateNode(candidate)

    def _make_optimizer_mock(self, tmp_path: Path) -> MagicMock:
        """Return a ``MagicMock`` optimizer whose paths point into ``tmp_path``.

        Code replacement is configured to succeed by default; tests override
        ``replace_function_and_helpers_with_optimized_code`` to simulate failure.
        """
        opt = MagicMock()
        opt.function_to_optimize.file_path = str(tmp_path / "src" / "module.py")
        opt.function_to_optimize_source_code = "def f(): pass"
        opt.test_files.test_files = []
        opt.args.project_root = str(tmp_path)
        opt.test_cfg = MagicMock()
        opt.get_test_env.return_value = {"PATH": "/usr/bin"}
        opt.language_support.build_pytest_cmd.return_value = [sys.executable, "-m", "pytest"]
        opt.replace_function_and_helpers_with_optimized_code.return_value = True
        opt.write_code_and_helpers = MagicMock()
        return opt

    @staticmethod
    def _write_source_module(tmp_path: Path) -> None:
        """Create ``src/module.py`` under ``tmp_path``, the file the optimizer mock points at."""
        source_dir = tmp_path / "src"
        source_dir.mkdir(parents=True)
        (source_dir / "module.py").write_text("def f(): pass", encoding="utf-8")

    @staticmethod
    def _evaluate(evaluator: ParallelCandidateEvaluator, nodes: list[CandidateNode]) -> list:  # type: ignore[type-arg]
        """Run ``evaluator.evaluate_candidates`` over ``nodes`` and return its results.

        Each node is submitted as ``(node, position, None)``. ``git_root_dir`` is patched to
        the codeflash checkout for the duration of the call. Callers that need to replace
        evaluator phases wrap this call in their own ``patch.object`` context.
        """
        repo_root = Path(__file__).resolve().parents[1]
        candidates = [(node, index, None) for index, node in enumerate(nodes)]

        async def _run() -> list:  # type: ignore[type-arg]
            with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root):
                return await evaluator.evaluate_candidates(
                    candidates=candidates,
                    code_context=MagicMock(),
                    original_code_baseline=MagicMock(),
                    original_helper_code={},
                    file_path_to_helper_classes={},
                )

        return anyio.run(_run)

    def test_code_replacement_failure_returns_eval_failure(self, tmp_path: Path) -> None:
        """A failed code replacement yields an ``EvalFailure`` with an empty diff list."""
        opt = self._make_optimizer_mock(tmp_path)
        opt.replace_function_and_helpers_with_optimized_code.return_value = False

        node = self._make_candidate_node()
        evaluator = ParallelCandidateEvaluator(opt, pool_size=1)

        results = self._evaluate(evaluator, [node])

        assert len(results) == 1
        _, result = results[0]
        assert result is not None
        assert not is_successful(result)
        failure = result.failure()
        assert isinstance(failure, EvalFailure)
        assert "Code replacement failed" in failure.message
        assert failure.diffs == []

    def test_behavioral_mismatch_carries_diffs(self, tmp_path: Path) -> None:
        """A behavioral-phase failure is returned with the diffs it was created with."""
        from codeflash.models.models import TestDiff, TestDiffScope

        opt = self._make_optimizer_mock(tmp_path)
        self._write_source_module(tmp_path)

        node = self._make_candidate_node()
        evaluator = ParallelCandidateEvaluator(opt, pool_size=1)

        mock_diffs = [TestDiff(scope=TestDiffScope.DID_PASS, original_pass=True, candidate_pass=False)]
        behavioral_failure = Failure(EvalFailure(message="Behavioral mismatch: 1 diffs", diffs=mock_diffs))  # type: ignore[arg-type]

        with patch.object(ParallelCandidateEvaluator, "_run_behavioral", return_value=behavioral_failure):
            results = self._evaluate(evaluator, [node])

        # Guard the indexing below so an empty result list fails with a clear assertion.
        assert len(results) == 1
        _, result = results[0]
        assert not is_successful(result)
        failure = result.failure()
        assert len(failure.diffs) == 1
        assert failure.diffs[0].scope == TestDiffScope.DID_PASS

    def test_successful_candidate_returns_result(self, tmp_path: Path) -> None:
        """When both phases succeed, the benchmark phase's result is what the caller receives."""
        from codeflash.optimization.parallel_evaluator import _BehavioralPass

        opt = self._make_optimizer_mock(tmp_path)
        self._write_source_module(tmp_path)

        node = self._make_candidate_node()
        evaluator = ParallelCandidateEvaluator(opt, pool_size=1)

        mock_result = MagicMock()
        mock_result.best_test_runtime = 5000

        async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success:  # type: ignore[type-arg]
            slot = MagicMock()
            return Success(
                _BehavioralPass(slot=slot, candidate_index=0, perf_test_files=[], test_env={}, pytest_cmd_list=[])
            )

        with (
            patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral),
            patch.object(ParallelCandidateEvaluator, "_benchmark_phase", return_value=Success(mock_result)),
        ):
            results = self._evaluate(evaluator, [node])

        # Guard the indexing below so an empty result list fails with a clear assertion.
        assert len(results) == 1
        _, result = results[0]
        assert is_successful(result)
        assert result.unwrap().best_test_runtime == 5000

    def test_multiple_candidates_evaluated_concurrently(self, tmp_path: Path) -> None:
        """Three candidates each pass through both phases once and all succeed.

        Only per-phase call counts are asserted; overlap in time is not verified here.
        """
        from codeflash.optimization.parallel_evaluator import _BehavioralPass

        opt = self._make_optimizer_mock(tmp_path)
        self._write_source_module(tmp_path)

        nodes = [self._make_candidate_node(f"cand_{i}") for i in range(3)]
        evaluator = ParallelCandidateEvaluator(opt, pool_size=3)

        mock_result = MagicMock()
        mock_result.best_test_runtime = 1000

        behavioral_call_count = 0

        async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success:  # type: ignore[type-arg]
            nonlocal behavioral_call_count
            behavioral_call_count += 1
            slot = MagicMock()
            # NOTE(review): every mocked pass reports candidate_index=0 regardless of which
            # candidate is being evaluated - confirm the evaluator does not key on this field.
            return Success(
                _BehavioralPass(slot=slot, candidate_index=0, perf_test_files=[], test_env={}, pytest_cmd_list=[])
            )

        benchmark_call_count = 0

        async def mock_benchmark(self_eval: object, *args: object, **kwargs: object) -> Success:  # type: ignore[type-arg]
            nonlocal benchmark_call_count
            benchmark_call_count += 1
            return Success(mock_result)

        with (
            patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral),
            patch.object(ParallelCandidateEvaluator, "_benchmark_phase", mock_benchmark),
        ):
            results = self._evaluate(evaluator, nodes)

        assert len(results) == 3
        assert behavioral_call_count == 3
        assert benchmark_call_count == 3
        for _, result in results:
            assert is_successful(result)

0 commit comments

Comments
 (0)