diff --git a/README.md b/README.md index 1acd93b..0c8c2d0 100644 --- a/README.md +++ b/README.md @@ -245,8 +245,10 @@ when ALL of these hold**: 4. The campaign's apparatus checks are robust to design-agent variation, and validate ATTRIBUTION (not just upstream totals, #252 / F7). -5. A stale ``principles.json`` ledger is acceptable. Auto-approve - never gates on it. + +(Note: ``principles.json`` staleness is NOT a precondition. +Auto-approve never gates on it, so the ledger's freshness doesn't +affect whether the gate would have caught a deviation.) **If any of these fail**, either run interactively (no ``--auto-approve``) so a human reviewer sees the design at the gate, diff --git a/docs/friction-245-resolution.md b/docs/friction-245-resolution.md index 189dd1a..9feb576 100644 --- a/docs/friction-245-resolution.md +++ b/docs/friction-245-resolution.md @@ -19,7 +19,7 @@ implementation in one hop. | F9 | [#254](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/254) | LOW | `nous clean --orphaned` subcommand in `cli.py` (`_cmd_clean`); supports `--target-repo`, `--campaign`, `--dry-run` | (CLI smoke; mirrors `gc_orphan_worktrees`) | | F10 | [#255](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/255) | MED | New section "`--auto-approve` safety preconditions" in `README.md`; `--auto-approve` help text references it | (docs only) | | F11 | [#256](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/256) | MED | `_emit_high_build_warning` in `iteration.py` runs after DESIGN; emits a sized recommendation to raise `max_turns.execute_analyze` | `tests/test_friction_245.py::test_f11_*` | -| F12 | [#257](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/257) | LOW | `aiter_with_silence_watchdog`'s `aclose` path now wraps in `asyncio.wait_for(timeout=5)` and explicitly catches `(TimeoutError, CancelledError, RuntimeError, GeneratorExit)` | (covered by 
existing watchdog tests; race is non-deterministic) | +| F12 | [#257](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/257) | LOW | `aiter_with_silence_watchdog`'s `aclose` path now wraps in `asyncio.wait_for(timeout=5)` and explicitly catches `(TimeoutError, CancelledError, RuntimeError, GeneratorExit)`; broad fallback now logs at WARNING instead of swallowing silently | `tests/test_friction_245.py::test_f12_*` | | F13 | [#258](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/258) | HIGH | `nous create-campaign` scaffold gains a commented `locked_parameters` block + `locked_workload`, `derived_from`, `sdk_timeouts.turn_silence_threshold_seconds` (per-phase), `plot_specs`. New `docs/campaign-authoring-guide.md` includes the "what to lock" inventory | (existing scaffold tests cover schema-validity) | | F14 | [#259](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/259) | N/A | `docs/campaign-authoring-guide.md` includes "Rehearsal as scientific instrument" section + "Pre-lock unit check" | (docs only) | | F15 | [#260](https://github.com/AI-native-Systems-Research/agentic-strategy-evolution/issues/260) | HIGH | `bundle.experiment_spec.physical_realism_check` schema + `_validate_physical_realism` soft-warn when `k_realism_ratio < 0.5` and justification is empty/perfunctory | `tests/test_friction_245.py::test_f15_*` | diff --git a/orchestrator/campaign.py b/orchestrator/campaign.py index dfd6163..3ee38fa 100644 --- a/orchestrator/campaign.py +++ b/orchestrator/campaign.py @@ -587,7 +587,9 @@ def main() -> None: run_id = args.run_id or campaign.get("run_id") or campaign_path.parent.name + "-run" repo_path = campaign.get("target_system", {}).get("repo_path") - work_dir = setup_work_dir(run_id, repo_path=repo_path) + work_dir = setup_work_dir( + run_id, repo_path=repo_path, campaign_path=str(campaign_path), + ) print(f"Working directory: {work_dir.resolve()}") print(f"Max 
iterations: {max_iter}") diff --git a/orchestrator/cli.py b/orchestrator/cli.py index f78af4f..4521059 100644 --- a/orchestrator/cli.py +++ b/orchestrator/cli.py @@ -235,7 +235,9 @@ def _cmd_run(args): file=sys.stderr, ) - work_dir = setup_work_dir(run_id, repo_path=repo_path) + work_dir = setup_work_dir( + run_id, repo_path=repo_path, campaign_path=str(campaign_path), + ) max_iterations = args.max_iterations if args.max_iterations is not None else campaign.get("max_iterations", 10) # #188: --bundle / --problem-md / --handoff-md only apply to iter-1. @@ -303,8 +305,10 @@ def _cmd_resume(args): f"Got: {args.target}\n" f"This appears to be a work_dir. Use ``nous status " f"{args.target}`` to inspect the work_dir; ``nous resume`` " - f"needs the campaign yaml so it can re-validate the spec " - f"and re-emit reproducibility metadata (#253 / F8)." + f"needs the campaign yaml so it can re-validate the spec. " + f"(reproducibility_metadata captured at the original INIT " + f"is preserved — first-capture-wins, #262 / F17.) " + f"(#253 / F8)" f"{hint}", file=sys.stderr, ) @@ -934,8 +938,17 @@ def _cmd_clean(args): def _cmd_package(args): - """#263 (F18): tarball work_dir + reproduce.sh + Dockerfile + README.""" + """#263 (F18): tarball work_dir + reproduce.sh + Dockerfile + README. + + Staging artifacts (reproduce.sh, Dockerfile, PACKAGE_README.md) are + written to a temp directory and added to the tarball under the + ``{work_dir.name}/`` prefix — they never touch the work_dir on disk. + Successive runs of ``nous package`` produce identical tarballs + without accumulating staging files in the campaign's persistent + state. + """ import tarfile + import tempfile import textwrap work_dir = resolve_work_dir(args.target) @@ -1009,15 +1022,20 @@ def _cmd_package(args): ``` """) - # Stage these alongside the work_dir for tar inclusion. 
- pkg_root = work_dir - (pkg_root / "reproduce.sh").write_text(reproduce_sh) - (pkg_root / "reproduce.sh").chmod(0o755) - (pkg_root / "Dockerfile").write_text(dockerfile) - (pkg_root / "PACKAGE_README.md").write_text(readme) - - with tarfile.open(output, "w:gz") as tar: - tar.add(work_dir, arcname=work_dir.name) + # Stage to a temp directory (gone after this command), tar both + # work_dir AND the staged files under the {work_dir.name}/ prefix. The work_dir + # on disk is unchanged — this command is read-only with respect + # to the campaign's persistent state. + with tempfile.TemporaryDirectory() as tmp_root: + tmp = Path(tmp_root) + (tmp / "reproduce.sh").write_text(reproduce_sh) + (tmp / "reproduce.sh").chmod(0o755) + (tmp / "Dockerfile").write_text(dockerfile) + (tmp / "PACKAGE_README.md").write_text(readme) + with tarfile.open(output, "w:gz") as tar: + tar.add(work_dir, arcname=work_dir.name) + for staged in ("reproduce.sh", "Dockerfile", "PACKAGE_README.md"): + tar.add(tmp / staged, arcname=f"{work_dir.name}/{staged}") print(f"Wrote {output}") @@ -1184,10 +1202,11 @@ def main(): p_stop.add_argument( "--immediate", action="store_true", help="Event-boundary halt (#250 / F5). Writes a STOP_IMMEDIATE " - "sentinel that the SDK turn loop checks at each tool-call " - "return — aborts within seconds rather than at the next " - "phase boundary. Use when EXECUTE_ANALYZE is building " - "wrong code and you want to halt promptly.", + "sentinel that the SDK turn loop checks at each event " + "boundary (every SDK message) — aborts within seconds " + "rather than at the next phase boundary. 
Use when " + "EXECUTE_ANALYZE is building wrong code and you want " + "to halt promptly.", ) p_stop.set_defaults(func=_cmd_stop) diff --git a/orchestrator/iteration.py b/orchestrator/iteration.py index 27fda84..7577cde 100644 --- a/orchestrator/iteration.py +++ b/orchestrator/iteration.py @@ -34,7 +34,17 @@ from orchestrator.engine import Engine from orchestrator.gates import HumanGate +from orchestrator.lineage import ( + apply_derived_from_patch, + emit_cumulative_patch, + resolve_derived_from, +) from orchestrator.llm_dispatch import LLMDispatcher +from orchestrator.plot_specs import invoke_plot_specs +from orchestrator.reproducibility import ( + capture_reproducibility_metadata, + snapshot_iter_files, +) from orchestrator.util import atomic_write logger = logging.getLogger(__name__) @@ -724,7 +734,32 @@ def _merge_principles(work_dir: Path, iter_dir: Path) -> None: atomic_write(principles_path, json.dumps(store, indent=2) + "\n") -def setup_work_dir(run_id: str, repo_path: str | None = None) -> Path: +def _campaign_yaml_dir_from_state(work_dir: Path) -> Path | None: + """#263 (F18): resolve the campaign.yaml's directory from state.json. + + Plot scripts declared in ``campaign.plot_specs[].script`` are + relative to the campaign.yaml's directory. ``config_ref`` is + recorded in state.json at ``setup_work_dir`` time; this helper + is the single read site so a legacy campaign without + ``config_ref`` returns ``None`` rather than guessing. + """ + state_path = work_dir / "state.json" + if not state_path.exists(): + return None + try: + state = json.loads(state_path.read_text()) + except (OSError, json.JSONDecodeError): + return None + config_ref = state.get("config_ref") if isinstance(state, dict) else None + if not config_ref: + return None + return Path(config_ref).parent + + +def setup_work_dir( + run_id: str, repo_path: str | None = None, + campaign_path: str | None = None, +) -> Path: """Create and initialize a working directory from templates. 
See ``orchestrator/work_dir_resolver.py`` for the canonical @@ -817,13 +852,20 @@ def setup_work_dir(run_id: str, repo_path: str | None = None) -> Path: # detection and future cross-machine discovery. state["work_dir"] = str(work_dir.resolve()) state["repo_path"] = str(Path(repo_path).resolve()) if repo_path else None + # #263 (F18): record the campaign.yaml's absolute path so + # plot_specs scripts (declared relative to that file) can be + # resolved at REPORT/finalize time. Only set when provided — + # don't clobber a value already recorded by a prior setup. + if campaign_path is not None: + state["config_ref"] = str(Path(campaign_path).resolve()) + elif "config_ref" not in state: + state["config_ref"] = None # #262 (F17): auto-capture reproducibility metadata at INIT (before # any DESIGN turn fires). First capture wins — re-running INIT on # an existing campaign preserves the original commit/dirty/sha # values, which is what reviewers want (the state at campaign # start, not at iter-3 resume time). if "reproducibility_metadata" not in state: - from orchestrator.reproducibility import capture_reproducibility_metadata state["reproducibility_metadata"] = capture_reproducibility_metadata( Path(repo_path) if repo_path else None ) @@ -850,10 +892,12 @@ def setup_work_dir(run_id: str, repo_path: str | None = None) -> Path: def _emit_high_build_warning(bundle_path: Path, max_turns_execute_analyze: int) -> None: """#256 (F11): warn when bundle.code_changes implies a high BUILD count. - Threshold heuristic: ``len(arms-with-code_changes) >= 5`` OR - ``total_files >= 5``. Below the threshold, no warning. At/above, + Threshold: ``total_files >= 5`` (sum of code_changes entries + across all arms with a non-empty list). Below 5, silent. At/above, print a recommendation that the operator raise - ``campaign.max_turns.execute_analyze`` to ~``120 + 30 * total_files``. 
+ ``campaign.max_turns.execute_analyze`` to ~``120 + 30 * total_files``, + suppressed when the operator already set a value at or above + that target. Pure print — never raises — so a misshapen bundle that the schema validator already rejected doesn't fail this advisory pass. @@ -1090,13 +1134,14 @@ def _max_turns_for(phase_key: str) -> int: # #262 (F17): snapshot latency/hardware config files into # runs/iter-N/snapshots/ so a future reviewer can diff the exact # numbers each iter ran with — even if the operator later edits the - # source-of-truth file in the target repo. Best-effort; missing - # files are skipped (the candidate list is target-agnostic). - if repo_path and not (iter_dir / "snapshots").exists(): + # source-of-truth file in the target repo. Idempotency lives in + # snapshot_iter_files itself (mkdir exist_ok + content overwrite), + # so we don't gate on the directory's existence — that would skip + # re-snapshotting on resume after a manual touch. + if repo_path: try: - from orchestrator.reproducibility import snapshot_iter_files snapshot_iter_files(Path(repo_path), iter_dir) - except (OSError, ImportError) as exc: + except OSError as exc: logger.warning("repro snapshot for iter-%d skipped: %s", iteration, exc) if engine.phase == "DONE": @@ -1268,23 +1313,17 @@ def _max_turns_for(phase_key: str) -> int: # preflight if the campaign declares one. Failure to apply # is surfaced loudly (the user must rebase the prior # campaign or update derived_from.iteration); we do NOT - # silently proceed. - try: - from orchestrator.lineage import ( - apply_derived_from_patch, - resolve_derived_from, - ) - derived_patch = resolve_derived_from( - campaign, repo_path=Path(repo_path), - ) - if derived_patch is not None: - ok, msg = apply_derived_from_patch(experiment_dir, derived_patch) - if ok: - print(f" derived_from: {msg}") - else: - raise RuntimeError(msg) - except ImportError: - pass + # silently proceed. 
Imports are at module top — a broken + # orchestrator.lineage is a self-inflicted bug, not an + # optional dependency. + derived_patch = resolve_derived_from( + campaign, repo_path=Path(repo_path), + ) + if derived_patch is not None: + ok, msg = apply_derived_from_patch(experiment_dir, derived_patch) + if not ok: + raise RuntimeError(msg) + print(f" derived_from: {msg}") if cli_dispatcher: import contextlib ctx = cli_dispatcher.override_cwd(experiment_dir) if experiment_dir else contextlib.nullcontext() @@ -1378,15 +1417,23 @@ def _max_turns_for(phase_key: str) -> int: # the experiment worktree branch. The cumulative form is what # future ``derived_from`` campaigns reuse; the per-arm patches # are incremental on the branch state and don't apply to a - # fresh main checkout. + # fresh main checkout. emit_cumulative_patch is best-effort + # internally (returns None on git failure) and writes a + # cumulative.patch.error sidecar so a later operator inspecting + # ``nous lineage`` can see why inheritance broke. if repo_path and experiment_id: - try: - from orchestrator.lineage import emit_cumulative_patch - emit_cumulative_patch( - Path(repo_path), f"nous-exp-{experiment_id}", iter_dir, + cumulative_path = emit_cumulative_patch( + Path(repo_path), f"nous-exp-{experiment_id}", iter_dir, + ) + if cumulative_path is None: + # I1: surface the failure to the user-facing console too, + # not just orchestrator.log — `derived_from` campaigns + # depend on this artifact. + print( + f" ⚠ cumulative.patch emit failed for iter-{iteration} — " + f"see {iter_dir / 'patches' / 'cumulative.patch.error'} " + f"if you plan to derive a future campaign from this run." 
) - except (ImportError, OSError) as exc: - logger.warning("cumulative patch emit skipped: %s", exc) # Clean up worktree only on success if repo_path and experiment_id: remove_experiment_worktree(Path(repo_path), experiment_id) @@ -1433,15 +1480,27 @@ def _max_turns_for(phase_key: str) -> int: iteration=iteration, campaign=campaign, ) # #263 (F18): invoke plot_specs scripts after findings.json exists. - # Best-effort — plot failures never block the iteration. + # Best-effort — plot failures never block the iteration. The + # campaign_yaml_dir is read from state.json (recorded at INIT) + # so script paths declared in campaign.plot_specs[].script + # resolve relative to the campaign.yaml's directory, not + # work_dir's parent. if campaign.get("plot_specs"): try: - from orchestrator.plot_specs import invoke_plot_specs - results = invoke_plot_specs(campaign, iter_dir) + campaign_yaml_dir = _campaign_yaml_dir_from_state(work_dir) + results = invoke_plot_specs( + campaign, iter_dir, campaign_yaml_dir=campaign_yaml_dir, + ) ok = sum(1 for r in results if r.get("ok")) print(f" plot_specs: {ok}/{len(results)} succeeded → " f"{iter_dir / 'figures'}") - except (ImportError, OSError) as exc: + # Persist the per-spec result rows so failures are + # inspectable post-hoc, not just in the orchestrator log. 
+ atomic_write( + iter_dir / "figures" / "plot_specs_results.json", + json.dumps(results, indent=2) + "\n", + ) + except OSError as exc: logger.warning("plot_specs invocation skipped: %s", exc) print(f" -> Principles merged into {work_dir / 'principles.json'}") print(f" -> best_found.json updated at {work_dir / 'best_found.json'}") @@ -1509,7 +1568,9 @@ def main() -> None: run_id = args.run_id or campaign.get("run_id") or campaign_path.parent.name + "-run" repo_path = campaign.get("target_system", {}).get("repo_path") - work_dir = setup_work_dir(run_id, repo_path=repo_path) + work_dir = setup_work_dir( + run_id, repo_path=repo_path, campaign_path=str(campaign_path), + ) print(f"Working directory: {work_dir.resolve()}") run_iteration( diff --git a/orchestrator/lineage.py b/orchestrator/lineage.py index 9ae1f0d..253903c 100644 --- a/orchestrator/lineage.py +++ b/orchestrator/lineage.py @@ -33,6 +33,8 @@ import subprocess from pathlib import Path +import yaml + logger = logging.getLogger(__name__) @@ -61,12 +63,21 @@ def emit_cumulative_patch( The cumulative form is what future campaigns reuse via ``derived_from``. The existing per-arm ``.patch`` files (incremental, branch-state-dependent) remain unchanged. + + Failure surface: when emission fails, a sidecar + ``patches/cumulative.patch.error`` is written with the git stderr. + ``summarize_lineage`` (and ``nous lineage`` via that helper) read + the sidecar and surface the failure to the operator. Without the + sidecar, a failed emission becomes a single warning line in + orchestrator.log that downstream ``derived_from`` campaigns will + silently miss months later. (Review I1.) 
""" repo_path = Path(repo_path) iter_dir = Path(iter_dir) patches_dir = iter_dir / "patches" patches_dir.mkdir(parents=True, exist_ok=True) cumulative_path = patches_dir / "cumulative.patch" + error_path = patches_dir / "cumulative.patch.error" main_ref = _git_main_ref(repo_path) try: @@ -76,14 +87,27 @@ def emit_cumulative_patch( ) except (subprocess.SubprocessError, OSError) as exc: logger.warning("emit_cumulative_patch: git diff failed (%s)", exc) + error_path.write_text( + f"git diff {main_ref}..{branch_name} subprocess error: {exc}\n" + ) return None if result.returncode != 0: logger.warning( "emit_cumulative_patch: git diff %s..%s failed: %s", main_ref, branch_name, result.stderr.strip(), ) + error_path.write_text( + f"git diff {main_ref}..{branch_name} returncode={result.returncode}\n" + f"stderr:\n{result.stderr}" + ) return None cumulative_path.write_text(result.stdout) + # Clean up any prior sidecar from a previous failed attempt. + if error_path.exists(): + try: + error_path.unlink() + except OSError: + pass return cumulative_path @@ -215,16 +239,22 @@ def summarize_lineage(work_dir: Path) -> dict: pass # Look for campaign.yaml.copy or sibling campaign yaml for derived_from. + # Errors (unreadable / malformed) are surfaced into the summary so + # ``nous lineage`` shows the operator why derived_from couldn't be + # determined, rather than silently leaving it as ``null``. 
for candidate in (work_dir / "campaign.yaml.copy", work_dir / "campaign.yaml"): if candidate.exists(): try: - import yaml as _yaml - campaign = _yaml.safe_load(candidate.read_text()) or {} - if isinstance(campaign.get("derived_from"), dict): - summary["derived_from"] = campaign["derived_from"] + campaign = yaml.safe_load(candidate.read_text()) or {} + except OSError as exc: + summary["campaign_yaml_error"] = f"unreadable: {exc}" + break + except yaml.YAMLError as exc: + summary["campaign_yaml_error"] = f"malformed: {exc}" break - except (OSError, Exception): # noqa: BLE001 — best-effort - pass + if isinstance(campaign.get("derived_from"), dict): + summary["derived_from"] = campaign["derived_from"] + break runs_dir = work_dir / "runs" if runs_dir.is_dir(): @@ -240,5 +270,11 @@ def summarize_lineage(work_dir: Path) -> dict: str(cumulative) if cumulative.is_file() and cumulative.stat().st_size > 0 else None ) + error_sidecar = entry / "patches" / "cumulative.patch.error" + if error_sidecar.is_file(): + try: + iter_info["cumulative_patch_error"] = error_sidecar.read_text().strip() + except OSError: + iter_info["cumulative_patch_error"] = "(error file unreadable)" summary["iterations"].append(iter_info) return summary diff --git a/orchestrator/plot_specs.py b/orchestrator/plot_specs.py index a62869c..5caf33f 100644 --- a/orchestrator/plot_specs.py +++ b/orchestrator/plot_specs.py @@ -46,10 +46,24 @@ def invoke_plot_specs( figures_dir.mkdir(parents=True, exist_ok=True) if campaign_yaml_dir is None: - # Fall back to the work_dir's parent — best-effort. Operators - # who need a different base can pass ``campaign_yaml_dir`` - # explicitly via ``_generate_report`` plumbing. - campaign_yaml_dir = iter_dir.parent.parent.parent + # No fallback by design: plot script paths declared in + # campaign.plot_specs[].script are relative to the + # campaign.yaml's directory, which is recorded as + # ``state.json["config_ref"]`` at setup_work_dir time + # (#263 / F18). 
The caller must read it via + # ``orchestrator.iteration._campaign_yaml_dir_from_state``. + # Returning an empty result is the right answer here — + # guessing a directory and silently failing to resolve + # scripts was the bug from review I1. + logger.warning( + "invoke_plot_specs called without campaign_yaml_dir; " + "skipping (script paths cannot be resolved without it)." + ) + return [ + {"id": (s or {}).get("id", ""), "ok": False, + "error": "no campaign_yaml_dir available"} + for s in specs if isinstance(s, dict) + ] out: list[dict] = [] for spec in specs: @@ -75,7 +89,7 @@ def invoke_plot_specs( } try: result = subprocess.run( - [_pick_interpreter(script_path), str(script_path)], + _build_command(script_path), env=env, capture_output=True, text=True, check=False, timeout=300, ) @@ -105,16 +119,22 @@ def invoke_plot_specs( return out -def _pick_interpreter(script_path: Path) -> str: - """Pick a sensible interpreter for a figure script. Honors - shebang via direct execution when the file is executable; else - dispatches by extension. Defaults to ``python3``. +def _build_command(script_path: Path) -> list[str]: + """Build the argv list for invoking ``script_path``. + + Dispatch by extension (``.py`` → python3, ``.sh``/``.bash`` → + bash). For executable files with no recognized extension, invoke + directly via the shebang (single-element argv) — the previous + ``_pick_interpreter`` returned the script as both interpreter + and argv[1], which made the script invoke itself with itself + as its first argument. Falls back to python3 for unknown + non-executable suffixes (the most common authoring shape). 
""" suffix = script_path.suffix.lower() - if suffix in (".py",): - return "python3" + if suffix == ".py": + return ["python3", str(script_path)] if suffix in (".sh", ".bash"): - return "bash" + return ["bash", str(script_path)] if os.access(script_path, os.X_OK): - return str(script_path) - return "python3" + return [str(script_path)] + return ["python3", str(script_path)] diff --git a/orchestrator/reproducibility.py b/orchestrator/reproducibility.py index eeeaa8b..122f694 100644 --- a/orchestrator/reproducibility.py +++ b/orchestrator/reproducibility.py @@ -11,9 +11,15 @@ exact numbers each iter ran with, even if the operator later edits the source-of-truth file in the target repo). -Pure Python, no LLM. Idempotent: re-running on an existing work_dir -preserves the original capture (you don't accidentally rewrite the -``repo_commit`` field after several iterations have run). +Pure Python, no LLM. Idempotent at the caller layer: +``setup_work_dir`` (in iteration.py) guards +``state["reproducibility_metadata"]`` before invoking the capture, +so re-running on an existing work_dir preserves the original +capture (you don't accidentally rewrite the ``repo_commit`` field +after several iterations have run). The same first-capture-wins +guard is also implemented in ``attach_to_state`` for external +tooling; production goes through ``setup_work_dir``'s inline +guard, not through ``attach_to_state``. """ from __future__ import annotations @@ -231,18 +237,37 @@ def snapshot_iter_files( def attach_to_state(work_dir: Path, block: dict) -> None: - """Persist the reproducibility_metadata block into state.json - (idempotent: don't overwrite a block already present unless the - captured_at is older than 24h, which signals a re-init). - - State-only — campaign.yaml stays user-set. The captured block is - surfaced via ``nous status`` from state.json. + """Persist the reproducibility_metadata block into state.json. 
+ + First-capture-wins: if state.json already has a + ``reproducibility_metadata`` dict with ``captured_at``, we + leave it untouched. Re-running ``nous run`` on an existing + campaign therefore preserves the original commit/dirty/sha + values, which is what reviewers want (the state at campaign + start, not at iter-3 resume time). + + state.json must already exist and be valid JSON — this function + is meant to run after ``setup_work_dir``. A read failure means + something has gone seriously wrong (state corruption, or the + caller used the wrong work_dir), so we raise rather than + silently swallow. + + Note: the production capture path inlines this logic in + ``setup_work_dir`` (iteration.py); ``attach_to_state`` is the + public helper for tests and external tooling that need the + same semantics without going through ``setup_work_dir``. """ state_path = work_dir / "state.json" try: state = json.loads(state_path.read_text()) - except (OSError, json.JSONDecodeError): - return + except OSError as exc: + raise RuntimeError( + f"reproducibility: cannot read {state_path}: {exc}" + ) from exc + except json.JSONDecodeError as exc: + raise RuntimeError( + f"reproducibility: {state_path} is malformed: {exc}" + ) from exc existing = state.get("reproducibility_metadata") if isinstance(existing, dict) and "captured_at" in existing: # Already present — don't rewrite. The first capture wins, diff --git a/orchestrator/schemas/campaign.schema.yaml b/orchestrator/schemas/campaign.schema.yaml index 89748fd..12ebe84 100644 --- a/orchestrator/schemas/campaign.schema.yaml +++ b/orchestrator/schemas/campaign.schema.yaml @@ -151,8 +151,11 @@ properties: Live mid-turn watchdog (#205) override. Scalar form applies globally (legacy). Map form (#264 / F19) applies per-phase — recommended defaults: design=600, execute_analyze=120, - report=240. Phases not listed in the map fall back to - ``silence_threshold_seconds`` (or 600 if also unset). + report=240. 
Phases not listed in the map fall back to those + per-phase defaults (the SDKDispatcher seeds the map at + construction time, then overlays the operator's values). + ``silence_threshold_seconds`` is a separate post-mortem + threshold and does NOT participate in this resolution. sandbox: type: string @@ -357,14 +360,23 @@ properties: plot_specs: type: array description: > - Issue #263 (F18): declarative figure pipeline. After - ``findings.json`` is written in REPORT, nous invokes each - script with environment variables ``NOUS_RESULTS_DIR`` (the - iter's results/) and ``NOUS_FIGURES_DIR`` (the iter's - figures/, auto-created). The script's job is to read JSON - from results/ and write outputs to figures/. Solves the - "every paper rebuilds figure scaffolding" problem in one + Issue #263 (F18): declarative figure pipeline. After each + iteration's ``findings.json`` is written (during the + finalize/REPORT step inside ``run_iteration``, NOT a separate + end-of-campaign rollup), nous invokes each script with + environment variables ``NOUS_RESULTS_DIR`` (the iter's + results/) and ``NOUS_FIGURES_DIR`` (the iter's figures/, + auto-created). The script's job is to read JSON from + results/ and write outputs to figures/. Solves the "every + paper rebuilds figure scaffolding" problem in one campaign-level declaration. + + Script paths in ``script:`` are relative to the + ``campaign.yaml``'s directory (recorded as + ``state.json["config_ref"]`` at INIT time). Per-iter results + land at ``{work_dir}/runs/iter-N/figures/``; a + ``plot_specs_results.json`` sidecar in that directory records + ok/failure per-spec for post-hoc inspection. 
items: type: object required: [id, script] diff --git a/orchestrator/sdk_dispatch.py b/orchestrator/sdk_dispatch.py index c5b7d97..5152deb 100644 --- a/orchestrator/sdk_dispatch.py +++ b/orchestrator/sdk_dispatch.py @@ -179,8 +179,15 @@ async def aiter_with_silence_watchdog(aiter, threshold: float | None): await asyncio.wait_for(coro, timeout=5.0) # type: ignore[arg-type] except (asyncio.TimeoutError, asyncio.CancelledError, RuntimeError, GeneratorExit): pass # already running / racing — let the loop tear down naturally - except Exception: # noqa: BLE001 — best-effort cleanup - pass + except Exception as exc: # noqa: BLE001 — best-effort cleanup + # We're in a finally block already unwinding the + # primary exception; do NOT re-raise (would mask the + # original) but DO log so a real cleanup-side defect + # has an audit trail. Review I3. + logger.warning( + "aclose cleanup raised %s: %s", + type(exc).__name__, exc, + ) def summarize_silence_gaps(event_log_path: Path) -> dict: @@ -434,15 +441,22 @@ async def _run() -> SDKResult: # ``{work_dir}/runs/iter-N/inputs/executor_log.jsonl``) so # the loop can check for STOP_IMMEDIATE at each message # boundary without re-walking the filesystem every event. + # + # Path walk: executor_log.jsonl/.. = inputs ; inputs/.. = + # iter-N ; iter-N/.. = runs ; runs/.. = work_dir. + # Four ``.parent`` calls. ``.parent`` never raises (it bottoms out at + # the filesystem root, or ``.`` for a relative path, if the path is too short), so we + # defensively check the resolved path lives under a + # directory that actually exists rather than wrapping in + # try/except. stop_immediate_path: Path | None = None if event_log_path is not None: - # walk up to work_dir: inputs/.. = iter-N; iter-N/.. = runs; runs/.. 
= work_dir - try: - stop_immediate_path = ( - Path(event_log_path).parent.parent.parent.parent / "STOP_IMMEDIATE" - ) - except (IndexError, ValueError): - stop_immediate_path = None + candidate = ( + Path(event_log_path).parent.parent.parent.parent + / "STOP_IMMEDIATE" + ) + if candidate.parent.exists(): + stop_immediate_path = candidate async for message in aiter_with_silence_watchdog( aiter, turn_silence_threshold, ): @@ -597,10 +611,6 @@ def __init__( # ``campaign.sdk_timeouts.turn_silence_threshold_seconds`` to a # different value, or to 0 to disable the live watchdog while # keeping the post-mortem analyzer. - raw_turn_threshold = timeouts.get( - "turn_silence_threshold_seconds", - self._silence_threshold, - ) # #264 (F19): scalar (legacy) OR per-phase map. Per-phase # defaults — DESIGN's heavy reasoning between tool calls # earns 600s; EXECUTE_ANALYZE's frequent simulator calls @@ -611,7 +621,18 @@ def __init__( "execute_analyze": 120.0, "report": 240.0, } - if isinstance(raw_turn_threshold, dict): + # Only override the per-phase defaults when the operator + # explicitly set ``turn_silence_threshold_seconds``. If the + # field is absent, fall through and let the per-phase + # defaults stand — defaulting to ``silence_threshold_seconds`` + # would silently apply 600 to every phase, defeating the + # whole point of F19's per-phase split. + explicit_turn_threshold = "turn_silence_threshold_seconds" in timeouts + raw_turn_threshold = timeouts.get( + "turn_silence_threshold_seconds", + self._silence_threshold, + ) + if explicit_turn_threshold and isinstance(raw_turn_threshold, dict): for phase_key in ("design", "execute_analyze", "report"): if phase_key in raw_turn_threshold: try: @@ -637,7 +658,9 @@ def __init__( self._turn_silence_threshold = max( self._phase_silence_thresholds.values() ) - else: + elif explicit_turn_threshold: + # Operator set a scalar (legacy form) — apply it to every + # phase, preserving pre-F19 behavior. 
try: self._turn_silence_threshold = float(raw_turn_threshold) except (TypeError, ValueError) as exc: @@ -650,11 +673,17 @@ def __init__( f"campaign.sdk_timeouts.turn_silence_threshold_seconds must be " f">= 0, got {self._turn_silence_threshold}" ) - # Scalar form applies to every phase (backward compat). self._phase_silence_thresholds = { k: self._turn_silence_threshold for k in self._phase_silence_thresholds } + else: + # Neither map nor scalar set — keep the per-phase defaults. + # Legacy scalar attribute mirrors the highest default for + # callers that read it directly. + self._turn_silence_threshold = max( + self._phase_silence_thresholds.values() + ) # #127 Phase B: event log path is recomputed per-dispatch (it depends # on the iteration), so we don't store it on the dispatcher. self._event_log_path: Path | None = None @@ -702,8 +731,16 @@ def _resolve_turn_silence_threshold(self, phase: str) -> float: Resolution chain (highest priority first): 1. Bundle-side per-phase override (rehearsal-recorded). 2. Bundle-side scalar override (rehearsal-recorded, legacy). - 3. Campaign-side per-phase value (set in __init__). - 4. Phase default (design=600, execute_analyze=120, report=240). + 3. Campaign-side per-phase value, falling back to the + phase default (design=600, execute_analyze=120, report=240). + + Steps 3 and 4 from the friction-report write-up are merged + in ``self._phase_silence_thresholds``: the constructor seeds + the dict with the per-phase defaults, then overlays operator + values from ``campaign.sdk_timeouts.turn_silence_threshold_seconds``. + A single dict lookup therefore reads the right value + regardless of which layer set it. + Returns 0 only if every layer evaluated to 0 (operator opted out). 
""" bundle_per_phase = self._bundle_silence_phase_overrides diff --git a/orchestrator/validate.py b/orchestrator/validate.py index 5e2afb2..7e6d241 100644 --- a/orchestrator/validate.py +++ b/orchestrator/validate.py @@ -295,11 +295,30 @@ def _validate_locked_workload( deviations: list[str] = [] workload_yamls = sorted(inputs_dir.glob("*.yaml")) + sorted(inputs_dir.glob("*.yml")) for workload_path in workload_yamls: + # F20's whole point is to catch silent workload drift — + # malformed/unreadable input yaml is exactly the regime where + # DESIGN-vs-execution divergence hides. Surface it as a + # deviation so the validator's caller (validate_design) + # raises with a clear diagnostic, not a silent skip. try: - data = yaml.safe_load(workload_path.read_text()) - except (OSError, yaml.YAMLError): + text = workload_path.read_text() + except OSError as exc: + deviations.append( + f" - {workload_path.name}: cannot read ({exc})" + ) + continue + try: + data = yaml.safe_load(text) + except yaml.YAMLError as exc: + deviations.append( + f" - {workload_path.name}: malformed yaml ({exc})" + ) continue if not isinstance(data, dict): + deviations.append( + f" - {workload_path.name}: top-level is not a mapping " + f"({type(data).__name__})" + ) continue # Compare every top-level locked field. _walk_locked_workload( @@ -322,6 +341,15 @@ def _walk_locked_workload( """Recursive walk for #265: compare locked dict against actual, report any mismatch not present in ``declared`` (set of (tenant, field) tuples from workload_changes_from_canonical.diff). + + Tenant threading: assumes ``locked_workload`` has the canonical + shape ``{tenants: {<tenant_id>: {…}}}`` — when we descend through + a ``tenants`` key, the next-level key IS the tenant id, and we + thread it into the (tenant, field-path) deviation tuple. Other + group keys (e.g.
a hypothetical ``service_classes.<name>.…``) are + NOT threaded; they appear in deviation entries with ``tenant=None``, + which means matching them in workload_changes_from_canonical + requires a non-tenant-keyed declared diff entry. """ for key, expected in locked.items(): sub_path = f"{path}.{key}" if path else key @@ -466,7 +494,7 @@ def compute_campaign_spec_diff( --auto-approve. The diff is "soft" (informational) by default — F1's ``_validate_locked_parameters`` is the hard-fail layer. - Returns a dict with three sub-keys: + Returns a dict with five sub-keys: * ``locked_parameters_violations`` — list of {param, campaign, bundle} entries (these are also hard validation failures upstream; recorded here so an auditor sees them in one place). diff --git a/prompts/methodology/execute_analyze.md b/prompts/methodology/execute_analyze.md index 18d391c..d88d65e 100644 --- a/prompts/methodology/execute_analyze.md +++ b/prompts/methodology/execute_analyze.md @@ -37,10 +37,17 @@ bug I want to catch were present, would this invariant fail?* If the bug-of-interest involves attribution among items, your invariant must distinguish per-item, not just sum. -**Worked example (paper-memorytime-mirage, BLIS sim/kvtime/meter.go).** +**Worked example (paper-memorytime-mirage, BLIS ``sim/kvtime/meter.go``, +iter-1, 2026-05).** - Conservation invariant: ``Σ_RequestMap == UsedBlocks · BlockSize``. ✅ Always passed. - Per-tenant attribution: walked ``runningBatch``, not ``RequestMap``. -- Author's own comment: *"RequestMap may also contain requests NOT in runningBatch"* — i.e., orphans (preempted/swapped requests holding KV blocks). +- Author's own comment near the attribution loop: *"RequestMap may + also contain requests NOT in runningBatch"* — i.e., orphans + (preempted/swapped requests holding KV blocks).
The exact line + number drifts as BLIS evolves; grep for ``runningBatch`` in + ``sim/kvtime/`` of the BLIS repo at the campaign's recorded + ``repo_commit`` (see ``state.json.reproducibility_metadata``) to + reproduce the snapshot the original analysis was against. - Result: orphans counted toward ``UsedBlocks · BlockSize`` (right-hand side of the conservation check) but NOT attributed to any tenant in ``Accumulated``. Per-tenant ``A_i(t)`` silently undercounted; conservation passed. The conservation check validated the upstream total, not the diff --git a/tests/test_friction_245.py b/tests/test_friction_245.py index fbf9e48..5dda4f5 100644 --- a/tests/test_friction_245.py +++ b/tests/test_friction_245.py @@ -4,6 +4,12 @@ Each F-entry is independently exercised. Mocks (per CLAUDE.md): no live LLM calls; injected fakes for any subprocess that would otherwise hit the network. + +Post-review additions (round 2): F4 auto_approve roundtrip, F19 +_resolve_turn_silence_threshold per-phase + scalar back-compat, F17 +first-capture-wins idempotency + repo_dirty capture, F11 boundary +at total_files=4/5, F12 RuntimeError swallowing, F20 declared- +deviation real assertion, F21 apply_derived_from_patch round-trip. """ from __future__ import annotations @@ -288,6 +294,19 @@ def test_f20_locked_workload_diff_fails_undeclared_deviation(tmp_path: Path): def test_f20_locked_workload_diff_passes_with_declared_deviation(tmp_path: Path): + """Declared deviation in workload_changes_from_canonical → no error. + + The walker's deviation tuple is ``(tenant, sub_path)`` where + sub_path is the dotted path BUILT during the walk. 
For a + locked_workload structured as ``{tenants: {A: {input_distribution: + {value: 1024}}}}``, the walker descends: + path="" + "tenants" → "tenants" + path="tenants" + "A" → "tenants.A" (tenant id captured) + path="tenants.A" + "input_distribution" → "tenants.A.input_distribution" + path="tenants.A.input_distribution" + "value" → "tenants.A.input_distribution.value" + So a declared diff entry must have field= + "tenants.A.input_distribution.value" and tenant="A" to match. + """ iter_dir = tmp_path / "iter-1" inputs_dir = iter_dir / "inputs" inputs_dir.mkdir(parents=True) @@ -298,18 +317,28 @@ def test_f20_locked_workload_diff_passes_with_declared_deviation(tmp_path: Path) }}} bundle = {"workload_changes_from_canonical": { "rationale": "Pivoted to unit-length construction.", - "diff": [{"tenant": "A", "field": "tenants.A.input_distribution.value", + "diff": [{"tenant": "A", + "field": "tenants.A.input_distribution.value", "from": 1024, "to": 4000}], }} - # When the field path matches the declared diff, it's allowed. - # The walker uses the (tenant, field-path) tuple as the key. errors = _validate_locked_workload(iter_dir, bundle, campaign) - # Either 0 errors (path matched) or the message describes declared deviation. - # The test's actual constraint: the workload yaml does NOT hard-fail - # the validate_design path. - # In practice the walker may match imperfectly on nested paths; the - # important test is the F20 declared-vs-undeclared bisect. 
- assert isinstance(errors, list) + assert errors == [], ( + f"declared deviation should silence the validator, got: {errors}" + ) + + +def test_f20_malformed_workload_yaml_surfaces_as_deviation(tmp_path: Path): + """C2 fix: malformed workload yaml is the regime F20 exists to + catch — it must surface as a hard validation error, not a silent + skip.""" + iter_dir = tmp_path / "iter-1" + inputs_dir = iter_dir / "inputs" + inputs_dir.mkdir(parents=True) + (inputs_dir / "workload.yaml").write_text("not: valid: yaml: [unbalanced") + campaign = {"locked_workload": {"tenants": {"A": {}}}} + errors = _validate_locked_workload(iter_dir, {}, campaign) + assert len(errors) == 1 + assert "malformed yaml" in errors[0] def test_f20_locked_workload_no_block_skips(tmp_path: Path): @@ -451,3 +480,466 @@ def test_f11_no_warning_for_low_count(tmp_path: Path, capsys): _emit_high_build_warning(bundle_path, max_turns_execute_analyze=120) captured = capsys.readouterr() assert "max_turns.execute_analyze" not in captured.out + + +@pytest.mark.parametrize("total_files,should_warn", [ + (3, False), + (4, False), # threshold boundary: 4 is silent + (5, True), # threshold boundary: 5 trips the warning + (6, True), +]) +def test_f11_warning_threshold_boundary( + tmp_path: Path, capsys, total_files: int, should_warn: bool, +): + """The threshold is `total_files >= 5` (line 881). 
Pin both sides + of the boundary to catch off-by-one regressions.""" + from orchestrator.iteration import _emit_high_build_warning + bundle_path = tmp_path / f"bundle-{total_files}.yaml" + bundle_path.write_text(yaml.safe_dump({ + "arms": [{"type": "h-main", "code_changes": [ + {"file": f"f{i}.go", "intent": "x", "rationale": "y"} + for i in range(total_files) + ]}], + })) + _emit_high_build_warning(bundle_path, max_turns_execute_analyze=120) + captured = capsys.readouterr() + assert ("max_turns.execute_analyze" in captured.out) is should_warn + + +def test_f11_suggestion_formula_matches_120_plus_30_per_file( + tmp_path: Path, capsys, +): + """Pin the suggested raise-target so a regression of the formula + (currently 120 + 30 * total_files) doesn't silently drift.""" + from orchestrator.iteration import _emit_high_build_warning + bundle_path = tmp_path / "bundle.yaml" + bundle_path.write_text(yaml.safe_dump({ + "arms": [{"type": "h-main", "code_changes": [ + {"file": f"f{i}.go", "intent": "x", "rationale": "y"} + for i in range(6) + ]}], + })) + _emit_high_build_warning(bundle_path, max_turns_execute_analyze=120) + captured = capsys.readouterr() + # 120 + 30*6 = 300 + assert "300" in captured.out + + +def test_f11_no_warning_when_operator_already_raised( + tmp_path: Path, capsys, +): + """Caller-side suppression: if max_turns.execute_analyze is already + at-or-above the suggested target, the heuristic stays silent.""" + from orchestrator.iteration import _emit_high_build_warning + bundle_path = tmp_path / "bundle.yaml" + bundle_path.write_text(yaml.safe_dump({ + "arms": [{"type": "h-main", "code_changes": [ + {"file": f"f{i}.go", "intent": "x", "rationale": "y"} + for i in range(6) + ]}], + })) + # Suggested = 120 + 30*6 = 300; operator at 400 should be silent. 
+ _emit_high_build_warning(bundle_path, max_turns_execute_analyze=400) + captured = capsys.readouterr() + assert "max_turns.execute_analyze" not in captured.out + + +# ─── F4 / #249: campaign_spec_diff under --auto-approve, integration ────── + + +def test_f4_augment_writes_spec_diff_under_auto_approve(tmp_path: Path): + """End-to-end: when _augment_summary_with_spec_diff runs (whether + the LLM summarizer succeeded or failed), the JSON has both + `campaign_spec_diff` and `auto_approved=True`. This is the + headline F4 contract. + """ + from orchestrator.iteration import _augment_summary_with_spec_diff + iter_dir = tmp_path / "iter-1" + iter_dir.mkdir() + bundle = { + "metadata": {"iteration": 1, "family": "f", "research_question": "q"}, + "arms": [{"type": "h-main", "prediction": "p", "mechanism": "m", "diagnostic": "d"}], + "experiment_spec": {"verified_parameters": {"model": "qwen"}}, + } + (iter_dir / "bundle.yaml").write_text(yaml.safe_dump(bundle)) + summary_path = iter_dir / "gate_summary_design.json" + # Pre-existing summary from a (hypothetical) successful summarizer. + summary_path.write_text(json.dumps( + {"gate_type": "design", "summary": "ok", "key_points": []} + )) + campaign = {"locked_parameters": {"model": "llama"}} + _augment_summary_with_spec_diff( + summary_path, iter_dir, campaign, auto_approve=True, stub=False, + ) + payload = json.loads(summary_path.read_text()) + assert payload["auto_approved"] is True + assert "campaign_spec_diff" in payload + diff = payload["campaign_spec_diff"] + assert any( + v["param"] == "model" and v["bundle"] == "qwen" + for v in diff["locked_parameters_violations"] + ) + + +def test_f4_augment_emits_stub_when_summarizer_failed(tmp_path: Path): + """When the LLM summarizer fails (stub=True path), the spec diff + is STILL emitted into a fresh stub summary file. 
F4 must work + even when the LLM block is unavailable.""" + from orchestrator.iteration import _augment_summary_with_spec_diff + iter_dir = tmp_path / "iter-1" + iter_dir.mkdir() + bundle = { + "metadata": {"iteration": 1, "family": "f", "research_question": "q"}, + "arms": [{"type": "h-main", "prediction": "p", "mechanism": "m", "diagnostic": "d"}], + "experiment_spec": {"verified_parameters": {"model": "llama"}}, + } + (iter_dir / "bundle.yaml").write_text(yaml.safe_dump(bundle)) + summary_path = iter_dir / "gate_summary_design.json" + # No pre-existing summary — stub=True path. + _augment_summary_with_spec_diff( + summary_path, iter_dir, campaign={"locked_parameters": {"model": "llama"}}, + auto_approve=False, stub=True, + ) + payload = json.loads(summary_path.read_text()) + assert "campaign_spec_diff" in payload + assert payload["auto_approved"] is False + + +# ─── F19 / #264: behavioral test of _resolve_turn_silence_threshold ──────── + + +def _build_dispatcher(campaign: dict): + """Construct an SDKDispatcher without actually starting any SDK + work — we only need the threshold-resolution attributes + populated. Bypasses the live-call guards by injecting a no-op + sdk_runner. + """ + from orchestrator.sdk_dispatch import SDKDispatcher, SDKResult + + def _noop_runner(**kwargs): + return SDKResult(text="", input_tokens=0, output_tokens=0, + cache_creation_input_tokens=0, + cache_read_input_tokens=0, + cost_usd=0.0, duration_ms=0, num_turns=0, + is_error=False, error_message=None) + # The dispatcher's _validate_campaign requires target_system. + # Inject a minimal one — the threshold-resolution path doesn't + # care about its contents. 
+ full_campaign = { + "target_system": {"name": "x", "description": "d"}, + **campaign, + } + return SDKDispatcher( + work_dir=Path("/tmp/nonexistent-workdir-for-test"), + campaign=full_campaign, + sdk_runner=_noop_runner, + ) + + +def test_f19_resolve_turn_silence_threshold_per_phase_map(): + """Per-phase map: each phase returns its declared value.""" + dispatcher = _build_dispatcher({ + "sdk_timeouts": {"turn_silence_threshold_seconds": { + "design": 800, "execute_analyze": 90, "report": 200, + }}, + }) + assert dispatcher._resolve_turn_silence_threshold("design") == 800 + assert dispatcher._resolve_turn_silence_threshold("execute_analyze") == 90 + assert dispatcher._resolve_turn_silence_threshold("report") == 200 + + +def test_f19_resolve_turn_silence_threshold_partial_map_falls_back_to_default(): + """A partial map — only design set — leaves execute_analyze and + report at their hardcoded defaults (120, 240).""" + dispatcher = _build_dispatcher({ + "sdk_timeouts": {"turn_silence_threshold_seconds": {"design": 999}}, + }) + assert dispatcher._resolve_turn_silence_threshold("design") == 999 + assert dispatcher._resolve_turn_silence_threshold("execute_analyze") == 120 + assert dispatcher._resolve_turn_silence_threshold("report") == 240 + + +def test_f19_resolve_turn_silence_threshold_scalar_back_compat(): + """Scalar form applies the same value to every phase.""" + dispatcher = _build_dispatcher({ + "sdk_timeouts": {"turn_silence_threshold_seconds": 333}, + }) + assert dispatcher._resolve_turn_silence_threshold("design") == 333 + assert dispatcher._resolve_turn_silence_threshold("execute_analyze") == 333 + assert dispatcher._resolve_turn_silence_threshold("report") == 333 + + +def test_f19_resolve_turn_silence_threshold_no_config_uses_phase_defaults(): + """No sdk_timeouts at all → hardcoded per-phase defaults.""" + dispatcher = _build_dispatcher({}) + assert dispatcher._resolve_turn_silence_threshold("design") == 600 + assert 
dispatcher._resolve_turn_silence_threshold("execute_analyze") == 120 + assert dispatcher._resolve_turn_silence_threshold("report") == 240 + + +# ─── F17 / #262: idempotency + repo_dirty ────────────────────────────────── + + +def test_f17_attach_to_state_first_capture_wins(tmp_path: Path): + """Re-running INIT preserves the first capture.""" + from orchestrator.reproducibility import attach_to_state + state_path = tmp_path / "state.json" + initial_block = { + "captured_at": "2026-05-01T00:00:00Z", + "repo_commit": "aaaaaaaaaaaa", + } + state_path.write_text(json.dumps({ + "run_id": "x", "reproducibility_metadata": initial_block, + })) + later_block = { + "captured_at": "2026-06-01T00:00:00Z", + "repo_commit": "bbbbbbbbbbbb", + } + attach_to_state(tmp_path, later_block) + state = json.loads(state_path.read_text()) + assert state["reproducibility_metadata"]["repo_commit"] == "aaaaaaaaaaaa" + assert state["reproducibility_metadata"]["captured_at"] == "2026-05-01T00:00:00Z" + + +def test_f17_attach_to_state_writes_when_missing(tmp_path: Path): + """On a fresh state.json with no metadata, write the block.""" + from orchestrator.reproducibility import attach_to_state + state_path = tmp_path / "state.json" + state_path.write_text(json.dumps({"run_id": "x"})) + block = {"captured_at": "2026-06-01T00:00:00Z", "repo_commit": "abc"} + attach_to_state(tmp_path, block) + state = json.loads(state_path.read_text()) + assert state["reproducibility_metadata"] == block + + +def test_f17_attach_to_state_raises_on_malformed_state(tmp_path: Path): + """Loud failure: a corrupt state.json is a real defect, not + something to swallow.""" + from orchestrator.reproducibility import attach_to_state + (tmp_path / "state.json").write_text("not valid json {") + with pytest.raises(RuntimeError, match="malformed"): + attach_to_state(tmp_path, {"captured_at": "x"}) + + +def test_f17_repo_dirty_false_on_clean_tree(tmp_path: Path): + """When `git status --porcelain` returns empty, 
repo_dirty=False.""" + from orchestrator.reproducibility import capture_reproducibility_metadata + repo = tmp_path / "repo" + repo.mkdir() + with patch("subprocess.run") as mock_run: + # rev-parse HEAD → ok; status --porcelain → empty. + mock_run.side_effect = lambda *args, **kwargs: subprocess.CompletedProcess( + args=args[0], + returncode=0, + stdout="abc1234\n" if "rev-parse" in args[0] else "", + stderr="", + ) + block = capture_reproducibility_metadata(repo) + assert block["repo_dirty"] is False + + +def test_f17_repo_dirty_true_on_modified_tree(tmp_path: Path): + """When porcelain reports changes, repo_dirty=True.""" + from orchestrator.reproducibility import capture_reproducibility_metadata + repo = tmp_path / "repo" + repo.mkdir() + def _fake_run(args, **kwargs): + if "status" in args: + return subprocess.CompletedProcess( + args=args, returncode=0, stdout=" M file.go\n", stderr="", + ) + if "rev-parse" in args: + return subprocess.CompletedProcess( + args=args, returncode=0, stdout="abc1234\n", stderr="", + ) + return subprocess.CompletedProcess(args=args, returncode=1, stdout="", stderr="") + with patch("subprocess.run", side_effect=_fake_run): + block = capture_reproducibility_metadata(repo) + assert block["repo_dirty"] is True + + +def test_f17_snapshot_iter_files_copies_hardware_config(tmp_path: Path): + """snapshot_iter_files copies hardware_config.json into iter_dir/snapshots/.""" + from orchestrator.reproducibility import snapshot_iter_files + repo = tmp_path / "repo" + repo.mkdir() + (repo / "hardware_config.json").write_text('{"H100": {}}') + iter_dir = tmp_path / "iter-1" + iter_dir.mkdir() + written = snapshot_iter_files(repo, iter_dir) + assert "hardware_config.json" in written + snapshot = iter_dir / "snapshots" / "hardware_config.json" + assert snapshot.exists() + assert snapshot.read_text() == '{"H100": {}}' + + +# ─── F12 / #257: aclose race-condition explicit cleanup contract ─────────── + + +def 
test_f12_aclose_runtime_error_is_swallowed(): + """When the underlying generator raises the documented "already + running" RuntimeError on aclose, the watchdog must NOT propagate + it (would mask the original failure that triggered the cleanup).""" + import asyncio as _asyncio + + from orchestrator.sdk_dispatch import aiter_with_silence_watchdog + + class _RacingAclose: + """Mimics an async iterator that raises RuntimeError on aclose.""" + + def __aiter__(self): + return self + + async def __anext__(self): + raise StopAsyncIteration + + async def aclose(self): + raise RuntimeError( + "aclose(): asynchronous generator is already running" + ) + + async def _drive(): + async for _ in aiter_with_silence_watchdog(_RacingAclose(), threshold=None): + pass + + # No exception should escape — the RuntimeError is in the + # documented swallow set. + _asyncio.run(_drive()) + + +def test_f12_aclose_arbitrary_exception_does_not_mask_primary(): + """A non-documented aclose exception is logged but doesn't + propagate (we're in a finally; the primary exception should + survive).""" + import asyncio as _asyncio + + from orchestrator.sdk_dispatch import aiter_with_silence_watchdog + + class _BrokenAclose: + def __aiter__(self): + return self + + async def __anext__(self): + raise StopAsyncIteration + + async def aclose(self): + raise ValueError("simulated cleanup defect") + + async def _drive(): + async for _ in aiter_with_silence_watchdog(_BrokenAclose(), threshold=None): + pass + + # ValueError is in the broad fallback (now logged); should not + # propagate. 
+ _asyncio.run(_drive()) + + +# ─── F21 / #266: apply_derived_from_patch round-trip ─────────────────────── + + +def test_f21_apply_derived_from_patch_succeeds_with_clean_apply(tmp_path: Path): + from orchestrator.lineage import apply_derived_from_patch + patch_path = tmp_path / "p.patch" + patch_path.write_text("dummy") + with patch_subprocess(returncodes=[0, 0]) as calls: + ok, msg = apply_derived_from_patch(tmp_path, patch_path) + assert ok is True + assert "applied" in msg + # Two calls: --check + apply. + assert len(calls) == 2 + + +def test_f21_apply_derived_from_patch_fails_when_check_rejects(tmp_path: Path): + from orchestrator.lineage import apply_derived_from_patch + patch_path = tmp_path / "p.patch" + patch_path.write_text("dummy") + with patch_subprocess(returncodes=[1], stderrs=["error: patch does not apply"]): + ok, msg = apply_derived_from_patch(tmp_path, patch_path) + assert ok is False + assert "does not apply cleanly" in msg + + +def test_f21_full_round_trip(tmp_path: Path, monkeypatch): + """Emit cumulative.patch in campaign A, resolve it from campaign + B's derived_from, apply it. Single end-to-end contract test.""" + from orchestrator.lineage import ( + apply_derived_from_patch, + emit_cumulative_patch, + resolve_derived_from, + ) + # Stage prior campaign. + prior_work = tmp_path / "prior" + iter_dir = prior_work / "runs" / "iter-1" + iter_dir.mkdir(parents=True) + monkeypatch.setenv("NOUS_CAMPAIGN_PARENT", str(tmp_path)) + + # Emit cumulative.patch by stubbing git. + with patch_subprocess( + returncodes=[0, 0, 0], + stdouts=["origin/main\n", "diff --git a/x b/x\n+a\n", ""], + ): + path = emit_cumulative_patch(prior_work, "nous-exp-x", iter_dir) + assert path is not None + assert path.read_text().startswith("diff --git") + + # Now resolve from a campaign that points at "prior". 
+ campaign = {"derived_from": {"campaign": "prior", "iteration": 1}} + resolved = resolve_derived_from(campaign) + assert resolved == path + + # And apply (stubbed git apply succeeds). + with patch_subprocess(returncodes=[0, 0]): + ok, msg = apply_derived_from_patch(tmp_path, resolved) + assert ok is True + + +def test_f21_emit_cumulative_patch_writes_error_sidecar_on_failure(tmp_path: Path): + """When git diff fails, a sidecar at patches/cumulative.patch.error + is written so summarize_lineage can surface the failure.""" + from orchestrator.lineage import emit_cumulative_patch + iter_dir = tmp_path + with patch_subprocess( + returncodes=[0, 1], + stdouts=["origin/main\n", ""], + stderrs=["", "fatal: bad revision"], + ): + result = emit_cumulative_patch(tmp_path, "nous-exp-x", iter_dir) + assert result is None + sidecar = iter_dir / "patches" / "cumulative.patch.error" + assert sidecar.exists() + assert "fatal: bad revision" in sidecar.read_text() + + +# ─── Helpers ─────────────────────────────────────────────────────────────── + + +from contextlib import contextmanager + + +@contextmanager +def patch_subprocess( + returncodes: list[int], + stdouts: list[str] | None = None, + stderrs: list[str] | None = None, +): + """Context manager that patches ``subprocess.run`` to return a + sequence of CompletedProcess objects with the given returncodes + (and optional stdout/stderr). Captures the calls in a list and + yields it to the caller for assertion. + """ + stdouts = stdouts or [""] * len(returncodes) + stderrs = stderrs or [""] * len(returncodes) + calls: list = [] + + def _fake_run(args, **kwargs): + calls.append(args) + i = len(calls) - 1 + return subprocess.CompletedProcess( + args=args, + returncode=returncodes[i] if i < len(returncodes) else 0, + stdout=stdouts[i] if i < len(stdouts) else "", + stderr=stderrs[i] if i < len(stderrs) else "", + ) + + with patch("subprocess.run", side_effect=_fake_run): + yield calls