diff --git a/docs/architecture.md b/docs/architecture.md index f5e162b..5a2e691 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -110,7 +110,8 @@ Both agents write artifacts directly to the campaign directory (`iter_dir`) and **Implementations:** - `StubDispatcher` (`dispatch.py`) produces valid, schema-conformant artifacts without calling any LLM. Used for testing the orchestrator loop. -- `CLIDispatcher` (`cli_dispatch.py`) invokes `claude -p` as a subprocess, giving agents code access and shell tools. Agents write files directly to `iter_dir`. Supports `override_cwd()` context manager for pointing the executor at a git worktree. +- `CLIDispatcher` (`cli_dispatch.py`) invokes `claude -p` as a subprocess, giving agents code access and shell tools. Agents write files directly to `iter_dir`. Supports `override_cwd()` context manager for pointing the executor at a git worktree. Selected via `--agent api`. +- `SDKDispatcher` (`sdk_dispatch.py`) calls the Claude Agent SDK (`claude-agent-sdk`) instead of spawning a subprocess. Same artifact and metrics contract as `CLIDispatcher`; gains native streaming, programmatic prompt caching, and message-level retry. Selected via `--agent sdk`. Requires the optional `sdk` install extra (`pip install -e ".[sdk]"`). Inherits parse / validate / retry-with-feedback machinery from `CLIDispatcher` — only the transport changes. **Dispatch interface:** ```python @@ -122,7 +123,7 @@ dispatcher.dispatch( ) ``` -Both dispatchers share the same interface — `CLIDispatcher` extends `LLMDispatcher`. +All three dispatchers share the same interface. `CLIDispatcher` extends `LLMDispatcher`; `SDKDispatcher` extends `CLIDispatcher` and overrides only `_call_claude` and `preflight_check`. ## CLI Dispatch diff --git a/orchestrator/campaign.py b/orchestrator/campaign.py index 2ba6a84..99b7e15 100644 --- a/orchestrator/campaign.py +++ b/orchestrator/campaign.py @@ -206,15 +206,24 @@ def run_campaign( HumanGate(auto_response="approve") if auto_approve else HumanGate() ) - # Pre-flight: validate CLI + credentials before starting the campaign + # Pre-flight: validate CLI + credentials before starting the campaign. + # SDK mode pre-flights via claude-agent-sdk import; API mode via claude CLI. repo_path = campaign.get("target_system", {}).get("repo_path") if agent != "inline" and repo_path: - from orchestrator.cli_dispatch import CLIDispatcher - preflight_dispatcher = CLIDispatcher( - work_dir=work_dir, campaign=campaign, - model=_resolve_model(campaign, "design", model), - max_retries=max_cli_retries, - ) + if agent == "sdk": + from orchestrator.sdk_dispatch import SDKDispatcher + preflight_dispatcher = SDKDispatcher( + work_dir=work_dir, campaign=campaign, + model=_resolve_model(campaign, "design", model), + max_retries=max_cli_retries, + ) + else: + from orchestrator.cli_dispatch import CLIDispatcher + preflight_dispatcher = CLIDispatcher( + work_dir=work_dir, campaign=campaign, + model=_resolve_model(campaign, "design", model), + max_retries=max_cli_retries, + ) preflight_dispatcher.preflight_check() start_iter = _resume_completed_campaign(work_dir, max_iterations) @@ -353,7 +362,7 @@ def main() -> None: help="Timeout in seconds for claude -p calls (default: 1800)") parser.add_argument("--max-cli-retries", type=int, default=10, help="Max retries for claude -p failures (-1 = unbounded, default: 10)") - parser.add_argument("--agent", choices=["inline", "api"], default="api", + parser.add_argument("--agent", choices=["inline", "api", "sdk"], default="api", help="Dispatch backend: 'inline' emits prompts to stdout for the " "calling agent (no subprocess, no API key), " "'api' uses the LLM API (default: api)") diff --git a/orchestrator/cli.py b/orchestrator/cli.py index 755e9d9..4cb7e2c 100644 --- a/orchestrator/cli.py +++ b/orchestrator/cli.py @@ -310,7 +310,7 @@ def main(): p_run.add_argument("--auto-approve", action="store_true") p_run.add_argument("--timeout", type=int, default=1800) p_run.add_argument("--max-cli-retries", type=int, default=10) - p_run.add_argument("--agent", choices=["inline", "api"], default="api") + p_run.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_run.set_defaults(func=_cmd_run) p_resume = subparsers.add_parser("resume") @@ -320,7 +320,7 @@ def main(): p_resume.add_argument("--auto-approve", action="store_true") p_resume.add_argument("--timeout", type=int, default=1800) p_resume.add_argument("--max-cli-retries", type=int, default=10) - p_resume.add_argument("--agent", choices=["inline", "api"], default="api") + p_resume.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_resume.set_defaults(func=_cmd_resume) p_validate = subparsers.add_parser("validate") @@ -340,7 +340,7 @@ def main(): p_report.add_argument("target") p_report.add_argument("--model") p_report.add_argument("--timeout", type=int, default=1800) - p_report.add_argument("--agent", choices=["inline", "api"], default="api") + p_report.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_report.set_defaults(func=_cmd_report) p_replay = subparsers.add_parser("replay") diff --git a/orchestrator/explore_design.py b/orchestrator/explore_design.py new file mode 100644 index 0000000..4d037d3 --- /dev/null +++ b/orchestrator/explore_design.py @@ -0,0 +1,257 @@ +"""Explore-then-synthesize DESIGN phase (issue #132). + +DESIGN today asks one Opus session to do two things at once: + + 1. Read the codebase to map metrics, knobs, prior findings, principles. + 2. Synthesize a hypothesis bundle from what it found. + +That's the canonical Claude-Code-pattern miss: broad exploration + small +synthesis is exactly what parallel Explore subagents are for. Phase A +of #132 ships the orchestration layer that makes the split possible +without changing what gets produced (problem.md + bundle.yaml). + +Stage A — parallel Explore: ``run_explore_stage(campaign, scopes, +runner)`` fans out one read-only subagent per scope and collects their +reports. + +Stage B — Opus synthesis: ``build_synthesis_prompt(reports, campaign, +iteration)`` produces the prompt body for the single Opus call that +turns the explorer reports + principles.json into problem.md + +bundle.yaml. + +Phase A is the orchestration helpers + their behavioral tests. The +dispatcher integration (SDKDispatcher spawning Explore subagents, +threading reports back into a synthesis call) lands in Phase B once +#121 merges and the team picks injection points. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Callable, Iterable + +# Default exploration scopes — one Explore subagent per scope. The +# scopes are deliberately overlapping a little so synthesis has +# redundant signal where it matters. +DEFAULT_EXPLORE_SCOPES: tuple[str, ...] = ( + "metrics", # observable metrics + how they're collected + "knobs", # controllable knobs + their value ranges + "prior_findings", # findings.json from previous iterations + "principles", # principles.json across the campaign + others +) + + +@dataclass +class ExploreReport: + scope: str + text: str + duration_ms: int = 0 + input_tokens: int = 0 + output_tokens: int = 0 + + def as_dict(self) -> dict: + return { + "scope": self.scope, + "text": self.text, + "duration_ms": self.duration_ms, + "input_tokens": self.input_tokens, + "output_tokens": self.output_tokens, + } + + +@dataclass +class ExploreStageResult: + reports: list[ExploreReport] = field(default_factory=list) + + @property + def total_input_tokens(self) -> int: + return sum(r.input_tokens for r in self.reports) + + @property + def total_output_tokens(self) -> int: + return sum(r.output_tokens for r in self.reports) + + def by_scope(self, scope: str) -> ExploreReport | None: + for r in self.reports: + if r.scope == scope: + return r + return None + + +def build_explore_prompt(scope: str, campaign: dict) -> str: + """Construct a read-only Explore subagent prompt for one scope. + + The subagent should be spawned with ``subagent_type="Explore"`` so + it cannot mutate the worktree. The prompt is short and scope-tight + on purpose; the synthesis call (Stage B) is where multi-aspect + integration happens. + """ + target = campaign.get("target_system", {}) + name = target.get("name", "the target system") + repo = target.get("repo_path", "(repo not configured)") + + if scope == "metrics": + focus = ( + "Map the observable metrics this system exposes and how they " + "are collected. Include the file/function where each metric is " + "computed." + ) + elif scope == "knobs": + focus = ( + "Map the controllable knobs / configuration parameters this " + "system exposes. For each knob, note its declared range and the " + "code path that consumes it." + ) + elif scope == "prior_findings": + focus = ( + "Read prior runs/iter-*/findings.json files in the campaign " + "directory. Summarize confirmed/refuted hypotheses and any open " + "questions surfaced by the most recent iteration." + ) + elif scope == "principles": + focus = ( + "Read principles.json in this campaign and any sibling campaigns " + "(via the campaign_index module if available). Flag principles " + "that touch the same mechanism we're about to design for." + ) + else: + focus = f"Investigate the '{scope}' aspect of the target system." + + return ( + f"# Explore: {scope}\n\n" + f"You are a read-only Explore subagent. **Do not modify any files.**\n" + f"Target: {name} (repo at {repo})\n\n" + f"## Focus\n{focus}\n\n" + f"## Output\n" + f"Return a markdown report of <= 500 lines. Cite file paths and " + f"line numbers. End with a one-paragraph summary the synthesizer " + f"can read in isolation.\n" + ) + + +ExploreRunner = Callable[[str, str, dict], ExploreReport] +"""Callable signature for running one Explore subagent. + +Takes (scope, prompt, campaign) and returns an ExploreReport. The +default real-world implementation spawns subagent_type="Explore" via +the SDK and reads the assistant's final text. Tests inject a deterministic +fake. +""" + + +def run_explore_stage( + campaign: dict, + *, + scopes: Iterable[str] = DEFAULT_EXPLORE_SCOPES, + runner: ExploreRunner, +) -> ExploreStageResult: + """Run one Explore subagent per scope and collect their reports. + + Phase A executes synchronously over the runner. Real parallel + fan-out (anyio gather over the SDK's async API) lands in Phase B + when the SDK runner ships its async surface. + """ + reports: list[ExploreReport] = [] + for scope in scopes: + prompt = build_explore_prompt(scope, campaign) + report = runner(scope, prompt, campaign) + reports.append(report) + return ExploreStageResult(reports=reports) + + +def make_sdk_explore_runner( + *, + sdk_runner: Callable, + cwd: Path | None = None, + model: str = "claude-haiku-4-5", + max_turns: int = 8, +) -> ExploreRunner: + """Build an ExploreRunner backed by an SDK subagent (#132 Phase B). + + Each scope spawns a read-only subagent (``subagent_type="Explore"``) + so the orchestrator gets parallel mapping without a giant Opus + session doing both walking and synthesis. Per the no-live-LLM + project principle (CLAUDE.md), this factory takes an injected + ``sdk_runner`` — production wiring constructs the real Anthropic + SDK runner; tests inject a recording fake. + + Defaults model to Haiku because read-only mapping is cheap and + benefits from speed over depth; deep synthesis happens in Stage B + (the single Opus call), not in Stage A. + """ + def _run(scope: str, prompt: str, campaign: dict) -> ExploreReport: + try: + result = sdk_runner( + prompt=prompt, + model=model, + cwd=cwd, + max_turns=max_turns, + system_prompt=None, + settings_path=None, + event_log_path=None, + subagent_type="Explore", + ) + except TypeError: + # Older runners without subagent_type — fall back to the + # base signature so the factory stays compatible across + # SDK API evolution. + result = sdk_runner( + prompt=prompt, model=model, cwd=cwd, max_turns=max_turns, + ) + + return ExploreReport( + scope=scope, + text=getattr(result, "text", "") or "", + duration_ms=int(getattr(result, "duration_ms", 0) or 0), + input_tokens=int(getattr(result, "input_tokens", 0) or 0), + output_tokens=int(getattr(result, "output_tokens", 0) or 0), + ) + + return _run + + +def build_synthesis_prompt( + stage_a: ExploreStageResult, + *, + campaign: dict, + iteration: int, + iter_dir: Path, +) -> str: + """Build the Opus synthesis prompt that turns Explore reports into + problem.md + bundle.yaml. + + The synthesizer never reads the codebase directly — it consumes only + the explorer reports + principles.json. That's the whole point of + the split: Opus on integration, not on file walks. + """ + target = campaign.get("target_system", {}) + rq = campaign.get("research_question", "(not set)") + + sections = [ + f"# Synthesize iteration {iteration}", + "", + "Four read-only Explore subagents have already mapped the system.", + "**Do not re-read the codebase.** Synthesize from the reports below.", + "", + f"## Research question\n{rq}", + "", + f"## Target\n{target.get('name', '?')} — {target.get('description', '')}", + "", + "## Explorer reports", + ] + for report in stage_a.reports: + sections.append("") + sections.append(f"### {report.scope}\n") + sections.append(report.text) + + sections.extend([ + "", + "## Required outputs", + f"- {iter_dir}/problem.md (markdown)", + f"- {iter_dir}/bundle.yaml (YAML, must validate against bundle.schema.yaml)", + "", + "Cite explorer reports by their `### ` heading when justifying " + "design choices. The reports are the source of truth for this " + "iteration's design.", + ]) + return "\n".join(sections) diff --git a/orchestrator/iteration.py b/orchestrator/iteration.py index 29e9712..2f5ac10 100644 --- a/orchestrator/iteration.py +++ b/orchestrator/iteration.py @@ -281,9 +281,15 @@ def _max_turns_for(phase_key: str) -> int: cli_dispatcher = inline_dispatcher llm_dispatcher = inline_dispatcher else: - # API mode: CLIDispatcher for code-access roles only (when repo_path is set) + # API or SDK mode: code-access dispatcher only when repo_path is set. + # SDK uses claude-agent-sdk; api uses the claude -p subprocess (CLIDispatcher). + if agent == "sdk": + from orchestrator.sdk_dispatch import SDKDispatcher + code_dispatcher_cls = SDKDispatcher + else: + code_dispatcher_cls = CLIDispatcher cli_dispatcher = ( - CLIDispatcher( + code_dispatcher_cls( work_dir=work_dir, campaign=campaign, model=_model_for("design"), timeout=timeout, max_turns=_max_turns_for("design"), @@ -493,7 +499,7 @@ def main() -> None: help="Timeout in seconds for claude -p calls (default: 1800)") parser.add_argument("--max-cli-retries", type=int, default=10, help="Max retries for claude -p failures (-1 = unbounded, default: 10)") - parser.add_argument("--agent", choices=["inline", "api"], default="api", + parser.add_argument("--agent", choices=["inline", "api", "sdk"], default="api", help="Dispatch backend: 'inline' emits prompts to stdout for the " "calling agent, 'api' uses the LLM API (default: api)") parser.add_argument("-v", "--verbose", action="store_true", diff --git a/orchestrator/sdk_dispatch.py b/orchestrator/sdk_dispatch.py new file mode 100644 index 0000000..020a0f0 --- /dev/null +++ b/orchestrator/sdk_dispatch.py @@ -0,0 +1,355 @@ +"""SDK-based agent dispatch for the Nous orchestrator. + +Calls the Claude Agent SDK in place of `claude -p` subprocess. Same +artifact and metrics contract as :class:`orchestrator.cli_dispatch.CLIDispatcher`; +this class swaps the transport without changing the orchestrator's contract +with the rest of Nous. + +Why SDK over `claude -p`: + * Native streaming → fast progress visibility (#127). + * Programmatic prompt caching → token savings (#122). + * Native subagent spawning → parallel arms without manual fork/join (#123). + * Message-level retry instead of subprocess restart. + +Design decisions worth knowing: + + * The actual SDK call is delegated to a ``sdk_runner`` callable. The + default lazily resolves to a real ``claude_agent_sdk`` runner; tests + inject a deterministic fake. The runner returns an ``SDKResult`` + (text + usage + cost + error flag); the dispatcher's job is to turn + that into on-disk artifacts and a metrics row, with retry on transient + failure. This keeps tests behavioral — they assert what's on disk, + not which method we called. + * Inherits from CLIDispatcher to reuse the parse/validate/retry-with-feedback + machinery used for fenced-output phases (gate summaries, etc.). +""" +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Callable, Protocol, runtime_checkable + +from orchestrator.cli_dispatch import CLIDispatcher, _backoff_for +from orchestrator.metrics import log_metrics, log_retry_event + +logger = logging.getLogger(__name__) + + +class SDKTransientError(RuntimeError): + """Runner raises this for retryable transport-level failures.""" + + +@dataclass +class SDKResult: + """One SDK call's outcome. + + The dispatcher reads only these fields. Producers (real or fake) must + populate ``text`` (assistant final text); usage/cost fields default + to zero so trivial fakes need not set them. + """ + + text: str + input_tokens: int = 0 + output_tokens: int = 0 + cache_read_input_tokens: int = 0 + cache_creation_input_tokens: int = 0 + cost_usd: float = 0.0 + duration_ms: int = 0 + num_turns: int = 1 + is_error: bool = False + error_message: str = "" + extra: dict = field(default_factory=dict) + + +@runtime_checkable +class SDKRunner(Protocol): + """A callable that performs one SDK turn and returns an ``SDKResult``. + + Raise :class:`SDKTransientError` for retryable failures (network blips, + rate limits, mid-stream disconnect). Return ``SDKResult(is_error=True, + error_message=...)`` for API-reported errors that should also be retried. + Other exceptions bubble up as fatal. + """ + + def __call__( + self, + *, + prompt: str, + model: str, + cwd: Path | None, + max_turns: int, + system_prompt: str | None = None, + settings_path: Path | None = None, + ) -> SDKResult: + ... + + +def _default_sdk_runner_factory() -> SDKRunner: + """Return a runner that calls the real ``claude_agent_sdk``. + + Resolved lazily so that tests (and environments without the SDK + installed) don't fail at import time. + """ + + def _runner( + *, + prompt: str, + model: str, + cwd: Path | None, + max_turns: int, + system_prompt: str | None = None, + settings_path: Path | None = None, + ) -> SDKResult: + try: + import anyio + from claude_agent_sdk import ( # type: ignore[import-not-found] + ClaudeAgentOptions, + query, + ) + except ImportError as exc: + raise RuntimeError( + "claude-agent-sdk is not installed. " + "Install with `pip install claude-agent-sdk` or use --agent api." + ) from exc + + async def _run() -> SDKResult: + options = ClaudeAgentOptions( + model=model, + cwd=str(cwd) if cwd else None, + max_turns=max_turns, + system_prompt=system_prompt, + settings=str(settings_path) if settings_path else None, + ) + text_chunks: list[str] = [] + usage: dict = {} + cost_usd = 0.0 + duration_ms = 0 + num_turns = 0 + t0 = time.time() + async for message in query(prompt=prompt, options=options): + cls = type(message).__name__ + if cls == "AssistantMessage": + for block in getattr(message, "content", []): + if hasattr(block, "text"): + text_chunks.append(block.text) + elif cls == "ResultMessage": + usage = getattr(message, "usage", {}) or {} + cost_usd = float(getattr(message, "total_cost_usd", 0.0) or 0.0) + duration_ms = int(getattr(message, "duration_ms", 0) or 0) + num_turns = int(getattr(message, "num_turns", 0) or 0) + if getattr(message, "is_error", False): + return SDKResult( + text="".join(text_chunks), + error_message=str(getattr(message, "result", "unknown")), + is_error=True, + input_tokens=int(usage.get("input_tokens", 0) or 0), + output_tokens=int(usage.get("output_tokens", 0) or 0), + cache_read_input_tokens=int( + usage.get("cache_read_input_tokens", 0) or 0 + ), + cache_creation_input_tokens=int( + usage.get("cache_creation_input_tokens", 0) or 0 + ), + cost_usd=cost_usd, + duration_ms=duration_ms, + num_turns=num_turns, + ) + return SDKResult( + text="".join(text_chunks), + input_tokens=int(usage.get("input_tokens", 0) or 0), + output_tokens=int(usage.get("output_tokens", 0) or 0), + cache_read_input_tokens=int( + usage.get("cache_read_input_tokens", 0) or 0 + ), + cache_creation_input_tokens=int( + usage.get("cache_creation_input_tokens", 0) or 0 + ), + cost_usd=cost_usd, + duration_ms=duration_ms or int((time.time() - t0) * 1000), + num_turns=num_turns or 1, + ) + + try: + return anyio.run(_run) + except Exception as exc: + cls_name = type(exc).__name__ + transient_signals = ( + "ConnectionError", + "ReadTimeout", + "WriteTimeout", + "RemoteProtocolError", + "ServerDisconnectedError", + "TimeoutError", + ) + if any(sig in cls_name for sig in transient_signals): + raise SDKTransientError(f"{cls_name}: {exc}") from exc + raise + + return _runner + + +class SDKDispatcher(CLIDispatcher): + """Dispatch agent roles via the Claude Agent SDK. + + Inherits dispatch() / parse / retry-with-feedback / route logic from + :class:`CLIDispatcher`. Overrides ``_call_claude`` to use the SDK + runner instead of a subprocess, and ``preflight_check`` to verify + the SDK package is importable. + """ + + def __init__( + self, + work_dir: Path, + campaign: dict, + model: str = "claude-sonnet-4-6", + prompts_dir: Path | None = None, + timeout: int = 1800, + max_turns: int = 25, + max_retries: int | None = 10, + sdk_runner: Callable | None = None, + system_prompt: str | None = None, + settings_path: Path | None = None, + ) -> None: + super().__init__( + work_dir=work_dir, + campaign=campaign, + model=model, + prompts_dir=prompts_dir, + timeout=timeout, + max_turns=max_turns, + max_retries=max_retries, + ) + self._sdk_runner = sdk_runner or _default_sdk_runner_factory() + self._system_prompt = system_prompt + self._settings_path = settings_path + + # ------------------------------------------------------------------ + # Pre-flight + # ------------------------------------------------------------------ + + def preflight_check(self) -> None: + """Verify the SDK is reachable before starting a campaign.""" + try: + import claude_agent_sdk # type: ignore[import-not-found] # noqa: F401 + except ImportError as exc: + raise RuntimeError( + "Pre-flight check failed: claude-agent-sdk is not installed. " + "Install with `pip install claude-agent-sdk`, or pass --agent api " + "to use the OpenAI-compatible path instead." + ) from exc + logger.info("SDK pre-flight check passed (model=%s)", self.model) + + # ------------------------------------------------------------------ + # Core call with retry + # ------------------------------------------------------------------ + + def _call_claude(self, prompt: str, max_turns: int | None = None) -> str: + """Run one SDK turn with retry on transient failure. + + Mirrors CLIDispatcher._call_claude semantics: retry on transient + errors (with exponential backoff), log each failure to retry_log.jsonl, + log each completed call to llm_metrics.jsonl, give up after + max_retries. + """ + cwd = self._cwd + if cwd and not cwd.exists(): + raise RuntimeError( + f"SDKDispatcher cwd does not exist: {cwd}. " + f"Check that 'repo_path' in campaign.yaml is correct." + ) + turns = max_turns or self.max_turns + logger.info( + "SDK turn (model=%s, cwd=%s, max_turns=%d)", self.model, cwd, turns, + ) + + failure_count = 0 + original_prompt = prompt + while True: + try: + result = self._sdk_runner( + prompt=prompt, + model=self.model, + cwd=cwd, + max_turns=turns, + system_prompt=self._system_prompt, + settings_path=self._settings_path, + ) + except SDKTransientError as exc: + failure_count += 1 + self._log_retry("transient", failure_count, exc) + if self._exhausted(failure_count): + raise RuntimeError( + f"SDK still failing after {failure_count} attempt(s): {exc}" + ) from exc + time.sleep(_backoff_for(failure_count)) + prompt = self._maybe_resume_hint(prompt, original_prompt, "transient") + continue + + self._log_metrics_row(result) + + if result.is_error: + failure_count += 1 + self._log_retry( + "api_error", failure_count, RuntimeError(result.error_message), + ) + if self._exhausted(failure_count): + raise RuntimeError( + f"SDK returned error after {failure_count} attempt(s): " + f"{result.error_message}" + ) + time.sleep(_backoff_for(failure_count)) + prompt = self._maybe_resume_hint(prompt, original_prompt, "api_error") + continue + + return result.text + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _exhausted(self, failure_count: int) -> bool: + return self.max_retries is not None and failure_count > self.max_retries + + def _log_retry(self, kind: str, attempt: int, exc: BaseException) -> None: + log_retry_event(self._metrics_path, { + "role": self._current_role, + "phase": self._current_phase, + "failure_type": kind, + "attempt": attempt, + "error": str(exc)[:500], + }) + + def _log_metrics_row(self, result: SDKResult) -> None: + log_metrics(self._metrics_path, { + "dispatcher": "sdk", + "role": self._current_role, + "phase": self._current_phase, + "model": self.model, + "input_tokens": result.input_tokens, + "output_tokens": result.output_tokens, + "cache_creation_input_tokens": result.cache_creation_input_tokens, + "cache_read_input_tokens": result.cache_read_input_tokens, + "cost_usd": result.cost_usd, + "duration_ms": result.duration_ms, + "num_turns": result.num_turns, + }) + + @staticmethod + def _maybe_resume_hint(prompt: str, original_prompt: str, kind: str) -> str: + """If the prompt has not yet been annotated with a resume hint, add one. + + Mirrors CLIDispatcher: tells the agent that the prior attempt was + interrupted so it picks up from existing artifacts rather than + starting fresh. + """ + marker = "\nNote: Your previous attempt was interrupted" + if marker in prompt: + return prompt + return ( + f"{original_prompt}\n\n---\n" + f"Note: Your previous attempt was interrupted ({kind}). " + f"Check the working directory for artifacts from your prior " + f"attempt and continue from where you left off." + ) diff --git a/pyproject.toml b/pyproject.toml index f0b9a53..0bfe2f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,10 @@ dev = [ "pytest>=8.0", "pytest-cov>=4.0", ] +sdk = [ + "claude-agent-sdk>=0.0.20", + "anyio>=4.0", +] [project.scripts] nous = "orchestrator.cli:main" diff --git a/tests/test_explore_design.py b/tests/test_explore_design.py new file mode 100644 index 0000000..7e26ca6 --- /dev/null +++ b/tests/test_explore_design.py @@ -0,0 +1,229 @@ +"""Behavioral tests for the explore-then-synthesize DESIGN split (#132 Phase A).""" +from __future__ import annotations + +from pathlib import Path + +from orchestrator.explore_design import ( + DEFAULT_EXPLORE_SCOPES, + ExploreReport, + build_explore_prompt, + build_synthesis_prompt, + run_explore_stage, +) + + +def _campaign(**overrides): + base = { + "research_question": "What drives saturation?", + "target_system": { + "name": "BLIS", + "description": "Inference simulator.", + "observable_metrics": ["throughput", "latency"], + "controllable_knobs": ["batch_size", "scheduling"], + "repo_path": "/path/to/blis", + }, + } + base.update(overrides) + return base + + +# ─── Per-scope prompt builders ───────────────────────────────────────────── + +class TestBuildExplorePrompt: + + def test_metrics_prompt_focuses_on_observable_metrics(self): + out = build_explore_prompt("metrics", _campaign()) + assert "Explore: metrics" in out + assert "metric" in out.lower() + assert "BLIS" in out # target name appears + + def test_knobs_prompt_focuses_on_configuration(self): + out = build_explore_prompt("knobs", _campaign()) + assert "knob" in out.lower() or "config" in out.lower() + + def test_prior_findings_prompt_references_findings_json(self): + out = build_explore_prompt("prior_findings", _campaign()) + assert "findings.json" in out + + def test_principles_prompt_references_principles_store(self): + out = build_explore_prompt("principles", _campaign()) + assert "principles" in out.lower() + + def test_every_prompt_marks_explorer_read_only(self): + for scope in DEFAULT_EXPLORE_SCOPES: + out = build_explore_prompt(scope, _campaign()) + # Read-only enforcement must be EXPLICIT — Explore subagents + # don't have write tools, but the prompt should still say so. + assert "Do not modify" in out or "read-only" in out.lower() + + +# ─── Run stage A: collect reports ────────────────────────────────────────── + +class _RecordingRunner: + def __init__(self): + self.calls: list[dict] = [] + + def __call__(self, scope: str, prompt: str, campaign: dict) -> ExploreReport: + self.calls.append({"scope": scope, "prompt": prompt, "campaign": campaign}) + return ExploreReport( + scope=scope, + text=f"report for {scope}", + duration_ms=100, + input_tokens=200, + output_tokens=80, + ) + + +class TestRunExploreStage: + + def test_runs_one_subagent_per_default_scope(self): + runner = _RecordingRunner() + result = run_explore_stage(_campaign(), runner=runner) + + assert len(runner.calls) == len(DEFAULT_EXPLORE_SCOPES) + assert [r.scope for r in result.reports] == list(DEFAULT_EXPLORE_SCOPES) + + def test_custom_scopes_pass_through(self): + runner = _RecordingRunner() + run_explore_stage(_campaign(), scopes=["a", "b"], runner=runner) + assert [c["scope"] for c in runner.calls] == ["a", "b"] + + def test_aggregates_token_counts(self): + runner = _RecordingRunner() + result = run_explore_stage(_campaign(), runner=runner) + # 4 explorers × 200 input × 80 output. + assert result.total_input_tokens == 800 + assert result.total_output_tokens == 320 + + def test_lookup_by_scope_returns_correct_report(self): + runner = _RecordingRunner() + result = run_explore_stage(_campaign(), runner=runner) + report = result.by_scope("metrics") + assert report is not None + assert report.scope == "metrics" + + +# ─── Stage B: synthesis prompt ───────────────────────────────────────────── + +class TestBuildSynthesisPrompt: + + def _stage_a(self) -> "ExploreStageResult": # type: ignore[name-defined] + runner = _RecordingRunner() + return run_explore_stage(_campaign(), runner=runner) + + def test_includes_every_explorer_report_under_its_scope(self, tmp_path): + stage_a = self._stage_a() + out = build_synthesis_prompt( + stage_a, campaign=_campaign(), iteration=1, + iter_dir=tmp_path / "runs" / "iter-1", + ) + for scope in DEFAULT_EXPLORE_SCOPES: + assert f"### {scope}" in out + assert f"report for {scope}" in out + + def test_explicitly_forbids_re_reading_codebase(self, tmp_path): + stage_a = self._stage_a() + out = build_synthesis_prompt( + stage_a, campaign=_campaign(), iteration=1, + iter_dir=tmp_path / "runs" / "iter-1", + ) + assert "Do not re-read" in out + + def test_required_outputs_named(self, tmp_path): + stage_a = self._stage_a() + out = build_synthesis_prompt( + stage_a, campaign=_campaign(), iteration=2, + iter_dir=tmp_path / "runs" / "iter-2", + ) + assert "problem.md" in out + assert "bundle.yaml" in out + assert "iter-2" in out + assert "bundle.schema.yaml" in out + + def test_research_question_appears(self, tmp_path): + stage_a = self._stage_a() + out = build_synthesis_prompt( + stage_a, campaign=_campaign(), iteration=1, + iter_dir=tmp_path / "runs" / "iter-1", + ) + assert "What drives saturation?" in out + + +# ─── Phase B: SDK explore runner factory ─────────────────────────────────── + + +from dataclasses import dataclass as _dataclass + + +@_dataclass +class _LocalSDKResult: + """Local stand-in for SDKResult; the real one is duck-compatible.""" + text: str = "" + duration_ms: int = 0 + input_tokens: int = 0 + output_tokens: int = 0 + + +class TestMakeSdkExploreRunner: + """The factory wraps an injected sdk_runner so each Stage A scope + spawns a read-only Explore subagent. Tests assert what the runner + sends to the SDK and how it maps the response back to ExploreReport. + No live SDK call happens (no-live-LLM policy, see CLAUDE.md).""" + + def test_dispatches_each_scope_with_explore_subagent_type(self): + from orchestrator.explore_design import make_sdk_explore_runner + + sdk_calls: list[dict] = [] + + def sdk_runner(**kwargs): + sdk_calls.append(kwargs) + return _LocalSDKResult( + text="report", duration_ms=80, + input_tokens=300, output_tokens=120, + ) + + explore_runner = make_sdk_explore_runner( + sdk_runner=sdk_runner, cwd=None, model="claude-haiku-4-5", + max_turns=8, + ) + result = run_explore_stage(_campaign(), runner=explore_runner) + + assert len(sdk_calls) == len(DEFAULT_EXPLORE_SCOPES) + # Every call passes subagent_type=Explore — the harness signal + # for read-only mapping. + assert all(c.get("subagent_type") == "Explore" for c in sdk_calls) + assert all(r.text and r.input_tokens == 300 for r in result.reports) + assert result.total_input_tokens == 300 * len(DEFAULT_EXPLORE_SCOPES) + + def test_falls_back_when_sdk_runner_lacks_subagent_kwarg(self): + """Forward/backward compatibility: older sdk_runners without + subagent_type still work; the factory drops the kwarg on + TypeError and retries with the base signature.""" + from orchestrator.explore_design import make_sdk_explore_runner + + seen: list[dict] = [] + + def old_signature_runner(*, prompt, model, cwd, max_turns): + seen.append({"prompt": prompt, "max_turns": max_turns}) + return _LocalSDKResult(text="ok") + + explore_runner = make_sdk_explore_runner(sdk_runner=old_signature_runner) + run_explore_stage(_campaign(), scopes=["metrics"], runner=explore_runner) + + assert len(seen) == 1 + assert seen[0]["prompt"] + + def test_uses_haiku_by_default(self): + """Read-only mapping should be cheap — default model is Haiku.""" + from orchestrator.explore_design import make_sdk_explore_runner + + models: list[str] = [] + + def sdk_runner(**kwargs): + models.append(kwargs.get("model", "")) + return _LocalSDKResult() + + explore_runner = make_sdk_explore_runner(sdk_runner=sdk_runner) + run_explore_stage(_campaign(), scopes=["metrics"], runner=explore_runner) + + assert models[0].lower().startswith("claude-haiku") diff --git a/tests/test_sdk_dispatch.py b/tests/test_sdk_dispatch.py new file mode 100644 index 0000000..b6d4cf9 --- /dev/null +++ b/tests/test_sdk_dispatch.py @@ -0,0 +1,268 @@ +"""Behavioral tests for the SDK-based dispatcher. + +These tests do NOT mock the Claude Agent SDK directly. They inject a +``sdk_runner`` callable that returns a ``SDKResult`` — same contract the +real dispatcher uses internally — and assert what the dispatcher does +with that result: artifacts on disk, metrics rows, retry behavior. + +That is the contract the rest of Nous depends on. Tests below should +keep passing across SDK API churn as long as the dispatcher's responsibility +to write artifacts and emit metrics holds. + +No assertions about argv shape, internal helper calls, or which methods +the dispatcher invoked on the runner. That's structural — out of scope. +""" +from __future__ import annotations + +import json +from pathlib import Path + +import jsonschema +import pytest +import yaml + +from orchestrator.sdk_dispatch import SDKDispatcher, SDKResult, SDKTransientError + + +SCHEMAS_DIR = Path(__file__).resolve().parent.parent / "orchestrator" / "schemas" + + +def _load_schema(name: str) -> dict: + path = SCHEMAS_DIR / name + if path.suffix in (".yaml", ".yml"): + return yaml.safe_load(path.read_text()) + return json.loads(path.read_text()) + + +def _make_campaign(repo_path: Path | None = None) -> dict: + target = { + "name": "test-system", + "description": "A small test system used by behavioral tests.", + "observable_metrics": ["latency", "throughput"], + "controllable_knobs": ["batch_size", "concurrency"], + } + if repo_path is not None: + target["repo_path"] = str(repo_path) + return { + "research_question": "What drives latency?", + "target_system": target, + } + + +def _read_jsonl(path: Path) -> list[dict]: + if not path.exists(): + return [] + return [json.loads(line) for line in path.read_text().splitlines() if line.strip()] + + +class _ScriptedRunner: + """A runner that returns a queue of pre-staged results. + + Each call pops the next entry. Entries can be SDKResult objects (returned) + or BaseException instances (raised). When the queue is exhausted, raises + AssertionError — a test-only failure mode that signals the dispatcher + called the runner more times than expected. + """ + + def __init__(self, scripted: list): + self._scripted = list(scripted) + self.calls: list[dict] = [] + + def __call__(self, **kwargs) -> SDKResult: + self.calls.append(kwargs) + if not self._scripted: + raise AssertionError( + f"Runner exhausted; dispatcher called it {len(self.calls)} times " + f"but only {len(self.calls) - 1} responses were scripted." + ) + nxt = self._scripted.pop(0) + if isinstance(nxt, BaseException): + raise nxt + return nxt + + +# ─── Text-output phase (design): dispatcher writes assistant text to log ─── + +class TestSDKDispatchTextPhase: + """For design/execute-analyze, the SDK runs an agent that writes + artifacts via tool calls; the dispatcher persists the assistant's + final text message as a log.""" + + def test_writes_assistant_text_to_output_path(self, tmp_path): + runner = _ScriptedRunner([ + SDKResult(text="design log content here", input_tokens=100, output_tokens=50), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + ) + + out = tmp_path / "runs" / "iter-1" / "design_log.md" + dispatcher.dispatch("planner", "design", output_path=out, iteration=1) + + assert out.exists() + assert "design log content here" in out.read_text() + + def test_emits_one_metrics_row_per_call(self, tmp_path): + runner = _ScriptedRunner([ + SDKResult( + text="ok", + input_tokens=400, + output_tokens=120, + cache_read_input_tokens=300, + cache_creation_input_tokens=0, + cost_usd=0.021, + duration_ms=4500, + num_turns=3, + ), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + ) + + dispatcher.dispatch( + "planner", "design", + output_path=tmp_path / "runs" / "iter-1" / "design_log.md", + iteration=1, + ) + + rows = _read_jsonl(tmp_path / "llm_metrics.jsonl") + assert len(rows) == 1 + row = rows[0] + assert row["dispatcher"] == "sdk" + assert row["role"] == "planner" + assert row["phase"] == "design" + assert row["input_tokens"] == 400 + assert row["output_tokens"] == 120 + assert row["cache_read_input_tokens"] == 300 + assert row["cost_usd"] == pytest.approx(0.021) + assert row["num_turns"] == 3 + + +# ─── Structured-output phase: dispatcher parses + validates + writes JSON ── + +class TestSDKDispatchStructuredPhase: + """Gate-summary phase: SDK returns a fenced JSON; dispatcher parses, + validates against gate_summary.schema.json, writes JSON output.""" + + _SUMMARY = { + "gate_type": "design", + "summary": "Hypothesis bundle is well-formed and consistent with active principles.", + "key_points": [ + "Hypothesis bundle covers the four arms.", + "Methodology aligns with prior principles.", + ], + } + + def test_writes_valid_json_when_runner_returns_fenced_payload(self, tmp_path): + fenced = "```json\n" + json.dumps(self._SUMMARY) + "\n```" + runner = _ScriptedRunner([SDKResult(text=fenced)]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(), + sdk_runner=runner, + ) + + out = tmp_path / "runs" / "iter-1" / "gate_summary.json" + dispatcher.dispatch( + "summarizer", "summarize-gate", + output_path=out, iteration=1, perspective="design", + ) + + assert out.exists() + parsed = json.loads(out.read_text()) + jsonschema.validate(parsed, _load_schema("gate_summary.schema.json")) + assert parsed["gate_type"] == "design" + + +# ─── Transient retry behavior ─────────────────────────────────────────────── + +class TestSDKDispatchTransientRetry: + + def test_retries_after_transient_error_then_succeeds(self, tmp_path, monkeypatch): + # Disable backoff sleep to keep the test fast. + monkeypatch.setattr( + "orchestrator.sdk_dispatch.time.sleep", lambda _s: None, + ) + runner = _ScriptedRunner([ + SDKTransientError("network blip"), + SDKResult(text="recovered text", input_tokens=10, output_tokens=5), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + max_retries=3, + ) + + out = tmp_path / "runs" / "iter-1" / "design_log.md" + dispatcher.dispatch("planner", "design", output_path=out, iteration=1) + + assert "recovered text" in out.read_text() + + retry_log = _read_jsonl(tmp_path / "retry_log.jsonl") + assert len(retry_log) == 1 + assert retry_log[0]["role"] == "planner" + assert retry_log[0]["phase"] == "design" + assert "network blip" in retry_log[0]["error"] + + def test_raises_after_retries_exhausted(self, tmp_path, monkeypatch): + monkeypatch.setattr( + "orchestrator.sdk_dispatch.time.sleep", lambda _s: None, + ) + runner = _ScriptedRunner([ + SDKTransientError("persistent failure"), + SDKTransientError("persistent failure"), + SDKTransientError("persistent failure"), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + max_retries=2, + ) + + with pytest.raises(RuntimeError, match="still failing"): + dispatcher.dispatch( + "planner", "design", + output_path=tmp_path / "runs" / "iter-1" / "design_log.md", + iteration=1, + ) + + retry_log = _read_jsonl(tmp_path / "retry_log.jsonl") + # Three failures = three retry-log rows. + assert len(retry_log) == 3 + + +# ─── Error result path ────────────────────────────────────────────────────── + +class TestSDKDispatchErrorResult: + """When the SDK returns is_error=True (e.g. API rejected the request), + the dispatcher treats it as transient unless explicitly fatal.""" + + def test_is_error_treated_as_transient_and_retried(self, tmp_path, monkeypatch): + monkeypatch.setattr( + "orchestrator.sdk_dispatch.time.sleep", lambda _s: None, + ) + runner = _ScriptedRunner([ + SDKResult(text="", is_error=True, error_message="rate limit exceeded"), + SDKResult(text="finally got through", input_tokens=10, output_tokens=5), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + max_retries=3, + ) + + out = tmp_path / "runs" / "iter-1" / "design_log.md" + dispatcher.dispatch("planner", "design", output_path=out, iteration=1) + + assert "finally got through" in out.read_text() + + retry_log = _read_jsonl(tmp_path / "retry_log.jsonl") + assert len(retry_log) == 1 + assert "rate limit exceeded" in retry_log[0]["error"]