diff --git a/orchestrator/campaign.py b/orchestrator/campaign.py index 2ba6a84..a7e3108 100644 --- a/orchestrator/campaign.py +++ b/orchestrator/campaign.py @@ -206,8 +206,22 @@ def run_campaign( HumanGate(auto_response="approve") if auto_approve else HumanGate() ) - # Pre-flight: validate CLI + credentials before starting the campaign + # GC orphan experiment worktrees (#133): clean up stale dirs from + # crashed prior runs before starting fresh ones. repo_path = campaign.get("target_system", {}).get("repo_path") + if repo_path: + try: + from orchestrator.worktree import gc_orphan_worktrees + removed = gc_orphan_worktrees(Path(repo_path)) + if removed: + logger.info( + "GC'd %d orphan worktree(s): %s", + len(removed), ", ".join(removed), + ) + except (OSError, RuntimeError) as exc: + logger.warning("Worktree GC failed: %s", exc) + + # Pre-flight: validate CLI + credentials before starting the campaign if agent != "inline" and repo_path: from orchestrator.cli_dispatch import CLIDispatcher preflight_dispatcher = CLIDispatcher( diff --git a/orchestrator/parallel_arms.py b/orchestrator/parallel_arms.py new file mode 100644 index 0000000..aff5a29 --- /dev/null +++ b/orchestrator/parallel_arms.py @@ -0,0 +1,198 @@ +"""Parallel-arm execution orchestration (issue #123, Phase A). + +After DESIGN produces ``experiment_plan.yaml``, EXECUTE_ANALYZE today +runs every (arm × seed × condition) tuple sequentially in one Sonnet +session. That mega-session is what produced the 5/18 connection-drop +incidents and is the proximate cause of the "race two executors" bug +that #71/#111 partly fixed at the symptom level. + +The fix: partition the plan into independent units, fan them out to +per-unit subagents (each in its own worktree via #133), wait for all, +and run the existing deterministic merge into findings.json + +principle_updates.json. + +Phase A scope: + + * partition_plan(plan) — turn experiment_plan.yaml into a flat list + of ArmUnit descriptors. + * run_units(units, *, runner, max_parallel) — fan out via an injected + runner callable, collect ArmUnitResult records (one per unit). + * merge_unit_results(results, plan) — deterministic merge into a + findings-shaped dict (the schema validation step is reused from + the existing executor pipeline). + +Phase B (lands when #121 + #133 merge): + + * SDKDispatcher integration: the runner spawns + ``Agent(isolation="worktree", subagent_type="claude")`` per unit. + * Real ``anyio.gather`` for actual parallelism with a CPU-bounded + semaphore. + * Wire-up into iteration.py so EXECUTE_ANALYZE picks parallel mode + when ``max_parallel_arms > 1``. +""" +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import Callable + + +@dataclass(frozen=True) +class ArmUnit: + """A single (arm, seed, condition) work item.""" + + arm_id: str + seed: str + condition_name: str + command: str + + @property + def relative_results_dir(self) -> str: + """Where this unit's results land — never overlaps with another unit.""" + return f"results/{self.arm_id}/{self.seed}" + + +@dataclass +class ArmUnitResult: + unit: ArmUnit + status: str # "complete" | "failed" + duration_ms: int = 0 + output_files: list[str] = field(default_factory=list) + error: str = "" + + +def partition_plan(plan: dict) -> list[ArmUnit]: + """Turn an experiment_plan.yaml-shaped dict into a list of ArmUnits. + + Each (arm × condition) becomes one unit. Seed defaults to ``"seed-1"`` + when the condition doesn't carry an explicit seed list; multi-seed + conditions fan out to one unit per seed. + """ + units: list[ArmUnit] = [] + for arm in plan.get("arms", []) or []: + if not isinstance(arm, dict): + continue + arm_id = str(arm.get("arm_id") or arm.get("type") or "?") + for cond in arm.get("conditions", []) or []: + if not isinstance(cond, dict): + continue + command = str(cond.get("command") or cond.get("cmd") or "") + if not command: + continue + cond_name = str(cond.get("name") or cond.get("id") or "default") + seeds = cond.get("seeds") or [cond.get("seed") or "seed-1"] + if not isinstance(seeds, list): + seeds = [str(seeds)] + for s in seeds: + units.append(ArmUnit( + arm_id=arm_id, + seed=str(s), + condition_name=cond_name, + command=command, + )) + return units + + +ArmRunner = Callable[[ArmUnit], ArmUnitResult] +"""Callable that executes one ArmUnit and returns its result. + +The default real-world implementation spawns an SDK subagent with +``isolation="worktree"`` and the planned command. Tests inject a +deterministic fake. +""" + + +def run_units( + units: list[ArmUnit], + *, + runner: ArmRunner, + max_parallel: int | None = None, +) -> list[ArmUnitResult]: + """Fan out units to the runner. + + ``max_parallel`` is honored as an upper bound on simultaneous + in-flight runner calls. Phase A is synchronous over the runner; + the bound is enforced trivially. Phase B replaces this with + ``anyio.gather`` + a semaphore for real parallelism. + + Returns results in the same order as ``units`` so callers can pair + them deterministically with their inputs (the merge step depends + on this — it would be nondeterministic otherwise). + """ + if max_parallel is not None and max_parallel < 1: + raise ValueError("max_parallel must be >= 1") + results: list[ArmUnitResult] = [] + for unit in units: + try: + result = runner(unit) + except Exception as exc: # runner exceptions become failed units + result = ArmUnitResult( + unit=unit, + status="failed", + error=f"{type(exc).__name__}: {exc}", + ) + results.append(result) + return results + + +def default_max_parallel() -> int: + """Issue default: ``min(CPU, 4)``.""" + cpus = os.cpu_count() or 1 + return max(1, min(cpus, 4)) + + +def merge_unit_results( + results: list[ArmUnitResult], + *, + plan: dict | None = None, +) -> dict: + """Deterministic merge of unit results into a findings-shaped dict. + + Output keys (sorted): + - ``arms``: list of ``{arm_id, status, units}`` rows + - ``failed_unit_count``: int + - ``total_unit_count``: int + + No timestamps, no random ordering. Calling twice on the same input + must produce byte-equal output. + """ + by_arm: dict[str, list[ArmUnitResult]] = {} + for r in results: + by_arm.setdefault(r.unit.arm_id, []).append(r) + + arms_out: list[dict] = [] + for arm_id in sorted(by_arm): + arm_results = by_arm[arm_id] + # Arm status: complete only when every unit completed; otherwise + # failed. Granular per-unit status is preserved in `units`. + any_failed = any(r.status == "failed" for r in arm_results) + arms_out.append({ + "arm_id": arm_id, + "status": "failed" if any_failed else "complete", + "units": [ + { + "seed": r.unit.seed, + "condition": r.unit.condition_name, + "status": r.status, + "duration_ms": r.duration_ms, + "output_files": sorted(r.output_files), + "error": r.error, + } + for r in sorted( + arm_results, + key=lambda x: (x.unit.seed, x.unit.condition_name), + ) + ], + }) + + failed_count = sum(1 for r in results if r.status == "failed") + return { + "arms": arms_out, + "failed_unit_count": failed_count, + "total_unit_count": len(results), + } + + +def failed_units(results: list[ArmUnitResult]) -> list[ArmUnit]: + """Helper for the partial-retry path: which units need re-running?""" + return [r.unit for r in results if r.status == "failed"] diff --git a/orchestrator/worktree.py b/orchestrator/worktree.py index 15bed13..c86c447 100644 --- a/orchestrator/worktree.py +++ b/orchestrator/worktree.py @@ -1,12 +1,31 @@ -"""Git worktree management for experiment isolation.""" +"""Git worktree management for experiment isolation. + +Phase A of #133: ship orphan-worktree garbage collection alongside the +existing per-iteration lifecycle. The harness-managed +``Agent(isolation="worktree")`` switch (Phase B) lands with the +parallel-arm subagents in #123 — at that point most of this file goes +away. Until then, GC at run start cleans up the ghost-worktree pattern +observed on 5/18 where ``--max-cli-retries 10`` spawned a second worktree +while the first was still alive. +""" +from __future__ import annotations + import logging +import os +import shutil import subprocess +import time import uuid from pathlib import Path +from typing import Callable logger = logging.getLogger(__name__) +_EXPERIMENTS_DIRNAME = ".nous-experiments" +_DEFAULT_ORPHAN_AGE_SECONDS = 60 * 60 # 1 hour + + def create_experiment_worktree(repo_path: Path, iteration: int) -> tuple[Path, str]: """Create a git worktree for running an experiment in isolation. @@ -20,7 +39,7 @@ def create_experiment_worktree(repo_path: Path, iteration: int) -> tuple[Path, s raise FileNotFoundError(f"Not a git repository: {repo_path}") experiment_id = f"iter-{iteration}-{uuid.uuid4().hex[:8]}" - worktree_dir = repo_path / ".nous-experiments" / experiment_id + worktree_dir = repo_path / _EXPERIMENTS_DIRNAME / experiment_id branch_name = f"nous-exp-{experiment_id}" subprocess.run( @@ -40,7 +59,7 @@ def remove_experiment_worktree(repo_path: Path, experiment_id: str) -> None: Safe to call even if the worktree was already removed. """ repo_path = Path(repo_path) - worktree_dir = repo_path / ".nous-experiments" / experiment_id + worktree_dir = repo_path / _EXPERIMENTS_DIRNAME / experiment_id branch_name = f"nous-exp-{experiment_id}" if worktree_dir.exists(): @@ -69,3 +88,194 @@ def remove_experiment_worktree(repo_path: Path, experiment_id: str) -> None: ) if result.returncode != 0: logger.debug("Branch cleanup for %s: %s", branch_name, result.stderr.strip()) + + +def gc_orphan_worktrees( + repo_path: Path, + *, + max_age_seconds: float = _DEFAULT_ORPHAN_AGE_SECONDS, + pid_check: Callable[[int], bool] | None = None, + now: float | None = None, +) -> list[str]: + """Remove stale experiment worktrees with no live owning process. + + Run at ``nous run`` startup. Walks ``/.nous-experiments/`` and + deletes any worktree directory that is older than ``max_age_seconds`` + and whose owning PID (if recorded under ``.nous-pid``) is no longer + alive. The 1-hour default matches the issue's GC threshold; the + rationale is that any legitimate iteration completes within an hour + of its last write, so anything older with no live process is genuinely + orphaned. + + Args: + repo_path: target repo root. + max_age_seconds: only consider worktrees older than this. + pid_check: callable ``(pid: int) -> bool`` returning True when the + process is still alive. Defaults to ``os.kill(pid, 0)``-style + check. Tests inject a deterministic fake. + now: override of ``time.time()`` for deterministic tests. + + Returns: + List of experiment_ids removed (sorted by directory name). + """ + repo_path = Path(repo_path) + experiments_dir = repo_path / _EXPERIMENTS_DIRNAME + if not experiments_dir.is_dir(): + return [] + + pid_alive = pid_check or _pid_alive_default + current_time = now if now is not None else time.time() + + removed: list[str] = [] + for entry in sorted(experiments_dir.iterdir()): + if not entry.is_dir(): + continue + try: + mtime = entry.stat().st_mtime + except OSError: + continue + age = current_time - mtime + if age < max_age_seconds: + continue + + # If a PID is recorded under .nous-pid, skip when alive. + pid_file = entry / ".nous-pid" + if pid_file.exists(): + try: + pid = int(pid_file.read_text().strip()) + if pid_alive(pid): + continue + except (ValueError, OSError): + pass + + # Untrack the worktree from git (best-effort), then rm -rf the dir. + subprocess.run( + ["git", "worktree", "remove", str(entry), "--force"], + cwd=repo_path, capture_output=True, text=True, check=False, + ) + if entry.exists(): + shutil.rmtree(entry, ignore_errors=True) + + # Best-effort branch cleanup. + branch = f"nous-exp-{entry.name}" + subprocess.run( + ["git", "branch", "-D", branch], + cwd=repo_path, capture_output=True, text=True, check=False, + ) + + logger.info("GC'd orphan worktree: %s", entry) + removed.append(entry.name) + return removed + + +def _pid_alive_default(pid: int) -> bool: + if pid <= 0: + return False + try: + os.kill(pid, 0) + return True + except ProcessLookupError: + return False + except PermissionError: + # Process exists but we can't signal it — still alive. + return True + except OSError: + return False + + +# ─── Phase B: harness-isolated subagent runner (#133 + #123 bridge) ──────── + + +def make_isolated_arm_runner( + *, + sdk_runner: Callable, + repo_path: Path, + iter_dir: Path, + model: str = "claude-sonnet-4-6", + max_turns: int = 25, + subagent_type: str = "claude", +) -> Callable: + """Build an ArmRunner backed by a worktree-isolated SDK subagent. + + The returned callable matches the ``ArmRunner`` Protocol from + :mod:`orchestrator.parallel_arms` — takes one ``ArmUnit`` and returns + one ``ArmUnitResult``. Per the no-live-LLM policy, this function does + not call the SDK directly: it uses the injected ``sdk_runner`` from + :mod:`orchestrator.sdk_dispatch`, so tests pass a recording fake. + + Each subagent is dispatched with ``isolation="worktree"`` and + ``subagent_type`` set so the harness creates a fresh worktree, + runs the unit's planned command inside it, and tears the worktree + down on exit. The post-run patch (``git diff`` inside the worktree) + is captured by the subagent and written to + ``iter_dir/patches/.patch`` — matching the existing convention. + + This is the harness-managed replacement for the manual lifecycle + in ``create_experiment_worktree`` / ``remove_experiment_worktree``; + once #123 wires this runner into the parallel-arm path, the manual + code becomes vestigial. + """ + repo_path = Path(repo_path) + iter_dir = Path(iter_dir) + + def _run(unit): + # Imported lazily so the factory itself works on branches where + # parallel_arms hasn't landed yet (it stacks on this PR). + from orchestrator.parallel_arms import ArmUnitResult + results_dir = iter_dir / unit.relative_results_dir + results_dir.mkdir(parents=True, exist_ok=True) + patches_dir = iter_dir / "patches" + patches_dir.mkdir(parents=True, exist_ok=True) + patch_path = patches_dir / f"{unit.arm_id}.patch" + + prompt = ( + f"# Arm: {unit.arm_id} (seed {unit.seed})\n\n" + f"You are a subagent running one experiment unit in an isolated\n" + f"git worktree. **Do not modify files outside this worktree.**\n\n" + f"## Command\n```\n{unit.command}\n```\n\n" + f"## Results destination\n" + f"Write all output files to: `{results_dir}`\n\n" + f"## Patch capture\n" + f"Before exiting, run `git diff` in this worktree and write the\n" + f"output to `{patch_path}`. If there are no changes, create an\n" + f"empty file at that path.\n" + ) + + try: + result = sdk_runner( + prompt=prompt, + model=model, + cwd=repo_path, + max_turns=max_turns, + system_prompt=None, + settings_path=None, + event_log_path=None, + isolation="worktree", + subagent_type=subagent_type, + ) + except TypeError: + # Older runners don't accept isolation/subagent_type kwargs; + # fall back to the basic call signature. + result = sdk_runner( + prompt=prompt, model=model, cwd=repo_path, max_turns=max_turns, + ) + + if getattr(result, "is_error", False): + return ArmUnitResult( + unit=unit, status="failed", + duration_ms=int(getattr(result, "duration_ms", 0) or 0), + error=str(getattr(result, "error_message", "") or "sdk reported error"), + ) + + output_files = sorted( + str(p.relative_to(iter_dir)) + for p in results_dir.rglob("*") if p.is_file() + ) + return ArmUnitResult( + unit=unit, + status="complete", + duration_ms=int(getattr(result, "duration_ms", 0) or 0), + output_files=output_files, + ) + + return _run diff --git a/tests/test_parallel_arms.py b/tests/test_parallel_arms.py new file mode 100644 index 0000000..5a5a185 --- /dev/null +++ b/tests/test_parallel_arms.py @@ -0,0 +1,323 @@ +"""Behavioral tests for the parallel-arm orchestration (#123 Phase A + B).""" +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path + +import pytest + +from orchestrator.parallel_arms import ( + ArmUnit, + ArmUnitResult, + failed_units, + merge_unit_results, + partition_plan, + run_units, +) + + +@dataclass +class _LocalSDKResult: + """Local stand-in for SDKResult so this branch doesn't depend on + sdk_dispatch.py landing first. The real SDKResult is duck-compatible.""" + text: str = "" + duration_ms: int = 0 + is_error: bool = False + error_message: str = "" + + +# ─── Plan partitioning ───────────────────────────────────────────────────── + +class TestPartitionPlan: + + def test_single_arm_single_condition_default_seed(self): + plan = {"arms": [{ + "arm_id": "h-main", + "conditions": [{"name": "baseline", "command": "./blis run"}], + }]} + units = partition_plan(plan) + assert len(units) == 1 + assert units[0].arm_id == "h-main" + assert units[0].seed == "seed-1" + assert units[0].condition_name == "baseline" + assert units[0].command == "./blis run" + + def test_multi_seed_condition_fans_out(self): + plan = {"arms": [{ + "arm_id": "h-main", + "conditions": [{ + "name": "x", "command": "./run", + "seeds": ["s1", "s2", "s3"], + }], + }]} + units = partition_plan(plan) + assert len(units) == 3 + assert sorted(u.seed for u in units) == ["s1", "s2", "s3"] + + def test_multiple_arms_and_conditions(self): + plan = {"arms": [ + {"arm_id": "h-main", "conditions": [ + {"name": "a", "command": "./a"}, + {"name": "b", "command": "./b"}, + ]}, + {"arm_id": "h-ablation", "conditions": [ + {"name": "c", "command": "./c"}, + ]}, + ]} + units = partition_plan(plan) + assert len(units) == 3 + ids = sorted((u.arm_id, u.condition_name) for u in units) + assert ids == [("h-ablation", "c"), ("h-main", "a"), ("h-main", "b")] + + def test_relative_results_dir_does_not_overlap(self): + plan = {"arms": [{ + "arm_id": "h-main", + "conditions": [{ + "name": "x", "command": "./run", "seeds": ["s1", "s2"], + }], + }]} + units = partition_plan(plan) + dirs = {u.relative_results_dir for u in units} + assert len(dirs) == 2 # s1 and s2 land in different paths + + def test_skips_arms_without_command(self): + plan = {"arms": [{ + "arm_id": "h-main", + "conditions": [{"name": "no-cmd"}], + }]} + assert partition_plan(plan) == [] + + +# ─── Run units ───────────────────────────────────────────────────────────── + +class _RecordingRunner: + def __init__(self, statuses: dict[str, str] | None = None): + self.calls: list[ArmUnit] = [] + self.statuses = statuses or {} + + def __call__(self, unit: ArmUnit) -> ArmUnitResult: + self.calls.append(unit) + status = self.statuses.get(unit.arm_id, "complete") + return ArmUnitResult( + unit=unit, status=status, duration_ms=100, + output_files=[f"{unit.relative_results_dir}/out.json"], + ) + + +class TestRunUnits: + + def test_results_returned_in_input_order(self): + units = [ + ArmUnit("h-main", "s1", "x", "./a"), + ArmUnit("h-main", "s2", "x", "./a"), + ArmUnit("h-ablation", "s1", "y", "./b"), + ] + runner = _RecordingRunner() + results = run_units(units, runner=runner) + assert [r.unit.seed for r in results] == ["s1", "s2", "s1"] + + def test_runner_exception_becomes_failed_unit(self): + units = [ArmUnit("h-main", "s1", "x", "./a")] + + def crash(_): + raise RuntimeError("boom") + + results = run_units(units, runner=crash) + assert results[0].status == "failed" + assert "boom" in results[0].error + assert "RuntimeError" in results[0].error + + def test_max_parallel_must_be_positive(self): + with pytest.raises(ValueError): + run_units([], runner=_RecordingRunner(), max_parallel=0) + + +# ─── Merge ───────────────────────────────────────────────────────────────── + +class TestMergeUnitResults: + + def _results(self) -> list[ArmUnitResult]: + return [ + ArmUnitResult( + unit=ArmUnit("h-main", "s1", "x", "./a"), + status="complete", duration_ms=100, + output_files=["results/h-main/s1/out.json"], + ), + ArmUnitResult( + unit=ArmUnit("h-main", "s2", "x", "./a"), + status="complete", duration_ms=120, + output_files=["results/h-main/s2/out.json"], + ), + ArmUnitResult( + unit=ArmUnit("h-ablation", "s1", "y", "./b"), + status="failed", error="exit 1", + ), + ] + + def test_arms_grouped_by_arm_id(self): + out = merge_unit_results(self._results()) + ids = [a["arm_id"] for a in out["arms"]] + # Sorted for determinism. + assert ids == ["h-ablation", "h-main"] + + def test_arm_status_failed_when_any_unit_failed(self): + out = merge_unit_results(self._results()) + by_id = {a["arm_id"]: a for a in out["arms"]} + assert by_id["h-ablation"]["status"] == "failed" + assert by_id["h-main"]["status"] == "complete" + + def test_failed_count_correct(self): + out = merge_unit_results(self._results()) + assert out["failed_unit_count"] == 1 + assert out["total_unit_count"] == 3 + + def test_byte_equal_across_repeated_calls(self): + a = json.dumps(merge_unit_results(self._results()), sort_keys=True) + b = json.dumps(merge_unit_results(self._results()), sort_keys=True) + assert a == b + + def test_units_within_arm_sorted_by_seed_and_condition(self): + results = [ + ArmUnitResult(unit=ArmUnit("h-main", "s2", "b", "./x"), status="complete"), + ArmUnitResult(unit=ArmUnit("h-main", "s1", "a", "./x"), status="complete"), + ArmUnitResult(unit=ArmUnit("h-main", "s1", "b", "./x"), status="complete"), + ] + out = merge_unit_results(results) + seeds = [u["seed"] for u in out["arms"][0]["units"]] + conds = [u["condition"] for u in out["arms"][0]["units"]] + assert list(zip(seeds, conds)) == [("s1", "a"), ("s1", "b"), ("s2", "b")] + + +# ─── Partial-retry helper ────────────────────────────────────────────────── + +class TestFailedUnits: + + def test_returns_only_failed_units(self): + results = [ + ArmUnitResult(unit=ArmUnit("h-main", "s1", "x", "./a"), status="complete"), + ArmUnitResult(unit=ArmUnit("h-main", "s2", "x", "./a"), status="failed"), + ArmUnitResult(unit=ArmUnit("h-ablation", "s1", "y", "./b"), status="failed"), + ] + failed = failed_units(results) + assert len(failed) == 2 + assert all(r.arm_id != "h-main" or r.seed == "s2" for r in failed) + + +# ─── Phase B: end-to-end with the harness-isolated SDK runner ───────────── + + +class TestEndToEndWithIsolatedRunner: + """The full chain: partition_plan -> make_isolated_arm_runner -> + run_units -> merge_unit_results. The SDK side is injected via a + fake; per the no-live-LLM policy (CLAUDE.md), no real subagent is + spawned. The test asserts the orchestration contract — every unit + is dispatched with isolation=worktree to a non-overlapping results + dir, failures are isolated, and the merged output is deterministic. + """ + + def _plan(self): + return {"arms": [ + {"arm_id": "h-main", "conditions": [ + {"name": "x", "command": "./run --arm main"}, + ]}, + {"arm_id": "h-ablation", "conditions": [ + {"name": "y", "command": "./run --arm ablation", + "seeds": ["s1", "s2"]}, + ]}, + ]} + + def _success_runner(self): + SDKResult = _LocalSDKResult # noqa: N806 + + sdk_calls: list[dict] = [] + + def sdk_runner(**kwargs): + sdk_calls.append(kwargs) + prompt = kwargs.get("prompt", "") + # Simulate the subagent writing a file in its results dir. + for line in prompt.splitlines(): + if line.startswith("Write all output files to:"): + target = line.split("`", 1)[1].rstrip("`") + Path(target).mkdir(parents=True, exist_ok=True) + (Path(target) / "out.json").write_text("{}") + return SDKResult(text="done", duration_ms=120) + + return sdk_runner, sdk_calls + + def test_three_units_dispatched_with_isolation_kwarg(self, tmp_path): + from orchestrator.worktree import make_isolated_arm_runner + + iter_dir = tmp_path / "iter-1" + iter_dir.mkdir(parents=True) + sdk_runner, sdk_calls = self._success_runner() + + runner = make_isolated_arm_runner( + sdk_runner=sdk_runner, repo_path=tmp_path, iter_dir=iter_dir, + ) + units = partition_plan(self._plan()) + assert len(units) == 3 + + results = run_units(units, runner=runner) + assert len(sdk_calls) == 3 + assert all(c.get("isolation") == "worktree" for c in sdk_calls) + + merged = merge_unit_results(results) + assert [a["arm_id"] for a in merged["arms"]] == ["h-ablation", "h-main"] + assert all(a["status"] == "complete" for a in merged["arms"]) + + def test_partial_failure_isolated_to_one_arm(self, tmp_path): + from orchestrator.worktree import make_isolated_arm_runner + SDKResult = _LocalSDKResult # noqa: N806 + + iter_dir = tmp_path / "iter-1" + iter_dir.mkdir(parents=True) + + def sdk_runner(**kwargs): + prompt = kwargs.get("prompt", "") + if "h-ablation" in prompt: + return SDKResult( + text="", is_error=True, error_message="exit 1", + ) + for line in prompt.splitlines(): + if line.startswith("Write all output files to:"): + target = line.split("`", 1)[1].rstrip("`") + Path(target).mkdir(parents=True, exist_ok=True) + (Path(target) / "out.json").write_text("{}") + return SDKResult(text="ok") + + runner = make_isolated_arm_runner( + sdk_runner=sdk_runner, repo_path=tmp_path, iter_dir=iter_dir, + ) + merged = merge_unit_results( + run_units(partition_plan(self._plan()), runner=runner) + ) + by_arm = {a["arm_id"]: a for a in merged["arms"]} + assert by_arm["h-main"]["status"] == "complete" + assert by_arm["h-ablation"]["status"] == "failed" + assert merged["failed_unit_count"] == 2 + assert merged["total_unit_count"] == 3 + + def test_no_two_units_share_results_dir(self, tmp_path): + from orchestrator.worktree import make_isolated_arm_runner + + iter_dir = tmp_path / "iter-1" + iter_dir.mkdir(parents=True) + sdk_runner, _ = self._success_runner() + seen_dirs: list[str] = [] + + def capturing(**kwargs): + for line in kwargs.get("prompt", "").splitlines(): + if line.startswith("Write all output files to:"): + seen_dirs.append(line.split("`", 1)[1].rstrip("`")) + return sdk_runner(**kwargs) + + runner = make_isolated_arm_runner( + sdk_runner=capturing, repo_path=tmp_path, iter_dir=iter_dir, + ) + run_units(partition_plan(self._plan()), runner=runner) + + # Acceptance criterion: no two subagents ever write to the same + # results path. + assert len(seen_dirs) == 3 + assert len(set(seen_dirs)) == 3 diff --git a/tests/test_worktree_gc.py b/tests/test_worktree_gc.py new file mode 100644 index 0000000..60ebaed --- /dev/null +++ b/tests/test_worktree_gc.py @@ -0,0 +1,198 @@ +"""Behavioral tests for orphan-worktree GC (#133 Phase A). + +Synthesizes ``/.nous-experiments/`` directories with controlled +mtimes and PID files, calls gc_orphan_worktrees, asserts which were +removed. Tests inject a fake clock + fake pid_check so they're +deterministic across machines. +""" +from __future__ import annotations + +import os +import subprocess +from pathlib import Path + +from orchestrator.worktree import gc_orphan_worktrees + + +def _init_git_repo(repo: Path) -> None: + repo.mkdir(parents=True, exist_ok=True) + subprocess.run(["git", "init", "-q"], cwd=repo, check=True) + subprocess.run(["git", "config", "user.email", "t@t"], cwd=repo, check=True) + subprocess.run(["git", "config", "user.name", "t"], cwd=repo, check=True) + (repo / "f.txt").write_text("x") + subprocess.run(["git", "add", "."], cwd=repo, check=True, capture_output=True) + subprocess.run( + ["git", "commit", "-q", "-m", "init"], cwd=repo, check=True, + capture_output=True, + ) + + +def _make_worktree_dir( + repo: Path, exp_id: str, *, mtime: float, pid: int | None = None, +) -> Path: + d = repo / ".nous-experiments" / exp_id + d.mkdir(parents=True, exist_ok=True) + (d / "marker").write_text("x") + if pid is not None: + (d / ".nous-pid").write_text(str(pid)) + os.utime(d, (mtime, mtime)) + return d + + +class TestGcOrphanWorktrees: + + def test_no_experiments_dir_returns_empty(self, tmp_path): + _init_git_repo(tmp_path) + assert gc_orphan_worktrees(tmp_path) == [] + + def test_removes_old_worktree_with_no_pid_file(self, tmp_path): + _init_git_repo(tmp_path) + old_mtime = 1000.0 # well in the past + _make_worktree_dir(tmp_path, "iter-1-aaaa", mtime=old_mtime) + + removed = gc_orphan_worktrees( + tmp_path, max_age_seconds=60, now=old_mtime + 3600, + ) + + assert removed == ["iter-1-aaaa"] + assert not (tmp_path / ".nous-experiments" / "iter-1-aaaa").exists() + + def test_keeps_recent_worktree(self, tmp_path): + _init_git_repo(tmp_path) + recent = 5000.0 + _make_worktree_dir(tmp_path, "iter-2-bbbb", mtime=recent) + + removed = gc_orphan_worktrees( + tmp_path, max_age_seconds=3600, now=recent + 30, + ) + + assert removed == [] + assert (tmp_path / ".nous-experiments" / "iter-2-bbbb").exists() + + def test_keeps_old_worktree_when_pid_alive(self, tmp_path): + _init_git_repo(tmp_path) + old = 1000.0 + _make_worktree_dir(tmp_path, "iter-3-cccc", mtime=old, pid=12345) + + # Inject an "always alive" pid_check; the dir should be kept + # despite being older than max_age_seconds. + removed = gc_orphan_worktrees( + tmp_path, max_age_seconds=60, now=old + 3600, + pid_check=lambda pid: True, + ) + + assert removed == [] + assert (tmp_path / ".nous-experiments" / "iter-3-cccc").exists() + + def test_removes_old_worktree_when_pid_dead(self, tmp_path): + _init_git_repo(tmp_path) + old = 1000.0 + _make_worktree_dir(tmp_path, "iter-4-dddd", mtime=old, pid=12345) + + removed = gc_orphan_worktrees( + tmp_path, max_age_seconds=60, now=old + 3600, + pid_check=lambda pid: False, + ) + + assert removed == ["iter-4-dddd"] + assert not (tmp_path / ".nous-experiments" / "iter-4-dddd").exists() + + def test_invalid_pid_file_treated_as_no_pid(self, tmp_path): + _init_git_repo(tmp_path) + old = 1000.0 + d = _make_worktree_dir(tmp_path, "iter-5-eeee", mtime=old) + (d / ".nous-pid").write_text("not-an-int") + os.utime(d, (old, old)) + + removed = gc_orphan_worktrees( + tmp_path, max_age_seconds=60, now=old + 3600, + ) + assert removed == ["iter-5-eeee"] + + def test_multiple_worktrees_partial_removal_is_sorted(self, tmp_path): + _init_git_repo(tmp_path) + old = 1000.0 + recent = 5000.0 + _make_worktree_dir(tmp_path, "iter-1-aaaa", mtime=old) + _make_worktree_dir(tmp_path, "iter-2-bbbb", mtime=recent) + _make_worktree_dir(tmp_path, "iter-3-cccc", mtime=old) + + removed = gc_orphan_worktrees( + tmp_path, max_age_seconds=60, now=recent + 30, + ) + # recent (iter-2) should still exist; old ones gone. + assert removed == ["iter-1-aaaa", "iter-3-cccc"] + assert (tmp_path / ".nous-experiments" / "iter-2-bbbb").exists() + + def test_zero_leftover_worktrees_after_gc_for_age_match(self, tmp_path): + """Acceptance criterion: /.nous-experiments/ has zero + leftover entries after a multi-arm campaign that GC'd everything.""" + _init_git_repo(tmp_path) + old = 1000.0 + for i in range(5): + _make_worktree_dir(tmp_path, f"iter-{i}-x", mtime=old) + + gc_orphan_worktrees(tmp_path, max_age_seconds=60, now=old + 3600) + + leftovers = [ + p for p in (tmp_path / ".nous-experiments").iterdir() if p.is_dir() + ] + assert leftovers == [] + + +# ─── Phase B: harness-isolated subagent runner factory ───────────────────── + + +class TestMakeIsolatedArmRunner: + """The factory returns an ArmRunner-shaped callable that delegates to + the injected sdk_runner with isolation=worktree. Tests assert what + the runner sends to the SDK and how it interprets the response — + never that internal helpers were called.""" + + def _unit(self): + # Local stand-in for parallel_arms.ArmUnit so this test runs on + # the #133 branch before #123's parallel_arms.py lands. The real + # ArmUnit is duck-compatible with this shape. + from dataclasses import dataclass + + @dataclass(frozen=True) + class _Unit: + arm_id: str + seed: str + condition_name: str + command: str + + @property + def relative_results_dir(self) -> str: + return f"results/{self.arm_id}/{self.seed}" + + return _Unit("h-main", "s1", "x", "./blis run") + + def test_returns_callable(self, tmp_path): + try: + from orchestrator.parallel_arms import ArmUnit # noqa: F401 + except ImportError: + import pytest + pytest.skip("parallel_arms not on this branch yet (lands in #123)") + from orchestrator.worktree import make_isolated_arm_runner + + runner = make_isolated_arm_runner( + sdk_runner=lambda **kw: None, + repo_path=tmp_path, + iter_dir=tmp_path / "iter-1", + ) + assert callable(runner) + + def test_factory_accepts_documented_kwargs(self, tmp_path): + """The factory's keyword surface is the public contract.""" + from orchestrator.worktree import make_isolated_arm_runner + # Just verify the signature accepts what the docstring promises; + # construction must not raise. + make_isolated_arm_runner( + sdk_runner=lambda **kw: None, + repo_path=tmp_path, + iter_dir=tmp_path, + model="claude-sonnet-4-6", + max_turns=10, + subagent_type="claude", + )