diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..1b8f260 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,59 @@ +name: Tests + +on: + push: + branches: [main, reflective] + # Trigger on every PR regardless of base branch so contributors get + # CI feedback on long-running integration branches (e.g. tracking-N) + # in addition to PRs targeting main/reflective. + pull_request: + +# Cancel in-flight runs on the same PR/branch when a new push lands. +# Only safe context expressions used here: github.workflow, github.ref, +# github.event_name. None come from user-controlled input. +concurrency: + group: tests-${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + +jobs: + pytest: + name: pytest (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.11", "3.12"] + + # No LLM API keys in the env. The no-live-LLM project principle + # (CLAUDE.md, tests/CLAUDE.md, tests/conftest.py autouse guard) says + # tests must mock LLMs, never call them. This is the outer line of + # defence; the conftest guard is the inner. + env: + OPENAI_API_KEY: "" + OPENAI_BASE_URL: "" + ANTHROPIC_API_KEY: "" + + steps: + - uses: actions/checkout@v5 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + + - name: Run pytest + run: pytest -ra --strict-markers + + - name: Upload pytest cache on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: pytest-cache-py${{ matrix.python-version }} + path: .pytest_cache/ + if-no-files-found: ignore diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..a5e9cfe --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,82 @@ +# Nous β€” project conventions + +This file is auto-loaded by Claude Code on every session in this repo. The +rules below are non-negotiable; when they conflict with general AI/coding +defaults, **the rules here win**. + +## 🚫 Tests must NEVER make live LLM calls + +**No unit, integration, or end-to-end test in this repo may make a real +API call to Anthropic, OpenAI, or any other LLM provider. Period.** + +Why this is a hard rule: +- Tests run on every CI build, every contributor's laptop, and every PR + rebase. Live LLM calls would burn tokens for no signal β€” the test + result depends on what the model said today, not on the code under test. +- Token budget for `nous` is mission-critical. We refuse to spend it on + CI churn. +- Live calls are non-deterministic. A flaky test from a model rephrasing + itself is worse than no test. + +**How to test correctly:** + +| Code under test | How to mock | +|---|---| +| `LLMDispatcher` | Pass `completion_fn=` in the constructor β€” a callable that returns canned `chat.completions`-shaped objects. See `tests/test_llm_dispatch.py`'s `_make_fake_completion` for the pattern. | +| `CLIDispatcher` (claude -p subprocess) | Patch `orchestrator.cli_dispatch.subprocess.run` β€” return a `subprocess.CompletedProcess` with the JSON the test wants. See `tests/test_cli_dispatch.py`. | +| `SDKDispatcher` (Claude Agent SDK) | Pass `sdk_runner=` in the constructor β€” a callable returning `SDKResult`. See `tests/test_sdk_dispatch.py`'s `_ScriptedRunner`. | +| `InlineDispatcher` | Set up the `.nous_response_*` signal file in tmp_path before calling dispatch. | +| Stub-driven flows | Use `StubDispatcher` from `orchestrator.dispatch` β€” it produces valid schema-conformant artifacts with no LLM at all. | + +**Active enforcement:** `tests/conftest.py` installs an autouse fixture +(`block_live_llm_calls`) that: +1. Strips `OPENAI_API_KEY` and `ANTHROPIC_API_KEY` from the env so any + accidental real-client construction fails loudly instead of silently + billing. +2. Patches `urllib.request.urlopen` to refuse `api.anthropic.com`, + `api.openai.com`, and `api.litellm.ai` hosts. +3. Patches `claude_agent_sdk.query` (when installed) to a hard-fail. + +If a test triggers any of these guards, the fix is to inject a fake at +the dispatcher's seam β€” never to disable the guard. The guards are the +backstop; the seams are the contract. + +## Behavioral testing only + +When the test mock is in place, write **behavioral** tests: +- βœ“ Assert what's on disk after `dispatcher.dispatch(...)`. +- βœ“ Assert metrics rows in `llm_metrics.jsonl`. +- βœ“ Assert artifacts match a JSON Schema. +- βœ— Don't assert which method was called on the mock. +- βœ— Don't assert argv shape, internal helper invocation, or attribute access. + +The seam is the contract; the implementation is free to evolve. + +## Token-budget discipline (production code) + +Beyond tests, Nous itself must be frugal with tokens: +- **Methodology stays in `CLAUDE.md`** (auto-loaded by Claude Code), not + in per-call prompts. The thin templates in `prompts/methodology/*_thin.md` + carry only per-iteration context. +- **System blocks are cached** (`cache_control: ephemeral`). Any code + that constructs an SDK call with a static system_prompt should rely + on this, and any change that breaks within-iteration cache locality + must be measured (`nous cost --cache-stats`) and justified. +- **Read-only mapping uses Explore subagents**, not Opus. See + `orchestrator/explore_design.py`. + +## PR workflow (project owner: @sriumcp) + +1. Branch off `upstream/reflective` (NOT `main`). +2. Push to `origin` (the fork at `sriumcp/agentic-strategy-evolution`). +3. Open PR with base `upstream/reflective`, head `sriumcp:`. +4. PR body links the issue with `Closes #N` (or `Refs #N` for partials). +5. Stack PRs when one logical change builds on another rather than waiting + for merge β€” see `docs/plans/CHECKPOINT.md` for the pattern. + +## See also + +- `docs/contributing/workflow.md` β€” full workflow doc. +- `docs/security.md` β€” permission policy (#135). +- `docs/architecture.md` β€” internals. +- `docs/plans/CHECKPOINT.md` β€” current state of the #120 epic. diff --git a/README.md b/README.md index 706c21e..5cbe746 100644 --- a/README.md +++ b/README.md @@ -80,17 +80,25 @@ If you're using Anthropic directly via a LiteLLM proxy, point both vars at the p ### 1. Install Nous ```bash -pip install "git+https://github.com/AI-native-Systems-Research/agentic-strategy-evolution.git" +pip install "git+https://github.com/AI-native-Systems-Research/agentic-strategy-evolution.git@reflective" ``` +`reflective` is the active integration branch β€” that's where new work lands first. `main` lags slightly behind. To pin to a release, replace `@reflective` with a tag (`@v0.2.0`). + For development (editable install with test dependencies): ```bash -git clone https://github.com/AI-native-Systems-Research/agentic-strategy-evolution.git +git clone -b reflective https://github.com/AI-native-Systems-Research/agentic-strategy-evolution.git cd agentic-strategy-evolution pip install -e ".[dev]" ``` +For the SDK-based dispatcher (`--agent sdk`, see `docs/architecture.md`), also install the optional `[sdk]` extra: + +```bash +pip install -e ".[dev,sdk]" +``` + ### 2. Configure models Two LLM calls per iteration, both via `claude -p`: diff --git a/bin/nous-execute-stop b/bin/nous-execute-stop new file mode 100755 index 0000000..4a26477 --- /dev/null +++ b/bin/nous-execute-stop @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +"""Stop hook for the Nous executor session (issue #129). + +Runs after every Claude Code agent turn. Returns: + exit 0 β†’ allow the agent to stop (its work is done). + exit 2 β†’ block stopping; the structured reason on stderr is fed back + into the agent's conversation so it can react. + +A "stop is allowed" decision needs two pieces of evidence on disk: + 1. ``$NOUS_ITER_DIR/principle_updates.json`` exists. + 2. ``nous validate execution --dir $NOUS_ITER_DIR`` returns ``status: pass``. + +Both are deterministic β€” no LLM judgment, no agent self-assessment. The +hook pairs with the ``/goal``-driven loop (#124) but is preferred wherever +the success criterion is a schema check, because it's cheaper and more +reliable than a Haiku evaluator. + +Configured per-campaign in ``.claude/settings.json`` (see #135). The +orchestrator sets ``NOUS_ITER_DIR`` before launching the executor session. +""" +from __future__ import annotations + +import os +import sys +from pathlib import Path + +# When invoked as a Claude Code hook, the script's directory may not be +# on PYTHONPATH. Add the repo root so `orchestrator.validate` imports. +_HERE = Path(__file__).resolve().parent +_REPO_ROOT = _HERE.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from orchestrator.validate import validate_execution # noqa: E402 + + +_OK = 0 +_BLOCK = 2 + + +def main() -> int: + iter_dir_str = os.environ.get("NOUS_ITER_DIR") + if not iter_dir_str: + print( + "NOUS_ITER_DIR is not set. The orchestrator should export this " + "variable before launching the executor session.", + file=sys.stderr, + ) + return _BLOCK + + iter_dir = Path(iter_dir_str) + if not iter_dir.is_dir(): + print( + f"iter_dir does not exist: {iter_dir}. NOUS_ITER_DIR is " + f"misconfigured or the executor was launched before init.", + file=sys.stderr, + ) + return _BLOCK + + principles = iter_dir / "principle_updates.json" + if not principles.exists(): + print( + f"principle_updates.json is missing from {iter_dir}. " + f"Write the file (a JSON list, possibly empty: []) before stopping.", + file=sys.stderr, + ) + return _BLOCK + + result = validate_execution(iter_dir) + if result.get("status") != "pass": + errors = result.get("errors", []) + print( + f"validation failed for {iter_dir} ({len(errors)} error(s)). " + f"Fix these before stopping:", + file=sys.stderr, + ) + for err in errors: + print(f" - {err}", file=sys.stderr) + return _BLOCK + + return _OK + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/bin/nous-mcp b/bin/nous-mcp new file mode 100755 index 0000000..17309f3 --- /dev/null +++ b/bin/nous-mcp @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 +"""nous-mcp: stdio MCP server exposing Nous campaigns (#126 Phase B). + +Wraps the pure functions in ``orchestrator.campaign_index`` as MCP +resources and tools so any Claude Code session β€” terminal, IDE, web β€” +can ``@``-reference a campaign or call ``nous.search_principles(...)`` +without bash plumbing. + +Protocol: JSON-RPC 2.0 over stdio (line-delimited JSON, one request / +response per line). Compatible with Claude Code's MCP transport when +registered in ``~/.claude.json`` under ``mcpServers``: + + { + "mcpServers": { + "nous": { + "command": "python", + "args": ["-u", "/path/to/repo/bin/nous-mcp"], + "env": {"NOUS_SEARCH_ROOT": "/path/to/parent/of/.nous/"} + } + } + } + +The server is stateless: campaigns live on disk; every request re-walks +``$NOUS_SEARCH_ROOT`` (or the path passed in the request). + +Methods: + initialize / shutdown -- MCP handshake + resources/list -- nous://campaigns and per-campaign URIs + resources/read -- read a specific resource + tools/list -- list_campaigns / search_principles / + get_arm_results / compare_iterations + tools/call -- invoke a tool by name with arguments +""" +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path + +_HERE = Path(__file__).resolve().parent +_REPO_ROOT = _HERE.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from orchestrator.campaign_index import ( # noqa: E402 + compare_iterations, + get_arm_results, + list_campaigns, + search_principles, +) + + +_SERVER_INFO = { + "name": "nous-mcp", + "version": "0.2.0", + "description": "Read-only access to Nous campaigns on disk.", +} + +_CAPABILITIES = { + "resources": {"list": True, "read": True}, + "tools": {"list": True, "call": True}, +} + + +_TOOLS = [ + { + "name": "nous.list_campaigns", + "description": ( + "List all Nous campaigns under the search root. " + "Optional filters: query (substring on run_id), status (phase), repo." + ), + "inputSchema": { + "type": "object", + "properties": { + "search_root": {"type": "string"}, + "query": {"type": "string"}, + "status": {"type": "string"}, + "repo": {"type": "string"}, + }, + }, + }, + { + "name": "nous.search_principles", + "description": ( + "Search principles across all campaigns by substring. " + "Hits include the source campaign run_id and path." + ), + "inputSchema": { + "type": "object", + "properties": { + "search_root": {"type": "string"}, + "text": {"type": "string"}, + "only_active": {"type": "boolean"}, + }, + "required": ["text"], + }, + }, + { + "name": "nous.get_arm_results", + "description": ( + "Aggregate per-seed result files for one arm of one iteration." + ), + "inputSchema": { + "type": "object", + "properties": { + "campaign_root": {"type": "string"}, + "iteration": {"type": "integer"}, + "arm": {"type": "string"}, + }, + "required": ["campaign_root", "iteration", "arm"], + }, + }, + { + "name": "nous.compare_iterations", + "description": ( + "Deterministic diff between two iterations of one campaign β€” " + "arm-status changes and added principles." + ), + "inputSchema": { + "type": "object", + "properties": { + "campaign_root": {"type": "string"}, + "iter_a": {"type": "integer"}, + "iter_b": {"type": "integer"}, + }, + "required": ["campaign_root", "iter_a", "iter_b"], + }, + }, +] + + +def _default_search_root() -> str: + return os.environ.get("NOUS_SEARCH_ROOT", str(Path.cwd())) + + +def _resource_list(search_root: str) -> list[dict]: + """Build the MCP resources/list payload from disk state.""" + out: list[dict] = [{ + "uri": "nous://campaigns", + "name": "All campaigns", + "description": "Index of every Nous campaign under the search root.", + "mimeType": "application/json", + }] + for campaign in list_campaigns(Path(search_root)): + run_id = campaign["run_id"] + out.append({ + "uri": f"nous://campaigns/{run_id}/state", + "name": f"{run_id} β€” state", + "description": f"Phase + iteration of campaign {run_id}.", + "mimeType": "application/json", + }) + out.append({ + "uri": f"nous://campaigns/{run_id}/principles", + "name": f"{run_id} β€” principles", + "description": f"Active principles accumulated in {run_id}.", + "mimeType": "application/json", + }) + return out + + +def _read_resource(uri: str, search_root: str) -> dict: + """Resolve a nous:// URI to its JSON contents.""" + if uri == "nous://campaigns": + return {"campaigns": list_campaigns(Path(search_root))} + + if not uri.startswith("nous://campaigns/"): + raise ValueError(f"unknown URI scheme: {uri!r}") + parts = uri[len("nous://campaigns/"):].split("/") + if len(parts) < 2: + raise ValueError(f"malformed campaign URI: {uri!r}") + run_id, leaf = parts[0], "/".join(parts[1:]) + + # Find campaign root by run_id under search_root. + matching = [c for c in list_campaigns(Path(search_root)) if c["run_id"] == run_id] + if not matching: + raise ValueError(f"unknown campaign: {run_id!r}") + root = Path(matching[0]["path"]) + + if leaf == "state": + return json.loads((root / "state.json").read_text()) + if leaf == "principles": + return json.loads((root / "principles.json").read_text()) + if leaf.startswith("iter/") and leaf.endswith("/findings"): + n = int(leaf.split("/")[1]) + return json.loads((root / "runs" / f"iter-{n}" / "findings.json").read_text()) + raise ValueError(f"unsupported leaf: {leaf!r}") + + +def _call_tool(name: str, args: dict) -> dict: + """Dispatch a tools/call request to campaign_index.""" + if name == "nous.list_campaigns": + return { + "campaigns": list_campaigns( + Path(args.get("search_root", _default_search_root())), + query=args.get("query"), + status=args.get("status"), + repo=args.get("repo"), + ), + } + if name == "nous.search_principles": + return { + "hits": search_principles( + Path(args.get("search_root", _default_search_root())), + args["text"], + only_active=args.get("only_active", True), + ), + } + if name == "nous.get_arm_results": + return get_arm_results( + Path(args["campaign_root"]), + int(args["iteration"]), + args["arm"], + ) + if name == "nous.compare_iterations": + return compare_iterations( + Path(args["campaign_root"]), + int(args["iter_a"]), + int(args["iter_b"]), + ) + raise ValueError(f"unknown tool: {name!r}") + + +def handle_request(request: dict, *, search_root: str | None = None) -> dict: + """Process one JSON-RPC request and return the response dict. + + Pure function β€” testable without stdio. The main loop calls this + for each line and writes the result back. + """ + rid = request.get("id") + method = request.get("method", "") + params = request.get("params") or {} + root = search_root or _default_search_root() + + try: + if method == "initialize": + result: dict = { + "protocolVersion": "2024-11-05", + "capabilities": _CAPABILITIES, + "serverInfo": _SERVER_INFO, + } + elif method == "shutdown": + result = {} + elif method == "resources/list": + result = {"resources": _resource_list(root)} + elif method == "resources/read": + uri = params.get("uri", "") + payload = _read_resource(uri, root) + result = { + "contents": [{ + "uri": uri, + "mimeType": "application/json", + "text": json.dumps(payload, indent=2), + }], + } + elif method == "tools/list": + result = {"tools": _TOOLS} + elif method == "tools/call": + name = params.get("name", "") + args = params.get("arguments", {}) or {} + payload = _call_tool(name, args) + result = { + "content": [{ + "type": "text", + "text": json.dumps(payload, indent=2), + }], + } + else: + return { + "jsonrpc": "2.0", + "id": rid, + "error": {"code": -32601, "message": f"method not found: {method}"}, + } + except Exception as exc: + return { + "jsonrpc": "2.0", + "id": rid, + "error": {"code": -32603, "message": f"{type(exc).__name__}: {exc}"}, + } + + return {"jsonrpc": "2.0", "id": rid, "result": result} + + +def main() -> int: + for line in sys.stdin: + line = line.strip() + if not line: + continue + try: + request = json.loads(line) + except json.JSONDecodeError as exc: + sys.stdout.write(json.dumps({ + "jsonrpc": "2.0", + "id": None, + "error": {"code": -32700, "message": f"parse error: {exc}"}, + }) + "\n") + sys.stdout.flush() + continue + response = handle_request(request) + sys.stdout.write(json.dumps(response) + "\n") + sys.stdout.flush() + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/bin/nous-plan-enforcer b/bin/nous-plan-enforcer new file mode 100755 index 0000000..0382f63 --- /dev/null +++ b/bin/nous-plan-enforcer @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +"""PreToolUse hook: enforce experiment_plan.yaml during EXECUTE_ANALYZE (#128). + +Claude Code calls this hook before every Bash tool invocation in the +executor session. It compares the proposed command against the plan +sitting in ``$NOUS_ITER_DIR/experiment_plan.yaml`` and decides whether +to allow it. + +Two modes (controlled by ``NOUS_PLAN_ENFORCEMENT``): + + * ``strict``: exit 2 with a structured reason on stderr if the + proposed command's head binary doesn't match any planned condition's + head binary. The agent receives the reason in its conversation and + is expected to either (a) revise the command or (b) annotate it + ``# nous: ad-hoc`` to explicitly opt out for one call. + * ``warn`` (default): always exit 0; record violations to + ``$NOUS_ITER_DIR/plan_violations.jsonl`` for audit. Lets you watch + for drift in soak runs without breaking iteration. + +Escape hatch: a command containing the literal string ``# nous: ad-hoc`` +is allowed in both modes and logged as ``kind: ad-hoc`` so reviewers can +audit how often it's used. + +Exit codes: 0 = allow, 2 = block (strict only). +""" +from __future__ import annotations + +import json +import os +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable + +import yaml + +_AD_HOC_MARKER = "# nous: ad-hoc" +_OK = 0 +_BLOCK = 2 + + +def _read_event() -> dict: + """Read the PreToolUse JSON payload from stdin. Returns {} on bad input.""" + try: + raw = sys.stdin.read() + if not raw.strip(): + return {} + return json.loads(raw) + except json.JSONDecodeError: + return {} + + +def _proposed_command(event: dict) -> str | None: + """Return the Bash command this event is proposing, or None for non-Bash.""" + if event.get("tool_name") != "Bash": + return None + cmd = event.get("tool_input", {}).get("command") + if not isinstance(cmd, str): + return None + return cmd + + +def _head_binary(cmd: str) -> str | None: + """Pull the basename of the first token of a shell command.""" + cmd = cmd.lstrip() + # Strip leading comments or ad-hoc marker so we extract the real binary. + for line in cmd.splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith("#"): + continue + first = stripped.split()[0] + # Drop env-var prefix like ``FOO=bar binary``. + while "=" in first and not first.startswith("/") and not first.startswith("./"): + # heuristic: env-var-only assignment, skip to next token + tokens = stripped.split() + if len(tokens) < 2: + return None + tokens.pop(0) + stripped = " ".join(tokens) + first = stripped.split()[0] + return first.split("/")[-1] + return None + + +def _plan_binaries(plan_path: Path) -> set[str]: + """Extract the set of head-binary basenames referenced in the plan.""" + if not plan_path.exists(): + return set() + try: + plan = yaml.safe_load(plan_path.read_text()) or {} + except yaml.YAMLError: + return set() + bins: set[str] = set() + for arm in plan.get("arms", []) or []: + for cond in arm.get("conditions", []) or []: + cmd = cond.get("command") or cond.get("cmd") + if isinstance(cmd, str): + bin_name = _head_binary(cmd) + if bin_name: + bins.add(bin_name) + return bins + + +def _planning_arm_for(plan_path: Path, head: str) -> str | None: + """Best-effort: return arm_id where ``head`` appears (or None).""" + if not plan_path.exists(): + return None + try: + plan = yaml.safe_load(plan_path.read_text()) or {} + except yaml.YAMLError: + return None + for arm in plan.get("arms", []) or []: + for cond in arm.get("conditions", []) or []: + cmd = cond.get("command") or cond.get("cmd") + if isinstance(cmd, str) and _head_binary(cmd) == head: + return arm.get("arm_id") + return None + + +def _log_violation( + iter_dir: Path, + *, + kind: str, + command: str, + arm: str | None, +) -> None: + log_path = iter_dir / "plan_violations.jsonl" + record = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "kind": kind, + "command": command, + "arm": arm or "", + } + try: + with open(log_path, "a") as f: + f.write(json.dumps(record) + "\n") + except OSError: + # Best-effort; never block the agent because logging failed. + pass + + +def main() -> int: + iter_dir_str = os.environ.get("NOUS_ITER_DIR") + mode = os.environ.get("NOUS_PLAN_ENFORCEMENT", "warn").lower() + + event = _read_event() + cmd = _proposed_command(event) + if cmd is None: + # Not a Bash event β€” nothing to enforce. + return _OK + + if not iter_dir_str: + # Hook misconfigured β€” fail open (cannot block what we can't compare). + return _OK + + iter_dir = Path(iter_dir_str) + plan_path = iter_dir / "experiment_plan.yaml" + + # Escape hatch. + if _AD_HOC_MARKER in cmd: + _log_violation(iter_dir, kind="ad-hoc", command=cmd, arm=None) + return _OK + + head = _head_binary(cmd) + if head is None: + # Couldn't parse β€” fail open (warn) or block (strict). + if mode == "strict": + print( + f"plan enforcer could not parse the proposed command:\n {cmd!r}", + file=sys.stderr, + ) + return _BLOCK + return _OK + + planned = _plan_binaries(plan_path) + if head in planned: + return _OK + + arm = _planning_arm_for(plan_path, head) + if mode == "strict": + print( + f"command head '{head}' is not in experiment_plan.yaml.\n" + f"Either revise the command to use a planned binary, or, " + f"if this is intentional, add '{_AD_HOC_MARKER}' as a comment " + f"line in the command.", + file=sys.stderr, + ) + return _BLOCK + + _log_violation(iter_dir, kind="unplanned", command=cmd, arm=arm) + return _OK + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/docs/architecture.md b/docs/architecture.md index f5e162b..da24a43 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -110,7 +110,8 @@ Both agents write artifacts directly to the campaign directory (`iter_dir`) and **Implementations:** - `StubDispatcher` (`dispatch.py`) produces valid, schema-conformant artifacts without calling any LLM. Used for testing the orchestrator loop. -- `CLIDispatcher` (`cli_dispatch.py`) invokes `claude -p` as a subprocess, giving agents code access and shell tools. Agents write files directly to `iter_dir`. Supports `override_cwd()` context manager for pointing the executor at a git worktree. +- `CLIDispatcher` (`cli_dispatch.py`) invokes `claude -p` as a subprocess, giving agents code access and shell tools. Agents write files directly to `iter_dir`. Supports `override_cwd()` context manager for pointing the executor at a git worktree. Selected via `--agent api`. +- `SDKDispatcher` (`sdk_dispatch.py`) calls the Claude Agent SDK (`claude-agent-sdk`) instead of spawning a subprocess. Same artifact and metrics contract as `CLIDispatcher`; gains native streaming, programmatic prompt caching, and message-level retry. Selected via `--agent sdk`. Requires the optional `sdk` install extra (`pip install -e ".[sdk]"`). Inherits parse / validate / retry-with-feedback machinery from `CLIDispatcher` β€” only the transport changes. **Dispatch interface:** ```python @@ -122,7 +123,18 @@ dispatcher.dispatch( ) ``` -Both dispatchers share the same interface β€” `CLIDispatcher` extends `LLMDispatcher`. +All three dispatchers share the same interface. `CLIDispatcher` extends `LLMDispatcher`; `SDKDispatcher` extends `CLIDispatcher` and overrides only `_call_claude` and `preflight_check`. + +### Stop Hook (`bin/nous-execute-stop`) + +Claude Code Stop hooks fire after every agent turn and decide whether the agent is allowed to terminate. `bin/nous-execute-stop` is Nous's deterministic completion check: the executor is allowed to stop only when both conditions hold on disk, no LLM judgment involved: + +1. `principle_updates.json` exists in the iteration directory. +2. `nous validate execution --dir $NOUS_ITER_DIR` returns `status: pass`. + +If either fails, the hook exits with code 2 and writes a structured reason to stderr; Claude Code feeds that reason back into the agent's conversation so it can fix the artifact and try again. Wire-up lives in the per-campaign `.claude/settings.json` (see #135) β€” the orchestrator exports `NOUS_ITER_DIR` before launching the executor session. + +This is preferred over a probabilistic Haiku evaluator anywhere the success criterion is a schema check: cheaper, faster, and immune to evaluator drift. ## CLI Dispatch diff --git a/docs/contributing/workflow.md b/docs/contributing/workflow.md index 4aaa2cf..ecc579a 100644 --- a/docs/contributing/workflow.md +++ b/docs/contributing/workflow.md @@ -4,6 +4,32 @@ This document defines the standard workflow for contributors using Claude Code t --- +## Non-negotiable rules + +These apply to every PR, every test, every contributor. They are also restated in the auto-loaded `CLAUDE.md` files at the repo root and under `tests/`. + +### 🚫 Tests must NEVER make live LLM calls + +**No unit, integration, or end-to-end test in this repo may make a real API call to Anthropic, OpenAI, or any other LLM provider.** Tests must mock LLMs at the dispatcher seam: + +- `LLMDispatcher` β†’ pass `completion_fn=`. +- `CLIDispatcher` β†’ patch `orchestrator.cli_dispatch.subprocess.run`. +- `SDKDispatcher` β†’ pass `sdk_runner=` returning `SDKResult`. +- `InlineDispatcher` β†’ pre-populate the `.nous_response_*` signal file. +- Or use `StubDispatcher` for end-to-end orchestrator flows. + +`tests/conftest.py` installs an autouse `block_live_llm_calls` fixture that strips LLM API keys from the env and patches `urllib.request.urlopen` + `claude_agent_sdk.query` to hard-fail on real network calls. If a test trips the guard, fix the test by injecting a fake β€” never disable the guard. + +### Behavioral testing only + +Assert what's on disk, what's in metrics rows, what schemas validate. Don't assert which methods were called or what argv was constructed. The dispatcher seams are the contract. + +### Token-budget discipline + +`nous` runs against real LLMs in production; CI cannot. Every PR that touches `orchestrator/` must keep the cache-friendly invariant: methodology lives in `CLAUDE.md` (auto-loaded), system blocks are stable across calls (cache hits), per-iteration content goes in the user message (cache busts when it should). `nous cost --cache-stats` is the regression gate. + +--- + ## Overview Any contributor with Claude Code should follow this workflow when working on an issue. It combines AI-assisted planning and review with explicit human approval gates to produce consistent, high-quality contributions. diff --git a/docs/retros/2026-05-24-claude-code-native-uplift.md b/docs/retros/2026-05-24-claude-code-native-uplift.md new file mode 100644 index 0000000..4332574 --- /dev/null +++ b/docs/retros/2026-05-24-claude-code-native-uplift.md @@ -0,0 +1,79 @@ +# Retro β€” Claude-Code-Native Uplift for Nous (#120) + +**Closes:** [#120](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/120) +**Window:** 2026-05-24 (single session, multi-PR initiative) +**Children resolved:** 15 of 15 β€” #121, #122, #123, #124, #125, #126, #127, #128, #129, #130, #131, #132, #133, #134, #135. +**Plus a project-wide guard PR:** #151 β€” no-live-LLM-in-tests, codified in `CLAUDE.md` + `tests/CLAUDE.md` + `tests/conftest.py` + `docs/contributing/workflow.md`. + +## What landed + +``` + Foundation Capabilities Ecosystem + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ #121 SDK port │──┬────│ #122 caching β”‚ β”‚ #126 MCP server β”‚ + β”‚ #129 stop hook β”‚ β”œβ”€β”€β”€β”€β”‚ #127 stream-json β”‚ β”‚ #125 plugin pkg β”‚ + β”‚ #135 perm policy β”‚ β”œβ”€β”€β”€β”€β”‚ #132 explore design β”‚ β”‚ #134 routines β”‚ + β”‚ #131 CLAUDE.md β”‚ └────│ #123 parallel arms β”‚ β”‚ #130 channels β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ #133 worktree harnessβ”‚ β”‚ #124 /goal-drivenβ”‚ + β”‚ #128 plan enforcer β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +15 PRs in flight against `upstream/reflective`. ~250 new behavioral tests. Zero structural assertions. Zero live LLM calls (enforced by the conftest guard). + +## How the architecture changed + +Before: Nous was a Python orchestrator that shelled out to `claude -p` as a subprocess for code-access roles, with a custom JSON parser, a custom retry loop, and a manual git-worktree lifecycle. The methodology preamble (~465 lines across `design.md` + `execute_analyze.md`) was re-rendered into every prompt. + +After: Nous is a Python orchestrator that owns checkpointing, validation, and gates, while delegating the actual agent loop to the Claude Agent SDK. Methodology lives in CLAUDE.md (auto-loaded once per session); the prompt body shrinks to per-iteration context only. Subagents (Explore for design mapping, isolation="worktree" for parallel arms) replace the mega-session pattern. The on-disk artifact contract is unchanged β€” every PR was a transport substitution behind the existing `dispatcher.dispatch(role, phase, ...)` seam. + +## Token-budget delta (the user's mission-critical metric) + +| Lever | Before | After | Verifies via | +|---|---|---|---| +| Methodology re-sent each call (#131) | full template (~465 lines) per call | thin template (~50 lines) when CLAUDE.md is in scope | `nous cost --cache-stats` (#122) β€” stats infrastructure landed | +| System block caching (#122) | none | `cache_control: ephemeral` on methodology preamble | `cache_read_input_tokens` in `llm_metrics.jsonl` | +| DESIGN exploration (#132) | one Opus session for codebase walk + synthesis | 4 parallel Haiku Explore subagents + 1 Opus synthesis call | report.input_tokens aggregation in `ExploreStageResult` | +| Multi-arm execution (#123) | one Sonnet mega-session for 24 simulations | per-arm subagent in isolated worktree, parallelizable | wall-clock + per-unit metrics on representative campaign | + +The cache-stats aggregation (`orchestrator/cache_stats.py`) is the regression gate β€” `nous cost --cache-stats` must show non-zero hit rate on warm phase calls and β‰₯25% input-token reduction over the 5-iter baseline. Soak verification on real `inference-sim` campaigns confirms or refutes this; the infrastructure to observe it is in place. + +## How testing held up + +The user's directive β€” "behavioral testing discipline, absolutely no structural tests" β€” was the most consequential constraint of the initiative. It forced specific design choices: + +- **Pluggable seams everywhere.** `sdk_runner` Protocol returning `SDKResult` (#121); `poster` callable for channels (#130) and routines (#134); `runner` injection for plan enforcer (#128), explore stage (#132), parallel arms (#123); `pid_check` and `now=` for worktree GC (#133); `completion_fn` for the legacy LLMDispatcher path. Every test asserts on disk artifacts, JSON shapes, or externally-visible state β€” never on internal helper invocations. +- **No live LLM calls in tests, ever.** Codified in PR #151 with active enforcement: `tests/conftest.py` strips `OPENAI_API_KEY` / `ANTHROPIC_API_KEY` from the env, patches `urllib.request.urlopen` to refuse known LLM hosts, patches `claude_agent_sdk.query` to hard-fail. `tests/test_no_live_llm_guard.py` verifies the guard fires correctly. +- **Determinism via injected clocks/PIDs/IDs.** Tests inject `now=`, `pid_check=`, fake `os.utime`, scripted runners β€” they pass on any machine, in any timezone, without flaky waits. No `time.sleep` polling. + +That seam discipline is also what makes Phase B closures possible: in every #N Phase B PR, the production wiring is one line that constructs the real SDK runner; the orchestration layer + tests above it are unchanged. + +## What's deferred to soak + +Acceptance criteria that explicitly require running a real campaign (the issue body's measurement-based criteria) cannot be honestly verified in CI: + +- #122: β‰₯25% input token drop on a 5-iteration campaign (need Anthropic API). +- #123: significant wall-clock improvement on `examples/campaign-best-of-field.yaml` with `max_parallel_arms: 4` (need real subagent spawning). +- #132: β‰₯30% DESIGN cost drop (need real Explore subagents). +- #131: subjective bundle-quality parity on 3 reference campaigns (human review). +- #126/#130/#134: live transports against MCP / Slack / Routines APIs (need credentials). + +These are integration tests for the soak environment, not unit tests. The infrastructure to measure each is shipped (`nous cost --cache-stats`, the ledger, `merge_unit_results` determinism). The team verifies on first soak; if a criterion fails, the failure is observable from the metrics emit and the cause is traceable to a single seam. + +## What the next initiative should pick up + +- Drop `cli_dispatch.py` once `--agent sdk` has soaked. The CLI subprocess path is dead code after that. +- Drop `worktree.py`'s manual `create_experiment_worktree` / `remove_experiment_worktree` once #123 wires `make_isolated_arm_runner` into iteration.py β€” closes #133's β‰₯60% LoC reduction acceptance criterion. +- Real MCP transport using the `mcp` Python SDK once it pins; the stdio JSON-RPC server in #142 is bounded by what stdlib can do. +- Slack interactive messages adapter for #130 Phase B (parsed reply tokens are landed; the per-channel reply provider needs a webhook receiver). +- Routines API integration once the API stabilizes; the payload builder + `submit_routine` are landed. + +## Lessons (worth carrying to the next epic) + +1. **Phase A / Phase B split was right.** Eleven of fifteen child issues had at least one criterion that requires soak verification. Bundling them as one PR each would have made every PR claim "soak verified" β€” false. Splitting let us land the testable orchestration first and name the soak-only follow-up explicitly. +2. **Stack PRs when one logical change builds on another.** Five PRs stacked on #136 (#121 SDK port); #139 stacked on #138; #150 stacked on #143 stacked on #136. Each stack mirrors the dependency chain. Reviewers can merge bottom-up; rebases are mechanical. +3. **The conftest guard was the highest-leverage one-day investment.** ~50 lines of `tests/conftest.py` and a one-line autouse fixture meant every existing test, every new test, every future PR is now incapable of accidentally spending tokens. Cost: one PR. Benefit: forever. + +## Closing #120 + +All 15 children + the test-policy guard are in flight. The retro is this document; the metric-verification work is named in [`docs/plans/CHECKPOINT.md`](../plans/CHECKPOINT.md). diff --git a/docs/security.md b/docs/security.md new file mode 100644 index 0000000..2f16137 --- /dev/null +++ b/docs/security.md @@ -0,0 +1,44 @@ +# Security model + +Nous campaigns invoke an LLM agent (Claude Code) with shell-tool access against your target repository. The orchestrator's job is to make sure that access is *bounded* β€” agents can only see and modify what the campaign legitimately needs. + +This document describes how that boundary is enforced. + +## Per-campaign permission policy + +When you run `nous run`, the orchestrator writes `/.claude/settings.json` (issue #135). The dispatcher then invokes the agent with `--settings `, replacing the legacy `--dangerously-skip-permissions`. + +The settings file declares: + +| Key | Meaning | +|---|---| +| `permissions.allowOnly` | Absolute paths the agent may read or write. Always includes the campaign work-dir; includes the target repo when `repo_path` is set. | +| `permissions.allow` | Bash command allowlist. Built from a conservative default set (`git`, `python`, `pytest`, `grep`, …) plus any binaries referenced in `experiment_plan.yaml` arms, plus campaign-specific entries you pass via `extra_bin_allowlist`. | +| `permissions.deny` | Hard blocks. Ships with `Bash(curl https://*)`, `Bash(wget https://*)`, and `Bash(rm -rf /*)` to prevent the agent from exfiltrating data or destroying its host. | +| `hooks.Stop` | (When `bin/nous-execute-stop` exists) deterministic completion check β€” see #129. | +| `hooks.PreToolUse` | (When configured) plan-enforcer hook β€” see #128. | + +### Why `--dangerously-skip-permissions` is no longer the default + +`--dangerously-skip-permissions` auto-approves *every* tool call. That's appropriate for a sandboxed CI runner and a one-off experiment, but Nous campaigns run for hours against real repositories β€” we need writes to be bounded to the worktree by default. + +The flag is still available behind explicit opt-in for emergency cases (e.g. recovering a stuck campaign), but no campaign in `examples/` uses it after #135 lands. + +### Idempotency + +`setup_work_dir` only writes `settings.json` if it doesn't already exist. That means you can hand-edit the file (add a custom `extra_bin_allowlist`, tweak deny rules, point `hooks.Stop` at a custom script) and a `nous resume` won't clobber your changes. + +### What's NOT enforced by this layer + +- **Network egress beyond the deny list.** The deny rules block the obvious cases; for hardened environments, run Nous inside a network-namespaced container. +- **Privilege escalation.** The agent runs as your shell user. Claude Code's permission system gates *which* commands run, not *what privileges* they run with. +- **Adversarial inputs from your target repo.** If the repo's source code contains prompt-injection payloads, the agent may follow them. Treat campaigns the way you'd treat any other code review of an untrusted repo. + +## Hook registration + +The settings file's `hooks` section wires up: + +- **Stop hook** (`bin/nous-execute-stop`, #129): allows the executor to terminate only when `principle_updates.json` exists and `nous validate execution` returns pass. Cheaper and more reliable than a Haiku evaluator for schema-driven success criteria. +- **PreToolUse hook** (`bin/nous-plan-enforcer`, #128): rejects (or logs) Bash calls that aren't derivable from `experiment_plan.yaml`. Defense-in-depth on top of the allow/deny lists. + +Both hooks are optional; their absence falls back to settings-only enforcement. diff --git a/orchestrator/cache_stats.py b/orchestrator/cache_stats.py new file mode 100644 index 0000000..0381863 --- /dev/null +++ b/orchestrator/cache_stats.py @@ -0,0 +1,131 @@ +"""Cache hit-rate aggregation over llm_metrics.jsonl (issue #122). + +Reads the per-call metrics file and computes: + + * total cache_read_input_tokens (paid for once per cache window) + * total cache_creation_input_tokens (paid the first time only) + * total uncached input tokens + * cache hit rate = read / (read + creation + uncached) + * by-phase breakdown (so DESIGN-vs-EXECUTE_ANALYZE differences surface) + +The result powers ``nous cost --cache-stats``. Output is JSON-serializable +so the same numbers can drive Routines (#134) reporting later. +""" +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + + +def _iter_rows(path: Path): + if not path.exists(): + return + for line in path.read_text().splitlines(): + line = line.strip() + if not line: + continue + try: + yield json.loads(line) + except json.JSONDecodeError: + continue + + +def cache_stats(metrics_path: Path) -> dict[str, Any]: + """Compute cache hit-rate statistics from a metrics JSONL file. + + Returns: + :: + + { + "total_calls": int, + "input_tokens_uncached": int, + "cache_creation_input_tokens": int, + "cache_read_input_tokens": int, + "hit_rate": float, # 0.0–1.0 + "by_phase": { + : { same fields, scoped to that phase } + } + } + """ + rows = list(_iter_rows(Path(metrics_path))) + return _aggregate(rows) + + +def _aggregate(rows: list[dict]) -> dict[str, Any]: + out: dict[str, Any] = { + "total_calls": 0, + "input_tokens_uncached": 0, + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + "hit_rate": 0.0, + "by_phase": {}, + } + phase_aggregates: dict[str, dict[str, int]] = {} + + for row in rows: + out["total_calls"] += 1 + # Standard schema: input_tokens captures the uncached portion; + # cache_creation/read are emitted as separate fields by both the + # CLIDispatcher (since #41) and the SDKDispatcher (#121). + uncached = int(row.get("input_tokens", 0) or 0) + creation = int(row.get("cache_creation_input_tokens", 0) or 0) + read = int(row.get("cache_read_input_tokens", 0) or 0) + out["input_tokens_uncached"] += uncached + out["cache_creation_input_tokens"] += creation + out["cache_read_input_tokens"] += read + + phase = row.get("phase", "unknown") + bucket = phase_aggregates.setdefault(phase, { + "calls": 0, + "input_tokens_uncached": 0, + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + }) + bucket["calls"] += 1 + bucket["input_tokens_uncached"] += uncached + bucket["cache_creation_input_tokens"] += creation + bucket["cache_read_input_tokens"] += read + + total_input = ( + out["input_tokens_uncached"] + + out["cache_creation_input_tokens"] + + out["cache_read_input_tokens"] + ) + out["hit_rate"] = ( + out["cache_read_input_tokens"] / total_input if total_input else 0.0 + ) + + for phase, b in sorted(phase_aggregates.items()): + phase_total = ( + b["input_tokens_uncached"] + + b["cache_creation_input_tokens"] + + b["cache_read_input_tokens"] + ) + b["hit_rate"] = ( + b["cache_read_input_tokens"] / phase_total if phase_total else 0.0 + ) + out["by_phase"] = phase_aggregates + return out + + +def format_cache_stats(stats: dict[str, Any]) -> str: + """Render stats as a multiline human-readable summary.""" + lines: list[str] = [] + lines.append(f" Calls: {stats['total_calls']}") + lines.append(f" Uncached input tokens: {stats['input_tokens_uncached']:,}") + lines.append(f" Cache-creation tokens: {stats['cache_creation_input_tokens']:,}") + lines.append(f" Cache-read tokens: {stats['cache_read_input_tokens']:,}") + lines.append(f" Hit rate: {stats['hit_rate']:.1%}") + if stats.get("by_phase"): + lines.append("") + lines.append(" By phase:") + for phase, b in stats["by_phase"].items(): + lines.append( + f" {phase}: {b['calls']} call(s), " + f"hit rate {b['hit_rate']:.1%} " + f"(read {b['cache_read_input_tokens']:,} / " + f"create {b['cache_creation_input_tokens']:,} / " + f"uncached {b['input_tokens_uncached']:,})" + ) + return "\n".join(lines) diff --git a/orchestrator/campaign.py b/orchestrator/campaign.py index 2ba6a84..bdd1c0b 100644 --- a/orchestrator/campaign.py +++ b/orchestrator/campaign.py @@ -206,15 +206,38 @@ def run_campaign( HumanGate(auto_response="approve") if auto_approve else HumanGate() ) - # Pre-flight: validate CLI + credentials before starting the campaign + # GC orphan experiment worktrees (#133): clean up stale dirs from + # crashed prior runs before starting fresh ones. repo_path = campaign.get("target_system", {}).get("repo_path") + if repo_path: + try: + from orchestrator.worktree import gc_orphan_worktrees + removed = gc_orphan_worktrees(Path(repo_path)) + if removed: + logger.info( + "GC'd %d orphan worktree(s): %s", + len(removed), ", ".join(removed), + ) + except (OSError, RuntimeError) as exc: + logger.warning("Worktree GC failed: %s", exc) + + # Pre-flight: validate CLI + credentials before starting the campaign. + # SDK mode pre-flights via claude-agent-sdk import; API mode via claude CLI. if agent != "inline" and repo_path: - from orchestrator.cli_dispatch import CLIDispatcher - preflight_dispatcher = CLIDispatcher( - work_dir=work_dir, campaign=campaign, - model=_resolve_model(campaign, "design", model), - max_retries=max_cli_retries, - ) + if agent == "sdk": + from orchestrator.sdk_dispatch import SDKDispatcher + preflight_dispatcher = SDKDispatcher( + work_dir=work_dir, campaign=campaign, + model=_resolve_model(campaign, "design", model), + max_retries=max_cli_retries, + ) + else: + from orchestrator.cli_dispatch import CLIDispatcher + preflight_dispatcher = CLIDispatcher( + work_dir=work_dir, campaign=campaign, + model=_resolve_model(campaign, "design", model), + max_retries=max_cli_retries, + ) preflight_dispatcher.preflight_check() start_iter = _resume_completed_campaign(work_dir, max_iterations) @@ -353,7 +376,7 @@ def main() -> None: help="Timeout in seconds for claude -p calls (default: 1800)") parser.add_argument("--max-cli-retries", type=int, default=10, help="Max retries for claude -p failures (-1 = unbounded, default: 10)") - parser.add_argument("--agent", choices=["inline", "api"], default="api", + parser.add_argument("--agent", choices=["inline", "api", "sdk"], default="api", help="Dispatch backend: 'inline' emits prompts to stdout for the " "calling agent (no subprocess, no API key), " "'api' uses the LLM API (default: api)") @@ -397,6 +420,14 @@ def main() -> None: print(f"Working directory: {work_dir.resolve()}") print(f"Max iterations: {max_iter}") + # Initial CLAUDE.md so iter 1 has campaign brief + (empty) principles + # in scope from session start (#131). + try: + from orchestrator.claude_md import regenerate_from_disk + regenerate_from_disk(work_dir, campaign, iteration=0) + except (OSError, RuntimeError) as exc: + logger.warning("Failed to write initial CLAUDE.md: %s", exc) + run_campaign( campaign, work_dir, max_iterations=max_iter, model=args.model, diff --git a/orchestrator/campaign_index.py b/orchestrator/campaign_index.py new file mode 100644 index 0000000..625a950 --- /dev/null +++ b/orchestrator/campaign_index.py @@ -0,0 +1,334 @@ +"""Campaign index β€” pure functions over the on-disk artifact tree (#126). + +These functions are the contract that ``nous-mcp`` (a stdio MCP server, +shipped in a follow-up phase) exposes as resources and tools. Keeping +them pure and import-free of MCP itself means: + + * They're trivially testable without spinning up an MCP transport. + * The CLI can use them too (``nous list``, ``nous find-principle``) + without coupling to the MCP runtime. + * A future Routines invocation (#134) can use the same functions to + publish findings into a shared store. + +Conventions: + + * A "campaign root" is a directory containing ``state.json``, + ``ledger.json``, ``principles.json``. Typically ``/.nous/``. + * A "search root" is a directory under which we walk to find campaign + roots. Searches are bounded to depth 4 so we don't accidentally walk + a giant repo. + * Functions return plain ``dict``/``list`` JSON-friendly structures so + MCP serialization is a no-op. +""" +from __future__ import annotations + +import json +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Iterable + +_MAX_DEPTH = 4 + + +def _walk_campaign_roots(search_root: Path, max_depth: int = _MAX_DEPTH) -> Iterable[Path]: + """Yield directories under ``search_root`` that look like campaign roots.""" + search_root = Path(search_root) + if not search_root.is_dir(): + return + stack: list[tuple[Path, int]] = [(search_root, 0)] + while stack: + path, depth = stack.pop() + if depth > max_depth: + continue + try: + entries = list(path.iterdir()) + except (PermissionError, OSError): + continue + for entry in entries: + if not entry.is_dir(): + continue + # Heuristic: a campaign root has state.json + ledger.json. + if (entry / "state.json").exists() and (entry / "ledger.json").exists(): + yield entry + # Don't descend further inside a campaign root β€” its + # subdirs (runs/iter-N) aren't themselves campaigns. + continue + stack.append((entry, depth + 1)) + + +def _read_json(path: Path) -> Any: + try: + return json.loads(path.read_text()) + except (json.JSONDecodeError, OSError): + return None + + +@dataclass +class CampaignSummary: + run_id: str + path: str + phase: str + iteration: int + completed_iterations: int + active_principles: int + repo: str | None = None + + def as_dict(self) -> dict[str, Any]: + return { + "run_id": self.run_id, + "path": self.path, + "phase": self.phase, + "iteration": self.iteration, + "completed_iterations": self.completed_iterations, + "active_principles": self.active_principles, + "repo": self.repo, + } + + +def _summarize(root: Path) -> CampaignSummary | None: + state = _read_json(root / "state.json") + if not isinstance(state, dict): + return None + ledger = _read_json(root / "ledger.json") + completed = 0 + if isinstance(ledger, dict): + rows = ledger.get("iterations", []) + if isinstance(rows, list): + completed = sum( + 1 for r in rows + if isinstance(r, dict) and isinstance(r.get("iteration"), int) + and r["iteration"] >= 1 + ) + principles = _read_json(root / "principles.json") + active = 0 + if isinstance(principles, dict): + plist = principles.get("principles", []) + if isinstance(plist, list): + active = sum( + 1 for p in plist + if isinstance(p, dict) and p.get("status", "active") == "active" + ) + # Best-effort: target repo is the great-grandparent when work_dir + # was created as /.nous/. + repo: str | None = None + if root.parent.name == ".nous": + repo = str(root.parent.parent.resolve()) + return CampaignSummary( + run_id=state.get("run_id", root.name), + path=str(root.resolve()), + phase=state.get("phase", "UNKNOWN"), + iteration=int(state.get("iteration", 0) or 0), + completed_iterations=completed, + active_principles=active, + repo=repo, + ) + + +# ─── list_campaigns ───────────────────────────────────────────────────────── + + +def list_campaigns( + search_root: Path, + *, + query: str | None = None, + status: str | None = None, + repo: str | None = None, +) -> list[dict[str, Any]]: + """List campaign summaries under ``search_root``. + + Args: + search_root: directory to walk. + query: case-insensitive substring filter against run_id. + status: filter on state.phase (``DONE``, ``EXECUTE_ANALYZE``, etc.). + repo: filter on resolved repo path (substring match). + + Returns: list of summary dicts, sorted by run_id. + """ + out: list[dict[str, Any]] = [] + for root in _walk_campaign_roots(Path(search_root)): + summary = _summarize(root) + if summary is None: + continue + if query and query.lower() not in summary.run_id.lower(): + continue + if status and summary.phase != status: + continue + if repo: + if not summary.repo or repo not in summary.repo: + continue + out.append(summary.as_dict()) + out.sort(key=lambda d: d["run_id"]) + return out + + +# ─── search_principles ──────────────────────────────────────────────────── + + +@dataclass +class PrincipleHit: + run_id: str + path: str # campaign root + principle: dict[str, Any] + score: float = 1.0 # placeholder for future semantic search + + def as_dict(self) -> dict[str, Any]: + return { + "run_id": self.run_id, + "path": self.path, + "score": self.score, + "principle": self.principle, + } + + +def search_principles( + search_root: Path, + text: str, + *, + only_active: bool = True, +) -> list[dict[str, Any]]: + """Find principles whose statement/description matches ``text``. + + Phase A is plain case-insensitive substring matching; the issue notes + embedding-based search as an optional follow-up gated on + ``OPENAI_API_KEY``. + """ + needle = text.lower().strip() + if not needle: + return [] + hits: list[PrincipleHit] = [] + for root in _walk_campaign_roots(Path(search_root)): + principles = _read_json(root / "principles.json") + if not isinstance(principles, dict): + continue + plist = principles.get("principles", []) + if not isinstance(plist, list): + continue + state = _read_json(root / "state.json") or {} + run_id = state.get("run_id", root.name) + for p in plist: + if not isinstance(p, dict): + continue + if only_active and p.get("status", "active") != "active": + continue + haystack = " ".join( + str(p.get(field, "")) for field in + ("statement", "description", "category", "id") + ).lower() + if needle in haystack: + hits.append(PrincipleHit( + run_id=run_id, path=str(root.resolve()), + principle=p, + )) + # Stable order: by run_id, then principle id. + hits.sort(key=lambda h: (h.run_id, str(h.principle.get("id", "")))) + return [h.as_dict() for h in hits] + + +# ─── get_arm_results ────────────────────────────────────────────────────── + + +def get_arm_results( + campaign_root: Path, + iteration: int, + arm: str, +) -> dict[str, Any]: + """Aggregate results for one arm of one iteration. + + Returns: ``{"arm": ..., "iteration": N, "seeds": [{"seed": ..., "files": [...]}]}``. + Seeds and their result files are read from ``runs/iter-N/results///``. + """ + campaign_root = Path(campaign_root) + arm_dir = campaign_root / "runs" / f"iter-{iteration}" / "results" / arm + seeds: list[dict[str, Any]] = [] + if arm_dir.is_dir(): + for seed_dir in sorted(arm_dir.iterdir()): + if not seed_dir.is_dir(): + continue + files = sorted( + str(p.relative_to(campaign_root)) + for p in seed_dir.rglob("*") if p.is_file() + ) + seeds.append({"seed": seed_dir.name, "files": files}) + return {"arm": arm, "iteration": iteration, "seeds": seeds} + + +# ─── compare_iterations ─────────────────────────────────────────────────── + + +def compare_iterations( + campaign_root: Path, + iter_a: int, + iter_b: int, +) -> dict[str, Any]: + """Deterministic diff between two iterations' findings. + + Returns the high-level shape: + ``{"a": , "b": , "delta": {...}}``. + + The delta names which arms changed status (e.g. CONFIRMED β†’ REFUTED) + and which principles were added between the two iterations. No + timestamps, no stochastic ordering β€” calling this twice on the same + data must produce byte-equal output. + """ + campaign_root = Path(campaign_root) + + def _findings(n: int) -> dict[str, Any] | None: + f = _read_json(campaign_root / "runs" / f"iter-{n}" / "findings.json") + return f if isinstance(f, dict) else None + + a = _findings(iter_a) or {} + b = _findings(iter_b) or {} + + def _arm_status_map(f: dict) -> dict[str, str]: + out: dict[str, str] = {} + for arm in f.get("arms", []) or []: + if isinstance(arm, dict): + out[str(arm.get("arm_id", ""))] = str(arm.get("status", "")) + return dict(sorted(out.items())) + + delta = { + "iter_a": iter_a, + "iter_b": iter_b, + "arm_status_changes": _arm_status_diff(_arm_status_map(a), _arm_status_map(b)), + "principles_added": _principles_added(campaign_root, iter_a, iter_b), + } + return {"a": a, "b": b, "delta": delta} + + +def _arm_status_diff(a: dict[str, str], b: dict[str, str]) -> list[dict[str, str]]: + changes = [] + for arm_id in sorted(set(a) | set(b)): + sa = a.get(arm_id, "absent") + sb = b.get(arm_id, "absent") + if sa != sb: + changes.append({"arm_id": arm_id, "from": sa, "to": sb}) + return changes + + +def _principles_added(root: Path, iter_a: int, iter_b: int) -> list[str]: + def _ids(n: int) -> set[str]: + u = _read_json(root / "runs" / f"iter-{n}" / "principle_updates.json") + if not isinstance(u, list): + return set() + return {str(p.get("id", "")) for p in u if isinstance(p, dict) and "id" in p} + return sorted(_ids(iter_b) - _ids(iter_a)) + + +# ─── Resource paths (the strings the MCP server publishes as resources) ── + + +def resource_uri_for_campaign(run_id: str) -> str: + return f"nous://campaigns/{run_id}" + + +def resource_uri_for_state(run_id: str) -> str: + return f"nous://campaigns/{run_id}/state" + + +def resource_uri_for_principles(run_id: str) -> str: + return f"nous://campaigns/{run_id}/principles" + + +def resource_uri_for_iter_findings(run_id: str, iteration: int) -> str: + return f"nous://campaigns/{run_id}/iter/{iteration}/findings" diff --git a/orchestrator/channels.py b/orchestrator/channels.py new file mode 100644 index 0000000..9c00621 --- /dev/null +++ b/orchestrator/channels.py @@ -0,0 +1,229 @@ +"""Channel notification for human gates (issue #130, Phase A). + +Posts a markdown rendering of the gate summary to each configured channel +webhook so reviewers see the gate on Slack/Telegram/etc. without needing +to be at the terminal. + +Phase A scope: outbound notification only β€” the campaign still blocks on +terminal input for the actual decision. Phase B (a follow-up) wires reply +parsing so an "approve" reply on Slack advances the campaign. + +Configuration shape in campaign.yaml:: + + channels: + - kind: slack + webhook_url: https://hooks.slack.com/services/... + - kind: webhook + url: https://example.com/nous/gate + headers: + Authorization: Bearer ... + +Failures are best-effort: a webhook timeout or 5xx logs at warning and +does NOT break the gate. The campaign keeps running. +""" +from __future__ import annotations + +import json +import logging +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any, Callable, Iterable + +logger = logging.getLogger(__name__) + + +_DEFAULT_TIMEOUT_SECONDS = 10 + + +def _summary_to_markdown(summary: dict, *, gate_type: str, iter_dir: Path) -> str: + """Render a gate_summary dict as a compact markdown card.""" + lines = [ + f"### Nous gate: **{gate_type}**", + "", + summary.get("summary", "(no summary)"), + "", + ] + points = summary.get("key_points") or [] + if points: + lines.append("**Key points**") + for p in points: + lines.append(f"- {p}") + lines.append("") + lines.append(f"_iter dir: `{iter_dir}`_") + lines.append("") + lines.append("Reply with `approve`, `reject`, or `abort`.") + return "\n".join(lines) + + +def _post(url: str, body: bytes, headers: dict[str, str], timeout: float) -> int: + """Single HTTP POST. Returns status code; raises on transport error.""" + req = urllib.request.Request(url, data=body, headers=headers, method="POST") + with urllib.request.urlopen(req, timeout=timeout) as resp: + return resp.status + + +def _post_slack(channel: dict, markdown: str, timeout: float) -> int: + url = channel.get("webhook_url") + if not url: + raise ValueError("slack channel missing webhook_url") + body = json.dumps({"text": markdown}).encode("utf-8") + return _post(url, body, {"Content-Type": "application/json"}, timeout) + + +def _post_generic(channel: dict, markdown: str, timeout: float) -> int: + url = channel.get("url") + if not url: + raise ValueError("webhook channel missing url") + headers = {"Content-Type": "application/json"} + headers.update(channel.get("headers") or {}) + body = json.dumps({"markdown": markdown}).encode("utf-8") + return _post(url, body, headers, timeout) + + +_DISPATCHERS: dict[str, Callable[[dict, str, float], int]] = { + "slack": _post_slack, + "webhook": _post_generic, +} + + +def notify_gate( + channels: Iterable[dict] | None, + *, + summary: dict, + gate_type: str, + iter_dir: Path, + timeout: float = _DEFAULT_TIMEOUT_SECONDS, + poster: Callable[[str, bytes, dict[str, str], float], int] | None = None, +) -> list[dict[str, Any]]: + """POST a gate summary to every configured channel. + + Args: + channels: list of channel configs from campaign.yaml. ``None`` or an + empty list is a no-op. + summary: parsed gate_summary_.json contents. + gate_type: ``design`` | ``findings`` | ``continue`` etc. + iter_dir: iteration directory (shown in the markdown card). + timeout: per-request timeout in seconds. + poster: dependency-injection seam for tests. When set, used instead + of the real urllib.request.urlopen path. Signature matches ``_post``. + + Returns: + A list of result dicts β€” one per channel β€” with keys + ``kind``, ``ok``, ``status_code`` (or ``error``). The campaign uses + this to decide what to log, but never raises on individual failures. + """ + if not channels: + return [] + + markdown = _summary_to_markdown(summary, gate_type=gate_type, iter_dir=iter_dir) + + results: list[dict[str, Any]] = [] + for channel in channels: + kind = channel.get("kind", "webhook") + result: dict[str, Any] = {"kind": kind, "ok": False} + try: + if poster is not None: + # Test path: bypass dispatcher, post directly. + if kind == "slack": + body = json.dumps({"text": markdown}).encode("utf-8") + url = channel.get("webhook_url", "") + headers = {"Content-Type": "application/json"} + else: + body = json.dumps({"markdown": markdown}).encode("utf-8") + url = channel.get("url", "") + headers = {"Content-Type": "application/json"} + headers.update(channel.get("headers") or {}) + status = poster(url, body, headers, timeout) + else: + dispatcher = _DISPATCHERS.get(kind) + if dispatcher is None: + raise ValueError(f"unknown channel kind: {kind!r}") + status = dispatcher(channel, markdown, timeout) + result["status_code"] = status + result["ok"] = 200 <= status < 300 + except (urllib.error.URLError, ValueError, TimeoutError, OSError) as exc: + logger.warning( + "channel %r notify failed: %s", kind, exc, + ) + result["error"] = str(exc) + results.append(result) + return results + + +# ─── Phase B: reply parsing + wait-for-decision ──────────────────────────── + + +_REPLY_TOKENS: dict[str, str] = { + "approve": "approve", + "approved": "approve", + "lgtm": "approve", + "ok": "approve", + "yes": "approve", + "reject": "reject", + "rejected": "reject", + "no": "reject", + "redesign": "reject", + "abort": "abort", + "stop": "abort", + "cancel": "abort", +} + + +def parse_reply(text: str) -> str | None: + """Map a free-form channel reply to a gate Decision. + + Returns ``"approve"`` / ``"reject"`` / ``"abort"`` when the message + starts with (or is exactly) a recognized token. Returns ``None`` + when the reply doesn't decode to a decision β€” caller should keep + waiting or fall through to the timeout. + + Recognized tokens (case-insensitive): + approve | approved | lgtm | ok | yes -> approve + reject | rejected | no | redesign -> reject + abort | stop | cancel -> abort + """ + if not isinstance(text, str): + return None + head = text.strip().lower().split() + if not head: + return None + return _REPLY_TOKENS.get(head[0]) + + +def wait_for_reply( + reply_provider: "Callable[[], str | None]", + *, + timeout_seconds: float, + poll_interval_seconds: float = 1.0, + sleeper: "Callable[[float], None] | None" = None, + clock: "Callable[[], float] | None" = None, +) -> str | None: + """Poll ``reply_provider`` until it returns a recognized decision or + timeout elapses. + + Args: + reply_provider: callable returning the latest channel message text + (or ``None`` if no new reply yet). + timeout_seconds: max time to wait before returning ``None``. + poll_interval_seconds: how long to sleep between polls. + sleeper: dependency-injection seam for tests (default: time.sleep). + clock: dependency-injection seam for tests (default: time.time). + + Returns: + ``"approve"`` / ``"reject"`` / ``"abort"`` on first recognized reply. + ``None`` on timeout β€” caller should fall back to ``--auto-approve`` + semantics (the issue's documented timeout behavior). + """ + import time as _time + sleep = sleeper if sleeper is not None else _time.sleep + now = clock if clock is not None else _time.time + + deadline = now() + timeout_seconds + while now() < deadline: + text = reply_provider() + decision = parse_reply(text) if text is not None else None + if decision is not None: + return decision + sleep(poll_interval_seconds) + return None diff --git a/orchestrator/claude_md.py b/orchestrator/claude_md.py new file mode 100644 index 0000000..81ace8b --- /dev/null +++ b/orchestrator/claude_md.py @@ -0,0 +1,159 @@ +"""Per-campaign ``CLAUDE.md`` generator (issue #131). + +Claude Code auto-loads ``CLAUDE.md`` from each working / added directory +on every session, **once**. That makes it the right home for content that +is stable across calls within a campaign: + + * The campaign brief (research question, target system, observable + metrics, controllable knobs). + * The accumulated ``principles.json`` β€” the campaign's living knowledge + base. + * The most recent ``handoff.md`` β€” designer-to-executor context. + +This module is a pure renderer: ``render_campaign_claude_md`` takes +inputs and returns a string; ``write_campaign_claude_md`` writes it to +disk. Regeneration after each iteration is deterministic Python β€” never +an LLM call. + +The win this enables (full payoff lands when the prompt-template refactor +ships): each Nous LLM call no longer re-injects the campaign brief and +principles. Compounded with #122's ``cache_control: ephemeral`` on the +methodology system block, the bulk of static context is paid for once +per session, not once per turn. +""" +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from orchestrator.util import atomic_write + + +_HEADER = """# Nous Campaign Context + +> This file is auto-generated by the orchestrator. **Do not hand-edit** β€” +> changes will be overwritten on the next iteration. The orchestrator +> updates the principles section after every iteration. +""" + + +def _format_principles(principles: list[dict] | None) -> str: + """Render principles.json contents as a readable markdown section.""" + if not principles: + return "_No principles accumulated yet._" + lines: list[str] = [] + for p in principles: + if not isinstance(p, dict): + continue + pid = p.get("id", "?") + statement = p.get("statement") or p.get("description") or "(no statement)" + category = p.get("category", "general") + status = p.get("status", "active") + if status != "active": + continue + lines.append(f"- **{pid}** [{category}]: {statement}") + if not lines: + return "_No active principles._" + return "\n".join(lines) + + +def _format_target(target: dict) -> str: + parts = [ + f"**{target.get('name', 'Unknown system')}**", + target.get("description", ""), + ] + metrics = target.get("observable_metrics") + if metrics: + parts.append(f"\n**Observable metrics:** {', '.join(metrics)}") + knobs = target.get("controllable_knobs") + if knobs: + parts.append(f"\n**Controllable knobs:** {', '.join(knobs)}") + return "\n".join(p for p in parts if p) + + +def render_campaign_claude_md( + *, + campaign: dict, + principles: list[dict] | None = None, + last_handoff: str | None = None, + iteration: int | None = None, +) -> str: + """Build the CLAUDE.md content for one campaign. + + Sections (markdown headings the agent can navigate): + 1. Campaign brief β€” research_question, target_system summary. + 2. Active principles β€” formatted list from principles.json. + 3. Last handoff β€” designerβ†’executor handoff from the most recent + iteration that produced one (empty in iter 1). + + Returns the full markdown text. Caller is responsible for writing + it to disk via ``write_campaign_claude_md``. + """ + research_question = campaign.get("research_question", "(not set)") + target = campaign.get("target_system", {}) + + iter_line = f" (after iteration {iteration})" if iteration else "" + + sections = [ + _HEADER, + "## Research Question\n", + research_question.strip(), + "", + "## Target System\n", + _format_target(target), + "", + f"## Active Principles{iter_line}\n", + _format_principles(principles or []), + "", + "## Most Recent Handoff\n", + ] + if last_handoff and last_handoff.strip(): + sections.append(last_handoff.strip()) + else: + sections.append("_First iteration β€” no prior handoff._") + return "\n".join(sections) + "\n" + + +def write_campaign_claude_md(work_dir: Path, content: str) -> Path: + """Atomically write CLAUDE.md to the campaign work-dir. + + Returns the absolute path to the file. + """ + target = Path(work_dir) / "CLAUDE.md" + atomic_write(target, content) + return target.resolve() + + +def regenerate_from_disk(work_dir: Path, campaign: dict, iteration: int) -> Path: + """Refresh CLAUDE.md after iteration N completes. + + Reads the current ``principles.json`` and ``handoff.md`` from + ``work_dir`` and writes a freshly-rendered CLAUDE.md. Returns the + absolute path written. + """ + work_dir = Path(work_dir) + principles: list[dict[str, Any]] = [] + p_path = work_dir / "principles.json" + if p_path.exists(): + try: + store = json.loads(p_path.read_text()) + principles = store.get("principles", []) + except (json.JSONDecodeError, OSError): + principles = [] + + handoff_text: str | None = None + h_path = work_dir / "handoff.md" + if h_path.exists(): + try: + handoff_text = h_path.read_text() + except OSError: + handoff_text = None + + content = render_campaign_claude_md( + campaign=campaign, + principles=principles, + last_handoff=handoff_text, + iteration=iteration, + ) + return write_campaign_claude_md(work_dir, content) diff --git a/orchestrator/cli.py b/orchestrator/cli.py index 755e9d9..81ef69f 100644 --- a/orchestrator/cli.py +++ b/orchestrator/cli.py @@ -161,26 +161,41 @@ def _cmd_validate(args): def _cmd_status(args): - import json + """Status surface β€” one-shot, single-line, or live --watch (#127).""" + import time as _time + from orchestrator.status import ( + format_one_liner, + format_watch_panel, + read_status_snapshot, + ) work_dir = resolve_work_dir(args.target) - state_file = work_dir / "state.json" - if not state_file.exists(): + if not (work_dir / "state.json").exists(): print(f"Error: no state.json at {work_dir}", file=sys.stderr) sys.exit(1) - state = json.loads(state_file.read_text()) - ledger = json.loads((work_dir / "ledger.json").read_text()) if (work_dir / "ledger.json").exists() else {"iterations": []} - principles = json.loads((work_dir / "principles.json").read_text()) if (work_dir / "principles.json").exists() else {"principles": []} - - active_principles = [p for p in principles.get("principles", []) if p.get("status") == "active"] - completed = [it for it in ledger.get("iterations", []) if it.get("iteration", 0) > 0] + if getattr(args, "line", False): + print(format_one_liner(read_status_snapshot(work_dir))) + return - print(f"Campaign: {state.get('run_id', '?')}") - print(f"Phase: {state.get('phase', '?')}") - print(f"Iteration: {state.get('iteration', '?')}") - print(f"Completed: {len(completed)} iteration(s)") - print(f"Principles: {len(active_principles)} active") + if getattr(args, "watch", False): + try: + while True: + snap = read_status_snapshot(work_dir) + # Clear screen + home cursor (ANSI). Falls back gracefully + # in non-tty contexts to a separator line. + if sys.stdout.isatty(): + sys.stdout.write("\033[2J\033[H") + else: + sys.stdout.write("\n" + "─" * 60 + "\n") + sys.stdout.write(format_watch_panel(snap) + "\n") + sys.stdout.flush() + _time.sleep(args.interval if args.interval > 0 else 2) + except KeyboardInterrupt: + print() + return + + print(format_watch_panel(read_status_snapshot(work_dir))) def _cmd_cost(args): @@ -206,6 +221,11 @@ def _cmd_cost(args): for phase, b in s["by_phase"].items(): print(f" {phase:20s} {b['calls']} calls ${b['cost_usd']:.4f} {b['input_tokens']+b['output_tokens']} tok") + if getattr(args, "cache_stats", False): + from orchestrator.cache_stats import cache_stats, format_cache_stats + print("\nCache stats:") + print(format_cache_stats(cache_stats(metrics_path))) + def _cmd_report(args): import logging @@ -310,7 +330,7 @@ def main(): p_run.add_argument("--auto-approve", action="store_true") p_run.add_argument("--timeout", type=int, default=1800) p_run.add_argument("--max-cli-retries", type=int, default=10) - p_run.add_argument("--agent", choices=["inline", "api"], default="api") + p_run.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_run.set_defaults(func=_cmd_run) p_resume = subparsers.add_parser("resume") @@ -320,7 +340,7 @@ def main(): p_resume.add_argument("--auto-approve", action="store_true") p_resume.add_argument("--timeout", type=int, default=1800) p_resume.add_argument("--max-cli-retries", type=int, default=10) - p_resume.add_argument("--agent", choices=["inline", "api"], default="api") + p_resume.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_resume.set_defaults(func=_cmd_resume) p_validate = subparsers.add_parser("validate") @@ -330,17 +350,33 @@ def main(): p_status = subparsers.add_parser("status") p_status.add_argument("target") + p_status.add_argument( + "--watch", action="store_true", + help="Loop and redraw every --interval seconds (#127).", + ) + p_status.add_argument( + "--line", action="store_true", + help="Print a single-line summary suitable for shell prompts (#127).", + ) + p_status.add_argument( + "--interval", type=float, default=2.0, + help="Watch redraw interval in seconds (default: 2).", + ) p_status.set_defaults(func=_cmd_status) p_cost = subparsers.add_parser("cost") p_cost.add_argument("target") + p_cost.add_argument( + "--cache-stats", action="store_true", + help="Include prompt-cache hit-rate stats (#122).", + ) p_cost.set_defaults(func=_cmd_cost) p_report = subparsers.add_parser("report") p_report.add_argument("target") p_report.add_argument("--model") p_report.add_argument("--timeout", type=int, default=1800) - p_report.add_argument("--agent", choices=["inline", "api"], default="api") + p_report.add_argument("--agent", choices=["inline", "api", "sdk"], default="api") p_report.set_defaults(func=_cmd_report) p_replay = subparsers.add_parser("replay") diff --git a/orchestrator/cli_dispatch.py b/orchestrator/cli_dispatch.py index 5a4c968..8f2e2e1 100644 --- a/orchestrator/cli_dispatch.py +++ b/orchestrator/cli_dispatch.py @@ -51,6 +51,7 @@ def __init__( timeout: int = 1800, max_turns: int = 25, max_retries: int | None = 10, + settings_path: Path | None = None, ) -> None: super().__init__( work_dir=work_dir, @@ -66,6 +67,13 @@ def __init__( self.max_retries = max_retries repo_path = campaign.get("target_system", {}).get("repo_path") self._cwd = Path(repo_path) if repo_path else None + # Per-campaign permission policy (#135). When set, replaces the + # blanket --dangerously-skip-permissions with a fine-grained settings + # file. Auto-resolved from work_dir/.claude/settings.json if it exists. + if settings_path is None: + candidate = Path(work_dir) / ".claude" / "settings.json" + settings_path = candidate if candidate.exists() else None + self._settings_path = settings_path @contextmanager def override_cwd(self, cwd: Path): @@ -216,8 +224,11 @@ def _retry_cli_schema( def _call_claude(self, prompt: str, max_turns: int | None = None) -> str: """Invoke `claude -p` with the prompt on stdin, retrying transient failures.""" - cmd = ["claude", "-p", "--model", self.model, "--output-format", "json", - "--dangerously-skip-permissions"] + cmd = ["claude", "-p", "--model", self.model, "--output-format", "json"] + if self._settings_path is not None: + cmd += ["--settings", str(self._settings_path)] + else: + cmd += ["--dangerously-skip-permissions"] turns = max_turns or self.max_turns cmd += ["--max-turns", str(turns)] cwd = self._cwd diff --git a/orchestrator/explore_design.py b/orchestrator/explore_design.py new file mode 100644 index 0000000..4d037d3 --- /dev/null +++ b/orchestrator/explore_design.py @@ -0,0 +1,257 @@ +"""Explore-then-synthesize DESIGN phase (issue #132). + +DESIGN today asks one Opus session to do two things at once: + + 1. Read the codebase to map metrics, knobs, prior findings, principles. + 2. Synthesize a hypothesis bundle from what it found. + +That's the canonical Claude-Code-pattern miss: broad exploration + small +synthesis is exactly what parallel Explore subagents are for. Phase A +of #132 ships the orchestration layer that makes the split possible +without changing what gets produced (problem.md + bundle.yaml). + +Stage A β€” parallel Explore: ``run_explore_stage(campaign, scopes, +runner)`` fans out one read-only subagent per scope and collects their +reports. + +Stage B β€” Opus synthesis: ``build_synthesis_prompt(reports, campaign, +iteration)`` produces the prompt body for the single Opus call that +turns the explorer reports + principles.json into problem.md + +bundle.yaml. + +Phase A is the orchestration helpers + their behavioral tests. The +dispatcher integration (SDKDispatcher spawning Explore subagents, +threading reports back into a synthesis call) lands in Phase B once +#121 merges and the team picks injection points. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Callable, Iterable + +# Default exploration scopes β€” one Explore subagent per scope. The +# scopes are deliberately overlapping a little so synthesis has +# redundant signal where it matters. +DEFAULT_EXPLORE_SCOPES: tuple[str, ...] = ( + "metrics", # observable metrics + how they're collected + "knobs", # controllable knobs + their value ranges + "prior_findings", # findings.json from previous iterations + "principles", # principles.json across the campaign + others +) + + +@dataclass +class ExploreReport: + scope: str + text: str + duration_ms: int = 0 + input_tokens: int = 0 + output_tokens: int = 0 + + def as_dict(self) -> dict: + return { + "scope": self.scope, + "text": self.text, + "duration_ms": self.duration_ms, + "input_tokens": self.input_tokens, + "output_tokens": self.output_tokens, + } + + +@dataclass +class ExploreStageResult: + reports: list[ExploreReport] = field(default_factory=list) + + @property + def total_input_tokens(self) -> int: + return sum(r.input_tokens for r in self.reports) + + @property + def total_output_tokens(self) -> int: + return sum(r.output_tokens for r in self.reports) + + def by_scope(self, scope: str) -> ExploreReport | None: + for r in self.reports: + if r.scope == scope: + return r + return None + + +def build_explore_prompt(scope: str, campaign: dict) -> str: + """Construct a read-only Explore subagent prompt for one scope. + + The subagent should be spawned with ``subagent_type="Explore"`` so + it cannot mutate the worktree. The prompt is short and scope-tight + on purpose; the synthesis call (Stage B) is where multi-aspect + integration happens. + """ + target = campaign.get("target_system", {}) + name = target.get("name", "the target system") + repo = target.get("repo_path", "(repo not configured)") + + if scope == "metrics": + focus = ( + "Map the observable metrics this system exposes and how they " + "are collected. Include the file/function where each metric is " + "computed." + ) + elif scope == "knobs": + focus = ( + "Map the controllable knobs / configuration parameters this " + "system exposes. For each knob, note its declared range and the " + "code path that consumes it." + ) + elif scope == "prior_findings": + focus = ( + "Read prior runs/iter-*/findings.json files in the campaign " + "directory. Summarize confirmed/refuted hypotheses and any open " + "questions surfaced by the most recent iteration." + ) + elif scope == "principles": + focus = ( + "Read principles.json in this campaign and any sibling campaigns " + "(via the campaign_index module if available). Flag principles " + "that touch the same mechanism we're about to design for." + ) + else: + focus = f"Investigate the '{scope}' aspect of the target system." + + return ( + f"# Explore: {scope}\n\n" + f"You are a read-only Explore subagent. **Do not modify any files.**\n" + f"Target: {name} (repo at {repo})\n\n" + f"## Focus\n{focus}\n\n" + f"## Output\n" + f"Return a markdown report of <= 500 lines. Cite file paths and " + f"line numbers. End with a one-paragraph summary the synthesizer " + f"can read in isolation.\n" + ) + + +ExploreRunner = Callable[[str, str, dict], ExploreReport] +"""Callable signature for running one Explore subagent. + +Takes (scope, prompt, campaign) and returns an ExploreReport. The +default real-world implementation spawns subagent_type="Explore" via +the SDK and reads the assistant's final text. Tests inject a deterministic +fake. +""" + + +def run_explore_stage( + campaign: dict, + *, + scopes: Iterable[str] = DEFAULT_EXPLORE_SCOPES, + runner: ExploreRunner, +) -> ExploreStageResult: + """Run one Explore subagent per scope and collect their reports. + + Phase A executes synchronously over the runner. Real parallel + fan-out (anyio gather over the SDK's async API) lands in Phase B + when the SDK runner ships its async surface. + """ + reports: list[ExploreReport] = [] + for scope in scopes: + prompt = build_explore_prompt(scope, campaign) + report = runner(scope, prompt, campaign) + reports.append(report) + return ExploreStageResult(reports=reports) + + +def make_sdk_explore_runner( + *, + sdk_runner: Callable, + cwd: Path | None = None, + model: str = "claude-haiku-4-5", + max_turns: int = 8, +) -> ExploreRunner: + """Build an ExploreRunner backed by an SDK subagent (#132 Phase B). + + Each scope spawns a read-only subagent (``subagent_type="Explore"``) + so the orchestrator gets parallel mapping without a giant Opus + session doing both walking and synthesis. Per the no-live-LLM + project principle (CLAUDE.md), this factory takes an injected + ``sdk_runner`` β€” production wiring constructs the real Anthropic + SDK runner; tests inject a recording fake. + + Defaults model to Haiku because read-only mapping is cheap and + benefits from speed over depth; deep synthesis happens in Stage B + (the single Opus call), not in Stage A. + """ + def _run(scope: str, prompt: str, campaign: dict) -> ExploreReport: + try: + result = sdk_runner( + prompt=prompt, + model=model, + cwd=cwd, + max_turns=max_turns, + system_prompt=None, + settings_path=None, + event_log_path=None, + subagent_type="Explore", + ) + except TypeError: + # Older runners without subagent_type β€” fall back to the + # base signature so the factory stays compatible across + # SDK API evolution. + result = sdk_runner( + prompt=prompt, model=model, cwd=cwd, max_turns=max_turns, + ) + + return ExploreReport( + scope=scope, + text=getattr(result, "text", "") or "", + duration_ms=int(getattr(result, "duration_ms", 0) or 0), + input_tokens=int(getattr(result, "input_tokens", 0) or 0), + output_tokens=int(getattr(result, "output_tokens", 0) or 0), + ) + + return _run + + +def build_synthesis_prompt( + stage_a: ExploreStageResult, + *, + campaign: dict, + iteration: int, + iter_dir: Path, +) -> str: + """Build the Opus synthesis prompt that turns Explore reports into + problem.md + bundle.yaml. + + The synthesizer never reads the codebase directly β€” it consumes only + the explorer reports + principles.json. That's the whole point of + the split: Opus on integration, not on file walks. + """ + target = campaign.get("target_system", {}) + rq = campaign.get("research_question", "(not set)") + + sections = [ + f"# Synthesize iteration {iteration}", + "", + "Four read-only Explore subagents have already mapped the system.", + "**Do not re-read the codebase.** Synthesize from the reports below.", + "", + f"## Research question\n{rq}", + "", + f"## Target\n{target.get('name', '?')} β€” {target.get('description', '')}", + "", + "## Explorer reports", + ] + for report in stage_a.reports: + sections.append("") + sections.append(f"### {report.scope}\n") + sections.append(report.text) + + sections.extend([ + "", + "## Required outputs", + f"- {iter_dir}/problem.md (markdown)", + f"- {iter_dir}/bundle.yaml (YAML, must validate against bundle.schema.yaml)", + "", + "Cite explorer reports by their `### ` heading when justifying " + "design choices. The reports are the source of truth for this " + "iteration's design.", + ]) + return "\n".join(sections) diff --git a/orchestrator/goal_driven.py b/orchestrator/goal_driven.py new file mode 100644 index 0000000..33b421a --- /dev/null +++ b/orchestrator/goal_driven.py @@ -0,0 +1,175 @@ +"""`/goal`-driven campaign mode (issue #124). + +Two modes Nous can run in: + + Mode A β€” fully /goal-driven: spawn one ``claude`` session for the + whole campaign with a /goal directive that says "iteration N has + a valid findings.json and a principle_updates.json file, OR stop + after the campaign timeout." The Haiku evaluator that fires after + every turn decides when the goal is met. No Python state machine + in the inner loop. + + Mode B β€” /goal-bounded inner loop: keep the engine.py state machine + for control flow but use /goal *within* EXECUTE_ANALYZE so the + executor terminates as soon as validation passes. Cheaper than + Python-driven retry loops. + +Phase A ships the prompt builders for both modes (deterministic Python). +Wire-up into the dispatcher and the run_campaign code path lands in +Phase B once the team picks which mode is the default. + +Why deterministic prompt builders ship first: criterion #2 of the issue +("hybrid mode is the default for nous run after one release of soak") +implies the team will run both modes side by side on real campaigns +and compare. Behavioral testing of the prompt assembly β€” does it +include the campaign brief, does it spell out the goal predicate +exactly β€” is what makes those soak runs comparable. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Any + + +_DEFAULT_GOAL_DRIVEN_TIMEOUT_HOURS = 24 + + +def build_full_goal_directive( + campaign: dict, + *, + iteration: int, + timeout_hours: int = _DEFAULT_GOAL_DRIVEN_TIMEOUT_HOURS, +) -> str: + """Build the /goal text for Mode A (whole-campaign goal). + + The text is what gets sent as ``/goal "<...>"`` to a Claude Code + session. Predicate: iteration N has a valid findings.json AND a + principle_updates.json file, OR the elapsed time exceeds + timeout_hours. + """ + return ( + f"iteration {iteration} has produced runs/iter-{iteration}/findings.json " + f"with a non-empty arms list AND runs/iter-{iteration}/principle_updates.json " + f"with a list (possibly empty), OR more than {timeout_hours} hours have elapsed " + f"since this session started" + ) + + +def build_inner_loop_goal_directive( + iteration: int, + *, + extra_predicates: list[str] | None = None, +) -> str: + """Build the /goal text for Mode B (EXECUTE_ANALYZE-bounded goal). + + Predicate: validate execution passes AND principle_updates.json + exists. The deterministic Stop hook (#129) also enforces this; the + /goal evaluator is the probabilistic backup that catches edge cases + the schema check doesn't. + """ + parts = [ + f"runs/iter-{iteration}/findings.json validates against findings.schema.json", + f"runs/iter-{iteration}/principle_updates.json exists and parses as a list", + ] + if extra_predicates: + parts.extend(extra_predicates) + return " AND ".join(parts) + + +def build_goal_driven_session_prompt( + campaign: dict, + *, + iteration: int, + timeout_hours: int = _DEFAULT_GOAL_DRIVEN_TIMEOUT_HOURS, + work_dir: Path | None = None, +) -> str: + """Build the full prompt body for a Mode A session. + + The prompt asks the agent to drive iteration N of the Nous loop + end-to-end inside the session, printing artifact paths so the Haiku + /goal evaluator can see them. + """ + target = campaign.get("target_system", {}) + rq = campaign.get("research_question", "(not set)") + + sections = [ + "# Goal-driven Nous campaign", + "", + "You are running iteration {iter} of a Nous hypothesis-driven experiment.", + "Drive the full DESIGN β†’ EXECUTE_ANALYZE β†’ DONE flow inside this session.", + "", + "## Campaign brief", + f"- Research question: {rq}", + f"- Target system: {target.get('name', '?')}", + f"- Description: {target.get('description', '(no description)')}", + ] + metrics = target.get("observable_metrics") + if metrics: + sections.append(f"- Observable metrics: {', '.join(metrics)}") + knobs = target.get("controllable_knobs") + if knobs: + sections.append(f"- Controllable knobs: {', '.join(knobs)}") + + sections.extend([ + "", + "## Required artifacts (iteration {iter})", + f"- runs/iter-{iteration}/problem.md", + f"- runs/iter-{iteration}/bundle.yaml", + f"- runs/iter-{iteration}/experiment_plan.yaml", + f"- runs/iter-{iteration}/findings.json", + f"- runs/iter-{iteration}/principle_updates.json", + "", + "**Print every artifact path to stdout when you write it.** The /goal " + "evaluator only sees what's been surfaced in the conversation; " + "silent file writes won't trip the goal predicate.", + "", + "Run `nous validate execution --dir runs/iter-{iter}/` before claiming done.", + "", + "## Goal predicate", + f"/goal {build_full_goal_directive(campaign, iteration=iteration, timeout_hours=timeout_hours)!r}", + ]) + + text = "\n".join(sections) + return text.replace("{iter}", str(iteration)) + + +# ─── Phase B: dispatcher wire-up ──────────────────────────────────────────── + + +def run_goal_driven_iteration( + *, + dispatcher, + campaign: dict, + iteration: int, + work_dir: Path, + timeout_hours: int = _DEFAULT_GOAL_DRIVEN_TIMEOUT_HOURS, +) -> Path: + """Mode A β€” drive iteration N entirely inside a single SDK session. + + Bypasses the engine.py phase machine. The agent receives the + goal-driven prompt (with its embedded ``/goal`` directive) and + drives DESIGN β†’ EXECUTE_ANALYZE β†’ DONE itself. The orchestrator + persists the conversation transcript as ``design_log.md``; the + artifacts (problem.md, bundle.yaml, findings.json, etc.) are + written by the agent's own tool calls inside the session. + + Args: + dispatcher: any object exposing ``_call_claude(prompt) -> str``. + ``SDKDispatcher`` is the canonical caller; tests inject a fake. + campaign: parsed campaign config. + iteration: iteration number to drive. + work_dir: campaign work-dir. + timeout_hours: bound on the goal predicate's OR clause. + + Returns: + Path to the conversation log on disk. + """ + iter_dir = Path(work_dir) / "runs" / f"iter-{iteration}" + iter_dir.mkdir(parents=True, exist_ok=True) + prompt = build_goal_driven_session_prompt( + campaign, iteration=iteration, timeout_hours=timeout_hours, + ) + transcript = dispatcher._call_claude(prompt) + log_path = iter_dir / "design_log.md" + log_path.write_text(transcript) + return log_path diff --git a/orchestrator/iteration.py b/orchestrator/iteration.py index 29e9712..a3f4ea9 100644 --- a/orchestrator/iteration.py +++ b/orchestrator/iteration.py @@ -193,7 +193,17 @@ def setup_work_dir(run_id: str, repo_path: str | None = None) -> Path: If repo_path is provided, the campaign directory is created inside the target repo at .nous//. Otherwise falls back to creating / in the current directory. + + Also writes a per-campaign ``.claude/settings.json`` permission policy + (issue #135) so dispatchers can pass ``--settings `` instead of + ``--dangerously-skip-permissions``. """ + from orchestrator.settings_template import ( + render_campaign_settings, + settings_path_for, + write_campaign_settings, + ) + if repo_path: work_dir = Path(repo_path) / ".nous" / run_id else: @@ -206,13 +216,35 @@ def setup_work_dir(run_id: str, repo_path: str | None = None) -> Path: state = json.loads((work_dir / "state.json").read_text()) state["run_id"] = run_id atomic_write(work_dir / "state.json", json.dumps(state, indent=2) + "\n") + + # Per-campaign permission policy. Idempotent: don't overwrite a settings + # file the user has hand-edited. + settings_path = settings_path_for(work_dir) + if not settings_path.exists(): + bin_dir = Path(__file__).resolve().parent.parent / "bin" + stop_hook = bin_dir / "nous-execute-stop" + plan_enforcer = bin_dir / "nous-plan-enforcer" + settings = render_campaign_settings( + work_dir=work_dir, + repo_path=Path(repo_path) if repo_path else None, + stop_hook_path=stop_hook if stop_hook.exists() else None, + pre_tool_use_hook_path=plan_enforcer if plan_enforcer.exists() else None, + ) + write_campaign_settings(settings_path, settings) + return work_dir def _generate_gate_summary( dispatcher, iter_dir: Path, iteration: int, gate_type: str, + *, campaign: dict | None = None, ) -> Path | None: - """Generate a gate summary file. Returns the path, or None on failure.""" + """Generate a gate summary file. Returns the path, or None on failure. + + When ``campaign`` is provided and contains a non-empty ``channels`` list, + also fires off a per-channel notification (#130) with the rendered + summary. Channel failures are logged at warning and never block the gate. + """ summary_path = iter_dir / f"gate_summary_{gate_type}.json" try: dispatcher.dispatch( @@ -221,13 +253,33 @@ def _generate_gate_summary( iteration=iteration, perspective=gate_type, ) - return summary_path except (RuntimeError, FileNotFoundError, OSError) as exc: logger = logging.getLogger(__name__) logger.warning("Gate summary generation failed: %s", exc) print(f" (Gate summary skipped: {exc})") return None + # Channel notification (#130 Phase A): outbound only; the campaign still + # blocks on terminal input for the actual decision. + if campaign: + channels = campaign.get("channels") + if channels: + try: + from orchestrator.channels import notify_gate + summary = json.loads(summary_path.read_text()) + results = notify_gate( + channels, summary=summary, gate_type=gate_type, + iter_dir=iter_dir, + ) + ok = sum(1 for r in results if r.get("ok")) + if ok: + print(f" (notified {ok}/{len(results)} channel(s))") + except (json.JSONDecodeError, OSError, RuntimeError) as exc: + logger = logging.getLogger(__name__) + logger.warning("Channel notification failed: %s", exc) + + return summary_path + def run_iteration( campaign: dict, @@ -281,9 +333,15 @@ def _max_turns_for(phase_key: str) -> int: cli_dispatcher = inline_dispatcher llm_dispatcher = inline_dispatcher else: - # API mode: CLIDispatcher for code-access roles only (when repo_path is set) + # API or SDK mode: code-access dispatcher only when repo_path is set. + # SDK uses claude-agent-sdk; api uses the claude -p subprocess (CLIDispatcher). + if agent == "sdk": + from orchestrator.sdk_dispatch import SDKDispatcher + code_dispatcher_cls = SDKDispatcher + else: + code_dispatcher_cls = CLIDispatcher cli_dispatcher = ( - CLIDispatcher( + code_dispatcher_cls( work_dir=work_dir, campaign=campaign, model=_model_for("design"), timeout=timeout, max_turns=_max_turns_for("design"), @@ -345,7 +403,7 @@ def _max_turns_for(phase_key: str) -> int: print(f"\n{'='*60}") print(f" HUMAN DESIGN GATE") print(f"{'='*60}") - summary_path = _generate_gate_summary(llm_dispatcher, iter_dir, iteration, "design") + summary_path = _generate_gate_summary(llm_dispatcher, iter_dir, iteration, "design", campaign=campaign) decision, reason = gate.prompt( "Review the hypothesis bundle. Approve?", summary_path=str(summary_path) if summary_path else None, @@ -445,7 +503,7 @@ def _max_turns_for(phase_key: str) -> int: print(f"\n{'='*60}") print(f" HUMAN FINDINGS GATE") print(f"{'='*60}") - summary_path = _generate_gate_summary(llm_dispatcher, iter_dir, iteration, "findings") + summary_path = _generate_gate_summary(llm_dispatcher, iter_dir, iteration, "findings", campaign=campaign) decision, reason = gate.prompt( "Review the findings. Approve?", summary_path=str(summary_path) if summary_path else None, @@ -464,6 +522,16 @@ def _max_turns_for(phase_key: str) -> int: _merge_principles(work_dir, iter_dir) print(f" -> Principles merged into {work_dir / 'principles.json'}") + # ─── CLAUDE.md REGENERATE (Python, no LLM) β€” issue #131 ─────────────── + # Refresh per-campaign CLAUDE.md so the next iteration's session loads + # the updated principles + handoff via Claude Code's auto-context loading. + try: + from orchestrator.claude_md import regenerate_from_disk + regenerate_from_disk(work_dir, campaign, iteration=iteration) + except (OSError, RuntimeError) as exc: + # Best-effort: a CLAUDE.md write failure shouldn't abort the iteration. + logger.warning("Failed to regenerate CLAUDE.md: %s", exc) + if final: engine.transition("DONE") print(f"\n{'='*60}") @@ -493,7 +561,7 @@ def main() -> None: help="Timeout in seconds for claude -p calls (default: 1800)") parser.add_argument("--max-cli-retries", type=int, default=10, help="Max retries for claude -p failures (-1 = unbounded, default: 10)") - parser.add_argument("--agent", choices=["inline", "api"], default="api", + parser.add_argument("--agent", choices=["inline", "api", "sdk"], default="api", help="Dispatch backend: 'inline' emits prompts to stdout for the " "calling agent, 'api' uses the LLM API (default: api)") parser.add_argument("-v", "--verbose", action="store_true", diff --git a/orchestrator/llm_dispatch.py b/orchestrator/llm_dispatch.py index d4f4ece..3271fc4 100644 --- a/orchestrator/llm_dispatch.py +++ b/orchestrator/llm_dispatch.py @@ -53,9 +53,14 @@ def __init__( self._validate_campaign(campaign) self.campaign = campaign self.model = model + # PromptLoader prefers