Skip to content

Commit 6e24f44

Browse files
committed
refactor: two-phase evaluation — concurrent correctness, sequential benchmarking
Split _run_in_worktree into two phases:
- Phase 1 (concurrent): behavioral tests run in parallel across all candidates via a task group. Failed candidates release their worktree slot immediately.
- Phase 2 (sequential): only candidates that passed behavioral tests are benchmarked, one at a time, eliminating CPU/memory contention that would skew timing measurements.

Candidates that pass Phase 1 hold their worktree slot until after benchmarking completes, avoiding re-setup cost.
1 parent 62ca5b6 commit 6e24f44

2 files changed

Lines changed: 163 additions & 81 deletions

File tree

codeflash/optimization/parallel_evaluator.py

Lines changed: 132 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,24 @@ class EvalFailure:
3636
diffs: list[TestDiff] = dataclasses.field(default_factory=list)
3737

3838

39+
@dataclasses.dataclass(slots=True)
class _BehavioralPass:
    """Hand-off record for a candidate that passed its behavioral tests.

    Produced by the behavioral step and consumed by the benchmarking step,
    so the benchmark run can reuse the already-prepared worktree, test
    files, environment and pytest command instead of rebuilding them.
    """

    # Worktree slot the candidate was written into; kept (not released)
    # until benchmarking of this candidate has finished.
    slot: WorktreeSlot
    # Index of the candidate; used to name the per-candidate result files.
    candidate_index: int
    # Benchmarking test file paths, as strings, passed to pytest.
    perf_test_files: list[str]
    # Environment variables for the pytest subprocess.
    test_env: dict[str, str]
    # Base pytest command line, before per-run arguments are appended.
    pytest_cmd_list: list[str]
48+
49+
3950
class ParallelCandidateEvaluator:
40-
"""Evaluates optimization candidates in parallel using git worktrees."""
51+
"""Evaluates optimization candidates in parallel using git worktrees.
52+
53+
Two-phase evaluation:
54+
Phase 1 (concurrent): behavioral correctness tests
55+
Phase 2 (sequential): benchmarking — one candidate at a time for accurate timing
56+
"""
4157

4258
def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None:
4359
self._optimizer = optimizer
@@ -53,31 +69,49 @@ async def evaluate_candidates(
5369
original_helper_code: dict[Path, str],
5470
file_path_to_helper_classes: dict[Path, set[str]],
5571
) -> list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]]:
56-
"""Evaluate candidates concurrently using worktrees."""
72+
"""Evaluate candidates: behavioral tests concurrently, benchmarks sequentially."""
5773
results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]] = [
5874
(node, None) for node, _, _ in candidates
5975
]
6076

61-
if candidates:
62-
async with WorktreePool(pool_size=self._pool_size) as pool:
63-
self._pool = pool
64-
async with anyio.create_task_group() as tg:
65-
for i, (node, idx, _cached) in enumerate(candidates):
66-
tg.start_soon(
67-
self._evaluate_and_store,
68-
i,
69-
node,
70-
idx,
71-
code_context,
72-
original_code_baseline,
73-
original_helper_code,
74-
file_path_to_helper_classes,
75-
results,
76-
)
77+
if not candidates:
78+
return results
79+
80+
async with WorktreePool(pool_size=self._pool_size) as pool:
81+
self._pool = pool
82+
83+
# Phase 1: concurrent behavioral tests
84+
behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]] = []
85+
86+
async with anyio.create_task_group() as tg:
87+
for i, (node, idx, _cached) in enumerate(candidates):
88+
tg.start_soon(
89+
self._behavioral_phase,
90+
i,
91+
node,
92+
idx,
93+
code_context,
94+
original_code_baseline,
95+
original_helper_code,
96+
file_path_to_helper_classes,
97+
results,
98+
behavioral_passes,
99+
)
100+
101+
# Phase 2: sequential benchmarking (no CPU contention)
102+
for result_index, candidate_node, bp in behavioral_passes:
103+
try:
104+
bench_result = await self._benchmark_phase(bp, original_code_baseline)
105+
results[result_index] = (candidate_node, bench_result)
106+
except Exception as exc:
107+
logger.error(f"Benchmark for {candidate_node.candidate.optimization_id} raised: {exc}")
108+
results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
109+
finally:
110+
await pool.release(bp.slot)
77111

78112
return results
79113

80-
async def _evaluate_and_store(
114+
async def _behavioral_phase(
81115
self,
82116
result_index: int,
83117
candidate_node: CandidateNode,
@@ -87,65 +121,44 @@ async def _evaluate_and_store(
87121
original_helper_code: dict[Path, str],
88122
file_path_to_helper_classes: dict[Path, set[str]],
89123
results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]],
124+
behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]],
90125
) -> None:
91-
"""Evaluate a single candidate and store the result."""
126+
"""Run behavioral tests for a candidate. On pass, hold the slot for benchmarking."""
92127
assert self._pool is not None
93128
slot = await self._pool.acquire()
94129
try:
95-
result = await self._run_in_worktree(
130+
outcome = await self._run_behavioral(
96131
slot=slot,
97132
candidate=candidate_node.candidate,
98133
candidate_index=candidate_index,
99134
code_context=code_context,
100135
original_code_baseline=original_code_baseline,
101136
original_helper_code=original_helper_code,
102-
file_path_to_helper_classes=file_path_to_helper_classes,
103137
)
104-
results[result_index] = (candidate_node, result)
105138
except Exception as exc:
106139
logger.error(f"Candidate {candidate_node.candidate.optimization_id} raised: {exc}")
107140
results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc))))
108-
finally:
109141
await self._pool.release(slot)
142+
return
110143

