| layout | default |
|---|---|
| title | Chapter 5: Task Decomposition |
| parent | Microsoft AutoGen Tutorial |
| nav_order | 5 |
Welcome to task decomposition! In this chapter, we'll explore how AutoGen agents break down complex tasks into manageable subtasks, coordinate their execution, and ensure successful completion through intelligent planning and monitoring.
from typing import Dict, Any, List, Optional
from enum import Enum
import time
class TaskComplexity(Enum):
SIMPLE = "simple"
MODERATE = "moderate"
COMPLEX = "complex"
VERY_COMPLEX = "very_complex"
class TaskDependency:
def __init__(self, task_id: str, depends_on: List[str] = None):
self.task_id = task_id
self.depends_on = depends_on or []
class SubTask:
def __init__(self, id: str, description: str, complexity: TaskComplexity,
estimated_duration: int, required_skills: List[str] = None,
dependencies: List[str] = None):
self.id = id
self.description = description
self.complexity = complexity
self.estimated_duration = estimated_duration # in minutes
self.required_skills = required_skills or []
self.dependencies = dependencies or []
self.status = "pending"
self.assigned_agent = None
self.actual_duration = None
self.result = None
class TaskDecomposer:
def __init__(self):
self.decomposition_strategies = {
TaskComplexity.SIMPLE: self._decompose_simple,
TaskComplexity.MODERATE: self._decompose_moderate,
TaskComplexity.COMPLEX: self._decompose_complex,
TaskComplexity.VERY_COMPLEX: self._decompose_very_complex
}
async def decompose_task(self, task_description: str, context: Dict[str, Any] = None) -> List[SubTask]:
"""Decompose a task into subtasks based on complexity analysis"""
complexity = await self._analyze_complexity(task_description, context)
strategy = self.decomposition_strategies[complexity]
subtasks = await strategy(task_description, context)
subtasks = self._optimize_subtasks(subtasks)
subtasks = self._add_dependencies(subtasks)
return subtasks
async def _analyze_complexity(self, task: str, context: Dict[str, Any] = None) -> TaskComplexity:
"""Analyze task complexity using heuristics"""
word_count = len(task.split())
has_multiple_steps = any(word in task.lower() for word in
['and', 'then', 'after', 'followed by', 'next'])
has_research = any(word in task.lower() for word in
['research', 'find', 'analyze', 'investigate'])
has_creation = any(word in task.lower() for word in
['create', 'build', 'develop', 'design'])
complexity_score = 0
complexity_score += min(word_count // 20, 3) # Length factor
complexity_score += 1 if has_multiple_steps else 0
complexity_score += 2 if has_research else 0
complexity_score += 2 if has_creation else 0
if complexity_score <= 2:
return TaskComplexity.SIMPLE
elif complexity_score <= 4:
return TaskComplexity.MODERATE
elif complexity_score <= 6:
return TaskComplexity.COMPLEX
else:
return TaskComplexity.VERY_COMPLEX
async def _decompose_simple(self, task: str, context: Dict[str, Any] = None) -> List[SubTask]:
"""Decompose simple tasks (1-2 subtasks)"""
return [
SubTask(
id="simple_1",
description=task,
complexity=TaskComplexity.SIMPLE,
estimated_duration=15
)
]
async def _decompose_moderate(self, task: str, context: Dict[str, Any] = None) -> List[SubTask]:
"""Decompose moderate tasks (2-4 subtasks)"""
subtasks = []
# Analyze task components
if 'research' in task.lower():
subtasks.append(SubTask(
id="research",
description="Research and gather information",
complexity=TaskComplexity.SIMPLE,
estimated_duration=20,
required_skills=["research"]
))
if 'analyze' in task.lower() or 'review' in task.lower():
subtasks.append(SubTask(
id="analysis",
description="Analyze gathered information",
complexity=TaskComplexity.SIMPLE,
estimated_duration=15,
required_skills=["analysis"]
))
if 'create' in task.lower() or 'write' in task.lower():
subtasks.append(SubTask(
id="creation",
description="Create the final output",
complexity=TaskComplexity.SIMPLE,
estimated_duration=25,
required_skills=["writing"]
))
return subtasks or [SubTask(
id="moderate_1",
description=task,
complexity=TaskComplexity.MODERATE,
estimated_duration=30
)]
async def _decompose_complex(self, task: str, context: Dict[str, Any] = None) -> List[SubTask]:
"""Decompose complex tasks (4-8 subtasks)"""
# Use more sophisticated decomposition
plan = await self._generate_decomposition_plan(task)
subtasks = []
for i, step in enumerate(plan['steps']):
subtasks.append(SubTask(
id=f"complex_{i+1}",
description=step['description'],
complexity=TaskComplexity.MODERATE,
estimated_duration=step.get('duration', 20),
required_skills=step.get('skills', [])
))
return subtasks
async def _decompose_very_complex(self, task: str, context: Dict[str, Any] = None) -> List[SubTask]:
"""Decompose very complex tasks (8+ subtasks)"""
# Break down into phases
phases = await self._identify_phases(task)
subtasks = []
for phase in phases:
phase_subtasks = await self._decompose_complex(phase['description'])
for subtask in phase_subtasks:
subtask.id = f"{phase['id']}_{subtask.id}"
subtasks.append(subtask)
return subtasks
def _optimize_subtasks(self, subtasks: List[SubTask]) -> List[SubTask]:
"""Optimize subtasks for efficiency"""
# Remove redundant subtasks
unique_descriptions = set()
optimized = []
for subtask in subtasks:
if subtask.description not in unique_descriptions:
unique_descriptions.add(subtask.description)
optimized.append(subtask)
# Sort by dependencies (simple implementation)
return sorted(optimized, key=lambda x: len(x.dependencies))
def _add_dependencies(self, subtasks: List[SubTask]) -> List[SubTask]:
"""Add dependency relationships between subtasks"""
for i, subtask in enumerate(subtasks):
# Simple dependency logic - later subtasks depend on earlier ones
if i > 0:
subtask.dependencies = [subtasks[i-1].id]
return subtasks
async def _generate_decomposition_plan(self, task: str) -> Dict[str, Any]:
"""Generate a detailed decomposition plan"""
# This would typically use an LLM to generate the plan
# For now, return a simple structure
return {
"steps": [
{"description": "Analyze requirements", "duration": 15, "skills": ["analysis"]},
{"description": "Plan approach", "duration": 10, "skills": ["planning"]},
{"description": "Execute main work", "duration": 30, "skills": ["execution"]},
{"description": "Review and refine", "duration": 15, "skills": ["review"]}
]
}
async def _identify_phases(self, task: str) -> List[Dict[str, Any]]:
"""Identify major phases in a complex task"""
return [
{"id": "planning", "description": "Planning and preparation phase"},
{"id": "execution", "description": "Main execution phase"},
{"id": "review", "description": "Review and refinement phase"},
{"id": "delivery", "description": "Final delivery and documentation"}
]class DependencyManager:
def __init__(self):
self.dependencies: Dict[str, List[str]] = {}
self.completed_tasks: set = set()
def add_dependency(self, task_id: str, depends_on: List[str]):
"""Add dependencies for a task"""
self.dependencies[task_id] = depends_on
def can_execute(self, task_id: str) -> bool:
"""Check if a task can be executed (all dependencies met)"""
deps = self.dependencies.get(task_id, [])
return all(dep in self.completed_tasks for dep in deps)
def get_executable_tasks(self, all_tasks: List[SubTask]) -> List[SubTask]:
"""Get all tasks that can currently be executed"""
return [task for task in all_tasks
if task.status == "pending" and self.can_execute(task.id)]
def mark_completed(self, task_id: str):
"""Mark a task as completed"""
self.completed_tasks.add(task_id)
def get_critical_path(self, tasks: List[SubTask]) -> List[str]:
"""Calculate the critical path through the task dependencies"""
# Simple critical path calculation
critical_path = []
remaining_tasks = {task.id: task for task in tasks}
while remaining_tasks:
# Find tasks with no remaining dependencies
executable = [tid for tid, task in remaining_tasks.items()
if self.can_execute(tid)]
if not executable:
break # Circular dependency or impossible to complete
# Add first executable task to critical path
next_task = executable[0]
critical_path.append(next_task)
self.mark_completed(next_task)
del remaining_tasks[next_task]
return critical_pathfrom autogen import AssistantAgent
import asyncio
from typing import Dict, Any, List
class TaskCoordinator:
def __init__(self, agents: List[AssistantAgent]):
self.agents = agents
self.task_assignments: Dict[str, str] = {} # task_id -> agent_name
self.task_status: Dict[str, str] = {} # task_id -> status
self.decomposer = TaskDecomposer()
self.dependency_manager = DependencyManager()
async def execute_complex_task(self, task_description: str) -> Dict[str, Any]:
"""Execute a complex task by decomposing and coordinating subtasks"""
print(f"🤖 Starting complex task: {task_description}")
# 1. Decompose the task
subtasks = await self.decomposer.decompose_task(task_description)
print(f"📋 Decomposed into {len(subtasks)} subtasks")
# 2. Set up dependencies
for subtask in subtasks:
self.dependency_manager.add_dependency(subtask.id, subtask.dependencies)
self.task_status[subtask.id] = "pending"
# 3. Execute subtasks in dependency order
results = {}
max_iterations = len(subtasks) * 2 # Prevent infinite loops
iteration = 0
while iteration < max_iterations:
iteration += 1
# Get executable tasks
executable_tasks = self.dependency_manager.get_executable_tasks(subtasks)
if not executable_tasks:
break # No more tasks can be executed
# Execute tasks in parallel where possible
execution_results = await self._execute_parallel(executable_tasks)
# Update results and mark tasks as completed
for task_id, result in execution_results.items():
results[task_id] = result
self.task_status[task_id] = "completed"
self.dependency_manager.mark_completed(task_id)
# 4. Synthesize final result
final_result = await self._synthesize_results(results, task_description)
return {
"success": True,
"subtasks_completed": len(results),
"total_subtasks": len(subtasks),
"final_result": final_result,
"execution_time": time.time()
}
async def _execute_parallel(self, tasks: List[SubTask]) -> Dict[str, Any]:
"""Execute multiple tasks in parallel"""
execution_tasks = []
for subtask in tasks:
# Assign task to appropriate agent
agent = self._assign_task_to_agent(subtask)
if agent:
execution_tasks.append(self._execute_single_task(subtask, agent))
# Execute all tasks concurrently
results = await asyncio.gather(*execution_tasks, return_exceptions=True)
# Process results
execution_results = {}
for i, result in enumerate(results):
task_id = tasks[i].id
if isinstance(result, Exception):
execution_results[task_id] = {"error": str(result)}
else:
execution_results[task_id] = result
return execution_results
def _assign_task_to_agent(self, subtask: SubTask) -> Optional[AssistantAgent]:
"""Assign a subtask to the most appropriate agent"""
best_agent = None
best_score = 0
for agent in self.agents:
score = self._calculate_agent_fit(agent, subtask)
if score > best_score:
best_score = score
best_agent = agent
if best_agent:
self.task_assignments[subtask.id] = best_agent.name
return best_agent
def _calculate_agent_fit(self, agent: AssistantAgent, subtask: SubTask) -> float:
"""Calculate how well an agent fits a subtask"""
score = 0.0
# Check agent capabilities (if available)
if hasattr(agent, 'capabilities'):
agent_skills = set(agent.capabilities)
required_skills = set(subtask.required_skills)
if required_skills:
skill_match = len(agent_skills.intersection(required_skills)) / len(required_skills)
score += skill_match * 0.6
# Check agent name/expertise based on naming
agent_name_lower = agent.name.lower()
task_desc_lower = subtask.description.lower()
if 'research' in task_desc_lower and 'research' in agent_name_lower:
score += 0.3
elif 'analysis' in task_desc_lower and ('analyst' in agent_name_lower or 'analysis' in agent_name_lower):
score += 0.3
elif 'writing' in task_desc_lower and ('writer' in agent_name_lower or 'content' in agent_name_lower):
score += 0.3
# Default score for any agent
score = max(score, 0.1)
return score
async def _execute_single_task(self, subtask: SubTask, agent: AssistantAgent) -> Dict[str, Any]:
"""Execute a single subtask with an agent"""
start_time = time.time()
try:
# Create a focused prompt for the subtask
prompt = f"""Please complete the following task:
{subtask.description}
Context: This is part of a larger task that has been decomposed into subtasks.
Focus on completing this specific part thoroughly and efficiently.
Required skills: {', '.join(subtask.required_skills) if subtask.required_skills else 'General'}
Estimated time: {subtask.estimated_duration} minutes
"""
# Execute the task
response = await agent.agenerate_reply(
messages=[{"role": "user", "content": prompt}]
)
execution_time = time.time() - start_time
return {
"success": True,
"result": response,
"execution_time": execution_time,
"agent": agent.name
}
except Exception as e:
execution_time = time.time() - start_time
return {
"success": False,
"error": str(e),
"execution_time": execution_time,
"agent": agent.name
}
async def _synthesize_results(self, results: Dict[str, Any], original_task: str) -> str:
"""Synthesize results from all subtasks into a final answer"""
# Create a synthesis agent
synthesis_agent = AssistantAgent(
name="SynthesisAgent",
llm_config={"model": "gpt-4", "api_key": os.environ.get("OPENAI_API_KEY")},
system_message="You are an expert at synthesizing information from multiple sources into coherent results."
)
# Prepare synthesis prompt
synthesis_prompt = f"""Please synthesize the results from the following subtasks into a comprehensive final answer:
Original Task: {original_task}
Subtask Results:
"""
for task_id, result in results.items():
synthesis_prompt += f"\n--- {task_id} ---\n"
if result.get("success"):
synthesis_prompt += result.get("result", "No result")
else:
synthesis_prompt += f"Error: {result.get('error', 'Unknown error')}"
synthesis_prompt += "\n\nPlease provide a well-structured, comprehensive final answer that addresses the original task."
# Generate synthesis
synthesis_response = await synthesis_agent.agenerate_reply(
messages=[{"role": "user", "content": synthesis_prompt}]
)
return synthesis_responseclass HierarchicalDecomposer:
def __init__(self):
self.max_depth = 3
self.min_subtask_complexity = TaskComplexity.SIMPLE
async def hierarchical_decompose(self, task: str, current_depth: int = 0) -> List[SubTask]:
"""Decompose tasks hierarchically"""
if current_depth >= self.max_depth:
return [SubTask(
id=f"leaf_{current_depth}",
description=task,
complexity=self.min_subtask_complexity,
estimated_duration=15
)]
# Analyze if task needs further decomposition
complexity = await self._analyze_complexity(task)
if complexity <= self.min_subtask_complexity:
return [SubTask(
id=f"task_{current_depth}",
description=task,
complexity=complexity,
estimated_duration=self._estimate_duration(complexity)
)]
# Decompose into subtasks
subtasks_info = await self._generate_subtasks(task)
all_subtasks = []
for i, subtask_info in enumerate(subtasks_info):
subtask_id = f"level_{current_depth}_{i}"
subtask_description = subtask_info['description']
# Recursively decompose if needed
sub_subtasks = await self.hierarchical_decompose(
subtask_description,
current_depth + 1
)
# Update IDs to maintain hierarchy
for sub_subtask in sub_subtasks:
sub_subtask.id = f"{subtask_id}_{sub_subtask.id}"
all_subtasks.extend(sub_subtasks)
return all_subtasks
async def _generate_subtasks(self, task: str) -> List[Dict[str, Any]]:
"""Generate logical subtasks for a given task"""
# This would typically use an LLM to generate subtasks
# For now, return a simple breakdown
return [
{"description": f"Analyze the requirements for: {task}"},
{"description": f"Plan the approach for: {task}"},
{"description": f"Execute the main work for: {task}"},
{"description": f"Review and refine the results for: {task}"}
]
def _estimate_duration(self, complexity: TaskComplexity) -> int:
"""Estimate duration based on complexity"""
duration_map = {
TaskComplexity.SIMPLE: 15,
TaskComplexity.MODERATE: 30,
TaskComplexity.COMPLEX: 60,
TaskComplexity.VERY_COMPLEX: 120
}
return duration_map.get(complexity, 30)class GoalOrientedDecomposer:
def __init__(self):
self.goal_templates = {
"research": self._decompose_research_goal,
"creation": self._decompose_creation_goal,
"analysis": self._decompose_analysis_goal,
"implementation": self._decompose_implementation_goal
}
async def decompose_by_goal_type(self, task: str, goal_type: str = None) -> List[SubTask]:
"""Decompose task based on identified goal type"""
if not goal_type:
goal_type = await self._identify_goal_type(task)
decomposer = self.goal_templates.get(goal_type, self._decompose_general_goal)
return await decomposer(task)
async def _identify_goal_type(self, task: str) -> str:
"""Identify the primary goal type of a task"""
task_lower = task.lower()
if any(word in task_lower for word in ['research', 'find', 'discover', 'investigate']):
return "research"
elif any(word in task_lower for word in ['create', 'build', 'develop', 'design']):
return "creation"
elif any(word in task_lower for word in ['analyze', 'review', 'evaluate', 'assess']):
return "analysis"
elif any(word in task_lower for word in ['implement', 'code', 'program', 'execute']):
return "implementation"
else:
return "general"
async def _decompose_research_goal(self, task: str) -> List[SubTask]:
"""Decompose research-oriented tasks"""
return [
SubTask("research_plan", "Define research objectives and methodology", TaskComplexity.SIMPLE, 15, ["planning"]),
SubTask("gather_sources", "Identify and collect relevant sources", TaskComplexity.MODERATE, 30, ["research"]),
SubTask("analyze_data", "Analyze and synthesize collected information", TaskComplexity.MODERATE, 45, ["analysis"]),
SubTask("draw_conclusions", "Draw conclusions and prepare findings", TaskComplexity.SIMPLE, 20, ["synthesis"])
]
async def _decompose_creation_goal(self, task: str) -> List[SubTask]:
"""Decompose creation-oriented tasks"""
return [
SubTask("requirements", "Gather and analyze requirements", TaskComplexity.SIMPLE, 20, ["analysis"]),
SubTask("design", "Create design and architecture", TaskComplexity.MODERATE, 40, ["design"]),
SubTask("implementation", "Implement the solution", TaskComplexity.COMPLEX, 90, ["implementation"]),
SubTask("testing", "Test and validate the creation", TaskComplexity.MODERATE, 30, ["testing"])
]
async def _decompose_analysis_goal(self, task: str) -> List[SubTask]:
"""Decompose analysis-oriented tasks"""
return [
SubTask("data_collection", "Collect and organize data", TaskComplexity.SIMPLE, 20, ["data_collection"]),
SubTask("data_cleaning", "Clean and prepare data for analysis", TaskComplexity.SIMPLE, 15, ["data_processing"]),
SubTask("analysis_execution", "Perform the actual analysis", TaskComplexity.MODERATE, 45, ["analysis"]),
SubTask("reporting", "Create analysis report and recommendations", TaskComplexity.SIMPLE, 25, ["reporting"])
]
async def _decompose_implementation_goal(self, task: str) -> List[SubTask]:
"""Decompose implementation-oriented tasks"""
return [
SubTask("planning", "Plan the implementation approach", TaskComplexity.SIMPLE, 20, ["planning"]),
SubTask("setup", "Set up development environment", TaskComplexity.SIMPLE, 15, ["setup"]),
SubTask("coding", "Write and implement the code", TaskComplexity.COMPLEX, 120, ["coding"]),
SubTask("integration", "Integrate with existing systems", TaskComplexity.MODERATE, 40, ["integration"]),
SubTask("deployment", "Deploy and test in production", TaskComplexity.MODERATE, 30, ["deployment"])
]
async def _decompose_general_goal(self, task: str) -> List[SubTask]:
"""General decomposition for unspecified goal types"""
return [
SubTask("analyze", "Analyze the task requirements", TaskComplexity.SIMPLE, 15, ["analysis"]),
SubTask("plan", "Create a plan to accomplish the task", TaskComplexity.SIMPLE, 20, ["planning"]),
SubTask("execute", "Execute the main work", TaskComplexity.MODERATE, 60, ["execution"]),
SubTask("review", "Review and refine the results", TaskComplexity.SIMPLE, 15, ["review"])
]class TaskProgressTracker:
def __init__(self):
self.task_progress: Dict[str, Dict[str, Any]] = {}
self.overall_progress = 0.0
def initialize_task(self, task_id: str, subtasks: List[SubTask]):
"""Initialize progress tracking for a task"""
self.task_progress[task_id] = {
"total_subtasks": len(subtasks),
"completed_subtasks": 0,
"subtask_progress": {subtask.id: 0.0 for subtask in subtasks},
"start_time": time.time(),
"estimated_completion": None,
"actual_completion": None
}
# Calculate estimated completion time
total_duration = sum(subtask.estimated_duration for subtask in subtasks)
self.task_progress[task_id]["estimated_completion"] = time.time() + (total_duration * 60)
def update_subtask_progress(self, task_id: str, subtask_id: str, progress: float):
"""Update progress for a specific subtask"""
if task_id in self.task_progress:
self.task_progress[task_id]["subtask_progress"][subtask_id] = min(1.0, max(0.0, progress))
# Recalculate overall progress
self._recalculate_overall_progress(task_id)
def mark_subtask_completed(self, task_id: str, subtask_id: str):
"""Mark a subtask as completed"""
self.update_subtask_progress(task_id, subtask_id, 1.0)
self.task_progress[task_id]["completed_subtasks"] += 1
# Check if task is complete
if self._is_task_complete(task_id):
self.task_progress[task_id]["actual_completion"] = time.time()
def get_task_progress(self, task_id: str) -> Dict[str, Any]:
"""Get progress information for a task"""
if task_id not in self.task_progress:
return {}
progress = self.task_progress[task_id]
return {
"overall_progress": self._calculate_overall_progress(task_id),
"completed_subtasks": progress["completed_subtasks"],
"total_subtasks": progress["total_subtasks"],
"estimated_completion": progress["estimated_completion"],
"actual_completion": progress["actual_completion"],
"time_elapsed": time.time() - progress["start_time"],
"on_schedule": self._is_on_schedule(task_id)
}
def _calculate_overall_progress(self, task_id: str) -> float:
"""Calculate overall progress for a task"""
progress = self.task_progress[task_id]
subtask_progress = progress["subtask_progress"]
if not subtask_progress:
return 0.0
total_progress = sum(subtask_progress.values())
return total_progress / len(subtask_progress)
def _recalculate_overall_progress(self, task_id: str):
"""Recalculate and cache overall progress"""
self.overall_progress = self._calculate_overall_progress(task_id)
def _is_task_complete(self, task_id: str) -> bool:
"""Check if a task is complete"""
progress = self.task_progress[task_id]
return progress["completed_subtasks"] >= progress["total_subtasks"]
def _is_on_schedule(self, task_id: str) -> bool:
"""Check if task is on schedule"""
progress = self.task_progress[task_id]
current_time = time.time()
estimated_completion = progress["estimated_completion"]
if not estimated_completion:
return True
overall_progress = self._calculate_overall_progress(task_id)
expected_progress = (current_time - progress["start_time"]) / (estimated_completion - progress["start_time"])
# Allow 10% variance
return abs(overall_progress - expected_progress) <= 0.1Congratulations! 🎉 You've successfully learned about:
- Task Decomposition Fundamentals - Analyzing complexity and breaking down tasks
- Dependency Management - Managing task relationships and execution order
- Task Execution Coordination - Coordinating multiple agents and subtasks
- Advanced Decomposition Strategies - Hierarchical and goal-oriented decomposition
- Task Execution Monitoring - Tracking progress and performance
Now that you understand task decomposition, let's explore custom agent development. In Chapter 6: Custom Agent Development, we'll learn how to create specialized agents with custom capabilities and behaviors.
Practice what you've learned:
- Decompose a complex task from your work into subtasks with dependencies
- Implement a task coordinator that assigns subtasks to appropriate agents
- Create a progress tracker that monitors task completion
- Experiment with different decomposition strategies for various task types
What kind of complex task would you decompose first? 📋
Most teams struggle here because the hard part is not writing more code, but deciding clear boundaries for self, task, SubTask so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about Chapter 5: Task Decomposition as an operating subsystem inside Microsoft AutoGen Tutorial: Building Multi-Agent AI Systems, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around subtasks, task_id, TaskComplexity as your checklist when adapting these patterns to your own repository.
Under the hood, Chapter 5: Task Decomposition usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for
self. - Input normalization: shape incoming data so
taskreceives stable contracts. - Core execution: run the main logic branch and propagate intermediate state through
SubTask. - Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
Use the following upstream sources to verify implementation details while reading this chapter:
- View Repo
Why it matters: authoritative reference on
View Repo(github.com).
Suggested trace strategy:
- search upstream code for
selfandtaskto map concrete implementation paths - compare docs claims against actual runtime/config code before reusing patterns in production