Skip to content

Commit 31d684b

Browse files
committed
fix: resolve deadlock, Pydantic crash, and cleanup issues in parallel evaluator
Critical fixes from code review:
- Deadlock: slots are now released after behavioral tests (Phase 1), re-acquired for benchmarking (Phase 2). Previously, holding slots across phases caused deadlock when passes >= pool_size.
- Pydantic ValidationError: behavior_test_results is now stored in _BehavioralPass and passed through to OptimizedCandidateResult.
- Slot leak on cancellation: catch BaseException in _behavioral_phase.

WorktreePool improvements:
- Graceful partial creation failure (one slot failing doesn't crash pool).
- Cleanup resilience (one rmtree failure doesn't abort others).
- Stream lifecycle: close send/receive in cleanup().
- Async-safe: use anyio.Path for exists() checks.
- Python 3.12+: use onexc instead of deprecated onerror for rmtree.
- Remove dead code: PID file, unused restore_file method.

Other fixes:
- _run_line_profiler_for_winner: catch all exceptions.
- _dispatch_repair_if_possible: skip when diffs are empty.
- aiservice.py: pass language to _get_valid_candidates in batch path.
- Remove unused AIServiceBatchRefinerRequest dataclass.
- Fix result file path collision: include slot.index in filename.
- Remove _code_replace_lock (no longer needed since slots are released immediately and _replace_and_capture is serialized by GIL).
1 parent 4da55ce commit 31d684b

6 files changed

Lines changed: 104 additions & 56 deletions

File tree

codeflash/api/aiservice.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ def optimize_code_refinement_batch(
440440

441441
if response.status_code == 200:
442442
refined_optimizations = response.json()["refinements"]
443-
return self._get_valid_candidates(refined_optimizations, OptimizedCandidateSource.REFINE)
443+
return self._get_valid_candidates(refined_optimizations, OptimizedCandidateSource.REFINE, language=language)
444444

445445
self.log_error_response(response, "generating batch optimized candidates", "cli-optimize-error-response")
446446
console.rule()

codeflash/code_utils/worktree_pool.py

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import contextlib
44
import functools
5-
import os
65
import shutil
76
import stat
7+
import sys
88
from pathlib import Path
99
from typing import TYPE_CHECKING, Any
1010

@@ -17,6 +17,8 @@
1717
from codeflash.cli_cmds.console import logger
1818
from codeflash.code_utils.git_utils import git_root_dir, mirror_path
1919

20+
_USE_ONEXC = sys.version_info >= (3, 12)
21+
2022

2123
class WorktreeSlot:
2224
__slots__ = ("_git_root", "index", "path")
@@ -34,10 +36,6 @@ async def write_candidate(self, file_path: Path, code: str) -> None:
3436
await mirrored.parent.mkdir(parents=True, exist_ok=True)
3537
await mirrored.write_text(code, encoding="utf-8")
3638

37-
async def restore_file(self, file_path: Path, original_code: str) -> None:
38-
mirrored = anyio.Path(self.mirror(file_path))
39-
await mirrored.write_text(original_code, encoding="utf-8")
40-
4139

4240
class WorktreePool:
4341
def __init__(self, pool_size: int = 4, base_dir: Path | None = None) -> None:
@@ -54,36 +52,40 @@ async def initialize(self) -> None:
5452
return
5553
await anyio.Path(self._base_dir).mkdir(parents=True, exist_ok=True)
5654

55+
results: list[WorktreeSlot | None] = [None] * self._pool_size
5756
async with anyio.create_task_group() as tg:
58-
results: list[WorktreeSlot | None] = [None] * self._pool_size
5957
for i in range(self._pool_size):
6058
tg.start_soon(self._create_slot_task, i, results)
6159

6260
self._slots = [s for s in results if s is not None]
63-
self._send, self._receive = anyio.create_memory_object_stream[WorktreeSlot](self._pool_size)
61+
if not self._slots:
62+
msg = "Failed to create any worktree slots"
63+
raise RuntimeError(msg)
64+
65+
self._send, self._receive = anyio.create_memory_object_stream[WorktreeSlot](len(self._slots))
6466
for slot in self._slots:
6567
await self._send.send(slot)
6668
self._initialized = True
6769
logger.debug(f"WorktreePool initialized with {len(self._slots)} slots at {self._base_dir}")
6870

6971
async def _create_slot_task(self, index: int, results: list[WorktreeSlot | None]) -> None:
70-
results[index] = await self._create_slot(index)
72+
try:
73+
results[index] = await self._create_slot(index)
74+
except Exception as exc:
75+
logger.warning(f"Failed to create worktree slot {index}: {exc}")
7176

7277
async def _create_slot(self, index: int) -> WorktreeSlot:
7378
slot_dir = self._base_dir / f"slot-{index}"
74-
if slot_dir.exists():
75-
await anyio.to_thread.run_sync(functools.partial(shutil.rmtree, slot_dir, onerror=_handle_remove_readonly))
79+
if await anyio.Path(slot_dir).exists():
80+
await anyio.to_thread.run_sync(functools.partial(_rmtree_safe, slot_dir))
7681

7782
result = await anyio.run_process(
7883
["git", "-C", str(self._git_root), "worktree", "add", "--detach", str(slot_dir), "HEAD"], check=False
7984
)
8085
if result.returncode != 0:
81-
msg = f"Failed to create worktree slot {index}: {result.stderr.decode()}"
86+
msg = f"git worktree add failed for slot {index}: {result.stderr.decode()}"
8287
raise RuntimeError(msg)
8388

84-
pid_file = anyio.Path(slot_dir / ".codeflash_pool.pid")
85-
await pid_file.write_text(str(os.getpid()), encoding="utf-8")
86-
8789
return WorktreeSlot(slot_dir, index, self._git_root)
8890

8991
async def acquire(self) -> WorktreeSlot:
@@ -95,21 +97,29 @@ async def release(self, slot: WorktreeSlot) -> None:
9597
await self._send.send(slot)
9698

9799
async def cleanup(self) -> None:
98-
async with anyio.create_task_group() as tg:
99-
for slot in self._slots:
100-
tg.start_soon(self._remove_slot_async, slot)
100+
if self._send is not None:
101+
await self._send.aclose()
102+
if self._receive is not None:
103+
await self._receive.aclose()
104+
105+
for slot in self._slots:
106+
try:
107+
await self._remove_slot_async(slot)
108+
except Exception as exc:
109+
logger.warning(f"Failed to remove worktree slot {slot.index}: {exc}")
110+
101111
self._slots.clear()
102112
self._initialized = False
103113

104-
if self._base_dir.exists():
114+
if await anyio.Path(self._base_dir).exists():
105115
with contextlib.suppress(Exception):
106116
await anyio.run_process(["git", "-C", str(self._git_root), "worktree", "prune"], check=False)
107117
with contextlib.suppress(OSError):
108-
self._base_dir.rmdir()
118+
await anyio.Path(self._base_dir).rmdir()
109119

110120
async def _remove_slot_async(self, slot: WorktreeSlot) -> None:
111-
if slot.path.exists():
112-
await anyio.to_thread.run_sync(functools.partial(shutil.rmtree, slot.path, onerror=_handle_remove_readonly))
121+
if await anyio.Path(slot.path).exists():
122+
await anyio.to_thread.run_sync(functools.partial(_rmtree_safe, slot.path))
113123

114124
async def __aenter__(self) -> Self:
115125
await self.initialize()
@@ -119,7 +129,22 @@ async def __aexit__(self, *exc: object) -> None:
119129
await self.cleanup()
120130

121131

122-
def _handle_remove_readonly(func: Callable[..., Any], path: str, exc_info: tuple[Any, ...]) -> None:
132+
def _rmtree_safe(path: Path) -> None:
133+
if _USE_ONEXC:
134+
shutil.rmtree(path, onexc=_handle_remove_readonly_onexc)
135+
else:
136+
shutil.rmtree(path, onerror=_handle_remove_readonly_onerror)
137+
138+
139+
def _handle_remove_readonly_onexc(func: Callable[..., Any], path: str, exc: BaseException) -> None:
140+
if isinstance(exc, PermissionError):
141+
Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR)
142+
func(path)
143+
else:
144+
raise exc
145+
146+
147+
def _handle_remove_readonly_onerror(func: Callable[..., Any], path: str, exc_info: tuple[Any, ...]) -> None:
123148
if isinstance(exc_info[1], PermissionError):
124149
Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR)
125150
func(path)

codeflash/languages/function_optimizer.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1040,7 +1040,7 @@ def _run_line_profiler_for_winner(
10401040
)
10411041
eval_ctx.record_line_profiler_result(best_optimization.candidate.optimization_id, lp_results["str_out"])
10421042
best_optimization.line_profiler_test_results = lp_results
1043-
except (ValueError, SyntaxError, AttributeError) as e:
1043+
except (ValueError, SyntaxError, AttributeError, Exception) as e:
10441044
logger.warning(f"Line profiler failed for winning candidate: {e}")
10451045
finally:
10461046
self.write_code_and_helpers(
@@ -1684,6 +1684,9 @@ def _dispatch_repair_if_possible(
16841684
test_diffs: list[TestDiff] | None = None,
16851685
) -> concurrent.futures.Future | None:
16861686
"""Submit a code repair request if the candidate is eligible."""
1687+
if not test_diffs:
1688+
return None
1689+
16871690
max_repairs = get_effort_value(EffortKeys.MAX_CODE_REPAIRS_PER_TRACE, self.effort)
16881691
if self.repair_counter >= max_repairs:
16891692
return None

codeflash/models/models.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,6 @@ class AIServiceBatchRefinerCandidate:
6262
optimized_line_profiler_results: str
6363

6464

65-
@dataclass(frozen=True)
66-
class AIServiceBatchRefinerRequest:
67-
shared_context: dict[str, Any]
68-
candidates: list[dict[str, Any]]
69-
70-
7165
# this should be possible to auto serialize
7266
@dataclass(frozen=True)
7367
class AdaptiveOptimizedCandidate:

codeflash/optimization/parallel_evaluator.py

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from codeflash.code_utils.config_consts import INDIVIDUAL_TESTCASE_TIMEOUT, TOTAL_LOOPING_TIME_EFFECTIVE
1414
from codeflash.code_utils.worktree_pool import WorktreePool, WorktreeSlot # noqa: TC001
1515
from codeflash.either import Failure, Success
16-
from codeflash.languages.python.test_runner import async_execute_test_subprocess
1716

1817
if TYPE_CHECKING:
1918
from codeflash.either import Result
@@ -25,6 +24,7 @@
2524
OptimizedCandidateResult,
2625
OriginalCodeBaseline,
2726
TestDiff,
27+
TestResults,
2828
)
2929

3030

@@ -40,26 +40,25 @@ class EvalFailure:
4040
class _BehavioralPass:
4141
"""Intermediate result: candidate passed behavioral tests, ready for benchmarking."""
4242

43-
slot: WorktreeSlot
4443
candidate_index: int
4544
perf_test_files: list[str]
4645
test_env: dict[str, str]
4746
pytest_cmd_list: list[str]
47+
behavior_test_results: TestResults
4848

4949

5050
class ParallelCandidateEvaluator:
5151
"""Evaluates optimization candidates in parallel using git worktrees.
5252
5353
Two-phase evaluation:
54-
Phase 1 (concurrent): behavioral correctness tests
54+
Phase 1 (concurrent): behavioral correctness tests — slots released after each test
5555
Phase 2 (sequential): benchmarking — one candidate at a time for accurate timing
5656
"""
5757

5858
def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None:
5959
self._optimizer = optimizer
6060
self._pool_size = pool_size
6161
self._pool: WorktreePool | None = None
62-
self._code_replace_lock = anyio.Lock()
6362

6463
async def evaluate_candidates(
6564
self,
@@ -80,7 +79,7 @@ async def evaluate_candidates(
8079
async with WorktreePool(pool_size=self._pool_size) as pool:
8180
self._pool = pool
8281

83-
# Phase 1: concurrent behavioral tests
82+
# Phase 1: concurrent behavioral tests (slots released after each test)
8483
behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]] = []
8584

8685
async with anyio.create_task_group() as tg:
@@ -100,14 +99,15 @@ async def evaluate_candidates(
10099

101100
# Phase 2: sequential benchmarking (no CPU contention)
102101
for result_index, candidate_node, bp in behavioral_passes:
102+
slot = await pool.acquire()
103103
try:
104-
bench_result = await self._benchmark_phase(bp, original_code_baseline)
104+
bench_result = await self._benchmark_phase(slot, bp, original_code_baseline)
105105
results[result_index] = (candidate_node, bench_result)
106106
except Exception as exc:
107107
logger.error(f"Benchmark for {candidate_node.candidate.optimization_id} raised: {exc}")
108108
results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
109109
finally:
110-
await pool.release(bp.slot)
110+
await pool.release(slot)
111111

112112
return results
113113

@@ -123,7 +123,7 @@ async def _behavioral_phase(
123123
results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]],
124124
behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]],
125125
) -> None:
126-
"""Run behavioral tests for a candidate. On pass, hold the slot for benchmarking."""
126+
"""Run behavioral tests for a candidate. Slot is always released after the test."""
127127
assert self._pool is not None
128128
slot = await self._pool.acquire()
129129
try:
@@ -135,18 +135,22 @@ async def _behavioral_phase(
135135
original_code_baseline=original_code_baseline,
136136
original_helper_code=original_helper_code,
137137
)
138-
except Exception as exc:
138+
except BaseException as exc:
139+
if not isinstance(exc, Exception):
140+
await self._pool.release(slot)
141+
raise
139142
logger.error(f"Candidate {candidate_node.candidate.optimization_id} raised: {exc}")
140143
results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
141144
await self._pool.release(slot)
142145
return
143146

