Skip to content

Commit 97f0965

Browse files
committed
fix: simplify parallel evaluator, add line profiler and immediate refinement dispatch
Move refinement/repair dispatch from lazy carry-over pattern to immediate ThreadPoolExecutor submission in function_optimizer.py. Add _run_line_profiler_for_winner() that runs real line profiling on the winning candidate after selection. Remove AsyncAiServiceClient dependency and dead code (PendingRefinement, _dispatch_refinements, etc.) from parallel_evaluator.py — it now only handles worktree-based evaluation.
1 parent ad64b14 commit 97f0965

2 files changed

Lines changed: 156 additions & 275 deletions

File tree

codeflash/languages/function_optimizer.py

Lines changed: 133 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@
133133
TestDiff,
134134
TestFileReview,
135135
)
136-
from codeflash.optimization.parallel_evaluator import PendingRepair
137136
from codeflash.verification.verification_utils import TestConfig
138137

139138

@@ -1021,6 +1020,34 @@ def handle_successful_candidate(
10211020

10221021
return best_optimization, benchmark_tree
10231022

1023+
def _run_line_profiler_for_winner(
1024+
self,
1025+
best_optimization: BestOptimization,
1026+
code_context: CodeOptimizationContext,
1027+
original_helper_code: dict[Path, str],
1028+
eval_ctx: CandidateEvaluationContext,
1029+
) -> BestOptimization:
1030+
"""Run line profiler on the winning candidate from parallel evaluation."""
1031+
try:
1032+
self.replace_function_and_helpers_with_optimized_code(
1033+
code_context=code_context,
1034+
optimized_code=best_optimization.candidate.source_code,
1035+
original_helper_code=original_helper_code,
1036+
)
1037+
with progress_bar("Running line-by-line profiling"):
1038+
lp_results = self.line_profiler_step(
1039+
code_context=code_context, original_helper_code=original_helper_code, candidate_index=0
1040+
)
1041+
eval_ctx.record_line_profiler_result(best_optimization.candidate.optimization_id, lp_results["str_out"])
1042+
best_optimization.line_profiler_test_results = lp_results
1043+
except (ValueError, SyntaxError, AttributeError) as e:
1044+
logger.warning(f"Line profiler failed for winning candidate: {e}")
1045+
finally:
1046+
self.write_code_and_helpers(
1047+
self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path
1048+
)
1049+
return best_optimization
1050+
10241051
def select_best_optimization(
10251052
self,
10261053
eval_ctx: CandidateEvaluationContext,
@@ -1439,6 +1466,11 @@ def determine_best_candidate(
14391466
)
14401467

14411468
if best_optimization:
1469+
if parallel_pool_size > 1:
1470+
best_optimization = self._run_line_profiler_for_winner(
1471+
best_optimization, code_context, original_helper_code, eval_ctx
1472+
)
1473+
14421474
self.log_evaluation_results(
14431475
eval_ctx=eval_ctx,
14441476
best_optimization=best_optimization,
@@ -1463,18 +1495,14 @@ def _evaluate_candidates_parallel(
14631495
pool_size: int,
14641496
) -> None:
14651497
"""Evaluate candidates in parallel using git worktrees and async subprocess execution."""
1466-
from codeflash.optimization.parallel_evaluator import (
1467-
PendingBatchRefinement,
1468-
PendingRefinement,
1469-
run_parallel_evaluation,
1470-
)
1498+
from codeflash.optimization.parallel_evaluator import run_parallel_evaluation
1499+
1500+
ai_service_client = self.aiservice_client if exp_type == "EXP0" else self.local_aiservice_client
1501+
assert ai_service_client is not None
14711502

14721503
candidate_index = 0
1473-
carry_refinements: list[PendingRefinement | PendingBatchRefinement] = []
1474-
carry_repairs: list[PendingRepair] = []
14751504

14761505
while not processor.is_done():
1477-
# Drain available candidates from the queue into a batch
14781506
batch: list[tuple[CandidateNode, int, str | None]] = []
14791507
while len(batch) < pool_size:
14801508
candidate_node = processor.get_next_candidate()
@@ -1483,7 +1511,6 @@ def _evaluate_candidates_parallel(
14831511
candidate_index += 1
14841512
cached = processor.normalized_cache.get(candidate_node.candidate.optimization_id)
14851513

1486-
# Pre-filter: skip duplicates and identical-to-original
14871514
normalized_code = cached or self.language_support.normalize_code(
14881515
candidate_node.candidate.source_code.flat.strip()
14891516
)
@@ -1502,13 +1529,12 @@ def _evaluate_candidates_parallel(
15021529
)
15031530
batch.append((candidate_node, candidate_index, cached))
15041531

1505-
if not batch and not carry_refinements and not carry_repairs:
1532+
if not batch:
15061533
break
15071534

1508-
if batch:
1509-
logger.info(f"Evaluating batch of {len(batch)} candidates in parallel…")
1535+
logger.info(f"Evaluating batch of {len(batch)} candidates in parallel…")
15101536

1511-
results, refinement_futures, repair_futures = run_parallel_evaluation(
1537+
results, _, _ = run_parallel_evaluation(
15121538
optimizer=self,
15131539
candidates=batch,
15141540
code_context=code_context,
@@ -1518,29 +1544,21 @@ def _evaluate_candidates_parallel(
15181544
eval_ctx=eval_ctx,
15191545
exp_type=exp_type,
15201546
pool_size=pool_size,
1521-
pending_refinements=carry_refinements if carry_refinements else None,
1522-
pending_repairs=carry_repairs if carry_repairs else None,
15231547
)
15241548

1525-
# Append resolved futures from the async dispatch
1526-
self.future_all_refinements.extend(refinement_futures)
1527-
self.future_all_code_repair.extend(repair_futures)
1528-
carry_refinements = []
1529-
carry_repairs = []
1530-
1531-
# Process results: build refinement/repair requests for the next async pass
1549+
# Process results and dispatch refinement/repair futures immediately
15321550
batch_refiner_candidates: list[AIServiceBatchRefinerCandidate] = []
15331551
for (candidate_node, _idx, _), (_, run_result) in zip(batch, results):
15341552
candidate = candidate_node.candidate
15351553

15361554
if run_result is None or not is_successful(run_result):
15371555
eval_ctx.record_failed_candidate(candidate.optimization_id)
1538-
if run_result is not None and hasattr(run_result, "error"):
1539-
repair_pending = self._build_repair_request_if_possible(
1540-
candidate, [], eval_ctx, code_context, 0, exp_type
1556+
if run_result is not None and isinstance(run_result, Failure):
1557+
repair_future = self._dispatch_repair_if_possible(
1558+
candidate, eval_ctx, code_context, exp_type, ai_service_client
15411559
)
1542-
if repair_pending is not None:
1543-
carry_repairs.append(repair_pending)
1560+
if repair_future is not None:
1561+
self.future_all_code_repair.append(repair_future)
15441562
continue
15451563

15461564
candidate_result = run_result.unwrap()
@@ -1563,7 +1581,6 @@ def _evaluate_candidates_parallel(
15631581
) and quantity_of_tests_critic(candidate_result)
15641582

15651583
if is_successful_opt:
1566-
# Defer line profiling — use empty placeholder for now
15671584
empty_lp = {"timings": {}, "unit": 0, "str_out": ""}
15681585
best_optimization = BestOptimization(
15691586
candidate=candidate,
@@ -1591,67 +1608,96 @@ def _evaluate_candidates_parallel(
15911608
)
15921609
)
15931610

1594-
# Build pending refinement for the next batch's async boundary
1611+
# Dispatch refinement immediately so CandidateProcessor sees it
15951612
if batch_refiner_candidates:
1596-
if len(batch_refiner_candidates) > 1:
1597-
carry_refinements.append(
1598-
PendingBatchRefinement(
1599-
original_source_code=code_context.read_writable_code.markdown,
1600-
read_only_dependency_code=code_context.read_only_context_code,
1601-
original_line_profiler_results=original_code_baseline.line_profile_results["str_out"],
1602-
trace_id=self.get_trace_id(exp_type),
1603-
language=self.function_to_optimize.language,
1604-
language_version=self.language_support.language_version,
1605-
function_references=function_references,
1606-
candidates=batch_refiner_candidates,
1607-
rerun_trace_id=self.rerun_trace_id,
1608-
)
1609-
)
1610-
else:
1611-
c = batch_refiner_candidates[0]
1612-
carry_refinements.append(
1613-
PendingRefinement(
1614-
request=[
1615-
AIServiceRefinerRequest(
1616-
optimization_id=c.optimization_id,
1617-
original_source_code=code_context.read_writable_code.markdown,
1618-
read_only_dependency_code=code_context.read_only_context_code,
1619-
original_code_runtime=c.original_code_runtime,
1620-
optimized_source_code=c.optimized_source_code,
1621-
optimized_explanation=c.optimized_explanation,
1622-
optimized_code_runtime=c.optimized_code_runtime,
1623-
speedup=c.speedup,
1624-
trace_id=self.get_trace_id(exp_type),
1625-
original_line_profiler_results=original_code_baseline.line_profile_results[
1626-
"str_out"
1627-
],
1628-
optimized_line_profiler_results=c.optimized_line_profiler_results,
1629-
function_references=function_references,
1630-
language=self.function_to_optimize.language,
1631-
language_version=self.language_support.language_version,
1632-
)
1633-
],
1634-
rerun_trace_id=self.rerun_trace_id,
1635-
)
1636-
)
1613+
self._dispatch_refinement(
1614+
batch_refiner_candidates,
1615+
code_context,
1616+
original_code_baseline,
1617+
exp_type,
1618+
function_references,
1619+
ai_service_client,
1620+
)
16371621

1638-
# Flush any remaining pending requests in a final async pass
1639-
if carry_refinements or carry_repairs:
1640-
_, final_ref_futures, final_rep_futures = run_parallel_evaluation(
1641-
optimizer=self,
1642-
candidates=[],
1643-
code_context=code_context,
1644-
original_code_baseline=original_code_baseline,
1645-
original_helper_code=original_helper_code,
1646-
file_path_to_helper_classes=file_path_to_helper_classes,
1647-
eval_ctx=eval_ctx,
1648-
exp_type=exp_type,
1649-
pool_size=pool_size,
1650-
pending_refinements=carry_refinements,
1651-
pending_repairs=carry_repairs,
1622+
def _dispatch_refinement(
1623+
self,
1624+
batch_refiner_candidates: list[AIServiceBatchRefinerCandidate],
1625+
code_context: CodeOptimizationContext,
1626+
original_code_baseline: OriginalCodeBaseline,
1627+
exp_type: str,
1628+
function_references: str,
1629+
ai_service_client: AiServiceClient,
1630+
) -> None:
1631+
"""Submit refinement request to thread pool so CandidateProcessor can consume results."""
1632+
if len(batch_refiner_candidates) > 1:
1633+
future = self.executor.submit(
1634+
ai_service_client.optimize_code_refinement_batch,
1635+
original_source_code=code_context.read_writable_code.markdown,
1636+
read_only_dependency_code=code_context.read_only_context_code,
1637+
original_line_profiler_results=original_code_baseline.line_profile_results["str_out"],
1638+
trace_id=self.get_trace_id(exp_type),
1639+
language=self.function_to_optimize.language,
1640+
language_version=self.language_support.language_version,
1641+
function_references=function_references,
1642+
candidates=batch_refiner_candidates,
1643+
rerun_trace_id=self.rerun_trace_id,
16521644
)
1653-
self.future_all_refinements.extend(final_ref_futures)
1654-
self.future_all_code_repair.extend(final_rep_futures)
1645+
else:
1646+
c = batch_refiner_candidates[0]
1647+
future = self.executor.submit(
1648+
ai_service_client.optimize_code_refinement,
1649+
request=[
1650+
AIServiceRefinerRequest(
1651+
optimization_id=c.optimization_id,
1652+
original_source_code=code_context.read_writable_code.markdown,
1653+
read_only_dependency_code=code_context.read_only_context_code,
1654+
original_code_runtime=c.original_code_runtime,
1655+
optimized_source_code=c.optimized_source_code,
1656+
optimized_explanation=c.optimized_explanation,
1657+
optimized_code_runtime=c.optimized_code_runtime,
1658+
speedup=c.speedup,
1659+
trace_id=self.get_trace_id(exp_type),
1660+
original_line_profiler_results=original_code_baseline.line_profile_results["str_out"],
1661+
optimized_line_profiler_results=c.optimized_line_profiler_results,
1662+
function_references=function_references,
1663+
language=self.function_to_optimize.language,
1664+
language_version=self.language_support.language_version,
1665+
)
1666+
],
1667+
rerun_trace_id=self.rerun_trace_id,
1668+
)
1669+
self.future_all_refinements.append(future)
1670+
1671+
def _dispatch_repair_if_possible(
1672+
self,
1673+
candidate: OptimizedCandidate,
1674+
eval_ctx: CandidateEvaluationContext,
1675+
code_context: CodeOptimizationContext,
1676+
exp_type: str,
1677+
ai_service_client: AiServiceClient,
1678+
) -> concurrent.futures.Future | None:
1679+
"""Submit a code repair request if the candidate is eligible."""
1680+
max_repairs = get_effort_value(EffortKeys.MAX_CODE_REPAIRS_PER_TRACE, self.effort)
1681+
if self.repair_counter >= max_repairs:
1682+
return None
1683+
1684+
successful_candidates_count = sum(1 for is_correct in eval_ctx.is_correct.values() if is_correct)
1685+
if successful_candidates_count >= MIN_CORRECT_CANDIDATES:
1686+
return None
1687+
1688+
if candidate.source not in (OptimizedCandidateSource.OPTIMIZE, OptimizedCandidateSource.OPTIMIZE_LP):
1689+
return None
1690+
1691+
self.repair_counter += 1
1692+
request = AIServiceCodeRepairRequest(
1693+
optimization_id=candidate.optimization_id,
1694+
original_source_code=code_context.read_writable_code.markdown,
1695+
modified_source_code=candidate.source_code.markdown,
1696+
test_diffs=[],
1697+
trace_id=self.function_trace_id[:-4] + exp_type if self.experiment_id else self.function_trace_id,
1698+
language=self.function_to_optimize.language,
1699+
)
1700+
return self.executor.submit(ai_service_client.code_repair, request=request, rerun_trace_id=self.rerun_trace_id)
16551701

16561702
def call_adaptive_optimize(
16571703
self,
@@ -3181,45 +3227,6 @@ def repair_if_possible(
31813227
)
31823228
)
31833229

3184-
def _build_repair_request_if_possible(
3185-
self,
3186-
candidate: OptimizedCandidate,
3187-
diffs: list[TestDiff],
3188-
eval_ctx: CandidateEvaluationContext,
3189-
code_context: CodeOptimizationContext,
3190-
test_results_count: int,
3191-
exp_type: str,
3192-
) -> PendingRepair | None:
3193-
"""Like repair_if_possible but returns a PendingRepair for async dispatch instead of submitting directly."""
3194-
from codeflash.optimization.parallel_evaluator import PendingRepair
3195-
3196-
max_repairs = get_effort_value(EffortKeys.MAX_CODE_REPAIRS_PER_TRACE, self.effort)
3197-
if self.repair_counter >= max_repairs:
3198-
return None
3199-
3200-
successful_candidates_count = sum(1 for is_correct in eval_ctx.is_correct.values() if is_correct)
3201-
if successful_candidates_count >= MIN_CORRECT_CANDIDATES:
3202-
return None
3203-
3204-
if candidate.source not in (OptimizedCandidateSource.OPTIMIZE, OptimizedCandidateSource.OPTIMIZE_LP):
3205-
return None
3206-
if not diffs:
3207-
return None
3208-
result_unmatched_perc = len(diffs) / test_results_count if test_results_count > 0 else 1.0
3209-
if result_unmatched_perc > get_effort_value(EffortKeys.REPAIR_UNMATCHED_PERCENTAGE_LIMIT, self.effort):
3210-
return None
3211-
3212-
self.repair_counter += 1
3213-
request = AIServiceCodeRepairRequest(
3214-
optimization_id=candidate.optimization_id,
3215-
original_source_code=code_context.read_writable_code.markdown,
3216-
modified_source_code=candidate.source_code.markdown,
3217-
test_diffs=diffs,
3218-
trace_id=self.function_trace_id[:-4] + exp_type if self.experiment_id else self.function_trace_id,
3219-
language=self.function_to_optimize.language,
3220-
)
3221-
return PendingRepair(request=request, rerun_trace_id=self.rerun_trace_id)
3222-
32233230
def run_optimized_candidate(
32243231
self,
32253232
*,

0 commit comments

Comments
 (0)