17 changes: 9 additions & 8 deletions agent/README.md
@@ -135,7 +135,7 @@ To be safe, the agent isolates each task into its own workspace directory:

### Endpoints

**`GET /ping`** — Health check. Returns `{"status": "healthy"}`. Stays responsive while the agent runs.
**`GET /ping`** — Health check. Returns `{"status": "healthy"}` while the last accepted task completed normally (or no task has failed in the background thread). If the background pipeline thread raised after `/invocations` returned, returns **503** with `{"status":"unhealthy","reason":"background_pipeline_failed"}` so a live process is not mistaken for successful work. The task should also be marked **FAILED** in DynamoDB (written from both `pipeline.run_task` and the server thread as a backup). A separate daemon thread writes a DynamoDB heartbeat (`agent_heartbeat_at`) every 45 seconds while a task is running; the orchestrator uses this to detect agent crashes (see [ORCHESTRATOR.md](../docs/design/ORCHESTRATOR.md)). The heartbeat worker is resilient to transient DynamoDB errors — each write is wrapped in try/except so a single failure does not kill the thread.
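A client-side probe can distinguish the two cases above. This is an illustrative sketch only — `interpret_ping` is a hypothetical helper, not part of the agent code:

```python
# Illustrative client-side interpretation of the /ping contract described
# above. interpret_ping() is a hypothetical helper, not part of the agent.

def interpret_ping(status_code: int, payload: dict) -> str:
    """Map a /ping response to a coarse agent state."""
    if status_code == 200 and payload.get("status") == "healthy":
        return "healthy"
    if status_code == 503 and payload.get("reason") == "background_pipeline_failed":
        # Process is alive, but the background pipeline thread raised after
        # /invocations returned -- do not treat the task as successful work.
        return "pipeline_failed"
    return "unknown"
```

The point of the 503 branch is that liveness and task success are separate signals; orchestrators should check both this endpoint and the DynamoDB task status.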

**`POST /invocations`** — Accept a task and start the agent in a **background thread**. The handler returns **immediately** with an acceptance payload; it does not wait for the agent to finish. While the task runs, progress and the final outcome are written to **DynamoDB** when `TASK_TABLE_NAME` is set (see `task_state.py`); the deployed platform polls that table via the orchestrator. For ad-hoc local testing without DynamoDB, follow **`docker logs -f bgagent-run`** (or your container name).
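The accept-then-run-in-background shape can be sketched in isolation. All names here are illustrative — the real handler lives in `server.py` and records progress via `task_state.py`, not an in-memory dict:

```python
# Minimal sketch of the fire-and-forget /invocations pattern: accept the
# task, start the work in a daemon thread, and return immediately.
import threading

results: dict[str, str] = {}  # stand-in for the DynamoDB task table

def run_pipeline(task_id: str) -> None:
    # Stand-in for the real agent pipeline.
    results[task_id] = "COMPLETED"

def invocations(task_id: str) -> tuple[dict, threading.Thread]:
    # Start the pipeline in a background thread; the acceptance payload is
    # returned without waiting for the pipeline to finish.
    worker = threading.Thread(target=run_pipeline, args=(task_id,), daemon=True)
    worker.start()
    return {"status": "accepted", "task_id": task_id}, worker
```

Returning the thread handle is only for demonstration; the real handler keeps no reference for the caller, which is why terminal state must be read from DynamoDB.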

@@ -270,7 +270,8 @@ The agent pipeline (shared by both modes). Behavior varies by task type (`new_ta
- **`pr_iteration`**: Reads review feedback, addresses it with focused changes, commits and pushes, posts a summary comment on the PR
- **`pr_review`**: Analyzes changes read-only (no `Write` or `Edit` tools available), composes structured review findings, posts a batch review via the GitHub Reviews API
6. **Deterministic post-hooks** — verifies `mise run build` and `mise run lint`, ensures a PR exists (creates one if the agent did not). For `pr_review`, build status is informational only and the commit/push steps are skipped.
7. **Metrics** — returns duration, disk usage, turn count, cost, and PR URL
7. **Status resolution** — `_resolve_overall_task_status()` maps the agent outcome (success/end_turn/error/unknown) and build gate into a final task status. `agent_status=unknown` (SDK stream ended without a ResultMessage) always fails — success is never inferred from PR or build alone. If a post-hook raises after the agent ran, `_chain_prior_agent_error()` preserves the agent-layer error so it is not masked by the later exception.
8. **Metrics** — returns duration, disk usage, turn count, cost, and PR URL
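The status rule in steps 6-7 reduces to a small decision function. This is a simplified sketch of the semantics, not the exact signature of `_resolve_overall_task_status`:

```python
def resolve_status(agent_status: str, build_ok: bool) -> str:
    # Success requires BOTH an explicit agent success signal and a green
    # build. "unknown" (SDK stream ended without a ResultMessage) always
    # fails, even if a PR exists and the build passed.
    if agent_status in ("success", "end_turn") and build_ok:
        return "success"
    return "error"
```

Keeping PR existence out of the success condition ensures a protocol gap in the SDK stream is surfaced rather than papered over by an otherwise plausible-looking PR.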

## Metrics

@@ -327,8 +328,8 @@ agent/
│ ├── __init__.py
│ ├── entrypoint.py Re-export shim for backward compatibility (tests); delegates to specific modules
│ ├── config.py Configuration: build_config(), get_config(), resolve_github_token(), TaskType validation
│ ├── models.py Data models and enumerations (TaskType StrEnum with is_pr_task property)
│ ├── pipeline.py Top-level pipeline: main() CLI entry, run_task() orchestration
│ ├── models.py Pydantic data models (TaskConfig, RepoSetup, AgentResult, TaskResult, HydratedContext, etc.) and enumerations (TaskType StrEnum)
│ ├── pipeline.py Top-level pipeline: main() CLI entry, run_task() orchestration, status resolution, error chaining
│ ├── runner.py Agent runner: run_agent() — ClaudeSDKClient connect/query/receive_response
│ ├── context.py Context hydration: fetch_github_issue(), assemble_prompt() (local/dry-run only)
│ ├── prompt_builder.py System prompt assembly + memory context, repo config scanning
@@ -338,8 +339,8 @@ agent/
│ ├── repo.py Repository setup: clone, branch, git auth, mise trust/install/build/lint
│ ├── shell.py Shell utilities: log(), run_cmd(), redact_secrets(), slugify(), truncate()
│ ├── telemetry.py Metrics, disk usage, trajectory writer (_TrajectoryWriter with write_policy_decision)
│ ├── server.py FastAPI — async /invocations (background thread) and /ping; OTEL session correlation
│ ├── task_state.py Best-effort DynamoDB task status (no-op if TASK_TABLE_NAME unset)
│ ├── server.py FastAPI — async /invocations (background thread), /ping health check, heartbeat daemon; OTEL session correlation
│ ├── task_state.py Best-effort DynamoDB task status and heartbeat writes (no-op if TASK_TABLE_NAME unset)
│ ├── observability.py OpenTelemetry helpers (e.g. AgentCore session id)
│ ├── memory.py Optional memory / episode integration for the agent
│ ├── system_prompt.py Behavioral contract (PRD Section 11)
@@ -354,9 +355,9 @@ agent/
├── tests/ pytest unit tests (pythonpath: src/)
│ ├── test_config.py Config validation and TaskType tests
│ ├── test_hooks.py PreToolUse hook and hook matcher tests
│ ├── test_models.py TaskType enum tests
│ ├── test_models.py Pydantic model tests (construction, validation, frozen enforcement, model_dump)
│ ├── test_policy.py Cedar policy engine tests (fail-closed, deny-list)
│ ├── test_pipeline.py Pipeline orchestration tests (cedar_policies injection)
│ ├── test_pipeline.py Pipeline tests (cedar_policies injection, _resolve_overall_task_status, _chain_prior_agent_error)
│ ├── test_shell.py Shell utility tests (slugify, redact_secrets, truncate, format_bytes)
│ └── ...
├── test_sdk_smoke.py Diagnostic: minimal SDK smoke test (ClaudeSDKClient → CLI → Bedrock)
3 changes: 2 additions & 1 deletion agent/src/context.py
@@ -31,7 +31,8 @@ def fetch_github_issue(repo_url: str, issue_number: str, token: str) -> GitHubIs
)
comments_resp.raise_for_status()
comments = [
IssueComment(author=c["user"]["login"], body=c["body"]) for c in comments_resp.json()
IssueComment(id=int(c["id"]), author=c["user"]["login"], body=c["body"] or "")
for c in comments_resp.json()
]

return GitHubIssue(
49 changes: 39 additions & 10 deletions agent/src/models.py
@@ -3,8 +3,9 @@
from __future__ import annotations

from enum import StrEnum
from typing import Self

from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, Field, model_validator


class TaskType(StrEnum):
@@ -24,36 +25,64 @@ def is_read_only(self) -> bool:


class IssueComment(BaseModel):
model_config = ConfigDict(frozen=True)
"""Single GitHub issue comment — mirrors ``IssueComment`` in context-hydration.ts."""

model_config = ConfigDict(frozen=True, extra="forbid")

id: int
author: str
body: str


class GitHubIssue(BaseModel):
model_config = ConfigDict(frozen=True)
"""GitHub issue slice — mirrors ``GitHubIssueContext`` in context-hydration.ts."""

model_config = ConfigDict(frozen=True, extra="forbid")

title: str
body: str = ""
number: int
comments: list[IssueComment] = []
comments: list[IssueComment] = Field(default_factory=list)


class MemoryContext(BaseModel):
model_config = ConfigDict(frozen=True)
model_config = ConfigDict(frozen=True, extra="forbid")

repo_knowledge: list[str] = Field(default_factory=list)
past_episodes: list[str] = Field(default_factory=list)


repo_knowledge: list[str] = []
past_episodes: list[str] = []
# Bump when this agent supports a new orchestrator HydratedContext shape
# (see cdk/src/handlers/shared/context-hydration.ts).
SUPPORTED_HYDRATED_CONTEXT_VERSION = 1


class HydratedContext(BaseModel):
model_config = ConfigDict(frozen=True)
"""Orchestrator context JSON — keep in sync with HydratedContext in context-hydration.ts."""

model_config = ConfigDict(frozen=True, extra="forbid")

version: int = 1
user_prompt: str
issue: GitHubIssue | None = None
resolved_base_branch: str | None = None
truncated: bool = False
memory_context: MemoryContext | None = None
sources: list[str] = Field(default_factory=list)
token_estimate: int = 0
truncated: bool = False
fallback_error: str | None = None
guardrail_blocked: str | None = None
resolved_branch_name: str | None = None
resolved_base_branch: str | None = None

@model_validator(mode="after")
def version_supported(self) -> Self:
if self.version > SUPPORTED_HYDRATED_CONTEXT_VERSION:
raise ValueError(
f"HydratedContext schema version {self.version} is not supported by this agent "
f"(max supported: {SUPPORTED_HYDRATED_CONTEXT_VERSION}). "
"Deploy an updated agent container image."
)
return self
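The version gate above reduces to a simple check. This is a sketch of the rule in plain-function form (the real enforcement is the `model_validator` shown):

```python
SUPPORTED_HYDRATED_CONTEXT_VERSION = 1  # mirrors the constant above

def check_context_version(version: int) -> None:
    # Raise if the orchestrator sent a newer schema than this agent supports;
    # simplified form of the HydratedContext.version_supported validator.
    if version > SUPPORTED_HYDRATED_CONTEXT_VERSION:
        raise ValueError(
            f"HydratedContext schema version {version} is not supported "
            f"(max supported: {SUPPORTED_HYDRATED_CONTEXT_VERSION})"
        )
```

Gating only on `version >` (not `!=`) lets older-but-compatible payloads through while forcing an agent image redeploy for newer schemas.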


class TaskConfig(BaseModel):
118 changes: 93 additions & 25 deletions agent/src/pipeline.py
@@ -9,6 +9,8 @@
import sys
import time

from pydantic import ValidationError

import memory as agent_memory
import task_state
from config import AGENT_WORKSPACE, build_config, get_config
@@ -28,6 +30,55 @@
from system_prompt import SYSTEM_PROMPT
from telemetry import format_bytes, get_disk_usage, print_metrics

_SDK_NO_RESULT_MESSAGE = (
"Agent SDK stream ended without a ResultMessage (agent_status=unknown). "
"Treat as failure: possible SDK bug, network interruption, or protocol mismatch."
)


def _chain_prior_agent_error(agent_result: AgentResult | None, exc: BaseException) -> str:
"""Preserve agent-layer failures when a later pipeline stage raises."""
tail = f"{type(exc).__name__}: {exc}"
if agent_result is None:
return tail
if agent_result.error:
return f"{agent_result.error}; subsequent failure: {tail}"
if agent_result.status == "error":
return f"Agent reported status=error; subsequent failure: {tail}"
return tail


def _resolve_overall_task_status(
agent_result: AgentResult,
*,
build_ok: bool,
pr_url: str | None,
) -> tuple[str, str | None]:
"""Map agent outcome + build gate to (overall_status, error_for_task_result)."""
agent_status = agent_result.status
err = agent_result.error

if agent_status in ("success", "end_turn") and build_ok:
return "success", err

if agent_status == "unknown":
if pr_url:
log(
"INFO",
f"No ResultMessage from SDK (agent_status=unknown); pr_url present: {pr_url}",
)
if build_ok:
log(
"INFO",
"No ResultMessage from SDK; build_ok=True (informational; task still failed)",
)
merged = f"{err}; {_SDK_NO_RESULT_MESSAGE}" if err else _SDK_NO_RESULT_MESSAGE
return "error", merged

if not err:
err = f"Task did not succeed (agent_status={agent_status!r}, build_ok={build_ok})"
return "error", err


def _write_memory(
config: TaskConfig,
@@ -149,20 +200,43 @@ def run_task(
},
) as root_span:
task_state.write_running(config.task_id)
task_state.write_heartbeat(config.task_id)

agent_result: AgentResult | None = None
try:
# Context hydration
with task_span("task.context_hydration"):
if hydrated_context:
log("TASK", "Using hydrated context from orchestrator")
hc = HydratedContext.model_validate(hydrated_context)
try:
hc = HydratedContext.model_validate(hydrated_context)
except ValidationError as err:
parts = [
f"{'.'.join(str(x) for x in e['loc'])}: {e['msg']}"
for e in err.errors()
]
log(
"ERROR",
"HydratedContext validation failed (orchestrator vs agent contract): "
+ "; ".join(parts),
)
raise
prompt = hc.user_prompt
if hc.issue:
config.issue = hc.issue
if hc.resolved_branch_name:
config.branch_name = hc.resolved_branch_name
if hc.resolved_base_branch:
config.base_branch = hc.resolved_base_branch
if hc.truncated:
log("WARN", "Context was truncated by orchestrator token budget")
if hc.fallback_error:
log("WARN", f"Orchestrator context fallback: {hc.fallback_error}")
if hc.guardrail_blocked:
log(
"WARN",
f"Orchestrator guardrail blocked content: {hc.guardrail_blocked}",
)
else:
hc = None
# Local batch mode — fetch issue and assemble prompt in-container
@@ -240,7 +314,7 @@
agent_span.record_exception(e)
agent_result = AgentResult(status="error", error=str(e))

# Post-hooks
# Post-hooks (agent_result is guaranteed set by the try/except above)
with task_span("task.post_hooks") as post_span:
# Safety net: commit any uncommitted tracked changes (skip for read-only tasks)
if config.task_type == "pr_review":
@@ -276,16 +350,9 @@
duration = time.time() - start_time
disk_after = get_disk_usage(AGENT_WORKSPACE)

# Determine overall status:
# - "success" if the agent reported success/end_turn and the build passes
# (or the build was already broken before the agent ran — pre-existing failure)
# - "success" if agent_status is unknown (SDK didn't yield ResultMessage)
# but the pipeline produced a PR and the build didn't regress
# - "error" otherwise
# NOTE: lint_passed is intentionally NOT used in the status
# determination — lint failures are advisory and reported in the PR
# body and span attributes but do not affect the task's terminal
# status. Lint regression detection is planned for Iteration 3c.
# Overall status: do not infer success from PR/build when the SDK never
# emitted ResultMessage (agent_status=unknown) — that masks protocol gaps.
# NOTE: lint_passed is intentionally NOT used for terminal status.
agent_status = agent_result.status
# Default True = assume build was green before, so a post-agent
# failure IS counted as a regression (conservative).
@@ -302,17 +369,11 @@
"Post-agent build failed, but build was already failing before "
"agent changes — not counting as regression",
)
if agent_status in ("success", "end_turn") and build_ok:
overall_status = "success"
elif agent_status == "unknown" and pr_url and build_ok:
log(
"WARN",
"Agent SDK did not yield a ResultMessage, but PR was created "
"and build didn't regress — treating as success",
)
overall_status = "success"
else:
overall_status = "error"
overall_status, result_error = _resolve_overall_task_status(
agent_result,
build_ok=build_ok,
pr_url=pr_url,
)

# Build TaskResult
usage = agent_result.usage
Expand All @@ -331,7 +392,7 @@ def run_task(
disk_delta=format_bytes(disk_after - disk_before),
prompt_version=prompt_version or None,
memory_written=memory_written,
error=agent_result.error,
error=result_error,
session_id=agent_result.session_id or None,
input_tokens=usage.input_tokens if usage else None,
output_tokens=usage.output_tokens if usage else None,
@@ -377,7 +438,14 @@
except Exception as e:
# Ensure the task is marked FAILED in DynamoDB even if the pipeline
# crashes before reaching the normal terminal-state write.
crash_result = TaskResult(status="error", error=str(e), task_id=config.task_id)
agent_for_chain = agent_result
combined = _chain_prior_agent_error(agent_for_chain, e)
crash_result = TaskResult(
status="error",
error=combined,
task_id=config.task_id,
agent_status=agent_for_chain.status if agent_for_chain else "unknown",
)
task_state.write_terminal(config.task_id, "FAILED", crash_result.model_dump())
raise
