diff --git a/codeflash/api/aiservice.py b/codeflash/api/aiservice.py index 68351209c..95846be75 100644 --- a/codeflash/api/aiservice.py +++ b/codeflash/api/aiservice.py @@ -440,7 +440,7 @@ def optimize_code_refinement_batch( if response.status_code == 200: refined_optimizations = response.json()["refinements"] - return self._get_valid_candidates(refined_optimizations, OptimizedCandidateSource.REFINE) + return self._get_valid_candidates(refined_optimizations, OptimizedCandidateSource.REFINE, language=language) self.log_error_response(response, "generating batch optimized candidates", "cli-optimize-error-response") console.rule() diff --git a/codeflash/code_utils/worktree_pool.py b/codeflash/code_utils/worktree_pool.py index b4c40d24e..f71eba5d9 100644 --- a/codeflash/code_utils/worktree_pool.py +++ b/codeflash/code_utils/worktree_pool.py @@ -2,9 +2,9 @@ import contextlib import functools -import os import shutil import stat +import sys from pathlib import Path from typing import TYPE_CHECKING, Any @@ -17,6 +17,8 @@ from codeflash.cli_cmds.console import logger from codeflash.code_utils.git_utils import git_root_dir, mirror_path +_USE_ONEXC = sys.version_info >= (3, 12) + class WorktreeSlot: __slots__ = ("_git_root", "index", "path") @@ -34,10 +36,6 @@ async def write_candidate(self, file_path: Path, code: str) -> None: await mirrored.parent.mkdir(parents=True, exist_ok=True) await mirrored.write_text(code, encoding="utf-8") - async def restore_file(self, file_path: Path, original_code: str) -> None: - mirrored = anyio.Path(self.mirror(file_path)) - await mirrored.write_text(original_code, encoding="utf-8") - class WorktreePool: def __init__(self, pool_size: int = 4, base_dir: Path | None = None) -> None: @@ -48,42 +46,47 @@ def __init__(self, pool_size: int = 4, base_dir: Path | None = None) -> None: self._send: anyio.abc.ObjectSendStream[WorktreeSlot] | None = None self._receive: anyio.abc.ObjectReceiveStream[WorktreeSlot] | None = None self._initialized = False + self._closed = False async def initialize(self) -> None: if self._initialized: return await anyio.Path(self._base_dir).mkdir(parents=True, exist_ok=True) + results: list[WorktreeSlot | None] = [None] * self._pool_size async with anyio.create_task_group() as tg: - results: list[WorktreeSlot | None] = [None] * self._pool_size for i in range(self._pool_size): tg.start_soon(self._create_slot_task, i, results) self._slots = [s for s in results if s is not None] - self._send, self._receive = anyio.create_memory_object_stream[WorktreeSlot](self._pool_size) + if not self._slots: + msg = "Failed to create any worktree slots" + raise RuntimeError(msg) + + self._send, self._receive = anyio.create_memory_object_stream[WorktreeSlot](len(self._slots)) for slot in self._slots: await self._send.send(slot) self._initialized = True logger.debug(f"WorktreePool initialized with {len(self._slots)} slots at {self._base_dir}") async def _create_slot_task(self, index: int, results: list[WorktreeSlot | None]) -> None: - results[index] = await self._create_slot(index) + try: + results[index] = await self._create_slot(index) + except Exception as exc: + logger.warning(f"Failed to create worktree slot {index}: {exc}") async def _create_slot(self, index: int) -> WorktreeSlot: slot_dir = self._base_dir / f"slot-{index}" - if slot_dir.exists(): - await anyio.to_thread.run_sync(functools.partial(shutil.rmtree, slot_dir, onerror=_handle_remove_readonly)) + if await anyio.Path(slot_dir).exists(): + await anyio.to_thread.run_sync(functools.partial(_rmtree_safe, slot_dir)) result = await anyio.run_process( ["git", "-C", str(self._git_root), "worktree", "add", "--detach", str(slot_dir), "HEAD"], check=False ) if result.returncode != 0: - msg = f"Failed to create worktree slot {index}: {result.stderr.decode()}" + msg = f"git worktree add failed for slot {index}: {result.stderr.decode()}" raise RuntimeError(msg) - pid_file = anyio.Path(slot_dir / ".codeflash_pool.pid") - await pid_file.write_text(str(os.getpid()), encoding="utf-8") - return WorktreeSlot(slot_dir, index, self._git_root) async def acquire(self) -> WorktreeSlot: @@ -91,25 +94,38 @@ async def acquire(self) -> WorktreeSlot: return await self._receive.receive() async def release(self, slot: WorktreeSlot) -> None: + if self._closed: + return assert self._send is not None - await self._send.send(slot) + with contextlib.suppress(anyio.ClosedResourceError): + await self._send.send(slot) async def cleanup(self) -> None: - async with anyio.create_task_group() as tg: - for slot in self._slots: - tg.start_soon(self._remove_slot_async, slot) + self._closed = True + + if self._send is not None: + await self._send.aclose() + if self._receive is not None: + await self._receive.aclose() + + for slot in self._slots: + try: + await self._remove_slot_async(slot) + except Exception as exc: + logger.warning(f"Failed to remove worktree slot {slot.index}: {exc}") + self._slots.clear() self._initialized = False - if self._base_dir.exists(): + if await anyio.Path(self._base_dir).exists(): with contextlib.suppress(Exception): await anyio.run_process(["git", "-C", str(self._git_root), "worktree", "prune"], check=False) with contextlib.suppress(OSError): - self._base_dir.rmdir() + await anyio.Path(self._base_dir).rmdir() async def _remove_slot_async(self, slot: WorktreeSlot) -> None: - if slot.path.exists(): - await anyio.to_thread.run_sync(functools.partial(shutil.rmtree, slot.path, onerror=_handle_remove_readonly)) + if await anyio.Path(slot.path).exists(): + await anyio.to_thread.run_sync(functools.partial(_rmtree_safe, slot.path)) async def __aenter__(self) -> Self: await self.initialize() @@ -119,7 +135,22 @@ async def __aexit__(self, *exc: object) -> None: await self.cleanup() -def _handle_remove_readonly(func: Callable[..., Any], path: str, exc_info: tuple[Any, ...]) -> None: +def _rmtree_safe(path: Path) -> None: + if _USE_ONEXC: + shutil.rmtree(path, onexc=_handle_remove_readonly_onexc) + else: + shutil.rmtree(path, onerror=_handle_remove_readonly_onerror) + + +def _handle_remove_readonly_onexc(func: Callable[..., Any], path: str, exc: BaseException) -> None: + if isinstance(exc, PermissionError): + Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR) + func(path) + else: + raise exc + + +def _handle_remove_readonly_onerror(func: Callable[..., Any], path: str, exc_info: tuple[Any, ...]) -> None: if isinstance(exc_info[1], PermissionError): Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR) func(path) diff --git a/codeflash/languages/function_optimizer.py b/codeflash/languages/function_optimizer.py index 7055969f7..384ad6712 100644 --- a/codeflash/languages/function_optimizer.py +++ b/codeflash/languages/function_optimizer.py @@ -1040,7 +1040,7 @@ def _run_line_profiler_for_winner( ) eval_ctx.record_line_profiler_result(best_optimization.candidate.optimization_id, lp_results["str_out"]) best_optimization.line_profiler_test_results = lp_results - except (ValueError, SyntaxError, AttributeError) as e: + except (ValueError, SyntaxError, AttributeError, Exception) as e: logger.warning(f"Line profiler failed for winning candidate: {e}") finally: self.write_code_and_helpers( @@ -1541,8 +1541,6 @@ def _evaluate_candidates_parallel( original_code_baseline=original_code_baseline, original_helper_code=original_helper_code, file_path_to_helper_classes=file_path_to_helper_classes, - eval_ctx=eval_ctx, - exp_type=exp_type, pool_size=pool_size, ) @@ -1602,18 +1600,34 @@ def _evaluate_candidates_parallel( ) eval_ctx.valid_optimizations.append(best_optimization) - batch_refiner_candidates.append( - AIServiceBatchRefinerCandidate( - optimization_id=candidate.optimization_id, - optimized_source_code=candidate.source_code.markdown, - optimized_explanation=candidate.explanation, - optimized_code_runtime=candidate_result.best_test_runtime, - original_code_runtime=original_code_baseline.runtime, - speedup=f"{int(perf_gain * 100)}%", - optimized_line_profiler_results="", - ) + current_tree_candidates = candidate_node.path_to_root() + is_candidate_refined_before = any( + c.source == OptimizedCandidateSource.REFINE for c in current_tree_candidates ) + if is_candidate_refined_before: + future_adaptive = self.call_adaptive_optimize( + trace_id=self.get_trace_id(exp_type), + original_source_code=code_context.read_writable_code.markdown, + prev_candidates=current_tree_candidates, + eval_ctx=eval_ctx, + ai_service_client=ai_service_client, + ) + if future_adaptive: + self.future_adaptive_optimizations.append(future_adaptive) + else: + batch_refiner_candidates.append( + AIServiceBatchRefinerCandidate( + optimization_id=candidate.optimization_id, + optimized_source_code=candidate.source_code.markdown, + optimized_explanation=candidate.explanation, + optimized_code_runtime=candidate_result.best_test_runtime, + original_code_runtime=original_code_baseline.runtime, + speedup=f"{int(perf_gain * 100)}%", + optimized_line_profiler_results="", + ) + ) + # Dispatch refinement immediately so CandidateProcessor sees it if batch_refiner_candidates: self._dispatch_refinement( @@ -1684,6 +1698,9 @@ def _dispatch_repair_if_possible( test_diffs: list[TestDiff] | None = None, ) -> concurrent.futures.Future | None: """Submit a code repair request if the candidate is eligible.""" + if not test_diffs: + return None + max_repairs = get_effort_value(EffortKeys.MAX_CODE_REPAIRS_PER_TRACE, self.effort) if self.repair_counter >= max_repairs: return None diff --git a/codeflash/models/models.py b/codeflash/models/models.py index 33905d361..4e2cea2a9 100644 --- a/codeflash/models/models.py +++ b/codeflash/models/models.py @@ -62,12 +62,6 @@ class AIServiceBatchRefinerCandidate: optimized_line_profiler_results: str -@dataclass(frozen=True) -class AIServiceBatchRefinerRequest: - shared_context: dict[str, Any] - candidates: list[dict[str, Any]] - - # this should be possible to auto serialize @dataclass(frozen=True) class AdaptiveOptimizedCandidate: diff --git a/codeflash/optimization/parallel_evaluator.py b/codeflash/optimization/parallel_evaluator.py index 643bf0a2b..22e0640bc 100644 --- a/codeflash/optimization/parallel_evaluator.py +++ b/codeflash/optimization/parallel_evaluator.py @@ -13,18 +13,17 @@ from codeflash.code_utils.config_consts import INDIVIDUAL_TESTCASE_TIMEOUT, TOTAL_LOOPING_TIME_EFFECTIVE from codeflash.code_utils.worktree_pool import WorktreePool, WorktreeSlot # noqa: TC001 from codeflash.either import Failure, Success -from codeflash.languages.python.test_runner import async_execute_test_subprocess if TYPE_CHECKING: from codeflash.either import Result from codeflash.languages.function_optimizer import CandidateNode, FunctionOptimizer from codeflash.models.models import ( - CandidateEvaluationContext, CodeOptimizationContext, OptimizedCandidate, OptimizedCandidateResult, OriginalCodeBaseline, TestDiff, + TestResults, ) @@ -40,18 +39,21 @@ class EvalFailure: class _BehavioralPass: """Intermediate result: candidate passed behavioral tests, ready for benchmarking.""" - slot: WorktreeSlot candidate_index: int perf_test_files: list[str] test_env: dict[str, str] pytest_cmd_list: list[str] + behavior_test_results: TestResults + fto_code: str + helper_codes: dict[Path, str] + fto_file_path: Path class ParallelCandidateEvaluator: """Evaluates optimization candidates in parallel using git worktrees. Two-phase evaluation: - Phase 1 (concurrent): behavioral correctness tests + Phase 1 (concurrent): behavioral correctness tests — slots released after each test Phase 2 (sequential): benchmarking — one candidate at a time for accurate timing """ @@ -59,7 +61,7 @@ def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None: self._optimizer = optimizer self._pool_size = pool_size self._pool: WorktreePool | None = None - self._code_replace_lock = anyio.Lock() + self._replace_lock = anyio.Lock() async def evaluate_candidates( self, @@ -80,7 +82,7 @@ async def evaluate_candidates( async with WorktreePool(pool_size=self._pool_size) as pool: self._pool = pool - # Phase 1: concurrent behavioral tests + # Phase 1: concurrent behavioral tests (slots released after each test) behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]] = [] async with anyio.create_task_group() as tg: @@ -93,21 +95,21 @@ async def evaluate_candidates( code_context, original_code_baseline, original_helper_code, - file_path_to_helper_classes, results, behavioral_passes, ) # Phase 2: sequential benchmarking (no CPU contention) for result_index, candidate_node, bp in behavioral_passes: + slot = await pool.acquire() try: - bench_result = await self._benchmark_phase(bp, original_code_baseline) + bench_result = await self._benchmark_phase(slot, bp, original_code_baseline) results[result_index] = (candidate_node, bench_result) except Exception as exc: logger.error(f"Benchmark for {candidate_node.candidate.optimization_id} raised: {exc}") results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc)))) finally: - await pool.release(bp.slot) + await pool.release(slot) return results @@ -119,11 +121,10 @@ async def _behavioral_phase( code_context: CodeOptimizationContext, original_code_baseline: OriginalCodeBaseline, original_helper_code: dict[Path, str], - file_path_to_helper_classes: dict[Path, set[str]], results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]], ) -> None: - """Run behavioral tests for a candidate. On pass, hold the slot for benchmarking.""" + """Run behavioral tests for a candidate. Slot is always released after the test.""" assert self._pool is not None slot = await self._pool.acquire() try: @@ -135,18 +136,16 @@ async def _behavioral_phase( original_code_baseline=original_code_baseline, original_helper_code=original_helper_code, ) - except Exception as exc: - logger.error(f"Candidate {candidate_node.candidate.optimization_id} raised: {exc}") - results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc)))) + except BaseException: await self._pool.release(slot) - return + raise + + await self._pool.release(slot) if isinstance(outcome, Failure): results[result_index] = (candidate_node, outcome) - await self._pool.release(slot) return - # Behavioral pass — hold the slot for Phase 2 behavioral_passes.append((result_index, candidate_node, outcome.unwrap())) async def _run_behavioral( @@ -162,7 +161,8 @@ async def _run_behavioral( opt = self._optimizer fto = opt.function_to_optimize - async with self._code_replace_lock: + # Serialize main-tree access: replace_and_capture writes/reads/restores shared files + async with self._replace_lock: candidate_files = await anyio.to_thread.run_sync( self._replace_and_capture, opt, code_context, candidate, original_helper_code ) @@ -198,13 +198,14 @@ async def _run_behavioral( test_env["PYTHONPATH"] = str(worktree_project_root) from codeflash.code_utils.compat import IS_POSIX, SAFE_SYS_EXECUTABLE + from codeflash.languages.python.test_runner import async_execute_test_subprocess pytest_cmd_list = opt.language_support.build_pytest_cmd(SAFE_SYS_EXECUTABLE, IS_POSIX) # type: ignore[attr-defined] blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"] blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins] - result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}.xml")) + result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}_{slot.index}.xml")) result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"] pytest_test_env = test_env.copy() @@ -250,24 +251,38 @@ async def _run_behavioral( return Success( _BehavioralPass( - slot=slot, candidate_index=candidate_index, perf_test_files=perf_test_files, test_env=pytest_test_env, pytest_cmd_list=pytest_cmd_list, + behavior_test_results=behavior_test_results, + fto_code=fto_code, + helper_codes=helper_codes, + fto_file_path=Path(fto.file_path), ) ) async def _benchmark_phase( - self, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline + self, slot: WorktreeSlot, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline ) -> Result[OptimizedCandidateResult, EvalFailure]: """Run performance benchmarks sequentially for a candidate that passed behavioral tests.""" opt = self._optimizer + # Re-stage the candidate code in the acquired slot + await slot.write_candidate(bp.fto_file_path, bp.fto_code) + for module_path, code in bp.helper_codes.items(): + await slot.write_candidate(module_path, code) + + for file in opt.test_files.test_files: + if file.benchmarking_file_path and file.benchmarking_file_path.exists(): + await slot.write_candidate( + file.benchmarking_file_path, file.benchmarking_file_path.read_text(encoding="utf-8") + ) + blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"] blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins] - perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}.xml")) + perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}_{slot.index}.xml")) perf_result_args = [f"--junitxml={perf_result_file.as_posix()}", "-o", "junit_logging=all"] perf_pytest_args = [ @@ -282,8 +297,10 @@ async def _benchmark_phase( perf_cmd = bp.pytest_cmd_list + perf_pytest_args + blocklist_args + perf_result_args + bp.perf_test_files + from codeflash.languages.python.test_runner import async_execute_test_subprocess + try: - await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=bp.slot.path, env=bp.test_env, timeout=600) + await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=slot.path, env=bp.test_env, timeout=600) except subprocess.TimeoutExpired: logger.warning(f"Performance test timeout for candidate {bp.candidate_index}") return Failure(EvalFailure(message="Performance test timeout")) @@ -307,7 +324,7 @@ async def _benchmark_phase( OptimizedCandidateResult( max_loop_count=loop_count, best_test_runtime=total_timing, - behavior_test_results=None, + behavior_test_results=bp.behavior_test_results, benchmarking_test_results=perf_test_results, replay_benchmarking_test_results=None, optimization_candidate_index=bp.candidate_index, @@ -338,11 +355,14 @@ def _replace_and_capture( fto_code = Path(fto.file_path).read_text("utf-8") helper_codes = {Path(p): Path(p).read_text("utf-8") for p in original_helper_code} return fto_code, helper_codes - except (ValueError, SyntaxError, AttributeError) as e: + except Exception as e: logger.error(f"Code replacement failed: {e}") return None finally: - opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path) + try: + opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path) + except Exception as restore_err: + logger.error(f"Failed to restore main tree after code replacement: {restore_err}") def run_parallel_evaluation( @@ -352,8 +372,6 @@ def run_parallel_evaluation( original_code_baseline: OriginalCodeBaseline, original_helper_code: dict[Path, str], file_path_to_helper_classes: dict[Path, set[str]], - eval_ctx: CandidateEvaluationContext, - exp_type: str, pool_size: int = 4, ) -> tuple[list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], list, list]: """Entry point: run parallel candidate evaluation from sync code via anyio. diff --git a/tests/test_parallel_evaluator.py b/tests/test_parallel_evaluator.py new file mode 100644 index 000000000..bb02e7735 --- /dev/null +++ b/tests/test_parallel_evaluator.py @@ -0,0 +1,552 @@ +"""Integration tests for the parallel candidate evaluation infrastructure.""" + +from __future__ import annotations + +import subprocess +import sys +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + +import anyio +import pytest + +from codeflash.either import Failure, Success, is_successful +from codeflash.languages.function_optimizer import CandidateNode +from codeflash.optimization.parallel_evaluator import EvalFailure, ParallelCandidateEvaluator + + +class TestWorktreePoolLifecycle: + def test_creates_n_worktrees_and_cleans_up(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 3 + base_dir = tmp_path.resolve() / "worktrees" + + # The pool needs a git root. We use the codeflash repo itself. + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + pool = WorktreePool(pool_size=pool_size, base_dir=base_dir) + async with pool: + assert len(pool._slots) == pool_size + for slot in pool._slots: + assert slot.path.exists() + assert slot.path.is_dir() + + # After cleanup, slots are cleared + assert len(pool._slots) == 0 + + anyio.run(_run) + + def test_partial_pool_initialization(self, tmp_path: Path) -> None: + """Pool operates at reduced capacity if some slots fail to create.""" + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 3 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + + call_count = 0 + + original_create_slot = WorktreePool._create_slot + + async def failing_create_slot(self: Any, index: int) -> Any: + nonlocal call_count + call_count += 1 + if index == 1: + raise RuntimeError("Simulated git worktree failure") + return await original_create_slot(self, index) + + async def _run() -> None: + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(WorktreePool, "_create_slot", failing_create_slot), + ): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + assert len(pool._slots) == 2 + slot = await pool.acquire() + assert slot.index != 1 + await pool.release(slot) + + anyio.run(_run) + + def test_acquire_release_round_trip(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 2 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + slot1 = await pool.acquire() + slot2 = await pool.acquire() + + # Both slots should be distinct + assert slot1.index != slot2.index + assert slot1.path != slot2.path + + # Release one and re-acquire it + await pool.release(slot1) + reacquired = await pool.acquire() + assert reacquired.index == slot1.index + + await pool.release(slot2) + await pool.release(reacquired) + + anyio.run(_run) + + +class TestWorktreeSlotFileIsolation: + def test_write_to_one_slot_does_not_affect_another(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 2 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + test_file = repo_root / "codeflash" / "__init__.py" + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + slot_a = await pool.acquire() + slot_b = await pool.acquire() + + sentinel = "# SLOT_A_SENTINEL_CONTENT\n" + await slot_a.write_candidate(test_file, sentinel) + + # slot_b's mirror of the same file should NOT contain the sentinel + mirrored_b = slot_b.mirror(test_file) + content_b = mirrored_b.read_text(encoding="utf-8") + assert sentinel not in content_b + + # slot_a's mirror should contain it + mirrored_a = slot_a.mirror(test_file) + content_a = mirrored_a.read_text(encoding="utf-8") + assert content_a == sentinel + + # Main tree should be unaffected + main_content = test_file.read_text(encoding="utf-8") + assert sentinel not in main_content + + await pool.release(slot_a) + await pool.release(slot_b) + + anyio.run(_run) + + +class TestAsyncExecuteTestSubprocess: + def test_runs_simple_command(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "print('hello world')"], cwd=cwd, env=None, timeout=30 + ) + + result = anyio.run(_run) + assert result.returncode == 0 + assert "hello world" in result.stdout + + def test_captures_stderr(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "import sys; sys.stderr.write('err_msg\\n')"], + cwd=cwd, + env=None, + timeout=30, + ) + + result = anyio.run(_run) + assert "err_msg" in result.stderr + + def test_timeout_raises(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "import time; time.sleep(60)"], cwd=cwd, env=None, timeout=1 + ) + + with pytest.raises(subprocess.TimeoutExpired): + anyio.run(_run) + + +class TestParallelCandidateEvaluator: + """Unit tests for the evaluator with mocked worktree operations.""" + + def _make_candidate_node(self, opt_id: str = "cand_1") -> CandidateNode: + from codeflash.models.models import CodeString, CodeStringsMarkdown, OptimizedCandidate + from codeflash.models.shared_types import OptimizedCandidateSource + + source_code = CodeStringsMarkdown(code_strings=[CodeString(code="def f(): pass", file_path=Path("test.py"))]) + candidate = OptimizedCandidate( + source_code=source_code, + explanation="test optimization", + optimization_id=opt_id, + source=OptimizedCandidateSource.OPTIMIZE, + ) + return CandidateNode(candidate) + + def _make_optimizer_mock(self, tmp_path: Path) -> MagicMock: + opt = MagicMock() + opt.function_to_optimize.file_path = str(tmp_path / "src" / "module.py") + opt.function_to_optimize_source_code = "def f(): pass" + opt.test_files.test_files = [] + opt.args.project_root = str(tmp_path) + opt.test_cfg = MagicMock() + opt.get_test_env.return_value = {"PATH": "/usr/bin"} + opt.language_support.build_pytest_cmd.return_value = [sys.executable, "-m", "pytest"] + opt.replace_function_and_helpers_with_optimized_code.return_value = True + opt.write_code_and_helpers = MagicMock() + return opt + + def test_code_replacement_failure_returns_eval_failure(self, tmp_path: Path) -> None: + opt = self._make_optimizer_mock(tmp_path) + opt.replace_function_and_helpers_with_optimized_code.return_value = False + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> list: # type: ignore[type-arg] + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert len(results) == 1 + _, result = results[0] + assert result is not None + assert not is_successful(result) + failure = result.failure() + assert isinstance(failure, EvalFailure) + assert "Code replacement failed" in failure.message + assert failure.diffs == [] + + def test_behavioral_mismatch_carries_diffs(self, tmp_path: Path) -> None: + from codeflash.models.models import TestDiff, TestDiffScope + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + mock_diffs = [TestDiff(scope=TestDiffScope.DID_PASS, original_pass=True, candidate_pass=False)] + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object( + ParallelCandidateEvaluator, + "_run_behavioral", + return_value=Failure(EvalFailure(message="Behavioral mismatch: 1 diffs", diffs=mock_diffs)), # type: ignore[arg-type] + ), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + _, result = results[0] + assert not is_successful(result) + failure = result.failure() + assert len(failure.diffs) == 1 + assert failure.diffs[0].scope == TestDiffScope.DID_PASS + + def test_successful_candidate_returns_result(self, tmp_path: Path) -> None: + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 5000 + + mock_behavior_results = MagicMock() + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), + ) + ) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", return_value=Success(mock_result)), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + _, result = results[0] + assert is_successful(result) + assert result.unwrap().best_test_runtime == 5000 + + def test_multiple_candidates_evaluated_concurrently(self, tmp_path: Path) -> None: + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + nodes = [self._make_candidate_node(f"cand_{i}") for i in range(3)] + evaluator = ParallelCandidateEvaluator(opt, pool_size=3) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 1000 + + behavioral_call_count = 0 + mock_behavior_results = MagicMock() + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + nonlocal behavioral_call_count + behavioral_call_count += 1 + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), + ) + ) + + benchmark_call_count = 0 + + async def mock_benchmark(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + nonlocal benchmark_call_count + benchmark_call_count += 1 + return Success(mock_result) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", mock_benchmark), + ): + return await evaluator.evaluate_candidates( + candidates=[(n, i, None) for i, n in enumerate(nodes)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert len(results) == 3 + assert behavioral_call_count == 3 + assert benchmark_call_count == 3 + for _, result in results: + assert is_successful(result) + + def test_benchmark_phase_restages_candidate_code(self, tmp_path: Path) -> None: + """Phase 2 must write fto_code and helper_codes to the slot before running benchmarks.""" + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + fto_code = "def f(): return 42 # optimized" + helper_path = tmp_path / "src" / "helpers.py" + helper_codes = {helper_path: "HELPER_CODE = True"} + + write_calls: list[tuple[Path, str]] = [] + + async def tracking_write_candidate(self_slot: object, file_path: Path, code: str) -> None: + write_calls.append((file_path, code)) + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={"PATH": "/usr/bin"}, + pytest_cmd_list=[sys.executable, "-m", "pytest"], + behavior_test_results=MagicMock(), + fto_code=fto_code, + helper_codes=helper_codes, + fto_file_path=Path(opt.function_to_optimize.file_path), + ) + ) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch( + "codeflash.code_utils.worktree_pool.WorktreeSlot.write_candidate", tracking_write_candidate + ), + patch( + "codeflash.languages.python.test_runner.async_execute_test_subprocess", + return_value=MagicMock(returncode=0, stdout="", stderr=""), + ), + patch( + "codeflash.verification.parse_test_output.parse_test_xml", + return_value=MagicMock(test_results=[MagicMock()], effective_loop_count=lambda: 10, total_passed_runtime=lambda: 5000), + ), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + anyio.run(_run) + + written_codes = {p: c for p, c in write_calls} + assert Path(opt.function_to_optimize.file_path) in written_codes + assert written_codes[Path(opt.function_to_optimize.file_path)] == fto_code + assert helper_path in written_codes + assert written_codes[helper_path] == "HELPER_CODE = True" + + def test_empty_candidates_returns_empty(self, tmp_path: Path) -> None: + opt = self._make_optimizer_mock(tmp_path) + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> list: # type: ignore[type-arg] + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + return await evaluator.evaluate_candidates( + candidates=[], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert results == [] + + def test_replace_and_capture_restores_on_failure(self, tmp_path: Path) -> None: + """_replace_and_capture must restore original code even when replacement raises.""" + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + original_code = "def f(): pass" + (tmp_path / "src" / "module.py").write_text(original_code, encoding="utf-8") + + opt.replace_function_and_helpers_with_optimized_code.side_effect = ValueError("bad code") + + result = ParallelCandidateEvaluator._replace_and_capture( + opt, MagicMock(), MagicMock(), {} + ) + assert result is None + opt.write_code_and_helpers.assert_called_once_with( + opt.function_to_optimize_source_code, {}, opt.function_to_optimize.file_path + ) + + def test_more_candidates_than_slots_no_deadlock(self, tmp_path: Path) -> None: + """Regression test: more passing candidates than pool slots must not deadlock.""" + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + nodes = [self._make_candidate_node(f"cand_{i}") for i in range(6)] + evaluator = ParallelCandidateEvaluator(opt, pool_size=2) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 2000 + mock_behavior_results = MagicMock() + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), + ) + ) + + async def mock_benchmark(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success(mock_result) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", mock_benchmark), + ): + return await evaluator.evaluate_candidates( + candidates=[(n, i, None) for i, n in enumerate(nodes)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + # If this deadlocks, the test will timeout + results = anyio.run(_run) + assert len(results) == 6 + for _, result in results: + assert is_successful(result)