Skip to content

Commit 2d43b88

Browse files
committed
fix: dispatch adaptive optimizations in parallel path + deadlock regression test
- Parallel path now checks whether a successful candidate was previously refined (via path_to_root ancestry). If so, it dispatches adaptive optimization instead of batch refinement, matching the sequential path's behavior.
- Adds a regression test: 6 candidates with pool_size=2 all pass, proving no deadlock occurs when the number of passing candidates exceeds the available pool slots.
1 parent 31d684b commit 2d43b88

2 files changed

Lines changed: 76 additions & 10 deletions

File tree

codeflash/languages/function_optimizer.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1602,18 +1602,34 @@ def _evaluate_candidates_parallel(
16021602
)
16031603
eval_ctx.valid_optimizations.append(best_optimization)
16041604

1605-
batch_refiner_candidates.append(
1606-
AIServiceBatchRefinerCandidate(
1607-
optimization_id=candidate.optimization_id,
1608-
optimized_source_code=candidate.source_code.markdown,
1609-
optimized_explanation=candidate.explanation,
1610-
optimized_code_runtime=candidate_result.best_test_runtime,
1611-
original_code_runtime=original_code_baseline.runtime,
1612-
speedup=f"{int(perf_gain * 100)}%",
1613-
optimized_line_profiler_results="",
1614-
)
1605+
current_tree_candidates = candidate_node.path_to_root()
1606+
is_candidate_refined_before = any(
1607+
c.source == OptimizedCandidateSource.REFINE for c in current_tree_candidates
16151608
)
16161609

1610+
if is_candidate_refined_before:
1611+
future_adaptive = self.call_adaptive_optimize(
1612+
trace_id=self.get_trace_id(exp_type),
1613+
original_source_code=code_context.read_writable_code.markdown,
1614+
prev_candidates=current_tree_candidates,
1615+
eval_ctx=eval_ctx,
1616+
ai_service_client=ai_service_client,
1617+
)
1618+
if future_adaptive:
1619+
self.future_adaptive_optimizations.append(future_adaptive)
1620+
else:
1621+
batch_refiner_candidates.append(
1622+
AIServiceBatchRefinerCandidate(
1623+
optimization_id=candidate.optimization_id,
1624+
optimized_source_code=candidate.source_code.markdown,
1625+
optimized_explanation=candidate.explanation,
1626+
optimized_code_runtime=candidate_result.best_test_runtime,
1627+
original_code_runtime=original_code_baseline.runtime,
1628+
speedup=f"{int(perf_gain * 100)}%",
1629+
optimized_line_profiler_results="",
1630+
)
1631+
)
1632+
16171633
# Dispatch refinement immediately so CandidateProcessor sees it
16181634
if batch_refiner_candidates:
16191635
self._dispatch_refinement(

tests/test_parallel_evaluator.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,3 +354,53 @@ async def _run() -> list: # type: ignore[type-arg]
354354
assert benchmark_call_count == 3
355355
for _, result in results:
356356
assert is_successful(result)
357+
358+
def test_more_candidates_than_slots_no_deadlock(self, tmp_path: Path) -> None:
359+
"""Regression test: more passing candidates than pool slots must not deadlock."""
360+
from codeflash.optimization.parallel_evaluator import _BehavioralPass
361+
362+
opt = self._make_optimizer_mock(tmp_path)
363+
(tmp_path / "src").mkdir(parents=True)
364+
(tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8")
365+
366+
nodes = [self._make_candidate_node(f"cand_{i}") for i in range(6)]
367+
evaluator = ParallelCandidateEvaluator(opt, pool_size=2)
368+
369+
repo_root = Path(__file__).resolve().parents[1]
370+
mock_result = MagicMock()
371+
mock_result.best_test_runtime = 2000
372+
mock_behavior_results = MagicMock()
373+
374+
async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg]
375+
return Success(
376+
_BehavioralPass(
377+
candidate_index=0,
378+
perf_test_files=[],
379+
test_env={},
380+
pytest_cmd_list=[],
381+
behavior_test_results=mock_behavior_results,
382+
)
383+
)
384+
385+
async def mock_benchmark(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg]
386+
return Success(mock_result)
387+
388+
async def _run() -> list: # type: ignore[type-arg]
389+
with (
390+
patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root),
391+
patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral),
392+
patch.object(ParallelCandidateEvaluator, "_benchmark_phase", mock_benchmark),
393+
):
394+
return await evaluator.evaluate_candidates(
395+
candidates=[(n, i, None) for i, n in enumerate(nodes)],
396+
code_context=MagicMock(),
397+
original_code_baseline=MagicMock(),
398+
original_helper_code={},
399+
file_path_to_helper_classes={},
400+
)
401+
402+
# If this deadlocks, the test will time out
403+
results = anyio.run(_run)
404+
assert len(results) == 6
405+
for _, result in results:
406+
assert is_successful(result)

0 commit comments

Comments
 (0)