Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docker-compose.local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ services:
build:
context: .
dockerfile: Dockerfile
env_file: .env
environment:
- AGENTFIELD_SERVER=http://host.docker.internal:8080
- NODE_ID=swe-planner
- PORT=8003
# Callback URL for control plane to reach this agent
- AGENT_CALLBACK_URL=http://localhost:8003
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- CLAUDE_CODE_OAUTH_TOKEN=${CLAUDE_CODE_OAUTH_TOKEN}
- GH_TOKEN=${GH_TOKEN}
ports:
- "8003:8003"
volumes:
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ dependencies = [
# Compatibility pin: newer SDK builds have surfaced
# "Unknown message type: rate_limit_event" during streaming.
"claude-agent-sdk==0.1.20",
# HITL plan-approval gate (auto-enabled when HAX_API_KEY is set).
"hax-sdk>=0.2.0",
"python-dotenv>=1.0",
]

[project.optional-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
agentfield>=0.1.77
pydantic>=2.0
claude-agent-sdk==0.1.20
hax-sdk>=0.2.0
python-dotenv>=1.0
258 changes: 258 additions & 0 deletions swe_af/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import subprocess
import uuid

from dotenv import load_dotenv

load_dotenv() # surface HAX_API_KEY (and friends) before Agent() is constructed

from swe_af.reasoners import router
from swe_af.reasoners.pipeline import _assign_sequence_numbers, _compute_levels, _validate_file_conflicts
from swe_af.reasoners.schemas import PlanResult, ReviewResult
Expand Down Expand Up @@ -300,6 +304,56 @@ async def _run_ci_gate(
}


def _format_plan_for_approval(
plan_result: dict,
) -> tuple[str, str, str, list[dict]]:
"""Format plan_result into the fields the hax-sdk plan-review-v2 template expects."""
plan_summary = plan_result.get("rationale", "")
prd_data = plan_result.get("prd", {})
arch_data = plan_result.get("architecture", {})

prd_md_parts: list[str] = []
if prd_data.get("validated_description"):
prd_md_parts.append(f"## Description\n{prd_data['validated_description']}")
if prd_data.get("must_have"):
prd_md_parts.append("## Must Have\n" + "\n".join(f"- {item}" for item in prd_data["must_have"]))
if prd_data.get("nice_to_have"):
prd_md_parts.append("## Nice to Have\n" + "\n".join(f"- {item}" for item in prd_data["nice_to_have"]))
if prd_data.get("acceptance_criteria"):
prd_md_parts.append("## Acceptance Criteria\n" + "\n".join(f"- {item}" for item in prd_data["acceptance_criteria"]))
prd_markdown = "\n\n".join(prd_md_parts)

arch_md_parts: list[str] = []
if arch_data.get("summary"):
arch_md_parts.append(f"## Summary\n{arch_data['summary']}")
if arch_data.get("components"):
arch_md_parts.append("## Components")
for comp in arch_data["components"]:
arch_md_parts.append(f"### {comp.get('name', 'Component')}\n{comp.get('responsibility', '')}")
if comp.get("touches_files"):
arch_md_parts.append("Files: " + ", ".join(f"`{f}`" for f in comp["touches_files"]))
if arch_data.get("decisions"):
arch_md_parts.append("## Key Decisions")
for dec in arch_data["decisions"]:
arch_md_parts.append(f"- **{dec.get('decision', '')}**: {dec.get('rationale', '')}")
architecture_markdown = "\n\n".join(arch_md_parts)

issues_for_template = [
{
"name": issue.get("name", ""),
"title": issue.get("title", ""),
"description": issue.get("description", ""),
"dependsOn": issue.get("depends_on", []),
"filesToModify": issue.get("files_to_modify", []),
"filesToCreate": issue.get("files_to_create", []),
"acceptanceCriteria": issue.get("acceptance_criteria", []),
}
for issue in plan_result.get("issues", [])
]

return plan_summary, prd_markdown, architecture_markdown, issues_for_template


@app.reasoner()
async def build(
goal: str,
Expand Down Expand Up @@ -556,6 +610,210 @@ async def build(
tags=["build", "git_init", "error"],
)

# 1.5 APPROVAL CHECKPOINT — pause for human plan review when HAX_API_KEY is set.
# SWE-AF posts the plan to hax-sdk and pauses on the control plane until
# the reviewer responds. On request_changes, re-runs Architect → Tech Lead
# → Sprint Planner with the feedback and re-requests approval, bounded by
# cfg.max_plan_revision_iterations.
_hax_api_key = os.environ.get("HAX_API_KEY", "").strip()
execution_id = app.ctx.execution_id if app.ctx else ""
if _hax_api_key and execution_id:
import json as _json
from hax import HaxClient

hax_client = HaxClient(
api_key=_hax_api_key,
base_url=os.environ.get("HAX_SDK_URL", "http://localhost:3000") + "/api/v1",
)
cp_base_url = (app.agentfield_server or "http://localhost:8080").rstrip("/")
approval_state_path = os.path.join(abs_artifacts_dir, "approval_state.json")
os.makedirs(os.path.dirname(approval_state_path), exist_ok=True)
revision_history: list[dict] = []

for revision_iter in range(cfg.max_plan_revision_iterations + 1):
app.note(
f"Phase 1.5: Requesting plan approval (iteration {revision_iter})",
tags=["build", "approval"],
)

plan_summary, prd_md, arch_md, issues_for_template = (
_format_plan_for_approval(plan_result)
)

title = "SWE-AF Plan Review"
if revision_iter > 0:
title = f"SWE-AF Plan Review (Revision {revision_iter})"

hax_payload = {
"planSummary": plan_summary,
"issues": issues_for_template,
"architecture": arch_md,
"prd": prd_md,
"metadata": {
"repoUrl": cfg.repo_url,
"goalDescription": goal,
"agentNodeId": NODE_ID,
"executionId": execution_id,
},
"revisionNumber": revision_iter,
"revisionHistory": revision_history,
}

hax_create_kwargs: dict = {
"type": "plan-review-v2",
"title": title,
"description": "Review the proposed implementation plan before execution begins",
"payload": hax_payload,
"webhook_url": f"{cp_base_url}/api/v1/webhooks/approval-response",
"expires_in_seconds": cfg.approval_expires_in_hours * 3600,
}
approval_user_id = os.environ.get("AGENTFIELD_APPROVAL_USER_ID", "")
if approval_user_id:
hax_create_kwargs["user_id"] = approval_user_id

hax_request = hax_client.create_request(**hax_create_kwargs)

with open(approval_state_path, "w") as _fp:
_json.dump({
"decision": "pending",
"feedback": "",
"request_id": hax_request.id,
"request_url": hax_request.url,
"revision_number": revision_iter,
}, _fp, indent=2)

approval_result = await app.pause(
approval_request_id=hax_request.id,
approval_request_url=hax_request.url,
expires_in_hours=cfg.approval_expires_in_hours,
)

with open(approval_state_path, "w") as _fp:
_json.dump({
"decision": approval_result.decision,
"feedback": approval_result.feedback,
"request_id": approval_result.approval_request_id,
"request_url": hax_request.url,
"revision_number": revision_iter,
"revision_history": revision_history,
}, _fp, indent=2)

if approval_result.approved:
app.note(
"Plan approved — proceeding to execution",
tags=["build", "approval", "approved"],
)
break

if approval_result.changes_requested:
if revision_iter >= cfg.max_plan_revision_iterations:
app.note(
f"Max plan revision iterations ({cfg.max_plan_revision_iterations}) reached",
tags=["build", "approval", "exhausted"],
)
return BuildResult(
plan_result=plan_result,
dag_state={},
success=False,
summary=f"Plan revision limit reached after {revision_iter + 1} iterations",
).model_dump()

revision_history.append({
"iteration": revision_iter,
"feedback": approval_result.feedback,
})

app.note(
f"Changes requested (iteration {revision_iter}): "
f"{approval_result.feedback[:200]}",
tags=["build", "approval", "request_changes"],
)

# Re-plan with the reviewer feedback. Skip PM (PRD/scope is fixed)
# and re-run Architect → Tech Lead loop → Sprint Planner.
arch = _unwrap(await app.call(
f"{NODE_ID}.run_architect",
prd=plan_result.get("prd", {}),
repo_path=repo_path,
artifacts_dir=artifacts_dir,
feedback=approval_result.feedback,
model=resolved["architect_model"],
permission_mode=cfg.permission_mode,
ai_provider=cfg.ai_provider,
workspace_manifest=manifest.model_dump() if manifest else None,
), "run_architect (human revision)")

review = None
for tl_iter in range(cfg.max_review_iterations + 1):
review = _unwrap(await app.call(
f"{NODE_ID}.run_tech_lead",
prd=plan_result.get("prd", {}),
repo_path=repo_path,
artifacts_dir=artifacts_dir,
revision_number=tl_iter,
model=resolved["tech_lead_model"],
permission_mode=cfg.permission_mode,
ai_provider=cfg.ai_provider,
workspace_manifest=manifest.model_dump() if manifest else None,
), "run_tech_lead")
if review["approved"]:
break
if tl_iter < cfg.max_review_iterations:
arch = _unwrap(await app.call(
f"{NODE_ID}.run_architect",
prd=plan_result.get("prd", {}),
repo_path=repo_path,
artifacts_dir=artifacts_dir,
feedback=review["feedback"],
model=resolved["architect_model"],
permission_mode=cfg.permission_mode,
ai_provider=cfg.ai_provider,
workspace_manifest=manifest.model_dump() if manifest else None,
), "run_architect (tech lead revision)")

if review and not review["approved"]:
review = ReviewResult(
approved=True,
feedback=review["feedback"],
scope_issues=review.get("scope_issues", []),
complexity_assessment=review.get("complexity_assessment", "appropriate"),
summary=review["summary"] + " [auto-approved after max iterations]",
).model_dump()

sprint_result = _unwrap(await app.call(
f"{NODE_ID}.run_sprint_planner",
prd=plan_result.get("prd", {}),
architecture=arch,
repo_path=repo_path,
artifacts_dir=artifacts_dir,
model=resolved["sprint_planner_model"],
permission_mode=cfg.permission_mode,
ai_provider=cfg.ai_provider,
workspace_manifest=manifest.model_dump() if manifest else None,
), "run_sprint_planner (revision)")

plan_result = {
**plan_result,
"architecture": arch,
"review": review,
"issues": sprint_result["issues"],
"rationale": sprint_result["rationale"],
}
continue

# Terminal: rejected, expired, or error
reason = approval_result.feedback or approval_result.decision
app.note(
f"Plan {approval_result.decision} by human reviewer: {reason}",
tags=["build", "approval", approval_result.decision],
)
return BuildResult(
plan_result=plan_result,
dag_state={},
success=False,
summary=f"Plan {approval_result.decision}: {reason}",
).model_dump()

# 2. EXECUTE
exec_config = cfg.to_execution_config_dict()

Expand Down
5 changes: 5 additions & 0 deletions swe_af/execution/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ class BuildConfig(BaseModel):
models: dict[str, str] | None = None

max_review_iterations: int = 2
max_plan_revision_iterations: int = 2 # human reviewer "request changes" loops
max_retries_per_issue: int = 2
max_replans: int = 2
enable_replanning: bool = True
Expand Down Expand Up @@ -735,6 +736,10 @@ class BuildConfig(BaseModel):
level_failure_abort_threshold: float = (
0.8 # abort DAG when >= this fraction of a level fails
)
# HITL plan-approval gate. Auto-engaged when HAX_API_KEY is set in the
# environment; this controls how long the request stays open before the
# control plane treats it as expired.
approval_expires_in_hours: int = 72

@model_validator(mode="before")
@classmethod
Expand Down
4 changes: 4 additions & 0 deletions swe_af/fast/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import os
import re

from dotenv import load_dotenv

load_dotenv()

from agentfield import Agent
from swe_af.execution.envelope import unwrap_call_result as _unwrap
from swe_af.fast import fast_router
Expand Down
Loading