Skip to content

Commit 4e832c9

Browse files
committed
[ddev/ai/phases]: Introduce AgenticPhase and make Phase an abstract lifecycle base (#23663)
* refactor(ai/phases): introduce PhaseOutcome and abstract Phase.execute() - Add PhaseOutcome dataclass (memory_text, token counts, extra_checkpoint) - Add validate_config() classmethod to Phase (no-op default) - Add execute() method that implements the agent pipeline (later to be overridden by AgentPhase) - Rewrite process_message() to call execute() and assemble the checkpoint from PhaseOutcome * refactor(ai/phases): extract AgentPhase from Phase - Create agent_phase.py with AgentPhase(Phase) that owns the LLM pipeline: before_react/after_react hooks, run_tasks, execute() - Move render_task_prompt and render_memory_prompt to agent_phase.py - AgentPhase.validate_config enforces agent, known-agent, and non-empty tasks - Phase.execute() now raises NotImplementedError — subclasses must implement it - Strip base.py of all agent-specific code and imports - Split test_base.py into lifecycle-only tests (using _StubPhase) and test_agent_phase.py for the agent-driven behaviour tests * refactor(ai/phases): make PhaseConfig.agent and .tasks optional - type default: "Phase" → "AgentPhase" - agent: str (required) → str | None = None - tasks: list[TaskConfig] (required) → list[TaskConfig] = [] - Remove at_least_one_task field validator (now enforced by AgentPhase.validate_config) - FlowConfig.cross_references: skip unknown-agent check when agent is None - orchestrator: guard agent_config lookup against None, import AgentConfig - test_config.py: update type assertion, remove empty_tasks test, add test_flow_config_phase_without_agent_validates - test_base.py / test_agent_phase.py: drop model_construct workarounds * refactor(ai/phases): invoke Phase.validate_config from orchestrator - Call phase_cls.validate_config(phase_id, config, agents) immediately after resolving the phase class in on_initialize — only for phases scheduled in flow: - Orphan phases (defined but absent from flow:) are skipped before the call - test_orchestrator.py: drop explicit type: Phase lines from fixtures 
(use AgentPhase default), assert AgentPhase is registered by discovery, add tests for validate_config invocation and orphan-skip behaviour * Rename AgentPhase to AgenticPhase * Split AgenticPhase's execute into smaller functions and added tests for them * Move agent and client parameters to AgenticPhase and make Phase abstract * Add e2e Phase contract test * Move some tests from agentic phase to conftest * Phase not registered and improve tests * Prevent extra_checkpoint from overriding checkpoint_payload * Make Phase and Orchestrator model-agnostic * Add Phase.extra_init_kwargs and agent/build.py tests
1 parent e5e4d72 commit 4e832c9

13 files changed

Lines changed: 1245 additions & 891 deletions

File tree

ddev/src/ddev/ai/agent/build.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# (C) Datadog, Inc. 2026-present
2+
# All rights reserved
3+
# Licensed under a 3-clause BSD style license (see LICENSE)
4+
5+
from __future__ import annotations
6+
7+
from collections.abc import Callable
8+
from typing import TYPE_CHECKING, Any
9+
10+
from ddev.ai.agent.anthropic_client import AnthropicAgent
11+
from ddev.ai.agent.base import BaseAgent
12+
from ddev.ai.tools.fs.file_registry import FileRegistry
13+
from ddev.ai.tools.registry import ToolRegistry
14+
15+
if TYPE_CHECKING:
16+
from ddev.ai.phases.config import AgentConfig
17+
18+
AgentBuilder = Callable[[str, str], tuple[BaseAgent[Any], ToolRegistry]]
19+
20+
21+
def _resolve_client(agent_clients: dict[str, Any], provider: str) -> Any:
22+
client = agent_clients.get(provider)
23+
if client is None:
24+
raise ValueError(f"No client provided for agent provider {provider!r}")
25+
return client
26+
27+
28+
def build_agent(
    agent_config: AgentConfig,
    agent_clients: dict[str, Any],
    system_prompt: str,
    owner_id: str,
    file_registry: FileRegistry,
) -> tuple[BaseAgent[Any], ToolRegistry]:
    """Construct a provider-specific BaseAgent and its ToolRegistry from an AgentConfig.

    The tool registry is created first from ``agent_config.tools``; the agent
    is then created for ``agent_config.provider`` and wired to that registry.
    ``owner_id`` is used both as the registry owner and as the agent name.

    Returns:
        A ``(agent, tool_registry)`` pair.

    Raises:
        ValueError: if the provider is not supported, or if ``agent_clients``
            holds no client for it.
    """
    registry = ToolRegistry.from_names(
        agent_config.tools,
        owner_id=owner_id,
        file_registry=file_registry,
    )

    if agent_config.provider != "anthropic":
        raise ValueError(f"Unknown agent provider: {agent_config.provider!r}")

    # Only forward settings that were explicitly configured, so that the
    # agent class keeps its own defaults for anything left unset.
    configured = {"model": agent_config.model, "max_tokens": agent_config.max_tokens}
    overrides: dict[str, Any] = {key: value for key, value in configured.items() if value is not None}

    agent: BaseAgent[Any] = AnthropicAgent(
        client=_resolve_client(agent_clients, "anthropic"),
        tools=registry,
        system_prompt=system_prompt,
        name=owner_id,
        **overrides,
    )
    return agent, registry
59+
60+
61+
def make_agent_builder(
    agent_config: AgentConfig,
    agent_clients: dict[str, Any],
    file_registry: FileRegistry,
) -> AgentBuilder:
    """Return a closure that builds an agent+registry given a rendered system_prompt and owner_id.

    The three arguments given here are captured once; the returned callable
    only needs the two values that are known later (the rendered system
    prompt and the owner id) and delegates to ``build_agent``.
    """

    def builder(system_prompt: str, owner_id: str) -> tuple[BaseAgent[Any], ToolRegistry]:
        """Build the agent and its tool registry using the captured configuration."""
        return build_agent(
            system_prompt=system_prompt,
            owner_id=owner_id,
            agent_config=agent_config,
            agent_clients=agent_clients,
            file_registry=file_registry,
        )

    return builder
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# (C) Datadog, Inc. 2026-present
2+
# All rights reserved
3+
# Licensed under a 3-clause BSD style license (see LICENSE)
4+
5+
import logging
6+
from collections.abc import Callable
7+
from pathlib import Path
8+
from typing import Any
9+
10+
from ddev.ai.agent.base import BaseAgent
11+
from ddev.ai.agent.build import AgentBuilder, make_agent_builder
12+
from ddev.ai.callbacks.callbacks import Callbacks
13+
from ddev.ai.phases.base import Phase, PhaseOutcome
14+
from ddev.ai.phases.checkpoint import CheckpointManager
15+
from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig
16+
from ddev.ai.phases.template import render_inline, render_prompt
17+
from ddev.ai.react.process import ReActProcess
18+
from ddev.ai.tools.fs.file_registry import FileRegistry
19+
20+
21+
def render_task_prompt(
    task: TaskConfig,
    config_dir: Path,
    context: dict[str, Any],
    resolver: Callable[[str], str] | None = None,
) -> str:
    """Render a task prompt, preferring the prompt file over the inline text.

    When ``task.prompt_path`` is set it is resolved relative to ``config_dir``
    and rendered from file; otherwise ``task.prompt`` is rendered inline.
    ``context`` and ``resolver`` are forwarded unchanged to the renderer.

    Raises:
        FlowConfigError: if the task defines neither ``prompt`` nor ``prompt_path``.
    """
    prompt_file = task.prompt_path
    if prompt_file is None:
        inline_text = task.prompt
        if inline_text is None:
            raise FlowConfigError("TaskConfig must set either 'prompt' or 'prompt_path'")
        return render_inline(inline_text, context, resolver)
    return render_prompt(config_dir / prompt_file, context, resolver)
33+
34+
35+
def render_memory_prompt(
    checkpoint: CheckpointConfig,
    config_dir: Path,
    context: dict[str, Any],
) -> str:
    """Render a checkpoint memory prompt, preferring the prompt file over the inline text.

    When ``checkpoint.memory_prompt_path`` is set it is resolved relative to
    ``config_dir`` and rendered from file; otherwise ``checkpoint.memory_prompt``
    is rendered inline. No resolver is passed to the renderer here.

    Raises:
        FlowConfigError: if the checkpoint defines neither ``memory_prompt``
            nor ``memory_prompt_path``.
    """
    prompt_file = checkpoint.memory_prompt_path
    if prompt_file is None:
        inline_text = checkpoint.memory_prompt
        if inline_text is None:
            raise FlowConfigError("CheckpointConfig must set either 'memory_prompt' or 'memory_prompt_path'")
        return render_inline(inline_text, context)
    return render_prompt(config_dir / prompt_file, context)
46+
47+
48+
class AgenticPhase(Phase):
    """Phase that owns an LLM agent and drives one or more ReAct loops.

    ``execute`` runs the pipeline in this order: ``before_react`` hook, agent
    and ReAct process construction, the task loop (``run_tasks``),
    ``after_react`` hook, and finally a summary turn whose text becomes the
    phase's memory.
    """

    def __init__(
        self,
        phase_id: str,
        dependencies: list[str],
        config: PhaseConfig,
        agent_builder: AgentBuilder,
        checkpoint_manager: CheckpointManager,
        runtime_variables: dict[str, str],
        flow_variables: dict[str, str],
        config_dir: Path,
        file_registry: FileRegistry,
        callbacks: Callbacks | None = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """Initialise the base phase and keep the agent builder for later use.

        Every argument except ``agent_builder`` is forwarded unchanged to
        ``Phase.__init__``. ``agent_builder`` is called later with the rendered
        system prompt and this phase's id, and must return an
        ``(agent, tool_registry)`` pair.
        """
        super().__init__(
            phase_id=phase_id,
            dependencies=dependencies,
            config=config,
            checkpoint_manager=checkpoint_manager,
            runtime_variables=runtime_variables,
            flow_variables=flow_variables,
            config_dir=config_dir,
            file_registry=file_registry,
            callbacks=callbacks,
            logger=logger,
        )
        self._agent_builder = agent_builder

    @classmethod
    def validate_config(
        cls,
        phase_id: str,
        config: PhaseConfig,
        agents: dict[str, AgentConfig],
    ) -> None:
        """Check that *config* is usable by an agentic phase.

        Raises:
            FlowConfigError: if ``config.agent`` is unset, names an agent that
                is not in *agents*, or if ``config.tasks`` is empty.
        """
        if config.agent is None:
            raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) requires 'agent'")
        if config.agent not in agents:
            raise FlowConfigError(f"Phase {phase_id!r} references unknown agent: {config.agent!r}")
        if not config.tasks:
            raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) must have at least one task")

    @classmethod
    def extra_init_kwargs(
        cls,
        phase_id: str,
        phase_config: PhaseConfig,
        agents: dict[str, AgentConfig],
        agent_clients: dict[str, Any],
        file_registry: FileRegistry,
    ) -> dict[str, Any]:
        """Return the constructor kwargs specific to this phase type.

        The result contains a single ``"agent_builder"`` entry, bound to the
        agent configuration that ``phase_config.agent`` refers to.

        Raises:
            FlowConfigError: if ``phase_config.agent`` is unset or is not a
                key of *agents*.
        """
        agent_name = phase_config.agent
        if agent_name is None:
            raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) requires 'agent'")
        # Report an unknown agent the same way validate_config does, rather
        # than letting the dict lookup below surface as a bare KeyError.
        if agent_name not in agents:
            raise FlowConfigError(f"Phase {phase_id!r} references unknown agent: {agent_name!r}")
        return {
            "agent_builder": make_agent_builder(
                agent_config=agents[agent_name],
                agent_clients=agent_clients,
                file_registry=file_registry,
            )
        }

    def before_react(self) -> None:
        """Called once before agent/tools are created. Override for phase-specific setup."""

    def after_react(self) -> None:
        """Called once after all tasks complete. Override for phase-specific teardown."""

    async def run_tasks(
        self,
        process: ReActProcess,
        context: dict[str, Any],
    ) -> tuple[int, int]:
        """Run the task loop. Returns (total_input_tokens, total_output_tokens).

        Override to customize task execution — e.g. add retries, change ordering, etc.
        Default implementation iterates through config.tasks sequentially.

        Before each task after the first, the previous task's reported context
        usage is compared with ``config.context_compact_threshold_pct``; when
        it is at or above the threshold the process is compacted, and the
        tokens spent on compaction are added to the totals.
        """
        total_input = total_output = 0
        last_result = None
        for task in self._config.tasks:
            if last_result is not None and last_result.context_usage is not None:
                if last_result.context_usage.context_pct >= self._config.context_compact_threshold_pct:
                    compact_in, compact_out = await process.compact()
                    total_input += compact_in
                    total_output += compact_out
            prompt = render_task_prompt(task, self._config_dir, context, self._resolver)
            last_result = await process.start(prompt)
            total_input += last_result.total_input_tokens
            total_output += last_result.total_output_tokens
        return total_input, total_output

    def _build_agent_and_process(self, context: dict[str, Any]) -> tuple[BaseAgent[Any], ReActProcess]:
        """Build the agent and ReAct process used to drive task execution.

        The system prompt is rendered from ``prompts/<agent>.md`` under the
        config directory, where ``<agent>`` is ``config.agent``.
        """
        system_prompt = render_prompt(
            self._config_dir / "prompts" / f"{self._config.agent}.md",
            context,
            self._resolver,
        )
        agent, tool_registry = self._agent_builder(system_prompt, self._phase_id)
        process = ReActProcess(
            agent=agent,
            tool_registry=tool_registry,
            callbacks=self._callbacks,
        )
        return agent, process

    async def _run_memory_step(
        self,
        agent: BaseAgent[Any],
        context: dict[str, Any],
    ) -> tuple[str, int, int]:
        """Run the final summary turn. Returns (memory_text, input_tokens, output_tokens).

        When the phase has a checkpoint config, its rendered memory prompt is
        handed to the checkpoint manager as user additions; otherwise ``None``
        is passed.
        """
        user_additions = None
        if self._config.checkpoint is not None:
            user_additions = render_memory_prompt(self._config.checkpoint, self._config_dir, context)
        memory_prompt = self._checkpoint_manager.build_memory_prompt(user_additions)

        # NOTE(review): the literal 1 passed to both callbacks looks like an
        # iteration/turn number for this single send — confirm against Callbacks.
        await self._callbacks.fire_before_agent_send(1)
        # An empty allowed_tools list is passed for the summary turn —
        # presumably to prevent tool calls; confirm against BaseAgent.send.
        response = await agent.send(memory_prompt, allowed_tools=[])
        await self._callbacks.fire_agent_response(response, 1)
        return response.text, response.usage.input_tokens, response.usage.output_tokens

    async def execute(self, context: dict[str, Any]) -> PhaseOutcome:
        """Run the full agentic pipeline and return its outcome.

        The returned token counts are the sum of the task loop's totals and
        the summary turn's usage. ``after_react`` runs only if the task loop
        finished without raising.
        """
        self.before_react()
        agent, process = self._build_agent_and_process(context)
        total_input, total_output = await self.run_tasks(process, context)
        self.after_react()

        memory_text, mem_in, mem_out = await self._run_memory_step(agent, context)

        return PhaseOutcome(
            memory_text=memory_text,
            total_input_tokens=total_input + mem_in,
            total_output_tokens=total_output + mem_out,
        )

0 commit comments

Comments
 (0)