Skip to content

Commit 4b8886d

Browse files
committed
feat: two-phase parallel candidate evaluator via worktrees
Phase 1 (concurrent): behavioral correctness tests run in parallel. Failed candidates release their worktree slot immediately. Phase 2 (sequential): only passing candidates get benchmarked, one at a time, for accurate timing without CPU contention. EvalFailure carries test diffs for repair context.
1 parent 60a6f2b commit 4b8886d

1 file changed

Lines changed: 375 additions & 0 deletions

File tree

Lines changed: 375 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,375 @@
1+
# mypy: ignore-errors
2+
from __future__ import annotations
3+
4+
import dataclasses
5+
import subprocess
6+
from pathlib import Path
7+
from typing import TYPE_CHECKING
8+
9+
import anyio
10+
11+
from codeflash.cli_cmds.console import logger
12+
from codeflash.code_utils.code_utils import get_run_tmp_file
13+
from codeflash.code_utils.config_consts import INDIVIDUAL_TESTCASE_TIMEOUT, TOTAL_LOOPING_TIME_EFFECTIVE
14+
from codeflash.code_utils.worktree_pool import WorktreePool, WorktreeSlot # noqa: TC001
15+
from codeflash.either import Failure, Success
16+
from codeflash.languages.python.test_runner import async_execute_test_subprocess
17+
18+
if TYPE_CHECKING:
19+
from codeflash.either import Result
20+
from codeflash.languages.function_optimizer import CandidateNode, FunctionOptimizer
21+
from codeflash.models.models import (
22+
CandidateEvaluationContext,
23+
CodeOptimizationContext,
24+
OptimizedCandidate,
25+
OptimizedCandidateResult,
26+
OriginalCodeBaseline,
27+
TestDiff,
28+
)
29+
30+
31+
@dataclasses.dataclass(slots=True)
32+
class EvalFailure:
33+
"""Structured failure from parallel evaluation, carrying test diffs for repair."""
34+
35+
message: str
36+
diffs: list[TestDiff] = dataclasses.field(default_factory=list)
37+
38+
39+
@dataclasses.dataclass(slots=True)
40+
class _BehavioralPass:
41+
"""Intermediate result: candidate passed behavioral tests, ready for benchmarking."""
42+
43+
slot: WorktreeSlot
44+
candidate_index: int
45+
perf_test_files: list[str]
46+
test_env: dict[str, str]
47+
pytest_cmd_list: list[str]
48+
49+
50+
class ParallelCandidateEvaluator:
    """Evaluates optimization candidates in parallel using git worktrees.

    Two-phase evaluation, applied to batches of at most ``pool_size`` candidates:
        Phase 1 (concurrent): behavioral correctness tests
        Phase 2 (sequential): benchmarking — one candidate at a time for accurate timing

    A candidate that passes Phase 1 keeps its worktree slot until it has been
    benchmarked in Phase 2. Batching guarantees that the number of candidates
    competing for slots never exceeds the number of slots requested from the
    pool, so a run of passing candidates cannot starve the remaining ones.
    """

    # pytest plugins disabled (via ``-p no:<name>``) for every test run launched by this evaluator.
    _BLOCKLISTED_PLUGINS = ("benchmark", "codspeed", "xdist", "sugar")
    # Timeout value handed to the subprocess runner for each pytest invocation.
    _SUBPROCESS_TIMEOUT = 600

    def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None:
        """Store the optimizer and the worktree pool size; the pool itself is created per evaluation."""
        self._optimizer = optimizer
        self._pool_size = pool_size
        self._pool: WorktreePool | None = None
        # Serializes _replace_and_capture, which temporarily rewrites files in the main tree.
        self._code_replace_lock = anyio.Lock()

    async def evaluate_candidates(
        self,
        candidates: list[tuple[CandidateNode, int, str | None]],
        code_context: CodeOptimizationContext,
        original_code_baseline: OriginalCodeBaseline,
        original_helper_code: dict[Path, str],
        file_path_to_helper_classes: dict[Path, set[str]],
    ) -> list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]]:
        """Evaluate candidates: behavioral tests concurrently, benchmarks sequentially.

        Args:
            candidates: ``(node, candidate_index, cached)`` triples; the third element is not used here.
            code_context: Forwarded to the code-replacement step.
            original_code_baseline: Baseline whose behavioral results candidates are compared against.
            original_helper_code: Original helper file contents, keyed by path.
            file_path_to_helper_classes: Forwarded to the behavioral phase (currently unused there).

        Returns:
            One ``(node, result)`` pair per input candidate, in input order. ``result`` is a
            ``Success`` with benchmark data, a ``Failure`` with an ``EvalFailure``, or ``None``
            if the candidate was never assigned an outcome.
        """
        results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]] = [
            (node, None) for node, _, _ in candidates
        ]

        if not candidates:
            return results

        # Passing candidates hold their slot until Phase 2, and Phase 2 only starts once every
        # Phase 1 task of the batch has finished. Launching more tasks than there are slots
        # could therefore leave the surplus tasks waiting on acquire() forever (assuming
        # acquire() waits for a free slot), so never run more than pool_size candidates at once.
        batch_size = max(1, self._pool_size)

        async with WorktreePool(pool_size=self._pool_size) as pool:
            self._pool = pool
            try:
                for batch_start in range(0, len(candidates), batch_size):
                    batch = candidates[batch_start : batch_start + batch_size]

                    # Phase 1: concurrent behavioral tests
                    behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]] = []

                    async with anyio.create_task_group() as tg:
                        for offset, (node, idx, _cached) in enumerate(batch):
                            tg.start_soon(
                                self._behavioral_phase,
                                batch_start + offset,
                                node,
                                idx,
                                code_context,
                                original_code_baseline,
                                original_helper_code,
                                file_path_to_helper_classes,
                                results,
                                behavioral_passes,
                            )

                    # Tasks append in completion order; benchmark in input order so runs are reproducible.
                    behavioral_passes.sort(key=lambda entry: entry[0])

                    # Phase 2: sequential benchmarking (no CPU contention)
                    for result_index, candidate_node, bp in behavioral_passes:
                        try:
                            bench_result = await self._benchmark_phase(bp, original_code_baseline)
                            results[result_index] = (candidate_node, bench_result)
                        except Exception as exc:
                            logger.error(f"Benchmark for {candidate_node.candidate.optimization_id} raised: {exc}")
                            results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
                        finally:
                            # Free the slot so the next batch can use it.
                            await pool.release(bp.slot)
            finally:
                # Do not keep a reference to a pool whose context has been exited.
                self._pool = None

        return results

    async def _behavioral_phase(
        self,
        result_index: int,
        candidate_node: CandidateNode,
        candidate_index: int,
        code_context: CodeOptimizationContext,
        original_code_baseline: OriginalCodeBaseline,
        original_helper_code: dict[Path, str],
        file_path_to_helper_classes: dict[Path, set[str]],
        results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]],
        behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]],
    ) -> None:
        """Run behavioral tests for a candidate. On pass, hold the slot for benchmarking.

        On failure (a ``Failure`` outcome or a raised exception) the failure is written to
        ``results[result_index]`` and the slot is released immediately. On success the pass
        record is appended to ``behavioral_passes`` and the slot stays acquired; the caller
        is responsible for releasing it after benchmarking.
        """
        assert self._pool is not None
        slot = await self._pool.acquire()
        try:
            outcome = await self._run_behavioral(
                slot=slot,
                candidate=candidate_node.candidate,
                candidate_index=candidate_index,
                code_context=code_context,
                original_code_baseline=original_code_baseline,
                original_helper_code=original_helper_code,
            )
        except Exception as exc:
            logger.error(f"Candidate {candidate_node.candidate.optimization_id} raised: {exc}")
            results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
            await self._pool.release(slot)
            return

        if isinstance(outcome, Failure):
            results[result_index] = (candidate_node, outcome)
            await self._pool.release(slot)
            return

        # Behavioral pass — hold the slot for Phase 2
        behavioral_passes.append((result_index, candidate_node, outcome.unwrap()))

    async def _run_behavioral(
        self,
        slot: WorktreeSlot,
        candidate: OptimizedCandidate,
        candidate_index: int,
        code_context: CodeOptimizationContext,
        original_code_baseline: OriginalCodeBaseline,
        original_helper_code: dict[Path, str],
    ) -> Result[_BehavioralPass, EvalFailure]:
        """Run behavioral tests in a worktree. Returns pass info or failure.

        Steps: produce the candidate's file contents, write them and the instrumented test
        files into the slot, run pytest once over the behavior tests, parse the JUnit XML and
        compare pass/fail outcomes against the baseline.

        Returns:
            ``Success(_BehavioralPass)`` when the results match the baseline, otherwise a
            ``Failure(EvalFailure)`` describing the reason (replacement failure, timeout,
            no results, or a mismatch carrying the diffs).
        """
        opt = self._optimizer
        fto = opt.function_to_optimize

        # The replacement rewrites and then restores files in the main tree, so only one
        # candidate may do it at a time; it runs in a worker thread because it is blocking.
        async with self._code_replace_lock:
            candidate_files = await anyio.to_thread.run_sync(
                self._replace_and_capture, opt, code_context, candidate, original_helper_code
            )

        if candidate_files is None:
            return Failure(EvalFailure(message="Code replacement failed"))

        fto_code, helper_codes = candidate_files
        await slot.write_candidate(Path(fto.file_path), fto_code)
        for module_abspath, helper_code in helper_codes.items():
            await slot.write_candidate(module_abspath, helper_code)

        # Copy instrumented test files into the worktree
        behavior_test_files: list[str] = []
        perf_test_files: list[str] = []
        for file in opt.test_files.test_files:
            src = file.instrumented_behavior_file_path
            if src.exists():
                await slot.write_candidate(src, src.read_text(encoding="utf-8"))
                behavior_test_files.append(str(slot.mirror(src)))

            if file.benchmarking_file_path and file.benchmarking_file_path.exists():
                await slot.write_candidate(
                    file.benchmarking_file_path, file.benchmarking_file_path.read_text(encoding="utf-8")
                )
                perf_test_files.append(str(slot.mirror(file.benchmarking_file_path)))

        # Build test environment and command
        test_env = opt.get_test_env(
            codeflash_loop_index=0, codeflash_test_iteration=candidate_index, codeflash_tracer_disable=1
        )
        # Point imports at the worktree copy of the project rather than the main tree.
        worktree_project_root = slot.mirror(Path(opt.args.project_root))
        test_env["PYTHONPATH"] = str(worktree_project_root)

        from codeflash.code_utils.compat import IS_POSIX, SAFE_SYS_EXECUTABLE

        pytest_cmd_list = opt.language_support.build_pytest_cmd(SAFE_SYS_EXECUTABLE, IS_POSIX)  # type: ignore[attr-defined]

        blocklist_args = [f"-p no:{plugin}" for plugin in self._BLOCKLISTED_PLUGINS]

        # One result file per candidate so concurrent runs do not overwrite each other.
        result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}.xml"))
        result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"]

        pytest_test_env = test_env.copy()
        pytest_test_env["PYTEST_PLUGINS"] = "codeflash.verification.pytest_plugin"

        # Exactly one loop: this run only checks correctness, not timing.
        common_pytest_args = [
            "--capture=tee-sys",
            "-q",
            "--codeflash_loops_scope=session",
            "--codeflash_min_loops=1",
            "--codeflash_max_loops=1",
            f"--codeflash_seconds={TOTAL_LOOPING_TIME_EFFECTIVE}",
            f"--timeout={INDIVIDUAL_TESTCASE_TIMEOUT}",
        ]

        cmd = pytest_cmd_list + common_pytest_args + blocklist_args + result_args + behavior_test_files

        try:
            behavior_result = await async_execute_test_subprocess(
                cmd_list=cmd, cwd=slot.path, env=pytest_test_env, timeout=self._SUBPROCESS_TIMEOUT
            )
        except subprocess.TimeoutExpired:
            logger.warning(f"Behavioral test timeout for candidate {candidate_index}")
            return Failure(EvalFailure(message="Behavioral test timeout"))

        from codeflash.verification.parse_test_output import parse_test_xml

        behavior_test_results = parse_test_xml(
            result_file_path, test_files=opt.test_files, test_config=opt.test_cfg, run_result=behavior_result
        )

        if not behavior_test_results.test_results:
            return Failure(EvalFailure(message="No behavioral test results"))

        from codeflash.verification.equivalence import compare_test_results

        match, diffs = compare_test_results(
            original_code_baseline.behavior_test_results, behavior_test_results, pass_fail_only=True
        )

        if not match:
            return Failure(EvalFailure(message=f"Behavioral mismatch: {len(diffs)} diffs", diffs=diffs))

        return Success(
            _BehavioralPass(
                slot=slot,
                candidate_index=candidate_index,
                perf_test_files=perf_test_files,
                test_env=pytest_test_env,
                pytest_cmd_list=pytest_cmd_list,
            )
        )

    async def _benchmark_phase(
        self, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline
    ) -> Result[OptimizedCandidateResult, EvalFailure]:
        """Run performance benchmarks sequentially for a candidate that passed behavioral tests.

        Reuses the command, environment and worktree captured in ``bp``. ``original_code_baseline``
        is accepted for signature symmetry but not read here.

        Returns:
            ``Success(OptimizedCandidateResult)`` with the measured runtime, or a
            ``Failure(EvalFailure)`` on timeout, missing results, or a zero total runtime.
        """
        opt = self._optimizer

        blocklist_args = [f"-p no:{plugin}" for plugin in self._BLOCKLISTED_PLUGINS]

        perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}.xml"))
        perf_result_args = [f"--junitxml={perf_result_file.as_posix()}", "-o", "junit_logging=all"]

        # Unlike the behavioral run, loop between 5 and 250 times to collect timing data.
        perf_pytest_args = [
            "--capture=tee-sys",
            "-q",
            "--codeflash_loops_scope=session",
            "--codeflash_min_loops=5",
            "--codeflash_max_loops=250",
            f"--codeflash_seconds={TOTAL_LOOPING_TIME_EFFECTIVE}",
            f"--timeout={INDIVIDUAL_TESTCASE_TIMEOUT}",
        ]

        perf_cmd = bp.pytest_cmd_list + perf_pytest_args + blocklist_args + perf_result_args + bp.perf_test_files

        try:
            await async_execute_test_subprocess(
                cmd_list=perf_cmd, cwd=bp.slot.path, env=bp.test_env, timeout=self._SUBPROCESS_TIMEOUT
            )
        except subprocess.TimeoutExpired:
            logger.warning(f"Performance test timeout for candidate {bp.candidate_index}")
            return Failure(EvalFailure(message="Performance test timeout"))

        from codeflash.verification.parse_test_output import parse_test_xml

        perf_test_results = parse_test_xml(perf_result_file, test_files=opt.test_files, test_config=opt.test_cfg)

        if not perf_test_results.test_results:
            return Failure(EvalFailure(message="No performance test results"))

        loop_count = perf_test_results.effective_loop_count()
        total_timing = perf_test_results.total_passed_runtime()

        # A zero total means nothing usable was measured, so the candidate cannot be ranked.
        if total_timing == 0:
            return Failure(EvalFailure(message="Zero runtime for optimized candidate"))

        from codeflash.models.models import OptimizedCandidateResult

        return Success(
            OptimizedCandidateResult(
                max_loop_count=loop_count,
                best_test_runtime=total_timing,
                behavior_test_results=None,
                benchmarking_test_results=perf_test_results,
                replay_benchmarking_test_results=None,
                optimization_candidate_index=bp.candidate_index,
                total_candidate_timing=total_timing,
                async_throughput=None,
                concurrency_metrics=None,
            )
        )

    @staticmethod
    def _replace_and_capture(
        opt: FunctionOptimizer,
        code_context: CodeOptimizationContext,
        candidate: OptimizedCandidate,
        original_helper_code: dict[Path, str],
    ) -> tuple[str, dict[Path, str]] | None:
        """Apply code replacement to main tree, capture the result, restore original.

        Returns:
            ``(target_file_contents, {helper_path: contents})`` as they look with the candidate
            applied, or ``None`` if the replacement reported no update or raised
            ``ValueError``/``SyntaxError``/``AttributeError``. The original contents are
            written back in every case.
        """
        fto = opt.function_to_optimize
        try:
            did_update = opt.replace_function_and_helpers_with_optimized_code(
                code_context=code_context,
                optimized_code=candidate.source_code,
                original_helper_code=original_helper_code,
            )
            if not did_update:
                return None

            # Read back what the replacement wrote so it can be copied into a worktree.
            fto_code = Path(fto.file_path).read_text("utf-8")
            helper_codes = {Path(p): Path(p).read_text("utf-8") for p in original_helper_code}
            return fto_code, helper_codes
        except (ValueError, SyntaxError, AttributeError) as e:
            logger.error(f"Code replacement failed: {e}")
            return None
        finally:
            # Always put the main tree back, whether the replacement succeeded or not.
            opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path)
347+
348+
def run_parallel_evaluation(
    optimizer: FunctionOptimizer,
    candidates: list[tuple[CandidateNode, int, str | None]],
    code_context: CodeOptimizationContext,
    original_code_baseline: OriginalCodeBaseline,
    original_helper_code: dict[Path, str],
    file_path_to_helper_classes: dict[Path, set[str]],
    eval_ctx: CandidateEvaluationContext,
    exp_type: str,
    pool_size: int = 4,
) -> tuple[list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], list, list]:
    """Synchronous entry point that drives the async parallel evaluator through anyio.

    ``eval_ctx`` and ``exp_type`` are accepted but not read by this function.

    Returns:
        A 3-tuple ``(eval_results, [], [])``. The two trailing lists are always
        empty and exist only to maintain backward compatibility.
    """
    evaluator = ParallelCandidateEvaluator(optimizer, pool_size=pool_size)

    # anyio.run forwards the positional arguments to the coroutine function, in the
    # same order as evaluate_candidates declares its parameters.
    eval_results = anyio.run(
        evaluator.evaluate_candidates,
        candidates,
        code_context,
        original_code_baseline,
        original_helper_code,
        file_path_to_helper_classes,
    )
    return eval_results, [], []

0 commit comments

Comments
 (0)