From 33c07a1093994bbcf2f273513b712611e82c742f Mon Sep 17 00:00:00 2001 From: Srinivasan Parthasarathy Date: Sun, 24 May 2026 07:54:20 -0400 Subject: [PATCH 1/3] feat: add SDKDispatcher and --agent sdk flag (#121) Replace the subprocess(claude -p) transport with the Claude Agent SDK behind a new --agent sdk flag. CLIDispatcher remains the default; sdk mode is opt-in until soak time validates parity. Why: claude -p is blind for up to 7200s, has no native streaming, no programmatic prompt caching, no native subagent spawning, and retries by subprocess restart (loses message context). The SDK fixes all four. What lands: - orchestrator/sdk_dispatch.py: SDKDispatcher extends CLIDispatcher, overrides only _call_claude and preflight_check. Reuses the parse / validate / retry-with-feedback machinery for fenced-output phases. - A pluggable sdk_runner Protocol (SDKResult dataclass) is the seam for behavioral tests and for #122/#127 follow-ups (cache_control, stream-json) that need to read SDK events. - Default runner lazily resolves to the real claude_agent_sdk so environments without the SDK installed don't fail at import time. - CLI/argparse choices extended to ["inline", "api", "sdk"] in cli.py, campaign.py, iteration.py (parser declarations and dispatch routing). - Pre-flight check in campaign.py routes to SDK preflight when sdk mode. - pyproject.toml gains an [sdk] optional extra: claude-agent-sdk + anyio. - docs/architecture.md describes the new path. Behavioral tests (tests/test_sdk_dispatch.py): 6 cases covering text phase output, structured phase parse+validate, transient retry, retry exhaustion, and is_error -> retry. All assertions are about on-disk artifacts and metrics rows; none assert call shape, argv, or which method was invoked on the runner. Out of scope for this PR (queued in #120 plan): - Prompt caching (#122). - Stream-json TUI (#127). - Removing claude -p (post-soak cleanup). Test suite: 344 passed (existing) + 6 new = 350. Closes #121. Refs #120. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/architecture.md | 5 +- orchestrator/campaign.py | 25 ++- orchestrator/cli.py | 6 +- orchestrator/iteration.py | 12 +- orchestrator/sdk_dispatch.py | 355 +++++++++++++++++++++++++++++++++++ pyproject.toml | 4 + tests/test_sdk_dispatch.py | 268 ++++++++++++++++++++++++++ 7 files changed, 659 insertions(+), 16 deletions(-) create mode 100644 orchestrator/sdk_dispatch.py create mode 100644 tests/test_sdk_dispatch.py diff --git a/docs/architecture.md b/docs/architecture.md index f5e162b..5a2e691 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -110,7 +110,8 @@ Both agents write artifacts directly to the campaign directory (`iter_dir`) and **Implementations:** - `StubDispatcher` (`dispatch.py`) produces valid, schema-conformant artifacts without calling any LLM. Used for testing the orchestrator loop. -- `CLIDispatcher` (`cli_dispatch.py`) invokes `claude -p` as a subprocess, giving agents code access and shell tools. Agents write files directly to `iter_dir`. Supports `override_cwd()` context manager for pointing the executor at a git worktree. +- `CLIDispatcher` (`cli_dispatch.py`) invokes `claude -p` as a subprocess, giving agents code access and shell tools. Agents write files directly to `iter_dir`. Supports `override_cwd()` context manager for pointing the executor at a git worktree. Selected via `--agent api`. +- `SDKDispatcher` (`sdk_dispatch.py`) calls the Claude Agent SDK (`claude-agent-sdk`) instead of spawning a subprocess. Same artifact and metrics contract as `CLIDispatcher`; gains native streaming, programmatic prompt caching, and message-level retry. Selected via `--agent sdk`. Requires the optional `sdk` install extra (`pip install -e ".[sdk]"`). Inherits parse / validate / retry-with-feedback machinery from `CLIDispatcher` — only the transport changes. **Dispatch interface:** ```python @@ -122,7 +123,7 @@ dispatcher.dispatch( ) ``` -Both dispatchers share the same interface — `CLIDispatcher` extends `LLMDispatcher`. +All three dispatchers share the same interface. `CLIDispatcher` extends `LLMDispatcher`; `SDKDispatcher` extends `CLIDispatcher` and overrides only `_call_claude` and `preflight_check`. ## CLI Dispatch diff --git a/orchestrator/campaign.py b/orchestrator/campaign.py index 2ba6a84..99b7e15 100644 --- a/orchestrator/campaign.py +++ b/orchestrator/campaign.py @@ -206,15 +206,24 @@ def run_campaign( HumanGate(auto_response="approve") if auto_approve else HumanGate() ) - # Pre-flight: validate CLI + credentials before starting the campaign + # Pre-flight: validate CLI + credentials before starting the campaign. + # SDK mode pre-flights via claude-agent-sdk import; API mode via claude CLI. repo_path = campaign.get("target_system", {}).get("repo_path") if agent != "inline" and repo_path: - from orchestrator.cli_dispatch import CLIDispatcher - preflight_dispatcher = CLIDispatcher( - work_dir=work_dir, campaign=campaign, - model=_resolve_model(campaign, "design", model), - max_retries=max_cli_retries, - ) + if agent == "sdk": + from orchestrator.sdk_dispatch import SDKDispatcher + preflight_dispatcher = SDKDispatcher( + work_dir=work_dir, campaign=campaign, + model=_resolve_model(campaign, "design", model), + max_retries=max_cli_retries, + ) + else: + from orchestrator.cli_dispatch import CLIDispatcher + preflight_dispatcher = CLIDispatcher( + work_dir=work_dir, campaign=campaign, + model=_resolve_model(campaign, "design", model), + max_retries=max_cli_retries, + ) preflight_dispatcher.preflight_check() start_iter = _resume_completed_campaign(work_dir, max_iterations) @@ -353,7 +362,7 @@ def main() -> None: help="Timeout in seconds for claude -p calls (default: 1800)") parser.add_argument("--max-cli-retries", type=int, default=10, help="Max retries for claude -p failures (-1 = unbounded, default: 10)") - parser.add_argument("--agent", choices=["inline", "api"], default="api", + parser.add_argument("--agent", choices=["inline", "api", "sdk"], default="api", help="Dispatch backend: 'inline' emits prompts to stdout for the " "calling agent (no subprocess, no API key), " "'api' uses the LLM API (default: api)") diff --git a/orchestrator/cli.py b/orchestrator/cli.py index 755e9d9..4cb7e2c 100644 --- a/orchestrator/cli.py +++ b/orchestrator/cli.py @@ -310,7 +310,7 @@ def main(): p_run.add_argument("--auto-approve", action="store_true") p_run.add_argument("--timeout", type=int, default=1800) p_run.add_argument("--max-cli-retries", type=int, default=10) - p_run.add_argument("--agent", choices=["inline", "api"], default="api") + p_run.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_run.set_defaults(func=_cmd_run) p_resume = subparsers.add_parser("resume") @@ -320,7 +320,7 @@ def main(): p_resume.add_argument("--auto-approve", action="store_true") p_resume.add_argument("--timeout", type=int, default=1800) p_resume.add_argument("--max-cli-retries", type=int, default=10) - p_resume.add_argument("--agent", choices=["inline", "api"], default="api") + p_resume.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_resume.set_defaults(func=_cmd_resume) p_validate = subparsers.add_parser("validate") @@ -340,7 +340,7 @@ def main(): p_report.add_argument("target") p_report.add_argument("--model") p_report.add_argument("--timeout", type=int, default=1800) - p_report.add_argument("--agent", choices=["inline", "api"], default="api") + p_report.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_report.set_defaults(func=_cmd_report) p_replay = subparsers.add_parser("replay") diff --git a/orchestrator/iteration.py b/orchestrator/iteration.py index 29e9712..2f5ac10 100644 --- a/orchestrator/iteration.py +++ b/orchestrator/iteration.py @@ -281,9 +281,15 @@ def _max_turns_for(phase_key: str) -> int: cli_dispatcher = inline_dispatcher llm_dispatcher = inline_dispatcher else: - # API mode: CLIDispatcher for code-access roles only (when repo_path is set) + # API or SDK mode: code-access dispatcher only when repo_path is set. + # SDK uses claude-agent-sdk; api uses the claude -p subprocess (CLIDispatcher). + if agent == "sdk": + from orchestrator.sdk_dispatch import SDKDispatcher + code_dispatcher_cls = SDKDispatcher + else: + code_dispatcher_cls = CLIDispatcher cli_dispatcher = ( - CLIDispatcher( + code_dispatcher_cls( work_dir=work_dir, campaign=campaign, model=_model_for("design"), timeout=timeout, max_turns=_max_turns_for("design"), @@ -493,7 +499,7 @@ def main() -> None: help="Timeout in seconds for claude -p calls (default: 1800)") parser.add_argument("--max-cli-retries", type=int, default=10, help="Max retries for claude -p failures (-1 = unbounded, default: 10)") - parser.add_argument("--agent", choices=["inline", "api"], default="api", + parser.add_argument("--agent", choices=["inline", "api", "sdk"], default="api", help="Dispatch backend: 'inline' emits prompts to stdout for the " "calling agent, 'api' uses the LLM API (default: api)") parser.add_argument("-v", "--verbose", action="store_true", diff --git a/orchestrator/sdk_dispatch.py b/orchestrator/sdk_dispatch.py new file mode 100644 index 0000000..020a0f0 --- /dev/null +++ b/orchestrator/sdk_dispatch.py @@ -0,0 +1,355 @@ +"""SDK-based agent dispatch for the Nous orchestrator. + +Calls the Claude Agent SDK in place of `claude -p` subprocess. Same +artifact and metrics contract as :class:`orchestrator.cli_dispatch.CLIDispatcher`; +this class swaps the transport without changing the orchestrator's contract +with the rest of Nous. + +Why SDK over `claude -p`: + * Native streaming → fast progress visibility (#127). + * Programmatic prompt caching → token savings (#122). + * Native subagent spawning → parallel arms without manual fork/join (#123). + * Message-level retry instead of subprocess restart. + +Design decisions worth knowing: + + * The actual SDK call is delegated to a ``sdk_runner`` callable. The + default lazily resolves to a real ``claude_agent_sdk`` runner; tests + inject a deterministic fake. The runner returns an ``SDKResult`` + (text + usage + cost + error flag); the dispatcher's job is to turn + that into on-disk artifacts and a metrics row, with retry on transient + failure. This keeps tests behavioral — they assert what's on disk, + not which method we called. + * Inherits from CLIDispatcher to reuse the parse/validate/retry-with-feedback + machinery used for fenced-output phases (gate summaries, etc.). +""" +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Callable, Protocol, runtime_checkable + +from orchestrator.cli_dispatch import CLIDispatcher, _backoff_for +from orchestrator.metrics import log_metrics, log_retry_event + +logger = logging.getLogger(__name__) + + +class SDKTransientError(RuntimeError): + """Runner raises this for retryable transport-level failures.""" + + +@dataclass +class SDKResult: + """One SDK call's outcome. + + The dispatcher reads only these fields. Producers (real or fake) must + populate ``text`` (assistant final text); usage/cost fields default + to zero so trivial fakes need not set them. + """ + + text: str + input_tokens: int = 0 + output_tokens: int = 0 + cache_read_input_tokens: int = 0 + cache_creation_input_tokens: int = 0 + cost_usd: float = 0.0 + duration_ms: int = 0 + num_turns: int = 1 + is_error: bool = False + error_message: str = "" + extra: dict = field(default_factory=dict) + + +@runtime_checkable +class SDKRunner(Protocol): + """A callable that performs one SDK turn and returns an ``SDKResult``. + + Raise :class:`SDKTransientError` for retryable failures (network blips, + rate limits, mid-stream disconnect). Return ``SDKResult(is_error=True, + error_message=...)`` for API-reported errors that should also be retried. + Other exceptions bubble up as fatal. + """ + + def __call__( + self, + *, + prompt: str, + model: str, + cwd: Path | None, + max_turns: int, + system_prompt: str | None = None, + settings_path: Path | None = None, + ) -> SDKResult: + ... + + +def _default_sdk_runner_factory() -> SDKRunner: + """Return a runner that calls the real ``claude_agent_sdk``. + + Resolved lazily so that tests (and environments without the SDK + installed) don't fail at import time. + """ + + def _runner( + *, + prompt: str, + model: str, + cwd: Path | None, + max_turns: int, + system_prompt: str | None = None, + settings_path: Path | None = None, + ) -> SDKResult: + try: + import anyio + from claude_agent_sdk import ( # type: ignore[import-not-found] + ClaudeAgentOptions, + query, + ) + except ImportError as exc: + raise RuntimeError( + "claude-agent-sdk is not installed. " + "Install with `pip install claude-agent-sdk` or use --agent api." + ) from exc + + async def _run() -> SDKResult: + options = ClaudeAgentOptions( + model=model, + cwd=str(cwd) if cwd else None, + max_turns=max_turns, + system_prompt=system_prompt, + settings=str(settings_path) if settings_path else None, + ) + text_chunks: list[str] = [] + usage: dict = {} + cost_usd = 0.0 + duration_ms = 0 + num_turns = 0 + t0 = time.time() + async for message in query(prompt=prompt, options=options): + cls = type(message).__name__ + if cls == "AssistantMessage": + for block in getattr(message, "content", []): + if hasattr(block, "text"): + text_chunks.append(block.text) + elif cls == "ResultMessage": + usage = getattr(message, "usage", {}) or {} + cost_usd = float(getattr(message, "total_cost_usd", 0.0) or 0.0) + duration_ms = int(getattr(message, "duration_ms", 0) or 0) + num_turns = int(getattr(message, "num_turns", 0) or 0) + if getattr(message, "is_error", False): + return SDKResult( + text="".join(text_chunks), + error_message=str(getattr(message, "result", "unknown")), + is_error=True, + input_tokens=int(usage.get("input_tokens", 0) or 0), + output_tokens=int(usage.get("output_tokens", 0) or 0), + cache_read_input_tokens=int( + usage.get("cache_read_input_tokens", 0) or 0 + ), + cache_creation_input_tokens=int( + usage.get("cache_creation_input_tokens", 0) or 0 + ), + cost_usd=cost_usd, + duration_ms=duration_ms, + num_turns=num_turns, + ) + return SDKResult( + text="".join(text_chunks), + input_tokens=int(usage.get("input_tokens", 0) or 0), + output_tokens=int(usage.get("output_tokens", 0) or 0), + cache_read_input_tokens=int( + usage.get("cache_read_input_tokens", 0) or 0 + ), + cache_creation_input_tokens=int( + usage.get("cache_creation_input_tokens", 0) or 0 + ), + cost_usd=cost_usd, + duration_ms=duration_ms or int((time.time() - t0) * 1000), + num_turns=num_turns or 1, + ) + + try: + return anyio.run(_run) + except Exception as exc: + cls_name = type(exc).__name__ + transient_signals = ( + "ConnectionError", + "ReadTimeout", + "WriteTimeout", + "RemoteProtocolError", + "ServerDisconnectedError", + "TimeoutError", + ) + if any(sig in cls_name for sig in transient_signals): + raise SDKTransientError(f"{cls_name}: {exc}") from exc + raise + + return _runner + + +class SDKDispatcher(CLIDispatcher): + """Dispatch agent roles via the Claude Agent SDK. + + Inherits dispatch() / parse / retry-with-feedback / route logic from + :class:`CLIDispatcher`. Overrides ``_call_claude`` to use the SDK + runner instead of a subprocess, and ``preflight_check`` to verify + the SDK package is importable. + """ + + def __init__( + self, + work_dir: Path, + campaign: dict, + model: str = "claude-sonnet-4-6", + prompts_dir: Path | None = None, + timeout: int = 1800, + max_turns: int = 25, + max_retries: int | None = 10, + sdk_runner: Callable | None = None, + system_prompt: str | None = None, + settings_path: Path | None = None, + ) -> None: + super().__init__( + work_dir=work_dir, + campaign=campaign, + model=model, + prompts_dir=prompts_dir, + timeout=timeout, + max_turns=max_turns, + max_retries=max_retries, + ) + self._sdk_runner = sdk_runner or _default_sdk_runner_factory() + self._system_prompt = system_prompt + self._settings_path = settings_path + + # ------------------------------------------------------------------ + # Pre-flight + # ------------------------------------------------------------------ + + def preflight_check(self) -> None: + """Verify the SDK is reachable before starting a campaign.""" + try: + import claude_agent_sdk # type: ignore[import-not-found] # noqa: F401 + except ImportError as exc: + raise RuntimeError( + "Pre-flight check failed: claude-agent-sdk is not installed. " + "Install with `pip install claude-agent-sdk`, or pass --agent api " + "to use the OpenAI-compatible path instead." + ) from exc + logger.info("SDK pre-flight check passed (model=%s)", self.model) + + # ------------------------------------------------------------------ + # Core call with retry + # ------------------------------------------------------------------ + + def _call_claude(self, prompt: str, max_turns: int | None = None) -> str: + """Run one SDK turn with retry on transient failure. + + Mirrors CLIDispatcher._call_claude semantics: retry on transient + errors (with exponential backoff), log each failure to retry_log.jsonl, + log each completed call to llm_metrics.jsonl, give up after + max_retries. + """ + cwd = self._cwd + if cwd and not cwd.exists(): + raise RuntimeError( + f"SDKDispatcher cwd does not exist: {cwd}. " + f"Check that 'repo_path' in campaign.yaml is correct." + ) + turns = max_turns or self.max_turns + logger.info( + "SDK turn (model=%s, cwd=%s, max_turns=%d)", self.model, cwd, turns, + ) + + failure_count = 0 + original_prompt = prompt + while True: + try: + result = self._sdk_runner( + prompt=prompt, + model=self.model, + cwd=cwd, + max_turns=turns, + system_prompt=self._system_prompt, + settings_path=self._settings_path, + ) + except SDKTransientError as exc: + failure_count += 1 + self._log_retry("transient", failure_count, exc) + if self._exhausted(failure_count): + raise RuntimeError( + f"SDK still failing after {failure_count} attempt(s): {exc}" + ) from exc + time.sleep(_backoff_for(failure_count)) + prompt = self._maybe_resume_hint(prompt, original_prompt, "transient") + continue + + self._log_metrics_row(result) + + if result.is_error: + failure_count += 1 + self._log_retry( + "api_error", failure_count, RuntimeError(result.error_message), + ) + if self._exhausted(failure_count): + raise RuntimeError( + f"SDK returned error after {failure_count} attempt(s): " + f"{result.error_message}" + ) + time.sleep(_backoff_for(failure_count)) + prompt = self._maybe_resume_hint(prompt, original_prompt, "api_error") + continue + + return result.text + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _exhausted(self, failure_count: int) -> bool: + return self.max_retries is not None and failure_count > self.max_retries + + def _log_retry(self, kind: str, attempt: int, exc: BaseException) -> None: + log_retry_event(self._metrics_path, { + "role": self._current_role, + "phase": self._current_phase, + "failure_type": kind, + "attempt": attempt, + "error": str(exc)[:500], + }) + + def _log_metrics_row(self, result: SDKResult) -> None: + log_metrics(self._metrics_path, { + "dispatcher": "sdk", + "role": self._current_role, + "phase": self._current_phase, + "model": self.model, + "input_tokens": result.input_tokens, + "output_tokens": result.output_tokens, + "cache_creation_input_tokens": result.cache_creation_input_tokens, + "cache_read_input_tokens": result.cache_read_input_tokens, + "cost_usd": result.cost_usd, + "duration_ms": result.duration_ms, + "num_turns": result.num_turns, + }) + + @staticmethod + def _maybe_resume_hint(prompt: str, original_prompt: str, kind: str) -> str: + """If the prompt has not yet been annotated with a resume hint, add one. + + Mirrors CLIDispatcher: tells the agent that the prior attempt was + interrupted so it picks up from existing artifacts rather than + starting fresh. + """ + marker = "\nNote: Your previous attempt was interrupted" + if marker in prompt: + return prompt + return ( + f"{original_prompt}\n\n---\n" + f"Note: Your previous attempt was interrupted ({kind}). " + f"Check the working directory for artifacts from your prior " + f"attempt and continue from where you left off." + ) diff --git a/pyproject.toml b/pyproject.toml index f0b9a53..0bfe2f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,10 @@ dev = [ "pytest>=8.0", "pytest-cov>=4.0", ] +sdk = [ + "claude-agent-sdk>=0.0.20", + "anyio>=4.0", +] [project.scripts] nous = "orchestrator.cli:main" diff --git a/tests/test_sdk_dispatch.py b/tests/test_sdk_dispatch.py new file mode 100644 index 0000000..b6d4cf9 --- /dev/null +++ b/tests/test_sdk_dispatch.py @@ -0,0 +1,268 @@ +"""Behavioral tests for the SDK-based dispatcher. + +These tests do NOT mock the Claude Agent SDK directly. They inject a +``sdk_runner`` callable that returns a ``SDKResult`` — same contract the +real dispatcher uses internally — and assert what the dispatcher does +with that result: artifacts on disk, metrics rows, retry behavior. + +That is the contract the rest of Nous depends on. Tests below should +keep passing across SDK API churn as long as the dispatcher's responsibility +to write artifacts and emit metrics holds. + +No assertions about argv shape, internal helper calls, or which methods +the dispatcher invoked on the runner. That's structural — out of scope. +""" +from __future__ import annotations + +import json +from pathlib import Path + +import jsonschema +import pytest +import yaml + +from orchestrator.sdk_dispatch import SDKDispatcher, SDKResult, SDKTransientError + + +SCHEMAS_DIR = Path(__file__).resolve().parent.parent / "orchestrator" / "schemas" + + +def _load_schema(name: str) -> dict: + path = SCHEMAS_DIR / name + if path.suffix in (".yaml", ".yml"): + return yaml.safe_load(path.read_text()) + return json.loads(path.read_text()) + + +def _make_campaign(repo_path: Path | None = None) -> dict: + target = { + "name": "test-system", + "description": "A small test system used by behavioral tests.", + "observable_metrics": ["latency", "throughput"], + "controllable_knobs": ["batch_size", "concurrency"], + } + if repo_path is not None: + target["repo_path"] = str(repo_path) + return { + "research_question": "What drives latency?", + "target_system": target, + } + + +def _read_jsonl(path: Path) -> list[dict]: + if not path.exists(): + return [] + return [json.loads(line) for line in path.read_text().splitlines() if line.strip()] + + +class _ScriptedRunner: + """A runner that returns a queue of pre-staged results. + + Each call pops the next entry. Entries can be SDKResult objects (returned) + or BaseException instances (raised). When the queue is exhausted, raises + AssertionError — a test-only failure mode that signals the dispatcher + called the runner more times than expected. + """ + + def __init__(self, scripted: list): + self._scripted = list(scripted) + self.calls: list[dict] = [] + + def __call__(self, **kwargs) -> SDKResult: + self.calls.append(kwargs) + if not self._scripted: + raise AssertionError( + f"Runner exhausted; dispatcher called it {len(self.calls)} times " + f"but only {len(self.calls) - 1} responses were scripted." + ) + nxt = self._scripted.pop(0) + if isinstance(nxt, BaseException): + raise nxt + return nxt + + +# ─── Text-output phase (design): dispatcher writes assistant text to log ─── + +class TestSDKDispatchTextPhase: + """For design/execute-analyze, the SDK runs an agent that writes + artifacts via tool calls; the dispatcher persists the assistant's + final text message as a log.""" + + def test_writes_assistant_text_to_output_path(self, tmp_path): + runner = _ScriptedRunner([ + SDKResult(text="design log content here", input_tokens=100, output_tokens=50), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + ) + + out = tmp_path / "runs" / "iter-1" / "design_log.md" + dispatcher.dispatch("planner", "design", output_path=out, iteration=1) + + assert out.exists() + assert "design log content here" in out.read_text() + + def test_emits_one_metrics_row_per_call(self, tmp_path): + runner = _ScriptedRunner([ + SDKResult( + text="ok", + input_tokens=400, + output_tokens=120, + cache_read_input_tokens=300, + cache_creation_input_tokens=0, + cost_usd=0.021, + duration_ms=4500, + num_turns=3, + ), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + ) + + dispatcher.dispatch( + "planner", "design", + output_path=tmp_path / "runs" / "iter-1" / "design_log.md", + iteration=1, + ) + + rows = _read_jsonl(tmp_path / "llm_metrics.jsonl") + assert len(rows) == 1 + row = rows[0] + assert row["dispatcher"] == "sdk" + assert row["role"] == "planner" + assert row["phase"] == "design" + assert row["input_tokens"] == 400 + assert row["output_tokens"] == 120 + assert row["cache_read_input_tokens"] == 300 + assert row["cost_usd"] == pytest.approx(0.021) + assert row["num_turns"] == 3 + + +# ─── Structured-output phase: dispatcher parses + validates + writes JSON ── + +class TestSDKDispatchStructuredPhase: + """Gate-summary phase: SDK returns a fenced JSON; dispatcher parses, + validates against gate_summary.schema.json, writes JSON output.""" + + _SUMMARY = { + "gate_type": "design", + "summary": "Hypothesis bundle is well-formed and consistent with active principles.", + "key_points": [ + "Hypothesis bundle covers the four arms.", + "Methodology aligns with prior principles.", + ], + } + + def test_writes_valid_json_when_runner_returns_fenced_payload(self, tmp_path): + fenced = "```json\n" + json.dumps(self._SUMMARY) + "\n```" + runner = _ScriptedRunner([SDKResult(text=fenced)]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(), + sdk_runner=runner, + ) + + out = tmp_path / "runs" / "iter-1" / "gate_summary.json" + dispatcher.dispatch( + "summarizer", "summarize-gate", + output_path=out, iteration=1, perspective="design", + ) + + assert out.exists() + parsed = json.loads(out.read_text()) + jsonschema.validate(parsed, _load_schema("gate_summary.schema.json")) + assert parsed["gate_type"] == "design" + + +# ─── Transient retry behavior ─────────────────────────────────────────────── + +class TestSDKDispatchTransientRetry: + + def test_retries_after_transient_error_then_succeeds(self, tmp_path, monkeypatch): + # Disable backoff sleep to keep the test fast. + monkeypatch.setattr( + "orchestrator.sdk_dispatch.time.sleep", lambda _s: None, + ) + runner = _ScriptedRunner([ + SDKTransientError("network blip"), + SDKResult(text="recovered text", input_tokens=10, output_tokens=5), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + max_retries=3, + ) + + out = tmp_path / "runs" / "iter-1" / "design_log.md" + dispatcher.dispatch("planner", "design", output_path=out, iteration=1) + + assert "recovered text" in out.read_text() + + retry_log = _read_jsonl(tmp_path / "retry_log.jsonl") + assert len(retry_log) == 1 + assert retry_log[0]["role"] == "planner" + assert retry_log[0]["phase"] == "design" + assert "network blip" in retry_log[0]["error"] + + def test_raises_after_retries_exhausted(self, tmp_path, monkeypatch): + monkeypatch.setattr( + "orchestrator.sdk_dispatch.time.sleep", lambda _s: None, + ) + runner = _ScriptedRunner([ + SDKTransientError("persistent failure"), + SDKTransientError("persistent failure"), + SDKTransientError("persistent failure"), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + max_retries=2, + ) + + with pytest.raises(RuntimeError, match="still failing"): + dispatcher.dispatch( + "planner", "design", + output_path=tmp_path / "runs" / "iter-1" / "design_log.md", + iteration=1, + ) + + retry_log = _read_jsonl(tmp_path / "retry_log.jsonl") + # Three failures = three retry-log rows. + assert len(retry_log) == 3 + + +# ─── Error result path ────────────────────────────────────────────────────── + +class TestSDKDispatchErrorResult: + """When the SDK returns is_error=True (e.g. API rejected the request), + the dispatcher treats it as transient unless explicitly fatal.""" + + def test_is_error_treated_as_transient_and_retried(self, tmp_path, monkeypatch): + monkeypatch.setattr( + "orchestrator.sdk_dispatch.time.sleep", lambda _s: None, + ) + runner = _ScriptedRunner([ + SDKResult(text="", is_error=True, error_message="rate limit exceeded"), + SDKResult(text="finally got through", input_tokens=10, output_tokens=5), + ]) + dispatcher = SDKDispatcher( + work_dir=tmp_path, + campaign=_make_campaign(tmp_path), + sdk_runner=runner, + max_retries=3, + ) + + out = tmp_path / "runs" / "iter-1" / "design_log.md" + dispatcher.dispatch("planner", "design", output_path=out, iteration=1) + + assert "finally got through" in out.read_text() + + retry_log = _read_jsonl(tmp_path / "retry_log.jsonl") + assert len(retry_log) == 1 + assert "rate limit exceeded" in retry_log[0]["error"] From bcc82a7ae786308f619d7ce92df18390630bf5b8 Mon Sep 17 00:00:00 2001 From: Srinivasan Parthasarathy Date: Sun, 24 May 2026 08:44:42 -0400 Subject: [PATCH 2/3] feat: explore-then-synthesize DESIGN orchestration helpers (#132, Phase A) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stacks on #121. Phase A ships the orchestration layer that makes splitting DESIGN into Stage A (parallel Explore subagents) + Stage B (Opus synthesis) possible without changing what gets produced (problem.md + bundle.yaml). DESIGN today asks one Opus session to do both codebase mapping AND bundle synthesis. That's the canonical Claude-Code-pattern miss: broad exploration + small synthesis is exactly what parallel Explore subagents are for. Phase A is the orchestration helpers; Phase B (lands when #121 merges and the team picks injection points) wires the SDKDispatcher to actually spawn Explore subagents and thread reports through to the synthesis call. Phase A surface: * DEFAULT_EXPLORE_SCOPES — four scopes the issue calls out: metrics, knobs, prior_findings, principles. Each gets its own Explore subagent. * build_explore_prompt(scope, campaign) — produces a tight, scope-focused prompt for a read-only Explore subagent. Multi-aspect integration is NOT this prompt's job (Stage B does that). * run_explore_stage(campaign, *, scopes, runner) — fans out one subagent per scope via an injected runner callable, collects ExploreReports. Synchronous in Phase A; the SDK's async fan-out lands in Phase B. * build_synthesis_prompt(stage_a, *, campaign, iteration, iter_dir) — Opus prompt that consumes only the Explore reports + principles.json, produces problem.md + bundle.yaml, EXPLICITLY forbids re-reading the codebase ("Do not re-read"). That's the whole point of the split: Opus on integration, not on file walks. Behavioral tests (13 in tests/test_explore_design.py): build_explore_prompt: - metrics scope focuses on observable metrics - knobs scope focuses on configuration parameters - prior_findings references findings.json - principles references the principle store - EVERY scope marks the explorer read-only (the prompt is defense-in-depth on top of subagent_type="Explore") run_explore_stage: - one subagent per default scope (4 calls) - custom scopes pass through - token counts aggregate across reports - by_scope() lookup returns the right report build_synthesis_prompt: - every explorer report appears under its `### ` heading - explicit "Do not re-read" instruction - problem.md + bundle.yaml + iter-N + bundle.schema.yaml all named - research question appears Out of scope (Phase B): - SDKDispatcher integration (spawning subagent_type="Explore" via SDK) - anyio.gather over the four explorer calls for actual parallelism - Token-budget measurement on a representative campaign (criterion "DESIGN cost drops by ≥30%") - Wall-clock measurement on multi-aspect explorations Test suite (this branch, stacked on #121): 344 + 13 new = 357 passing. Refs #120, #132. Stacked on #136. Co-Authored-By: Claude Opus 4.7 (1M context) --- orchestrator/explore_design.py | 206 +++++++++++++++++++++++++++++++++ tests/test_explore_design.py | 149 ++++++++++++++++++++++++ 2 files changed, 355 insertions(+) create mode 100644 orchestrator/explore_design.py create mode 100644 tests/test_explore_design.py diff --git a/orchestrator/explore_design.py b/orchestrator/explore_design.py new file mode 100644 index 0000000..91b57bf --- /dev/null +++ b/orchestrator/explore_design.py @@ -0,0 +1,206 @@ +"""Explore-then-synthesize DESIGN phase (issue #132). + +DESIGN today asks one Opus session to do two things at once: + + 1. Read the codebase to map metrics, knobs, prior findings, principles. + 2. Synthesize a hypothesis bundle from what it found. + +That's the canonical Claude-Code-pattern miss: broad exploration + small +synthesis is exactly what parallel Explore subagents are for. Phase A +of #132 ships the orchestration layer that makes the split possible +without changing what gets produced (problem.md + bundle.yaml). + +Stage A — parallel Explore: ``run_explore_stage(campaign, scopes, +runner)`` fans out one read-only subagent per scope and collects their +reports. + +Stage B — Opus synthesis: ``build_synthesis_prompt(reports, campaign, +iteration)`` produces the prompt body for the single Opus call that +turns the explorer reports + principles.json into problem.md + +bundle.yaml. + +Phase A is the orchestration helpers + their behavioral tests. The +dispatcher integration (SDKDispatcher spawning Explore subagents, +threading reports back into a synthesis call) lands in Phase B once +#121 merges and the team picks injection points. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Callable, Iterable + +# Default exploration scopes — one Explore subagent per scope. The +# scopes are deliberately overlapping a little so synthesis has +# redundant signal where it matters. +DEFAULT_EXPLORE_SCOPES: tuple[str, ...] = ( + "metrics", # observable metrics + how they're collected + "knobs", # controllable knobs + their value ranges + "prior_findings", # findings.json from previous iterations + "principles", # principles.json across the campaign + others +) + + +@dataclass +class ExploreReport: + scope: str + text: str + duration_ms: int = 0 + input_tokens: int = 0 + output_tokens: int = 0 + + def as_dict(self) -> dict: + return { + "scope": self.scope, + "text": self.text, + "duration_ms": self.duration_ms, + "input_tokens": self.input_tokens, + "output_tokens": self.output_tokens, + } + + +@dataclass +class ExploreStageResult: + reports: list[ExploreReport] = field(default_factory=list) + + @property + def total_input_tokens(self) -> int: + return sum(r.input_tokens for r in self.reports) + + @property + def total_output_tokens(self) -> int: + return sum(r.output_tokens for r in self.reports) + + def by_scope(self, scope: str) -> ExploreReport | None: + for r in self.reports: + if r.scope == scope: + return r + return None + + +def build_explore_prompt(scope: str, campaign: dict) -> str: + """Construct a read-only Explore subagent prompt for one scope. + + The subagent should be spawned with ``subagent_type="Explore"`` so + it cannot mutate the worktree. The prompt is short and scope-tight + on purpose; the synthesis call (Stage B) is where multi-aspect + integration happens. + """ + target = campaign.get("target_system", {}) + name = target.get("name", "the target system") + repo = target.get("repo_path", "(repo not configured)") + + if scope == "metrics": + focus = ( + "Map the observable metrics this system exposes and how they " + "are collected. Include the file/function where each metric is " + "computed." + ) + elif scope == "knobs": + focus = ( + "Map the controllable knobs / configuration parameters this " + "system exposes. For each knob, note its declared range and the " + "code path that consumes it." + ) + elif scope == "prior_findings": + focus = ( + "Read prior runs/iter-*/findings.json files in the campaign " + "directory. Summarize confirmed/refuted hypotheses and any open " + "questions surfaced by the most recent iteration." + ) + elif scope == "principles": + focus = ( + "Read principles.json in this campaign and any sibling campaigns " + "(via the campaign_index module if available). Flag principles " + "that touch the same mechanism we're about to design for." + ) + else: + focus = f"Investigate the '{scope}' aspect of the target system." + + return ( + f"# Explore: {scope}\n\n" + f"You are a read-only Explore subagent. **Do not modify any files.**\n" + f"Target: {name} (repo at {repo})\n\n" + f"## Focus\n{focus}\n\n" + f"## Output\n" + f"Return a markdown report of <= 500 lines. Cite file paths and " + f"line numbers. End with a one-paragraph summary the synthesizer " + f"can read in isolation.\n" + ) + + +ExploreRunner = Callable[[str, str, dict], ExploreReport] +"""Callable signature for running one Explore subagent. + +Takes (scope, prompt, campaign) and returns an ExploreReport. The +default real-world implementation spawns subagent_type="Explore" via +the SDK and reads the assistant's final text. Tests inject a deterministic +fake. +""" + + +def run_explore_stage( + campaign: dict, + *, + scopes: Iterable[str] = DEFAULT_EXPLORE_SCOPES, + runner: ExploreRunner, +) -> ExploreStageResult: + """Run one Explore subagent per scope and collect their reports. + + Phase A executes synchronously over the runner. Real parallel + fan-out (anyio gather over the SDK's async API) lands in Phase B + when the SDK runner ships its async surface. + """ + reports: list[ExploreReport] = [] + for scope in scopes: + prompt = build_explore_prompt(scope, campaign) + report = runner(scope, prompt, campaign) + reports.append(report) + return ExploreStageResult(reports=reports) + + +def build_synthesis_prompt( + stage_a: ExploreStageResult, + *, + campaign: dict, + iteration: int, + iter_dir: Path, +) -> str: + """Build the Opus synthesis prompt that turns Explore reports into + problem.md + bundle.yaml. + + The synthesizer never reads the codebase directly — it consumes only + the explorer reports + principles.json. That's the whole point of + the split: Opus on integration, not on file walks. + """ + target = campaign.get("target_system", {}) + rq = campaign.get("research_question", "(not set)") + + sections = [ + f"# Synthesize iteration {iteration}", + "", + "Four read-only Explore subagents have already mapped the system.", + "**Do not re-read the codebase.** Synthesize from the reports below.", + "", + f"## Research question\n{rq}", + "", + f"## Target\n{target.get('name', '?')} — {target.get('description', '')}", + "", + "## Explorer reports", + ] + for report in stage_a.reports: + sections.append("") + sections.append(f"### {report.scope}\n") + sections.append(report.text) + + sections.extend([ + "", + "## Required outputs", + f"- {iter_dir}/problem.md (markdown)", + f"- {iter_dir}/bundle.yaml (YAML, must validate against bundle.schema.yaml)", + "", + "Cite explorer reports by their `### ` heading when justifying " + "design choices. The reports are the source of truth for this " + "iteration's design.", + ]) + return "\n".join(sections) diff --git a/tests/test_explore_design.py b/tests/test_explore_design.py new file mode 100644 index 0000000..c87b565 --- /dev/null +++ b/tests/test_explore_design.py @@ -0,0 +1,149 @@ +"""Behavioral tests for the explore-then-synthesize DESIGN split (#132 Phase A).""" +from __future__ import annotations + +from pathlib import Path + +from orchestrator.explore_design import ( + DEFAULT_EXPLORE_SCOPES, + ExploreReport, + build_explore_prompt, + build_synthesis_prompt, + run_explore_stage, +) + + +def _campaign(**overrides): + base = { + "research_question": "What drives saturation?", + "target_system": { + "name": "BLIS", + "description": "Inference simulator.", + "observable_metrics": ["throughput", "latency"], + "controllable_knobs": ["batch_size", "scheduling"], + "repo_path": "/path/to/blis", + }, + } + base.update(overrides) + return base + + +# ─── Per-scope prompt builders ───────────────────────────────────────────── + +class TestBuildExplorePrompt: + + def test_metrics_prompt_focuses_on_observable_metrics(self): + out = build_explore_prompt("metrics", _campaign()) + assert "Explore: metrics" in out + assert "metric" in out.lower() + assert "BLIS" in out # target name appears + + def test_knobs_prompt_focuses_on_configuration(self): + out = build_explore_prompt("knobs", _campaign()) + assert "knob" in out.lower() or "config" in out.lower() + + def test_prior_findings_prompt_references_findings_json(self): + out = build_explore_prompt("prior_findings", _campaign()) + assert "findings.json" in out + + def test_principles_prompt_references_principles_store(self): + out = build_explore_prompt("principles", _campaign()) + assert "principles" in out.lower() + + def test_every_prompt_marks_explorer_read_only(self): + for scope in DEFAULT_EXPLORE_SCOPES: + out = build_explore_prompt(scope, _campaign()) + # Read-only enforcement must be EXPLICIT — Explore subagents + # don't have write tools, but the prompt should still say so. + assert "Do not modify" in out or "read-only" in out.lower() + + +# ─── Run stage A: collect reports ────────────────────────────────────────── + +class _RecordingRunner: + def __init__(self): + self.calls: list[dict] = [] + + def __call__(self, scope: str, prompt: str, campaign: dict) -> ExploreReport: + self.calls.append({"scope": scope, "prompt": prompt, "campaign": campaign}) + return ExploreReport( + scope=scope, + text=f"report for {scope}", + duration_ms=100, + input_tokens=200, + output_tokens=80, + ) + + +class TestRunExploreStage: + + def test_runs_one_subagent_per_default_scope(self): + runner = _RecordingRunner() + result = run_explore_stage(_campaign(), runner=runner) + + assert len(runner.calls) == len(DEFAULT_EXPLORE_SCOPES) + assert [r.scope for r in result.reports] == list(DEFAULT_EXPLORE_SCOPES) + + def test_custom_scopes_pass_through(self): + runner = _RecordingRunner() + run_explore_stage(_campaign(), scopes=["a", "b"], runner=runner) + assert [c["scope"] for c in runner.calls] == ["a", "b"] + + def test_aggregates_token_counts(self): + runner = _RecordingRunner() + result = run_explore_stage(_campaign(), runner=runner) + # 4 explorers × 200 input × 80 output. + assert result.total_input_tokens == 800 + assert result.total_output_tokens == 320 + + def test_lookup_by_scope_returns_correct_report(self): + runner = _RecordingRunner() + result = run_explore_stage(_campaign(), runner=runner) + report = result.by_scope("metrics") + assert report is not None + assert report.scope == "metrics" + + +# ─── Stage B: synthesis prompt ───────────────────────────────────────────── + +class TestBuildSynthesisPrompt: + + def _stage_a(self) -> "ExploreStageResult": # type: ignore[name-defined] + runner = _RecordingRunner() + return run_explore_stage(_campaign(), runner=runner) + + def test_includes_every_explorer_report_under_its_scope(self, tmp_path): + stage_a = self._stage_a() + out = build_synthesis_prompt( + stage_a, campaign=_campaign(), iteration=1, + iter_dir=tmp_path / "runs" / "iter-1", + ) + for scope in DEFAULT_EXPLORE_SCOPES: + assert f"### {scope}" in out + assert f"report for {scope}" in out + + def test_explicitly_forbids_re_reading_codebase(self, tmp_path): + stage_a = self._stage_a() + out = build_synthesis_prompt( + stage_a, campaign=_campaign(), iteration=1, + iter_dir=tmp_path / "runs" / "iter-1", + ) + assert "Do not re-read" in out + + def test_required_outputs_named(self, tmp_path): + stage_a = self._stage_a() + out = build_synthesis_prompt( + stage_a, campaign=_campaign(), iteration=2, + iter_dir=tmp_path / "runs" / "iter-2", + ) + assert "problem.md" in out + assert "bundle.yaml" in out + assert "iter-2" in out + assert "bundle.schema.yaml" in out + + def test_research_question_appears(self, tmp_path): + stage_a = self._stage_a() + out = build_synthesis_prompt( + stage_a, campaign=_campaign(), iteration=1, + iter_dir=tmp_path / "runs" / "iter-1", + ) + assert "What drives saturation?" in out From a186f2a26d79018e1caaab35cabebae94895c051 Mon Sep 17 00:00:00 2001 From: Srinivasan Parthasarathy Date: Sun, 24 May 2026 19:20:54 -0400 Subject: [PATCH 3/3] feat: make_sdk_explore_runner factory for Stage A (#132 Phase B) Closes the SDK-integration gap from #149 (Phase A): adds make_sdk_explore_runner(*, sdk_runner, cwd, model, max_turns) that returns an ExploreRunner-shaped callable backed by a read-only Explore subagent (subagent_type='Explore'). Per the no-live-LLM project principle (CLAUDE.md), the factory takes an injected sdk_runner. Production wiring constructs the real Anthropic SDK runner; tests inject a recording fake. Defaults model to Haiku because read-only mapping is cheap and benefits from speed over depth; deep synthesis happens in Stage B (the single Opus call), not Stage A. Three new behavioral tests: test_dispatches_each_scope_with_explore_subagent_type: With four default scopes, the SDK runner is called four times, each with subagent_type='Explore'. Reports carry the runner's text + token counts; total_input_tokens aggregates correctly. test_falls_back_when_sdk_runner_lacks_subagent_kwarg: Older runners without subagent_type kwarg are accommodated via TypeError fallback to the base signature. Forward/backward compatibility across SDK API evolution. test_uses_haiku_by_default: Default model is Haiku (read-only mapping should be cheap). A local _LocalSDKResult stand-in keeps this branch independent of sdk_dispatch.py; the real SDKResult is duck-compatible. Closes #132. --- orchestrator/explore_design.py | 51 ++++++++++++++++++++++ tests/test_explore_design.py | 80 ++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/orchestrator/explore_design.py b/orchestrator/explore_design.py index 91b57bf..4d037d3 100644 --- a/orchestrator/explore_design.py +++ b/orchestrator/explore_design.py @@ -159,6 +159,57 @@ def run_explore_stage( return ExploreStageResult(reports=reports) +def make_sdk_explore_runner( + *, + sdk_runner: Callable, + cwd: Path | None = None, + model: str = "claude-haiku-4-5", + max_turns: int = 8, +) -> ExploreRunner: + """Build an ExploreRunner backed by an SDK subagent (#132 Phase B). + + Each scope spawns a read-only subagent (``subagent_type="Explore"``) + so the orchestrator gets parallel mapping without a giant Opus + session doing both walking and synthesis. Per the no-live-LLM + project principle (CLAUDE.md), this factory takes an injected + ``sdk_runner`` — production wiring constructs the real Anthropic + SDK runner; tests inject a recording fake. + + Defaults model to Haiku because read-only mapping is cheap and + benefits from speed over depth; deep synthesis happens in Stage B + (the single Opus call), not in Stage A. + """ + def _run(scope: str, prompt: str, campaign: dict) -> ExploreReport: + try: + result = sdk_runner( + prompt=prompt, + model=model, + cwd=cwd, + max_turns=max_turns, + system_prompt=None, + settings_path=None, + event_log_path=None, + subagent_type="Explore", + ) + except TypeError: + # Older runners without subagent_type — fall back to the + # base signature so the factory stays compatible across + # SDK API evolution. + result = sdk_runner( + prompt=prompt, model=model, cwd=cwd, max_turns=max_turns, + ) + + return ExploreReport( + scope=scope, + text=getattr(result, "text", "") or "", + duration_ms=int(getattr(result, "duration_ms", 0) or 0), + input_tokens=int(getattr(result, "input_tokens", 0) or 0), + output_tokens=int(getattr(result, "output_tokens", 0) or 0), + ) + + return _run + + def build_synthesis_prompt( stage_a: ExploreStageResult, *, diff --git a/tests/test_explore_design.py b/tests/test_explore_design.py index c87b565..7e26ca6 100644 --- a/tests/test_explore_design.py +++ b/tests/test_explore_design.py @@ -147,3 +147,83 @@ def test_research_question_appears(self, tmp_path): iter_dir=tmp_path / "runs" / "iter-1", ) assert "What drives saturation?" in out + + +# ─── Phase B: SDK explore runner factory ─────────────────────────────────── + + +from dataclasses import dataclass as _dataclass + + +@_dataclass +class _LocalSDKResult: + """Local stand-in for SDKResult; the real one is duck-compatible.""" + text: str = "" + duration_ms: int = 0 + input_tokens: int = 0 + output_tokens: int = 0 + + +class TestMakeSdkExploreRunner: + """The factory wraps an injected sdk_runner so each Stage A scope + spawns a read-only Explore subagent. Tests assert what the runner + sends to the SDK and how it maps the response back to ExploreReport. + No live SDK call happens (no-live-LLM policy, see CLAUDE.md).""" + + def test_dispatches_each_scope_with_explore_subagent_type(self): + from orchestrator.explore_design import make_sdk_explore_runner + + sdk_calls: list[dict] = [] + + def sdk_runner(**kwargs): + sdk_calls.append(kwargs) + return _LocalSDKResult( + text="report", duration_ms=80, + input_tokens=300, output_tokens=120, + ) + + explore_runner = make_sdk_explore_runner( + sdk_runner=sdk_runner, cwd=None, model="claude-haiku-4-5", + max_turns=8, + ) + result = run_explore_stage(_campaign(), runner=explore_runner) + + assert len(sdk_calls) == len(DEFAULT_EXPLORE_SCOPES) + # Every call passes subagent_type=Explore — the harness signal + # for read-only mapping. + assert all(c.get("subagent_type") == "Explore" for c in sdk_calls) + assert all(r.text and r.input_tokens == 300 for r in result.reports) + assert result.total_input_tokens == 300 * len(DEFAULT_EXPLORE_SCOPES) + + def test_falls_back_when_sdk_runner_lacks_subagent_kwarg(self): + """Forward/backward compatibility: older sdk_runners without + subagent_type still work; the factory drops the kwarg on + TypeError and retries with the base signature.""" + from orchestrator.explore_design import make_sdk_explore_runner + + seen: list[dict] = [] + + def old_signature_runner(*, prompt, model, cwd, max_turns): + seen.append({"prompt": prompt, "max_turns": max_turns}) + return _LocalSDKResult(text="ok") + + explore_runner = make_sdk_explore_runner(sdk_runner=old_signature_runner) + run_explore_stage(_campaign(), scopes=["metrics"], runner=explore_runner) + + assert len(seen) == 1 + assert seen[0]["prompt"] + + def test_uses_haiku_by_default(self): + """Read-only mapping should be cheap — default model is Haiku.""" + from orchestrator.explore_design import make_sdk_explore_runner + + models: list[str] = [] + + def sdk_runner(**kwargs): + models.append(kwargs.get("model", "")) + return _LocalSDKResult() + + explore_runner = make_sdk_explore_runner(sdk_runner=sdk_runner) + run_explore_stage(_campaign(), scopes=["metrics"], runner=explore_runner) + + assert models[0].lower().startswith("claude-haiku")