| layout | default |
|---|---|
| title | Chapter 7: Advanced Crew Patterns |
| parent | CrewAI Tutorial |
| nav_order | 7 |
Welcome to Chapter 7: Advanced Crew Patterns. In this part of CrewAI Tutorial: Building Collaborative AI Agent Teams, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
This chapter explores sophisticated multi-crew architectures, specialized formations, and advanced collaboration patterns that enable complex problem-solving at scale.
from crewai import Crew, Agent, Task
from typing import List, Dict, Any, Optional
import asyncio
class FederatedCrewSystem:
def __init__(self):
self.crews = {}
self.federation_coordinator = None
self.communication_hub = CommunicationHub()
async def create_federation(self, crew_configs: List[Dict[str, Any]]) -> str:
"""Create a federation of specialized crews"""
federation_id = self._generate_federation_id()
# Create individual crews
for config in crew_configs:
crew = await self._create_specialized_crew(config)
self.crews[f"{federation_id}_{config['specialty']}"] = crew
# Create federation coordinator
self.federation_coordinator = await self._create_coordinator(federation_id)
return federation_id
async def _create_specialized_crew(self, config: Dict[str, Any]) -> Crew:
"""Create a crew specialized in a specific domain"""
agents = []
for agent_config in config["agents"]:
agent = Agent(
role=agent_config["role"],
goal=agent_config["goal"],
backstory=agent_config["backstory"],
tools=agent_config.get("tools", [])
)
agents.append(agent)
return Crew(
agents=agents,
tasks=[], # Tasks assigned dynamically
verbose=True
)
async def execute_federated_task(self, federation_id: str, task: Task) -> Dict[str, Any]:
"""Execute task across federated crews"""
# Coordinator analyzes and decomposes task
subtasks = await self.federation_coordinator.analyze_and_decompose(task)
# Assign subtasks to appropriate crews
assignments = await self._assign_subtasks_to_crews(federation_id, subtasks)
# Execute assignments in parallel
results = await self._execute_federated_assignments(assignments)
# Coordinator synthesizes results
final_result = await self.federation_coordinator.synthesize_results(results)
return {
"federation_id": federation_id,
"subtasks": len(subtasks),
"crew_assignments": len(assignments),
"results": results,
"final_result": final_result
}
async def _assign_subtasks_to_crews(self, federation_id: str, subtasks: List[Task]) -> Dict[str, List[Task]]:
"""Assign subtasks to appropriate specialized crews"""
assignments = {}
for subtask in subtasks:
crew_key = await self._select_crew_for_subtask(federation_id, subtask)
full_crew_key = f"{federation_id}_{crew_key}"
if full_crew_key not in assignments:
assignments[full_crew_key] = []
assignments[full_crew_key].append(subtask)
return assignments
async def _select_crew_for_subtask(self, federation_id: str, subtask: Task) -> str:
"""Select most appropriate crew for subtask"""
# Analyze subtask requirements
requirements = await self._analyze_subtask_requirements(subtask)
# Find best matching crew
best_match = None
best_score = 0
for crew_key in self.crews.keys():
if crew_key.startswith(f"{federation_id}_"):
crew_specialty = crew_key.split("_")[-1]
match_score = await self._calculate_crew_match(crew_specialty, requirements)
if match_score > best_score:
best_score = match_score
best_match = crew_specialty
return best_match or "general"class HierarchicalCrewOrganization:
def __init__(self):
self.crew_hierarchy = {}
self.management_levels = ["executive", "manager", "worker"]
async def create_hierarchy(self, hierarchy_config: Dict[str, Any]) -> str:
"""Create hierarchical crew organization"""
org_id = self._generate_org_id()
# Create crews for each level
for level in self.management_levels:
level_crews = hierarchy_config.get(level, [])
self.crew_hierarchy[f"{org_id}_{level}"] = []
for crew_config in level_crews:
crew = await self._create_level_crew(level, crew_config)
self.crew_hierarchy[f"{org_id}_{level}"].append(crew)
# Establish reporting relationships
await self._establish_reporting_relationships(org_id)
return org_id
async def execute_hierarchical_task(self, org_id: str, task: Task) -> Dict[str, Any]:
"""Execute task through hierarchical organization"""
# Executive level receives and analyzes task
executive_analysis = await self._executive_analysis(org_id, task)
# Manager level breaks down into projects
manager_breakdown = await self._manager_breakdown(org_id, executive_analysis)
# Worker level executes specific tasks
worker_execution = await self._worker_execution(org_id, manager_breakdown)
# Results flow back up the hierarchy
synthesis = await self._hierarchical_synthesis(org_id, worker_execution)
return {
"organization_id": org_id,
"executive_analysis": executive_analysis,
"manager_breakdown": manager_breakdown,
"worker_execution": worker_execution,
"final_synthesis": synthesis
}
async def _executive_analysis(self, org_id: str, task: Task) -> Dict[str, Any]:
"""Executive level analysis and strategic direction"""
executive_crews = self.crew_hierarchy.get(f"{org_id}_executive", [])
analysis_tasks = []
for crew in executive_crews:
analysis_task = Task(
description=f"Analyze strategic implications of: {task.description}",
agent=crew.agents[0],
expected_output="Strategic analysis and high-level direction"
)
analysis_tasks.append(analysis_task)
# Execute executive analysis
executive_results = await asyncio.gather(*[
crew.execute_task(task) for crew, task in zip(executive_crews, analysis_tasks)
])
return {
"executive_insights": executive_results,
"strategic_direction": await self._synthesize_executive_insights(executive_results)
}class ResearchDevelopmentCrew:
def __init__(self):
self.research_crew = None
self.development_crew = None
self.integration_crew = None
async def setup_rnd_crew(self, config: Dict[str, Any]) -> str:
"""Set up integrated R&D crew formation"""
rnd_id = self._generate_rnd_id()
# Research crew for investigation and analysis
self.research_crew = await self._create_research_crew(config["research"])
# Development crew for implementation
self.development_crew = await self._create_development_crew(config["development"])
# Integration crew for combining research and development
self.integration_crew = await self._create_integration_crew(config["integration"])
return rnd_id
async def execute_rnd_project(self, rnd_id: str, project: Dict[str, Any]) -> Dict[str, Any]:
"""Execute R&D project through integrated crews"""
# Research phase
research_findings = await self.research_crew.execute_task(
Task(
description=f"Research phase: {project['research_question']}",
agent=self.research_crew.agents[0],
expected_output="Comprehensive research findings and insights"
)
)
# Development phase (parallel with research refinement)
development_tasks = await self._create_development_tasks(project, research_findings)
development_results = await asyncio.gather(*[
self.development_crew.execute_task(task) for task in development_tasks
])
# Integration phase
integration_result = await self.integration_crew.execute_task(
Task(
description=f"Integrate research and development: {project['integration_requirements']}",
agent=self.integration_crew.agents[0],
expected_output="Integrated solution combining research and development",
context=[research_findings] + development_results
)
)
return {
"rnd_id": rnd_id,
"research_findings": research_findings,
"development_results": development_results,
"integration_result": integration_result,
"project_status": "completed"
}class CrisisManagementCrew:
def __init__(self):
self.assessment_crew = None
self.response_crew = None
self.communication_crew = None
self.recovery_crew = None
async def activate_crisis_mode(self, crisis_config: Dict[str, Any]) -> str:
"""Activate crisis management crew formation"""
crisis_id = self._generate_crisis_id()
# Rapid assessment crew
self.assessment_crew = await self._create_assessment_crew(crisis_config)
# Immediate response crew
self.response_crew = await self._create_response_crew(crisis_config)
# Communication crew for stakeholders
self.communication_crew = await self._create_communication_crew(crisis_config)
# Recovery planning crew
self.recovery_crew = await self._create_recovery_crew(crisis_config)
return crisis_id
async def manage_crisis(self, crisis_id: str, crisis_description: str) -> Dict[str, Any]:
"""Manage crisis through specialized crews"""
# Immediate assessment
assessment = await self.assessment_crew.execute_task(
Task(
description=f"Crisis assessment: {crisis_description}",
agent=self.assessment_crew.agents[0],
expected_output="Crisis severity, impact analysis, immediate actions needed"
)
)
# Parallel execution of response and communication
response_task = Task(
description=f"Execute crisis response based on assessment: {assessment}",
agent=self.response_crew.agents[0],
expected_output="Immediate response actions and implementation"
)
communication_task = Task(
description=f"Communicate crisis status to stakeholders: {assessment}",
agent=self.communication_crew.agents[0],
expected_output="Stakeholder communication plan and execution"
)
response_result, communication_result = await asyncio.gather(
self.response_crew.execute_task(response_task),
self.communication_crew.execute_task(communication_task)
)
# Recovery planning
recovery_plan = await self.recovery_crew.execute_task(
Task(
description=f"Develop recovery plan: {assessment} + {response_result}",
agent=self.recovery_crew.agents[0],
expected_output="Comprehensive recovery strategy and timeline"
)
)
return {
"crisis_id": crisis_id,
"assessment": assessment,
"immediate_response": response_result,
"stakeholder_communication": communication_result,
"recovery_plan": recovery_plan,
"crisis_status": "managed"
}class AdaptiveCrewAssembler:
def __init__(self, agent_pool: List[Agent]):
self.agent_pool = agent_pool
self.formation_history = []
self.performance_patterns = {}
async def assemble_crew(self, task: Task, context: Dict[str, Any]) -> Crew:
"""Dynamically assemble optimal crew for task"""
# Analyze task requirements
requirements = await self._analyze_requirements(task)
# Identify needed agent types
needed_types = await self._identify_agent_types(requirements)
# Select best agents for each type
selected_agents = await self._select_agents(needed_types, context)
# Create optimized crew
crew = await self._create_optimized_crew(selected_agents, task)
# Record formation for learning
await self._record_formation(task, selected_agents, requirements)
return crew
async def _analyze_requirements(self, task: Task) -> Dict[str, Any]:
"""Analyze task requirements for crew assembly"""
return {
"complexity": self._assess_complexity(task),
"skills_required": await self._extract_skills(task),
"estimated_duration": task.estimated_duration or 60,
"collaboration_level": self._assess_collaboration_needs(task),
"specialized_domains": await self._identify_domains(task)
}
async def _select_agents(self, needed_types: List[str], context: Dict[str, Any]) -> List[Agent]:
"""Select best agents based on type requirements and context"""
selected = []
for agent_type in needed_types:
candidates = [a for a in self.agent_pool if await self._matches_type(a, agent_type)]
if candidates:
# Select best candidate based on performance history
best_candidate = await self._select_best_candidate(candidates, agent_type, context)
selected.append(best_candidate)
return selected
async def _select_best_candidate(self, candidates: List[Agent], agent_type: str, context: Dict[str, Any]) -> Agent:
"""Select best candidate from pool"""
scored_candidates = []
for candidate in candidates:
score = await self._calculate_candidate_score(candidate, agent_type, context)
scored_candidates.append((candidate, score))
# Return highest scoring candidate
return max(scored_candidates, key=lambda x: x[1])[0]
async def _calculate_candidate_score(self, agent: Agent, agent_type: str, context: Dict[str, Any]) -> float:
"""Calculate candidate suitability score"""
base_score = 0.5 # Default score
# Performance history bonus
performance_bonus = await self._calculate_performance_bonus(agent, agent_type)
base_score += performance_bonus
# Context relevance bonus
context_bonus = await self._calculate_context_bonus(agent, context)
base_score += context_bonus
# Availability bonus
availability_bonus = await self._calculate_availability_bonus(agent)
base_score += availability_bonus
return min(base_score, 1.0) # Cap at 1.0class CrewPerformanceAnalytics:
def __init__(self):
self.crew_metrics = {}
self.performance_history = []
async def analyze_crew_performance(self, crew: Crew, task: Task, result: Any) -> Dict[str, Any]:
"""Analyze crew performance after task completion"""
metrics = {
"task_completion_time": await self._calculate_completion_time(crew, task),
"agent_utilization": await self._calculate_agent_utilization(crew),
"communication_efficiency": await self._calculate_communication_efficiency(crew),
"quality_score": await self._assess_result_quality(result),
"collaboration_score": await self._calculate_collaboration_score(crew)
}
# Store for historical analysis
self.performance_history.append({
"crew_id": crew.id,
"task": task.id,
"metrics": metrics,
"timestamp": self._get_timestamp()
})
# Update crew metrics
self.crew_metrics[crew.id] = metrics
return metrics
async def generate_optimization_recommendations(self, crew_id: str) -> List[str]:
"""Generate recommendations for crew optimization"""
metrics = self.crew_metrics.get(crew_id, {})
recommendations = []
if metrics.get("agent_utilization", 1) < 0.7:
recommendations.append("Consider adding more agents or redistributing workload")
if metrics.get("communication_efficiency", 1) < 0.8:
recommendations.append("Improve communication protocols and reduce message overhead")
if metrics.get("collaboration_score", 0) < 0.6:
recommendations.append("Enhance collaboration training and team building")
return recommendationsclass CrewScalingManager:
def __init__(self):
self.scaling_policies = {}
self.scaling_history = []
async def scale_crew(self, crew: Crew, scaling_config: Dict[str, Any]) -> Crew:
"""Scale crew based on workload and performance metrics"""
current_metrics = await self._get_current_metrics(crew)
# Determine scaling needs
scaling_decision = await self._analyze_scaling_needs(current_metrics, scaling_config)
if scaling_decision["action"] == "scale_up":
scaled_crew = await self._scale_up_crew(crew, scaling_decision)
elif scaling_decision["action"] == "scale_down":
scaled_crew = await self._scale_down_crew(crew, scaling_decision)
else:
scaled_crew = crew # No scaling needed
# Record scaling action
await self._record_scaling_action(crew, scaling_decision, scaled_crew)
return scaled_crew
async def _analyze_scaling_needs(self, metrics: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze if crew needs scaling"""
# Check workload vs capacity
workload_ratio = metrics.get("current_workload", 0) / config.get("max_capacity", 1)
if workload_ratio > 0.8:
return {
"action": "scale_up",
"reason": "High workload ratio",
"scale_factor": min(workload_ratio - 0.7, 1.0)
}
elif workload_ratio < 0.3:
return {
"action": "scale_down",
"reason": "Low workload ratio",
"scale_factor": 0.5
}
else:
return {
"action": "maintain",
"reason": "Optimal workload"
}✅ Built federated crew systems for distributed collaboration ✅ Created hierarchical organizations with management levels ✅ Developed specialized formations for different domains ✅ Implemented adaptive crew assembly based on task requirements ✅ Established performance analytics and optimization ✅ Implemented scaling strategies for growing workloads
Ready for production deployment? In Chapter 8: Production Deployment, we'll cover deploying CrewAI systems to production with monitoring, scaling, and maintenance strategies.
Key Takeaway: Advanced crew patterns enable sophisticated problem-solving at scale. From federated systems to hierarchical organizations, these patterns allow AI crews to tackle increasingly complex challenges through specialized collaboration and intelligent scaling.
Most teams struggle here because the hard part is not writing more code, but deciding clear boundaries for self, crew, task so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about Chapter 7: Advanced Crew Patterns as an operating subsystem inside CrewAI Tutorial: Building Collaborative AI Agent Teams, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around Dict, Task, agents as your checklist when adapting these patterns to your own repository.
Under the hood, Chapter 7: Advanced Crew Patterns usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for
self. - Input normalization: shape incoming data so
crewreceives stable contracts. - Core execution: run the main logic branch and propagate intermediate state through
task. - Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
Use the following upstream sources to verify implementation details while reading this chapter:
- View Repo
Why it matters: authoritative reference on
View Repo(github.com).
Suggested trace strategy:
- search upstream code for
selfandcrewto map concrete implementation paths - compare docs claims against actual runtime/config code before reusing patterns in production