147+
# Always release slot — Phase 2 re-acquires for benchmarking
148+
await self._pool.release(slot)
149+
144150
if isinstance(outcome, Failure):
145151
results[result_index] = (candidate_node, outcome)
146-
await self._pool.release(slot)
147152
return
148153

149-
# Behavioral pass — hold the slot for Phase 2
150154
behavioral_passes.append((result_index, candidate_node, outcome.unwrap()))
151155

152156
async def _run_behavioral(
@@ -162,10 +166,9 @@ async def _run_behavioral(
162166
opt = self._optimizer
163167
fto = opt.function_to_optimize
164168

165-
async with self._code_replace_lock:
166-
candidate_files = await anyio.to_thread.run_sync(
167-
self._replace_and_capture, opt, code_context, candidate, original_helper_code
168-
)
169+
candidate_files = await anyio.to_thread.run_sync(
170+
self._replace_and_capture, opt, code_context, candidate, original_helper_code
171+
)
169172

170173
if candidate_files is None:
171174
return Failure(EvalFailure(message="Code replacement failed"))
@@ -198,13 +201,14 @@ async def _run_behavioral(
198201
test_env["PYTHONPATH"] = str(worktree_project_root)
199202

200203
from codeflash.code_utils.compat import IS_POSIX, SAFE_SYS_EXECUTABLE
204+
from codeflash.languages.python.test_runner import async_execute_test_subprocess
201205

202206
pytest_cmd_list = opt.language_support.build_pytest_cmd(SAFE_SYS_EXECUTABLE, IS_POSIX) # type: ignore[attr-defined]
203207

204208
blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"]
205209
blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins]
206210

207-
result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}.xml"))
211+
result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}_{slot.index}.xml"))
208212
result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"]
209213

