diff --git a/.gitignore b/.gitignore index 2d020d366..e4ef2b4fe 100644 --- a/.gitignore +++ b/.gitignore @@ -429,3 +429,4 @@ code_to_optimize/**/package-lock.json # Other tools .codeflash/ +.codeflash_eval_worktrees/ diff --git a/codeflash/api/aiservice.py b/codeflash/api/aiservice.py index 3127649f2..95846be75 100644 --- a/codeflash/api/aiservice.py +++ b/codeflash/api/aiservice.py @@ -22,9 +22,9 @@ FunctionRepairInfo, OptimizationReviewResult, OptimizedCandidate, - OptimizedCandidateSource, TestFileReview, ) +from codeflash.models.shared_types import OptimizedCandidateSource from codeflash.telemetry.posthog_cf import ph from codeflash.version import __version__ as codeflash_version @@ -35,6 +35,7 @@ from codeflash.models.ExperimentMetadata import ExperimentMetadata from codeflash.models.models import ( AIServiceAdaptiveOptimizeRequest, + AIServiceBatchRefinerCandidate, AIServiceCodeRepairRequest, AIServiceRefinerRequest, ) @@ -384,6 +385,97 @@ def optimize_code_refinement( console.rule() return [] + def optimize_code_refinement_batch( + self, + *, + original_source_code: str, + read_only_dependency_code: str, + original_line_profiler_results: str, + trace_id: str, + language: str, + language_version: str | None, + function_references: str | None, + candidates: list[AIServiceBatchRefinerCandidate], + rerun_trace_id: str | None = None, + ) -> list[OptimizedCandidate]: + shared_context: dict[str, Any] = { + "original_source_code": original_source_code, + "read_only_dependency_code": read_only_dependency_code, + "original_line_profiler_results": original_line_profiler_results, + "trace_id": trace_id, + "language": language, + "function_references": function_references, + "rerun_trace_id": rerun_trace_id, + } + self.add_language_metadata(shared_context, language_version) + + candidate_payloads: list[dict[str, Any]] = [] + for c in candidates: + candidate_payloads.append( + { + "optimization_id": c.optimization_id, + "optimized_source_code": c.optimized_source_code, + "optimized_explanation": c.optimized_explanation, + "optimized_code_runtime": humanize_runtime(c.optimized_code_runtime), + "original_code_runtime": humanize_runtime(c.original_code_runtime), + "speedup": c.speedup, + "optimized_line_profiler_results": c.optimized_line_profiler_results, + "call_sequence": self.get_next_sequence(), + } + ) + + payload: dict[str, Any] = {"shared_context": shared_context, "candidates": candidate_payloads} + + try: + response = self.make_ai_service_request("/batch_refinement", payload=payload, timeout=self.timeout) + except requests.exceptions.RequestException as e: + logger.exception(f"Error generating batch optimization refinements: {e}") + ph("cli-optimize-error-caught", {"error": str(e)}) + return [] + + if response.status_code == 404: + return self._fallback_to_sequential_refinement( + shared_context=shared_context, candidates=candidates, rerun_trace_id=rerun_trace_id + ) + + if response.status_code == 200: + refined_optimizations = response.json()["refinements"] + return self._get_valid_candidates(refined_optimizations, OptimizedCandidateSource.REFINE, language=language) + + self.log_error_response(response, "generating batch optimized candidates", "cli-optimize-error-response") + console.rule() + return [] + + def _fallback_to_sequential_refinement( + self, + *, + shared_context: dict[str, Any], + candidates: list[AIServiceBatchRefinerCandidate], + rerun_trace_id: str | None, + ) -> list[OptimizedCandidate]: + from codeflash.models.models import AIServiceRefinerRequest + + requests_list = [ + AIServiceRefinerRequest( + optimization_id=c.optimization_id, + original_source_code=shared_context["original_source_code"], + read_only_dependency_code=shared_context["read_only_dependency_code"], + original_code_runtime=c.original_code_runtime, + optimized_source_code=c.optimized_source_code, + optimized_explanation=c.optimized_explanation, + optimized_code_runtime=c.optimized_code_runtime, + speedup=c.speedup, + trace_id=shared_context["trace_id"], + original_line_profiler_results=shared_context["original_line_profiler_results"], + optimized_line_profiler_results=c.optimized_line_profiler_results, + function_references=shared_context.get("function_references"), + language=shared_context["language"], + language_version=shared_context.get("language_version"), + ) + for c in candidates + ] + return self.optimize_code_refinement(requests_list, rerun_trace_id=rerun_trace_id) + def code_repair( self, request: AIServiceCodeRepairRequest, rerun_trace_id: str | None = None ) -> OptimizedCandidate | None: diff --git a/codeflash/cli_cmds/cli.py b/codeflash/cli_cmds/cli.py index 2db13efe8..b40777cf3 100644 --- a/codeflash/cli_cmds/cli.py +++ b/codeflash/cli_cmds/cli.py @@ -500,6 +500,13 @@ def _build_parser() -> ArgumentParser: ) parser.add_argument("--no-draft", default=False, action="store_true", help="Skip optimization for draft PRs") parser.add_argument("--worktree", default=False, action="store_true", help="Use worktree for optimization") + parser.add_argument( + "--parallel-candidates", + type=int, + default=0, + metavar="N", + help="Evaluate up to N optimization candidates in parallel using git worktrees (0 = sequential)", + ) parser.add_argument( "--testgen-review", default=False, action="store_true", help="Enable AI review and repair of generated tests" ) diff --git a/codeflash/code_utils/worktree_pool.py b/codeflash/code_utils/worktree_pool.py new file mode 100644 index 000000000..f71eba5d9 --- /dev/null +++ b/codeflash/code_utils/worktree_pool.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +import contextlib +import functools +import shutil +import stat +import sys +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import anyio + +if TYPE_CHECKING: + from collections.abc import Callable + from typing import Self + +from codeflash.cli_cmds.console import logger +from codeflash.code_utils.git_utils import git_root_dir, mirror_path + +_USE_ONEXC = sys.version_info >= (3, 12) + + +class WorktreeSlot: + __slots__ = ("_git_root", "index", "path") + + def __init__(self, path: Path, index: int, git_root: Path) -> None: + self.path = path + self.index = index + self._git_root = git_root + + def mirror(self, original_path: Path) -> Path: + return mirror_path(original_path, self._git_root, self.path) + + async def write_candidate(self, file_path: Path, code: str) -> None: + mirrored = anyio.Path(self.mirror(file_path)) + await mirrored.parent.mkdir(parents=True, exist_ok=True) + await mirrored.write_text(code, encoding="utf-8") + + +class WorktreePool: + def __init__(self, pool_size: int = 4, base_dir: Path | None = None) -> None: + self._pool_size = pool_size + self._git_root = git_root_dir() + self._base_dir = base_dir or (self._git_root / ".codeflash_eval_worktrees") + self._slots: list[WorktreeSlot] = [] + self._send: anyio.abc.ObjectSendStream[WorktreeSlot] | None = None + self._receive: anyio.abc.ObjectReceiveStream[WorktreeSlot] | None = None + self._initialized = False + self._closed = False + + async def initialize(self) -> None: + if self._initialized: + return + await anyio.Path(self._base_dir).mkdir(parents=True, exist_ok=True) + + results: list[WorktreeSlot | None] = [None] * self._pool_size + async with anyio.create_task_group() as tg: + for i in range(self._pool_size): + tg.start_soon(self._create_slot_task, i, results) + + self._slots = [s for s in results if s is not None] + if not self._slots: + msg = "Failed to create any worktree slots" + raise RuntimeError(msg) + + self._send, self._receive = anyio.create_memory_object_stream[WorktreeSlot](len(self._slots)) + for slot in self._slots: + await self._send.send(slot) + self._initialized = True + logger.debug(f"WorktreePool initialized with {len(self._slots)} slots at {self._base_dir}") + + async def _create_slot_task(self, index: int, results: list[WorktreeSlot | None]) -> None: + try: + results[index] = await self._create_slot(index) + except Exception as exc: + logger.warning(f"Failed to create worktree slot {index}: {exc}") + + async def _create_slot(self, index: int) -> WorktreeSlot: + slot_dir = self._base_dir / f"slot-{index}" + if await anyio.Path(slot_dir).exists(): + await anyio.to_thread.run_sync(functools.partial(_rmtree_safe, slot_dir)) + + result = await anyio.run_process( + ["git", "-C", str(self._git_root), "worktree", "add", "--detach", str(slot_dir), "HEAD"], check=False + ) + if result.returncode != 0: + msg = f"git worktree add failed for slot {index}: {result.stderr.decode()}" + raise RuntimeError(msg) + + return WorktreeSlot(slot_dir, index, self._git_root) + + async def acquire(self) -> WorktreeSlot: + assert self._receive is not None + return await self._receive.receive() + + async def release(self, slot: WorktreeSlot) -> None: + if self._closed: + return + assert self._send is not None + with contextlib.suppress(anyio.ClosedResourceError): + await self._send.send(slot) + + async def cleanup(self) -> None: + self._closed = True + + if self._send is not None: + await self._send.aclose() + if self._receive is not None: + await self._receive.aclose() + + for slot in self._slots: + try: + await self._remove_slot_async(slot) + except Exception as exc: + logger.warning(f"Failed to remove worktree slot {slot.index}: {exc}") + + self._slots.clear() + self._initialized = False + + if await anyio.Path(self._base_dir).exists(): + with contextlib.suppress(Exception): + await anyio.run_process(["git", "-C", str(self._git_root), "worktree", "prune"], check=False) + with contextlib.suppress(OSError): + await anyio.Path(self._base_dir).rmdir() + + async def _remove_slot_async(self, slot: WorktreeSlot) -> None: + if await anyio.Path(slot.path).exists(): + await anyio.to_thread.run_sync(functools.partial(_rmtree_safe, slot.path)) + + async def __aenter__(self) -> Self: + await self.initialize() + return self + + async def __aexit__(self, *exc: object) -> None: + await self.cleanup() + + +def _rmtree_safe(path: Path) -> None: + if _USE_ONEXC: + shutil.rmtree(path, onexc=_handle_remove_readonly_onexc) + else: + shutil.rmtree(path, onerror=_handle_remove_readonly_onerror) + + +def _handle_remove_readonly_onexc(func: Callable[..., Any], path: str, exc: BaseException) -> None: + if isinstance(exc, PermissionError): + Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR) + func(path) + else: + raise exc + + +def _handle_remove_readonly_onerror(func: Callable[..., Any], path: str, exc_info: tuple[Any, ...]) -> None: + if isinstance(exc_info[1], PermissionError): + Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR) + func(path) + else: + raise exc_info[1] diff --git a/codeflash/languages/function_optimizer.py b/codeflash/languages/function_optimizer.py index 71ad03b18..384ad6712 100644 --- a/codeflash/languages/function_optimizer.py +++ b/codeflash/languages/function_optimizer.py @@ -1,3 +1,4 @@ +# mypy: ignore-errors from __future__ import annotations import concurrent.futures @@ -77,6 +78,7 @@ from codeflash.models.models import ( AdaptiveOptimizedCandidate, AIServiceAdaptiveOptimizeRequest, + AIServiceBatchRefinerCandidate, AIServiceCodeRepairRequest, BestOptimization, CandidateEvaluationContext, @@ -1018,6 +1020,34 @@ def handle_successful_candidate( return best_optimization, benchmark_tree + def _run_line_profiler_for_winner( + self, + best_optimization: BestOptimization, + code_context: CodeOptimizationContext, + original_helper_code: dict[Path, str], + eval_ctx: CandidateEvaluationContext, + ) -> BestOptimization: + """Run line profiler on the winning candidate from parallel evaluation.""" + try: + self.replace_function_and_helpers_with_optimized_code( + code_context=code_context, + optimized_code=best_optimization.candidate.source_code, + original_helper_code=original_helper_code, + ) + with progress_bar("Running line-by-line profiling"): + lp_results = self.line_profiler_step( + code_context=code_context, original_helper_code=original_helper_code, candidate_index=0 + ) + eval_ctx.record_line_profiler_result(best_optimization.candidate.optimization_id, lp_results["str_out"]) + best_optimization.line_profiler_test_results = lp_results + except (ValueError, SyntaxError, AttributeError, Exception) as e: + logger.warning(f"Line profiler failed for winning candidate: {e}") + finally: + self.write_code_and_helpers( + self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path + ) + return best_optimization + def select_best_optimization( self, eval_ctx: CandidateEvaluationContext, @@ -1378,37 +1408,52 @@ def determine_best_candidate( original_flat_code=code_context.read_writable_code.flat, ) candidate_index = 0 + parallel_pool_size = getattr(self.args, "parallel_candidates", 0) - # Process candidates using queue-based approach - while not processor.is_done(): - candidate_node = processor.get_next_candidate() - if candidate_node is None: - logger.debug("everything done, exiting") - break + if parallel_pool_size > 1: + self._evaluate_candidates_parallel( + processor=processor, + code_context=code_context, + original_code_baseline=original_code_baseline, + original_helper_code=original_helper_code, + file_path_to_helper_classes=file_path_to_helper_classes, + eval_ctx=eval_ctx, + exp_type=exp_type, + function_references=function_references, + normalized_original=normalized_original, + pool_size=parallel_pool_size, + ) + else: + # Process candidates using queue-based approach (sequential) + while not processor.is_done(): + candidate_node = processor.get_next_candidate() + if candidate_node is None: + logger.debug("everything done, exiting") + break - try: - candidate_index += 1 - self.process_single_candidate( - candidate_node=candidate_node, - candidate_index=candidate_index, - total_candidates=processor.candidate_len, - code_context=code_context, - original_code_baseline=original_code_baseline, - original_helper_code=original_helper_code, - file_path_to_helper_classes=file_path_to_helper_classes, - eval_ctx=eval_ctx, - exp_type=exp_type, - function_references=function_references, - normalized_original=normalized_original, - cached_normalized_code=processor.normalized_cache.get(candidate_node.candidate.optimization_id), - ) - except KeyboardInterrupt as e: - logger.exception(f"Optimization interrupted: {e}") - raise - finally: - self.write_code_and_helpers( - self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path - ) + try: + candidate_index += 1 + self.process_single_candidate( + candidate_node=candidate_node, + candidate_index=candidate_index, + total_candidates=processor.candidate_len, + code_context=code_context, + original_code_baseline=original_code_baseline, + original_helper_code=original_helper_code, + file_path_to_helper_classes=file_path_to_helper_classes, + eval_ctx=eval_ctx, + exp_type=exp_type, + function_references=function_references, + normalized_original=normalized_original, + cached_normalized_code=processor.normalized_cache.get(candidate_node.candidate.optimization_id), + ) + except KeyboardInterrupt as e: + logger.exception(f"Optimization interrupted: {e}") + raise + finally: + self.write_code_and_helpers( + self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path + ) # Select and return the best optimization best_optimization = self.select_best_optimization( @@ -1421,6 +1466,11 @@ def determine_best_candidate( ) if best_optimization: + if parallel_pool_size > 1: + best_optimization = self._run_line_profiler_for_winner( + best_optimization, code_context, original_helper_code, eval_ctx + ) + self.log_evaluation_results( eval_ctx=eval_ctx, best_optimization=best_optimization, @@ -1431,6 +1481,248 @@ def determine_best_candidate( return best_optimization + def _evaluate_candidates_parallel( + self, + processor: CandidateProcessor, + code_context: CodeOptimizationContext, + original_code_baseline: OriginalCodeBaseline, + original_helper_code: dict[Path, str], + file_path_to_helper_classes: dict[Path, set[str]], + eval_ctx: CandidateEvaluationContext, + exp_type: str, + function_references: str, + normalized_original: str, + pool_size: int, + ) -> None: + """Evaluate candidates in parallel using git worktrees and async subprocess execution.""" + from codeflash.optimization.parallel_evaluator import run_parallel_evaluation + + ai_service_client = self.aiservice_client if exp_type == "EXP0" else self.local_aiservice_client + assert ai_service_client is not None + + candidate_index = 0 + + while not processor.is_done(): + batch: list[tuple[CandidateNode, int, str | None]] = [] + while len(batch) < pool_size: + candidate_node = processor.get_next_candidate() + if candidate_node is None: + break + candidate_index += 1 + cached = processor.normalized_cache.get(candidate_node.candidate.optimization_id) + + normalized_code = cached or self.language_support.normalize_code( + candidate_node.candidate.source_code.flat.strip() + ) + if normalized_code == normalized_original: + logger.info(f"h3|Candidate {candidate_index}: Identical to original code, skipping.") + continue + if normalized_code in eval_ctx.ast_code_to_id: + logger.info(f"h3|Candidate {candidate_index}: Duplicate of a previous candidate, skipping.") + eval_ctx.handle_duplicate_candidate( + candidate_node.candidate, normalized_code, code_context.read_writable_code.flat + ) + continue + + eval_ctx.register_new_candidate( + normalized_code, candidate_node.candidate, code_context.read_writable_code.flat + ) + batch.append((candidate_node, candidate_index, cached)) + + if not batch: + break + + logger.info(f"Evaluating batch of {len(batch)} candidates in parallel…") + + results, _, _ = run_parallel_evaluation( + optimizer=self, + candidates=batch, + code_context=code_context, + original_code_baseline=original_code_baseline, + original_helper_code=original_helper_code, + file_path_to_helper_classes=file_path_to_helper_classes, + pool_size=pool_size, + ) + + # Process results and dispatch refinement/repair futures immediately + batch_refiner_candidates: list[AIServiceBatchRefinerCandidate] = [] + for (candidate_node, _idx, _), (_, run_result) in zip(batch, results): + candidate = candidate_node.candidate + + if run_result is None or not is_successful(run_result): + eval_ctx.record_failed_candidate(candidate.optimization_id) + if run_result is not None and isinstance(run_result, Failure): + eval_failure = run_result.failure() + repair_future = self._dispatch_repair_if_possible( + candidate, + eval_ctx, + code_context, + exp_type, + ai_service_client, + test_diffs=eval_failure.diffs, + ) + if repair_future is not None: + self.future_all_code_repair.append(repair_future) + continue + + candidate_result = run_result.unwrap() + perf_gain = performance_gain( + original_runtime_ns=original_code_baseline.runtime, + optimized_runtime_ns=candidate_result.best_test_runtime, + ) + eval_ctx.record_successful_candidate( + candidate.optimization_id, candidate_result.best_test_runtime, perf_gain + ) + + is_successful_opt = speedup_critic( + candidate_result, + original_code_baseline.runtime, + best_runtime_until_now=None, + original_async_throughput=original_code_baseline.async_throughput, + best_throughput_until_now=None, + original_concurrency_metrics=original_code_baseline.concurrency_metrics, + best_concurrency_ratio_until_now=None, + ) and quantity_of_tests_critic(candidate_result) + + if is_successful_opt: + empty_lp = {"timings": {}, "unit": 0, "str_out": ""} + best_optimization = BestOptimization( + candidate=candidate, + helper_functions=code_context.helper_functions, + code_context=code_context, + runtime=candidate_result.best_test_runtime, + line_profiler_test_results=empty_lp, + winning_behavior_test_results=candidate_result.behavior_test_results, + winning_benchmarking_test_results=candidate_result.benchmarking_test_results, + winning_replay_benchmarking_test_results=None, + async_throughput=candidate_result.async_throughput, + concurrency_metrics=candidate_result.concurrency_metrics, + ) + eval_ctx.valid_optimizations.append(best_optimization) + + current_tree_candidates = candidate_node.path_to_root() + is_candidate_refined_before = any( + c.source == OptimizedCandidateSource.REFINE for c in current_tree_candidates + ) + + if is_candidate_refined_before: + future_adaptive = self.call_adaptive_optimize( + trace_id=self.get_trace_id(exp_type), + original_source_code=code_context.read_writable_code.markdown, + prev_candidates=current_tree_candidates, + eval_ctx=eval_ctx, + ai_service_client=ai_service_client, + ) + if future_adaptive: + self.future_adaptive_optimizations.append(future_adaptive) + else: + batch_refiner_candidates.append( + AIServiceBatchRefinerCandidate( + optimization_id=candidate.optimization_id, + optimized_source_code=candidate.source_code.markdown, + optimized_explanation=candidate.explanation, + optimized_code_runtime=candidate_result.best_test_runtime, + original_code_runtime=original_code_baseline.runtime, + speedup=f"{int(perf_gain * 100)}%", + optimized_line_profiler_results="", + ) + ) + + # Dispatch refinement immediately so CandidateProcessor sees it + if batch_refiner_candidates: + self._dispatch_refinement( + batch_refiner_candidates, + code_context, + original_code_baseline, + exp_type, + function_references, + ai_service_client, + ) + + def _dispatch_refinement( + self, + batch_refiner_candidates: list[AIServiceBatchRefinerCandidate], + code_context: CodeOptimizationContext, + original_code_baseline: OriginalCodeBaseline, + exp_type: str, + function_references: str, + ai_service_client: AiServiceClient, + ) -> None: + """Submit refinement request to thread pool so CandidateProcessor can consume results.""" + if len(batch_refiner_candidates) > 1: + future = self.executor.submit( + ai_service_client.optimize_code_refinement_batch, + original_source_code=code_context.read_writable_code.markdown, + read_only_dependency_code=code_context.read_only_context_code, + original_line_profiler_results=original_code_baseline.line_profile_results["str_out"], + trace_id=self.get_trace_id(exp_type), + language=self.function_to_optimize.language, + language_version=self.language_support.language_version, + function_references=function_references, + candidates=batch_refiner_candidates, + rerun_trace_id=self.rerun_trace_id, + ) + else: + c = batch_refiner_candidates[0] + future = self.executor.submit( + ai_service_client.optimize_code_refinement, + request=[ + AIServiceRefinerRequest( + optimization_id=c.optimization_id, + original_source_code=code_context.read_writable_code.markdown, + read_only_dependency_code=code_context.read_only_context_code, + original_code_runtime=c.original_code_runtime, + optimized_source_code=c.optimized_source_code, + optimized_explanation=c.optimized_explanation, + optimized_code_runtime=c.optimized_code_runtime, + speedup=c.speedup, + trace_id=self.get_trace_id(exp_type), + original_line_profiler_results=original_code_baseline.line_profile_results["str_out"], + optimized_line_profiler_results=c.optimized_line_profiler_results, + function_references=function_references, + language=self.function_to_optimize.language, + language_version=self.language_support.language_version, + ) + ], + rerun_trace_id=self.rerun_trace_id, + ) + self.future_all_refinements.append(future) + + def _dispatch_repair_if_possible( + self, + candidate: OptimizedCandidate, + eval_ctx: CandidateEvaluationContext, + code_context: CodeOptimizationContext, + exp_type: str, + ai_service_client: AiServiceClient, + test_diffs: list[TestDiff] | None = None, + ) -> concurrent.futures.Future | None: + """Submit a code repair request if the candidate is eligible.""" + if not test_diffs: + return None + + max_repairs = get_effort_value(EffortKeys.MAX_CODE_REPAIRS_PER_TRACE, self.effort) + if self.repair_counter >= max_repairs: + return None + + successful_candidates_count = sum(1 for is_correct in eval_ctx.is_correct.values() if is_correct) + if successful_candidates_count >= MIN_CORRECT_CANDIDATES: + return None + + if candidate.source not in (OptimizedCandidateSource.OPTIMIZE, OptimizedCandidateSource.OPTIMIZE_LP): + return None + + self.repair_counter += 1 + request = AIServiceCodeRepairRequest( + optimization_id=candidate.optimization_id, + original_source_code=code_context.read_writable_code.markdown, + modified_source_code=candidate.source_code.markdown, + test_diffs=test_diffs or [], + trace_id=self.function_trace_id[:-4] + exp_type if self.experiment_id else self.function_trace_id, + language=self.function_to_optimize.language, + ) + return self.executor.submit(ai_service_client.code_repair, request=request, rerun_trace_id=self.rerun_trace_id) + def call_adaptive_optimize( self, trace_id: str, diff --git a/codeflash/languages/python/test_runner.py b/codeflash/languages/python/test_runner.py index 550f6bb05..d92ebdf2d 100644 --- a/codeflash/languages/python/test_runner.py +++ b/codeflash/languages/python/test_runner.py @@ -10,7 +10,6 @@ from codeflash.cli_cmds.console import logger from codeflash.code_utils.code_utils import custom_addopts -from codeflash.code_utils.shell_utils import get_cross_platform_subprocess_run_args from codeflash.languages.registry import get_language_support # Pattern to extract timing from stdout markers: !######...:######! @@ -92,11 +91,35 @@ def _ensure_runtime_files(project_root: Path, language: str = "javascript") -> N def execute_test_subprocess( cmd_list: list[str], cwd: Path, env: dict[str, str] | None, timeout: int = 600 -) -> subprocess.CompletedProcess: +) -> subprocess.CompletedProcess[str]: """Execute a subprocess with the given command list, working directory, environment variables, and timeout.""" logger.debug(f"executing test run with command: {' '.join(cmd_list)}") with custom_addopts(): - run_args = get_cross_platform_subprocess_run_args( - cwd=cwd, env=env, timeout=timeout, check=False, text=True, capture_output=True - ) - return subprocess.run(cmd_list, **run_args) # noqa: PLW1510 + return subprocess.run(cmd_list, cwd=cwd, env=env, timeout=timeout, check=False, text=True, capture_output=True) + + +async def async_execute_test_subprocess( + cmd_list: list[str], cwd: Path, env: dict[str, str] | None, timeout: int = 600 +) -> subprocess.CompletedProcess[str]: + """Execute a test subprocess asynchronously using anyio.""" + import os as _os + + import anyio + + logger.debug(f"async executing test run with command: {' '.join(cmd_list)}") + + merged_env = _os.environ.copy() + if env: + merged_env.update(env) + + with custom_addopts(): + try: + with anyio.fail_after(timeout): + result = await anyio.run_process(cmd_list, cwd=cwd, env=merged_env, check=False) + except TimeoutError as e: + raise subprocess.TimeoutExpired(cmd_list, timeout) from e + + stdout = result.stdout.decode("utf-8", errors="replace") if result.stdout else "" + stderr = result.stderr.decode("utf-8", errors="replace") if result.stderr else "" + + return subprocess.CompletedProcess(args=cmd_list, returncode=result.returncode, stdout=stdout, stderr=stderr) diff --git a/codeflash/models/function_types.py b/codeflash/models/function_types.py index bea6672b0..f5fa82bf4 100644 --- a/codeflash/models/function_types.py +++ b/codeflash/models/function_types.py @@ -12,14 +12,9 @@ from pydantic import Field from pydantic.dataclasses import dataclass +from codeflash.models.shared_types import FunctionParent -@dataclass(frozen=True) -class FunctionParent: - name: str - type: str - - def __str__(self) -> str: - return f"{self.type}:{self.name}" +__all__ = ["FunctionParent", "FunctionToOptimize"] @dataclass(frozen=True, config={"arbitrary_types_allowed": True}) diff --git a/codeflash/models/models.py b/codeflash/models/models.py index 640e5230a..4e2cea2a9 100644 --- a/codeflash/models/models.py +++ b/codeflash/models/models.py @@ -14,6 +14,7 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, ValidationError, model_validator from pydantic.dataclasses import dataclass +from codeflash.models.shared_types import OptimizedCandidateSource from codeflash.models.test_type import TestType if TYPE_CHECKING: @@ -50,6 +51,17 @@ class AIServiceRefinerRequest: additional_context_files: dict[str, str] | None = None # {filepath: content} for imported modules +@dataclass(frozen=True) +class AIServiceBatchRefinerCandidate: + optimization_id: str + optimized_source_code: str + optimized_explanation: str + optimized_code_runtime: int + original_code_runtime: int + speedup: str + optimized_line_profiler_results: str + + # this should be possible to auto serialize @dataclass(frozen=True) class AdaptiveOptimizedCandidate: @@ -298,11 +310,11 @@ def flat(self) -> str: """ if self._cache.get("flat") is not None: - return self._cache["flat"] + return cast("str", self._cache["flat"]) self._cache["flat"] = "\n".join( get_code_block_splitter(block.file_path) + "\n" + block.code for block in self.code_strings ) - return self._cache["flat"] + return cast("str", self._cache["flat"]) @property def markdown(self) -> str: @@ -332,7 +344,7 @@ def file_to_path(self) -> dict[str, str]: """ try: - return self._cache["file_to_path"] + return cast("dict[str, str]", self._cache["file_to_path"]) except KeyError: mapping = {str(code_string.file_path): code_string.code for code_string in self.code_strings} self._cache["file_to_path"] = mapping @@ -494,7 +506,7 @@ def _normalize_path_for_comparison(path: Path) -> str: # Only lowercase on Windows where filesystem is case-insensitive return resolved.lower() if sys.platform == "win32" else resolved - def __iter__(self) -> Iterator[TestFile]: + def __iter__(self) -> Iterator[TestFile]: # type: ignore[override] return iter(self.test_files) def __len__(self) -> int: @@ -514,9 +526,9 @@ class CandidateEvaluationContext: optimized_runtimes: dict[str, float | None] = Field(default_factory=dict) is_correct: dict[str, bool] = Field(default_factory=dict) optimized_line_profiler_results: dict[str, str] = Field(default_factory=dict) - ast_code_to_id: dict = Field(default_factory=dict) + ast_code_to_id: dict[str, Any] = Field(default_factory=dict) optimizations_post: dict[str, str] = Field(default_factory=dict) - valid_optimizations: list = Field(default_factory=list) + valid_optimizations: list[Any] = Field(default_factory=list) def record_failed_candidate(self, optimization_id: str) -> None: """Record results for a failed candidate.""" @@ -543,7 +555,7 @@ def handle_duplicate_candidate( # Copy results from the previous evaluation (use .get() in case past_opt_id was registered # but never benchmarked due to an unhandled exception in process_single_candidate) self.speedup_ratios[candidate.optimization_id] = self.speedup_ratios.get(past_opt_id) - self.is_correct[candidate.optimization_id] = self.is_correct.get(past_opt_id) + self.is_correct[candidate.optimization_id] = self.is_correct.get(past_opt_id, False) self.optimized_runtimes[candidate.optimization_id] = self.optimized_runtimes.get(past_opt_id) # Line profiler results only available for successful runs @@ -592,15 +604,6 @@ class TestsInFile: test_type: TestType -class OptimizedCandidateSource(str, Enum): - OPTIMIZE = "OPTIMIZE" - OPTIMIZE_LP = "OPTIMIZE_LP" - REFINE = "REFINE" - REPAIR = "REPAIR" - ADAPTIVE = "ADAPTIVE" - JIT_REWRITE = "JIT_REWRITE" - - @dataclass(frozen=True) class OptimizedCandidate: source_code: CodeStringsMarkdown @@ -631,7 +634,7 @@ class OriginalCodeBaseline(BaseModel): behavior_test_results: TestResults benchmarking_test_results: TestResults replay_benchmarking_test_results: Optional[dict[BenchmarkKey, TestResults]] = None - line_profile_results: dict + line_profile_results: dict[str, Any] runtime: int coverage_results: Optional[CoverageData] async_throughput: Optional[int] = None @@ -794,6 +797,7 @@ def get_src_code(self, test_path: Path) -> Optional[str]: ) if self.test_class_name: + assert self.test_function_name is not None for stmt in module_node.body: if isinstance(stmt, cst.ClassDef) and stmt.name.value == self.test_class_name: func_node = self.find_func_in_class(stmt, self.test_function_name) @@ -884,7 +888,7 @@ def group_by_benchmarks( """Group TestResults by benchmark for calculating improvements for each benchmark.""" from codeflash.code_utils.code_utils import module_name_from_file_path - test_results_by_benchmark = defaultdict(TestResults) + test_results_by_benchmark: defaultdict[BenchmarkKey, TestResults] = defaultdict(TestResults) benchmark_module_path = {} for benchmark_key in benchmark_keys: benchmark_module_path[benchmark_key] = module_name_from_file_path( @@ -1015,7 +1019,7 @@ def effective_loop_count(self) -> int: return max(loop_indices) if loop_indices else 0 def file_to_no_of_tests(self, test_functions_to_remove: list[str]) -> Counter[Path]: - map_gen_test_file_to_no_of_tests = Counter() + map_gen_test_file_to_no_of_tests: Counter[Path] = Counter() for gen_test_result in self.test_results: if ( gen_test_result.test_type == TestType.GENERATED_REGRESSION @@ -1024,7 +1028,7 @@ def file_to_no_of_tests(self, test_functions_to_remove: list[str]) -> Counter[Pa map_gen_test_file_to_no_of_tests[gen_test_result.file_name] += 1 return map_gen_test_file_to_no_of_tests - def __iter__(self) -> Iterator[FunctionTestInvocation]: + def __iter__(self) -> Iterator[FunctionTestInvocation]: # type: ignore[override] return iter(self.test_results) def __len__(self) -> int: @@ -1051,7 +1055,7 @@ def __eq__(self, other: object) -> bool: if len(self) != len(other): return False original_recursion_limit = sys.getrecursionlimit() - cast("TestResults", other) + assert isinstance(other, TestResults) for test_result in self: other_test_result = other.get_by_unique_invocation_loop_id(test_result.unique_invocation_loop_id) if other_test_result is None: diff --git a/codeflash/models/shared_types.py b/codeflash/models/shared_types.py new file mode 100644 index 000000000..4390b3d04 --- /dev/null +++ b/codeflash/models/shared_types.py @@ -0,0 +1,52 @@ +"""Shared types for cross-repo use between codeflash CLI and codeflash-internal server. + +This module defines types that are duplicated or shared between the client (CLI) +and the server. Centralizing them here allows both sides to import from a single +source of truth. +""" + +from __future__ import annotations + +from enum import Enum + +from pydantic.dataclasses import dataclass + +# --- Enums --- + + +class OptimizedCandidateSource(str, Enum): + OPTIMIZE = "OPTIMIZE" + OPTIMIZE_LP = "OPTIMIZE_LP" + REFINE = "REFINE" + REPAIR = "REPAIR" + ADAPTIVE = "ADAPTIVE" + JIT_REWRITE = "JIT_REWRITE" + + +# --- Models --- + + +@dataclass(frozen=True) +class FunctionParent: + name: str + type: str + + def __str__(self) -> str: + return f"{self.type}:{self.name}" + + +# --- Constants: Language identifiers --- + +LANGUAGE_PYTHON = "python" +LANGUAGE_JAVASCRIPT = "javascript" +LANGUAGE_TYPESCRIPT = "typescript" +LANGUAGE_JAVA = "java" + +SUPPORTED_LANGUAGES = frozenset({LANGUAGE_PYTHON, LANGUAGE_JAVASCRIPT, LANGUAGE_TYPESCRIPT, LANGUAGE_JAVA}) + +# --- Constants: Test type names --- + +TEST_TYPE_EXISTING_UNIT = "existing_unit_test" +TEST_TYPE_GENERATED_REGRESSION = "generated_regression" +TEST_TYPE_REPLAY = "replay_test" +TEST_TYPE_CONCOLIC_COVERAGE = "concolic_coverage_test" diff --git a/codeflash/optimization/parallel_evaluator.py b/codeflash/optimization/parallel_evaluator.py new file mode 100644 index 000000000..63f0320b4 --- /dev/null +++ b/codeflash/optimization/parallel_evaluator.py @@ -0,0 +1,393 @@ +# mypy: ignore-errors +from __future__ import annotations + +import dataclasses +import subprocess +from pathlib import Path +from typing import TYPE_CHECKING + +import anyio + +from codeflash.cli_cmds.console import logger +from codeflash.code_utils.code_utils import get_run_tmp_file +from codeflash.code_utils.config_consts import INDIVIDUAL_TESTCASE_TIMEOUT, TOTAL_LOOPING_TIME_EFFECTIVE +from codeflash.code_utils.worktree_pool import WorktreePool, WorktreeSlot # noqa: TC001 +from codeflash.either import Failure, Success + +if TYPE_CHECKING: + from codeflash.either import Result + from codeflash.languages.function_optimizer import CandidateNode, FunctionOptimizer + from codeflash.models.models import ( + CodeOptimizationContext, + OptimizedCandidate, + OptimizedCandidateResult, + OriginalCodeBaseline, + TestDiff, + TestResults, + ) + + +@dataclasses.dataclass(slots=True) +class EvalFailure: + """Structured failure from parallel evaluation, carrying test diffs for repair.""" + + message: str + diffs: list[TestDiff] = dataclasses.field(default_factory=list) + + +@dataclasses.dataclass(slots=True) +class _BehavioralPass: + """Intermediate result: candidate passed behavioral tests, ready for benchmarking.""" + + candidate_index: int + perf_test_files: list[str] + test_env: dict[str, str] + pytest_cmd_list: list[str] + behavior_test_results: TestResults + fto_code: str + helper_codes: dict[Path, str] + fto_file_path: Path + + +class ParallelCandidateEvaluator: + """Evaluates optimization candidates in parallel using git worktrees. + + Two-phase evaluation: + Phase 1 (concurrent): behavioral correctness tests — slots released after each test + Phase 2 (sequential): benchmarking — one candidate at a time for accurate timing + """ + + def __init__(self, optimizer: FunctionOptimizer, pool_size: int = 4) -> None: + self._optimizer = optimizer + self._pool_size = pool_size + self._pool: WorktreePool | None = None + self._replace_lock = anyio.Lock() + + async def evaluate_candidates( + self, + candidates: list[tuple[CandidateNode, int, str | None]], + code_context: CodeOptimizationContext, + original_code_baseline: OriginalCodeBaseline, + original_helper_code: dict[Path, str], + file_path_to_helper_classes: dict[Path, set[str]], + ) -> list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]]: + """Evaluate candidates: behavioral tests concurrently, benchmarks sequentially.""" + results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]] = [ + (node, None) for node, _, _ in candidates + ] + + if not candidates: + return results + + async with WorktreePool(pool_size=self._pool_size) as pool: + self._pool = pool + + # Phase 1: concurrent behavioral tests (slots released after each test) + behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]] = [] + + async with anyio.create_task_group() as tg: + for i, (node, idx, _cached) in enumerate(candidates): + tg.start_soon( + self._behavioral_phase, + i, + node, + idx, + code_context, + original_code_baseline, + original_helper_code, + results, + behavioral_passes, + ) + + # Phase 2: sequential benchmarking (no CPU contention) + for result_index, candidate_node, bp in behavioral_passes: + slot = await pool.acquire() + try: + bench_result = await self._benchmark_phase(slot, bp, original_code_baseline) + results[result_index] = (candidate_node, bench_result) + except Exception as exc: + logger.error(f"Benchmark for {candidate_node.candidate.optimization_id} raised: {exc}") + results[result_index] = (candidate_node, Failure(EvalFailure(message=str(exc)))) + finally: + await pool.release(slot) + + return results + + async def _behavioral_phase( + self, + result_index: int, + candidate_node: CandidateNode, + candidate_index: int, + code_context: CodeOptimizationContext, + original_code_baseline: OriginalCodeBaseline, + original_helper_code: dict[Path, str], + results: list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], + behavioral_passes: list[tuple[int, CandidateNode, _BehavioralPass]], + ) -> None: + """Run behavioral tests for a candidate. Slot is always released after the test.""" + assert self._pool is not None + slot = await self._pool.acquire() + try: + outcome = await self._run_behavioral( + slot=slot, + candidate=candidate_node.candidate, + candidate_index=candidate_index, + code_context=code_context, + original_code_baseline=original_code_baseline, + original_helper_code=original_helper_code, + ) + except BaseException: + await self._pool.release(slot) + raise + + await self._pool.release(slot) + + if isinstance(outcome, Failure): + results[result_index] = (candidate_node, outcome) + return + + behavioral_passes.append((result_index, candidate_node, outcome.unwrap())) + + async def _run_behavioral( + self, + slot: WorktreeSlot, + candidate: OptimizedCandidate, + candidate_index: int, + code_context: CodeOptimizationContext, + original_code_baseline: OriginalCodeBaseline, + original_helper_code: dict[Path, str], + ) -> Result[_BehavioralPass, EvalFailure]: + """Run behavioral tests in a worktree. Returns pass info or failure.""" + opt = self._optimizer + fto = opt.function_to_optimize + + # Serialize main-tree access: replace_and_capture writes/reads/restores shared files + async with self._replace_lock: + candidate_files = await anyio.to_thread.run_sync( + self._replace_and_capture, opt, code_context, candidate, original_helper_code + ) + + if candidate_files is None: + return Failure(EvalFailure(message="Code replacement failed")) + + fto_code, helper_codes = candidate_files + await slot.write_candidate(Path(fto.file_path), fto_code) + for module_abspath, helper_code in helper_codes.items(): + await slot.write_candidate(module_abspath, helper_code) + + # Copy instrumented test files into the worktree + behavior_test_files: list[str] = [] + perf_test_files: list[str] = [] + for file in opt.test_files.test_files: + src = file.instrumented_behavior_file_path + if src.exists(): + await slot.write_candidate(src, src.read_text(encoding="utf-8")) + behavior_test_files.append(str(slot.mirror(src))) + + if file.benchmarking_file_path and file.benchmarking_file_path.exists(): + await slot.write_candidate( + file.benchmarking_file_path, file.benchmarking_file_path.read_text(encoding="utf-8") + ) + perf_test_files.append(str(slot.mirror(file.benchmarking_file_path))) + + # Build test environment and command + test_env = opt.get_test_env( + codeflash_loop_index=0, codeflash_test_iteration=candidate_index, codeflash_tracer_disable=1 + ) + worktree_project_root = slot.mirror(Path(opt.args.project_root)) + test_env["PYTHONPATH"] = str(worktree_project_root) + + from codeflash.code_utils.compat import IS_POSIX, SAFE_SYS_EXECUTABLE + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + pytest_cmd_list = opt.language_support.build_pytest_cmd(SAFE_SYS_EXECUTABLE, IS_POSIX) # type: ignore[attr-defined] + + blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"] + blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins] + + result_file_path = get_run_tmp_file(Path(f"pytest_results_candidate_{candidate_index}_{slot.index}.xml")) + result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"] + + pytest_test_env = test_env.copy() + pytest_test_env["PYTEST_PLUGINS"] = "codeflash.verification.pytest_plugin" + + common_pytest_args = [ + "--capture=tee-sys", + "-q", + "--codeflash_loops_scope=session", + "--codeflash_min_loops=1", + "--codeflash_max_loops=1", + f"--codeflash_seconds={TOTAL_LOOPING_TIME_EFFECTIVE}", + f"--timeout={INDIVIDUAL_TESTCASE_TIMEOUT}", + ] + + cmd = pytest_cmd_list + common_pytest_args + blocklist_args + result_args + behavior_test_files + + try: + behavior_result = await async_execute_test_subprocess( + cmd_list=cmd, cwd=slot.path, env=pytest_test_env, timeout=600 + ) + except subprocess.TimeoutExpired: + logger.warning(f"Behavioral test timeout for candidate {candidate_index}") + return Failure(EvalFailure(message="Behavioral test timeout")) + + from codeflash.verification.parse_test_output import parse_test_xml + + behavior_test_results = parse_test_xml( + result_file_path, test_files=opt.test_files, test_config=opt.test_cfg, run_result=behavior_result + ) + + if not behavior_test_results.test_results: + return Failure(EvalFailure(message="No behavioral test results")) + + from codeflash.verification.equivalence import compare_test_results + + match, diffs = compare_test_results( + original_code_baseline.behavior_test_results, behavior_test_results, pass_fail_only=True + ) + + if not match: + return Failure(EvalFailure(message=f"Behavioral mismatch: {len(diffs)} diffs", diffs=diffs)) + + return Success( + _BehavioralPass( + candidate_index=candidate_index, + perf_test_files=perf_test_files, + test_env=pytest_test_env, + pytest_cmd_list=pytest_cmd_list, + behavior_test_results=behavior_test_results, + fto_code=fto_code, + helper_codes=helper_codes, + fto_file_path=Path(fto.file_path), + ) + ) + + async def _benchmark_phase( + self, slot: WorktreeSlot, bp: _BehavioralPass, original_code_baseline: OriginalCodeBaseline + ) -> Result[OptimizedCandidateResult, EvalFailure]: + """Run performance benchmarks sequentially for a candidate that passed behavioral tests.""" + opt = self._optimizer + + # Re-stage the candidate code in the acquired slot + await slot.write_candidate(bp.fto_file_path, bp.fto_code) + for module_path, code in bp.helper_codes.items(): + await slot.write_candidate(module_path, code) + + for file in opt.test_files.test_files: + if file.benchmarking_file_path and file.benchmarking_file_path.exists(): + await slot.write_candidate( + file.benchmarking_file_path, file.benchmarking_file_path.read_text(encoding="utf-8") + ) + + blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"] + blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins] + + perf_result_file = get_run_tmp_file(Path(f"pytest_perf_candidate_{bp.candidate_index}_{slot.index}.xml")) + perf_result_args = [f"--junitxml={perf_result_file.as_posix()}", "-o", "junit_logging=all"] + + perf_pytest_args = [ + "--capture=tee-sys", + "-q", + "--codeflash_loops_scope=session", + "--codeflash_min_loops=5", + "--codeflash_max_loops=250", + f"--codeflash_seconds={TOTAL_LOOPING_TIME_EFFECTIVE}", + f"--timeout={INDIVIDUAL_TESTCASE_TIMEOUT}", + ] + + perf_cmd = bp.pytest_cmd_list + perf_pytest_args + blocklist_args + perf_result_args + bp.perf_test_files + + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + try: + await async_execute_test_subprocess(cmd_list=perf_cmd, cwd=slot.path, env=bp.test_env, timeout=600) + except subprocess.TimeoutExpired: + logger.warning(f"Performance test timeout for candidate {bp.candidate_index}") + return Failure(EvalFailure(message="Performance test timeout")) + + from codeflash.verification.parse_test_output import parse_test_xml + + perf_test_results = parse_test_xml(perf_result_file, test_files=opt.test_files, test_config=opt.test_cfg) + + if not perf_test_results.test_results: + return Failure(EvalFailure(message="No performance test results")) + + loop_count = perf_test_results.effective_loop_count() + total_timing = perf_test_results.total_passed_runtime() + + if total_timing == 0: + return Failure(EvalFailure(message="Zero runtime for optimized candidate")) + + from codeflash.models.models import OptimizedCandidateResult + + return Success( + OptimizedCandidateResult( + max_loop_count=loop_count, + best_test_runtime=total_timing, + behavior_test_results=bp.behavior_test_results, + benchmarking_test_results=perf_test_results, + replay_benchmarking_test_results=None, + optimization_candidate_index=bp.candidate_index, + total_candidate_timing=total_timing, + async_throughput=None, + concurrency_metrics=None, + ) + ) + + @staticmethod + def _replace_and_capture( + opt: FunctionOptimizer, + code_context: CodeOptimizationContext, + candidate: OptimizedCandidate, + original_helper_code: dict[Path, str], + ) -> tuple[str, dict[Path, str]] | None: + """Apply code replacement to main tree, capture the result, restore original.""" + fto = opt.function_to_optimize + try: + did_update = opt.replace_function_and_helpers_with_optimized_code( + code_context=code_context, + optimized_code=candidate.source_code, + original_helper_code=original_helper_code, + ) + if not did_update: + return None + + fto_code = Path(fto.file_path).read_text("utf-8") + helper_codes = {Path(p): Path(p).read_text("utf-8") for p in original_helper_code} + return fto_code, helper_codes + except Exception as e: + logger.error(f"Code replacement failed: {e}") + return None + finally: + try: + opt.write_code_and_helpers(opt.function_to_optimize_source_code, original_helper_code, fto.file_path) + except Exception as restore_err: + logger.error(f"Failed to restore main tree after code replacement: {restore_err}") + + +def run_parallel_evaluation( + optimizer: FunctionOptimizer, + candidates: list[tuple[CandidateNode, int, str | None]], + code_context: CodeOptimizationContext, + original_code_baseline: OriginalCodeBaseline, + original_helper_code: dict[Path, str], + file_path_to_helper_classes: dict[Path, set[str]], + pool_size: int = 4, +) -> tuple[list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]], list, list]: + """Entry point: run parallel candidate evaluation from sync code via anyio. + + Returns (eval_results, [], []). The empty lists maintain backward compatibility. + """ + evaluator = ParallelCandidateEvaluator(optimizer, pool_size=pool_size) + + async def _run() -> list[tuple[CandidateNode, Result[OptimizedCandidateResult, EvalFailure] | None]]: + return await evaluator.evaluate_candidates( + candidates=candidates, + code_context=code_context, + original_code_baseline=original_code_baseline, + original_helper_code=original_helper_code, + file_path_to_helper_classes=file_path_to_helper_classes, + ) + + results = anyio.run(_run) + return results, [], [] diff --git a/pyproject.toml b/pyproject.toml index 0a14b35e5..c37692d3d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "tomlkit>=0.14.0", "attrs>=26.1.0", "requests>=2.32.5", + "anyio>=4.4.0", "junitparser>=4.0.2", "pydantic>=2.13.3", "humanize>=4.13.0", diff --git a/tests/test_parallel_evaluator.py b/tests/test_parallel_evaluator.py new file mode 100644 index 000000000..bb02e7735 --- /dev/null +++ b/tests/test_parallel_evaluator.py @@ -0,0 +1,552 @@ +"""Integration tests for the parallel candidate evaluation infrastructure.""" + +from __future__ import annotations + +import subprocess +import sys +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + +import anyio +import pytest + +from codeflash.either import Failure, Success, is_successful +from codeflash.languages.function_optimizer import CandidateNode +from codeflash.optimization.parallel_evaluator import EvalFailure, ParallelCandidateEvaluator + + +class TestWorktreePoolLifecycle: + def test_creates_n_worktrees_and_cleans_up(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 3 + base_dir = tmp_path.resolve() / "worktrees" + + # The pool needs a git root. We use the codeflash repo itself. + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + pool = WorktreePool(pool_size=pool_size, base_dir=base_dir) + async with pool: + assert len(pool._slots) == pool_size + for slot in pool._slots: + assert slot.path.exists() + assert slot.path.is_dir() + + # After cleanup, slots are cleared + assert len(pool._slots) == 0 + + anyio.run(_run) + + def test_partial_pool_initialization(self, tmp_path: Path) -> None: + """Pool operates at reduced capacity if some slots fail to create.""" + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 3 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + + call_count = 0 + + original_create_slot = WorktreePool._create_slot + + async def failing_create_slot(self: Any, index: int) -> Any: + nonlocal call_count + call_count += 1 + if index == 1: + raise RuntimeError("Simulated git worktree failure") + return await original_create_slot(self, index) + + async def _run() -> None: + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(WorktreePool, "_create_slot", failing_create_slot), + ): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + assert len(pool._slots) == 2 + slot = await pool.acquire() + assert slot.index != 1 + await pool.release(slot) + + anyio.run(_run) + + def test_acquire_release_round_trip(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 2 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + slot1 = await pool.acquire() + slot2 = await pool.acquire() + + # Both slots should be distinct + assert slot1.index != slot2.index + assert slot1.path != slot2.path + + # Release one and re-acquire it + await pool.release(slot1) + reacquired = await pool.acquire() + assert reacquired.index == slot1.index + + await pool.release(slot2) + await pool.release(reacquired) + + anyio.run(_run) + + +class TestWorktreeSlotFileIsolation: + def test_write_to_one_slot_does_not_affect_another(self, tmp_path: Path) -> None: + from unittest.mock import patch + + from codeflash.code_utils.worktree_pool import WorktreePool + + pool_size = 2 + base_dir = tmp_path.resolve() / "worktrees" + repo_root = Path(__file__).resolve().parents[1] + test_file = repo_root / "codeflash" / "__init__.py" + + async def _run() -> None: + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + async with WorktreePool(pool_size=pool_size, base_dir=base_dir) as pool: + slot_a = await pool.acquire() + slot_b = await pool.acquire() + + sentinel = "# SLOT_A_SENTINEL_CONTENT\n" + await slot_a.write_candidate(test_file, sentinel) + + # slot_b's mirror of the same file should NOT contain the sentinel + mirrored_b = slot_b.mirror(test_file) + content_b = mirrored_b.read_text(encoding="utf-8") + assert sentinel not in content_b + + # slot_a's mirror should contain it + mirrored_a = slot_a.mirror(test_file) + content_a = mirrored_a.read_text(encoding="utf-8") + assert content_a == sentinel + + # Main tree should be unaffected + main_content = test_file.read_text(encoding="utf-8") + assert sentinel not in main_content + + await pool.release(slot_a) + await pool.release(slot_b) + + anyio.run(_run) + + +class TestAsyncExecuteTestSubprocess: + def test_runs_simple_command(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "print('hello world')"], cwd=cwd, env=None, timeout=30 + ) + + result = anyio.run(_run) + assert result.returncode == 0 + assert "hello world" in result.stdout + + def test_captures_stderr(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "import sys; sys.stderr.write('err_msg\\n')"], + cwd=cwd, + env=None, + timeout=30, + ) + + result = anyio.run(_run) + assert "err_msg" in result.stderr + + def test_timeout_raises(self) -> None: + from codeflash.languages.python.test_runner import async_execute_test_subprocess + + cwd = Path(__file__).resolve().parent + + async def _run() -> subprocess.CompletedProcess[str]: + return await async_execute_test_subprocess( + cmd_list=[sys.executable, "-c", "import time; time.sleep(60)"], cwd=cwd, env=None, timeout=1 + ) + + with pytest.raises(subprocess.TimeoutExpired): + anyio.run(_run) + + +class TestParallelCandidateEvaluator: + """Unit tests for the evaluator with mocked worktree operations.""" + + def _make_candidate_node(self, opt_id: str = "cand_1") -> CandidateNode: + from codeflash.models.models import CodeString, CodeStringsMarkdown, OptimizedCandidate + from codeflash.models.shared_types import OptimizedCandidateSource + + source_code = CodeStringsMarkdown(code_strings=[CodeString(code="def f(): pass", file_path=Path("test.py"))]) + candidate = OptimizedCandidate( + source_code=source_code, + explanation="test optimization", + optimization_id=opt_id, + source=OptimizedCandidateSource.OPTIMIZE, + ) + return CandidateNode(candidate) + + def _make_optimizer_mock(self, tmp_path: Path) -> MagicMock: + opt = MagicMock() + opt.function_to_optimize.file_path = str(tmp_path / "src" / "module.py") + opt.function_to_optimize_source_code = "def f(): pass" + opt.test_files.test_files = [] + opt.args.project_root = str(tmp_path) + opt.test_cfg = MagicMock() + opt.get_test_env.return_value = {"PATH": "/usr/bin"} + opt.language_support.build_pytest_cmd.return_value = [sys.executable, "-m", "pytest"] + opt.replace_function_and_helpers_with_optimized_code.return_value = True + opt.write_code_and_helpers = MagicMock() + return opt + + def test_code_replacement_failure_returns_eval_failure(self, tmp_path: Path) -> None: + opt = self._make_optimizer_mock(tmp_path) + opt.replace_function_and_helpers_with_optimized_code.return_value = False + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> list: # type: ignore[type-arg] + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert len(results) == 1 + _, result = results[0] + assert result is not None + assert not is_successful(result) + failure = result.failure() + assert isinstance(failure, EvalFailure) + assert "Code replacement failed" in failure.message + assert failure.diffs == [] + + def test_behavioral_mismatch_carries_diffs(self, tmp_path: Path) -> None: + from codeflash.models.models import TestDiff, TestDiffScope + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + mock_diffs = [TestDiff(scope=TestDiffScope.DID_PASS, original_pass=True, candidate_pass=False)] + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object( + ParallelCandidateEvaluator, + "_run_behavioral", + return_value=Failure(EvalFailure(message="Behavioral mismatch: 1 diffs", diffs=mock_diffs)), # type: ignore[arg-type] + ), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + _, result = results[0] + assert not is_successful(result) + failure = result.failure() + assert len(failure.diffs) == 1 + assert failure.diffs[0].scope == TestDiffScope.DID_PASS + + def test_successful_candidate_returns_result(self, tmp_path: Path) -> None: + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 5000 + + mock_behavior_results = MagicMock() + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), + ) + ) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", return_value=Success(mock_result)), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + _, result = results[0] + assert is_successful(result) + assert result.unwrap().best_test_runtime == 5000 + + def test_multiple_candidates_evaluated_concurrently(self, tmp_path: Path) -> None: + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + nodes = [self._make_candidate_node(f"cand_{i}") for i in range(3)] + evaluator = ParallelCandidateEvaluator(opt, pool_size=3) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 1000 + + behavioral_call_count = 0 + mock_behavior_results = MagicMock() + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + nonlocal behavioral_call_count + behavioral_call_count += 1 + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), + ) + ) + + benchmark_call_count = 0 + + async def mock_benchmark(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + nonlocal benchmark_call_count + benchmark_call_count += 1 + return Success(mock_result) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", mock_benchmark), + ): + return await evaluator.evaluate_candidates( + candidates=[(n, i, None) for i, n in enumerate(nodes)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert len(results) == 3 + assert behavioral_call_count == 3 + assert benchmark_call_count == 3 + for _, result in results: + assert is_successful(result) + + def test_benchmark_phase_restages_candidate_code(self, tmp_path: Path) -> None: + """Phase 2 must write fto_code and helper_codes to the slot before running benchmarks.""" + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + node = self._make_candidate_node() + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + + repo_root = Path(__file__).resolve().parents[1] + fto_code = "def f(): return 42 # optimized" + helper_path = tmp_path / "src" / "helpers.py" + helper_codes = {helper_path: "HELPER_CODE = True"} + + write_calls: list[tuple[Path, str]] = [] + + async def tracking_write_candidate(self_slot: object, file_path: Path, code: str) -> None: + write_calls.append((file_path, code)) + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={"PATH": "/usr/bin"}, + pytest_cmd_list=[sys.executable, "-m", "pytest"], + behavior_test_results=MagicMock(), + fto_code=fto_code, + helper_codes=helper_codes, + fto_file_path=Path(opt.function_to_optimize.file_path), + ) + ) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch( + "codeflash.code_utils.worktree_pool.WorktreeSlot.write_candidate", tracking_write_candidate + ), + patch( + "codeflash.languages.python.test_runner.async_execute_test_subprocess", + return_value=MagicMock(returncode=0, stdout="", stderr=""), + ), + patch( + "codeflash.verification.parse_test_output.parse_test_xml", + return_value=MagicMock(test_results=[MagicMock()], effective_loop_count=lambda: 10, total_passed_runtime=lambda: 5000), + ), + ): + return await evaluator.evaluate_candidates( + candidates=[(node, 0, None)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + anyio.run(_run) + + written_codes = {p: c for p, c in write_calls} + assert Path(opt.function_to_optimize.file_path) in written_codes + assert written_codes[Path(opt.function_to_optimize.file_path)] == fto_code + assert helper_path in written_codes + assert written_codes[helper_path] == "HELPER_CODE = True" + + def test_empty_candidates_returns_empty(self, tmp_path: Path) -> None: + opt = self._make_optimizer_mock(tmp_path) + evaluator = ParallelCandidateEvaluator(opt, pool_size=1) + repo_root = Path(__file__).resolve().parents[1] + + async def _run() -> list: # type: ignore[type-arg] + with patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root): + return await evaluator.evaluate_candidates( + candidates=[], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + results = anyio.run(_run) + assert results == [] + + def test_replace_and_capture_restores_on_failure(self, tmp_path: Path) -> None: + """_replace_and_capture must restore original code even when replacement raises.""" + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + original_code = "def f(): pass" + (tmp_path / "src" / "module.py").write_text(original_code, encoding="utf-8") + + opt.replace_function_and_helpers_with_optimized_code.side_effect = ValueError("bad code") + + result = ParallelCandidateEvaluator._replace_and_capture( + opt, MagicMock(), MagicMock(), {} + ) + assert result is None + opt.write_code_and_helpers.assert_called_once_with( + opt.function_to_optimize_source_code, {}, opt.function_to_optimize.file_path + ) + + def test_more_candidates_than_slots_no_deadlock(self, tmp_path: Path) -> None: + """Regression test: more passing candidates than pool slots must not deadlock.""" + from codeflash.optimization.parallel_evaluator import _BehavioralPass + + opt = self._make_optimizer_mock(tmp_path) + (tmp_path / "src").mkdir(parents=True) + (tmp_path / "src" / "module.py").write_text("def f(): pass", encoding="utf-8") + + nodes = [self._make_candidate_node(f"cand_{i}") for i in range(6)] + evaluator = ParallelCandidateEvaluator(opt, pool_size=2) + + repo_root = Path(__file__).resolve().parents[1] + mock_result = MagicMock() + mock_result.best_test_runtime = 2000 + mock_behavior_results = MagicMock() + + async def mock_behavioral(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success( + _BehavioralPass( + candidate_index=0, + perf_test_files=[], + test_env={}, + pytest_cmd_list=[], + behavior_test_results=mock_behavior_results, + fto_code="def f(): pass", + helper_codes={}, + fto_file_path=Path("/tmp/module.py"), + ) + ) + + async def mock_benchmark(self_eval: object, *args: object, **kwargs: object) -> Success: # type: ignore[type-arg] + return Success(mock_result) + + async def _run() -> list: # type: ignore[type-arg] + with ( + patch("codeflash.code_utils.worktree_pool.git_root_dir", return_value=repo_root), + patch.object(ParallelCandidateEvaluator, "_run_behavioral", mock_behavioral), + patch.object(ParallelCandidateEvaluator, "_benchmark_phase", mock_benchmark), + ): + return await evaluator.evaluate_candidates( + candidates=[(n, i, None) for i, n in enumerate(nodes)], + code_context=MagicMock(), + original_code_baseline=MagicMock(), + original_helper_code={}, + file_path_to_helper_classes={}, + ) + + # If this deadlocks, the test will timeout + results = anyio.run(_run) + assert len(results) == 6 + for _, result in results: + assert is_successful(result) diff --git a/uv.lock b/uv.lock index 6a2b2a0f0..89cea85ae 100644 --- a/uv.lock +++ b/uv.lock @@ -55,6 +55,56 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/75/f9/f1c10e223c7b56a38109a3f2eb4e7fe9a757ea3ed3a166754fb30f65e466/ansicon-1.89.0-py2.py3-none-any.whl", hash = "sha256:f1def52d17f65c2c9682cf8370c03f541f410c1752d6a14029f97318e4b9dfec", size = 63675, upload-time = "2019-04-29T20:23:53.83Z" }, ] +[[package]] +name = "anyio" +version = "4.12.1" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version >= '3.9.2' and python_full_version < '3.10'", + "python_full_version < '3.9.2'", +] +dependencies = [ + { name = "exceptiongroup", marker = "python_full_version < '3.10'" }, + { name = "idna", marker = "python_full_version < '3.10'" }, + { name = "typing-extensions", marker = "python_full_version < '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/96/f0/5eb65b2bb0d09ac6776f2eb54adee6abe8228ea05b20a5ad0e4945de8aac/anyio-4.12.1.tar.gz", hash = "sha256:41cfcc3a4c85d3f05c932da7c26d0201ac36f72abd4435ba90d0464a3ffed703", size = 228685, upload-time = "2026-01-06T11:45:21.246Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/38/0e/27be9fdef66e72d64c0cdc3cc2823101b80585f8119b5c112c2e8f5f7dab/anyio-4.12.1-py3-none-any.whl", hash = "sha256:d405828884fc140aa80a3c667b8beed277f1dfedec42ba031bd6ac3db606ab6c", size = 113592, upload-time = "2026-01-06T11:45:19.497Z" }, +] + +[[package]] +name = "anyio" +version = "4.13.0" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version >= '3.15' and sys_platform == 'win32'", + "python_full_version >= '3.15' and sys_platform == 'emscripten'", + "python_full_version >= '3.15' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.14.*' and sys_platform == 'win32'", + "python_full_version == '3.14.*' and sys_platform == 'emscripten'", + "python_full_version == '3.14.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'emscripten'", + "python_full_version == '3.13.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.12.*' and sys_platform == 'win32'", + "python_full_version == '3.12.*' and sys_platform == 'emscripten'", + "python_full_version == '3.12.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.11.*' and sys_platform == 'win32'", + "python_full_version == '3.11.*' and sys_platform == 'emscripten'", + "python_full_version == '3.11.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.10.*'", +] +dependencies = [ + { name = "exceptiongroup", marker = "python_full_version == '3.10.*'" }, + { name = "idna", marker = "python_full_version >= '3.10'" }, + { name = "typing-extensions", marker = "python_full_version >= '3.10' and python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/19/14/2c5dd9f512b66549ae92767a9c7b330ae88e1932ca57876909410251fe13/anyio-4.13.0.tar.gz", hash = "sha256:334b70e641fd2221c1505b3890c69882fe4a2df910cba14d97019b90b24439dc", size = 231622, upload-time = "2026-03-24T12:59:09.671Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/da/42/e921fccf5015463e32a3cf6ee7f980a6ed0f395ceeaa45060b61d86486c2/anyio-4.13.0-py3-none-any.whl", hash = "sha256:08b310f9e24a9594186fd75b4f73f4a4152069e3853f1ed8bfbf58369f4ad708", size = 114353, upload-time = "2026-03-24T12:59:08.246Z" }, +] + [[package]] name = "asttokens" version = "3.0.1" @@ -456,6 +506,8 @@ wheels = [ name = "codeflash" source = { editable = "." } dependencies = [ + { name = "anyio", version = "4.12.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, + { name = "anyio", version = "4.13.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, { name = "attrs" }, { name = "click", version = "8.1.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, { name = "click", version = "8.3.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, @@ -584,6 +636,7 @@ tests = [ [package.metadata] requires-dist = [ + { name = "anyio", specifier = ">=4.4.0" }, { name = "attrs", specifier = ">=26.1.0" }, { name = "click", specifier = ">=8.1.8" }, { name = "codeflash-benchmark", editable = "codeflash-benchmark" }, @@ -947,13 +1000,13 @@ version = "0.0.103" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "importlib-metadata", version = "8.7.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, - { name = "importlib-metadata", version = "9.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, - { name = "packaging" }, - { name = "pygls" }, - { name = "typeshed-client" }, - { name = "typing-extensions" }, - { name = "typing-inspect" }, - { name = "z3-solver" }, + { name = "importlib-metadata", version = "9.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10' and python_full_version < '3.15'" }, + { name = "packaging", marker = "python_full_version < '3.15'" }, + { name = "pygls", marker = "python_full_version < '3.15'" }, + { name = "typeshed-client", marker = "python_full_version < '3.15'" }, + { name = "typing-extensions", marker = "python_full_version < '3.15'" }, + { name = "typing-inspect", marker = "python_full_version < '3.15'" }, + { name = "z3-solver", marker = "python_full_version < '3.15'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/78/28/56b5f1a4aa37d927c479012ae477acd67a5d14b4c6e4c65c1dcb33da99a0/crosshair_tool-0.0.103.tar.gz", hash = "sha256:02a2247ee79ba6d3b46e248199897539d8a26d4c5dc96821a12f34ebca715e81", size = 484767, upload-time = "2026-04-19T19:41:17.951Z" } wheels = [ @@ -1163,7 +1216,7 @@ name = "exceptiongroup" version = "1.3.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "typing-extensions", marker = "python_full_version < '3.13'" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/50/79/66800aadf48771f6b62f7eb014e352e5d06856655206165d775e675a02c9/exceptiongroup-1.3.1.tar.gz", hash = "sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219", size = 30371, upload-time = "2025-11-21T23:01:54.787Z" } wheels = [ @@ -1506,7 +1559,7 @@ resolution-markers = [ "python_full_version == '3.10.*'", ] dependencies = [ - { name = "zipp", marker = "python_full_version >= '3.10'" }, + { name = "zipp", marker = "python_full_version >= '3.10' and python_full_version < '3.15'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/a9/01/15bb152d77b21318514a96f43af312635eb2500c96b55398d020c93d86ea/importlib_metadata-9.0.0.tar.gz", hash = "sha256:a4f57ab599e6a2e3016d7595cfd72eb4661a5106e787a95bcc90c7105b831efc", size = 56405, upload-time = "2026-03-20T06:42:56.999Z" } wheels = [ @@ -2428,7 +2481,7 @@ resolution-markers = [ "python_full_version == '3.10.*'", ] dependencies = [ - { name = "uc-micro-py", version = "2.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, + { name = "uc-micro-py", version = "2.0.0", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.10' and sys_platform != 'win32') or (python_full_version == '3.10.*' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/2e/c9/06ea13676ef354f0af6169587ae292d3e2406e212876a413bf9eece4eb23/linkify_it_py-2.1.0.tar.gz", hash = "sha256:43360231720999c10e9328dc3691160e27a718e280673d444c38d7d3aaa3b98b", size = 29158, upload-time = "2026-03-01T07:48:47.683Z" } wheels = [ @@ -2727,7 +2780,7 @@ wheels = [ [package.optional-dependencies] linkify = [ - { name = "linkify-it-py", version = "2.1.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, + { name = "linkify-it-py", version = "2.1.0", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.10' and sys_platform != 'win32') or (python_full_version == '3.10.*' and sys_platform == 'win32')" }, ] [[package]] @@ -2872,7 +2925,7 @@ resolution-markers = [ "python_full_version == '3.10.*'", ] dependencies = [ - { name = "markdown-it-py", version = "4.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, + { name = "markdown-it-py", version = "4.0.0", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.10' and sys_platform != 'win32') or (python_full_version == '3.10.*' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" } wheels = [ @@ -2893,9 +2946,9 @@ name = "memray" version = "1.19.3" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "jinja2" }, - { name = "rich" }, - { name = "textual" }, + { name = "jinja2", marker = "python_full_version < '3.11' or sys_platform != 'win32'" }, + { name = "rich", marker = "python_full_version < '3.11' or sys_platform != 'win32'" }, + { name = "textual", marker = "python_full_version < '3.11' or sys_platform != 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/96/04/5b886a36df947599e0f37cd46e6e44e565299815f044e2303ab2ae9f8870/memray-1.19.3.tar.gz", hash = "sha256:4e0fb29ff0a50c0ec9dc84294d8f2c83419feba561a37628b304c2ae4fe73d03", size = 2417089, upload-time = "2026-04-08T18:49:32.409Z" } wheels = [ @@ -4145,7 +4198,7 @@ name = "pexpect" version = "4.9.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "ptyprocess" }, + { name = "ptyprocess", marker = "(python_full_version < '3.11' and sys_platform == 'emscripten') or (python_full_version < '3.11' and sys_platform == 'win32') or (sys_platform != 'emscripten' and sys_platform != 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/42/92/cc564bf6381ff43ce1f4d06852fc19a2f11d180f23dc32d9588bee2f149d/pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f", size = 166450, upload-time = "2023-11-25T09:07:26.339Z" } wheels = [ @@ -4785,9 +4838,9 @@ name = "pytest-memray" version = "1.8.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "memray" }, + { name = "memray", marker = "python_full_version < '3.11' or sys_platform != 'win32'" }, { name = "pytest", version = "8.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, - { name = "pytest", version = "9.0.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, + { name = "pytest", version = "9.0.3", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.10' and sys_platform != 'win32') or (python_full_version == '3.10.*' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/3d/28/f67963efed56d847d028d0bb939f26cdeb32c4de474b3befc9da43bf18f9/pytest_memray-1.8.0.tar.gz", hash = "sha256:c0c706ef81941a7aa7064f2b3b8b5cdc0cea72b5277c6a6a09b113ca9ab30bdb", size = 240608, upload-time = "2025-08-18T17:32:47.329Z" } wheels = [ @@ -5785,14 +5838,14 @@ version = "8.2.4" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "markdown-it-py", version = "3.0.0", source = { registry = "https://pypi.org/simple" }, extra = ["linkify"], marker = "python_full_version < '3.10'" }, - { name = "markdown-it-py", version = "4.0.0", source = { registry = "https://pypi.org/simple" }, extra = ["linkify"], marker = "python_full_version >= '3.10'" }, + { name = "markdown-it-py", version = "4.0.0", source = { registry = "https://pypi.org/simple" }, extra = ["linkify"], marker = "(python_full_version >= '3.10' and sys_platform != 'win32') or (python_full_version == '3.10.*' and sys_platform == 'win32')" }, { name = "mdit-py-plugins", version = "0.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, - { name = "mdit-py-plugins", version = "0.5.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, + { name = "mdit-py-plugins", version = "0.5.0", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.10' and sys_platform != 'win32') or (python_full_version == '3.10.*' and sys_platform == 'win32')" }, { name = "platformdirs", version = "4.4.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, - { name = "platformdirs", version = "4.9.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, - { name = "pygments" }, - { name = "rich" }, - { name = "typing-extensions" }, + { name = "platformdirs", version = "4.9.6", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.10' and sys_platform != 'win32') or (python_full_version == '3.10.*' and sys_platform == 'win32')" }, + { name = "pygments", marker = "python_full_version < '3.11' or sys_platform != 'win32'" }, + { name = "rich", marker = "python_full_version < '3.11' or sys_platform != 'win32'" }, + { name = "typing-extensions", marker = "python_full_version < '3.11' or sys_platform != 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/19/89/bec5709fb759f9c784bbcb30b2e3497df3f901691d13c2b864dbf6694a17/textual-8.2.4.tar.gz", hash = "sha256:d4e2b2ddd7157191d00b228592b7c739ea080b7d792fd410f23ca75f05ea76c4", size = 1848933, upload-time = "2026-04-19T04:20:45.845Z" } wheels = [ @@ -7057,8 +7110,8 @@ version = "2.10.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "importlib-resources", version = "6.5.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, - { name = "importlib-resources", version = "7.1.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, - { name = "typing-extensions" }, + { name = "importlib-resources", version = "7.1.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10' and python_full_version < '3.15'" }, + { name = "typing-extensions", marker = "python_full_version < '3.15'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/46/de/cc79c33f6268740567ea109c11809e3c799daf5c1c5aeffb9c3f3b052dbe/typeshed_client-2.10.0.tar.gz", hash = "sha256:906bf343595aed4a120ccc0a35dde2d85cae8c15d015703a768541291e38cfc3", size = 522565, upload-time = "2026-04-18T04:27:36.234Z" } wheels = [ @@ -7079,8 +7132,8 @@ name = "typing-inspect" version = "0.9.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "mypy-extensions" }, - { name = "typing-extensions" }, + { name = "mypy-extensions", marker = "python_full_version < '3.15'" }, + { name = "typing-extensions", marker = "python_full_version < '3.15'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/dc/74/1789779d91f1961fa9438e9a8710cdae6bd138c80d7303996933d117264a/typing_inspect-0.9.0.tar.gz", hash = "sha256:b23fc42ff6f6ef6954e4852c1fb512cdd18dbea03134f91f856a95ccc9461f78", size = 13825, upload-time = "2023-05-24T20:25:47.612Z" } wheels = [