-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_iteration.py
More file actions
522 lines (461 loc) · 21.1 KB
/
run_iteration.py
File metadata and controls
522 lines (461 loc) · 21.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
#!/usr/bin/env python3
"""Run a single Nous iteration.
Usage:
python run_iteration.py examples/campaign.yaml
Creates a working directory named after the target system, copies templates,
and runs one full iteration with human gates for approval.
Set your LLM API key before running:
export OPENAI_API_KEY=sk-...
(or set OPENAI_BASE_URL for a proxy endpoint)
"""
import argparse
import json
import logging
import re
import shutil
import sys
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
import jsonschema
import yaml
from orchestrator.engine import Engine
from orchestrator.gates import HumanGate
from orchestrator.llm_dispatch import LLMDispatcher
from orchestrator.util import atomic_write
logger = logging.getLogger(__name__)
class IterationOutcome(str, Enum):
"""Outcome of a single iteration — used by run_campaign to decide next step."""
COMPLETED = "COMPLETED" # Final iteration, transitioned to DONE
CONTINUE = "CONTINUE" # Non-final iteration, stopped before DONE
ABORTED = "ABORTED" # Human aborted at a gate
REDESIGN = "REDESIGN" # Human rejected, needs redesign
TEMPLATES_DIR = Path(__file__).parent / "templates"
SCHEMAS_DIR = Path(__file__).parent / "schemas"
DEFAULTS_PATH = Path(__file__).parent / "defaults.yaml"
_ARM_TYPE_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
# Phase ordering for resume logic
_PHASE_ORDER = [
"INIT", "DESIGN", "HUMAN_DESIGN_GATE",
"EXECUTE_ANALYZE", "HUMAN_FINDINGS_GATE",
"DONE",
]
_PHASE_INDEX = {p: i for i, p in enumerate(_PHASE_ORDER)}
def _save_human_feedback(iter_dir: Path, phase: str, reason: str) -> None:
"""Append human gate feedback to structured human_feedback.json."""
logger = logging.getLogger(__name__)
fb_path = iter_dir / "human_feedback.json"
if fb_path.exists():
try:
store = json.loads(fb_path.read_text())
except json.JSONDecodeError as exc:
logger.warning(
"Corrupt human_feedback.json at %s: %s. "
"Prior feedback entries will be lost.",
fb_path, exc,
)
store = {"design": [], "findings": []}
else:
store = {"design": [], "findings": []}
if not isinstance(store, dict):
logger.warning(
"human_feedback.json at %s has unexpected type %s. "
"Prior feedback entries will be lost.",
fb_path, type(store).__name__,
)
store = {"design": [], "findings": []}
entries = store.setdefault(phase, [])
entries.append({
"attempt": len(entries) + 1,
"reason": reason,
"timestamp": datetime.now(timezone.utc).isoformat(),
})
atomic_write(fb_path, json.dumps(store, indent=2) + "\n")
_YAML_FENCE_RE = re.compile(r"```yaml\s*\n(.*?)```", re.DOTALL | re.IGNORECASE)
_HANDOFF_RE = re.compile(r"^#{1,3}\s*Handoff\s*:?\s*$", re.MULTILINE | re.IGNORECASE)
def _split_design_output(raw: str, iter_dir: Path) -> None:
"""Split merged design output into problem.md, bundle.yaml, and handoff.md."""
# Extract handoff FIRST to avoid yaml fences in handoff confusing bundle parsing
handoff_md = ""
handoff_match = _HANDOFF_RE.search(raw)
if handoff_match:
handoff_md = raw[handoff_match.start():].strip()
raw_before_handoff = raw[:handoff_match.start()]
else:
logger.warning(
"Design output does not contain a '## Handoff' section. "
"Executor will run without designer context."
)
raw_before_handoff = raw
matches = _YAML_FENCE_RE.findall(raw_before_handoff)
if not matches:
raise RuntimeError(
"Design agent did not produce a ```yaml``` code fence. "
"Cannot extract hypothesis bundle from response."
)
bundle_yaml_str = matches[-1]
bundle = yaml.safe_load(bundle_yaml_str)
if not isinstance(bundle, dict):
raise RuntimeError(
f"Expected YAML object from design agent, got {type(bundle).__name__}"
)
schema = yaml.safe_load((SCHEMAS_DIR / "bundle.schema.yaml").read_text())
jsonschema.validate(bundle, schema)
last_fence_start = raw_before_handoff.rfind("```yaml")
if last_fence_start == -1:
last_fence_start = raw_before_handoff.rfind("```YAML")
problem_md = raw_before_handoff[:last_fence_start].rstrip()
if problem_md.endswith("---"):
problem_md = problem_md[:-3].rstrip()
iter_dir.mkdir(parents=True, exist_ok=True)
atomic_write(iter_dir / "problem.md", problem_md + "\n")
atomic_write(
iter_dir / "bundle.yaml",
yaml.safe_dump(bundle, default_flow_style=False, sort_keys=False),
)
if handoff_md:
# Save per-iteration snapshot for audit
atomic_write(iter_dir / "handoff_snapshot.md", handoff_md + "\n")
# Update campaign-level handoff (the living document)
atomic_write(iter_dir.parent.parent / "handoff.md", handoff_md + "\n")
def _enter_phase(engine, phase):
"""Transition to phase if needed. Returns True if phase work should run."""
current_idx = _PHASE_INDEX[engine.phase]
target_idx = _PHASE_INDEX[phase]
if current_idx > target_idx:
return False
if engine.phase != phase:
engine.transition(phase)
return True
def _merge_principles(work_dir: Path, iter_dir: Path) -> None:
"""Merge principle_updates.json into the shared principles.json store."""
updates_path = iter_dir / "principle_updates.json"
if not updates_path.exists():
return
updates = json.loads(updates_path.read_text())
if not updates:
return
if not isinstance(updates, list):
raise RuntimeError(
f"principle_updates.json should be a list, got {type(updates).__name__}. "
f"Check {updates_path}"
)
for i, p in enumerate(updates):
if not isinstance(p, dict) or "id" not in p:
raise RuntimeError(f"principle_updates.json entry {i} missing 'id': {p!r:.200}")
principles_path = work_dir / "principles.json"
if principles_path.exists():
store = json.loads(principles_path.read_text())
else:
store = {"principles": []}
existing = {p["id"]: p for p in store["principles"]}
for p in updates:
existing[p["id"]] = p
store["principles"] = list(existing.values())
atomic_write(principles_path, json.dumps(store, indent=2) + "\n")
def setup_work_dir(run_id: str, repo_path: str | None = None) -> Path:
"""Create and initialize a working directory from templates.
If repo_path is provided, the campaign directory is created inside
the target repo at .nous/<run_id>/. Otherwise falls back to creating
<run_id>/ in the current directory.
"""
if repo_path:
work_dir = Path(repo_path) / ".nous" / run_id
else:
work_dir = Path(run_id)
work_dir.mkdir(parents=True, exist_ok=True)
for t in ["state.json", "ledger.json", "principles.json"]:
dest = work_dir / t
if not dest.exists():
shutil.copy(TEMPLATES_DIR / t, dest)
state = json.loads((work_dir / "state.json").read_text())
state["run_id"] = run_id
atomic_write(work_dir / "state.json", json.dumps(state, indent=2) + "\n")
return work_dir
def _generate_gate_summary(
dispatcher, iter_dir: Path, iteration: int, gate_type: str,
) -> Path | None:
"""Generate a gate summary file. Returns the path, or None on failure."""
summary_path = iter_dir / f"gate_summary_{gate_type}.json"
try:
dispatcher.dispatch(
"summarizer", "summarize-gate",
output_path=summary_path,
iteration=iteration,
perspective=gate_type,
)
return summary_path
except (RuntimeError, FileNotFoundError, OSError) as exc:
logger = logging.getLogger(__name__)
logger.warning("Gate summary generation failed: %s", exc)
print(f" (Gate summary skipped: {exc})")
return None
def run_iteration(
campaign: dict,
work_dir: Path,
iteration: int = 1,
model: str | None = None,
final: bool = True,
auto_approve: bool = False,
timeout: int = 1800,
max_cli_retries: int | None = None,
) -> IterationOutcome:
"""Run a single iteration of the Nous loop.
Phases: DESIGN → HUMAN_DESIGN_GATE → EXECUTE_ANALYZE → HUMAN_FINDINGS_GATE → DONE
Args:
final: If True (default), transitions to DONE after principle merge.
auto_approve: If True, all human gates are automatically approved.
Returns:
An IterationOutcome value: COMPLETED, CONTINUE, ABORTED, or REDESIGN.
"""
engine = Engine(work_dir)
repo_path = campaign.get("target_system", {}).get("repo_path")
# Load defaults.yaml, then overlay campaign.models
defaults = {}
if DEFAULTS_PATH.exists():
defaults = yaml.safe_load(DEFAULTS_PATH.read_text()) or {}
default_models = defaults.get("models", {})
default_max_turns = defaults.get("max_turns", {})
campaign_models = campaign.get("models", {})
def _model_for(phase_key: str) -> str:
return campaign_models.get(phase_key) or default_models.get(phase_key) or model or "aws/claude-sonnet-4-5"
def _max_turns_for(phase_key: str) -> int:
return default_max_turns.get(phase_key, 25)
# CLIDispatcher for code-access roles; LLMDispatcher for API-only phases
from orchestrator.cli_dispatch import CLIDispatcher
cli_dispatcher = (
CLIDispatcher(
work_dir=work_dir, campaign=campaign,
model=_model_for("design"), timeout=timeout,
max_turns=_max_turns_for("design"),
max_retries=max_cli_retries,
) if repo_path else None
)
llm_dispatcher = LLMDispatcher(work_dir=work_dir, campaign=campaign, model=_model_for("design"))
gate = HumanGate(auto_response="approve") if auto_approve else HumanGate()
iter_dir = work_dir / "runs" / f"iter-{iteration}"
if engine.phase == "DONE":
print(f"Iteration {iteration} already complete.")
return IterationOutcome.COMPLETED
if engine.phase != "INIT":
print(f"\n Resuming from {engine.phase}\n")
# ─── DESIGN ───────────────────────────────────────────────────────────
if _enter_phase(engine, "DESIGN"):
print(f"\n{'='*60}")
print(f" DESIGN — exploring system and creating hypothesis bundle")
print(f"{'='*60}")
design_dispatcher = cli_dispatcher or llm_dispatcher
if cli_dispatcher:
# CLI path: agent writes files directly to iter_dir
design_dispatcher.dispatch(
"planner", "design",
output_path=iter_dir / "design_log.md", iteration=iteration,
)
else:
# LLM API path or stub: dispatch and check if files were written directly
output_file = iter_dir / "design_raw.md"
design_dispatcher.dispatch(
"planner", "design",
output_path=output_file, iteration=iteration,
)
# If the dispatcher wrote individual files (StubDispatcher),
# skip the text split. Otherwise parse the merged output.
if not (iter_dir / "bundle.yaml").exists():
raw_response = output_file.read_text()
_split_design_output(raw_response, iter_dir)
output_file.unlink()
# Validate design artifacts regardless of dispatch path
from orchestrator.validate import validate_design
result = validate_design(iter_dir)
if result["status"] == "fail":
raise RuntimeError(
f"Design artifacts failed validation:\n"
+ "\n".join(f" - {e}" for e in result["errors"])
)
print(f" -> {iter_dir / 'problem.md'}")
print(f" -> {iter_dir / 'bundle.yaml'}")
# ─── HUMAN DESIGN GATE ────────────────────────────────────────────────
if _enter_phase(engine, "HUMAN_DESIGN_GATE"):
print(f"\n{'='*60}")
print(f" HUMAN DESIGN GATE")
print(f"{'='*60}")
summary_path = _generate_gate_summary(llm_dispatcher, iter_dir, iteration, "design")
decision, reason = gate.prompt(
"Review the hypothesis bundle. Approve?",
summary_path=str(summary_path) if summary_path else None,
files=[str(iter_dir / "bundle.yaml"), str(iter_dir / "problem.md")],
)
if decision == "reject":
_save_human_feedback(iter_dir, "design", reason or "(Rejected without specific feedback)")
print("Design rejected. Re-run after revising.")
engine.transition("DESIGN")
return IterationOutcome.REDESIGN
if decision == "abort":
print("Aborted.")
return IterationOutcome.ABORTED
# ─── EXECUTE + ANALYZE ────────────────────────────────────────────────
experiment_dir = experiment_id = None
if _enter_phase(engine, "EXECUTE_ANALYZE"):
print(f"\n{'='*60}")
print(f" EXECUTE + ANALYZE — building, running, and analyzing")
print(f"{'='*60}")
if cli_dispatcher:
cli_dispatcher.model = _model_for("execute_analyze")
cli_dispatcher.max_turns = _max_turns_for("execute_analyze")
exec_dispatcher = cli_dispatcher or llm_dispatcher
try:
if repo_path:
from orchestrator.worktree import (
create_experiment_worktree,
remove_experiment_worktree,
)
experiment_dir, experiment_id = create_experiment_worktree(
Path(repo_path), iteration,
)
(iter_dir / ".experiment_id").write_text(experiment_id)
print(f" Experiment worktree: {experiment_dir}")
if cli_dispatcher:
import contextlib
ctx = cli_dispatcher.override_cwd(experiment_dir) if experiment_dir else contextlib.nullcontext()
with ctx:
exec_dispatcher.dispatch(
"executor", "execute-analyze",
output_path=iter_dir / "executor_log.md",
iteration=iteration,
)
else:
# LLM API path or stub: dispatch and check if files were written directly
output_file = iter_dir / "execute_analyze_output.json"
exec_dispatcher.dispatch(
"executor", "execute-analyze",
output_path=output_file,
iteration=iteration,
)
# If the dispatcher wrote individual files (StubDispatcher),
# skip the JSON split. Otherwise parse the combined blob.
if not (iter_dir / "findings.json").exists():
combined = json.loads(output_file.read_text())
missing = {"plan", "findings", "principle_updates"} - set(combined.keys())
if missing:
raise RuntimeError(
f"execute-analyze output missing keys: {sorted(missing)}"
)
atomic_write(
iter_dir / "experiment_plan.yaml",
yaml.safe_dump(combined["plan"], default_flow_style=False, sort_keys=False),
)
atomic_write(
iter_dir / "findings.json",
json.dumps(combined["findings"], indent=2) + "\n",
)
atomic_write(
iter_dir / "principle_updates.json",
json.dumps(combined["principle_updates"], indent=2) + "\n",
)
# Validate artifacts regardless of dispatch path
from orchestrator.validate import validate_execution
result = validate_execution(iter_dir)
if result["status"] == "fail":
raise RuntimeError(
f"Executor artifacts failed validation:\n"
+ "\n".join(f" - {e}" for e in result["errors"])
)
finally:
if repo_path and experiment_id:
from orchestrator.worktree import remove_experiment_worktree
remove_experiment_worktree(Path(repo_path), experiment_id)
# Validate findings schema
findings_path = iter_dir / "findings.json"
if not findings_path.exists():
raise RuntimeError(f"{findings_path} not found.")
findings = json.loads(findings_path.read_text())
findings_schema = json.loads((SCHEMAS_DIR / "findings.schema.json").read_text())
try:
jsonschema.validate(findings, findings_schema)
except jsonschema.ValidationError as exc:
raise RuntimeError(
f"findings.json failed schema validation: {exc.message}"
) from exc
# ─── HUMAN FINDINGS GATE ──────────────────────────────────────────────
if _enter_phase(engine, "HUMAN_FINDINGS_GATE"):
print(f"\n{'='*60}")
print(f" HUMAN FINDINGS GATE")
print(f"{'='*60}")
summary_path = _generate_gate_summary(llm_dispatcher, iter_dir, iteration, "findings")
decision, reason = gate.prompt(
"Review the findings. Approve?",
summary_path=str(summary_path) if summary_path else None,
files=[str(iter_dir / "findings.json")],
)
if decision == "reject":
_save_human_feedback(iter_dir, "findings", reason or "(Rejected without specific feedback)")
print("Findings rejected. Re-running execution.")
engine.transition("EXECUTE_ANALYZE")
return IterationOutcome.REDESIGN
if decision == "abort":
print("Aborted.")
return IterationOutcome.ABORTED
# ─── PRINCIPLE MERGE (Python, no LLM) ─────────────────────────────────
_merge_principles(work_dir, iter_dir)
print(f" -> Principles merged into {work_dir / 'principles.json'}")
if final:
engine.transition("DONE")
print(f"\n{'='*60}")
print(f" DONE — iteration {iteration} complete")
print(f"{'='*60}")
print(f"\nOutput in: {iter_dir}")
print(f"Principles: {work_dir / 'principles.json'}")
return IterationOutcome.COMPLETED
else:
print(f"\n Iteration {iteration} complete — ready for next iteration.")
return IterationOutcome.CONTINUE
def main() -> None:
parser = argparse.ArgumentParser(
description="Run a single Nous iteration.",
epilog="Example: python run_iteration.py examples/campaign.yaml",
)
parser.add_argument("campaign", help="Path to campaign.yaml")
parser.add_argument("--model", default=None,
help="Fallback model name (default: from defaults.yaml)")
parser.add_argument("--run-id", default=None,
help="Working directory name (default: derived from campaign)")
parser.add_argument("--auto-approve", action="store_true",
help="Auto-approve all human gates (skip interactive prompts)")
parser.add_argument("--timeout", type=int, default=1800,
help="Timeout in seconds for claude -p calls (default: 1800)")
parser.add_argument("--max-cli-retries", type=int, default=10,
help="Max retries for transient claude -p failures (default: 10; 0 to disable; -1 for unlimited)")
parser.add_argument("-v", "--verbose", action="store_true",
help="Enable debug logging")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
campaign_path = Path(args.campaign)
if not campaign_path.exists():
print(f"Error: {campaign_path} not found", file=sys.stderr)
sys.exit(1)
campaign = yaml.safe_load(campaign_path.read_text())
schema = yaml.safe_load((SCHEMAS_DIR / "campaign.schema.yaml").read_text())
try:
jsonschema.validate(campaign, schema)
except jsonschema.ValidationError as exc:
print(
f"Error: {campaign_path} is not a valid campaign config.\n"
f" {exc.message}\n\n"
f"See examples/campaign.yaml for a working example.",
file=sys.stderr,
)
sys.exit(1)
run_id = args.run_id or campaign.get("run_id") or campaign_path.parent.name + "-run"
repo_path = campaign.get("target_system", {}).get("repo_path")
work_dir = setup_work_dir(run_id, repo_path=repo_path)
print(f"Working directory: {work_dir.resolve()}")
run_iteration(
campaign, work_dir, model=args.model,
auto_approve=args.auto_approve, timeout=args.timeout,
max_cli_retries=None if args.max_cli_retries == -1 else args.max_cli_retries,
)
if __name__ == "__main__":
main()