210214
pytest_test_env = test_env.copy()
@@ -250,24 +254,32 @@ async def _run_behavioral(
250254

251255
return Success(
252256
_BehavioralPass(
253-
slot=slot,
254257
candidate_index=candidate_index,
255258
perf_test_files=perf_test_files,
256259
test_env=pytest_test_env,
257260
pytest_cmd_list=pytest_cmd_list,
261+
behavior_test_results=behavior_test_results,
258262
)
259263
)
260264

261265
async def _benchmark_phase(
262-
self, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline
266+
self, slot: WorktreeSlot, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline
263267
) -> Result[OptimizedCandidateResult, EvalFailure]:
264268
"""Run performance benchmarks sequentially for a candidate that passed behavioral tests."""
265269
opt = self._optimizer
266270

271+
# Re-stage the candidate code in the acquired slot
272+
fto = opt.function_to_optimize
273+
for file in opt.test_files.test_files:
274+
if file.benchmarking_file_path and file.benchmarking_file_path.exists():
275+
await slot.write_candidate(
276+
file.benchmarking_file_path, file.benchmarking_file_path.read_text(encoding="utf-8")
277+
)
278+
267279
blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"]
268280
blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins]
269281

270-
perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}.xml"))
282+
perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}_{slot.index}.xml"))
271283
perf_result_args = [f"--junitxml={perf_result_file.as_posix()}", "-o", "junit_logging=all"]
272284

273285
perf_pytest_args = [
@@ -282,8 +294,10 @@ async def _benchmark_phase(
282294

283295
perf_cmd = bp.pytest_cmd_list + perf_pytest_args + blocklist_args + perf_result_args + bp.perf_test_files
284296

297+
from codeflash.languages.python.test_runner import async_execute_test_subprocess
298+
285299
try:
286-
await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=bp.slot.path, env=bp.test_env, timeout=600)
300+
await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=slot.path, env=bp.test_env, timeout=600)
287301
except subprocess.TimeoutExpired:
288302
logger.warning(f"Performance test timeout for candidate {bp.candidate_index}")
289303
return Failure(EvalFailure(message="Performance test timeout"))
@@ -307,7 +321,7 @@ async def _benchmark_phase(
307321
OptimizedCandidateResult(
308322
max_loop_count=loop_count,
309323
best_test_runtime=total_timing,
310-
behavior_test_results=None,
324+
behavior_test_results=bp.behavior_test_results,
311325
benchmarking_test_results=perf_test_results,
312326
replay_benchmarking_test_results=None,
313327
optimization_candidate_index=bp.candidate_index,

0 commit comments

Comments
 (0)