Commit d8f8602

Author: bgagent
project(arch): address gaps and update docs
1 parent e4c4343 commit d8f8602

38 files changed

Lines changed: 1201 additions & 151 deletions

agent/README.md

Lines changed: 9 additions & 8 deletions
@@ -135,7 +135,7 @@ To be safe, the agent isolates each task into its own workspace directory:
 
 ### Endpoints
 
-**`GET /ping`** — Health check. Returns `{"status": "healthy"}`. Stays responsive while the agent runs.
+**`GET /ping`** — Health check. Returns `{"status": "healthy"}` while the last accepted task completed normally (or no task has failed in the background thread). If the background pipeline thread raised after `/invocations` returned, returns **503** with `{"status":"unhealthy","reason":"background_pipeline_failed"}` so a live process is not mistaken for successful work. The task should also be marked **FAILED** in DynamoDB (written from both `pipeline.run_task` and the server thread as a backup). A separate daemon thread writes a DynamoDB heartbeat (`agent_heartbeat_at`) every 45 seconds while a task is running; the orchestrator uses this to detect agent crashes (see [ORCHESTRATOR.md](../docs/design/ORCHESTRATOR.md)). The heartbeat worker is resilient to transient DynamoDB errors — each write is wrapped in try/except so a single failure does not kill the thread.
 
 **`POST /invocations`** — Accept a task and start the agent in a **background thread**. The handler returns **immediately** with an acceptance payload; it does not wait for the agent to finish. While the task runs, progress and the final outcome are written to **DynamoDB** when `TASK_TABLE_NAME` is set (see `task_state.py`); the deployed platform polls that table via the orchestrator. For ad-hoc local testing without DynamoDB, follow **`docker logs -f bgagent-run`** (or your container name).
 
@@ -270,7 +270,8 @@ The agent pipeline (shared by both modes). Behavior varies by task type (`new_ta
 - **`pr_iteration`**: Reads review feedback, addresses it with focused changes, commits and pushes, posts a summary comment on the PR
 - **`pr_review`**: Analyzes changes read-only (no `Write` or `Edit` tools available), composes structured review findings, posts a batch review via the GitHub Reviews API
 6. **Deterministic post-hooks** — verifies `mise run build` and `mise run lint`, ensures a PR exists (creates one if the agent did not). For `pr_review`, build status is informational only and the commit/push steps are skipped.
-7. **Metrics** — returns duration, disk usage, turn count, cost, and PR URL
+7. **Status resolution** — `_resolve_overall_task_status()` maps the agent outcome (success/end_turn/error/unknown) and build gate into a final task status. `agent_status=unknown` (SDK stream ended without a ResultMessage) always fails; success is never inferred from PR or build alone. If a post-hook raises after the agent ran, `_chain_prior_agent_error()` preserves the agent-layer error so it is not masked by the later exception.
+8. **Metrics** — returns duration, disk usage, turn count, cost, and PR URL
 
 ## Metrics
 
@@ -327,8 +328,8 @@ agent/
 │ ├── __init__.py
 │ ├── entrypoint.py Re-export shim for backward compatibility (tests); delegates to specific modules
 │ ├── config.py Configuration: build_config(), get_config(), resolve_github_token(), TaskType validation
-│ ├── models.py Data models and enumerations (TaskType StrEnum with is_pr_task property)
-│ ├── pipeline.py Top-level pipeline: main() CLI entry, run_task() orchestration
+│ ├── models.py Pydantic data models (TaskConfig, RepoSetup, AgentResult, TaskResult, HydratedContext, etc.) and enumerations (TaskType StrEnum)
+│ ├── pipeline.py Top-level pipeline: main() CLI entry, run_task() orchestration, status resolution, error chaining
 │ ├── runner.py Agent runner: run_agent() — ClaudeSDKClient connect/query/receive_response
 │ ├── context.py Context hydration: fetch_github_issue(), assemble_prompt() (local/dry-run only)
 │ ├── prompt_builder.py System prompt assembly + memory context, repo config scanning
@@ -338,8 +339,8 @@ agent/
 │ ├── repo.py Repository setup: clone, branch, git auth, mise trust/install/build/lint
 │ ├── shell.py Shell utilities: log(), run_cmd(), redact_secrets(), slugify(), truncate()
 │ ├── telemetry.py Metrics, disk usage, trajectory writer (_TrajectoryWriter with write_policy_decision)
-│ ├── server.py FastAPI — async /invocations (background thread) and /ping; OTEL session correlation
-│ ├── task_state.py Best-effort DynamoDB task status (no-op if TASK_TABLE_NAME unset)
+│ ├── server.py FastAPI — async /invocations (background thread), /ping health check, heartbeat daemon; OTEL session correlation
+│ ├── task_state.py Best-effort DynamoDB task status and heartbeat writes (no-op if TASK_TABLE_NAME unset)
 │ ├── observability.py OpenTelemetry helpers (e.g. AgentCore session id)
 │ ├── memory.py Optional memory / episode integration for the agent
 │ ├── system_prompt.py Behavioral contract (PRD Section 11)
@@ -354,9 +355,9 @@ agent/
 ├── tests/ pytest unit tests (pythonpath: src/)
 │ ├── test_config.py Config validation and TaskType tests
 │ ├── test_hooks.py PreToolUse hook and hook matcher tests
-│ ├── test_models.py TaskType enum tests
+│ ├── test_models.py Pydantic model tests (construction, validation, frozen enforcement, model_dump)
 │ ├── test_policy.py Cedar policy engine tests (fail-closed, deny-list)
-│ ├── test_pipeline.py Pipeline orchestration tests (cedar_policies injection)
+│ ├── test_pipeline.py Pipeline tests (cedar_policies injection, _resolve_overall_task_status, _chain_prior_agent_error)
 │ ├── test_shell.py Shell utility tests (slugify, redact_secrets, truncate, format_bytes)
 │ └── ...
 ├── test_sdk_smoke.py Diagnostic: minimal SDK smoke test (ClaudeSDKClient → CLI → Bedrock)
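
Note on the heartbeat wording in the README change: the claim that one failed write does not kill the heartbeat thread can be illustrated with a minimal sketch. The helper name `start_heartbeat()` and the `write_heartbeat` callable are hypothetical stand-ins for whatever `server.py` and `task_state.py` actually do; only the 45-second default and the try/except-per-write idea come from the README text.

```python
import threading
import time
from typing import Callable


def start_heartbeat(
    write_heartbeat: Callable[[], None],
    stop: threading.Event,
    interval_s: float = 45.0,
) -> threading.Thread:
    """Call write_heartbeat() every interval_s seconds until stop is set."""

    def _loop() -> None:
        while not stop.is_set():
            try:
                write_heartbeat()
            except Exception as exc:
                # A transient failure is logged and the loop keeps going.
                print(f"heartbeat write failed: {type(exc).__name__}: {exc}")
            # Event.wait doubles as an interruptible sleep.
            stop.wait(interval_s)

    thread = threading.Thread(target=_loop, name="heartbeat", daemon=True)
    thread.start()
    return thread


# Demo: the first write raises, later writes still happen.
calls: list[float] = []


def flaky_write() -> None:
    calls.append(time.monotonic())
    if len(calls) == 1:
        raise RuntimeError("simulated transient DynamoDB error")


stop_event = threading.Event()
worker = start_heartbeat(flaky_write, stop_event, interval_s=0.01)
time.sleep(0.2)
stop_event.set()
worker.join(timeout=1.0)
print(f"writes attempted: {len(calls)}, thread alive: {worker.is_alive()}")
```

Using `Event.wait()` instead of `time.sleep()` lets the worker stop promptly when the task finishes, which matters with a 45-second interval.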

agent/src/context.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ def fetch_github_issue(repo_url: str, issue_number: str, token: str) -> GitHubIs
     )
     comments_resp.raise_for_status()
     comments = [
-        IssueComment(author=c["user"]["login"], body=c["body"]) for c in comments_resp.json()
+        IssueComment(id=int(c["id"]), author=c["user"]["login"], body=c["body"] or "")
+        for c in comments_resp.json()
     ]
 
     return GitHubIssue(
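
Note on the `body=c["body"] or ""` change: it guards against a JSON `null` body, which would otherwise fail validation for a `body: str` field. A minimal sketch of the same coercion, using a hypothetical stdlib dataclass `CommentSketch` in place of the Pydantic `IssueComment` model and a hand-written payload rather than a real API response:

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class CommentSketch:
    """Hypothetical stand-in for IssueComment (id, author, body)."""

    id: int
    author: str
    body: str


def parse_comments(payload: list[dict]) -> list[CommentSketch]:
    # `or ""` turns a null (None) body into an empty string;
    # int() normalizes ids that arrive as strings.
    return [
        CommentSketch(id=int(c["id"]), author=c["user"]["login"], body=c["body"] or "")
        for c in payload
    ]


raw = """[
  {"id": 101, "user": {"login": "octocat"}, "body": null},
  {"id": "102", "user": {"login": "hubot"}, "body": "LGTM"}
]"""
parsed = parse_comments(json.loads(raw))
print(parsed)
```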

agent/src/models.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
 from __future__ import annotations
 
 from enum import StrEnum
+from typing import Self
 
-from pydantic import BaseModel, ConfigDict
+from pydantic import BaseModel, ConfigDict, Field, model_validator
 
 
 class TaskType(StrEnum):
@@ -24,36 +25,64 @@ def is_read_only(self) -> bool:
 
 
 class IssueComment(BaseModel):
-    model_config = ConfigDict(frozen=True)
+    """Single GitHub issue comment — mirrors ``IssueComment`` in context-hydration.ts."""
+
+    model_config = ConfigDict(frozen=True, extra="forbid")
 
+    id: int
     author: str
     body: str
 
 
 class GitHubIssue(BaseModel):
-    model_config = ConfigDict(frozen=True)
+    """GitHub issue slice — mirrors ``GitHubIssueContext`` in context-hydration.ts."""
+
+    model_config = ConfigDict(frozen=True, extra="forbid")
 
     title: str
     body: str = ""
     number: int
-    comments: list[IssueComment] = []
+    comments: list[IssueComment] = Field(default_factory=list)
 
 
 class MemoryContext(BaseModel):
-    model_config = ConfigDict(frozen=True)
+    model_config = ConfigDict(frozen=True, extra="forbid")
+
+    repo_knowledge: list[str] = Field(default_factory=list)
+    past_episodes: list[str] = Field(default_factory=list)
+
 
-    repo_knowledge: list[str] = []
-    past_episodes: list[str] = []
+# Bump when this agent supports a new orchestrator HydratedContext shape
+# (see cdk/src/handlers/shared/context-hydration.ts).
+SUPPORTED_HYDRATED_CONTEXT_VERSION = 1
 
 
 class HydratedContext(BaseModel):
-    model_config = ConfigDict(frozen=True)
+    """Orchestrator context JSON — keep in sync with HydratedContext in context-hydration.ts."""
+
+    model_config = ConfigDict(frozen=True, extra="forbid")
 
+    version: int = 1
     user_prompt: str
     issue: GitHubIssue | None = None
-    resolved_base_branch: str | None = None
-    truncated: bool = False
     memory_context: MemoryContext | None = None
+    sources: list[str] = Field(default_factory=list)
+    token_estimate: int = 0
+    truncated: bool = False
+    fallback_error: str | None = None
+    guardrail_blocked: str | None = None
+    resolved_branch_name: str | None = None
+    resolved_base_branch: str | None = None
+
+    @model_validator(mode="after")
+    def version_supported(self) -> Self:
+        if self.version > SUPPORTED_HYDRATED_CONTEXT_VERSION:
+            raise ValueError(
+                f"HydratedContext schema version {self.version} is not supported by this agent "
+                f"(max supported: {SUPPORTED_HYDRATED_CONTEXT_VERSION}). "
+                "Deploy an updated agent container image."
+            )
+        return self
 
 
 class TaskConfig(BaseModel):
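
Note on the stricter models: with `extra="forbid"` an unknown key from the orchestrator is rejected instead of silently ignored, and the `version` validator rejects payloads newer than the agent understands. The sketch below shows both rejections on a trimmed, hypothetical model (`ContextSketch`, `SUPPORTED_VERSION`, and `validation_messages()` are illustrative names, not part of `models.py`). It assumes Pydantic v2 is installed.

```python
from pydantic import BaseModel, ConfigDict, ValidationError, model_validator

SUPPORTED_VERSION = 1  # stand-in for SUPPORTED_HYDRATED_CONTEXT_VERSION


class ContextSketch(BaseModel):
    """Trimmed, hypothetical stand-in for HydratedContext."""

    model_config = ConfigDict(frozen=True, extra="forbid")

    version: int = 1
    user_prompt: str

    @model_validator(mode="after")
    def version_supported(self) -> "ContextSketch":
        if self.version > SUPPORTED_VERSION:
            raise ValueError(f"schema version {self.version} is not supported")
        return self


def validation_messages(payload: dict) -> list[str]:
    """Return 'loc: msg' strings for a rejected payload, or [] if it validates."""
    try:
        ContextSketch.model_validate(payload)
    except ValidationError as err:
        return [f"{'.'.join(str(x) for x in e['loc'])}: {e['msg']}" for e in err.errors()]
    return []


ok = validation_messages({"user_prompt": "fix the bug"})
extra = validation_messages({"user_prompt": "fix the bug", "unknown_key": 1})
newer = validation_messages({"version": 2, "user_prompt": "fix the bug"})
print(ok, extra, newer, sep="\n")
```

One consequence worth keeping in mind: under `extra="forbid"`, any new field added on the orchestrator side has to land in the agent model first (or together with a version bump), otherwise validation fails for every task.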

agent/src/pipeline.py

Lines changed: 93 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
 import sys
 import time
 
+from pydantic import ValidationError
+
 import memory as agent_memory
 import task_state
 from config import AGENT_WORKSPACE, build_config, get_config
@@ -28,6 +30,55 @@
 from system_prompt import SYSTEM_PROMPT
 from telemetry import format_bytes, get_disk_usage, print_metrics
 
+_SDK_NO_RESULT_MESSAGE = (
+    "Agent SDK stream ended without a ResultMessage (agent_status=unknown). "
+    "Treat as failure: possible SDK bug, network interruption, or protocol mismatch."
+)
+
+
+def _chain_prior_agent_error(agent_result: AgentResult | None, exc: BaseException) -> str:
+    """Preserve agent-layer failures when a later pipeline stage raises."""
+    tail = f"{type(exc).__name__}: {exc}"
+    if agent_result is None:
+        return tail
+    if agent_result.error:
+        return f"{agent_result.error}; subsequent failure: {tail}"
+    if agent_result.status == "error":
+        return f"Agent reported status=error; subsequent failure: {tail}"
+    return tail
+
+
+def _resolve_overall_task_status(
+    agent_result: AgentResult,
+    *,
+    build_ok: bool,
+    pr_url: str | None,
+) -> tuple[str, str | None]:
+    """Map agent outcome + build gate to (overall_status, error_for_task_result)."""
+    agent_status = agent_result.status
+    err = agent_result.error
+
+    if agent_status in ("success", "end_turn") and build_ok:
+        return "success", err
+
+    if agent_status == "unknown":
+        if pr_url:
+            log(
+                "INFO",
+                f"No ResultMessage from SDK (agent_status=unknown); pr_url present: {pr_url}",
+            )
+        if build_ok:
+            log(
+                "INFO",
+                "No ResultMessage from SDK; build_ok=True (informational; task still failed)",
+            )
+        merged = f"{err}; {_SDK_NO_RESULT_MESSAGE}" if err else _SDK_NO_RESULT_MESSAGE
+        return "error", merged
+
+    if not err:
+        err = f"Task did not succeed (agent_status={agent_status!r}, build_ok={build_ok})"
+    return "error", err
+
 
 def _write_memory(
     config: TaskConfig,
@@ -149,20 +200,43 @@ def run_task(
         },
     ) as root_span:
         task_state.write_running(config.task_id)
+        task_state.write_heartbeat(config.task_id)
 
+        agent_result: AgentResult | None = None
         try:
             # Context hydration
             with task_span("task.context_hydration"):
                 if hydrated_context:
                     log("TASK", "Using hydrated context from orchestrator")
-                    hc = HydratedContext.model_validate(hydrated_context)
+                    try:
+                        hc = HydratedContext.model_validate(hydrated_context)
+                    except ValidationError as err:
+                        parts = [
+                            f"{'.'.join(str(x) for x in e['loc'])}: {e['msg']}"
+                            for e in err.errors()
+                        ]
+                        log(
+                            "ERROR",
+                            "HydratedContext validation failed (orchestrator vs agent contract): "
+                            + "; ".join(parts),
+                        )
+                        raise
                     prompt = hc.user_prompt
                     if hc.issue:
                         config.issue = hc.issue
+                    if hc.resolved_branch_name:
+                        config.branch_name = hc.resolved_branch_name
                     if hc.resolved_base_branch:
                         config.base_branch = hc.resolved_base_branch
                     if hc.truncated:
                         log("WARN", "Context was truncated by orchestrator token budget")
+                    if hc.fallback_error:
+                        log("WARN", f"Orchestrator context fallback: {hc.fallback_error}")
+                    if hc.guardrail_blocked:
+                        log(
+                            "WARN",
+                            f"Orchestrator guardrail blocked content: {hc.guardrail_blocked}",
+                        )
                 else:
                     hc = None
                     # Local batch mode — fetch issue and assemble prompt in-container
@@ -240,7 +314,7 @@ def run_task(
                     agent_span.record_exception(e)
                     agent_result = AgentResult(status="error", error=str(e))
 
-            # Post-hooks
+            # Post-hooks (agent_result is guaranteed set by the try/except above)
             with task_span("task.post_hooks") as post_span:
                 # Safety net: commit any uncommitted tracked changes (skip for read-only tasks)
                 if config.task_type == "pr_review":
@@ -276,16 +350,9 @@ def run_task(
             duration = time.time() - start_time
             disk_after = get_disk_usage(AGENT_WORKSPACE)
 
-            # Determine overall status:
-            # - "success" if the agent reported success/end_turn and the build passes
-            #   (or the build was already broken before the agent ran — pre-existing failure)
-            # - "success" if agent_status is unknown (SDK didn't yield ResultMessage)
-            #   but the pipeline produced a PR and the build didn't regress
-            # - "error" otherwise
-            # NOTE: lint_passed is intentionally NOT used in the status
-            # determination — lint failures are advisory and reported in the PR
-            # body and span attributes but do not affect the task's terminal
-            # status. Lint regression detection is planned for Iteration 3c.
+            # Overall status: do not infer success from PR/build when the SDK never
+            # emitted ResultMessage (agent_status=unknown) — that masks protocol gaps.
+            # NOTE: lint_passed is intentionally NOT used for terminal status.
             agent_status = agent_result.status
             # Default True = assume build was green before, so a post-agent
             # failure IS counted as a regression (conservative).
@@ -302,17 +369,11 @@ def run_task(
                     "Post-agent build failed, but build was already failing before "
                     "agent changes — not counting as regression",
                 )
-            if agent_status in ("success", "end_turn") and build_ok:
-                overall_status = "success"
-            elif agent_status == "unknown" and pr_url and build_ok:
-                log(
-                    "WARN",
-                    "Agent SDK did not yield a ResultMessage, but PR was created "
-                    "and build didn't regress — treating as success",
-                )
-                overall_status = "success"
-            else:
-                overall_status = "error"
+            overall_status, result_error = _resolve_overall_task_status(
+                agent_result,
+                build_ok=build_ok,
+                pr_url=pr_url,
+            )
 
             # Build TaskResult
             usage = agent_result.usage
@@ -331,7 +392,7 @@ def run_task(
                 disk_delta=format_bytes(disk_after - disk_before),
                 prompt_version=prompt_version or None,
                 memory_written=memory_written,
-                error=agent_result.error,
+                error=result_error,
                 session_id=agent_result.session_id or None,
                 input_tokens=usage.input_tokens if usage else None,
                 output_tokens=usage.output_tokens if usage else None,
@@ -377,7 +438,14 @@ def run_task(
         except Exception as e:
             # Ensure the task is marked FAILED in DynamoDB even if the pipeline
             # crashes before reaching the normal terminal-state write.
-            crash_result = TaskResult(status="error", error=str(e), task_id=config.task_id)
+            agent_for_chain = agent_result
+            combined = _chain_prior_agent_error(agent_for_chain, e)
+            crash_result = TaskResult(
+                status="error",
+                error=combined,
+                task_id=config.task_id,
+                agent_status=agent_for_chain.status if agent_for_chain else "unknown",
+            )
             task_state.write_terminal(config.task_id, "FAILED", crash_result.model_dump())
             raise
