From 4da55ce21f7ec6aa004340e8d622e8a9afc7fc39 Mon Sep 17 00:00:00 2001 From: Kevin Turcios Date: Wed, 6 May 2026 19:32:28 -0500 Subject: [PATCH 1/4] test: parallel evaluation unit and integration tests Covers the full stack: pool lifecycle/cleanup, file isolation between slots, subprocess stdout/stderr/timeout, and evaluator logic (failure with diffs, success routing, concurrent multi-candidate). --- tests/test_parallel_evaluator.py | 344 +++++++++++++++++++++++++++++++ 1 file changed, 344 insertions(+) create mode 100644 tests/test_parallel_evaluator.py diff --git a/tests/test_parallel_evaluator.py b/tests/test_parallel_evaluator.py new file mode 100644 index 000000000..b62753039 --- /dev/null +++ b/tests/test_parallel_evaluator.py @@ -0,0 +1,344 @@ +"""Integration tests for the parallel candidate evaluation infrastructure.""" + +from __future__ import annotations + +import subprocess +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import anyio +import pytest + +from codeflash.either import Failure, Success, is_successful +from codeflash.languages.function_optimizer import CandidateNode +from codeflash.optimization.parallel_evaluator import EvalFailure, ParallelCandidateEvaluator + + +class TestWorktreePoolLifecycle: + def test_creates_n_worktrees_and_cleans_up(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 3 + base_dir = tmp_path.resolve() / "worktrees" + + # The pool needs a git root. We use the codeflash repo itself. + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + pool = WorktreePool(pool_size=pool_size, base_dir=base_dir) + async with pool: + assert len(pool._slots) == pool_size + for slot in pool._slots: + assert slot.path.exists() + assert slot.path.is_dir() + assert (slot.path / ".codeflash_pool.pid").exists() + + # After cleanup, slots are cleared + assert len(pool._slots) == 0 + + anyio.run(_run) + + def test_acquire_release_round_trip(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 2 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + slot1 = await pool.acquire() + slot2 = await pool.acquire() + + # Both slots should be distinct + assert slot1.index != slot2.index + assert slot1.path != slot2.path + + # Release one and re-acquire it + await pool.release(slot1) + reacquired = await pool.acquire() + assert reacquired.index == slot1.index + + await pool.release(slot2) + await pool.release(reacquired) + + anyio.run(_run) + + +class TestWorktreeSlotFileIsolation: + def test_write_to_one_slot_does_not_affect_another(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 2 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + test_file = repo_root / "codeflash" / "__init__.py" + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + slot_a = await pool.acquire() + slot_b = await pool.acquire() + + sentinel = "# SLOT_A_SENTINEL_CONTENT\n" + await slot_a.write_candidate(test_file, sentinel) + + # slot_b's mirror of the same file should NOT contain the sentinel + mirrored_b = slot_b.mirror(test_file) + content_b = mirrored_b.read_text(encoding="utf-8") + assert sentinel not in content_b + + # slot_a's mirror should contain it + mirrored_a = slot_a.mirror(test_file) + content_a = mirrored_a.read_text(encoding="utf-8") + assert content_a == sentinel + + # Main tree should be unaffected + main_content = test_file.read_text(encoding="utf-8") + assert sentinel not in main_content + + await pool.release(slot_a) + await pool.release(slot_b) + + anyio.run(_run) + + +class TestAsyncExecuteTestSubprocess: + def test_runs_simple_command(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "print('hello world')"], cwd=cwd, env=None, timeout=30 + ) + + result = anyio.run(_run) + assert result.returncode == 0 + assert "hello world" in result.stdout + + def test_captures_stderr(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "import sys; sys.stderr.write('err_msg\\n')"], + cwd=cwd, + env=None, + timeout=30, + ) + + result = anyio.run(_run) + assert "err_msg" in result.stderr + + def test_timeout_raises(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "import time; time.sleep(60)"], cwd=cwd, env=None, timeout=1 + ) + + with pytest.raises(subprocess.TimeoutExpired): + anyio.run(_run) + + +class TestParallelCandidateEvaluator: + """Unit tests for the evaluator with mocked worktree operations.""" + + def _make_candidate_node(self, opt_id: str = "cand_1") -> CandidateNode: + from codeflash.models.models import CodeString, CodeStringsMarkdown, OptimizedCandidate + from codeflash.models.shared_types import OptimizedCandidateSource + + source_code = CodeStringsMarkdown(code_strings=[CodeString(code="def f(): pass", file_path=Path("test.py"))]) + candidate = OptimizedCandidate( + source_code=source_code, + explanation="test optimization", + optimization_id=opt_id, + source=OptimizedCandidateSource.OPTIMIZE, + ) + return CandidateNode(candidate) + + def _make_optimizer_mock(self, tmp_path: Path) -> MagicMock: + opt = MagicMock() + opt.function_to_optimize.file_path = str(tmp_path / "src" / "module.py") + opt.function_to_optimize_source_code = "def f(): pass" + opt.test_files.test_files = [] + opt.args.project_root = str(tmp_path) + opt.test_cfg = MagicMock() + opt.get_test_env.return_value = {"PATH": "/usr/bin"} + opt.language_support.build_pytest_cmd.return_value = [sys.executable, "-m", "pytest"] + opt.replace_function_and_helpers_with_optimized_code.return_value = True + opt.write_code_and_helpers = MagicMock() + return opt + + def test_code_replacement_failure_returns_eval_failure(self, tmp_path: Path) -> None: + opt = self._make_optimizer_mock(tmp_path) + opt.replace_function_and_helpers_with_optimized_code.return_value = False + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> list: # type: ignore[type-arg] + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert len(results) == 1 + _, result = results[0] + assert result is not None + assert not is_successful(result) + failure = result.failure() + assert isinstance(failure, EvalFailure) + assert "Code replacement failed" in failure.message + assert failure.diffs == [] + + def test_behavioral_mismatch_carries_diffs(self, tmp_path: Path) -> None: + from codeflash.models.models import TestDiff, TestDiffScope + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + mock_diffs = [TestDiff(scope=TestDiffScope.DID_PASS, original_pass=True, candidate_pass=False)] + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object( + ParallelCandidateEvaluator, + "_run_behavioral", + return_value=Failure(EvalFailure(message="Behavioral mismatch: 1 diffs", diffs=mock_diffs)), # type: ignore[arg-type] + ), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + _, result = results[0] + assert not is_successful(result) + failure = result.failure() + assert len(failure.diffs) == 1 + assert failure.diffs[0].scope == TestDiffScope.DID_PASS + + def test_successful_candidate_returns_result(self, tmp_path: Path) -> None: + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 5000 + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + slot = MagicMock() + return Success( + _BehavioralPass(slot=slot, candidate_index=0, perf_test_files=[], test_env={}, pytest_cmd_list=[]) + ) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", return_value=Success(mock_result)), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + _, result = results[0] + assert is_successful(result) + assert result.unwrap().best_test_runtime == 5000 + + def test_multiple_candidates_evaluated_concurrently(self, tmp_path: Path) -> None: + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + nodes = [self._make_candidate_node(f"cand_{i}") for i in range(3)] + evaluator = ParallelCandidateEvaluator(opt, pool_size=3) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 1000 + + behavioral_call_count = 0 + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + nonlocal behavioral_call_count + behavioral_call_count += 1 + slot = MagicMock() + return Success( + _BehavioralPass(slot=slot, candidate_index=0, perf_test_files=[], test_env={}, pytest_cmd_list=[]) + ) + + benchmark_call_count = 0 + + async def mock_benchmark(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + nonlocal benchmark_call_count + benchmark_call_count += 1 + return Success(mock_result) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", mock_benchmark), + ): + return await evaluator.evaluate_candidates( + candidates=[(n, i, None) for i, n in enumerate(nodes)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert len(results) == 3 + assert behavioral_call_count == 3 + assert benchmark_call_count == 3 + for _, result in results: + assert is_successful(result) From 31d684b943638d4519ca80ac06b2180c13c6c4b0 Mon Sep 17 00:00:00 2001 From: Kevin Turcios Date: Wed, 6 May 2026 19:46:41 -0500 Subject: [PATCH 2/4] fix: resolve deadlock, Pydantic crash, and cleanup issues in parallel evaluator Critical fixes from code review: - Deadlock: slots are now released after behavioral tests (Phase 1), re-acquired for benchmarking (Phase 2). Previously, holding slots across phases caused deadlock when passes >= pool_size. - Pydantic ValidationError: behavior_test_results is now stored in _BehavioralPass and passed through to OptimizedCandidateResult. - Slot leak on cancellation: catch BaseException in _behavioral_phase. WorktreePool improvements: - Graceful partial creation failure (one slot failing doesn't crash pool). - Cleanup resilience (one rmtree failure doesn't abort others). - Stream lifecycle: close send/receive in cleanup(). - Async-safe: use anyio.Path for exists() checks. - Python 3.12+: use onexc instead of deprecated onerror for rmtree. - Remove dead code: PID file, unused restore_file method. Other fixes: - _run_line_profiler_for_winner: catch all exceptions. - _dispatch_repair_if_possible: skip when diffs are empty. - aiservice.py: pass language to _get_valid_candidates in batch path. - Remove unused AIServiceBatchRefinerRequest dataclass. - Fix result file path collision: include slot.index in filename. - Remove _code_replace_lock (no longer needed since slots are released immediately and _replace_and_capture is serialized by GIL). --- codeflash/api/aiservice.py | 2 +- codeflash/code_utils/worktree_pool.py | 69 +++++++++++++------- codeflash/languages/function_optimizer.py | 5 +- codeflash/models/models.py | 6 -- codeflash/optimization/parallel_evaluator.py | 56 ++++++++++------ tests/test_parallel_evaluator.py | 22 +++++-- 6 files changed, 104 insertions(+), 56 deletions(-) diff --git a/codeflash/api/aiservice.py b/codeflash/api/aiservice.py index 68351209c..95846be75 100644 --- a/codeflash/api/aiservice.py +++ b/codeflash/api/aiservice.py @@ -440,7 +440,7 @@ def optimize_code_refinement_batch( if response.status_code == 200: refined_optimizations = response.json()["refinements"] - return self._get_valid_candidates(refined_optimizations, OptimizedCandidateSource.REFINE) + return self._get_valid_candidates(refined_optimizations, OptimizedCandidateSource.REFINE, language=language) self.log_error_response(response, "generating batch optimized candidates", "cli-optimize-error-response") console.rule() diff --git a/codeflash/code_utils/worktree_pool.py b/codeflash/code_utils/worktree_pool.py index b4c40d24e..0bb8452d1 100644 --- a/codeflash/code_utils/worktree_pool.py +++ b/codeflash/code_utils/worktree_pool.py @@ -2,9 +2,9 @@ import contextlib import functools -import os import shutil import stat +import sys from pathlib import Path from typing import TYPE_CHECKING, Any @@ -17,6 +17,8 @@ from codeflash.cli_cmds.console import logger from codeflash.code_utils.git_utils import git_root_dir, mirror_path +_USE_ONEXC = sys.version_info >= (3, 12) + class WorktreeSlot: __slots__ = ("_git_root", "index", "path") @@ -34,10 +36,6 @@ async def write_candidate(self, file_path: Path, code: str) -> None: await mirrored.parent.mkdir(parents=True, exist_ok=True) await mirrored.write_text(code, encoding="utf-8") - async def restore_file(self, file_path: Path, original_code: str) -> None: - mirrored = anyio.Path(self.mirror(file_path)) - await mirrored.write_text(original_code, encoding="utf-8") - class WorktreePool: def __init__(self, pool_size: int = 4, base_dir: Path | None = None) -> None: @@ -54,36 +52,40 @@ async def initialize(self) -> None: return await anyio.Path(self._base_dir).mkdir(parents=True, exist_ok=True) + results: list[WorktreeSlot | None] = [None] * self._pool_size async with anyio.create_task_group() as tg: - results: list[WorktreeSlot | None] = [None] * self._pool_size for i in range(self._pool_size): tg.start_soon(self._create_slot_task, i, results) self._slots = [s for s in results if s is not None] - self._send, self._receive = anyio.create_memory_object_stream[WorktreeSlot](self._pool_size) + if not self._slots: + msg = "Failed to create any worktree slots" + raise RuntimeError(msg) + + self._send, self._receive = anyio.create_memory_object_stream[WorktreeSlot](len(self._slots)) for slot in self._slots: await self._send.send(slot) self._initialized = True logger.debug(f"WorktreePool initialized with {len(self._slots)} slots at {self._base_dir}") async def _create_slot_task(self, index: int, results: list[WorktreeSlot | None]) -> None: - results[index] = await self._create_slot(index) + try: + results[index] = await self._create_slot(index) + except Exception as exc: + logger.warning(f"Failed to create worktree slot {index}: {exc}") async def _create_slot(self, index: int) -> WorktreeSlot: slot_dir = self._base_dir / f"slot-{index}" - if slot_dir.exists(): - await anyio.to_thread.run_sync(functools.partial(shutil.rmtree, slot_dir, onerror=_handle_remove_readonly)) + if await anyio.Path(slot_dir).exists(): + await anyio.to_thread.run_sync(functools.partial(_rmtree_safe, slot_dir)) result = await anyio.run_process( ["git", "-C", str(self._git_root), "worktree", "add", "--detach", str(slot_dir), "HEAD"], check=False ) if result.returncode != 0: - msg = f"Failed to create worktree slot {index}: {result.stderr.decode()}" + msg = f"git worktree add failed for slot {index}: {result.stderr.decode()}" raise RuntimeError(msg) - pid_file = anyio.Path(slot_dir / ".codeflash_pool.pid") - await pid_file.write_text(str(os.getpid()), encoding="utf-8") - return WorktreeSlot(slot_dir, index, self._git_root) async def acquire(self) -> WorktreeSlot: @@ -95,21 +97,29 @@ async def release(self, slot: WorktreeSlot) -> None: await self._send.send(slot) async def cleanup(self) -> None: - async with anyio.create_task_group() as tg: - for slot in self._slots: - tg.start_soon(self._remove_slot_async, slot) + if self._send is not None: + await self._send.aclose() + if self._receive is not None: + await self._receive.aclose() + + for slot in self._slots: + try: + await self._remove_slot_async(slot) + except Exception as exc: + logger.warning(f"Failed to remove worktree slot {slot.index}: {exc}") + self._slots.clear() self._initialized = False - if self._base_dir.exists(): + if await anyio.Path(self._base_dir).exists(): with contextlib.suppress(Exception): await anyio.run_process(["git", "-C", str(self._git_root), "worktree", "prune"], check=False) with contextlib.suppress(OSError): - self._base_dir.rmdir() + await anyio.Path(self._base_dir).rmdir() async def _remove_slot_async(self, slot: WorktreeSlot) -> None: - if slot.path.exists(): - await anyio.to_thread.run_sync(functools.partial(shutil.rmtree, slot.path, onerror=_handle_remove_readonly)) + if await anyio.Path(slot.path).exists(): + await anyio.to_thread.run_sync(functools.partial(_rmtree_safe, slot.path)) async def __aenter__(self) -> Self: await self.initialize() @@ -119,7 +129,22 @@ async def __aexit__(self, *exc: object) -> None: await self.cleanup() -def _handle_remove_readonly(func: Callable[..., Any], path: str, exc_info: tuple[Any, ...]) -> None: +def _rmtree_safe(path: Path) -> None: + if _USE_ONEXC: + shutil.rmtree(path, onexc=_handle_remove_readonly_onexc) + else: + shutil.rmtree(path, onerror=_handle_remove_readonly_onerror) + + +def _handle_remove_readonly_onexc(func: Callable[..., Any], path: str, exc: BaseException) -> None: + if isinstance(exc, PermissionError): + Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR) + func(path) + else: + raise exc + + +def _handle_remove_readonly_onerror(func: Callable[..., Any], path: str, exc_info: tuple[Any, ...]) -> None: if isinstance(exc_info[1], PermissionError): Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR) func(path) diff --git a/codeflash/languages/function_optimizer.py b/codeflash/languages/function_optimizer.py index 7055969f7..e933f23c6 100644 --- a/codeflash/languages/function_optimizer.py +++ b/codeflash/languages/function_optimizer.py @@ -1040,7 +1040,7 @@ def _run_line_profiler_for_winner( ) eval_ctx.record_line_profiler_result(best_optimization.candidate.optimization_id, lp_results["str_out"]) best_optimization.line_profiler_test_results = lp_results - except (ValueError, SyntaxError, AttributeError) as e: + except (ValueError, SyntaxError, AttributeError, Exception) as e: logger.warning(f"Line profiler failed for winning candidate: {e}") finally: self.write_code_and_helpers( @@ -1684,6 +1684,9 @@ def _dispatch_repair_if_possible( test_diffs: list[TestDiff] | None = None, ) -> concurrent.futures.Future | None: """Submit a code repair request if the candidate is eligible.""" + if not test_diffs: + return None + max_repairs = get_effort_value(EffortKeys.MAX_CODE_REPAIRS_PER_TRACE, self.effort) if self.repair_counter >= max_repairs: return None diff --git a/codeflash/models/models.py b/codeflash/models/models.py index 33905d361..4e2cea2a9 100644 --- a/codeflash/models/models.py +++ b/codeflash/models/models.py @@ -62,12 +62,6 @@ class AIServiceBatchRefinerCandidate: optimized_line_profiler_results: str -@dataclass(frozen=True) -class AIServiceBatchRefinerRequest: - shared_context: dict[str, Any] - candidates: list[dict[str, Any]] - - # this should be possible to auto serialize @dataclass(frozen=True) class AdaptiveOptimizedCandidate: diff --git a/codeflash/optimization/parallel_evaluator.py b/codeflash/optimization/parallel_evaluator.py index 643bf0a2b..1dd4480ad 100644 --- a/codeflash/optimization/parallel_evaluator.py +++ b/codeflash/optimization/parallel_evaluator.py @@ -13,7 +13,6 @@ from codeflash.code_utils.config_consts import INDIVIDUAL_TESTCASE_TIMEOUT, TOTAL_LOOPING_TIME_EFFECTIVE from codeflash.code_utils.worktree_pool import WorktreePool, WorktreeSlot # noqa: TC001 from codeflash.either import Failure, Success -from codeflash.languages.python.test_runner import async_execute_test_subprocess if TYPE_CHECKING: from codeflash.either import Result @@ -25,6 +24,7 @@ OptimizedCandidateResult, OriginalCodeBaseline, TestDiff, + TestResults, ) @@ -40,18 +40,18 @@ class EvalFailure: class _BehavioralPass: """Intermediate result: candidate passed behavioral tests, ready for benchmarking.""" - slot: WorktreeSlot candidate_index: int perf_test_files: list[str] test_env: dict[str, str] pytest_cmd_list: list[str] + behavior_test_results: TestResults class ParallelCandidateEvaluator: """Evaluates optimization candidates in parallel using git worktrees. Two-phase evaluation: - Phase 1 (concurrent): behavioral correctness tests + Phase 1 (concurrent): behavioral correctness tests — slots released after each test Phase 2 (sequential): benchmarking — one candidate at a time for accurate timing """ @@ -59,7 +59,6 @@ def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None: self._optimizer = optimizer self._pool_size = pool_size self._pool: WorktreePool | None = None - self._code_replace_lock = anyio.Lock() async def evaluate_candidates( self, @@ -80,7 +79,7 @@ async def evaluate_candidates( async with WorktreePool(pool_size=self._pool_size) as pool: self._pool = pool - # Phase 1: concurrent behavioral tests + # Phase 1: concurrent behavioral tests (slots released after each test) behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]] = [] async with anyio.create_task_group() as tg: @@ -100,14 +99,15 @@ async def evaluate_candidates( # Phase 2: sequential benchmarking (no CPU contention) for result_index, candidate_node, bp in behavioral_passes: + slot = await pool.acquire() try: - bench_result = await self._benchmark_phase(bp, original_code_baseline) + bench_result = await self._benchmark_phase(slot, bp, original_code_baseline) results[result_index] = (candidate_node, bench_result) except Exception as exc: logger.error(f"Benchmark for {candidate_node.candidate.optimization_id} raised: {exc}") results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc)))) finally: - await pool.release(bp.slot) + await pool.release(slot) return results @@ -123,7 +123,7 @@ async def _behavioral_phase( results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]], ) -> None: - """Run behavioral tests for a candidate. On pass, hold the slot for benchmarking.""" + """Run behavioral tests for a candidate. Slot is always released after the test.""" assert self._pool is not None slot = await self._pool.acquire() try: @@ -135,18 +135,22 @@ async def _behavioral_phase( original_code_baseline=original_code_baseline, original_helper_code=original_helper_code, ) - except Exception as exc: + except BaseException as exc: + if not isinstance(exc, Exception): + await self._pool.release(slot) + raise logger.error(f"Candidate {candidate_node.candidate.optimization_id} raised: {exc}") results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc)))) await self._pool.release(slot) return + # Always release slot — Phase 2 re-acquires for benchmarking + await self._pool.release(slot) + if isinstance(outcome, Failure): results[result_index] = (candidate_node, outcome) - await self._pool.release(slot) return - # Behavioral pass — hold the slot for Phase 2 behavioral_passes.append((result_index, candidate_node, outcome.unwrap())) async def _run_behavioral( @@ -162,10 +166,9 @@ async def _run_behavioral( opt = self._optimizer fto = opt.function_to_optimize - async with self._code_replace_lock: - candidate_files = await anyio.to_thread.run_sync( - self._replace_and_capture, opt, code_context, candidate, original_helper_code - ) + candidate_files = await anyio.to_thread.run_sync( + self._replace_and_capture, opt, code_context, candidate, original_helper_code + ) if candidate_files is None: return Failure(EvalFailure(message="Code replacement failed")) @@ -198,13 +201,14 @@ async def _run_behavioral( test_env["PYTHONPATH"] = str(worktree_project_root) from codeflash.code_utils.compat import IS_POSIX, SAFE_SYS_EXECUTABLE + from codeflash.languages.python.test_runner import async_execute_test_subprocess pytest_cmd_list = opt.language_support.build_pytest_cmd(SAFE_SYS_EXECUTABLE, IS_POSIX) # type: ignore[attr-defined] blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"] blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins] - result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}.xml")) + result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}_{slot.index}.xml")) result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"] pytest_test_env = test_env.copy() @@ -250,24 +254,32 @@ async def _run_behavioral( return Success( _BehavioralPass( - slot=slot, candidate_index=candidate_index, perf_test_files=perf_test_files, test_env=pytest_test_env, pytest_cmd_list=pytest_cmd_list, + behavior_test_results=behavior_test_results, ) ) async def _benchmark_phase( - self, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline + self, slot: WorktreeSlot, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline ) -> Result[OptimizedCandidateResult, EvalFailure]: """Run performance benchmarks sequentially for a candidate that passed behavioral tests.""" opt = self._optimizer + # Re-stage the candidate code in the acquired slot + fto = opt.function_to_optimize + for file in opt.test_files.test_files: + if file.benchmarking_file_path and file.benchmarking_file_path.exists(): + await slot.write_candidate( + file.benchmarking_file_path, file.benchmarking_file_path.read_text(encoding="utf-8") + ) + blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"] blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins] - perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}.xml")) + perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}_{slot.index}.xml")) perf_result_args = [f"--junitxml={perf_result_file.as_posix()}", "-o", "junit_logging=all"] perf_pytest_args = [ @@ -282,8 +294,10 @@ async def _benchmark_phase( perf_cmd = bp.pytest_cmd_list + perf_pytest_args + blocklist_args + perf_result_args + bp.perf_test_files + from codeflash.languages.python.test_runner import async_execute_test_subprocess + try: - await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=bp.slot.path, env=bp.test_env, timeout=600) + await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=slot.path, env=bp.test_env, timeout=600) except subprocess.TimeoutExpired: logger.warning(f"Performance test timeout for candidate {bp.candidate_index}") return Failure(EvalFailure(message="Performance test timeout")) @@ -307,7 +321,7 @@ async def _benchmark_phase( OptimizedCandidateResult( max_loop_count=loop_count, best_test_runtime=total_timing, - behavior_test_results=None, + behavior_test_results=bp.behavior_test_results, benchmarking_test_results=perf_test_results, replay_benchmarking_test_results=None, optimization_candidate_index=bp.candidate_index, diff --git a/tests/test_parallel_evaluator.py b/tests/test_parallel_evaluator.py index b62753039..0ad5474ae 100644 --- a/tests/test_parallel_evaluator.py +++ b/tests/test_parallel_evaluator.py @@ -35,7 +35,6 @@ async def _run() -> None: for slot in pool._slots: assert slot.path.exists() assert slot.path.is_dir() - assert (slot.path / ".codeflash_pool.pid").exists() # After cleanup, slots are cleared assert len(pool._slots) == 0 @@ -266,10 +265,17 @@ def test_successful_candidate_returns_result(self, tmp_path: Path) -> None: mock_result = MagicMock() mock_result.best_test_runtime = 5000 + mock_behavior_results = MagicMock() + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] - slot = MagicMock() return Success( - _BehavioralPass(slot=slot, candidate_index=0, perf_test_files=[], test_env={}, pytest_cmd_list=[]) + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + ) ) async def _run() -> list: # type: ignore[type-arg] @@ -306,13 +312,19 @@ def test_multiple_candidates_evaluated_concurrently(self, tmp_path: Path) -> Non mock_result.best_test_runtime = 1000 behavioral_call_count = 0 + mock_behavior_results = MagicMock() async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] nonlocal behavioral_call_count behavioral_call_count += 1 - slot = MagicMock() return Success( - _BehavioralPass(slot=slot, candidate_index=0, perf_test_files=[], test_env={}, pytest_cmd_list=[]) + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + ) ) benchmark_call_count = 0 From 2d43b88323217d5bfe591a7b9b5fd39bfe38e122 Mon Sep 17 00:00:00 2001 From: Kevin Turcios Date: Wed, 6 May 2026 19:50:58 -0500 Subject: [PATCH 3/4] fix: dispatch adaptive optimizations in parallel path + deadlock regression test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Parallel path now checks if a successful candidate was previously refined (via path_to_root ancestry). If so, dispatches adaptive optimization instead of batch refinement — matching sequential behavior. - Adds regression test: 6 candidates with pool_size=2 all pass, proving no deadlock occurs when passes exceed available slots. --- codeflash/languages/function_optimizer.py | 36 +++++++++++----- tests/test_parallel_evaluator.py | 50 +++++++++++++++++++++++ 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/codeflash/languages/function_optimizer.py b/codeflash/languages/function_optimizer.py index e933f23c6..0b8aefa57 100644 --- a/codeflash/languages/function_optimizer.py +++ b/codeflash/languages/function_optimizer.py @@ -1602,18 +1602,34 @@ def _evaluate_candidates_parallel( ) eval_ctx.valid_optimizations.append(best_optimization) - batch_refiner_candidates.append( - AIServiceBatchRefinerCandidate( - optimization_id=candidate.optimization_id, - optimized_source_code=candidate.source_code.markdown, - optimized_explanation=candidate.explanation, - optimized_code_runtime=candidate_result.best_test_runtime, - original_code_runtime=original_code_baseline.runtime, - speedup=f"{int(perf_gain * 100)}%", - optimized_line_profiler_results="", - ) + current_tree_candidates = candidate_node.path_to_root() + is_candidate_refined_before = any( + c.source == OptimizedCandidateSource.REFINE for c in current_tree_candidates ) + if is_candidate_refined_before: + future_adaptive = self.call_adaptive_optimize( + trace_id=self.get_trace_id(exp_type), + original_source_code=code_context.read_writable_code.markdown, + prev_candidates=current_tree_candidates, + eval_ctx=eval_ctx, + ai_service_client=ai_service_client, + ) + if future_adaptive: + self.future_adaptive_optimizations.append(future_adaptive) + else: + batch_refiner_candidates.append( + AIServiceBatchRefinerCandidate( + optimization_id=candidate.optimization_id, + optimized_source_code=candidate.source_code.markdown, + optimized_explanation=candidate.explanation, + optimized_code_runtime=candidate_result.best_test_runtime, + original_code_runtime=original_code_baseline.runtime, + speedup=f"{int(perf_gain * 100)}%", + optimized_line_profiler_results="", + ) + ) + # Dispatch refinement immediately so CandidateProcessor sees it if batch_refiner_candidates: self._dispatch_refinement( diff --git a/tests/test_parallel_evaluator.py b/tests/test_parallel_evaluator.py index 0ad5474ae..2880e30ba 100644 --- a/tests/test_parallel_evaluator.py +++ b/tests/test_parallel_evaluator.py @@ -354,3 +354,53 @@ async def _run() -> list: # type: ignore[type-arg] assert benchmark_call_count == 3 for _, result in results: assert is_successful(result) + + def test_more_candidates_than_slots_no_deadlock(self, tmp_path: Path) -> None: + """Regression test: more passing candidates than pool slots must not deadlock.""" + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + nodes = [self._make_candidate_node(f"cand_{i}") for i in range(6)] + evaluator = ParallelCandidateEvaluator(opt, pool_size=2) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 2000 + mock_behavior_results = MagicMock() + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + ) + ) + + async def mock_benchmark(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success(mock_result) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", mock_benchmark), + ): + return await evaluator.evaluate_candidates( + candidates=[(n, i, None) for i, n in enumerate(nodes)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + # If this deadlocks, the test will timeout + results = anyio.run(_run) + assert len(results) == 6 + for _, result in results: + assert is_successful(result) From d0586fd6755fc094367ac99bfa5191afff1422a8 Mon Sep 17 00:00:00 2001 From: Kevin Turcios Date: Wed, 6 May 2026 20:40:59 -0500 Subject: [PATCH 4/4] fix: race conditions and re-staging bug in parallel evaluator - Add replace_lock to serialize main-tree access in _replace_and_capture - Fix Phase 2 benchmark not writing candidate code to fresh worktree slot - Add _closed flag and ClosedResourceError suppression in pool release - Broaden exception handling and protect finally restore block - Remove unused eval_ctx/exp_type params from run_parallel_evaluation - Add tests for re-staging, partial pool init, restore-on-failure, empty candidates --- codeflash/code_utils/worktree_pool.py | 8 +- codeflash/languages/function_optimizer.py | 2 - codeflash/optimization/parallel_evaluator.py | 42 +++--- tests/test_parallel_evaluator.py | 146 +++++++++++++++++++ 4 files changed, 176 insertions(+), 22 deletions(-) diff --git a/codeflash/code_utils/worktree_pool.py b/codeflash/code_utils/worktree_pool.py index 0bb8452d1..f71eba5d9 100644 --- a/codeflash/code_utils/worktree_pool.py +++ b/codeflash/code_utils/worktree_pool.py @@ -46,6 +46,7 @@ def __init__(self, pool_size: int = 4, base_dir: Path | None = None) -> None: self._send: anyio.abc.ObjectSendStream[WorktreeSlot] | None = None self._receive: anyio.abc.ObjectReceiveStream[WorktreeSlot] | None = None self._initialized = False + self._closed = False async def initialize(self) -> None: if self._initialized: @@ -93,10 +94,15 @@ async def acquire(self) -> WorktreeSlot: return await self._receive.receive() async def release(self, slot: WorktreeSlot) -> None: + if self._closed: + return assert self._send is not None - await self._send.send(slot) + with contextlib.suppress(anyio.ClosedResourceError): + await self._send.send(slot) async def cleanup(self) -> None: + self._closed = True + if self._send is not None: await self._send.aclose() if self._receive is not None: diff --git a/codeflash/languages/function_optimizer.py b/codeflash/languages/function_optimizer.py index 0b8aefa57..384ad6712 100644 --- a/codeflash/languages/function_optimizer.py +++ b/codeflash/languages/function_optimizer.py @@ -1541,8 +1541,6 @@ def _evaluate_candidates_parallel( original_code_baseline=original_code_baseline, original_helper_code=original_helper_code, file_path_to_helper_classes=file_path_to_helper_classes, - eval_ctx=eval_ctx, - exp_type=exp_type, pool_size=pool_size, ) diff --git a/codeflash/optimization/parallel_evaluator.py b/codeflash/optimization/parallel_evaluator.py index 1dd4480ad..22e0640bc 100644 --- a/codeflash/optimization/parallel_evaluator.py +++ b/codeflash/optimization/parallel_evaluator.py @@ -18,7 +18,6 @@ from codeflash.either import Result from codeflash.languages.function_optimizer import CandidateNode, FunctionOptimizer from codeflash.models.models import ( - CandidateEvaluationContext, CodeOptimizationContext, OptimizedCandidate, OptimizedCandidateResult, @@ -45,6 +44,9 @@ class _BehavioralPass: test_env: dict[str, str] pytest_cmd_list: list[str] behavior_test_results: TestResults + fto_code: str + helper_codes: dict[Path, str] + fto_file_path: Path class ParallelCandidateEvaluator: @@ -59,6 +61,7 @@ def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None: self._optimizer = optimizer self._pool_size = pool_size self._pool: WorktreePool | None = None + self._replace_lock = anyio.Lock() async def evaluate_candidates( self, @@ -92,7 +95,6 @@ async def evaluate_candidates( code_context, original_code_baseline, original_helper_code, - file_path_to_helper_classes, results, behavioral_passes, ) @@ -119,7 +121,6 @@ async def _behavioral_phase( code_context: CodeOptimizationContext, original_code_baseline: OriginalCodeBaseline, original_helper_code: dict[Path, str], - file_path_to_helper_classes: dict[Path, set[str]], results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]], ) -> None: @@ -135,16 +136,10 @@ async def _behavioral_phase( original_code_baseline=original_code_baseline, original_helper_code=original_helper_code, ) - except BaseException as exc: - if not isinstance(exc, Exception): - await self._pool.release(slot) - raise - logger.error(f"Candidate {candidate_node.candidate.optimization_id} raised: {exc}") - results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc)))) + except BaseException: await self._pool.release(slot) - return + raise - # Always release slot — Phase 2 re-acquires for benchmarking await self._pool.release(slot) if isinstance(outcome, Failure): @@ -166,9 +161,11 @@ async def _run_behavioral( opt = self._optimizer fto = opt.function_to_optimize - candidate_files = await anyio.to_thread.run_sync( - self._replace_and_capture, opt, code_context, candidate, original_helper_code - ) + # Serialize main-tree access: replace_and_capture writes/reads/restores shared files + async with self._replace_lock: + candidate_files = await anyio.to_thread.run_sync( + self._replace_and_capture, opt, code_context, candidate, original_helper_code + ) if candidate_files is None: return Failure(EvalFailure(message="Code replacement failed")) @@ -259,6 +256,9 @@ async def _run_behavioral( test_env=pytest_test_env, pytest_cmd_list=pytest_cmd_list, behavior_test_results=behavior_test_results, + fto_code=fto_code, + helper_codes=helper_codes, + fto_file_path=Path(fto.file_path), ) ) @@ -269,7 +269,10 @@ async def _benchmark_phase( opt = self._optimizer # Re-stage the candidate code in the acquired slot - fto = opt.function_to_optimize + await slot.write_candidate(bp.fto_file_path, bp.fto_code) + for module_path, code in bp.helper_codes.items(): + await slot.write_candidate(module_path, code) + for file in opt.test_files.test_files: if file.benchmarking_file_path and file.benchmarking_file_path.exists(): await slot.write_candidate( @@ -352,11 +355,14 @@ def _replace_and_capture( fto_code = Path(fto.file_path).read_text("utf-8") helper_codes = {Path(p): Path(p).read_text("utf-8") for p in original_helper_code} return fto_code, helper_codes - except (ValueError, SyntaxError, AttributeError) as e: + except Exception as e: logger.error(f"Code replacement failed: {e}") return None finally: - opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path) + try: + opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path) + except Exception as restore_err: + logger.error(f"Failed to restore main tree after code replacement: {restore_err}") def run_parallel_evaluation( @@ -366,8 +372,6 @@ def run_parallel_evaluation( original_code_baseline: OriginalCodeBaseline, original_helper_code: dict[Path, str], file_path_to_helper_classes: dict[Path, set[str]], - eval_ctx: CandidateEvaluationContext, - exp_type: str, pool_size: int = 4, ) -> tuple[list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], list, list]: """Entry point: run parallel candidate evaluation from sync code via anyio. diff --git a/tests/test_parallel_evaluator.py b/tests/test_parallel_evaluator.py index 2880e30ba..bb02e7735 100644 --- a/tests/test_parallel_evaluator.py +++ b/tests/test_parallel_evaluator.py @@ -5,6 +5,7 @@ import subprocess import sys from pathlib import Path +from typing import Any from unittest.mock import MagicMock, patch import anyio @@ -41,6 +42,40 @@ async def _run() -> None: anyio.run(_run) + def test_partial_pool_initialization(self, tmp_path: Path) -> None: + """Pool operates at reduced capacity if some slots fail to create.""" + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 3 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + + call_count = 0 + + original_create_slot = WorktreePool._create_slot + + async def failing_create_slot(self: Any, index: int) -> Any: + nonlocal call_count + call_count += 1 + if index == 1: + raise RuntimeError("Simulated git worktree failure") + return await original_create_slot(self, index) + + async def _run() -> None: + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(WorktreePool, "_create_slot", failing_create_slot), + ): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + assert len(pool._slots) == 2 + slot = await pool.acquire() + assert slot.index != 1 + await pool.release(slot) + + anyio.run(_run) + def test_acquire_release_round_trip(self, tmp_path: Path) -> None: from unittest.mock import patch @@ -275,6 +310,9 @@ async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> test_env={}, pytest_cmd_list=[], behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), ) ) @@ -324,6 +362,9 @@ async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> test_env={}, pytest_cmd_list=[], behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), ) ) @@ -355,6 +396,108 @@ async def _run() -> list: # type: ignore[type-arg] for _, result in results: assert is_successful(result) + def test_benchmark_phase_restages_candidate_code(self, tmp_path: Path) -> None: + """Phase 2 must write fto_code and helper_codes to the slot before running benchmarks.""" + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + fto_code = "def f(): return 42 # optimized" + helper_path = tmp_path / "src" / "helpers.py" + helper_codes = {helper_path: "HELPER_CODE = True"} + + write_calls: list[tuple[Path, str]] = [] + + async def tracking_write_candidate(self_slot: object, file_path: Path, code: str) -> None: + write_calls.append((file_path, code)) + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={"PATH": "/usr/bin"}, + pytest_cmd_list=[sys.executable, "-m", "pytest"], + behavior_test_results=MagicMock(), + fto_code=fto_code, + helper_codes=helper_codes, + fto_file_path=Path(opt.function_to_optimize.file_path), + ) + ) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch( + "codeflash.code_utils.worktree_pool.WorktreeSlot.write_candidate", tracking_write_candidate + ), + patch( + "codeflash.languages.python.test_runner.async_execute_test_subprocess", + return_value=MagicMock(returncode=0, stdout="", stderr=""), + ), + patch( + "codeflash.verification.parse_test_output.parse_test_xml", + return_value=MagicMock(test_results=[MagicMock()], effective_loop_count=lambda: 10, total_passed_runtime=lambda: 5000), + ), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + anyio.run(_run) + + written_codes = {p: c for p, c in write_calls} + assert Path(opt.function_to_optimize.file_path) in written_codes + assert written_codes[Path(opt.function_to_optimize.file_path)] == fto_code + assert helper_path in written_codes + assert written_codes[helper_path] == "HELPER_CODE = True" + + def test_empty_candidates_returns_empty(self, tmp_path: Path) -> None: + opt = self._make_optimizer_mock(tmp_path) + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> list: # type: ignore[type-arg] + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + return await evaluator.evaluate_candidates( + candidates=[], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert results == [] + + def test_replace_and_capture_restores_on_failure(self, tmp_path: Path) -> None: + """_replace_and_capture must restore original code even when replacement raises.""" + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + original_code = "def f(): pass" + (tmp_path / "src" / "module.py").write_text(original_code, encoding="utf-8") + + opt.replace_function_and_helpers_with_optimized_code.side_effect = ValueError("bad code") + + result = ParallelCandidateEvaluator._replace_and_capture( + opt, MagicMock(), MagicMock(), {} + ) + assert result is None + opt.write_code_and_helpers.assert_called_once_with( + opt.function_to_optimize_source_code, {}, opt.function_to_optimize.file_path + ) + def test_more_candidates_than_slots_no_deadlock(self, tmp_path: Path) -> None: """Regression test: more passing candidates than pool slots must not deadlock.""" from codeflash.optimization.parallel_evaluator import _BehavioralPass @@ -379,6 +522,9 @@ async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> test_env={}, pytest_cmd_list=[], behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), ) )