Skip to content

Commit d0586fd

Browse files
committed
fix: race conditions and re-staging bug in parallel evaluator
- Add replace_lock to serialize main-tree access in _replace_and_capture - Fix Phase 2 benchmark not writing candidate code to fresh worktree slot - Add _closed flag and ClosedResourceError suppression in pool release - Broaden exception handling and protect finally restore block - Remove unused eval_ctx/exp_type params from run_parallel_evaluation - Add tests for re-staging, partial pool init, restore-on-failure, empty candidates
1 parent 2d43b88 commit d0586fd

4 files changed

Lines changed: 176 additions & 22 deletions

File tree

codeflash/code_utils/worktree_pool.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def __init__(self, pool_size: int = 4, base_dir: Path | None = None) -> None:
4646
self._send: anyio.abc.ObjectSendStream[WorktreeSlot] | None = None
4747
self._receive: anyio.abc.ObjectReceiveStream[WorktreeSlot] | None = None
4848
self._initialized = False
49+
self._closed = False
4950

5051
async def initialize(self) -> None:
5152
if self._initialized:
@@ -93,10 +94,15 @@ async def acquire(self) -> WorktreeSlot:
9394
return await self._receive.receive()
9495

9596
async def release(self, slot: WorktreeSlot) -> None:
97+
if self._closed:
98+
return
9699
assert self._send is not None
97-
await self._send.send(slot)
100+
with contextlib.suppress(anyio.ClosedResourceError):
101+
await self._send.send(slot)
98102

99103
async def cleanup(self) -> None:
104+
self._closed = True
105+
100106
if self._send is not None:
101107
await self._send.aclose()
102108
if self._receive is not None:

codeflash/languages/function_optimizer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1541,8 +1541,6 @@ def _evaluate_candidates_parallel(
15411541
original_code_baseline=original_code_baseline,
15421542
original_helper_code=original_helper_code,
15431543
file_path_to_helper_classes=file_path_to_helper_classes,
1544-
eval_ctx=eval_ctx,
1545-
exp_type=exp_type,
15461544
pool_size=pool_size,
15471545
)
15481546

codeflash/optimization/parallel_evaluator.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from codeflash.either import Result
1919
from codeflash.languages.function_optimizer import CandidateNode, FunctionOptimizer
2020
from codeflash.models.models import (
21-
CandidateEvaluationContext,
2221
CodeOptimizationContext,
2322
OptimizedCandidate,
2423
OptimizedCandidateResult,
@@ -45,6 +44,9 @@ class _BehavioralPass:
4544
test_env: dict[str, str]
4645
pytest_cmd_list: list[str]
4746
behavior_test_results: TestResults
47+
fto_code: str
48+
helper_codes: dict[Path, str]
49+
fto_file_path: Path
4850

4951

5052
class ParallelCandidateEvaluator:
@@ -59,6 +61,7 @@ def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None:
5961
self._optimizer = optimizer
6062
self._pool_size = pool_size
6163
self._pool: WorktreePool | None = None
64+
self._replace_lock = anyio.Lock()
6265

6366
async def evaluate_candidates(
6467
self,
@@ -92,7 +95,6 @@ async def evaluate_candidates(
9295
code_context,
9396
original_code_baseline,
9497
original_helper_code,
95-
file_path_to_helper_classes,
9698
results,
9799
behavioral_passes,
98100
)
@@ -119,7 +121,6 @@ async def _behavioral_phase(
119121
code_context: CodeOptimizationContext,
120122
original_code_baseline: OriginalCodeBaseline,
121123
original_helper_code: dict[Path, str],
122-
file_path_to_helper_classes: dict[Path, set[str]],
123124
results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]],
124125
behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]],
125126
) -> None:
@@ -135,16 +136,10 @@ async def _behavioral_phase(
135136
original_code_baseline=original_code_baseline,
136137
original_helper_code=original_helper_code,
137138
)
138-
except BaseException as exc:
139-
if not isinstance(exc, Exception):
140-
await self._pool.release(slot)
141-
raise
142-
logger.error(f"Candidate {candidate_node.candidate.optimization_id} raised: {exc}")
143-
results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
139+
except BaseException:
144140
await self._pool.release(slot)
145-
return
141+
raise
146142

147-
# Always release slot — Phase 2 re-acquires for benchmarking
148143
await self._pool.release(slot)
149144

150145
if isinstance(outcome, Failure):
@@ -166,9 +161,11 @@ async def _run_behavioral(
166161
opt = self._optimizer
167162
fto = opt.function_to_optimize
168163

169-
candidate_files = await anyio.to_thread.run_sync(
170-
self._replace_and_capture, opt, code_context, candidate, original_helper_code
171-
)
164+
# Serialize main-tree access: replace_and_capture writes/reads/restores shared files
165+
async with self._replace_lock:
166+
candidate_files = await anyio.to_thread.run_sync(
167+
self._replace_and_capture, opt, code_context, candidate, original_helper_code
168+
)
172169

173170
if candidate_files is None:
174171
return Failure(EvalFailure(message="Code replacement failed"))
@@ -259,6 +256,9 @@ async def _run_behavioral(
259256
test_env=pytest_test_env,
260257
pytest_cmd_list=pytest_cmd_list,
261258
behavior_test_results=behavior_test_results,
259+
fto_code=fto_code,
260+
helper_codes=helper_codes,
261+
fto_file_path=Path(fto.file_path),
262262
)
263263
)
264264

