|
| 1 | +# mypy: ignore-errors |
| 2 | +from __future__ import annotations |
| 3 | + |
| 4 | +import dataclasses |
| 5 | +import subprocess |
| 6 | +from pathlib import Path |
| 7 | +from typing import TYPE_CHECKING |
| 8 | + |
| 9 | +import anyio |
| 10 | + |
| 11 | +from codeflash.cli_cmds.console import logger |
| 12 | +from codeflash.code_utils.code_utils import get_run_tmp_file |
| 13 | +from codeflash.code_utils.config_consts import INDIVIDUAL_TESTCASE_TIMEOUT, TOTAL_LOOPING_TIME_EFFECTIVE |
| 14 | +from codeflash.code_utils.worktree_pool import WorktreePool, WorktreeSlot # noqa: TC001 |
| 15 | +from codeflash.either import Failure, Success |
| 16 | +from codeflash.languages.python.test_runner import async_execute_test_subprocess |
| 17 | + |
| 18 | +if TYPE_CHECKING: |
| 19 | + from codeflash.either import Result |
| 20 | + from codeflash.languages.function_optimizer import CandidateNode, FunctionOptimizer |
| 21 | + from codeflash.models.models import ( |
| 22 | + CandidateEvaluationContext, |
| 23 | + CodeOptimizationContext, |
| 24 | + OptimizedCandidate, |
| 25 | + OptimizedCandidateResult, |
| 26 | + OriginalCodeBaseline, |
| 27 | + TestDiff, |
| 28 | + ) |
| 29 | + |
| 30 | + |
| 31 | +@dataclasses.dataclass(slots=True) |
| 32 | +class EvalFailure: |
| 33 | + """Structured failure from parallel evaluation, carrying test diffs for repair.""" |
| 34 | + |
| 35 | + message: str |
| 36 | + diffs: list[TestDiff] = dataclasses.field(default_factory=list) |
| 37 | + |
| 38 | + |
| 39 | +@dataclasses.dataclass(slots=True) |
| 40 | +class _BehavioralPass: |
| 41 | + """Intermediate result: candidate passed behavioral tests, ready for benchmarking.""" |
| 42 | + |
| 43 | + slot: WorktreeSlot |
| 44 | + candidate_index: int |
| 45 | + perf_test_files: list[str] |
| 46 | + test_env: dict[str, str] |
| 47 | + pytest_cmd_list: list[str] |
| 48 | + |
| 49 | + |
class ParallelCandidateEvaluator:
    """Evaluates optimization candidates in parallel using git worktrees.

    Candidates are processed in batches of at most ``pool_size``; each batch runs in two phases:
        Phase 1 (concurrent): behavioral correctness tests
        Phase 2 (sequential): benchmarking — one candidate at a time for accurate timing

    A candidate that passes Phase 1 keeps its worktree slot until its benchmark has finished, so a
    batch never needs more than ``pool_size`` slots at once.
    """

    # pytest plugins switched off (via ``-p no:<name>``) for every test subprocess started here.
    _BLOCKLISTED_PLUGINS = ("benchmark", "codspeed", "xdist", "sugar")
    # Value passed as ``timeout`` to async_execute_test_subprocess for each pytest run.
    _SUBPROCESS_TIMEOUT = 600

    def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None:
        """Remember the optimizer and pool size; the worktree pool is created per evaluation run."""
        self._optimizer = optimizer
        self._pool_size = pool_size
        self._pool: WorktreePool | None = None
        # Serializes _replace_and_capture, which temporarily rewrites files in the main tree.
        self._code_replace_lock = anyio.Lock()

    async def evaluate_candidates(
        self,
        candidates: list[tuple[CandidateNode, int, str | None]],
        code_context: CodeOptimizationContext,
        original_code_baseline: OriginalCodeBaseline,
        original_helper_code: dict[Path, str],
        file_path_to_helper_classes: dict[Path, set[str]],
    ) -> list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]]:
        """Evaluate candidates: behavioral tests concurrently, benchmarks sequentially.

        Returns one ``(node, result)`` pair per input candidate, in input order. ``result`` is a
        ``Success``/``Failure`` once the candidate has been processed and ``None`` otherwise.
        """
        results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]] = [
            (node, None) for node, _, _ in candidates
        ]

        if not candidates:
            return results

        indexed_candidates = list(enumerate(candidates))

        async with WorktreePool(pool_size=self._pool_size) as pool:
            self._pool = pool
            try:
                # Work in batches no larger than the pool. Passing candidates hold their slot until
                # Phase 2, and Phase 2 only starts once every Phase 1 task has finished; launching
                # more tasks than slots could therefore leave tasks waiting in acquire() for slots
                # that are never freed (assumes WorktreePool.acquire waits when exhausted — confirm).
                for batch in self._batched(indexed_candidates, self._pool_size):
                    # Phase 1: concurrent behavioral tests
                    behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]] = []

                    async with anyio.create_task_group() as tg:
                        for result_index, (node, idx, _cached) in batch:
                            tg.start_soon(
                                self._behavioral_phase,
                                result_index,
                                node,
                                idx,
                                code_context,
                                original_code_baseline,
                                original_helper_code,
                                file_path_to_helper_classes,
                                results,
                                behavioral_passes,
                            )

                    # Tasks append in completion order; sort so benchmarks run in candidate order.
                    behavioral_passes.sort(key=lambda entry: entry[0])

                    # Phase 2: sequential benchmarking (no CPU contention)
                    for result_index, candidate_node, bp in behavioral_passes:
                        try:
                            bench_result = await self._benchmark_phase(bp, original_code_baseline)
                            results[result_index] = (candidate_node, bench_result)
                        except Exception as exc:
                            logger.exception(
                                f"Benchmark for {candidate_node.candidate.optimization_id} raised: {exc}"
                            )
                            results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
                        finally:
                            await pool.release(bp.slot)
            finally:
                # Do not keep a reference to a pool whose context has been exited.
                self._pool = None

        return results

    @staticmethod
    def _batched(items: list, size: int) -> list[list]:
        """Split ``items`` into consecutive chunks of at most ``size`` (a size below 1 is treated as 1)."""
        step = max(1, size)
        return [items[start : start + step] for start in range(0, len(items), step)]

    async def _behavioral_phase(
        self,
        result_index: int,
        candidate_node: CandidateNode,
        candidate_index: int,
        code_context: CodeOptimizationContext,
        original_code_baseline: OriginalCodeBaseline,
        original_helper_code: dict[Path, str],
        file_path_to_helper_classes: dict[Path, set[str]],
        results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]],
        behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]],
    ) -> None:
        """Run behavioral tests for a candidate. On pass, hold the slot for benchmarking.

        Failures (including unexpected exceptions) are written to ``results[result_index]`` and the
        slot is released; a pass is appended to ``behavioral_passes`` with the slot still held.
        """
        pool = self._pool
        if pool is None:
            raise RuntimeError("Worktree pool is not initialised; call evaluate_candidates first")

        slot = await pool.acquire()
        hold_slot = False
        try:
            outcome = await self._run_behavioral(
                slot=slot,
                candidate=candidate_node.candidate,
                candidate_index=candidate_index,
                code_context=code_context,
                original_code_baseline=original_code_baseline,
                original_helper_code=original_helper_code,
            )
            if isinstance(outcome, Failure):
                results[result_index] = (candidate_node, outcome)
            else:
                # Behavioral pass — hold the slot for Phase 2
                behavioral_passes.append((result_index, candidate_node, outcome.unwrap()))
                hold_slot = True
        except Exception as exc:
            logger.exception(f"Candidate {candidate_node.candidate.optimization_id} raised: {exc}")
            results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
        finally:
            # Single release point for every path that does not hand the slot to Phase 2.
            if not hold_slot:
                await pool.release(slot)

    async def _stage_test_files(self, slot: WorktreeSlot) -> tuple[list[str], list[str]]:
        """Copy instrumented test files into the worktree.

        Returns ``(behavior_test_files, perf_test_files)``: the mirrored paths, as strings, of the
        files that existed on disk and were copied.
        """
        behavior_test_files: list[str] = []
        perf_test_files: list[str] = []
        for file in self._optimizer.test_files.test_files:
            behavior_src = file.instrumented_behavior_file_path
            if behavior_src.exists():
                await slot.write_candidate(behavior_src, behavior_src.read_text(encoding="utf-8"))
                behavior_test_files.append(str(slot.mirror(behavior_src)))

            perf_src = file.benchmarking_file_path
            if perf_src and perf_src.exists():
                await slot.write_candidate(perf_src, perf_src.read_text(encoding="utf-8"))
                perf_test_files.append(str(slot.mirror(perf_src)))
        return behavior_test_files, perf_test_files

    @classmethod
    def _build_pytest_args(cls, *, min_loops: int, max_loops: int, result_file: Path) -> list[str]:
        """Build the pytest options shared by behavioral and benchmark runs (test file paths excluded)."""
        return [
            "--capture=tee-sys",
            "-q",
            "--codeflash_loops_scope=session",
            f"--codeflash_min_loops={min_loops}",
            f"--codeflash_max_loops={max_loops}",
            f"--codeflash_seconds={TOTAL_LOOPING_TIME_EFFECTIVE}",
            f"--timeout={INDIVIDUAL_TESTCASE_TIMEOUT}",
            *(f"-p no:{plugin}" for plugin in cls._BLOCKLISTED_PLUGINS),
            f"--junitxml={result_file.as_posix()}",
            "-o",
            "junit_logging=all",
        ]

    async def _run_behavioral(
        self,
        slot: WorktreeSlot,
        candidate: OptimizedCandidate,
        candidate_index: int,
        code_context: CodeOptimizationContext,
        original_code_baseline: OriginalCodeBaseline,
        original_helper_code: dict[Path, str],
    ) -> Result[_BehavioralPass, EvalFailure]:
        """Run behavioral tests in a worktree. Returns pass info or failure.

        Steps: produce the candidate's source files, write them and the instrumented tests into the
        worktree, run pytest there once, and compare pass/fail outcomes against the baseline.
        """
        opt = self._optimizer
        fto = opt.function_to_optimize

        # The replacement briefly modifies the main tree, so only one task may do it at a time.
        async with self._code_replace_lock:
            candidate_files = await anyio.to_thread.run_sync(
                self._replace_and_capture, opt, code_context, candidate, original_helper_code
            )

        if candidate_files is None:
            return Failure(EvalFailure(message="Code replacement failed"))

        fto_code, helper_codes = candidate_files
        await slot.write_candidate(Path(fto.file_path), fto_code)
        for module_abspath, helper_code in helper_codes.items():
            await slot.write_candidate(module_abspath, helper_code)

        behavior_test_files, perf_test_files = await self._stage_test_files(slot)
        if not behavior_test_files:
            # Without explicit paths pytest would collect from the worktree's cwd instead.
            return Failure(EvalFailure(message="No behavioral test files"))

        # Build test environment and command
        test_env = opt.get_test_env(
            codeflash_loop_index=0, codeflash_test_iteration=candidate_index, codeflash_tracer_disable=1
        )
        worktree_project_root = slot.mirror(Path(opt.args.project_root))
        test_env["PYTHONPATH"] = str(worktree_project_root)

        from codeflash.code_utils.compat import IS_POSIX, SAFE_SYS_EXECUTABLE

        pytest_cmd_list = opt.language_support.build_pytest_cmd(SAFE_SYS_EXECUTABLE, IS_POSIX)  # type: ignore[attr-defined]

        result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}.xml"))

        pytest_test_env = test_env.copy()
        pytest_test_env["PYTEST_PLUGINS"] = "codeflash.verification.pytest_plugin"

        cmd = (
            pytest_cmd_list
            + self._build_pytest_args(min_loops=1, max_loops=1, result_file=result_file_path)
            + behavior_test_files
        )

        try:
            behavior_result = await async_execute_test_subprocess(
                cmd_list=cmd, cwd=slot.path, env=pytest_test_env, timeout=self._SUBPROCESS_TIMEOUT
            )
        except subprocess.TimeoutExpired:
            logger.warning(f"Behavioral test timeout for candidate {candidate_index}")
            return Failure(EvalFailure(message="Behavioral test timeout"))

        from codeflash.verification.parse_test_output import parse_test_xml

        behavior_test_results = parse_test_xml(
            result_file_path, test_files=opt.test_files, test_config=opt.test_cfg, run_result=behavior_result
        )

        if not behavior_test_results.test_results:
            return Failure(EvalFailure(message="No behavioral test results"))

        from codeflash.verification.equivalence import compare_test_results

        match, diffs = compare_test_results(
            original_code_baseline.behavior_test_results, behavior_test_results, pass_fail_only=True
        )

        if not match:
            return Failure(EvalFailure(message=f"Behavioral mismatch: {len(diffs)} diffs", diffs=diffs))

        return Success(
            _BehavioralPass(
                slot=slot,
                candidate_index=candidate_index,
                perf_test_files=perf_test_files,
                test_env=pytest_test_env,
                pytest_cmd_list=pytest_cmd_list,
            )
        )

    async def _benchmark_phase(
        self, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline
    ) -> Result[OptimizedCandidateResult, EvalFailure]:
        """Run performance benchmarks sequentially for a candidate that passed behavioral tests.

        Reuses the worktree, environment and pytest command captured in ``bp``. Returns a failure
        when there are no benchmark files, the run times out, no results are parsed, or the total
        passed runtime is zero. ``original_code_baseline`` is accepted but not read here.
        """
        opt = self._optimizer

        if not bp.perf_test_files:
            # Without explicit paths pytest would collect from the worktree's cwd instead.
            return Failure(EvalFailure(message="No performance test files"))

        perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}.xml"))

        perf_cmd = (
            bp.pytest_cmd_list
            + self._build_pytest_args(min_loops=5, max_loops=250, result_file=perf_result_file)
            + bp.perf_test_files
        )

        try:
            await async_execute_test_subprocess(
                cmd_list=perf_cmd, cwd=bp.slot.path, env=bp.test_env, timeout=self._SUBPROCESS_TIMEOUT
            )
        except subprocess.TimeoutExpired:
            logger.warning(f"Performance test timeout for candidate {bp.candidate_index}")
            return Failure(EvalFailure(message="Performance test timeout"))

        from codeflash.verification.parse_test_output import parse_test_xml

        perf_test_results = parse_test_xml(perf_result_file, test_files=opt.test_files, test_config=opt.test_cfg)

        if not perf_test_results.test_results:
            return Failure(EvalFailure(message="No performance test results"))

        loop_count = perf_test_results.effective_loop_count()
        total_timing = perf_test_results.total_passed_runtime()

        if total_timing == 0:
            return Failure(EvalFailure(message="Zero runtime for optimized candidate"))

        from codeflash.models.models import OptimizedCandidateResult

        return Success(
            OptimizedCandidateResult(
                max_loop_count=loop_count,
                best_test_runtime=total_timing,
                behavior_test_results=None,
                benchmarking_test_results=perf_test_results,
                replay_benchmarking_test_results=None,
                optimization_candidate_index=bp.candidate_index,
                total_candidate_timing=total_timing,
                async_throughput=None,
                concurrency_metrics=None,
            )
        )

    @staticmethod
    def _replace_and_capture(
        opt: FunctionOptimizer,
        code_context: CodeOptimizationContext,
        candidate: OptimizedCandidate,
        original_helper_code: dict[Path, str],
    ) -> tuple[str, dict[Path, str]] | None:
        """Apply code replacement to main tree, capture the result, restore original.

        Returns ``(function_file_source, {helper_path: helper_source})`` read back after the
        replacement, or ``None`` when nothing was updated or the replacement raised
        ValueError/SyntaxError/AttributeError. The original sources are written back in every case.
        """
        fto = opt.function_to_optimize
        try:
            did_update = opt.replace_function_and_helpers_with_optimized_code(
                code_context=code_context,
                optimized_code=candidate.source_code,
                original_helper_code=original_helper_code,
            )
            if not did_update:
                return None

            fto_code = Path(fto.file_path).read_text("utf-8")
            helper_codes = {Path(p): Path(p).read_text("utf-8") for p in original_helper_code}
            return fto_code, helper_codes
        except (ValueError, SyntaxError, AttributeError) as e:
            logger.error(f"Code replacement failed: {e}")
            return None
        finally:
            # Always put the main tree back, whether the replacement succeeded, failed or raised.
            opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path)
| 346 | + |
| 347 | + |
def run_parallel_evaluation(
    optimizer: FunctionOptimizer,
    candidates: list[tuple[CandidateNode, int, str | None]],
    code_context: CodeOptimizationContext,
    original_code_baseline: OriginalCodeBaseline,
    original_helper_code: dict[Path, str],
    file_path_to_helper_classes: dict[Path, set[str]],
    eval_ctx: CandidateEvaluationContext,
    exp_type: str,
    pool_size: int = 4,
) -> tuple[list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], list, list]:
    """Synchronous entry point: drive the async parallel candidate evaluation via anyio.

    Returns (eval_results, [], []). The two empty lists are kept for backward compatibility.
    ``eval_ctx`` and ``exp_type`` are accepted but not read by this function.
    """
    # anyio.run forwards positional arguments to the coroutine function, in declaration order.
    eval_results = anyio.run(
        ParallelCandidateEvaluator(optimizer, pool_size=pool_size).evaluate_candidates,
        candidates,
        code_context,
        original_code_baseline,
        original_helper_code,
        file_path_to_helper_classes,
    )
    return eval_results, [], []
0 commit comments