diff --git a/agent/README.md b/agent/README.md
index a8e8103..37c782f 100644
--- a/agent/README.md
+++ b/agent/README.md
@@ -135,7 +135,7 @@ To be safe, the agent isolates each task into its own workspace directory:
 
 ### Endpoints
 
-**`GET /ping`** — Health check. Returns `{"status": "healthy"}`. Stays responsive while the agent runs.
+**`GET /ping`** — Health check. Returns `{"status": "healthy"}` as long as no accepted task has failed in the background thread. If the background pipeline thread raised after `/invocations` returned, returns **503** with `{"status":"unhealthy","reason":"background_pipeline_failed"}` so a live process is not mistaken for successful work. The failed task is also marked **FAILED** in DynamoDB (written from both `pipeline.run_task` and the server thread as a backup). A separate daemon thread writes a DynamoDB heartbeat (`agent_heartbeat_at`) every 45 seconds while a task is running; the orchestrator uses this to detect agent crashes (see [ORCHESTRATOR.md](../docs/design/ORCHESTRATOR.md)). The heartbeat worker is resilient to transient DynamoDB errors — each write is wrapped in try/except so a single failure does not kill the thread.
 
 **`POST /invocations`** — Accept a task and start the agent in a **background thread**. The handler returns **immediately** with an acceptance payload; it does not wait for the agent to finish. While the task runs, progress and the final outcome are written to **DynamoDB** when `TASK_TABLE_NAME` is set (see `task_state.py`); the deployed platform polls that table via the orchestrator. For ad-hoc local testing without DynamoDB, follow **`docker logs -f bgagent-run`** (or your container name).
 
@@ -270,7 +270,8 @@ The agent pipeline (shared by both modes).
 Behavior varies by task type (`new_ta
 - **`pr_iteration`**: Reads review feedback, addresses it with focused changes, commits and pushes, posts a summary comment on the PR
 - **`pr_review`**: Analyzes changes read-only (no `Write` or `Edit` tools available), composes structured review findings, posts a batch review via the GitHub Reviews API
 6. **Deterministic post-hooks** — verifies `mise run build` and `mise run lint`, ensures a PR exists (creates one if the agent did not). For `pr_review`, build status is informational only and the commit/push steps are skipped.
-7. **Metrics** — returns duration, disk usage, turn count, cost, and PR URL
+7. **Status resolution** — `_resolve_overall_task_status()` maps the agent outcome (success/end_turn/error/unknown) and build gate into a final task status. `agent_status=unknown` (SDK stream ended without a ResultMessage) always fails — success is never inferred from PR or build alone. If a post-hook raises after the agent ran, `_chain_prior_agent_error()` preserves the agent-layer error so it is not masked by the later exception.
+8. **Metrics** — returns duration, disk usage, turn count, cost, and PR URL
 
 ## Metrics
 
@@ -327,8 +328,8 @@ agent/
 │   ├── __init__.py
 │   ├── entrypoint.py      Re-export shim for backward compatibility (tests); delegates to specific modules
 │   ├── config.py          Configuration: build_config(), get_config(), resolve_github_token(), TaskType validation
-│   ├── models.py          Data models and enumerations (TaskType StrEnum with is_pr_task property)
-│   ├── pipeline.py        Top-level pipeline: main() CLI entry, run_task() orchestration
+│   ├── models.py          Pydantic data models (TaskConfig, RepoSetup, AgentResult, TaskResult, HydratedContext, etc.) and enumerations (TaskType StrEnum)
+│   ├── pipeline.py        Top-level pipeline: main() CLI entry, run_task() orchestration, status resolution, error chaining
 │   ├── runner.py          Agent runner: run_agent() — ClaudeSDKClient connect/query/receive_response
 │   ├── context.py         Context hydration: fetch_github_issue(), assemble_prompt() (local/dry-run only)
 │   ├── prompt_builder.py  System prompt assembly + memory context, repo config scanning
@@ -338,8 +339,8 @@ agent/
 │   ├── repo.py            Repository setup: clone, branch, git auth, mise trust/install/build/lint
 │   ├── shell.py           Shell utilities: log(), run_cmd(), redact_secrets(), slugify(), truncate()
 │   ├── telemetry.py       Metrics, disk usage, trajectory writer (_TrajectoryWriter with write_policy_decision)
-│   ├── server.py          FastAPI — async /invocations (background thread) and /ping; OTEL session correlation
-│   ├── task_state.py      Best-effort DynamoDB task status (no-op if TASK_TABLE_NAME unset)
+│   ├── server.py          FastAPI — async /invocations (background thread), /ping health check, heartbeat daemon; OTEL session correlation
+│   ├── task_state.py      Best-effort DynamoDB task status and heartbeat writes (no-op if TASK_TABLE_NAME unset)
 │   ├── observability.py   OpenTelemetry helpers (e.g. AgentCore session id)
 │   ├── memory.py          Optional memory / episode integration for the agent
 │   ├── system_prompt.py   Behavioral contract (PRD Section 11)
@@ -354,9 +355,9 @@ agent/
 ├── tests/                 pytest unit tests (pythonpath: src/)
 │   ├── test_config.py     Config validation and TaskType tests
 │   ├── test_hooks.py      PreToolUse hook and hook matcher tests
-│   ├── test_models.py     TaskType enum tests
+│   ├── test_models.py     Pydantic model tests (construction, validation, frozen enforcement, model_dump)
 │   ├── test_policy.py     Cedar policy engine tests (fail-closed, deny-list)
-│   ├── test_pipeline.py   Pipeline orchestration tests (cedar_policies injection)
+│   ├── test_pipeline.py   Pipeline tests (cedar_policies injection, _resolve_overall_task_status, _chain_prior_agent_error)
 │   ├── test_shell.py      Shell utility tests (slugify, redact_secrets, truncate, format_bytes)
 │   └── ...
 ├── test_sdk_smoke.py      Diagnostic: minimal SDK smoke test (ClaudeSDKClient → CLI → Bedrock)
diff --git a/agent/src/context.py b/agent/src/context.py
index abb2e9d..a459d17 100644
--- a/agent/src/context.py
+++ b/agent/src/context.py
@@ -31,7 +31,8 @@ def fetch_github_issue(repo_url: str, issue_number: str, token: str) -> GitHubIs
     )
     comments_resp.raise_for_status()
     comments = [
-        IssueComment(author=c["user"]["login"], body=c["body"]) for c in comments_resp.json()
+        IssueComment(id=int(c["id"]), author=c["user"]["login"], body=c["body"] or "")
+        for c in comments_resp.json()
     ]
 
     return GitHubIssue(
diff --git a/agent/src/models.py b/agent/src/models.py
index 05c155f..7fa393e 100644
--- a/agent/src/models.py
+++ b/agent/src/models.py
@@ -3,8 +3,9 @@
 from __future__ import annotations
 
 from enum import StrEnum
+from typing import Self
 
-from pydantic import BaseModel, ConfigDict
+from pydantic import BaseModel, ConfigDict, Field, model_validator
 
 
 class TaskType(StrEnum):
@@ -24,36 +25,64 @@ def is_read_only(self) -> bool:
 
 
 class IssueComment(BaseModel):
-    model_config = ConfigDict(frozen=True)
+    """Single GitHub issue comment — mirrors ``IssueComment`` in context-hydration.ts."""
+
+    model_config = ConfigDict(frozen=True, extra="forbid")
+
+    id: int
     author: str
     body: str
 
 
 class GitHubIssue(BaseModel):
-    model_config = ConfigDict(frozen=True)
+    """GitHub issue slice — mirrors ``GitHubIssueContext`` in context-hydration.ts."""
+
+    model_config = ConfigDict(frozen=True, extra="forbid")
 
     title: str
     body: str = ""
     number: int
-    comments: list[IssueComment] = []
+    comments: list[IssueComment] = Field(default_factory=list)
 
 
 class MemoryContext(BaseModel):
-    model_config = ConfigDict(frozen=True)
+    model_config = ConfigDict(frozen=True, extra="forbid")
+
+    repo_knowledge: list[str] = Field(default_factory=list)
+    past_episodes: list[str] = Field(default_factory=list)
+
 
-    repo_knowledge: list[str] = []
-    past_episodes: list[str] = []
+# Bump when this agent supports a new orchestrator HydratedContext shape
+# (see cdk/src/handlers/shared/context-hydration.ts).
+SUPPORTED_HYDRATED_CONTEXT_VERSION = 1
 
 
 class HydratedContext(BaseModel):
-    model_config = ConfigDict(frozen=True)
+    """Orchestrator context JSON — keep in sync with HydratedContext in context-hydration.ts."""
+
+    model_config = ConfigDict(frozen=True, extra="forbid")
 
+    version: int = 1
     user_prompt: str
     issue: GitHubIssue | None = None
-    resolved_base_branch: str | None = None
-    truncated: bool = False
     memory_context: MemoryContext | None = None
+    sources: list[str] = Field(default_factory=list)
+    token_estimate: int = 0
+    truncated: bool = False
+    fallback_error: str | None = None
+    guardrail_blocked: str | None = None
+    resolved_branch_name: str | None = None
+    resolved_base_branch: str | None = None
+
+    @model_validator(mode="after")
+    def version_supported(self) -> Self:
+        if self.version > SUPPORTED_HYDRATED_CONTEXT_VERSION:
+            raise ValueError(
+                f"HydratedContext schema version {self.version} is not supported by this agent "
+                f"(max supported: {SUPPORTED_HYDRATED_CONTEXT_VERSION}). "
+                "Deploy an updated agent container image."
+            )
+        return self
 
 
 class TaskConfig(BaseModel):
diff --git a/agent/src/pipeline.py b/agent/src/pipeline.py
index 8708800..b1dcafc 100644
--- a/agent/src/pipeline.py
+++ b/agent/src/pipeline.py
@@ -9,6 +9,8 @@
 import sys
 import time
 
+from pydantic import ValidationError
+
 import memory as agent_memory
 import task_state
 from config import AGENT_WORKSPACE, build_config, get_config
@@ -28,6 +30,55 @@
 from system_prompt import SYSTEM_PROMPT
 from telemetry import format_bytes, get_disk_usage, print_metrics
 
+_SDK_NO_RESULT_MESSAGE = (
+    "Agent SDK stream ended without a ResultMessage (agent_status=unknown). "
+    "Treat as failure: possible SDK bug, network interruption, or protocol mismatch."
+)
+
+
+def _chain_prior_agent_error(agent_result: AgentResult | None, exc: BaseException) -> str:
+    """Preserve agent-layer failures when a later pipeline stage raises."""
+    tail = f"{type(exc).__name__}: {exc}"
+    if agent_result is None:
+        return tail
+    if agent_result.error:
+        return f"{agent_result.error}; subsequent failure: {tail}"
+    if agent_result.status == "error":
+        return f"Agent reported status=error; subsequent failure: {tail}"
+    return tail
+
+
+def _resolve_overall_task_status(
+    agent_result: AgentResult,
+    *,
+    build_ok: bool,
+    pr_url: str | None,
+) -> tuple[str, str | None]:
+    """Map agent outcome + build gate to (overall_status, error_for_task_result)."""
+    agent_status = agent_result.status
+    err = agent_result.error
+
+    if agent_status in ("success", "end_turn") and build_ok:
+        return "success", err
+
+    if agent_status == "unknown":
+        if pr_url:
+            log(
+                "INFO",
+                f"No ResultMessage from SDK (agent_status=unknown); pr_url present: {pr_url}",
+            )
+        if build_ok:
+            log(
+                "INFO",
+                "No ResultMessage from SDK; build_ok=True (informational; task still failed)",
+            )
+        merged = f"{err}; {_SDK_NO_RESULT_MESSAGE}" if err else _SDK_NO_RESULT_MESSAGE
+        return "error", merged
+
+    if not err:
+        err = f"Task did not succeed (agent_status={agent_status!r}, build_ok={build_ok})"
+    return "error", err
+
 
 def _write_memory(
     config: TaskConfig,
@@ -149,20 +200,43 @@
         },
     ) as root_span:
         task_state.write_running(config.task_id)
+        task_state.write_heartbeat(config.task_id)
+        agent_result: AgentResult | None = None
         try:
             # Context hydration
             with task_span("task.context_hydration"):
                 if hydrated_context:
                     log("TASK", "Using hydrated context from orchestrator")
-                    hc = HydratedContext.model_validate(hydrated_context)
+                    try:
+                        hc = HydratedContext.model_validate(hydrated_context)
+                    except ValidationError as err:
+                        parts = [
+                            f"{'.'.join(str(x) for x in e['loc'])}: {e['msg']}"
+                            for e in err.errors()
+                        ]
+                        log(
+                            "ERROR",
+                            "HydratedContext validation failed (orchestrator vs agent contract): "
+                            + "; ".join(parts),
+                        )
+                        raise
                     prompt = hc.user_prompt
                     if hc.issue:
                         config.issue = hc.issue
+                    if hc.resolved_branch_name:
+                        config.branch_name = hc.resolved_branch_name
                     if hc.resolved_base_branch:
                         config.base_branch = hc.resolved_base_branch
                     if hc.truncated:
                         log("WARN", "Context was truncated by orchestrator token budget")
+                    if hc.fallback_error:
+                        log("WARN", f"Orchestrator context fallback: {hc.fallback_error}")
+                    if hc.guardrail_blocked:
+                        log(
+                            "WARN",
+                            f"Orchestrator guardrail blocked content: {hc.guardrail_blocked}",
+                        )
                 else:
                     hc = None  # Local batch mode — fetch issue and assemble prompt in-container
@@ -240,7 +314,7 @@
                 agent_span.record_exception(e)
                 agent_result = AgentResult(status="error", error=str(e))
 
-        # Post-hooks
+        # Post-hooks (agent_result is guaranteed set by the try/except above)
         with task_span("task.post_hooks") as post_span:
             # Safety net: commit any uncommitted tracked changes (skip for read-only tasks)
             if config.task_type == "pr_review":
@@ -276,16 +350,9 @@
         duration = time.time() - start_time
         disk_after = get_disk_usage(AGENT_WORKSPACE)
 
-        # Determine overall status:
-        # - "success" if the agent reported success/end_turn and the build passes
-        #   (or the build was already broken before the agent ran — pre-existing failure)
-        # - "success" if agent_status is unknown (SDK didn't yield ResultMessage)
-        #   but the pipeline produced a PR and the build didn't regress
-        # - "error" otherwise
-        # NOTE: lint_passed is intentionally NOT used in the status
-        # determination — lint failures are advisory and reported in the PR
-        # body and span attributes but do not affect the task's terminal
-        # status. Lint regression detection is planned for Iteration 3c.
+        # Overall status: do not infer success from PR/build when the SDK never
+        # emitted ResultMessage (agent_status=unknown) — that masks protocol gaps.
+        # NOTE: lint_passed is intentionally NOT used for terminal status.
         agent_status = agent_result.status
         # Default True = assume build was green before, so a post-agent
         # failure IS counted as a regression (conservative).
@@ -302,17 +369,11 @@
                 "Post-agent build failed, but build was already failing before "
                 "agent changes — not counting as regression",
             )
-        if agent_status in ("success", "end_turn") and build_ok:
-            overall_status = "success"
-        elif agent_status == "unknown" and pr_url and build_ok:
-            log(
-                "WARN",
-                "Agent SDK did not yield a ResultMessage, but PR was created "
-                "and build didn't regress — treating as success",
-            )
-            overall_status = "success"
-        else:
-            overall_status = "error"
+        overall_status, result_error = _resolve_overall_task_status(
+            agent_result,
+            build_ok=build_ok,
+            pr_url=pr_url,
+        )
 
         # Build TaskResult
         usage = agent_result.usage
@@ -331,7 +392,7 @@
             disk_delta=format_bytes(disk_after - disk_before),
             prompt_version=prompt_version or None,
             memory_written=memory_written,
-            error=agent_result.error,
+            error=result_error,
             session_id=agent_result.session_id or None,
             input_tokens=usage.input_tokens if usage else None,
             output_tokens=usage.output_tokens if usage else None,
@@ -377,7 +438,14 @@
     except Exception as e:
         # Ensure the task is marked FAILED in DynamoDB even if the pipeline
         # crashes before reaching the normal terminal-state write.
-        crash_result = TaskResult(status="error", error=str(e), task_id=config.task_id)
+        agent_for_chain = agent_result
+        combined = _chain_prior_agent_error(agent_for_chain, e)
+        crash_result = TaskResult(
+            status="error",
+            error=combined,
+            task_id=config.task_id,
+            agent_status=agent_for_chain.status if agent_for_chain else "unknown",
+        )
         task_state.write_terminal(config.task_id, "FAILED", crash_result.model_dump())
         raise
diff --git a/agent/src/server.py b/agent/src/server.py
index d501c2e..b1c58e4 100644
--- a/agent/src/server.py
+++ b/agent/src/server.py
@@ -18,9 +18,12 @@
 from typing import Any
 
 from fastapi import FastAPI, Request
+from fastapi.responses import JSONResponse
 from pydantic import BaseModel
 
+import task_state
 from config import resolve_github_token
+from models import TaskResult
 from observability import set_session_id
 from pipeline import run_task
@@ -45,6 +48,21 @@ def filter(self, record: logging.LogRecord) -> bool:
 
 _active_threads: list[threading.Thread] = []
 _threads_lock = threading.Lock()
 
+# Set when the pipeline thread raises after /invocations accepted (Dynamo backup + ping signal).
+_background_pipeline_failed = False
+
+
+def _heartbeat_worker(task_id: str, stop: threading.Event) -> None:
+    """Periodically refresh ``agent_heartbeat_at`` so the orchestrator can detect crashes."""
+    while not stop.wait(timeout=45):
+        try:
+            task_state.write_heartbeat(task_id)
+        except Exception as e:
+            print(
+                f"[heartbeat] write_heartbeat error (will retry): {type(e).__name__}: {e}",
+                flush=True,
+            )
+
 
 def _drain_threads(timeout: int = 300) -> None:
     """Join all active background threads, allowing in-flight tasks to complete."""
@@ -85,7 +103,15 @@ class InvocationResponse(BaseModel):
 
 @app.get("/ping")
 async def ping():
-    """Health check endpoint.
Returns 503 if the last background pipeline thread crashed."""
+    if _background_pipeline_failed:
+        return JSONResponse(
+            status_code=503,
+            content={
+                "status": "unhealthy",
+                "reason": "background_pipeline_failed",
+            },
+        )
     return {"status": "healthy"}
 
@@ -110,6 +136,19 @@ def _run_task_background(
     cedar_policies: list[str] | None = None,
 ) -> None:
     """Run the agent task in a background thread."""
+    global _background_pipeline_failed
+
+    stop_heartbeat = threading.Event()
+    hb_thread: threading.Thread | None = None
+    if task_id:
+        hb_thread = threading.Thread(
+            target=_heartbeat_worker,
+            args=(task_id, stop_heartbeat),
+            name=f"heartbeat-{task_id}",
+            daemon=True,
+        )
+        hb_thread.start()
+
     try:
         # Propagate session ID into this thread's OTEL context so spans
         # are correlated with the AgentCore session in CloudWatch.
@@ -135,9 +174,22 @@
             pr_number=pr_number,
             cedar_policies=cedar_policies,
         )
+        _background_pipeline_failed = False
     except Exception as e:
+        _background_pipeline_failed = True
         print(f"Background task {task_id} failed: {type(e).__name__}: {e}")
         traceback.print_exc()
+        if task_id:
+            backup = TaskResult(
+                status="error",
+                error=f"Background pipeline thread: {type(e).__name__}: {e}",
+                task_id=task_id,
+            )
+            task_state.write_terminal(task_id, "FAILED", backup.model_dump())
+    finally:
+        stop_heartbeat.set()
+        if hb_thread is not None and hb_thread.is_alive():
+            hb_thread.join(timeout=3)
 
 
 @app.post("/invocations", response_model=InvocationResponse)
@@ -203,8 +255,12 @@ def invoke_agent(request: Request, body: InvocationRequest):
     )
     # Track the thread for graceful shutdown BEFORE starting it so
     # _drain_threads cannot miss a very-short-lived thread.
+    global _background_pipeline_failed
+
     with _threads_lock:
         _active_threads[:] = [t for t in _active_threads if t.is_alive()]
+        if not _active_threads:
+            _background_pipeline_failed = False
         _active_threads.append(thread)
     thread.start()
diff --git a/agent/src/task_state.py b/agent/src/task_state.py
index 8ac43cf..5632bb0 100644
--- a/agent/src/task_state.py
+++ b/agent/src/task_state.py
@@ -72,6 +72,30 @@
         print(f"[task_state] write_submitted failed (best-effort): {e}")
 
 
+def write_heartbeat(task_id: str) -> None:
+    """Update ``agent_heartbeat_at`` while the task is RUNNING (orchestrator crash detection)."""
+    try:
+        table = _get_table()
+        if table is None:
+            return
+        table.update_item(
+            Key={"task_id": task_id},
+            UpdateExpression="SET agent_heartbeat_at = :t",
+            ConditionExpression="#s = :running",
+            ExpressionAttributeNames={"#s": "status"},
+            ExpressionAttributeValues={":t": _now_iso(), ":running": "RUNNING"},
+        )
+    except Exception as e:
+        from botocore.exceptions import ClientError
+
+        if (
+            isinstance(e, ClientError)
+            and e.response.get("Error", {}).get("Code") == "ConditionalCheckFailedException"
+        ):
+            return
+        print(f"[task_state] write_heartbeat failed (best-effort): {type(e).__name__}: {e}")
+
+
 def write_running(task_id: str) -> None:
     """Transition a task to RUNNING (called at agent start)."""
     try:
diff --git a/agent/tests/test_entrypoint.py b/agent/tests/test_entrypoint.py
index 296da65..bd9d0c7 100644
--- a/agent/tests/test_entrypoint.py
+++ b/agent/tests/test_entrypoint.py
@@ -239,7 +239,7 @@ def test_with_issue(self):
                 number=42,
                 title="Login broken",
                 body="Users cannot log in",
-                comments=[IssueComment(author="alice", body="Confirmed!")],
+                comments=[IssueComment(id=1, author="alice", body="Confirmed!")],
             ),
         )
         result = assemble_prompt(config)
diff --git a/agent/tests/test_models.py b/agent/tests/test_models.py
index 4a51531..17f5830 100644
--- a/agent/tests/test_models.py
+++ b/agent/tests/test_models.py
@@ -4,6 +4,7 @@
 from pydantic import ValidationError
 
 from models import (
+    SUPPORTED_HYDRATED_CONTEXT_VERSION,
     AgentResult,
     GitHubIssue,
     HydratedContext,
@@ -52,19 +53,24 @@ def test_str_enum_membership(self):
 
 
 class TestIssueComment:
     def test_construction(self):
-        c = IssueComment(author="alice", body="Looks good!")
+        c = IssueComment(id=1, author="alice", body="Looks good!")
+        assert c.id == 1
         assert c.author == "alice"
         assert c.body == "Looks good!"
 
     def test_frozen(self):
-        c = IssueComment(author="alice", body="text")
+        c = IssueComment(id=1, author="alice", body="text")
         with pytest.raises(ValidationError):
             c.author = "bob"
 
     def test_model_dump(self):
-        c = IssueComment(author="alice", body="text")
+        c = IssueComment(id=99, author="alice", body="text")
         d = c.model_dump()
-        assert d == {"author": "alice", "body": "text"}
+        assert d == {"id": 99, "author": "alice", "body": "text"}
+
+    def test_extra_forbidden(self):
+        with pytest.raises(ValidationError):
+            IssueComment.model_validate({"id": 1, "author": "a", "body": "b", "unknown": True})
 
 
 class TestGitHubIssue:
@@ -80,7 +86,7 @@ def test_construction_with_comments(self):
             title="Bug",
             body="desc",
             number=42,
-            comments=[IssueComment(author="bob", body="noted")],
+            comments=[IssueComment(id=10, author="bob", body="noted")],
         )
         assert len(issue.comments) == 1
         assert issue.comments[0].author == "bob"
@@ -111,11 +117,17 @@ def test_frozen(self):
 
 class TestHydratedContext:
     def test_construction(self):
         hc = HydratedContext(user_prompt="Fix the bug")
+        assert hc.version == 1
         assert hc.user_prompt == "Fix the bug"
         assert hc.issue is None
+        assert hc.sources == []
+        assert hc.token_estimate == 0
+        assert hc.resolved_branch_name is None
         assert hc.resolved_base_branch is None
         assert hc.truncated is False
         assert hc.memory_context is None
+        assert hc.fallback_error is None
+        assert hc.guardrail_blocked is None
 
     def test_with_nested_models(self):
         hc = HydratedContext(
@@ -133,14 +145,61 @@ def test_frozen(self):
 
     def test_model_validate_from_dict(self):
         data = {
+            "version": 1,
             "user_prompt": "Fix bug",
             "issue": {"title": "Bug", "number": 42, "body": "", "comments": []},
+            "sources": ["github_issue"],
+            "token_estimate": 100,
             "truncated": True,
         }
         hc = HydratedContext.model_validate(data)
         assert hc.user_prompt == "Fix bug"
         assert hc.issue is not None and hc.issue.number == 42
         assert hc.truncated is True
+        assert hc.sources == ["github_issue"]
+        assert hc.token_estimate == 100
+
+    def test_model_validate_orchestrator_shape(self):
+        data = {
+            "version": 1,
+            "user_prompt": "Do the thing",
+            "issue": {
+                "number": 7,
+                "title": "T",
+                "body": "B",
+                "comments": [{"id": 1, "author": "u", "body": "c"}],
+            },
+            "memory_context": {"repo_knowledge": ["k"], "past_episodes": ["e"]},
+            "sources": ["github_issue", "memory"],
+            "token_estimate": 500,
+            "truncated": False,
+            "fallback_error": None,
+            "guardrail_blocked": None,
+            "resolved_branch_name": "feat/x",
+            "resolved_base_branch": "main",
+        }
+        hc = HydratedContext.model_validate(data)
+        assert hc.resolved_branch_name == "feat/x"
+        assert hc.issue is not None
+        assert hc.issue.comments[0].id == 1
+
+    def test_version_above_supported_fails(self):
+        with pytest.raises(ValidationError) as excinfo:
+            HydratedContext(
+                version=SUPPORTED_HYDRATED_CONTEXT_VERSION + 1,
+                user_prompt="x",
+            )
+        assert "not supported" in str(excinfo.value).lower()
+        assert str(SUPPORTED_HYDRATED_CONTEXT_VERSION + 1) in str(excinfo.value)
+
+    def test_extra_top_level_forbidden(self):
+        with pytest.raises(ValidationError):
+            HydratedContext.model_validate(
+                {
+                    "user_prompt": "x",
+                    "future_orchestrator_field": True,
+                }
+            )
 
 
 class TestTaskConfig:
diff --git a/agent/tests/test_pipeline.py b/agent/tests/test_pipeline.py
index 882f1a6..e6d783c 100644
--- a/agent/tests/test_pipeline.py
+++ b/agent/tests/test_pipeline.py
@@ -1,8 +1,9 @@
-"""Unit tests for pipeline.py — cedar_policies injection."""
+"""Unit tests for pipeline.py — cedar_policies injection and pure helpers."""
 
 from unittest.mock import MagicMock, patch
 
 from models import AgentResult, RepoSetup, TaskConfig
+from pipeline import _chain_prior_agent_error, _resolve_overall_task_status
 
 
 class TestCedarPoliciesInjection:
@@ -137,3 +138,96 @@ async def fake_run_agent(_prompt, _system_prompt, config, cwd=None):
 
         assert captured_config is not None
         assert captured_config.cedar_policies == []
+
+
+class TestChainPriorAgentError:
+    def test_none_agent_result_returns_exception_only(self):
+        exc = RuntimeError("post-hook crash")
+        assert _chain_prior_agent_error(None, exc) == "RuntimeError: post-hook crash"
+
+    def test_agent_with_error_chains_both(self):
+        ar = AgentResult(status="error", error="SDK timeout")
+        exc = ValueError("PR creation failed")
+        result = _chain_prior_agent_error(ar, exc)
+        assert result == "SDK timeout; subsequent failure: ValueError: PR creation failed"
+
+    def test_agent_error_status_without_error_message(self):
+        ar = AgentResult(status="error")
+        exc = OSError("disk full")
+        result = _chain_prior_agent_error(ar, exc)
+        assert result == "Agent reported status=error; subsequent failure: OSError: disk full"
+
+    def test_agent_success_returns_exception_only(self):
+        ar = AgentResult(status="success")
+        exc = RuntimeError("unexpected")
+        assert _chain_prior_agent_error(ar, exc) == "RuntimeError: unexpected"
+
+    def test_agent_unknown_no_error_returns_exception_only(self):
+        ar = AgentResult(status="unknown")
+        exc = TypeError("bad arg")
+        assert _chain_prior_agent_error(ar, exc) == "TypeError: bad arg"
+
+
+class TestResolveOverallTaskStatus:
+    def test_success_with_build_ok(self):
+        ar = AgentResult(status="success")
+        status, err = _resolve_overall_task_status(ar, build_ok=True, pr_url="https://pr")
+        assert status == "success"
+        assert err is None
+
+    def test_end_turn_with_build_ok(self):
+        ar = AgentResult(status="end_turn")
+        status, err = _resolve_overall_task_status(ar, build_ok=True, pr_url=None)
+        assert status == "success"
+        assert err is None
+
+    def test_success_with_build_failed(self):
+        ar = AgentResult(status="success")
+        status, err = _resolve_overall_task_status(ar, build_ok=False, pr_url="https://pr")
+        assert status == "error"
+        assert err is not None
+        assert "agent_status='success'" in err
+        assert "build_ok=False" in err
+
+    def test_unknown_always_error_even_with_pr_and_build(self):
+        """agent_status=unknown must always fail — never infer success from PR/build."""
+        ar = AgentResult(status="unknown")
+        status, err = _resolve_overall_task_status(ar, build_ok=True, pr_url="https://pr")
+        assert status == "error"
+        assert err is not None
+        assert "ResultMessage" in err
+
+    def test_unknown_with_prior_error_chains(self):
+        ar = AgentResult(status="unknown", error="connection reset")
+        status, err = _resolve_overall_task_status(ar, build_ok=False, pr_url=None)
+        assert status == "error"
+        assert err is not None
+        assert "connection reset" in err
+        assert "ResultMessage" in err
+
+    def test_error_status_preserves_agent_error(self):
+        ar = AgentResult(status="error", error="OOM killed")
+        status, err = _resolve_overall_task_status(ar, build_ok=False, pr_url=None)
+        assert status == "error"
+        assert err == "OOM killed"
+
+    def test_error_status_without_agent_error_generates_message(self):
+        ar = AgentResult(status="error")
+        status, err = _resolve_overall_task_status(ar, build_ok=False, pr_url=None)
+        assert status == "error"
+        assert err is not None
+        assert "agent_status='error'" in err
+
+    def test_unknown_no_pr_no_build(self):
+        ar = AgentResult(status="unknown")
+        status, err = _resolve_overall_task_status(ar, build_ok=False, pr_url=None)
+        assert status == "error"
+        assert err is not None
+        assert "ResultMessage" in err
+
+    def test_success_preserves_existing_error(self):
+        """If agent reports success with a non-fatal error, it's preserved on success."""
+        ar = AgentResult(status="success", error="non-fatal warning")
+        status, err = _resolve_overall_task_status(ar, build_ok=True, pr_url=None)
+        assert status == "success"
+        assert err == "non-fatal warning"
diff --git a/agent/tests/test_pipeline_outcomes.py b/agent/tests/test_pipeline_outcomes.py
new file mode 100644
index 0000000..4b22258
--- /dev/null
+++ b/agent/tests/test_pipeline_outcomes.py
@@ -0,0 +1,59 @@
+"""Unit tests for pipeline task outcome resolution and error chaining."""
+
+from models import AgentResult
+from pipeline import _chain_prior_agent_error, _resolve_overall_task_status
+
+
+class TestResolveOverallTaskStatus:
+    def test_success_end_turn_with_build_ok(self):
+        ar = AgentResult(status="success", error=None)
+        overall, err = _resolve_overall_task_status(ar, build_ok=True, pr_url="https://pr")
+        assert overall == "success"
+        assert err is None
+
+    def test_unknown_is_always_error_even_with_pr(self):
+        ar = AgentResult(status="unknown", error=None)
+        overall, err = _resolve_overall_task_status(
+            ar,
+            build_ok=True,
+            pr_url="https://github.com/o/r/pull/1",
+        )
+        assert overall == "error"
+        assert err is not None
+        assert "ResultMessage" in (err or "")
+        assert "unknown" in (err or "").lower()
+
+    def test_unknown_merges_existing_agent_error(self):
+        ar = AgentResult(status="unknown", error="receive_response() failed: boom")
+        overall, err = _resolve_overall_task_status(ar, build_ok=True, pr_url=None)
+        assert overall == "error"
+        assert err is not None
+        assert "receive_response() failed: boom" in err
+        assert "ResultMessage" in err
+
+    def test_error_status_without_message_gets_default(self):
+        ar = AgentResult(status="error", error=None)
+        overall, err = _resolve_overall_task_status(ar, build_ok=True, pr_url=None)
+        assert overall == "error"
+        assert err is not None
+        assert "agent_status='error'" in err
+
+
+class TestChainPriorAgentError:
+    def test_no_agent_result(self):
+        msg = _chain_prior_agent_error(None, ValueError("post-hook failed"))
+        assert "ValueError" in msg
+        assert "post-hook failed" in msg
+
+    def test_chains_agent_error_string(self):
+        ar = AgentResult(status="error", error="agent blew up")
+        msg = _chain_prior_agent_error(ar, RuntimeError("ensure_pr failed"))
+        assert "agent blew up" in msg
+        assert "ensure_pr failed" in msg
+        assert "subsequent failure" in msg
+
+    def test_chains_status_error_without_message(self):
+        ar = AgentResult(status="error", error=None)
+        msg = _chain_prior_agent_error(ar, OSError("network"))
+        assert "Agent reported status=error" in msg
+        assert "network" in msg
diff --git a/agent/tests/test_server.py b/agent/tests/test_server.py
new file mode 100644
index 0000000..fdc951f
--- /dev/null
+++ b/agent/tests/test_server.py
@@ -0,0 +1,73 @@
+"""Tests for AgentCore FastAPI server behavior."""
+
+import time
+from unittest.mock import MagicMock
+
+import pytest
+from fastapi.testclient import TestClient
+
+import server
+
+
+@pytest.fixture(autouse=True)
+def reset_server_state():
+    server._background_pipeline_failed = False
+    with server._threads_lock:
+        server._active_threads.clear()
+    yield
+    server._background_pipeline_failed = False
+    with server._threads_lock:
+        server._active_threads.clear()
+
+
+@pytest.fixture
+def client():
+    return TestClient(server.app)
+
+
+def test_ping_healthy_by_default(client):
+    r = client.get("/ping")
+    assert r.status_code == 200
+    assert r.json() == {"status": "healthy"}
+
+
+def test_background_thread_failure_503_and_backup_terminal_write(client, monkeypatch):
+    def boom(**_kwargs):
+        raise RuntimeError("simulated pipeline crash")
+
+    mock_write = MagicMock()
+    monkeypatch.setattr(server, "run_task", boom)
+    monkeypatch.setattr(server.task_state, "write_terminal", mock_write)
+
+    client.post(
+        "/invocations",
+        json={
+            "input": {
+                "task_id": "task-crash-1",
+                "repo_url": "o/r",
+                "prompt": "x",
+                "github_token": "ghp_x",
+                "aws_region": "us-east-1",
+            }
+        },
+    )
+
+    deadline = time.time() + 5.0
+    while time.time() < deadline:
+        r = client.get("/ping")
+        if r.status_code == 503:
+            break
+        time.sleep(0.05)
+    assert r.status_code == 503
body = r.json() + assert body["status"] == "unhealthy" + assert body["reason"] == "background_pipeline_failed" + + mock_write.assert_called() + call_kw = mock_write.call_args + assert call_kw[0][0] == "task-crash-1" + assert call_kw[0][1] == "FAILED" + dumped = call_kw[0][2] + assert "error" in dumped + assert "Background pipeline thread" in dumped["error"] + assert "RuntimeError" in dumped["error"] diff --git a/cdk/src/constructs/task-api.ts b/cdk/src/constructs/task-api.ts index 80e2aa7..4d60278 100644 --- a/cdk/src/constructs/task-api.ts +++ b/cdk/src/constructs/task-api.ts @@ -94,6 +94,12 @@ export interface TaskApiProps { * @default 30 */ readonly webhookRetentionDays?: number; + + /** + * AgentCore runtime ARNs for which cancel-task may call `StopRuntimeSession`. + * First ARN is also passed as `RUNTIME_ARN` when the task record has no `agent_runtime_arn`. + */ + readonly agentCoreStopSessionRuntimeArns?: string[]; } /** @@ -318,12 +324,18 @@ export class TaskApi extends Construct { bundling: commonBundling, }); + const cancelTaskEnv: Record = { ...commonEnv }; + const stopSessionArns = props.agentCoreStopSessionRuntimeArns ?? 
[]; + if (stopSessionArns.length > 0) { + cancelTaskEnv.RUNTIME_ARN = stopSessionArns[0]!; + } + const cancelTaskFn = new lambda.NodejsFunction(this, 'CancelTaskFn', { entry: path.join(handlersDir, 'cancel-task.ts'), handler: 'handler', runtime: Runtime.NODEJS_24_X, architecture: Architecture.ARM_64, - environment: commonEnv, + environment: cancelTaskEnv, bundling: commonBundling, }); @@ -343,6 +355,14 @@ export class TaskApi extends Construct { props.taskTable.grantReadWriteData(cancelTaskFn); props.taskEventsTable.grantReadWriteData(cancelTaskFn); + if (stopSessionArns.length > 0) { + const runtimeResources = stopSessionArns.flatMap(arn => [arn, `${arn}/*`]); + cancelTaskFn.addToRolePolicy(new iam.PolicyStatement({ + actions: ['bedrock-agentcore:StopRuntimeSession'], + resources: runtimeResources, + })); + } + // Repo table read for onboarding gate if (props.repoTable) { props.repoTable.grantReadData(createTaskFn); diff --git a/cdk/src/handlers/cancel-task.ts b/cdk/src/handlers/cancel-task.ts index 3f154a1..10a1102 100644 --- a/cdk/src/handlers/cancel-task.ts +++ b/cdk/src/handlers/cancel-task.ts @@ -17,6 +17,7 @@ * SOFTWARE. */ +import { BedrockAgentCoreClient, StopRuntimeSessionCommand } from '@aws-sdk/client-bedrock-agentcore'; import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; import { DynamoDBDocumentClient, GetCommand, PutCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb'; import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda'; @@ -29,9 +30,11 @@ import type { TaskRecord } from './shared/types'; import { computeTtlEpoch } from './shared/validation'; const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({})); +const agentCoreClient = new BedrockAgentCoreClient({}); const TABLE_NAME = process.env.TASK_TABLE_NAME!; const EVENTS_TABLE_NAME = process.env.TASK_EVENTS_TABLE_NAME!; const TASK_RETENTION_DAYS = Number(process.env.TASK_RETENTION_DAYS ?? 
'90'); +const RUNTIME_ARN = process.env.RUNTIME_ARN; /** * DELETE /v1/tasks/{task_id} — Cancel a task. @@ -73,6 +76,10 @@ export async function handler(event: APIGatewayProxyEvent): Promise = asyn if (TERMINAL_STATUSES.includes(current.status)) { return false; } - const result = await runPreflightChecks(task.repo, blueprintConfig, task.pr_number); + const result = await runPreflightChecks(task.repo, blueprintConfig, task.pr_number, task.task_type); if (!result.passed) { const errorMessage = `Pre-flight check failed: ${result.failureReason}${result.failureDetail ? ' — ' + result.failureDetail : ''}`; await failTask(taskId, current.status, errorMessage, task.user_id, true); @@ -127,12 +127,11 @@ const durableHandler: DurableExecutionHandler = asyn }); // Step 5: Wait for agent to finish - // NOTE: Polls DynamoDB every 30s rather than re-invoking the AgentCore session. - // The agent writes terminal status directly to DDB. If the agent crashes without - // writing a terminal status, we detect it via the HYDRATING early-exit check - // (MAX_NON_RUNNING_POLLS ~5min); otherwise the loop runs up to MAX_POLL_ATTEMPTS - // (~8.5h). A future improvement could add AgentCore session status checks for - // faster crash detection. + // Polls DynamoDB on each interval. The agent writes terminal status when done. + // While RUNNING, the runtime updates `agent_heartbeat_at`; if that timestamp + // goes stale, `pollTaskStatus` sets `sessionUnhealthy` so we fail fast instead + // of waiting the full MAX_POLL_ATTEMPTS window (~8.5h) after a silent crash. + // HYDRATING without transition to RUNNING is still bounded by MAX_NON_RUNNING_POLLS (~5min). 
   const finalPollState = await context.waitForCondition(
     'await-agent-completion',
     async (state) => {
@@ -144,6 +143,9 @@
       if (state.lastStatus && TERMINAL_STATUSES.includes(state.lastStatus)) {
         return { shouldContinue: false };
       }
+      if (state.sessionUnhealthy) {
+        return { shouldContinue: false };
+      }
       if (state.attempts >= MAX_POLL_ATTEMPTS) {
         return { shouldContinue: false };
       }
diff --git a/cdk/src/handlers/shared/orchestrator.ts b/cdk/src/handlers/shared/orchestrator.ts
index 539f376..1efbf02 100644
--- a/cdk/src/handlers/shared/orchestrator.ts
+++ b/cdk/src/handlers/shared/orchestrator.ts
@@ -51,8 +51,15 @@ const MEMORY_ID = process.env.MEMORY_ID;
 export interface PollState {
   readonly attempts: number;
   readonly lastStatus?: TaskStatusType;
+  /** True when the agent stopped sending heartbeats while still RUNNING (likely crash/OOM). */
+  readonly sessionUnhealthy?: boolean;
 }
 
+/** After RUNNING this long, we expect `agent_heartbeat_at` from the agent (if ever set). */
+const AGENT_HEARTBEAT_GRACE_SEC = 120;
+/** If `agent_heartbeat_at` exists and is older than this, the session is treated as lost. */
+const AGENT_HEARTBEAT_STALE_SEC = 240;
+
 /**
  * Load a task record from DynamoDB.
  * @param taskId - the task to load.
@@ -381,6 +388,7 @@ export async function startSession(
   await transitionTask(task.task_id, TaskStatus.HYDRATING, TaskStatus.RUNNING, {
     session_id: sessionId,
     started_at: new Date().toISOString(),
+    agent_runtime_arn: runtimeArn,
   });
 
   await emitTaskEvent(task.task_id, 'session_started', { session_id: sessionId });
@@ -400,15 +408,54 @@ export async function pollTaskStatus(taskId: string, state: PollState): Promise<
   const result = await ddb.send(new GetCommand({
     TableName: TABLE_NAME,
     Key: { task_id: taskId },
-    ProjectionExpression: '#status',
-    ExpressionAttributeNames: { '#status': 'status' },
+    ProjectionExpression: '#st, session_id, started_at, agent_heartbeat_at',
+    ExpressionAttributeNames: { '#st': 'status' },
   }));
 
   const currentStatus = result.Item?.status as TaskStatusType | undefined;
+  const item = result.Item as Record<string, unknown> | undefined;
+
+  let sessionUnhealthy = false;
+  if (
+    currentStatus === TaskStatus.RUNNING
+    && item?.session_id
+    && typeof item.started_at === 'string'
+  ) {
+    const startedMs = Date.parse(item.started_at);
+    const now = Date.now();
+    if (!Number.isNaN(startedMs)) {
+      const runningAgeSec = (now - startedMs) / 1000;
+
+      if (typeof item.agent_heartbeat_at === 'string') {
+        // Agent has sent at least one heartbeat — check staleness
+        const hbMs = Date.parse(item.agent_heartbeat_at);
+        if (!Number.isNaN(hbMs)) {
+          const hbAgeSec = (now - hbMs) / 1000;
+          if (runningAgeSec > AGENT_HEARTBEAT_GRACE_SEC && hbAgeSec > AGENT_HEARTBEAT_STALE_SEC) {
+            sessionUnhealthy = true;
+            logger.warn('Agent heartbeat stale while task RUNNING', {
+              task_id: taskId,
+              agent_heartbeat_at: item.agent_heartbeat_at,
+              heartbeat_age_sec: Math.round(hbAgeSec),
+            });
+          }
+        }
+      } else if (runningAgeSec > AGENT_HEARTBEAT_GRACE_SEC + AGENT_HEARTBEAT_STALE_SEC) {
+        // Agent never sent a heartbeat and task has been RUNNING well past
+        // the grace period — likely early crash before pipeline started.
+        sessionUnhealthy = true;
+        logger.warn('Agent never sent heartbeat while task RUNNING past grace period', {
+          task_id: taskId,
+          running_age_sec: Math.round(runningAgeSec),
+        });
+      }
+    }
+  }
 
   return {
     attempts: state.attempts + 1,
     lastStatus: currentStatus,
+    sessionUnhealthy,
   };
 }
@@ -426,6 +473,52 @@
   const task = await loadTask(taskId);
   const currentStatus = task.status;
 
+  // Lost session: RUNNING but agent heartbeats stopped (crash/OOM) — fail fast
+  if (
+    pollState.sessionUnhealthy
+    && (currentStatus === TaskStatus.RUNNING || currentStatus === TaskStatus.FINALIZING)
+  ) {
+    let transitioned = false;
+    try {
+      await transitionTask(taskId, currentStatus, TaskStatus.FAILED, {
+        completed_at: new Date().toISOString(),
+        error_message:
+          'Agent session lost: no recent heartbeat from the runtime (container may have crashed, been OOM-killed, or stopped)',
+      });
+      transitioned = true;
+    } catch (err) {
+      // Task may have transitioned concurrently (e.g. agent wrote terminal status).
+      // Re-read to avoid double-decrement or contradictory events.
+      logger.warn('Finalization transition to FAILED (heartbeat) failed, task may have transitioned concurrently', {
+        task_id: taskId,
+        error: err instanceof Error ? err.message : String(err),
+      });
+    }
+    if (transitioned) {
+      await emitTaskEvent(taskId, 'task_failed', {
+        reason: 'agent_heartbeat_stale',
+        poll_attempts: pollState.attempts,
+      });
+      await decrementConcurrency(userId);
+    } else {
+      // Transition failed — re-read task to determine actual state.
+      // If already terminal the block below will handle TTL + concurrency.
+      const reread = await loadTask(taskId);
+      if (TERMINAL_STATUSES.includes(reread.status)) {
+        logger.info('Heartbeat path: task already terminal after failed transition', { task_id: taskId, status: reread.status });
+        await emitTaskEvent(taskId, `task_${reread.status.toLowerCase()}`, {
+          final_status: reread.status,
+          poll_attempts: pollState.attempts,
+        });
+        await decrementConcurrency(userId);
+      } else {
+        logger.warn('Heartbeat path: task in unexpected state after failed transition, releasing concurrency', { task_id: taskId, status: reread.status });
+        await decrementConcurrency(userId);
+      }
+    }
+    return;
+  }
+
   // If the agent already wrote a terminal status, just finalize
   if (TERMINAL_STATUSES.includes(currentStatus)) {
     logger.info('Task already in terminal state', { task_id: taskId, status: currentStatus });
diff --git a/cdk/src/handlers/shared/preflight.ts b/cdk/src/handlers/shared/preflight.ts
index 1bc5d9c..f852240 100644
--- a/cdk/src/handlers/shared/preflight.ts
+++ b/cdk/src/handlers/shared/preflight.ts
@@ -23,6 +23,7 @@
 import { resolveGitHubToken } from './context-hydration';
 import { logger } from './logger';
 import type { BlueprintConfig } from './repo-config';
+import type { TaskType } from './types';
 
 // ---------------------------------------------------------------------------
 // Types
@@ -30,6 +31,7 @@ import type { BlueprintConfig } from './repo-config';
 export const PreflightFailureReason = {
   GITHUB_UNREACHABLE: 'GITHUB_UNREACHABLE',
+  INSUFFICIENT_GITHUB_REPO_PERMISSIONS: 'INSUFFICIENT_GITHUB_REPO_PERMISSIONS',
   REPO_NOT_FOUND_OR_NO_ACCESS: 'REPO_NOT_FOUND_OR_NO_ACCESS',
   RUNTIME_UNAVAILABLE: 'RUNTIME_UNAVAILABLE',
   PR_NOT_FOUND_OR_CLOSED: 'PR_NOT_FOUND_OR_CLOSED',
@@ -59,6 +61,59 @@ export interface PreflightResult {
 
 const GITHUB_API_TIMEOUT_MS = 5_000;
 
+/** GitHub GraphQL `viewerPermission` values that allow pushing branches (new_task / pr_iteration). */
+const CONTENTS_WRITE_LEVELS = new Set(['WRITE', 'MAINTAIN', 'ADMIN']);
+
+/**
+ * Minimum `viewerPermission` for pr_review (issue/PR comments without Contents write).
+ * See GitHub collaborator roles; TRIAGE can manage PRs without push.
+ */
+const PR_REVIEW_INTERACTION_LEVELS = new Set(['TRIAGE', 'WRITE', 'MAINTAIN', 'ADMIN']);
+
+function taskRequiresContentsWrite(taskType: TaskType): boolean {
+  return taskType === 'new_task' || taskType === 'pr_iteration';
+}
+
+function splitRepo(repo: string): { owner: string; name: string } | undefined {
+  const idx = repo.indexOf('/');
+  if (idx <= 0 || idx === repo.length - 1) {
+    return undefined;
+  }
+  return { owner: repo.slice(0, idx), name: repo.slice(idx + 1) };
+}
+
+async function fetchViewerPermission(repo: string, token: string): Promise<string | undefined> {
+  const parts = splitRepo(repo);
+  if (!parts) {
+    return undefined;
+  }
+  try {
+    const resp = await fetch('https://api.github.com/graphql', {
+      method: 'POST',
+      headers: {
+        'Authorization': `Bearer ${token}`,
+        'Content-Type': 'application/json',
+        'Accept': 'application/json',
+      },
+      body: JSON.stringify({
+        query: 'query($owner:String!,$name:String!){repository(owner:$owner,name:$name){viewerPermission}}',
+        variables: { owner: parts.owner, name: parts.name },
+      }),
+      signal: AbortSignal.timeout(GITHUB_API_TIMEOUT_MS),
+    });
+    if (!resp.ok) {
+      return undefined;
+    }
+    const body = await resp.json() as { data?: { repository?: { viewerPermission?: string | null } } };
+    const perm = body.data?.repository?.viewerPermission;
+    return perm ?? undefined;
+  } catch (err) {
+    const detail = err instanceof Error ? err.message : String(err);
+    logger.warn('GitHub GraphQL viewerPermission lookup failed', { repo, error: detail });
+    return undefined;
+  }
+}
+
 // ---------------------------------------------------------------------------
 // Internal check functions
 // ---------------------------------------------------------------------------
@@ -98,7 +153,7 @@ async function checkGitHubReachability(token: string): Promise {
-async function checkRepoAccess(repo: string, token: string): Promise<PreflightCheckResult> {
+async function checkRepoAccess(repo: string, token: string, taskType: TaskType): Promise<PreflightCheckResult> {
   const start = Date.now();
   try {
     const resp = await fetch(`https://api.github.com/repos/${repo}`, {
@@ -109,27 +164,74 @@ async function checkRepoAccess(repo: string, token: string): Promise {
   return { check: 'runtime_availability', passed: true, durationMs: Date.now() - start };
 }
 
+/** Order for surfacing the most actionable failure when multiple checks fail. */
+const PREFLIGHT_FAILURE_PRIORITY: readonly PreflightFailureReasonType[] = [
+  PreflightFailureReason.GITHUB_UNREACHABLE,
+  PreflightFailureReason.INSUFFICIENT_GITHUB_REPO_PERMISSIONS,
+  PreflightFailureReason.REPO_NOT_FOUND_OR_NO_ACCESS,
+  PreflightFailureReason.PR_NOT_FOUND_OR_CLOSED,
+  PreflightFailureReason.RUNTIME_UNAVAILABLE,
+];
+
+function pickPrimaryPreflightFailure(failedChecks: PreflightCheckResult[]): PreflightCheckResult {
+  for (const reason of PREFLIGHT_FAILURE_PRIORITY) {
+    const hit = failedChecks.find(c => c.reason === reason);
+    if (hit) {
+      return hit;
+    }
+  }
+  return failedChecks[0];
+}
+
 // ---------------------------------------------------------------------------
 // Main pre-flight check runner
 // ---------------------------------------------------------------------------
 
-export async function runPreflightChecks(repo: string, blueprintConfig: BlueprintConfig, prNumber?: number): Promise<PreflightResult> {
+export async function runPreflightChecks(
+  repo: string,
+  blueprintConfig: BlueprintConfig,
+  prNumber?: number,
+  taskType: TaskType = 'new_task',
+): Promise<PreflightResult> {
   const checks: PreflightCheckResult[] = [];
 
   if (blueprintConfig.github_token_secret_arn) {
@@ -228,7 +354,7 @@ export async function runPreflightChecks(repo: string, blueprintConfig: Blueprin
   // eslint-disable-next-line @cdklabs/promiseall-no-unbounded-parallelism
   const results = await Promise.allSettled([
     checkGitHubReachability(token),
-    checkRepoAccess(repo, token),
+    checkRepoAccess(repo, token, taskType),
     ...(prNumber !== undefined ? [checkPrAccessible(repo, prNumber, token)] : []),
   ]);
@@ -263,9 +389,7 @@ export async function runPreflightChecks(repo: string, blueprintConfig: Blueprin
     return { passed: true, checks };
   }
 
-  // Prioritize GITHUB_UNREACHABLE over REPO_NOT_FOUND_OR_NO_ACCESS
-  const primaryFailure = failedChecks.find(c => c.reason === PreflightFailureReason.GITHUB_UNREACHABLE)
-    ?? failedChecks[0];
+  const primaryFailure = pickPrimaryPreflightFailure(failedChecks);
 
   return {
     passed: false,
diff --git a/cdk/src/handlers/shared/types.ts b/cdk/src/handlers/shared/types.ts
index be5a7e0..c54dda9 100644
--- a/cdk/src/handlers/shared/types.ts
+++ b/cdk/src/handlers/shared/types.ts
@@ -41,6 +41,10 @@ export interface TaskRecord {
   readonly task_description?: string;
   readonly branch_name: string;
   readonly session_id?: string;
+  /** AgentCore runtime ARN used for this session (StopRuntimeSession on cancel). */
+  readonly agent_runtime_arn?: string;
+  /** ISO timestamp of last agent heartbeat (DynamoDB); optional, written by the runtime. */
+  readonly agent_heartbeat_at?: string;
   readonly execution_id?: string;
   readonly pr_url?: string;
   readonly error_message?: string;
diff --git a/cdk/src/stacks/agent.ts b/cdk/src/stacks/agent.ts
index afb81f5..d7fa064 100644
--- a/cdk/src/stacks/agent.ts
+++ b/cdk/src/stacks/agent.ts
@@ -305,6 +305,7 @@ export class AgentStack extends Stack {
       orchestratorFunctionArn: orchestrator.alias.functionArn,
       guardrailId: inputGuardrail.guardrailId,
       guardrailVersion: inputGuardrail.guardrailVersion,
+      agentCoreStopSessionRuntimeArns: [runtime.agentRuntimeArn],
     });
 
     // --- Operator dashboard ---
diff --git a/cdk/test/handlers/cancel-task.test.ts b/cdk/test/handlers/cancel-task.test.ts
index acba264..ca83729 100644
--- a/cdk/test/handlers/cancel-task.test.ts
+++ b/cdk/test/handlers/cancel-task.test.ts
@@ -21,6 +21,11 @@ import type { APIGatewayProxyEvent } from 'aws-lambda';
 
 // --- Mocks ---
 const mockSend = jest.fn();
+const mockAgentCoreSend = jest.fn();
+jest.mock('@aws-sdk/client-bedrock-agentcore', () => ({
+  BedrockAgentCoreClient: jest.fn(() => ({ send: mockAgentCoreSend })),
+  StopRuntimeSessionCommand: jest.fn((input: unknown) => ({ _type: 'StopRuntimeSession', input })),
+}));
 jest.mock('@aws-sdk/client-dynamodb', () => ({ DynamoDBClient: jest.fn(() => ({})) }));
 jest.mock('@aws-sdk/lib-dynamodb', () => ({
   DynamoDBDocumentClient: { from: jest.fn(() => ({ send: mockSend })) },
@@ -34,6 +39,7 @@ jest.mock('ulid', () => ({ ulid: jest.fn(() => 'REQ-ULID') }));
 process.env.TASK_TABLE_NAME = 'Tasks';
 process.env.TASK_EVENTS_TABLE_NAME = 'TaskEvents';
 process.env.TASK_RETENTION_DAYS = '90';
+process.env.RUNTIME_ARN = 'arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/default';
 
 import { handler } from '../../src/handlers/cancel-task';
 
@@ -47,6 +53,8 @@ const RUNNING_TASK = {
   status_created_at: 'RUNNING#2025-03-15T10:30:00Z',
   created_at: '2025-03-15T10:30:00Z',
   updated_at: '2025-03-15T10:31:00Z',
+  session_id: '550e8400-e29b-41d4-a716-446655440000',
+  agent_runtime_arn: 'arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/task-runtime',
 };
 
 function makeEvent(overrides: Partial<APIGatewayProxyEvent> = {}): APIGatewayProxyEvent {
@@ -98,6 +106,7 @@ function makeEvent(overrides: Partial = {}): APIGatewayPro
 beforeEach(() => {
   jest.clearAllMocks();
+  mockAgentCoreSend.mockResolvedValue({});
   // Default: GetCommand returns running task, UpdateCommand + PutCommand succeed
   mockSend
     .mockResolvedValueOnce({ Item: RUNNING_TASK }) // GetCommand
@@ -110,6 +119,10 @@ describe('cancel-task handler', () => {
     const result = await handler(makeEvent());
     expect(result.statusCode).toBe(200);
+    expect(mockAgentCoreSend).toHaveBeenCalledTimes(1);
+    const cmd = mockAgentCoreSend.mock.calls[0][0];
+    expect(cmd.input.runtimeSessionId).toBe(RUNNING_TASK.session_id);
+    expect(cmd.input.agentRuntimeArn).toBe(RUNNING_TASK.agent_runtime_arn);
     const body = JSON.parse(result.body);
     expect(body.data.task_id).toBe('task-1');
     expect(body.data.status).toBe('CANCELLED');
@@ -216,5 +229,49 @@ describe('cancel-task handler', () => {
     const result = await handler(makeEvent());
     expect(result.statusCode).toBe(200);
+    expect(mockAgentCoreSend).not.toHaveBeenCalled();
+  });
+
+  test('does not call StopRuntimeSession when RUNNING but session_id is missing', async () => {
+    mockSend.mockReset();
+    const noSession = { ...RUNNING_TASK };
+    delete (noSession as { session_id?: string }).session_id;
+    mockSend
+      .mockResolvedValueOnce({ Item: noSession })
+      .mockResolvedValueOnce({})
+      .mockResolvedValueOnce({});
+
+    const result = await handler(makeEvent());
+    expect(result.statusCode).toBe(200);
+    expect(mockAgentCoreSend).not.toHaveBeenCalled();
+  });
+
+  test('uses RUNTIME_ARN when agent_runtime_arn is not on the task record', async () => {
+    mockSend.mockReset();
+    const withoutArn = { ...RUNNING_TASK };
+    delete (withoutArn as { agent_runtime_arn?: string }).agent_runtime_arn;
+    mockSend
+      .mockResolvedValueOnce({ Item: withoutArn })
+      .mockResolvedValueOnce({})
+      .mockResolvedValueOnce({});
+
+    const result = await handler(makeEvent());
+    expect(result.statusCode).toBe(200);
+    expect(mockAgentCoreSend).toHaveBeenCalledTimes(1);
+    const cmd = mockAgentCoreSend.mock.calls[0][0];
+    expect(cmd.input.agentRuntimeArn).toBe(process.env.RUNTIME_ARN);
+  });
+
+  test('returns 200 when StopRuntimeSession fails', async () => {
+    mockSend.mockReset();
+    mockAgentCoreSend.mockRejectedValueOnce(new Error('Throttling'));
+    mockSend
+      .mockResolvedValueOnce({ Item: RUNNING_TASK })
+      .mockResolvedValueOnce({})
+      .mockResolvedValueOnce({});
+
+    const result = await handler(makeEvent());
+    expect(result.statusCode).toBe(200);
+    expect(mockAgentCoreSend).toHaveBeenCalled();
   });
 });
diff --git a/cdk/test/handlers/orchestrate-task.test.ts b/cdk/test/handlers/orchestrate-task.test.ts
index a495732..abaafa8 100644
--- a/cdk/test/handlers/orchestrate-task.test.ts
+++ b/cdk/test/handlers/orchestrate-task.test.ts
@@ -31,6 +31,7 @@ const mockAgentCoreSend = jest.fn();
 jest.mock('@aws-sdk/client-bedrock-agentcore', () => ({
   BedrockAgentCoreClient: jest.fn(() => ({ send: mockAgentCoreSend })),
   InvokeAgentRuntimeCommand: jest.fn((input: unknown) => ({ _type: 'InvokeAgentRuntime', input })),
+  StopRuntimeSessionCommand: jest.fn((input: unknown) => ({ _type: 'StopRuntimeSession', input })),
 }));
 
 const mockHydrateContext = jest.fn();
@@ -244,6 +245,13 @@ describe('startSession', () => {
     // Session ID is a UUID v4 (36 chars), not a ULID
     expect(sessionId).toMatch(/^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/);
     expect(mockAgentCoreSend).toHaveBeenCalledTimes(1);
+    const transitionCall = mockDdbSend.mock.calls.find(
+      (c: any[]) => c[0]._type === 'Update' && c[0].input.ExpressionAttributeValues?.[':toStatus'] === 'RUNNING',
+    );
+    expect(transitionCall).toBeDefined();
+    expect(transitionCall![0].input.ExpressionAttributeValues[':attr_agent_runtime_arn']).toBe(
+      'arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/test',
+    );
   });
 });
@@ -253,6 +261,7 @@
     const result = await pollTaskStatus('TASK001', { attempts: 5 });
     expect(result.attempts).toBe(6);
     expect(result.lastStatus).toBe('RUNNING');
+    expect(result.sessionUnhealthy).toBe(false);
   });
 
   test('handles missing item gracefully', async () => {
@@ -261,6 +270,59 @@
     expect(result.attempts).toBe(1);
     expect(result.lastStatus).toBeUndefined();
   });
+
+  test('sets sessionUnhealthy when agent heartbeat is stale (RUNNING)', async () => {
+    const old = new Date(Date.now() - 400_000).toISOString();
+    mockDdbSend.mockResolvedValueOnce({
+      Item: {
+        status: 'RUNNING',
+        session_id: '550e8400-e29b-41d4-a716-446655440000',
+        started_at: old,
+        agent_heartbeat_at: old,
+      },
+    });
+    const result = await pollTaskStatus('TASK001', { attempts: 1 });
+    expect(result.sessionUnhealthy).toBe(true);
+  });
+
+  test('does not set sessionUnhealthy when heartbeat is fresh', async () => {
+    const started = new Date(Date.now() - 200_000).toISOString();
+    const hb = new Date(Date.now() - 30_000).toISOString();
+    mockDdbSend.mockResolvedValueOnce({
+      Item: {
+        status: 'RUNNING',
+        session_id: '550e8400-e29b-41d4-a716-446655440000',
+        started_at: started,
+        agent_heartbeat_at: hb,
+      },
+    });
+    const result = await pollTaskStatus('TASK001', { attempts: 1 });
+    expect(result.sessionUnhealthy).toBe(false);
+  });
+
+  test('does not set sessionUnhealthy when agent_heartbeat_at is absent but within grace period', async () => {
+    mockDdbSend.mockResolvedValueOnce({
+      Item: {
+        status: 'RUNNING',
+        session_id: '550e8400-e29b-41d4-a716-446655440000',
+        started_at: new Date(Date.now() - 60_000).toISOString(),
+      },
+    });
+    const result = await pollTaskStatus('TASK001', { attempts: 1 });
+    expect(result.sessionUnhealthy).toBe(false);
+  });
+
+  test('sets sessionUnhealthy when agent_heartbeat_at is absent and past grace + stale window', async () => {
+    mockDdbSend.mockResolvedValueOnce({
+      Item: {
+        status: 'RUNNING',
+        session_id: '550e8400-e29b-41d4-a716-446655440000',
+        started_at: new Date(Date.now() - 400_000).toISOString(),
+      },
+    });
+    const result = await pollTaskStatus('TASK001', { attempts: 1 });
+    expect(result.sessionUnhealthy).toBe(true);
+  });
 });
 
 describe('loadBlueprintConfig', () => {
@@ -510,6 +572,23 @@ describe('finalizeTask', () => {
     expect(mockDdbSend).toHaveBeenCalled();
   });
 
+  test('transitions RUNNING to FAILED when pollState.sessionUnhealthy', async () => {
+    mockDdbSend
+      .mockResolvedValueOnce({ Item: { ...baseTask, status: 'RUNNING' } }) // loadTask
+      .mockResolvedValue({}); // transitionTask + emitTaskEvent + decrementConcurrency
+    await finalizeTask(
+      'TASK001',
+      { attempts: 12, lastStatus: 'RUNNING', sessionUnhealthy: true },
+      'user-123',
+    );
+    const transitionCall = mockDdbSend.mock.calls[1][0];
+    expect(transitionCall.input.ExpressionAttributeValues[':toStatus']).toBe('FAILED');
+    expect(transitionCall.input.ExpressionAttributeValues[':fromStatus']).toBe('RUNNING');
+    const eventCall = mockDdbSend.mock.calls[2][0];
+    expect(eventCall.input.Item.event_type).toBe('task_failed');
+    expect(eventCall.input.Item.metadata.reason).toBe('agent_heartbeat_stale');
+  });
+
   test('transitions RUNNING to TIMED_OUT on poll timeout', async () => {
     mockDdbSend
       .mockResolvedValueOnce({ Item: { ...baseTask, status: 'RUNNING' } }) // loadTask
diff --git a/cdk/test/handlers/shared/preflight.test.ts b/cdk/test/handlers/shared/preflight.test.ts
index 194909f..780e047 100644
--- a/cdk/test/handlers/shared/preflight.test.ts
+++ b/cdk/test/handlers/shared/preflight.test.ts
@@ -45,6 +45,15 @@ const baseBlueprintConfig: BlueprintConfig = {
   github_token_secret_arn: 'arn:aws:secretsmanager:us-east-1:123456789012:secret:github-token',
 };
 
+/** Successful GET /repos/... with a `permissions` object (preflight parses JSON). */
+function githubRepoOk(permissions: { push?: boolean; pull?: boolean } = { push: true }) {
+  return {
+    ok: true,
+    status: 200,
+    json: async () => ({ permissions }),
+  };
+}
+
 beforeEach(() => {
   jest.clearAllMocks();
   clearTokenCache();
@@ -60,7 +69,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockResolvedValueOnce({ ok: true, status: 200 })
-      .mockResolvedValueOnce({ ok: true, status: 200 });
+      .mockResolvedValueOnce(githubRepoOk());
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
@@ -106,7 +115,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockResolvedValueOnce({ ok: false, status: 500 })
-      .mockResolvedValueOnce({ ok: true, status: 200 });
+      .mockResolvedValueOnce(githubRepoOk());
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
@@ -119,7 +128,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_revoked' });
     mockFetch
       .mockResolvedValueOnce({ ok: false, status: 401 })
-      .mockResolvedValueOnce({ ok: true, status: 200 });
+      .mockResolvedValueOnce(githubRepoOk());
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
@@ -132,7 +141,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockRejectedValueOnce(new Error('The operation was aborted'))
-      .mockResolvedValueOnce({ ok: true, status: 200 });
+      .mockResolvedValueOnce(githubRepoOk());
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
@@ -145,7 +154,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockRejectedValueOnce(new Error('getaddrinfo ENOTFOUND api.github.com'))
-      .mockResolvedValueOnce({ ok: true, status: 200 });
+      .mockResolvedValueOnce(githubRepoOk());
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
@@ -224,7 +233,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockResolvedValueOnce({ ok: true, status: 200 })
-      .mockResolvedValueOnce({ ok: true, status: 200 });
+      .mockResolvedValueOnce(githubRepoOk());
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
@@ -238,7 +247,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockResolvedValueOnce({ ok: true, status: 200 })
-      .mockResolvedValueOnce({ ok: true, status: 200 });
+      .mockResolvedValueOnce(githubRepoOk());
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
@@ -294,7 +303,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockResolvedValueOnce({ ok: true, status: 200 }) // reachability
-      .mockResolvedValueOnce({ ok: true, status: 200 }) // repo access
+      .mockResolvedValueOnce(githubRepoOk()) // repo access
       .mockResolvedValueOnce({ ok: true, status: 200, json: async () => ({ state: 'open' }) }); // PR
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig, 42);
@@ -309,7 +318,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockResolvedValueOnce({ ok: true, status: 200 })
-      .mockResolvedValueOnce({ ok: true, status: 200 })
+      .mockResolvedValueOnce(githubRepoOk())
       .mockResolvedValueOnce({ ok: false, status: 404 });
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig, 42);
@@ -322,7 +331,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_test123' });
     mockFetch
       .mockResolvedValueOnce({ ok: true, status: 200 })
-      .mockResolvedValueOnce({ ok: true, status: 200 })
+      .mockResolvedValueOnce(githubRepoOk())
       .mockResolvedValueOnce({ ok: true, status: 200, json: async () => ({ state: 'closed' }) });
 
     const result = await runPreflightChecks('owner/repo', baseBlueprintConfig, 42);
@@ -340,7 +349,7 @@ describe('runPreflightChecks', () => {
     mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_per_repo' });
     mockFetch
       .mockResolvedValueOnce({ ok: true, status: 200 })
-      .mockResolvedValueOnce({ ok: true, status: 200 });
+      .mockResolvedValueOnce(githubRepoOk());
 
     const result = await runPreflightChecks('owner/repo', config);
@@ -348,4 +357,74 @@ describe('runPreflightChecks', () => {
     const smCall = mockSmSend.mock.calls[0][0];
     expect(smCall.input.SecretId).toBe(perRepoArn);
   });
+
+  test('fails INSUFFICIENT_GITHUB_REPO_PERMISSIONS when token is read-only (GraphQL viewer READ)', async () => {
+    mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_readonly' });
+    mockFetch
+      .mockResolvedValueOnce({ ok: true, status: 200 })
+      .mockResolvedValueOnce(githubRepoOk({ push: false, pull: true }))
+      .mockResolvedValueOnce({
+        ok: true,
+        status: 200,
+        json: async () => ({ data: { repository: { viewerPermission: 'READ' } } }),
+      });
+
+    const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
+
+    expect(result.passed).toBe(false);
+    expect(result.failureReason).toBe(PreflightFailureReason.INSUFFICIENT_GITHUB_REPO_PERMISSIONS);
+    expect(result.failureDetail).toMatch(/push|Contents write/i);
+  });
+
+  test('passes new_task when REST omits push but GraphQL reports WRITE', async () => {
+    mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_fine_grained' });
+    mockFetch
+      .mockResolvedValueOnce({ ok: true, status: 200 })
+      .mockResolvedValueOnce(githubRepoOk({}))
+      .mockResolvedValueOnce({
+        ok: true,
+        status: 200,
+        json: async () => ({ data: { repository: { viewerPermission: 'WRITE' } } }),
+      });
+
+    const result = await runPreflightChecks('owner/repo', baseBlueprintConfig);
+
+    expect(result.passed).toBe(true);
+    expect(result.checks.filter(c => c.check === 'repo_access' && c.passed)).toHaveLength(1);
+  });
+
+  test('passes pr_review when token has TRIAGE but not push (GraphQL)', async () => {
+    mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_triage' });
+    mockFetch
+      .mockResolvedValueOnce({ ok: true, status: 200 })
+      .mockResolvedValueOnce(githubRepoOk({ push: false, pull: true }))
+      .mockResolvedValueOnce({ ok: true, status: 200, json: async () => ({ state: 'open' }) })
+      .mockResolvedValueOnce({
+        ok: true,
+        status: 200,
+        json: async () => ({ data: { repository: { viewerPermission: 'TRIAGE' } } }),
+      });
+
+    const result = await runPreflightChecks('owner/repo', baseBlueprintConfig, 7, 'pr_review');
+
+    expect(result.passed).toBe(true);
+  });
+
+  test('fails pr_review when viewerPermission is READ-only', async () => {
+    mockSmSend.mockResolvedValueOnce({ SecretString: 'ghp_readonly' });
+    mockFetch
+      .mockResolvedValueOnce({ ok: true, status: 200 })
+      .mockResolvedValueOnce(githubRepoOk({ push: false, pull: true }))
+      .mockResolvedValueOnce({ ok: true, status: 200, json: async () => ({ state: 'open' }) })
+      .mockResolvedValueOnce({
+        ok: true,
+        status: 200,
+        json: async () => ({ data: { repository: { viewerPermission: 'READ' } } }),
+      });
+
+    const result = await runPreflightChecks('owner/repo', baseBlueprintConfig, 7, 'pr_review');
+
+    expect(result.passed).toBe(false);
+    expect(result.failureReason).toBe(PreflightFailureReason.INSUFFICIENT_GITHUB_REPO_PERMISSIONS);
+  });
 });
diff --git a/docs/design/ARCHITECTURE.md b/docs/design/ARCHITECTURE.md
index bb763fe..662356a 100644
--- a/docs/design/ARCHITECTURE.md
+++ b/docs/design/ARCHITECTURE.md
@@ -167,7 +167,7 @@ The following risks were identified via external review (March 2026) and are tra
 |---|------|----------|-----------|-------------------|
 | 1 | **Agent vs. orchestrator DynamoDB race** — `agent/task_state.py` writes terminal status without conditional expressions, so it can overwrite orchestrator-managed CANCELLED with COMPLETED. The orchestrator's `transitionTask()` uses `ConditionExpression` but the agent side does not. | High | `agent/task_state.py` | Resolved (Iteration 3bis) — `ConditionExpression` guards added to `write_running()` (requires status IN SUBMITTED, HYDRATING) and `write_terminal()` (requires status IN RUNNING, HYDRATING, FINALIZING). `ConditionalCheckFailedException` is caught and logged as a skip. |
 | 2 | **No DLQ on orchestrator async invocation** — The orchestrator Lambda is invoked with `InvocationType: 'Event'` but has no dead-letter queue. Failed or throttled invocations leave tasks stuck in SUBMITTED. | High | `src/constructs/task-orchestrator.ts` | Resolved (Iteration 3bis) — SQS DLQ deliberately skipped since durable execution (`withDurableExecution`, 14-day retention) manages its own retries; a DLQ would conflict. Added `retryAttempts: 0` on alias async invoke config to prevent Lambda-level duplicate invocations. CloudWatch alarm on `fn.metricErrors()` (threshold: 3, 2 periods of 5min) provides alerting. |
-| 3 | **Concurrency counter drift** — If the orchestrator crashes between concurrency increment and decrement, the user's counter is permanently inflated. The `UserConcurrencyTable` JSDoc acknowledges this but no reconciliation process exists. | Medium | `src/constructs/user-concurrency-table.ts` | Resolved (Iteration 3bis) — `ConcurrencyReconciler` construct with scheduled Lambda (EventBridge rate 15min). Scans concurrency table, queries task table's `UserStatusIndex` GSI per user, compares actual count with stored `active_count`, and corrects drift. TOCTOU-safe via `ConditionExpression` on update. |
+| 3 | **Concurrency counter drift** — If the orchestrator crashes between concurrency increment and decrement, the user's counter is permanently inflated. The `UserConcurrencyTable` JSDoc acknowledges this but no reconciliation process exists.
| Medium | `src/constructs/user-concurrency-table.ts` | Resolved (Iteration 3bis) — `ConcurrencyReconciler` construct with scheduled Lambda (EventBridge rate 15min). Scans concurrency table, queries task table's `UserStatusIndex` GSI per user, compares actual count with stored `active_count`, and corrects drift. TOCTOU-safe via `ConditionExpression` on update. Additionally, the `finalizeTask` heartbeat-detected crash path guards against double-decrement by only releasing concurrency after a successful `transitionTask`, and re-reading the task state on failure. | | 4 | **Single NAT Gateway** — `natGateways: 1` means a single AZ failure blocks all agent internet egress. Acceptable for development; needs multi-AZ NAT for production. | Medium | `src/constructs/agent-vpc.ts` | Mitigated (Iteration 3bis) — already configurable via `AgentVpcProps.natGateways` (default: 1). Deployers can set `natGateways: 2` or higher for multi-AZ redundancy. No code changes needed. | | 5 | **Dual-language prompt assembly** — Both TypeScript (`context-hydration.ts:assembleUserPrompt`) and Python (`entrypoint.py:assemble_prompt`) implement the same logic. Changes to one must be manually replicated in the other. | Medium | `src/handlers/shared/context-hydration.ts`, `agent/entrypoint.py` | Mitigated (Iteration 3bis) — production path uses orchestrator's `assembleUserPrompt()` exclusively; the Python `assemble_prompt()` has a deprecation docstring and is retained only for local batch mode and dry-run mode. Risk of divergence reduced but not eliminated. | diff --git a/docs/design/MEMORY.md b/docs/design/MEMORY.md index cf6181f..982c30e 100644 --- a/docs/design/MEMORY.md +++ b/docs/design/MEMORY.md @@ -51,7 +51,7 @@ Task end (orchestrator fallback): - **Token budget** — Memory context is capped at 2,000 tokens (~8,000 characters) to avoid consuming too much system prompt space. Oldest entries are dropped first. 
- **Per-repo namespace via template variables** — Namespace isolation is configured on the extraction strategies using `{actorId}` and `{sessionId}` template variables. Events are written with `actorId = "owner/repo"` and `sessionId = taskId`. The extraction pipeline places records at `/{repo}/knowledge/` (semantic) and `/{repo}/episodes/{taskId}/` (episodic). Reads use these paths as namespace prefixes. This is a breaking infrastructure change from the initial implementation — the Memory resource must be recreated on deploy. - **Prompt version excludes memory** — The SHA-256 hash is computed from deterministic prompt parts only. Memory context varies per run, so including it would make every prompt version unique and defeat the purpose of tracking prompt changes. -- **Orchestrator fallback** — If the agent container crashes, times out, or OOMs without writing memory, the orchestrator writes a minimal episode so the episodic record is not lost. The fallback is itself fail-open (wrapped in try-catch) to never block `finalizeTask`. The return value is logged to surface silent failures (Iteration 3bis hardening). +- **Orchestrator fallback** — If the agent container crashes, times out, or OOMs without writing memory, the orchestrator writes a minimal episode so the episodic record is not lost. This includes cases where the heartbeat-based crash detection triggers early finalization (agent died before writing any memory). The fallback is itself fail-open (wrapped in try-catch) to never block `finalizeTask`. The return value is logged to surface silent failures (Iteration 3bis hardening). ### Test coverage diff --git a/docs/design/ORCHESTRATOR.md b/docs/design/ORCHESTRATOR.md index 4c6aec7..5279270 100644 --- a/docs/design/ORCHESTRATOR.md +++ b/docs/design/ORCHESTRATOR.md @@ -179,7 +179,7 @@ See the Admission control section for details. Validates that the task is allowe #### Step 2: Context hydration (deterministic) -See the Context hydration section for details. 
Assembles the agent's prompt from multiple sources depending on task type. For `new_task`: user message, GitHub issue (title, body, comments), memory, repo configuration, and platform defaults. For `pr_iteration`: PR metadata, review comments, diff summary, and optional user instructions. An additional **pre-flight** sub-step verifies PR accessibility when `pr_number` is set (see [preflight.ts](../../cdk/src/handlers/shared/preflight.ts)). The assembled prompt is screened through Amazon Bedrock Guardrails for prompt injection before the agent receives it (PR tasks: always screened; `new_task`: screened when issue content is present). The output is a fully assembled prompt, ready to pass to the compute session. +See the Context hydration section for details. Assembles the agent's prompt from multiple sources depending on task type. For `new_task`: user message, GitHub issue (title, body, comments), memory, repo configuration, and platform defaults. For `pr_iteration`: PR metadata, review comments, diff summary, and optional user instructions. An additional **pre-flight** sub-step (see [preflight.ts](../../cdk/src/handlers/shared/preflight.ts)) verifies PR accessibility when `pr_number` is set and validates that the resolved GitHub token has sufficient repository permissions for the task type (so read-only PATs fail early with `INSUFFICIENT_GITHUB_REPO_PERMISSIONS`). The assembled prompt is screened through Amazon Bedrock Guardrails for prompt injection before the agent receives it (PR tasks: always screened; `new_task`: screened when issue content is present). The output is a fully assembled prompt, ready to pass to the compute session. #### Step 3: Session start and agent execution (deterministic start + agentic execution) @@ -224,7 +224,7 @@ When the orchestrator loads a task's `blueprint_config`, it resolves the step pi 1. **Load `RepoConfig`** from the `RepoTable` by `repo` (PK). 
Merge with platform defaults (see [REPO_ONBOARDING.md](./REPO_ONBOARDING.md#platform-defaults) for default values and override precedence). 2. **Resolve compute strategy** from `compute_type` (default: `agentcore`). The strategy implements the `ComputeStrategy` interface (see [REPO_ONBOARDING.md](./REPO_ONBOARDING.md#compute-strategy-interface)). -3. **Build step list.** If `step_sequence` is provided, use it; otherwise use the default sequence (`admission-control` → `hydrate-context` → `pre-flight` → `start-session` → `await-agent-completion` → `finalize`). The `pre-flight` step runs fail-closed readiness checks (GitHub API reachability, repo access, PR accessibility for PR tasks) before consuming compute — see [ROADMAP.md Iteration 3c](../guides/ROADMAP.md). For each entry, resolve to a built-in step function or a Lambda invocation wrapper. +3. **Build step list.** If `step_sequence` is provided, use it; otherwise use the default sequence (`admission-control` → `hydrate-context` → `pre-flight` → `start-session` → `await-agent-completion` → `finalize`). The `pre-flight` step runs fail-closed readiness checks (GitHub API reachability, repository access, **PAT privilege** for the task type via REST `permissions` and GraphQL `viewerPermission` when needed, PR accessibility for PR tasks) before consuming compute — see [ROADMAP.md Iteration 3c](../guides/ROADMAP.md). For each entry, resolve to a built-in step function or a Lambda invocation wrapper. 4. **Inject custom steps.** If `custom_steps` are defined and no explicit `step_sequence` is provided, insert them at their declared `phase` position (pre-agent steps before `start-session`, post-agent steps after `await-agent-completion`). 5. **Validate.** Check that required steps are present and correctly ordered (see [step sequence validation](./REPO_ONBOARDING.md#step-sequence-validation)). If invalid, fail the task with `INVALID_STEP_SEQUENCE`. 6. **Execute.** Iterate the resolved list. 
For each step: check cancellation, filter `blueprintConfig` to only the fields that step needs (stripping credential ARNs for custom Lambda steps), execute with retry policy, enforce `StepOutput.metadata` size budget (10KB), prune `previousStepResults` to last 5 steps, emit events. Built-in steps that need durable waits (e.g. `await-agent-completion`) receive the `DurableContext` and `ComputeStrategy` so they can call `waitForCondition` and `computeStrategy.pollSession()` internally — no name-based special-casing in the framework loop. @@ -424,7 +424,13 @@ The orchestrator needs to know whether the session is still running. Two complem **Iteration 1 (historical).** The `invoke_agent_runtime` call blocked; when it returned, the session was over. No explicit liveness check was needed. -**Fallback: DynamoDB heartbeat (optional enhancement).** As defense in depth, the agent can write a heartbeat timestamp to DynamoDB every N minutes. The orchestrator reads it during its poll cycle. A missing heartbeat (e.g. none in the last 10 minutes while `/ping` reports `HealthyBusy`) could indicate the agent is stuck but not idle — triggering investigation or forced termination. +**DynamoDB heartbeat (implemented).** The agent writes an `agent_heartbeat_at` timestamp to DynamoDB every 45 seconds via a daemon thread in `server.py`. The heartbeat worker is resilient to transient DynamoDB errors (each write is wrapped in try/except with a retry on the next interval). The orchestrator's `pollTaskStatus` reads this timestamp during each poll cycle and applies two thresholds: + +- **Grace period** (`AGENT_HEARTBEAT_GRACE_SEC = 120s`): After transitioning to RUNNING, the orchestrator waits this long before expecting heartbeats. This covers container startup and pipeline initialization. +- **Stale threshold** (`AGENT_HEARTBEAT_STALE_SEC = 240s`): If `agent_heartbeat_at` exists and is older than this, the session is treated as lost (crash, OOM, or stuck). 
+- **Early crash detection**: If `agent_heartbeat_at` is never set and the task has been RUNNING past the combined grace + stale window (360s), the orchestrator treats this as an early crash (agent died before the pipeline started). + +When either condition is met, `pollTaskStatus` sets `sessionUnhealthy = true` in the poll state. The `finalizeTask` function then transitions the task to FAILED with the reason `"Agent session lost: no recent heartbeat from the runtime"`. The pipeline also writes an initial heartbeat at the very start of `run_task()` to minimize the window between session start and first heartbeat. ### The 15-minute idle timeout problem @@ -441,7 +447,7 @@ AgentCore Runtime terminates sessions after 15 minutes of inactivity (no `/ping` When the session ends (agent finishes, crashes, or is terminated), the orchestrator detects this: - **Iteration 1 (historical):** The `invoke_agent_runtime` call returned (it blocked). The response body contained the agent's output (status, PR URL, cost, etc.). -- **Target state:** The orchestrator polls the agent via re-invocation on the same session (see Invocation model above). Completion is detected when: (a) the agent responds with a "completed" or "failed" status in the poll response, or (b) the re-invocation fails because the session was terminated (idle timeout, crash, or 8-hour limit reached). In the durable orchestrator, a `waitForCondition` evaluates the poll result at each interval and resumes the pipeline when the condition is met. See the session monitoring pattern in the Implementation options section. +- **Target state:** The orchestrator polls the agent via re-invocation on the same session (see Invocation model above). 
Completion is detected when: (a) the agent responds with a "completed" or "failed" status in the poll response, (b) the re-invocation fails because the session was terminated (idle timeout, crash, or 8-hour limit reached), or (c) the DynamoDB heartbeat check detects the session is unhealthy (stale or missing `agent_heartbeat_at` — see DynamoDB heartbeat above). In the durable orchestrator, a `waitForCondition` evaluates the poll result at each interval and resumes the pipeline when the condition is met. See the session monitoring pattern in the Implementation options section. ### External termination (cancellation) @@ -539,7 +545,8 @@ This section uses an FMEA (Failure Mode and Effects Analysis) approach: for each | Failure mode | Impact | Recovery | |---|---|---| -| Agent crashes mid-task (unhandled exception) | Partial branch may exist on GitHub | Orchestrator detects session end. Finalization inspects GitHub state. If commits exist, may mark as partial completion. Task transitions to `FAILED` or `COMPLETED` with partial flag. | +| Agent crashes mid-task (unhandled exception) | Partial branch may exist on GitHub | Orchestrator detects session end via DynamoDB heartbeat staleness check (see Liveness monitoring). Finalization inspects GitHub state. If commits exist, may mark as partial completion. Task transitions to `FAILED` or `COMPLETED` with partial flag. | +| Agent crashes before pipeline starts (early crash: OOM during startup, import error, container failure) | `agent_heartbeat_at` is never set in DynamoDB | `pollTaskStatus` detects missing heartbeat after the combined grace + stale window (360s). Task transitions to `FAILED` with reason "Agent session lost". | | Agent runs out of turns (max_turns limit) | Agent stopped by SDK, not by crash | Session ends normally with status `end_turn`. Orchestrator finalizes; if PR exists, task is `COMPLETED`. 
| | Agent exceeds cost budget (max_budget_usd limit) | Agent stopped by SDK when budget is reached | Session ends normally. Orchestrator finalizes; if PR exists, task is `COMPLETED`. | | Agent is idle for 15 min (AgentCore kills session) | Work in progress may be lost if not committed | Task transitions to `TIMED_OUT`. Partial work may be on the branch if the agent committed before going idle. | @@ -558,7 +565,7 @@ This section uses an FMEA (Failure Mode and Effects Analysis) approach: for each |---|---|---| | Orchestrator crashes during `HYDRATING` | Task stuck in `HYDRATING` | Durable execution (Lambda Durable Functions) automatically replays from the last checkpoint, skipping completed steps. Without durable orchestration, a recovery process detects stuck tasks (in `HYDRATING` for > N minutes) and restarts them. | | Orchestrator crashes during `RUNNING` | Task stuck in `RUNNING`, session may still be alive | Recovery process detects task is in `RUNNING` but orchestrator is not managing it. It resumes monitoring the session (using the stored session ID). When the session ends, it runs finalization. | -| Orchestrator crashes during `FINALIZING` | Task stuck in `FINALIZING` | Recovery process detects and restarts finalization. Finalization steps must be idempotent (decrementing a counter twice should be detected and handled). | +| Orchestrator crashes during `FINALIZING` | Task stuck in `FINALIZING` | Recovery process detects and restarts finalization. Finalization steps are idempotent. The heartbeat-detected crash finalization path avoids double-decrement by only emitting events and releasing concurrency after a successful `transitionTask`; if the transition fails (task already terminal), it re-reads the task and handles accordingly. | | DynamoDB unavailable during state transition | State not persisted | Retry with backoff. If the state transition cannot be persisted, the orchestrator must not proceed (risk of inconsistency). 
After retries are exhausted, alert operators. | ### Recovery mechanisms summary @@ -603,7 +610,7 @@ Concurrency is tracked using atomic counters: - **UserConcurrency.** A DynamoDB item per user: `{ user_id, active_count }`. Incremented atomically (conditional update: `active_count < max`) during admission. Decremented during finalization. - **SystemConcurrency.** A single DynamoDB item: `{ pk: "SYSTEM", active_count }`. Same pattern. -**Counter drift.** If the orchestrator crashes after starting a session but before persisting the session-to-task mapping, or after a session ends but before decrementing the counter, the counter drifts. Mitigation: +**Counter drift.** If the orchestrator crashes after starting a session but before persisting the session-to-task mapping, or after a session ends but before decrementing the counter, the counter drifts. The heartbeat-detected crash finalization path (`finalizeTask` sessionUnhealthy branch) guards against double-decrement: it only decrements after a successful state transition, and re-reads the task if the transition fails to determine the correct action. Mitigation: - Always persist the task state transition **before** taking the action (write-ahead pattern). For example, persist the task as `RUNNING` and record the session ID before calling `invoke_agent_runtime`. - Run a **reconciliation Lambda** every 5 minutes (EventBridge schedule): query the Tasks table for tasks in `RUNNING` + `HYDRATING` state per user (GSI on `user_id` + `status`), compare the count to `UserConcurrency.active_count`, and correct via `UpdateItem` if different. The Lambda emits a `counter_drift_corrected` CloudWatch metric (dimensions: `user_id`, `drift_amount`) when it corrects a value, and a `counter_reconciliation_run` metric on every execution for health monitoring. 
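The heartbeat thresholds described in the liveness-monitoring hunk above (grace 120s, stale 240s, combined 360s window for early crash detection) reduce to a small decision function. A minimal TypeScript sketch, assuming the documented constants; the `isSessionUnhealthy` helper and its signature are illustrative, not the actual `pollTaskStatus` code:

```typescript
// Illustrative sketch of the heartbeat staleness decision (names assumed).
// Constants mirror the values documented in ORCHESTRATOR.md.
const AGENT_HEARTBEAT_GRACE_SEC = 120;
const AGENT_HEARTBEAT_STALE_SEC = 240;

interface HeartbeatState {
  runningSinceEpochSec: number;       // when the task transitioned to RUNNING
  agentHeartbeatAtEpochSec?: number;  // last agent_heartbeat_at, if ever written
}

function isSessionUnhealthy(state: HeartbeatState, nowEpochSec: number): boolean {
  if (state.agentHeartbeatAtEpochSec === undefined) {
    // Early crash detection: no heartbeat was ever written and the task has
    // been RUNNING past the combined grace + stale window (360s).
    const runningFor = nowEpochSec - state.runningSinceEpochSec;
    return runningFor > AGENT_HEARTBEAT_GRACE_SEC + AGENT_HEARTBEAT_STALE_SEC;
  }
  // Stale threshold: a heartbeat exists but is older than 240s — the session
  // is treated as lost (crash, OOM, or stuck).
  return nowEpochSec - state.agentHeartbeatAtEpochSec > AGENT_HEARTBEAT_STALE_SEC;
}
```

On each poll cycle the orchestrator would evaluate this check and, when it returns true, set `sessionUnhealthy = true` so `finalizeTask` can fail the task with the documented reason.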
diff --git a/docs/design/SECURITY.md b/docs/design/SECURITY.md index 1655300..007f0ec 100644 --- a/docs/design/SECURITY.md +++ b/docs/design/SECURITY.md @@ -115,7 +115,7 @@ The platform enforces policies at multiple points in the task lifecycle. Today, | **Submission** | Guardrail input screening | `create-task-core.ts` (Bedrock Guardrails) | HTTP 400 response only — no event emitted | | **Submission** | Idempotency check | `create-task-core.ts` | HTTP 409 response only — no event emitted | | **Admission** | Concurrency limit | `orchestrator.ts` (`admissionControl`) | `admission_rejected` event emitted | -| **Pre-flight** | GitHub reachability, repo access, PR access | `preflight.ts` | `preflight_failed` event emitted | +| **Pre-flight** | GitHub reachability, repo access, PAT repo permissions (push / `viewerPermission` by task type), PR access | `preflight.ts` | `preflight_failed` event emitted | | **Hydration** | Guardrail prompt screening (PR + issue content) | `context-hydration.ts` | `guardrail_blocked` event emitted | | **Hydration** | Budget/quota resolution (3-tier max_turns, 2-tier max_budget_usd) | `orchestrator.ts` (`hydrateAndTransition`) | Values persisted on task record — no policy decision event | | **Hydration** | Token budget for prompt assembly | `context-hydration.ts` | No event emitted | diff --git a/docs/guides/DEVELOPER_GUIDE.md b/docs/guides/DEVELOPER_GUIDE.md index 38707c7..e2fd8d5 100644 --- a/docs/guides/DEVELOPER_GUIDE.md +++ b/docs/guides/DEVELOPER_GUIDE.md @@ -29,11 +29,13 @@ For a concise duplicate of this table, common pitfalls, and a CDK test file map, The CDK stack ships with a **sample onboarded repository** (`krokoko/agent-plugins` in `cdk/src/stacks/agent.ts`) so the project deploys and CDK tests run cleanly out of the box. 
That value is for **default wiring only**: a real agent run **pushes branches and opens pull requests** with your GitHub PAT, so the onboarded repo must be one your token can **clone, push to, and open PRs on**. Most people do **not** have that access to the upstream repo. -**Recommended first setup:** fork [`awslabs/agent-plugins`](https://github.com/awslabs/agent-plugins) on GitHub, set the `Blueprint` **`repo`** to **`your-github-username/agent-plugins`** (match your fork’s owner and repo name), and create a **fine-grained PAT** with access **only to that fork** (clone, push, PRs—see `agent/README.md` for scopes). Use that token for **`GITHUB_TOKEN`** when running `./agent/run.sh` locally and store the same value in Secrets Manager after deploy. For use on your own codebases, point the Blueprint at those repos instead and scope the PAT to match. +**Recommended first setup:** fork [`awslabs/agent-plugins`](https://github.com/awslabs/agent-plugins) on GitHub, set the `Blueprint` **`repo`** to **`your-github-username/agent-plugins`** (match your fork’s owner and repo name), and use a **fine-grained PAT** scoped to **that fork** with the permissions in step 2. Use the same token for **`GITHUB_TOKEN`** when running `./agent/run.sh` locally and store it in Secrets Manager (step 3) after deploy. -Register every repo you want tasks to target and align tools and permissions (steps below). +After deployment, the orchestrator **pre-flight** step calls the GitHub API to verify your token can access the task repository with enough privilege (`preflight.ts`). That catches common mistakes (for example a read-only PAT) **before** AgentCore work starts: the task fails with `INSUFFICIENT_GITHUB_REPO_PERMISSIONS` and a clear detail string, instead of failing late with a `git push` 403 buried in CloudWatch logs. -### 1. Register repositories with `Blueprint` (required) +### Required setup + +#### 1.
Register repositories with `Blueprint` The Task API only accepts tasks for repositories that are **onboarded** — each one is a `Blueprint` construct in `cdk/src/stacks/agent.ts` that writes a `RepoConfig` row to DynamoDB. @@ -45,23 +47,58 @@ Optional per-repo overrides (same file / `Blueprint` props) include a different After changing Blueprints, redeploy: `cd cdk && npx cdk deploy` (or `MISE_EXPERIMENTAL=1 mise //cdk:deploy`). -### 2. GitHub personal access token +#### 2. GitHub personal access token (fine-grained) + +Create a **fine-grained PAT** at GitHub → **Settings** → **Developer settings** → **Personal access tokens** → **Fine-grained tokens**. + +**Repository access:** select only the repo(s) the agent will use (for the fork workflow, **only your fork**). + +| Permission | Access | Reason | +|------------|--------|--------| +| **Contents** | Read and write | `git clone` and `git push` | +| **Pull requests** | Read and write | `gh pr create` / update PRs | +| **Issues** | Read | Issue title, body, and comments for context | +| **Metadata** | Read | Granted by default | + +For **`new_task`** and **`pr_iteration`**, pre-flight requires **Contents write** (REST `permissions.push`, or GraphQL `viewerPermission` of `WRITE` / `MAINTAIN` / `ADMIN`). For **`pr_review`**, **Triage** or higher is sufficient when the workflow does not need to push branches. Classic PATs with equivalent **`repo`** scope still work; see `agent/README.md` for environment variables and edge cases. + +#### 3. Store the PAT in AWS Secrets Manager (after deploy) + +The stack creates a secret (output **`GitHubTokenSecretArn`**). After your first successful **`mise run //cdk:deploy`**, store the **same** PAT string you use locally: + +```bash +# Same Region you deployed to (example: us-east-1). Must be non-empty—see [Post-deployment setup](#post-deployment-setup) if `put-secret-value` fails with a double-dot endpoint. 
+REGION=us-east-1 + +SECRET_ARN=$(aws cloudformation describe-stacks \ + --stack-name backgroundagent-dev \ + --region "$REGION" \ + --query 'Stacks[0].Outputs[?OutputKey==`GitHubTokenSecretArn`].OutputValue | [0]' \ + --output text) + +aws secretsmanager put-secret-value \ + --region "$REGION" \ + --secret-id "$SECRET_ARN" \ + --secret-string "ghp_your_fine_grained_pat_here" +``` + +If you use a **per-repo** secret (`githubTokenSecretArn` on a Blueprint), put the PAT in that secret instead; the orchestrator reads whichever ARN is configured for the repo. -The agent clones, pushes, and opens pull requests using a **GitHub PAT** stored in Secrets Manager (see [Post-deployment setup](#post-deployment-setup)). The token must have permission to access **every** onboarded repository (clone, push to branches, create/update PRs). Use a fine-grained PAT scoped to those repos—for the fork workflow above, restrict the token to **your fork** only; see `agent/README.md` for required scopes. +### Optional customization -### 3. Agent image (`agent/Dockerfile`) +#### Agent image (`agent/Dockerfile`) The default image installs Python, Node 20, `git`, `gh`, Claude Code CLI, and **`mise`** for polyglot builds. If your repositories need extra runtimes (Java, Go, specific CLIs, native libs), **extend `agent/Dockerfile`** (and optionally `agent/` tooling) so `mise run build` and your stack’s workflows succeed inside the container. Rebuild the runtime asset when you change the Dockerfile (a normal `cd cdk && npx cdk deploy` / CDK asset build does this). -### 4. Stack name (optional) +#### Stack name (optional) The development stack id is set in **`cdk/src/main.ts`** (default **`backgroundagent-dev`**). If you rename it, update every place that passes **`--stack-name`** to the AWS CLI (including examples in this guide and any scripts you keep locally). -### 5. 
Fork-specific metadata (optional) +#### Fork-specific metadata (optional) If you maintain your own fork, you will typically also replace **clone URLs**, **README badges**, **issue links**, and **`package.json` `name`** fields with your org’s identifiers. Those do not affect runtime behavior but avoid confusion for contributors. -### 6. Make target repositories easy for the agent +#### Make target repositories easy for the agent Keep each repo you onboard **clear and automatable**: documented build/test commands, consistent layout, and project-level agent hints (`CLAUDE.md`, `.claude/`). See [Make your codebase AI ready](https://medium.com/@alain.krok/make-your-codebase-ai-ready-05d6a160f1d5) for practical guidance. @@ -87,7 +124,7 @@ Default output format [None]: json - [Docker](https://docs.docker.com/engine/install/) — for local agent runs and CDK asset builds. - [mise](https://mise.jdx.dev/getting-started.html) — task runner and version manager for Node, security tools, and (under `agent/`) Python. Install from the official guide; it is **not** installed via npm. - **AWS CDK CLI** ≥ 2.233.0 — install globally with npm **after** mise is active so it uses the same Node as this repo (see [Set up your toolchain](#set-up-your-toolchain)): `npm install -g aws-cdk`. -- A **GitHub personal access token** (PAT) with permission to access every repository you onboard (clone, push to branches, create and update pull requests)—often a fine-grained token scoped to **your fork** of `awslabs/agent-plugins` if you follow the fork workflow under **Repository preparation**. After deployment, store it in the Secrets Manager secret the stack creates ([Post-deployment setup](#post-deployment-setup)); for local agent runs, export `GITHUB_TOKEN` (see **Local testing**). Required scopes are documented in `agent/README.md`. 
+- A **GitHub personal access token** (PAT) with permission to access every repository you onboard—see **[Repository preparation](#repository-preparation)** (steps 2–3) for required fine-grained permissions and how to store the value in Secrets Manager after deploy. For local agent runs, export `GITHUB_TOKEN` (see **Local testing**). Extra runtime notes live in `agent/README.md`. **Versions this repo pins via mise (no separate Node/Yarn/Python install needed for the standard path):** @@ -347,11 +384,11 @@ If `put-secret-value` returns **`Invalid endpoint: https://secretsmanager..amazo #### Set the GitHub token -The agent reads the GitHub personal access token from Secrets Manager at startup. After deploying, store your PAT in the secret: +The agent reads the GitHub personal access token from Secrets Manager at runtime. The canonical flow (permissions table + `put-secret-value` commands) is **[Repository preparation](#repository-preparation), step 3**—follow that first. + +If you only need the commands here, use the same snippet as in that section (adjust **`--stack-name`** if you renamed the stack). If `SECRET_ARN` is empty after setting `REGION`, list outputs in that Region (`describe-stacks` … `--query 'Stacks[0].Outputs' --output table`) and confirm the row `GitHubTokenSecretArn` exists—wrong stack name or an incomplete deployment are the other common causes. ```bash -# Same Region you deployed to (example: us-east-1). Must be non-empty—see note above -# about "Invalid endpoint: ...secretsmanager..amazonaws.com". REGION=us-east-1 SECRET_ARN=$(aws cloudformation describe-stacks \ @@ -366,10 +403,6 @@ aws secretsmanager put-secret-value \ --secret-string "ghp_your_fine_grained_pat_here" ``` -If `SECRET_ARN` is still empty after setting `REGION`, list outputs in that Region (`describe-stacks` … `--query 'Stacks[0].Outputs' --output table`) and confirm the row `GitHubTokenSecretArn` exists—wrong stack name or an incomplete deployment are the other common causes. 
- -See `agent/README.md` for the required PAT permissions. - #### Onboard repositories Repositories must be onboarded before tasks can target them. Each repository is registered as a `Blueprint` construct in the CDK stack (`cdk/src/stacks/agent.ts`). A `Blueprint` writes a `RepoConfig` record to the shared `RepoTable` DynamoDB table via a CloudFormation custom resource. diff --git a/docs/guides/USER_GUIDE.md b/docs/guides/USER_GUIDE.md index 6fef6c4..8875133 100644 --- a/docs/guides/USER_GUIDE.md +++ b/docs/guides/USER_GUIDE.md @@ -269,7 +269,7 @@ Transitions the task to `CANCELLED` and records a cancellation event. Only tasks curl "$API_URL/tasks//events" -H "Authorization: $TOKEN" ``` -Returns the chronological event log for a task (e.g., `task_created`, `session_started`, `task_completed`). Supports `limit` and `next_token` pagination parameters. +Returns the chronological event log for a task (e.g., `task_created`, `preflight_failed`, `session_started`, `task_completed`). Supports `limit` and `next_token` pagination parameters. If the task failed before the agent ran, inspect `preflight_failed` entries for `reason` and `detail` (see **Task events** under **Task lifecycle**). ## Using the CLI @@ -398,8 +398,11 @@ node lib/bin/bgagent.js list --repo owner/repo --limit 10 ```bash node lib/bin/bgagent.js events node lib/bin/bgagent.js events --limit 20 +node lib/bin/bgagent.js events --output json ``` +Use **`--output json`** to see the full payload for **`preflight_failed`** (`reason`, `detail`, and per-check metadata). See **Task events** under **Task lifecycle** for how to interpret common `reason` values. 
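+ To illustrate, the JSON output can be filtered with a few lines of Python — a sketch only, assuming the command emits a JSON array of event objects with `event_type`, `reason`, and `detail` keys (the field names used in this guide; the sample payload below is hypothetical):

```python
import json

# Hypothetical sample of `bgagent events --output json` output; the exact
# payload shape is an assumption based on the field names in this guide.
raw = json.dumps([
    {"event_type": "task_created"},
    {"event_type": "preflight_failed",
     "reason": "INSUFFICIENT_GITHUB_REPO_PERMISSIONS",
     "detail": "PAT lacks Contents: write on the target repo"},
])

def preflight_failures(events_json: str) -> list:
    """Return only the preflight_failed events, with reason and detail."""
    return [e for e in json.loads(events_json)
            if e.get("event_type") == "preflight_failed"]

for event in preflight_failures(raw):
    print(event["reason"], "-", event.get("detail", ""))
```

Cross-reference the printed `reason` against the table under **Task events** to decide whether to fix the PAT, the repo onboarding, or the PR reference.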
+ ### Cancelling a task ```bash @@ -526,7 +529,7 @@ The orchestrator uses Lambda Durable Functions to manage the lifecycle durably | `HYDRATING` | Orchestrator passed admission control; assembling the agent payload | | `RUNNING` | Agent session started and actively working on the task | | `COMPLETED` | Agent finished and created a PR (or determined no changes were needed) | -| `FAILED` | Agent encountered an error, user concurrency limit was reached, or content was blocked by guardrail screening | +| `FAILED` | Agent encountered an error, user concurrency limit was reached, content was blocked by guardrail screening, or **pre-flight** checks failed before the agent started (for example an under-privileged GitHub PAT) | | `CANCELLED` | Task was cancelled by the user | | `TIMED_OUT` | Task exceeded the maximum allowed duration (~9 hours) | @@ -550,6 +553,8 @@ curl "$API_URL/tasks//events" -H "Authorization: $TOKEN" Events include: `task_created`, `admission_rejected`, `preflight_failed`, `hydration_started`, `hydration_complete`, `guardrail_blocked`, `session_started`, `pr_created`, `pr_updated`, `task_completed`, `task_failed`, `task_cancelled`, `task_timed_out`. Event records are subject to the same 90-day retention as task records and are automatically deleted after that period. +**`preflight_failed`:** The orchestrator could not safely start work (GitHub API checks run **before** any AgentCore compute is consumed). Open the event in `bgagent events ` (or the JSON from `GET /tasks/{id}/events`) and read **`reason`** and **`detail`**. Typical values for **`reason`** include `GITHUB_UNREACHABLE`, `REPO_NOT_FOUND_OR_NO_ACCESS`, `INSUFFICIENT_GITHUB_REPO_PERMISSIONS`, and `PR_NOT_FOUND_OR_CLOSED`. 
The most common fix for **`INSUFFICIENT_GITHUB_REPO_PERMISSIONS`** is to update the GitHub PAT in AWS Secrets Manager so its permissions match your task type—for **`new_task`** / **`pr_iteration`** you need **Contents** read/write and **Pull requests** read/write on the target repo; **`pr_review`** can pass with **Triage** (or higher) when you do not need to push. See [Developer guide — Repository preparation](./DEVELOPER_GUIDE.md#repository-preparation) for the full table and `put-secret-value` steps. + ## What the agent does ### New task (`new_task`) @@ -615,6 +620,7 @@ Filter by task ID to find logs for a specific task. ## Tips - **Onboard your repo first**: Repositories must be registered via a `Blueprint` construct before tasks can target them. If you get a `REPO_NOT_ONBOARDED` error, contact your platform administrator. +- **GitHub PAT and `preflight_failed`**: If a task ends in `FAILED` with a `preflight_failed` event, the platform rejected the run before the agent consumed compute—often a token scoped read-only while the task needed push access. Check event `reason` / `detail` and align your fine-grained PAT with [Repository preparation](./DEVELOPER_GUIDE.md#repository-preparation); then update the secret and submit a new task. - **Prepare your repo**: The agent works best with agent-friendly repositories. See the [Developer guide](./DEVELOPER_GUIDE.md) for repository preparation advice. - **Add a CLAUDE.md**: The agent automatically loads project-level configuration from your repository — `CLAUDE.md`, `.claude/CLAUDE.md`, `.claude/rules/*.md`, `.claude/settings.json`, `.claude/agents/`, and `.mcp.json`. Use these to provide project-specific build commands, conventions, constraints, custom subagents, and architecture notes. See the [Prompt guide](./PROMPT_GUIDE.md#repo-level-customization) for details and examples. 
- **Issue vs text**: When using `--issue` (CLI) or `issue_number` (API), the agent fetches the full issue body from GitHub, including any labels, comments, and linked context. This is usually better than a short text description. diff --git a/docs/scripts/sync-starlight.mjs b/docs/scripts/sync-starlight.mjs index 93961aa..099c60a 100644 --- a/docs/scripts/sync-starlight.mjs +++ b/docs/scripts/sync-starlight.mjs @@ -44,6 +44,17 @@ function rewriteDocsLinkTarget(target) { CONTRIBUTING: '/developer-guide/contributing', }; + /** `splitGuide` emits each `##` from DEVELOPER_GUIDE as its own page — map #anchors to those routes. */ + const developerGuideAnchorRoutes = { + 'repository-preparation': '/developer-guide/repository-preparation', + }; + if (stem === 'DEVELOPER_GUIDE' && anchor) { + const splitRoute = developerGuideAnchorRoutes[anchor.toLowerCase()]; + if (splitRoute) { + return splitRoute; + } + } + if (explicitGuideRoutes[stem]) { return `${explicitGuideRoutes[stem]}${anchorSuffix}`; } diff --git a/docs/src/content/docs/design/Architecture.md b/docs/src/content/docs/design/Architecture.md index 2a7fa80..e069011 100644 --- a/docs/src/content/docs/design/Architecture.md +++ b/docs/src/content/docs/design/Architecture.md @@ -171,7 +171,7 @@ The following risks were identified via external review (March 2026) and are tra |---|------|----------|-----------|-------------------| | 1 | **Agent vs. orchestrator DynamoDB race** — `agent/task_state.py` writes terminal status without conditional expressions, so it can overwrite orchestrator-managed CANCELLED with COMPLETED. The orchestrator's `transitionTask()` uses `ConditionExpression` but the agent side does not. | High | `agent/task_state.py` | Resolved (Iteration 3bis) — `ConditionExpression` guards added to `write_running()` (requires status IN SUBMITTED, HYDRATING) and `write_terminal()` (requires status IN RUNNING, HYDRATING, FINALIZING). `ConditionalCheckFailedException` is caught and logged as a skip. 
| | 2 | **No DLQ on orchestrator async invocation** — The orchestrator Lambda is invoked with `InvocationType: 'Event'` but has no dead-letter queue. Failed or throttled invocations leave tasks stuck in SUBMITTED. | High | `src/constructs/task-orchestrator.ts` | Resolved (Iteration 3bis) — SQS DLQ deliberately skipped since durable execution (`withDurableExecution`, 14-day retention) manages its own retries; a DLQ would conflict. Added `retryAttempts: 0` on alias async invoke config to prevent Lambda-level duplicate invocations. CloudWatch alarm on `fn.metricErrors()` (threshold: 3, 2 periods of 5min) provides alerting. | -| 3 | **Concurrency counter drift** — If the orchestrator crashes between concurrency increment and decrement, the user's counter is permanently inflated. The `UserConcurrencyTable` JSDoc acknowledges this but no reconciliation process exists. | Medium | `src/constructs/user-concurrency-table.ts` | Resolved (Iteration 3bis) — `ConcurrencyReconciler` construct with scheduled Lambda (EventBridge rate 15min). Scans concurrency table, queries task table's `UserStatusIndex` GSI per user, compares actual count with stored `active_count`, and corrects drift. TOCTOU-safe via `ConditionExpression` on update. | +| 3 | **Concurrency counter drift** — If the orchestrator crashes between concurrency increment and decrement, the user's counter is permanently inflated. The `UserConcurrencyTable` JSDoc acknowledges this but no reconciliation process exists. | Medium | `src/constructs/user-concurrency-table.ts` | Resolved (Iteration 3bis) — `ConcurrencyReconciler` construct with scheduled Lambda (EventBridge rate 15min). Scans concurrency table, queries task table's `UserStatusIndex` GSI per user, compares actual count with stored `active_count`, and corrects drift. TOCTOU-safe via `ConditionExpression` on update. 
Additionally, the `finalizeTask` heartbeat-detected crash path guards against double-decrement by only releasing concurrency after a successful `transitionTask`, and re-reading the task state on failure. | | 4 | **Single NAT Gateway** — `natGateways: 1` means a single AZ failure blocks all agent internet egress. Acceptable for development; needs multi-AZ NAT for production. | Medium | `src/constructs/agent-vpc.ts` | Mitigated (Iteration 3bis) — already configurable via `AgentVpcProps.natGateways` (default: 1). Deployers can set `natGateways: 2` or higher for multi-AZ redundancy. No code changes needed. | | 5 | **Dual-language prompt assembly** — Both TypeScript (`context-hydration.ts:assembleUserPrompt`) and Python (`entrypoint.py:assemble_prompt`) implement the same logic. Changes to one must be manually replicated in the other. | Medium | `src/handlers/shared/context-hydration.ts`, `agent/entrypoint.py` | Mitigated (Iteration 3bis) — production path uses orchestrator's `assembleUserPrompt()` exclusively; the Python `assemble_prompt()` has a deprecation docstring and is retained only for local batch mode and dry-run mode. Risk of divergence reduced but not eliminated. | diff --git a/docs/src/content/docs/design/Memory.md b/docs/src/content/docs/design/Memory.md index 33ed999..835d5d4 100644 --- a/docs/src/content/docs/design/Memory.md +++ b/docs/src/content/docs/design/Memory.md @@ -55,7 +55,7 @@ Task end (orchestrator fallback): - **Token budget** — Memory context is capped at 2,000 tokens (~8,000 characters) to avoid consuming too much system prompt space. Oldest entries are dropped first. - **Per-repo namespace via template variables** — Namespace isolation is configured on the extraction strategies using `{actorId}` and `{sessionId}` template variables. Events are written with `actorId = "owner/repo"` and `sessionId = taskId`. The extraction pipeline places records at `/{repo}/knowledge/` (semantic) and `/{repo}/episodes/{taskId}/` (episodic). 
Reads use these paths as namespace prefixes. This is a breaking infrastructure change from the initial implementation — the Memory resource must be recreated on deploy. - **Prompt version excludes memory** — The SHA-256 hash is computed from deterministic prompt parts only. Memory context varies per run, so including it would make every prompt version unique and defeat the purpose of tracking prompt changes. -- **Orchestrator fallback** — If the agent container crashes, times out, or OOMs without writing memory, the orchestrator writes a minimal episode so the episodic record is not lost. The fallback is itself fail-open (wrapped in try-catch) to never block `finalizeTask`. The return value is logged to surface silent failures (Iteration 3bis hardening). +- **Orchestrator fallback** — If the agent container crashes, times out, or OOMs without writing memory, the orchestrator writes a minimal episode so the episodic record is not lost. This includes cases where the heartbeat-based crash detection triggers early finalization (agent died before writing any memory). The fallback is itself fail-open (wrapped in try-catch) to never block `finalizeTask`. The return value is logged to surface silent failures (Iteration 3bis hardening). ### Test coverage diff --git a/docs/src/content/docs/design/Orchestrator.md b/docs/src/content/docs/design/Orchestrator.md index c9a344f..4bdb35c 100644 --- a/docs/src/content/docs/design/Orchestrator.md +++ b/docs/src/content/docs/design/Orchestrator.md @@ -183,7 +183,7 @@ See the Admission control section for details. Validates that the task is allowe #### Step 2: Context hydration (deterministic) -See the Context hydration section for details. Assembles the agent's prompt from multiple sources depending on task type. For `new_task`: user message, GitHub issue (title, body, comments), memory, repo configuration, and platform defaults. For `pr_iteration`: PR metadata, review comments, diff summary, and optional user instructions. 
An additional **pre-flight** sub-step verifies PR accessibility when `pr_number` is set (see [preflight.ts](../../cdk/src/handlers/shared/preflight.ts)). The assembled prompt is screened through Amazon Bedrock Guardrails for prompt injection before the agent receives it (PR tasks: always screened; `new_task`: screened when issue content is present). The output is a fully assembled prompt, ready to pass to the compute session. +See the Context hydration section for details. Assembles the agent's prompt from multiple sources depending on task type. For `new_task`: user message, GitHub issue (title, body, comments), memory, repo configuration, and platform defaults. For `pr_iteration`: PR metadata, review comments, diff summary, and optional user instructions. An additional **pre-flight** sub-step (see [preflight.ts](../../cdk/src/handlers/shared/preflight.ts)) verifies PR accessibility when `pr_number` is set and validates that the resolved GitHub token has sufficient repository permissions for the task type (so read-only PATs fail early with `INSUFFICIENT_GITHUB_REPO_PERMISSIONS`). The assembled prompt is screened through Amazon Bedrock Guardrails for prompt injection before the agent receives it (PR tasks: always screened; `new_task`: screened when issue content is present). The output is a fully assembled prompt, ready to pass to the compute session. #### Step 3: Session start and agent execution (deterministic start + agentic execution) @@ -228,7 +228,7 @@ When the orchestrator loads a task's `blueprint_config`, it resolves the step pi 1. **Load `RepoConfig`** from the `RepoTable` by `repo` (PK). Merge with platform defaults (see [REPO_ONBOARDING.md](/design/repo-onboarding#platform-defaults) for default values and override precedence). 2. **Resolve compute strategy** from `compute_type` (default: `agentcore`). The strategy implements the `ComputeStrategy` interface (see [REPO_ONBOARDING.md](/design/repo-onboarding#compute-strategy-interface)). -3. 
**Build step list.** If `step_sequence` is provided, use it; otherwise use the default sequence (`admission-control` → `hydrate-context` → `pre-flight` → `start-session` → `await-agent-completion` → `finalize`). The `pre-flight` step runs fail-closed readiness checks (GitHub API reachability, repo access, PR accessibility for PR tasks) before consuming compute — see [ROADMAP.md Iteration 3c](/roadmap/roadmap). For each entry, resolve to a built-in step function or a Lambda invocation wrapper. +3. **Build step list.** If `step_sequence` is provided, use it; otherwise use the default sequence (`admission-control` → `hydrate-context` → `pre-flight` → `start-session` → `await-agent-completion` → `finalize`). The `pre-flight` step runs fail-closed readiness checks (GitHub API reachability, repository access, **PAT privilege** for the task type via REST `permissions` and GraphQL `viewerPermission` when needed, PR accessibility for PR tasks) before consuming compute — see [ROADMAP.md Iteration 3c](/roadmap/roadmap). For each entry, resolve to a built-in step function or a Lambda invocation wrapper. 4. **Inject custom steps.** If `custom_steps` are defined and no explicit `step_sequence` is provided, insert them at their declared `phase` position (pre-agent steps before `start-session`, post-agent steps after `await-agent-completion`). 5. **Validate.** Check that required steps are present and correctly ordered (see [step sequence validation](/design/repo-onboarding#step-sequence-validation)). If invalid, fail the task with `INVALID_STEP_SEQUENCE`. 6. **Execute.** Iterate the resolved list. For each step: check cancellation, filter `blueprintConfig` to only the fields that step needs (stripping credential ARNs for custom Lambda steps), execute with retry policy, enforce `StepOutput.metadata` size budget (10KB), prune `previousStepResults` to last 5 steps, emit events. Built-in steps that need durable waits (e.g. 
`await-agent-completion`) receive the `DurableContext` and `ComputeStrategy` so they can call `waitForCondition` and `computeStrategy.pollSession()` internally — no name-based special-casing in the framework loop. @@ -428,7 +428,13 @@ The orchestrator needs to know whether the session is still running. Two complem **Iteration 1 (historical).** The `invoke_agent_runtime` call blocked; when it returned, the session was over. No explicit liveness check was needed. -**Fallback: DynamoDB heartbeat (optional enhancement).** As defense in depth, the agent can write a heartbeat timestamp to DynamoDB every N minutes. The orchestrator reads it during its poll cycle. A missing heartbeat (e.g. none in the last 10 minutes while `/ping` reports `HealthyBusy`) could indicate the agent is stuck but not idle — triggering investigation or forced termination. +**DynamoDB heartbeat (implemented).** The agent writes an `agent_heartbeat_at` timestamp to DynamoDB every 45 seconds via a daemon thread in `server.py`. The heartbeat worker is resilient to transient DynamoDB errors (each write is wrapped in try/except with a retry on the next interval). The orchestrator's `pollTaskStatus` reads this timestamp during each poll cycle and applies two thresholds: + +- **Grace period** (`AGENT_HEARTBEAT_GRACE_SEC = 120s`): After transitioning to RUNNING, the orchestrator waits this long before expecting heartbeats. This covers container startup and pipeline initialization. +- **Stale threshold** (`AGENT_HEARTBEAT_STALE_SEC = 240s`): If `agent_heartbeat_at` exists and is older than this, the session is treated as lost (crash, OOM, or stuck). +- **Early crash detection**: If `agent_heartbeat_at` is never set and the task has been RUNNING past the combined grace + stale window (360s), the orchestrator treats this as an early crash (agent died before the pipeline started). + +When either condition is met, `pollTaskStatus` sets `sessionUnhealthy = true` in the poll state. 
The `finalizeTask` function then transitions the task to FAILED with the reason `"Agent session lost: no recent heartbeat from the runtime"`. The pipeline also writes an initial heartbeat at the very start of `run_task()` to minimize the window between session start and first heartbeat. ### The 15-minute idle timeout problem @@ -445,7 +451,7 @@ AgentCore Runtime terminates sessions after 15 minutes of inactivity (no `/ping` When the session ends (agent finishes, crashes, or is terminated), the orchestrator detects this: - **Iteration 1 (historical):** The `invoke_agent_runtime` call returned (it blocked). The response body contained the agent's output (status, PR URL, cost, etc.). -- **Target state:** The orchestrator polls the agent via re-invocation on the same session (see Invocation model above). Completion is detected when: (a) the agent responds with a "completed" or "failed" status in the poll response, or (b) the re-invocation fails because the session was terminated (idle timeout, crash, or 8-hour limit reached). In the durable orchestrator, a `waitForCondition` evaluates the poll result at each interval and resumes the pipeline when the condition is met. See the session monitoring pattern in the Implementation options section. +- **Target state:** The orchestrator polls the agent via re-invocation on the same session (see Invocation model above). Completion is detected when: (a) the agent responds with a "completed" or "failed" status in the poll response, (b) the re-invocation fails because the session was terminated (idle timeout, crash, or 8-hour limit reached), or (c) the DynamoDB heartbeat check detects the session is unhealthy (stale or missing `agent_heartbeat_at` — see DynamoDB heartbeat above). In the durable orchestrator, a `waitForCondition` evaluates the poll result at each interval and resumes the pipeline when the condition is met. See the session monitoring pattern in the Implementation options section. 
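+ The two heartbeat thresholds described above can be expressed as a small pure function — an illustrative Python sketch, not the actual `pollTaskStatus` implementation (which is TypeScript); the constant names mirror the ones quoted above:

```python
from typing import Optional

AGENT_HEARTBEAT_GRACE_SEC = 120   # startup grace after transitioning to RUNNING
AGENT_HEARTBEAT_STALE_SEC = 240   # max silence once a heartbeat has been seen

def session_unhealthy(now: float, running_since: float,
                      last_heartbeat: Optional[float]) -> bool:
    """Sketch of the heartbeat check: True means the session is treated as lost."""
    if last_heartbeat is not None:
        # Stale threshold: a heartbeat exists but is older than allowed.
        return now - last_heartbeat > AGENT_HEARTBEAT_STALE_SEC
    # Early crash: no heartbeat was ever written and the combined
    # grace + stale window (360s) has elapsed since RUNNING began.
    return now - running_since > AGENT_HEARTBEAT_GRACE_SEC + AGENT_HEARTBEAT_STALE_SEC
```

A heartbeat 100 seconds old passes; one 300 seconds old trips the stale threshold; a task with no heartbeat at all is only flagged once the full 360-second window has elapsed.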
### External termination (cancellation) @@ -543,7 +549,8 @@ This section uses an FMEA (Failure Mode and Effects Analysis) approach: for each | Failure mode | Impact | Recovery | |---|---|---| -| Agent crashes mid-task (unhandled exception) | Partial branch may exist on GitHub | Orchestrator detects session end. Finalization inspects GitHub state. If commits exist, may mark as partial completion. Task transitions to `FAILED` or `COMPLETED` with partial flag. | +| Agent crashes mid-task (unhandled exception) | Partial branch may exist on GitHub | Orchestrator detects session end via DynamoDB heartbeat staleness check (see Liveness monitoring). Finalization inspects GitHub state. If commits exist, may mark as partial completion. Task transitions to `FAILED` or `COMPLETED` with partial flag. | +| Agent crashes before pipeline starts (early crash: OOM during startup, import error, container failure) | `agent_heartbeat_at` is never set in DynamoDB | `pollTaskStatus` detects missing heartbeat after the combined grace + stale window (360s). Task transitions to `FAILED` with reason "Agent session lost". | | Agent runs out of turns (max_turns limit) | Agent stopped by SDK, not by crash | Session ends normally with status `end_turn`. Orchestrator finalizes; if PR exists, task is `COMPLETED`. | | Agent exceeds cost budget (max_budget_usd limit) | Agent stopped by SDK when budget is reached | Session ends normally. Orchestrator finalizes; if PR exists, task is `COMPLETED`. | | Agent is idle for 15 min (AgentCore kills session) | Work in progress may be lost if not committed | Task transitions to `TIMED_OUT`. Partial work may be on the branch if the agent committed before going idle. 
| @@ -562,7 +569,7 @@ This section uses an FMEA (Failure Mode and Effects Analysis) approach: for each |---|---|---| | Orchestrator crashes during `HYDRATING` | Task stuck in `HYDRATING` | Durable execution (Lambda Durable Functions) automatically replays from the last checkpoint, skipping completed steps. Without durable orchestration, a recovery process detects stuck tasks (in `HYDRATING` for > N minutes) and restarts them. | | Orchestrator crashes during `RUNNING` | Task stuck in `RUNNING`, session may still be alive | Recovery process detects task is in `RUNNING` but orchestrator is not managing it. It resumes monitoring the session (using the stored session ID). When the session ends, it runs finalization. | -| Orchestrator crashes during `FINALIZING` | Task stuck in `FINALIZING` | Recovery process detects and restarts finalization. Finalization steps must be idempotent (decrementing a counter twice should be detected and handled). | +| Orchestrator crashes during `FINALIZING` | Task stuck in `FINALIZING` | Recovery process detects and restarts finalization. Finalization steps are idempotent. The heartbeat-detected crash finalization path avoids double-decrement by only emitting events and releasing concurrency after a successful `transitionTask`; if the transition fails (task already terminal), it re-reads the task and handles accordingly. | | DynamoDB unavailable during state transition | State not persisted | Retry with backoff. If the state transition cannot be persisted, the orchestrator must not proceed (risk of inconsistency). After retries are exhausted, alert operators. | ### Recovery mechanisms summary @@ -607,7 +614,7 @@ Concurrency is tracked using atomic counters: - **UserConcurrency.** A DynamoDB item per user: `{ user_id, active_count }`. Incremented atomically (conditional update: `active_count < max`) during admission. Decremented during finalization. - **SystemConcurrency.** A single DynamoDB item: `{ pk: "SYSTEM", active_count }`. 
Same pattern. -**Counter drift.** If the orchestrator crashes after starting a session but before persisting the session-to-task mapping, or after a session ends but before decrementing the counter, the counter drifts. Mitigation: +**Counter drift.** If the orchestrator crashes after starting a session but before persisting the session-to-task mapping, or after a session ends but before decrementing the counter, the counter drifts. The heartbeat-detected crash finalization path (`finalizeTask` sessionUnhealthy branch) guards against double-decrement: it only decrements after a successful state transition and, if the transition fails, re-reads the task to determine the correct action. Mitigation: - Always persist the task state transition **before** taking the action (write-ahead pattern). For example, persist the task as `RUNNING` and record the session ID before calling `invoke_agent_runtime`. - Run a **reconciliation Lambda** every 5 minutes (EventBridge schedule): query the Tasks table for tasks in `RUNNING` + `HYDRATING` state per user (GSI on `user_id` + `status`), compare the count to `UserConcurrency.active_count`, and correct via `UpdateItem` if different. The Lambda emits a `counter_drift_corrected` CloudWatch metric (dimensions: `user_id`, `drift_amount`) when it corrects a value, and a `counter_reconciliation_run` metric on every execution for health monitoring. diff --git a/docs/src/content/docs/design/Security.md index 5009f5c..9d948e2 100644 --- a/docs/src/content/docs/design/Security.md +++ b/docs/src/content/docs/design/Security.md @@ -119,7 +119,7 @@ The platform enforces policies at multiple points in the task lifecycle. 
Today, | **Submission** | Guardrail input screening | `create-task-core.ts` (Bedrock Guardrails) | HTTP 400 response only — no event emitted | | **Submission** | Idempotency check | `create-task-core.ts` | HTTP 409 response only — no event emitted | | **Admission** | Concurrency limit | `orchestrator.ts` (`admissionControl`) | `admission_rejected` event emitted | -| **Pre-flight** | GitHub reachability, repo access, PR access | `preflight.ts` | `preflight_failed` event emitted | +| **Pre-flight** | GitHub reachability, repo access, PAT repo permissions (push / `viewerPermission` by task type), PR access | `preflight.ts` | `preflight_failed` event emitted | | **Hydration** | Guardrail prompt screening (PR + issue content) | `context-hydration.ts` | `guardrail_blocked` event emitted | | **Hydration** | Budget/quota resolution (3-tier max_turns, 2-tier max_budget_usd) | `orchestrator.ts` (`hydrateAndTransition`) | Values persisted on task record — no policy decision event | | **Hydration** | Token budget for prompt assembly | `context-hydration.ts` | No event emitted | diff --git a/docs/src/content/docs/developer-guide/Installation.md b/docs/src/content/docs/developer-guide/Installation.md index 1e3a486..fa69592 100644 --- a/docs/src/content/docs/developer-guide/Installation.md +++ b/docs/src/content/docs/developer-guide/Installation.md @@ -22,7 +22,7 @@ Default output format [None]: json - [Docker](https://docs.docker.com/engine/install/) — for local agent runs and CDK asset builds. - [mise](https://mise.jdx.dev/getting-started.html) — task runner and version manager for Node, security tools, and (under `agent/`) Python. Install from the official guide; it is **not** installed via npm. - **AWS CDK CLI** ≥ 2.233.0 — install globally with npm **after** mise is active so it uses the same Node as this repo (see [Set up your toolchain](#set-up-your-toolchain)): `npm install -g aws-cdk`. 
-- A **GitHub personal access token** (PAT) with permission to access every repository you onboard (clone, push to branches, create and update pull requests)—often a fine-grained token scoped to **your fork** of `awslabs/agent-plugins` if you follow the fork workflow under **Repository preparation**. After deployment, store it in the Secrets Manager secret the stack creates ([Post-deployment setup](#post-deployment-setup)); for local agent runs, export `GITHUB_TOKEN` (see **Local testing**). Required scopes are documented in `agent/README.md`. +- A **GitHub personal access token** (PAT) with permission to access every repository you onboard—see **[Repository preparation](#repository-preparation)** (steps 2–3) for required fine-grained permissions and how to store the value in Secrets Manager after deploy. For local agent runs, export `GITHUB_TOKEN` (see **Local testing**). Extra runtime notes live in `agent/README.md`. **Versions this repo pins via mise (no separate Node/Yarn/Python install needed for the standard path):** @@ -282,11 +282,11 @@ If `put-secret-value` returns **`Invalid endpoint: https://secretsmanager..amazo #### Set the GitHub token -The agent reads the GitHub personal access token from Secrets Manager at startup. After deploying, store your PAT in the secret: +The agent reads the GitHub personal access token from Secrets Manager at runtime. The canonical flow (permissions table + `put-secret-value` commands) is **[Repository preparation](#repository-preparation), step 3**—follow that first. + +If you only need the commands here, use the same snippet as in that section (adjust **`--stack-name`** if you renamed the stack). If `SECRET_ARN` is empty after setting `REGION`, list outputs in that Region (`describe-stacks` … `--query 'Stacks[0].Outputs' --output table`) and confirm the row `GitHubTokenSecretArn` exists—wrong stack name or an incomplete deployment are the other common causes. ```bash -# Same Region you deployed to (example: us-east-1). 
Must be non-empty—see note above -# about "Invalid endpoint: ...secretsmanager..amazonaws.com". REGION=us-east-1 SECRET_ARN=$(aws cloudformation describe-stacks \ @@ -301,10 +301,6 @@ aws secretsmanager put-secret-value \ --secret-string "ghp_your_fine_grained_pat_here" ``` -If `SECRET_ARN` is still empty after setting `REGION`, list outputs in that Region (`describe-stacks` … `--query 'Stacks[0].Outputs' --output table`) and confirm the row `GitHubTokenSecretArn` exists—wrong stack name or an incomplete deployment are the other common causes. - -See `agent/README.md` for the required PAT permissions. - #### Onboard repositories Repositories must be onboarded before tasks can target them. Each repository is registered as a `Blueprint` construct in the CDK stack (`cdk/src/stacks/agent.ts`). A `Blueprint` writes a `RepoConfig` record to the shared `RepoTable` DynamoDB table via a CloudFormation custom resource. diff --git a/docs/src/content/docs/developer-guide/Repository-preparation.md b/docs/src/content/docs/developer-guide/Repository-preparation.md index e7ed169..4ddefda 100644 --- a/docs/src/content/docs/developer-guide/Repository-preparation.md +++ b/docs/src/content/docs/developer-guide/Repository-preparation.md @@ -4,11 +4,13 @@ title: Repository preparation The CDK stack ships with a **sample onboarded repository** (`krokoko/agent-plugins` in `cdk/src/stacks/agent.ts`) so the project deploys and CDK tests run cleanly out of the box. That value is for **default wiring only**: a real agent run **pushes branches and opens pull requests** with your GitHub PAT, so the onboarded repo must be one your token can **clone, push to, and open PRs on**. Most people do **not** have that access to the upstream repo. 
-**Recommended first setup:** fork [`awslabs/agent-plugins`](https://github.com/awslabs/agent-plugins) on GitHub, set the `Blueprint` **`repo`** to **`your-github-username/agent-plugins`** (match your fork’s owner and repo name), and create a **fine-grained PAT** with access **only to that fork** (clone, push, PRs—see `agent/README.md` for scopes). Use that token for **`GITHUB_TOKEN`** when running `./agent/run.sh` locally and store the same value in Secrets Manager after deploy. For use on your own codebases, point the Blueprint at those repos instead and scope the PAT to match. +**Recommended first setup:** fork [`awslabs/agent-plugins`](https://github.com/awslabs/agent-plugins) on GitHub, set the `Blueprint` **`repo`** to **`your-github-username/agent-plugins`** (match your fork’s owner and repo name), and use a **fine-grained PAT** scoped to **that fork** with the permissions in step 2. Use the same token for **`GITHUB_TOKEN`** when running `./agent/run.sh` locally and store it in Secrets Manager (step 3) after deploy. -Register every repo you want tasks to target and align tools and permissions (steps below). +After deployment, the orchestrator **pre-flight** step calls the GitHub API to verify your token can access the task repository with enough privilege (`preflight.ts`). That catches common mistakes (for example a read-only PAT) **before** AgentCore work starts: the task fails fast with `INSUFFICIENT_GITHUB_REPO_PERMISSIONS` and a clear detail string, instead of failing late with a `git push` 403 buried in CloudWatch logs. -### 1. Register repositories with `Blueprint` (required) +### Required setup + +#### 1. Register repositories with `Blueprint` The Task API only accepts tasks for repositories that are **onboarded** — each one is a `Blueprint` construct in `cdk/src/stacks/agent.ts` that writes a `RepoConfig` row to DynamoDB. 
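+ The privilege rule the pre-flight check enforces can be sketched as a small function — an illustration only, not the `preflight.ts` source; it assumes the boolean `permissions` object (`admin`, `maintain`, `push`, `triage`, `pull`) that GitHub's REST API returns for a repository:

```python
def pat_sufficient(task_type: str, permissions: dict) -> bool:
    """Sketch of the per-task-type privilege rule described above.

    `permissions` mimics the boolean flags GitHub's REST API reports for the
    authenticated token on a repository; this is illustrative, not preflight.ts.
    """
    if task_type in ("new_task", "pr_iteration"):
        # Push (Contents: write) is required to create branches and open PRs.
        return bool(permissions.get("push"))
    if task_type == "pr_review":
        # Triage or higher is enough when nothing needs to be pushed.
        return any(permissions.get(k) for k in ("triage", "push", "maintain", "admin"))
    return False
```

A read-only token (`pull` only) fails `new_task` and `pr_iteration` but a triage-level token still passes `pr_review`, which matches the failure modes `INSUFFICIENT_GITHUB_REPO_PERMISSIONS` reports.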
@@ -20,22 +22,57 @@ Optional per-repo overrides (same file / `Blueprint` props) include a different After changing Blueprints, redeploy: `cd cdk && npx cdk deploy` (or `MISE_EXPERIMENTAL=1 mise //cdk:deploy`). -### 2. GitHub personal access token +#### 2. GitHub personal access token (fine-grained) + +Create a **fine-grained PAT** at GitHub → **Settings** → **Developer settings** → **Personal access tokens** → **Fine-grained tokens**. + +**Repository access:** select only the repo(s) the agent will use (for the fork workflow, **only your fork**). + +| Permission | Access | Reason | +|------------|--------|--------| +| **Contents** | Read and write | `git clone` and `git push` | +| **Pull requests** | Read and write | `gh pr create` / update PRs | +| **Issues** | Read | Issue title, body, and comments for context | +| **Metadata** | Read | Granted by default | + +For **`new_task`** and **`pr_iteration`**, pre-flight requires **Contents write** (REST `permissions.push`, or GraphQL `viewerPermission` of `WRITE` / `MAINTAIN` / `ADMIN`). For **`pr_review`**, **Triage** or higher is sufficient when the workflow does not need to push branches. Classic PATs with equivalent **`repo`** scope still work; see `agent/README.md` for environment variables and edge cases. + +#### 3. Store the PAT in AWS Secrets Manager (after deploy) + +The stack creates a secret (output **`GitHubTokenSecretArn`**). After your first successful **`mise run //cdk:deploy`**, store the **same** PAT string you use locally: + +```bash +# Same Region you deployed to (example: us-east-1). Must be non-empty—see [Post-deployment setup](#post-deployment-setup) if `put-secret-value` fails with a double-dot endpoint. 
+REGION=us-east-1 + +SECRET_ARN=$(aws cloudformation describe-stacks \ + --stack-name backgroundagent-dev \ + --region "$REGION" \ + --query 'Stacks[0].Outputs[?OutputKey==`GitHubTokenSecretArn`].OutputValue | [0]' \ + --output text) + +aws secretsmanager put-secret-value \ + --region "$REGION" \ + --secret-id "$SECRET_ARN" \ + --secret-string "ghp_your_fine_grained_pat_here" +``` + +If you use a **per-repo** secret (`githubTokenSecretArn` on a Blueprint), put the PAT in that secret instead; the orchestrator reads whichever ARN is configured for the repo. -The agent clones, pushes, and opens pull requests using a **GitHub PAT** stored in Secrets Manager (see [Post-deployment setup](#post-deployment-setup)). The token must have permission to access **every** onboarded repository (clone, push to branches, create/update PRs). Use a fine-grained PAT scoped to those repos—for the fork workflow above, restrict the token to **your fork** only; see `agent/README.md` for required scopes. +### Optional customization -### 3. Agent image (`agent/Dockerfile`) +#### Agent image (`agent/Dockerfile`) The default image installs Python, Node 20, `git`, `gh`, Claude Code CLI, and **`mise`** for polyglot builds. If your repositories need extra runtimes (Java, Go, specific CLIs, native libs), **extend `agent/Dockerfile`** (and optionally `agent/` tooling) so `mise run build` and your stack’s workflows succeed inside the container. Rebuild the runtime asset when you change the Dockerfile (a normal `cd cdk && npx cdk deploy` / CDK asset build does this). -### 4. Stack name (optional) +#### Stack name (optional) The development stack id is set in **`cdk/src/main.ts`** (default **`backgroundagent-dev`**). If you rename it, update every place that passes **`--stack-name`** to the AWS CLI (including examples in this guide and any scripts you keep locally). -### 5. 
Fork-specific metadata (optional) +#### Fork-specific metadata (optional) If you maintain your own fork, you will typically also replace **clone URLs**, **README badges**, **issue links**, and **`package.json` `name`** fields with your org’s identifiers. Those do not affect runtime behavior but avoid confusion for contributors. -### 6. Make target repositories easy for the agent +#### Make target repositories easy for the agent Keep each repo you onboard **clear and automatable**: documented build/test commands, consistent layout, and project-level agent hints (`CLAUDE.md`, `.claude/`). See [Make your codebase AI ready](https://medium.com/@alain.krok/make-your-codebase-ai-ready-05d6a160f1d5) for practical guidance. \ No newline at end of file diff --git a/docs/src/content/docs/user-guide/Task-lifecycle.md b/docs/src/content/docs/user-guide/Task-lifecycle.md index da99c79..763f4dd 100644 --- a/docs/src/content/docs/user-guide/Task-lifecycle.md +++ b/docs/src/content/docs/user-guide/Task-lifecycle.md @@ -20,7 +20,7 @@ The orchestrator uses Lambda Durable Functions to manage the lifecycle durably | `HYDRATING` | Orchestrator passed admission control; assembling the agent payload | | `RUNNING` | Agent session started and actively working on the task | | `COMPLETED` | Agent finished and created a PR (or determined no changes were needed) | -| `FAILED` | Agent encountered an error, user concurrency limit was reached, or content was blocked by guardrail screening | +| `FAILED` | Agent encountered an error, user concurrency limit was reached, content was blocked by guardrail screening, or **pre-flight** checks failed before the agent started (for example an underpowered GitHub PAT) | | `CANCELLED` | Task was cancelled by the user | | `TIMED_OUT` | Task exceeded the maximum allowed duration (~9 hours) | @@ -42,4 +42,6 @@ Each lifecycle transition is recorded as an audit event. 
Use the events endpoint
 curl "$API_URL/tasks/<task-id>/events" -H "Authorization: $TOKEN"
 ```
 
-Events include: `task_created`, `admission_rejected`, `preflight_failed`, `hydration_started`, `hydration_complete`, `guardrail_blocked`, `session_started`, `pr_created`, `pr_updated`, `task_completed`, `task_failed`, `task_cancelled`, `task_timed_out`. Event records are subject to the same 90-day retention as task records and are automatically deleted after that period.
\ No newline at end of file
+Events include: `task_created`, `admission_rejected`, `preflight_failed`, `hydration_started`, `hydration_complete`, `guardrail_blocked`, `session_started`, `pr_created`, `pr_updated`, `task_completed`, `task_failed`, `task_cancelled`, `task_timed_out`. Event records are subject to the same 90-day retention as task records and are automatically deleted after that period.
+
+**`preflight_failed`:** The orchestrator could not safely start work (GitHub API checks run **before** hydration and AgentCore). Open the event in `bgagent events <task-id>` (or the JSON from `GET /tasks/{id}/events`) and read **`reason`** and **`detail`**. Typical values for **`reason`** include `GITHUB_UNREACHABLE`, `REPO_NOT_FOUND_OR_NO_ACCESS`, `INSUFFICIENT_GITHUB_REPO_PERMISSIONS`, and `PR_NOT_FOUND_OR_CLOSED`. The most common fix for **`INSUFFICIENT_GITHUB_REPO_PERMISSIONS`** is to update the GitHub PAT in AWS Secrets Manager so it matches your task type—for **`new_task`** / **`pr_iteration`** you need **Contents** read/write and **Pull requests** read/write on the target repo; **`pr_review`** can pass with **Triage** (or higher) when you do not need to push. See [Developer guide — Repository preparation](/developer-guide/repository-preparation) for the full table and `put-secret-value` steps.
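The task-type-to-privilege mapping above can be sketched as a small lookup. This is illustrative only; the authoritative check lives in the orchestrator's `preflight.ts` and queries the GitHub API, and the function name here is invented for the sketch.

```shell
# Illustrative sketch of the pre-flight permission gate, not the real
# implementation (that lives in the orchestrator's preflight.ts).
required_permission() {
  case "$1" in
    new_task|pr_iteration) echo "write"  ;;  # must push branches and open PRs
    pr_review)             echo "triage" ;;  # read + review is enough
    *)                     echo "unknown" ;;
  esac
}

required_permission new_task   # prints "write"
required_permission pr_review  # prints "triage"
```

The point of the mapping is that a PAT good enough for `pr_review` can still fail pre-flight for `new_task`, so match the token to the most privileged task type you plan to submit.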
\ No newline at end of file diff --git a/docs/src/content/docs/user-guide/Tips.md b/docs/src/content/docs/user-guide/Tips.md index 755604e..0c38fe7 100644 --- a/docs/src/content/docs/user-guide/Tips.md +++ b/docs/src/content/docs/user-guide/Tips.md @@ -3,6 +3,7 @@ title: Tips --- - **Onboard your repo first**: Repositories must be registered via a `Blueprint` construct before tasks can target them. If you get a `REPO_NOT_ONBOARDED` error, contact your platform administrator. +- **GitHub PAT and `preflight_failed`**: If a task ends in `FAILED` with a `preflight_failed` event, the platform rejected the run before the agent consumed compute—often a token scoped read-only while the task needed push access. Check event `reason` / `detail` and align your fine-grained PAT with [Repository preparation](/developer-guide/repository-preparation); then update the secret and submit a new task. - **Prepare your repo**: The agent works best with repositories that are agent friendly. See the [Developer guide](/developer-guide/introduction) for repository preparation advice. - **Add a CLAUDE.md**: The agent automatically loads project-level configuration from your repository — `CLAUDE.md`, `.claude/CLAUDE.md`, `.claude/rules/*.md`, `.claude/settings.json`, `.claude/agents/`, and `.mcp.json`. Use these to provide project-specific build commands, conventions, constraints, custom subagents, and architecture notes. See the [Prompt guide](/user-guide/prompt-guide#repo-level-customization) for details and examples. - **Issue vs text**: When using `--issue` (CLI) or `issue_number` (API), the agent fetches the full issue body from GitHub, including any labels, comments, and linked context. This is usually better than a short text description. 
diff --git a/docs/src/content/docs/user-guide/Using-the-cli.md b/docs/src/content/docs/user-guide/Using-the-cli.md
index d721420..36e8480 100644
--- a/docs/src/content/docs/user-guide/Using-the-cli.md
+++ b/docs/src/content/docs/user-guide/Using-the-cli.md
@@ -127,8 +127,11 @@ node lib/bin/bgagent.js list --repo owner/repo --limit 10
 ```bash
 node lib/bin/bgagent.js events <task-id>
 node lib/bin/bgagent.js events <task-id> --limit 20
+node lib/bin/bgagent.js events <task-id> --output json
 ```
+
+Use **`--output json`** to see the full payload for **`preflight_failed`** (`reason`, `detail`, and per-check metadata). See **Task events** under **Task lifecycle** for how to interpret common `reason` values.
 
 ### Cancelling a task
 
 ```bash
diff --git a/docs/src/content/docs/user-guide/Using-the-rest-api.md b/docs/src/content/docs/user-guide/Using-the-rest-api.md
index f35021c..ce1cbc8 100644
--- a/docs/src/content/docs/user-guide/Using-the-rest-api.md
+++ b/docs/src/content/docs/user-guide/Using-the-rest-api.md
@@ -162,4 +162,4 @@ Transitions the task to `CANCELLED` and records a cancellation event. Only tasks
 curl "$API_URL/tasks/<task-id>/events" -H "Authorization: $TOKEN"
 ```
 
-Returns the chronological event log for a task (e.g., `task_created`, `session_started`, `task_completed`). Supports `limit` and `next_token` pagination parameters.
\ No newline at end of file
+Returns the chronological event log for a task (e.g., `task_created`, `preflight_failed`, `session_started`, `task_completed`). Supports `limit` and `next_token` pagination parameters. If the task failed before the agent ran, inspect `preflight_failed` entries for `reason` and `detail` (see **Task events** under **Task lifecycle**).
\ No newline at end of file
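As a sketch of what that inspection looks like, the snippet below pulls `reason` and `detail` out of a `preflight_failed` entry without assuming `jq` is installed. The event payload is hand-written for illustration; the real response shape from `GET /tasks/<task-id>/events` may differ, so adjust the field names to match your deployment.

```shell
# Sketch: extract reason/detail from a preflight_failed event.
# "events" below is a hand-written example payload, not the real API schema.
events='[{"type":"task_created"},{"type":"preflight_failed","reason":"INSUFFICIENT_GITHUB_REPO_PERMISSIONS","detail":"Contents: read-only"}]'
reason=$(printf '%s' "$events" | grep -o '"reason":"[^"]*"' | head -n1 | cut -d'"' -f4)
detail=$(printf '%s' "$events" | grep -o '"detail":"[^"]*"' | head -n1 | cut -d'"' -f4)
echo "$reason ($detail)"
```

With `jq` available, a filter along the lines of `select(.type == "preflight_failed")` over the event array is the cleaner equivalent (again assuming the same field names).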