Refactor: break up entrypoint.py into focused modules #19

@leandrodamascena

Depends on: #18 (Reorganize agent/ directory structure)

The following body was improved by Claude with the Opus 4.6 model to achieve accuracy in line counts and file names.

The problem

I spent a good amount of time trying to understand agent/entrypoint.py and honestly, even with AI tools helping me read through it, it was really hard to follow. It's a single 2,092-line file that handles config, repo cloning, prompt building, the Claude SDK call, post-hooks, PR creation, telemetry, and metrics, all mixed together. There's no type safety (everything is passed around as a dict), the config gets mutated mid-flight, and there is a lot of dead code (deprecated functions, 100 lines of comments for a disabled OTEL feature, duplicated helpers). The run_task() function alone is 307 lines and does everything at once.

I think this creates the following problems:

  • Hard to onboard: even with AI coding assistants, understanding the pipeline takes multiple passes.
  • Hard to debug: every bug leads to "open entrypoint.py and search". There's no way to know that a PR creation issue lives in lines 452-819 without reading the whole file.
  • Hard to change safely: no typed contracts between pipeline steps. A function returns a dict, another function reads from that dict with .get() and hopes the keys are there. Nothing catches breakage until runtime.
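
The third point can be made concrete with a small, hypothetical example. The function names and dict keys below are illustrative and not taken from entrypoint.py; the sketch only shows how a renamed key slips past .get() and surfaces later as a wrong value instead of an error at the boundary.

```python
# Hypothetical illustration: names and keys are not from entrypoint.py.

def setup_repo_step(config: dict) -> dict:
    # Producer: a refactor renamed "branch" to "branch_name" here...
    return {"repo_dir": "/workspace/repo", "branch_name": "task/123"}


def create_pr_step(setup: dict) -> str:
    # ...but the consumer still reads the old key. .get() hides the
    # mismatch and silently falls back to a default.
    branch = setup.get("branch", "main")
    return f"opening PR from {branch}"


message = create_pr_step(setup_repo_step({}))
print(message)  # the PR step targets "main", not the task branch
```

Nothing fails here, which is the problem: the mistake only shows up when someone notices the PR was opened from the wrong branch.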

Proposed solution

1. Break entrypoint.py into focused modules

The pipeline is actually simple - it's a linear sequence where each step takes typed input and returns typed output.

agent/src/
├── models.py           <- Pydantic models (shared language between modules)
├── config.py           <- Build TaskConfig from env vars + parameters
├── shell.py            <- run_cmd(), slugify(), redact_secrets()
├── repo.py             <- Clone, branch, mise install, baseline build/lint
├── runner.py           <- Claude SDK invocation
├── post_hooks.py       <- Safety commit, verify build/lint, create/push PR
├── prompt_builder.py   <- Assemble system prompt from templates + context
├── telemetry.py        <- CloudWatch Logs: trajectory events + metrics
└── pipeline.py         <- The thin orchestrator - calls the above in order
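
To give a feel for the size of these modules, here is a minimal sketch of what shell.py might contain. Only the three function names come from the layout above; the signatures, the "***" placeholder, and the max_len parameter are assumptions for illustration, not the current implementation.

```python
# Hypothetical sketch of shell.py; signatures and behavior are assumptions.
from __future__ import annotations

import re
import subprocess
import sys
from collections.abc import Iterable


def slugify(text: str, max_len: int = 40) -> str:
    """Lowercase, collapse runs of non-alphanumerics into '-', then trim."""
    slug = re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")
    return slug[:max_len].rstrip("-")


def redact_secrets(text: str, secrets: Iterable[str]) -> str:
    """Replace every non-empty secret value found in text with '***'."""
    for secret in secrets:
        if secret:
            text = text.replace(secret, "***")
    return text


def run_cmd(
    cmd: list[str],
    cwd: str | None = None,
    secrets: Iterable[str] = (),
) -> subprocess.CompletedProcess:
    """Run a command without a shell and redact secrets from captured output."""
    known = tuple(secrets)  # materialize once; it is used for stdout and stderr
    proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=False)
    return subprocess.CompletedProcess(
        args=proc.args,
        returncode=proc.returncode,
        stdout=redact_secrets(proc.stdout, known),
        stderr=redact_secrets(proc.stderr, known),
    )


result = run_cmd([sys.executable, "-c", "print('token=abc123')"], secrets=["abc123"])
print(result.stdout.strip())  # token=***
print(slugify("Fix: flaky CI on main!"))  # fix-flaky-ci-on-main
```

Keeping redaction inside run_cmd() means callers cannot forget it, which is one argument for putting these helpers in the same module.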

How they connect:

pipeline.py  (orchestrator - the only file that imports from the modules below)
    │
    ├── config.py          -> returns TaskConfig
    ├── repo.py            -> takes TaskConfig, returns RepoSetup
    ├── prompt_builder.py  -> takes TaskConfig + RepoSetup + HydratedContext, returns str
    ├── runner.py          -> takes prompt + system_prompt + TaskConfig, returns AgentResult
    ├── post_hooks.py      -> takes TaskConfig + RepoSetup + AgentResult, returns PostHookResult
    ├── memory.py          -> takes config + results, returns bool
    ├── task_state.py      -> takes task_id + status
    └── telemetry.py       -> takes task_id + metrics dict

PR creation failing? Open post_hooks.py. Agent timing out? Open runner.py.

What each module replaces

Module             Replaces from entrypoint.py
models.py          New file (no equivalent today)
config.py          build_config() (lines 103-175) + resolve_github_token() (lines 177-202)
shell.py           run_cmd() (lines 57-100) + slugify() (lines 204-210) + duplicated _clean_env() (lines 255 and 1225)
repo.py            setup_repo() (lines 280-420) + _detect_default_branch() (lines 422-450)
runner.py          run_agent() (lines 1310-1547) + _setup_agent_env() (lines 1194-1307)
post_hooks.py      Lines 452-819: ensure_committed, verify_build, verify_lint, ensure_pushed, ensure_pr
prompt_builder.py  _build_system_prompt() (lines 821-890) + _discover_project_config() (lines 892-948)
telemetry.py       _TrajectoryWriter class (lines 951-1119) + _emit_metrics_to_cloudwatch() (lines 1121-1168)
pipeline.py        run_task() (lines 1708-2015, the 307-line function)

What gets deleted (~290 lines)

Deprecated functions (fetch_github_issue, assemble_prompt), dry run mode, 100 lines of comments for a disabled OTEL feature, duplicated _clean_env, and inline token parsing/status logic replaced by typed functions.

2. Replace dicts with Pydantic models

The hardest part of reading the code was figuring out "what keys does this dict have?" because the answer depends on which code path built it and whether something got added mid-flight. The project already uses Pydantic in server.py, so this is consistent.

models.py

from __future__ import annotations
from enum import Enum

from pydantic import BaseModel, ConfigDict


class TaskType(str, Enum):
    NEW_TASK = "new_task"
    PR_ITERATION = "pr_iteration"
    PR_REVIEW = "pr_review"

    @property
    def is_pr_task(self) -> bool:
        return self in (TaskType.PR_ITERATION, TaskType.PR_REVIEW)

    @property
    def is_read_only(self) -> bool:
        return self == TaskType.PR_REVIEW


class TaskConfig(BaseModel):
    """Immutable task configuration. Built once, never mutated."""
    model_config = ConfigDict(frozen=True)

    repo_url: str
    github_token: str
    aws_region: str
    task_id: str
    task_type: TaskType = TaskType.NEW_TASK
    task_description: str = ""
    issue_number: str = ""
    anthropic_model: str = "us.anthropic.claude-sonnet-4-6"
    max_turns: int = 100
    max_budget_usd: float | None = None
    system_prompt_overrides: str = ""
    branch_name: str = ""
    pr_number: str = ""
    prompt_version: str = ""
    memory_id: str = ""


class HydratedContext(BaseModel):
    """Context assembled by the orchestrator and passed to the agent container."""
    model_config = ConfigDict(frozen=True)

    user_prompt: str
    memory_context: dict | None = None
    resolved_base_branch: str | None = None
    truncated: bool = False


class RepoSetup(BaseModel):
    """Result of cloning and preparing the repository."""
    model_config = ConfigDict(frozen=True)

    repo_dir: str
    branch: str
    default_branch: str
    build_before: bool
    lint_before: bool
    notes: list[str] = []


class TokenUsage(BaseModel):
    """Token usage from a Claude SDK session."""
    model_config = ConfigDict(frozen=True)

    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_input_tokens: int = 0
    cache_creation_input_tokens: int = 0


class AgentResult(BaseModel):
    """Result from the Claude SDK agent invocation."""
    model_config = ConfigDict(frozen=True)

    status: str = "unknown"
    turns: int = 0
    cost_usd: float | None = None
    num_turns: int = 0
    duration_ms: int = 0
    duration_api_ms: int = 0
    session_id: str = ""
    usage: TokenUsage | None = None
    error: str | None = None


class PostHookResult(BaseModel):
    """Results from all post-agent hooks."""
    model_config = ConfigDict(frozen=True)

    safety_committed: bool = False
    build_passed: bool = False
    lint_passed: bool = False
    pr_url: str | None = None


class TaskResult(BaseModel):
    """Final result of a complete task pipeline run."""
    model_config = ConfigDict(frozen=True)

    status: str
    task_id: str
    agent_status: str
    pr_url: str | None = None
    build_passed: bool = False
    lint_passed: bool = False
    cost_usd: float | None = None
    turns: int | None = None
    duration_s: float = 0.0
    error: str | None = None
    session_id: str = ""
    prompt_version: str | None = None
    memory_written: bool = False
    usage: TokenUsage | None = None

Why frozen + Pydantic instead of dicts

Today the code mutates the config dict mid-flight (config["issue"] = ...), so you can't trust what a dict contains at any point. With frozen=True, data flows forward through return values only.
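
As a rough illustration of what frozen=True buys, the sketch below uses a trimmed stand-in for TaskConfig (three fields only) and assumes Pydantic v2, which the ConfigDict import implies. The repo URL and task id are made-up values.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class TaskConfig(BaseModel):
    """Trimmed stand-in for the TaskConfig model in models.py."""
    model_config = ConfigDict(frozen=True)

    repo_url: str
    task_id: str
    issue_number: str = ""


config = TaskConfig(repo_url="https://github.com/example/repo", task_id="t-1")

frozen_error = None
try:
    config.issue_number = "42"  # mutation is rejected on a frozen model
except ValidationError as exc:
    frozen_error = exc

# A changed value means a new object; the original stays untouched.
updated = config.model_copy(update={"issue_number": "42"})
print(config.issue_number == "", updated.issue_number)
```

Note that model_copy(update=...) does not re-validate the updated values, so a step that needs validation should construct a new model instead.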

Before (dict)                                           After (Pydantic)
config["repo_url"] - may KeyError                       config.repo_url - validated at construction
config.get("max_budget_usd") - None or missing?         config.max_budget_usd - typed as float | None
if config.get("task_type") in PR_TASK_TYPES (8 times)   config.task_type.is_pr_task
HydratedContext(**hydrated_context) - hope keys match   HydratedContext.model_validate(hydrated_context) - validates + coerces
result.__dict__ to serialize                            result.model_dump() - handles enums, nested models
No IDE autocomplete                                     Full autocomplete + type checking
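
The validation and serialization rows can be sketched as follows, again assuming Pydantic v2. TaskType and HydratedContext are trimmed copies of the models above; Summary is a hypothetical model that exists only to show an enum and a nested model being dumped.

```python
from enum import Enum

from pydantic import BaseModel, ConfigDict, ValidationError


class TaskType(str, Enum):
    NEW_TASK = "new_task"
    PR_REVIEW = "pr_review"


class HydratedContext(BaseModel):
    model_config = ConfigDict(frozen=True)

    user_prompt: str
    truncated: bool = False


class Summary(BaseModel):
    """Hypothetical model, only here to show enum and nested-model dumping."""
    task_type: TaskType
    context: HydratedContext


# Default (lax) validation coerces compatible input: "true" becomes True.
context = HydratedContext.model_validate(
    {"user_prompt": "Fix the bug", "truncated": "true"}
)

# A missing required key fails at the boundary, not deep in the pipeline.
try:
    HydratedContext.model_validate({"truncated": False})
    missing = []
except ValidationError as exc:
    missing = [err["loc"] for err in exc.errors()]

# mode="json" turns enums into their values and nested models into dicts.
dumped = Summary(task_type="pr_review", context=context).model_dump(mode="json")
print(missing, dumped)
```

Plain model_dump() keeps enum members as Enum objects (they still compare equal to their string values because TaskType subclasses str); mode="json" is what produces plain strings, which may matter if the result is handed to code that checks types strictly.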

3. The new pipeline

The current run_task() is 307 lines. The new one is ~120 lines - just connecting the modules:

def run_task(repo_url, task_description="", ..., **kwargs) -> dict:
    # 1. Build config
    config = build_config(repo_url=repo_url, ...)

    # 2. Parse hydrated context from orchestrator
    context = HydratedContext.model_validate(hydrated_context) if hydrated_context else None

    with task_span("task.pipeline", attributes={"task.id": config.task_id}):
        task_state.write_running(config.task_id)

        # 3. Resolve prompt
        prompt = context.user_prompt if context else _local_prompt(config)

        # 4. Setup repository
        setup = setup_repo(config)                    # -> RepoSetup

        # 5. Build system prompt
        system_prompt = build_system_prompt(config, setup, context)  # -> str

        # 6. Run agent
        agent_result = invoke_agent(prompt, system_prompt, config, cwd=setup.repo_dir)  # -> AgentResult

        # 7. Post-hooks (commit, build, lint, PR)
        post = run_post_hooks(config, setup, agent_result)  # -> PostHookResult

        # 8. Memory
        memory_written = _write_memory(config, agent_result, post)

        # 9. Build result, emit metrics, write terminal state
        result = TaskResult(status=..., task_id=config.task_id, ...)
        emit_metrics(config.task_id, result.model_dump())
        task_state.write_terminal(config.task_id, ...)

        return result.model_dump()
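
One practical consequence of this shape is that the orchestrator can be exercised with stubbed steps. The sketch below is a stand-in, not the proposed pipeline.py: it uses frozen dataclasses instead of the Pydantic models to stay dependency-free, and passing the steps in as parameters is a hypothetical injection point chosen for the example.

```python
from dataclasses import asdict, dataclass
from typing import Callable


@dataclass(frozen=True)
class Config:
    task_id: str
    repo_url: str


@dataclass(frozen=True)
class Setup:
    repo_dir: str
    branch: str


@dataclass(frozen=True)
class Agent:
    status: str
    turns: int


@dataclass(frozen=True)
class Result:
    task_id: str
    status: str
    branch: str
    turns: int


def run_pipeline(
    config: Config,
    setup_repo: Callable[[Config], Setup],
    invoke_agent: Callable[[str, Config, Setup], Agent],
) -> dict:
    """Call each step in order; data only moves forward via return values."""
    setup = setup_repo(config)
    agent = invoke_agent("Fix the failing test", config, setup)
    result = Result(config.task_id, agent.status, setup.branch, agent.turns)
    return asdict(result)


# Stubs replace the real clone and the real SDK call.
def fake_setup(config: Config) -> Setup:
    return Setup(repo_dir="/tmp/repo", branch=f"task/{config.task_id}")


def fake_agent(prompt: str, config: Config, setup: Setup) -> Agent:
    return Agent(status="success", turns=3)


outcome = run_pipeline(
    Config("t-1", "https://github.com/example/repo"), fake_setup, fake_agent
)
print(outcome)
```

With the real modules, the same idea would apply by patching setup_repo() and invoke_agent() in a test, since neither needs to know about the other.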

Please let me know what you think about this @krokoko

Acknowledgements

  • I may be able to implement this feature
  • This might be a breaking change