111-
@staticmethod
112-
def _replace_and_capture(
113-
opt: FunctionOptimizer,
114-
code_context: CodeOptimizationContext,
115-
candidate: OptimizedCandidate,
116-
original_helper_code: dict[Path, str],
117-
) -> tuple[str, dict[Path, str]] | None:
118-
"""Apply code replacement to main tree, capture the result, restore original."""
119-
fto = opt.function_to_optimize
120-
try:
121-
did_update = opt.replace_function_and_helpers_with_optimized_code(
122-
code_context=code_context,
123-
optimized_code=candidate.source_code,
124-
original_helper_code=original_helper_code,
125-
)
126-
if not did_update:
127-
return None
144+
if isinstance(outcome, Failure):
145+
results[result_index] = (candidate_node, outcome)
146+
await self._pool.release(slot)
147+
return
128148

129-
fto_code = Path(fto.file_path).read_text("utf-8")
130-
helper_codes = {Path(p): Path(p).read_text("utf-8") for p in original_helper_code}
131-
return fto_code, helper_codes
132-
except (ValueError, SyntaxError, AttributeError) as e:
133-
logger.error(f"Code replacement failed: {e}")
134-
return None
135-
finally:
136-
opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path)
149+
# Behavioral pass — hold the slot for Phase 2
150+
behavioral_passes.append((result_index, candidate_node, outcome.unwrap()))
137151

138-
async def _run_in_worktree(
152+
async def _run_behavioral(
139153
self,
140154
slot: WorktreeSlot,
141155
candidate: OptimizedCandidate,
142156
candidate_index: int,
143157
code_context: CodeOptimizationContext,
144158
original_code_baseline: OriginalCodeBaseline,
145159
original_helper_code: dict[Path, str],
146-
file_path_to_helper_classes: dict[Path, set[str]],
147-
) -> Result[OptimizedCandidateResult, EvalFailure]:
148-
"""Run behavioral and performance tests for a candidate in a worktree slot."""
160+
) -> Result[_BehavioralPass, EvalFailure]:
161+
"""Run behavioral tests in a worktree. Returns pass info or failure."""
149162
opt = self._optimizer
150163
fto = opt.function_to_optimize
151164

@@ -162,7 +175,7 @@ async def _run_in_worktree(
162175
for module_abspath, helper_code in helper_codes.items():
163176
await slot.write_candidate(module_abspath, helper_code)
164177

165-
# Copy instrumented test files into the worktree (they're runtime-generated, not in git)
178+
# Copy instrumented test files into the worktree
166179
behavior_test_files: list[str] = []
167180
perf_test_files: list[str] = []
168181
for file in opt.test_files.test_files:
@@ -177,8 +190,7 @@ async def _run_in_worktree(
177190
)
178191
perf_test_files.append(str(slot.mirror(file.benchmarking_file_path)))
179192

180-
# Run behavioral tests in the worktree
181-
worktree_cwd = slot.path
193+
# Build test environment and command
182194
test_env = opt.get_test_env(
183195
codeflash_loop_index=0, codeflash_test_iteration=candidate_index, codeflash_tracer_disable=1
184196
)
@@ -189,6 +201,15 @@ async def _run_in_worktree(
189201

190202
pytest_cmd_list = opt.language_support.build_pytest_cmd(SAFE_SYS_EXECUTABLE, IS_POSIX) # type: ignore[attr-defined]
191203

204+
blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"]
205+
blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins]
206+
207+
result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}.xml"))
208+
result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"]
209+
210+
pytest_test_env = test_env.copy()
211+
pytest_test_env["PYTEST_PLUGINS"] = "codeflash.verification.pytest_plugin"
212+
192213
common_pytest_args = [
193214
"--capture=tee-sys",
194215
"-q",
@@ -199,20 +220,11 @@ async def _run_in_worktree(
199220
f"--timeout={INDIVIDUAL_TESTCASE_TIMEOUT}",
200221
]
201222

202-
blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"]
203-
blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins]
204-
205-
result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}.xml"))
206-
result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"]
207-
208-
pytest_test_env = test_env.copy()
209-
pytest_test_env["PYTEST_PLUGINS"] = "codeflash.verification.pytest_plugin"
210-
211223
cmd = pytest_cmd_list + common_pytest_args + blocklist_args + result_args + behavior_test_files
212224

