Depends on: #18 (Reorganize agent/ directory structure)
The body below was revised with Claude (Opus 4.6 model) to check the line counts and file names for accuracy.
## The problem
I spent a good amount of time trying to understand `agent/entrypoint.py` and honestly, even with AI tools helping me read through it, it was really hard to follow. It's a single 2,092-line file that handles config, repo cloning, prompt building, the Claude SDK call, post-hooks, PR creation, telemetry, and metrics - all mixed together. There's no type safety (everything is passed around as dicts), the config gets mutated mid-flight, and there's a fair amount of dead code (deprecated functions, 100 lines of comments for a disabled OTEL feature, duplicated helpers). The `run_task()` function alone is 300+ lines doing everything at once.
I think this creates the following problems:
- Hard to onboard: even with AI coding assistants, understanding the pipeline takes multiple passes.
- Hard to debug: every bug leads to "open `entrypoint.py` and search". There's no way to know that a PR creation issue lives in lines 452-819 without reading the whole file.
- Hard to change safely: no typed contracts between pipeline steps. A function returns a dict, another function reads from that dict with `.get()` and hopes the keys are there. Nothing catches breakage until runtime.
## Proposed solution
### 1. Break `entrypoint.py` into focused modules
The pipeline is actually simple - it's a linear sequence where each step takes typed input and returns typed output.
```
agent/src/
├── models.py          <- Pydantic models (shared language between modules)
├── config.py          <- Build TaskConfig from env vars + parameters
├── shell.py           <- run_cmd(), slugify(), redact_secrets()
├── repo.py            <- Clone, branch, mise install, baseline build/lint
├── runner.py          <- Claude SDK invocation
├── post_hooks.py      <- Safety commit, verify build/lint, create/push PR
├── prompt_builder.py  <- Assemble system prompt from templates + context
├── telemetry.py       <- CloudWatch Logs: trajectory events + metrics
└── pipeline.py        <- The thin orchestrator - calls the above in order
```
How they connect:
```
pipeline.py (orchestrator - the only file that imports from the modules below)
│
├── config.py         -> returns TaskConfig
├── repo.py           -> takes TaskConfig, returns RepoSetup
├── prompt_builder.py -> takes TaskConfig + RepoSetup + HydratedContext, returns str
├── runner.py         -> takes prompt + system_prompt + TaskConfig, returns AgentResult
├── post_hooks.py     -> takes TaskConfig + RepoSetup + AgentResult, returns PostHookResult
├── memory.py         -> takes config + results, returns bool
├── task_state.py     -> takes task_id + status
└── telemetry.py      -> takes task_id + metrics dict
```
PR creation failing? Open `post_hooks.py`. Agent timing out? Open `runner.py`.

#### What each module replaces
| Module | Replaces from `entrypoint.py` |
|---|---|
| `models.py` | New file (no equivalent today) |
| `config.py` | `build_config()` (lines 103-175) + `resolve_github_token()` (lines 177-202) |
| `shell.py` | `run_cmd()` (lines 57-100) + `slugify()` (lines 204-210) + duplicated `_clean_env()` (lines 255 and 1225) |
| `repo.py` | `setup_repo()` (lines 280-420) + `_detect_default_branch()` (lines 422-450) |
| `runner.py` | `run_agent()` (lines 1310-1547) + `_setup_agent_env()` (lines 1194-1307) |
| `post_hooks.py` | Lines 452-819: `ensure_committed`, `verify_build`, `verify_lint`, `ensure_pushed`, `ensure_pr` |
| `prompt_builder.py` | `_build_system_prompt()` (lines 821-890) + `_discover_project_config()` (lines 892-948) |
| `telemetry.py` | `_TrajectoryWriter` class (lines 951-1119) + `_emit_metrics_to_cloudwatch()` (lines 1121-1168) |
| `pipeline.py` | `run_task()` (lines 1708-2015, the 307-line function) |
#### What gets deleted (~290 lines)

Deprecated functions (`fetch_github_issue`, `assemble_prompt`), dry-run mode, 100 lines of comments for a disabled OTEL feature, the duplicated `_clean_env`, and inline token parsing/status logic replaced by typed functions.
### 2. Replace dicts with Pydantic models

The hardest part of reading the code was figuring out "what keys does this dict have?", because the answer depends on which code path built it and whether something got added mid-flight. The project already uses Pydantic in `server.py`, so this is consistent.
`models.py`:

```python
from __future__ import annotations

from enum import Enum

from pydantic import BaseModel, ConfigDict


class TaskType(str, Enum):
    NEW_TASK = "new_task"
    PR_ITERATION = "pr_iteration"
    PR_REVIEW = "pr_review"

    @property
    def is_pr_task(self) -> bool:
        return self in (TaskType.PR_ITERATION, TaskType.PR_REVIEW)

    @property
    def is_read_only(self) -> bool:
        return self == TaskType.PR_REVIEW


class TaskConfig(BaseModel):
    """Immutable task configuration. Built once, never mutated."""
    model_config = ConfigDict(frozen=True)

    repo_url: str
    github_token: str
    aws_region: str
    task_id: str
    task_type: TaskType = TaskType.NEW_TASK
    task_description: str = ""
    issue_number: str = ""
    anthropic_model: str = "us.anthropic.claude-sonnet-4-6"
    max_turns: int = 100
    max_budget_usd: float | None = None
    system_prompt_overrides: str = ""
    branch_name: str = ""
    pr_number: str = ""
    prompt_version: str = ""
    memory_id: str = ""


class HydratedContext(BaseModel):
    """Context assembled by the orchestrator and passed to the agent container."""
    model_config = ConfigDict(frozen=True)

    user_prompt: str
    memory_context: dict | None = None
    resolved_base_branch: str | None = None
    truncated: bool = False


class RepoSetup(BaseModel):
    """Result of cloning and preparing the repository."""
    model_config = ConfigDict(frozen=True)

    repo_dir: str
    branch: str
    default_branch: str
    build_before: bool
    lint_before: bool
    notes: list[str] = []


class TokenUsage(BaseModel):
    """Token usage from a Claude SDK session."""
    model_config = ConfigDict(frozen=True)

    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_input_tokens: int = 0
    cache_creation_input_tokens: int = 0


class AgentResult(BaseModel):
    """Result from the Claude SDK agent invocation."""
    model_config = ConfigDict(frozen=True)

    status: str = "unknown"
    turns: int = 0
    cost_usd: float | None = None
    num_turns: int = 0
    duration_ms: int = 0
    duration_api_ms: int = 0
    session_id: str = ""
    usage: TokenUsage | None = None
    error: str | None = None


class PostHookResult(BaseModel):
    """Results from all post-agent hooks."""
    model_config = ConfigDict(frozen=True)

    safety_committed: bool = False
    build_passed: bool = False
    lint_passed: bool = False
    pr_url: str | None = None


class TaskResult(BaseModel):
    """Final result of a complete task pipeline run."""
    model_config = ConfigDict(frozen=True)

    status: str
    task_id: str
    agent_status: str
    pr_url: str | None = None
    build_passed: bool = False
    lint_passed: bool = False
    cost_usd: float | None = None
    turns: int | None = None
    duration_s: float = 0.0
    error: str | None = None
    session_id: str = ""
    prompt_version: str | None = None
    memory_written: bool = False
    usage: TokenUsage | None = None
```
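To make the contract concrete, here is a minimal, runnable sketch that exercises trimmed-down versions of `TaskType` and `TaskConfig` from above (only two fields kept, for illustration):

```python
from enum import Enum

from pydantic import BaseModel, ConfigDict, ValidationError


class TaskType(str, Enum):
    NEW_TASK = "new_task"
    PR_ITERATION = "pr_iteration"
    PR_REVIEW = "pr_review"

    @property
    def is_pr_task(self) -> bool:
        return self in (TaskType.PR_ITERATION, TaskType.PR_REVIEW)


class TaskConfig(BaseModel):
    model_config = ConfigDict(frozen=True)

    repo_url: str
    task_type: TaskType = TaskType.NEW_TASK


# Strings from env vars / JSON payloads are coerced into the enum at construction
cfg = TaskConfig(repo_url="https://github.com/org/repo", task_type="pr_review")
assert cfg.task_type.is_pr_task

# Pydantic v2 rejects attribute assignment on a frozen model, so no
# downstream step can mutate the config mid-flight
try:
    cfg.repo_url = "https://github.com/org/other"
except ValidationError:
    print("mutation blocked")
```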
#### Why frozen + Pydantic instead of dicts

Today the code mutates the config dict mid-flight (`config["issue"] = ...`), so you can't trust what a dict contains at any point. With `frozen=True`, data flows forward through return values only.
| Before (dict) | After (Pydantic) |
|---|---|
| `config["repo_url"]` - may `KeyError` | `config.repo_url` - validated at construction |
| `config.get("max_budget_usd")` - `None` or missing? | `config.max_budget_usd` - typed as `float \| None` |
| `if config.get("task_type") in PR_TASK_TYPES` (8 times) | `config.task_type.is_pr_task` |
| `HydratedContext(**hydrated_context)` - hope keys match | `HydratedContext.model_validate(hydrated_context)` - validates + coerces |
| `result.__dict__` to serialize | `result.model_dump()` - handles enums, nested models |
| No IDE autocomplete | Full autocomplete + type checking |
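The validation boundary in the table can be sketched like this (a stripped-down `HydratedContext` with two fields, not the full model):

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class HydratedContext(BaseModel):
    model_config = ConfigDict(frozen=True)

    user_prompt: str
    truncated: bool = False


# A well-formed payload from the orchestrator is validated and coerced:
# Pydantic's lax mode turns the string "false" into the bool False
ctx = HydratedContext.model_validate({"user_prompt": "Fix the flaky test", "truncated": "false"})
assert ctx.truncated is False

# A malformed payload fails loudly at the boundary instead of surfacing
# later as a .get() returning None somewhere deep in the pipeline
try:
    HydratedContext.model_validate({"truncated": True})
except ValidationError as exc:
    print(f"{exc.error_count()} validation error")  # user_prompt is missing
```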
### 3. The new pipeline

The current `run_task()` is 307 lines. The new one is ~120 lines - just connecting the modules:
```python
def run_task(repo_url, task_description="", ..., **kwargs) -> dict:
    # 1. Build config
    config = build_config(repo_url=repo_url, ...)

    # 2. Parse hydrated context from orchestrator
    context = HydratedContext.model_validate(hydrated_context) if hydrated_context else None

    with task_span("task.pipeline", attributes={"task.id": config.task_id}):
        task_state.write_running(config.task_id)

        # 3. Resolve prompt
        prompt = context.user_prompt if context else _local_prompt(config)

        # 4. Setup repository
        setup = setup_repo(config)  # -> RepoSetup

        # 5. Build system prompt
        system_prompt = build_system_prompt(config, setup, context)  # -> str

        # 6. Run agent
        agent_result = invoke_agent(prompt, system_prompt, config, cwd=setup.repo_dir)  # -> AgentResult

        # 7. Post-hooks (commit, build, lint, PR)
        post = run_post_hooks(config, setup, agent_result)  # -> PostHookResult

        # 8. Memory
        memory_written = _write_memory(config, agent_result, post)

        # 9. Build result, emit metrics, write terminal state
        result = TaskResult(status=..., task_id=config.task_id, ...)
        emit_metrics(config.task_id, result.model_dump())
        task_state.write_terminal(config.task_id, ...)
        return result.model_dump()
```
Please let me know what you think about this @krokoko