@@ -269,7 +269,10 @@ async def _benchmark_phase(
269269
opt = self._optimizer
270270

271271
# Re-stage the candidate code in the acquired slot
272-
fto = opt.function_to_optimize
272+
await slot.write_candidate(bp.fto_file_path, bp.fto_code)
273+
for module_path, code in bp.helper_codes.items():
274+
await slot.write_candidate(module_path, code)
275+
273276
for file in opt.test_files.test_files:
274277
if file.benchmarking_file_path and file.benchmarking_file_path.exists():
275278
await slot.write_candidate(
@@ -352,11 +355,14 @@ def _replace_and_capture(
352355
fto_code = Path(fto.file_path).read_text("utf-8")
353356
helper_codes = {Path(p): Path(p).read_text("utf-8") for p in original_helper_code}
354357
return fto_code, helper_codes
355-
except (ValueError, SyntaxError, AttributeError) as e:
358+
except Exception as e:
356359
logger.error(f"Code replacement failed: {e}")
357360
return None
358361
finally:
359-
opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path)
362+
try:
363+
opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path)
364+
except Exception as restore_err:
365+
logger.error(f"Failed to restore main tree after code replacement: {restore_err}")
360366

361367

362368
def run_parallel_evaluation(
@@ -366,8 +372,6 @@ def run_parallel_evaluation(
366372
original_code_baseline: OriginalCodeBaseline,
367373
original_helper_code: dict[Path, str],
368374
file_path_to_helper_classes: dict[Path, set[str]],
369-
eval_ctx: CandidateEvaluationContext,
370-
exp_type: str,
371375
pool_size: int = 4,
372376
) -> tuple[list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], list, list]:
373377
"""Entry point: run parallel candidate evaluation from sync code via anyio.

tests/test_parallel_evaluator.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import subprocess
66
import sys
77
from pathlib import Path
8+
from typing import Any
89
from unittest.mock import MagicMock, patch
910

1011
import anyio
@@ -41,6 +42,40 @@ async def _run() -> None:
4142

4243
anyio.run(_run)
4344

45+
def test_partial_pool_initialization(self, tmp_path: Path) -> None:
46+
"""Pool operates at reduced capacity if some slots fail to create."""
47+
from unittest.mock import patch
48+
49+
from codeflash.code_utils.worktree_pool import WorktreePool
50+
51+
pool_size = 3
52+
base_dir = tmp_path.resolve() / "worktrees"
53+
repo_root = Path(__file__).resolve().parents[1]
54+
55+
call_count = 0
56+
57+
original_create_slot = WorktreePool._create_slot
58+
59+
async def failing_create_slot(self: Any, index: int) -> Any:
60+
nonlocal call_count
61+
call_count += 1
62+
if index == 1:
63+
raise RuntimeError("Simulated git worktree failure")
64+
return await original_create_slot(self, index)
65+
66+
async def _run() -> None:
67+
with (
68+
patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root),
69+
patch.object(WorktreePool, "_create_slot", failing_create_slot),
70+
):
71+
async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool:
72+
assert len(pool._slots) == 2
73+
slot = await pool.acquire()
74+
assert slot.index != 1
75+
await pool.release(slot)
76+
77+
anyio.run(_run)
78+
4479
def test_acquire_release_round_trip(self, tmp_path: Path) -> None:
4580
from unittest.mock import patch
4681

@@ -275,6 +310,9 @@ async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) ->
275310
test_env={},
276311
pytest_cmd_list=[],
277312
behavior_test_results=mock_behavior_results,
313+
fto_code="def f(): pass",
314+
helper_codes={},
315+
fto_file_path=Path("/tmp/module.py"),
278316
)
279317
)
280318

@@ -324,6 +362,9 @@ async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) ->
324362
test_env={},
325363
pytest_cmd_list=[],
326364
behavior_test_results=mock_behavior_results,
365+
fto_code="def f(): pass",
366+
helper_codes={},
367+
fto_file_path=Path("/tmp/module.py"),
327368
)
328369
)
329370

@@ -355,6 +396,108 @@ async def _run() -> list: # type: ignore[type-arg]
355396
for _, result in results:
356397
assert is_successful(result)
357398

399+
def test_benchmark_phase_restages_candidate_code(self, tmp_path: Path) -> None:
400+
"""Phase 2 must write fto_code and helper_codes to the slot before running benchmarks."""
401+
from codeflash.optimization.parallel_evaluator import _BehavioralPass
402+
403+
opt = self._make_optimizer_mock(tmp_path)
404+
(tmp_path / "src").mkdir(parents=True)
405+
(tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8")
406+
407+
node = self._make_candidate_node()
408+
evaluator = ParallelCandidateEvaluator(opt, pool_size=1)
409+
410+
repo_root = Path(__file__).resolve().parents[1]
411+
fto_code = "def f(): return 42 # optimized"
412+
helper_path = tmp_path / "src" / "helpers.py"
413+
helper_codes = {helper_path: "HELPER_CODE = True"}
414+
415+
write_calls: list[tuple[Path, str]] = []
416+
417+
async def tracking_write_candidate(self_slot: object, file_path: Path, code: str) -> None:
418+
write_calls.append((file_path, code))
419+
420+
async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg]
421+
return Success(
422+
_BehavioralPass(
423+
candidate_index=0,
424+
perf_test_files=[],
425+
test_env={"PATH": "/usr/bin"},
426+
pytest_cmd_list=[sys.executable, "-m", "pytest"],
427+
behavior_test_results=MagicMock(),
428+
fto_code=fto_code,
429+
helper_codes=helper_codes,
430+
fto_file_path=Path(opt.function_to_optimize.file_path),
431+
)
432+
)
433+
434+
async def _run() -> list: # type: ignore[type-arg]
435+
with (
436+
patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root),
437+
patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral),
438+
patch(
439+
"codeflash.code_utils.worktree_pool.WorktreeSlot.write_candidate", tracking_write_candidate
440+
),
441+
patch(
442+
"codeflash.languages.python.test_runner.async_execute_test_subprocess",
443+
return_value=MagicMock(returncode=0, stdout="", stderr=""),
444+
),
445+
patch(
446+
"codeflash.verification.parse_test_output.parse_test_xml",
447+
return_value=MagicMock(test_results=[MagicMock()], effective_loop_count=lambda: 10, total_passed_runtime=lambda: 5000),
448+
),
449+
):
450+
return await evaluator.evaluate_candidates(
451+
candidates=[(node, 0, None)],
452+
code_context=MagicMock(),
453+
original_code_baseline=MagicMock(),
454+
original_helper_code={},
455+
file_path_to_helper_classes={},
456+
)
457+
458+
anyio.run(_run)
459+
460+
written_codes = {p: c for p, c in write_calls}
461+
assert Path(opt.function_to_optimize.file_path) in written_codes
462+
assert written_codes[Path(opt.function_to_optimize.file_path)] == fto_code
463+
assert helper_path in written_codes
464+
assert written_codes[helper_path] == "HELPER_CODE = True"
465+
466+
def test_empty_candidates_returns_empty(self, tmp_path: Path) -> None:
467+
opt = self._make_optimizer_mock(tmp_path)
468+
evaluator = ParallelCandidateEvaluator(opt, pool_size=1)
469+
repo_root = Path(__file__).resolve().parents[1]
470+
471+
async def _run() -> list: # type: ignore[type-arg]
472+
with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root):
473+
return await evaluator.evaluate_candidates(
474+
candidates=[],
475+
code_context=MagicMock(),
476+
original_code_baseline=MagicMock(),
477+
original_helper_code={},
478+
file_path_to_helper_classes={},
479+
)
480+
481+
results = anyio.run(_run)
482+
assert results == []
483+
484+
def test_replace_and_capture_restores_on_failure(self, tmp_path: Path) -> None:
485+
"""_replace_and_capture must restore original code even when replacement raises."""
486+
opt = self._make_optimizer_mock(tmp_path)
487+
(tmp_path / "src").mkdir(parents=True)
488+
original_code = "def f(): pass"
489+
(tmp_path / "src" / "module.py").write_text(original_code, encoding="utf-8")
490+
491+
opt.replace_function_and_helpers_with_optimized_code.side_effect = ValueError("bad code")
492+
493+
result = ParallelCandidateEvaluator._replace_and_capture(
494+
opt, MagicMock(), MagicMock(), {}
495+
)
496+
assert result is None
497+
opt.write_code_and_helpers.assert_called_once_with(
498+
opt.function_to_optimize_source_code, {}, opt.function_to_optimize.file_path
499+
)
500+
358501
def test_more_candidates_than_slots_no_deadlock(self, tmp_path: Path) -> None:
359502
"""Regression test: more passing candidates than pool slots must not deadlock."""
360503
from codeflash.optimization.parallel_evaluator import _BehavioralPass
@@ -379,6 +522,9 @@ async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) ->
379522
test_env={},
380523
pytest_cmd_list=[],
381524
behavior_test_results=mock_behavior_results,
525+
fto_code="def f(): pass",
526+
helper_codes={},
527+
fto_file_path=Path("/tmp/module.py"),
382528
)
383529
)
384530

0 commit comments

Comments
 (0)