Skip to content

Commit aa7d582

Browse files
committed
feat: add WorktreePool for isolated git worktree management
Pool of N git worktree slots with async acquire/release semantics. Each slot provides write_candidate() and mirror() for file isolation.
1 parent 1df9a1a commit aa7d582

2 files changed

Lines changed: 128 additions & 0 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,3 +429,4 @@ code_to_optimize/**/package-lock.json
429429

430430
# Other tools
431431
.codeflash/
432+
.codeflash_eval_worktrees/
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
from __future__ import annotations
2+
3+
import contextlib
4+
import functools
5+
import os
6+
import shutil
7+
import stat
8+
from pathlib import Path
9+
from typing import TYPE_CHECKING, Any
10+
11+
import anyio
12+
13+
if TYPE_CHECKING:
14+
from collections.abc import Callable
15+
from typing import Self
16+
17+
from codeflash.cli_cmds.console import logger
18+
from codeflash.code_utils.git_utils import git_root_dir, mirror_path
19+
20+
21+
class WorktreeSlot:
22+
__slots__ = ("_git_root", "index", "path")
23+
24+
def __init__(self, path: Path, index: int, git_root: Path) -> None:
25+
self.path = path
26+
self.index = index
27+
self._git_root = git_root
28+
29+
def mirror(self, original_path: Path) -> Path:
30+
return mirror_path(original_path, self._git_root, self.path)
31+
32+
async def write_candidate(self, file_path: Path, code: str) -> None:
33+
mirrored = anyio.Path(self.mirror(file_path))
34+
await mirrored.parent.mkdir(parents=True, exist_ok=True)
35+
await mirrored.write_text(code, encoding="utf-8")
36+
37+
async def restore_file(self, file_path: Path, original_code: str) -> None:
38+
mirrored = anyio.Path(self.mirror(file_path))
39+
await mirrored.write_text(original_code, encoding="utf-8")
40+
41+
42+
class WorktreePool:
43+
def __init__(self, pool_size: int = 4, base_dir: Path | None = None) -> None:
44+
self._pool_size = pool_size
45+
self._git_root = git_root_dir()
46+
self._base_dir = base_dir or (self._git_root / ".codeflash_eval_worktrees")
47+
self._slots: list[WorktreeSlot] = []
48+
self._send: anyio.abc.ObjectSendStream[WorktreeSlot] | None = None
49+
self._receive: anyio.abc.ObjectReceiveStream[WorktreeSlot] | None = None
50+
self._initialized = False
51+
52+
async def initialize(self) -> None:
53+
if self._initialized:
54+
return
55+
await anyio.Path(self._base_dir).mkdir(parents=True, exist_ok=True)
56+
57+
async with anyio.create_task_group() as tg:
58+
results: list[WorktreeSlot | None] = [None] * self._pool_size
59+
for i in range(self._pool_size):
60+
tg.start_soon(self._create_slot_task, i, results)
61+
62+
self._slots = [s for s in results if s is not None]
63+
self._send, self._receive = anyio.create_memory_object_stream[WorktreeSlot](self._pool_size)
64+
for slot in self._slots:
65+
await self._send.send(slot)
66+
self._initialized = True
67+
logger.debug(f"WorktreePool initialized with {len(self._slots)} slots at {self._base_dir}")
68+
69+
async def _create_slot_task(self, index: int, results: list[WorktreeSlot | None]) -> None:
70+
results[index] = await self._create_slot(index)
71+
72+
async def _create_slot(self, index: int) -> WorktreeSlot:
73+
slot_dir = self._base_dir / f"slot-{index}"
74+
if slot_dir.exists():
75+
await anyio.to_thread.run_sync(functools.partial(shutil.rmtree, slot_dir, onerror=_handle_remove_readonly))
76+
77+
result = await anyio.run_process(
78+
["git", "-C", str(self._git_root), "worktree", "add", "--detach", str(slot_dir), "HEAD"], check=False
79+
)
80+
if result.returncode != 0:
81+
msg = f"Failed to create worktree slot {index}: {result.stderr.decode()}"
82+
raise RuntimeError(msg)
83+
84+
pid_file = anyio.Path(slot_dir / ".codeflash_pool.pid")
85+
await pid_file.write_text(str(os.getpid()), encoding="utf-8")
86+
87+
return WorktreeSlot(slot_dir, index, self._git_root)
88+
89+
async def acquire(self) -> WorktreeSlot:
90+
assert self._receive is not None
91+
return await self._receive.receive()
92+
93+
async def release(self, slot: WorktreeSlot) -> None:
94+
assert self._send is not None
95+
await self._send.send(slot)
96+
97+
async def cleanup(self) -> None:
98+
async with anyio.create_task_group() as tg:
99+
for slot in self._slots:
100+
tg.start_soon(self._remove_slot_async, slot)
101+
self._slots.clear()
102+
self._initialized = False
103+
104+
if self._base_dir.exists():
105+
with contextlib.suppress(Exception):
106+
await anyio.run_process(["git", "-C", str(self._git_root), "worktree", "prune"], check=False)
107+
with contextlib.suppress(OSError):
108+
self._base_dir.rmdir()
109+
110+
async def _remove_slot_async(self, slot: WorktreeSlot) -> None:
111+
if slot.path.exists():
112+
await anyio.to_thread.run_sync(functools.partial(shutil.rmtree, slot.path, onerror=_handle_remove_readonly))
113+
114+
async def __aenter__(self) -> Self:
115+
await self.initialize()
116+
return self
117+
118+
async def __aexit__(self, *exc: object) -> None:
119+
await self.cleanup()
120+
121+
122+
def _handle_remove_readonly(func: Callable[..., Any], path: str, exc_info: tuple[Any, ...]) -> None:
123+
if isinstance(exc_info[1], PermissionError):
124+
Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR | stat.S_IXUSR)
125+
func(path)
126+
else:
127+
raise exc_info[1]

0 commit comments

Comments
 (0)