213225
try:
214226
behavior_result = await async_execute_test_subprocess(
215-
cmd_list=cmd, cwd=worktree_cwd, env=pytest_test_env, timeout=600
227+
cmd_list=cmd, cwd=slot.path, env=pytest_test_env, timeout=600
216228
)
217229
except subprocess.TimeoutExpired:
218230
logger.warning(f"Behavioral test timeout for candidate {candidate_index}")
@@ -236,8 +248,26 @@ async def _run_in_worktree(
236248
if not match:
237249
return Failure(EvalFailure(message=f"Behavioral mismatch: {len(diffs)} diffs", diffs=diffs))
238250

239-
# Run performance tests in the worktree
240-
perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{candidate_index}.xml"))
251+
return Success(
252+
_BehavioralPass(
253+
slot=slot,
254+
candidate_index=candidate_index,
255+
perf_test_files=perf_test_files,
256+
test_env=pytest_test_env,
257+
pytest_cmd_list=pytest_cmd_list,
258+
)
259+
)
260+
261+
async def _benchmark_phase(
262+
self, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline
263+
) -> Result[OptimizedCandidateResult, EvalFailure]:
264+
"""Run performance benchmarks sequentially for a candidate that passed behavioral tests."""
265+
opt = self._optimizer
266+
267+
blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"]
268+
blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins]
269+
270+
perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}.xml"))
241271
perf_result_args = [f"--junitxml={perf_result_file.as_posix()}", "-o", "junit_logging=all"]
242272

243273
perf_pytest_args = [
@@ -250,14 +280,16 @@ async def _run_in_worktree(
250280
f"--timeout={INDIVIDUAL_TESTCASE_TIMEOUT}",
251281
]
252282

253-
perf_cmd = pytest_cmd_list + perf_pytest_args + blocklist_args + perf_result_args + perf_test_files
283+
perf_cmd = bp.pytest_cmd_list + perf_pytest_args + blocklist_args + perf_result_args + bp.perf_test_files
254284

255285
try:
256-
await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=worktree_cwd, env=pytest_test_env, timeout=600)
286+
await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=bp.slot.path, env=bp.test_env, timeout=600)
257287
except subprocess.TimeoutExpired:
258-
logger.warning(f"Performance test timeout for candidate {candidate_index}")
288+
logger.warning(f"Performance test timeout for candidate {bp.candidate_index}")
259289
return Failure(EvalFailure(message="Performance test timeout"))
260290

291+
from codeflash.verification.parse_test_output import parse_test_xml
292+
261293
perf_test_results = parse_test_xml(perf_result_file, test_files=opt.test_files, test_config=opt.test_cfg)
262294

263295
if not perf_test_results.test_results:
@@ -275,16 +307,43 @@ async def _run_in_worktree(
275307
OptimizedCandidateResult(
276308
max_loop_count=loop_count,
277309
best_test_runtime=total_timing,
278-
behavior_test_results=behavior_test_results,
310+
behavior_test_results=None,
279311
benchmarking_test_results=perf_test_results,
280312
replay_benchmarking_test_results=None,
281-
optimization_candidate_index=candidate_index,
313+
optimization_candidate_index=bp.candidate_index,
282314
total_candidate_timing=total_timing,
283315
async_throughput=None,
284316
concurrency_metrics=None,
285317
)
286318
)
287319

320+
@staticmethod
def _replace_and_capture(
    opt: FunctionOptimizer,
    code_context: CodeOptimizationContext,
    candidate: OptimizedCandidate,
    original_helper_code: dict[Path, str],
) -> tuple[str, dict[Path, str]] | None:
    """Apply a candidate to the main tree, snapshot the result, then write the originals back.

    Args:
        opt: Optimizer that owns the function being optimized and performs
            the replacement and the write-back.
        code_context: Context forwarded to the replacement call.
        candidate: Candidate whose ``source_code`` is applied.
        original_helper_code: Helper file paths mapped to their original
            contents; its keys decide which helper files are read back.

    Returns:
        ``(function_file_source, {helper_path: helper_source})`` as read from
        disk after the replacement, or ``None`` when the replacement reported
        no update or raised ``ValueError``/``SyntaxError``/``AttributeError``.
    """
    target = opt.function_to_optimize
    try:
        replaced = opt.replace_function_and_helpers_with_optimized_code(
            code_context=code_context,
            optimized_code=candidate.source_code,
            original_helper_code=original_helper_code,
        )
        if replaced:
            # Read back what the replacement wrote so it can be copied elsewhere.
            main_source = Path(target.file_path).read_text("utf-8")
            helper_sources: dict[Path, str] = {}
            for helper_path in original_helper_code:
                helper_sources[Path(helper_path)] = Path(helper_path).read_text("utf-8")
            return main_source, helper_sources
        return None
    except (ValueError, SyntaxError, AttributeError) as err:
        logger.error(f"Code replacement failed: {err}")
        return None
    finally:
        # Runs on every exit path: put the original function and helper code back.
        opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, target.file_path)
346+
288347

289348
def run_parallel_evaluation(
290349
optimizer: FunctionOptimizer,

0 commit comments

Comments
 (0)