Skip to content

Latest commit

 

History

History
579 lines (441 loc) · 22.1 KB

File metadata and controls

579 lines (441 loc) · 22.1 KB
layout default
title Chapter 7: Advanced Crew Patterns
parent CrewAI Tutorial
nav_order 7

Chapter 7: Advanced Crew Patterns

Welcome to Chapter 7: Advanced Crew Patterns. In this part of CrewAI Tutorial: Building Collaborative AI Agent Teams, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.

This chapter explores sophisticated multi-crew architectures, specialized formations, and advanced collaboration patterns that enable complex problem-solving at scale.

Multi-Crew Architectures

Federated Crew System

from crewai import Crew, Agent, Task
from typing import List, Dict, Any, Optional
import asyncio

class FederatedCrewSystem:
    def __init__(self):
        self.crews = {}
        self.federation_coordinator = None
        self.communication_hub = CommunicationHub()

    async def create_federation(self, crew_configs: List[Dict[str, Any]]) -> str:
        """Create a federation of specialized crews"""
        federation_id = self._generate_federation_id()

        # Create individual crews
        for config in crew_configs:
            crew = await self._create_specialized_crew(config)
            self.crews[f"{federation_id}_{config['specialty']}"] = crew

        # Create federation coordinator
        self.federation_coordinator = await self._create_coordinator(federation_id)

        return federation_id

    async def _create_specialized_crew(self, config: Dict[str, Any]) -> Crew:
        """Create a crew specialized in a specific domain"""
        agents = []

        for agent_config in config["agents"]:
            agent = Agent(
                role=agent_config["role"],
                goal=agent_config["goal"],
                backstory=agent_config["backstory"],
                tools=agent_config.get("tools", [])
            )
            agents.append(agent)

        return Crew(
            agents=agents,
            tasks=[],  # Tasks assigned dynamically
            verbose=True
        )

    async def execute_federated_task(self, federation_id: str, task: Task) -> Dict[str, Any]:
        """Execute task across federated crews"""
        # Coordinator analyzes and decomposes task
        subtasks = await self.federation_coordinator.analyze_and_decompose(task)

        # Assign subtasks to appropriate crews
        assignments = await self._assign_subtasks_to_crews(federation_id, subtasks)

        # Execute assignments in parallel
        results = await self._execute_federated_assignments(assignments)

        # Coordinator synthesizes results
        final_result = await self.federation_coordinator.synthesize_results(results)

        return {
            "federation_id": federation_id,
            "subtasks": len(subtasks),
            "crew_assignments": len(assignments),
            "results": results,
            "final_result": final_result
        }

    async def _assign_subtasks_to_crews(self, federation_id: str, subtasks: List[Task]) -> Dict[str, List[Task]]:
        """Assign subtasks to appropriate specialized crews"""
        assignments = {}

        for subtask in subtasks:
            crew_key = await self._select_crew_for_subtask(federation_id, subtask)
            full_crew_key = f"{federation_id}_{crew_key}"

            if full_crew_key not in assignments:
                assignments[full_crew_key] = []

            assignments[full_crew_key].append(subtask)

        return assignments

    async def _select_crew_for_subtask(self, federation_id: str, subtask: Task) -> str:
        """Select most appropriate crew for subtask"""
        # Analyze subtask requirements
        requirements = await self._analyze_subtask_requirements(subtask)

        # Find best matching crew
        best_match = None
        best_score = 0

        for crew_key in self.crews.keys():
            if crew_key.startswith(f"{federation_id}_"):
                crew_specialty = crew_key.split("_")[-1]
                match_score = await self._calculate_crew_match(crew_specialty, requirements)

                if match_score > best_score:
                    best_score = match_score
                    best_match = crew_specialty

        return best_match or "general"

Hierarchical Crew Organization

class HierarchicalCrewOrganization:
    def __init__(self):
        self.crew_hierarchy = {}
        self.management_levels = ["executive", "manager", "worker"]

    async def create_hierarchy(self, hierarchy_config: Dict[str, Any]) -> str:
        """Create hierarchical crew organization"""
        org_id = self._generate_org_id()

        # Create crews for each level
        for level in self.management_levels:
            level_crews = hierarchy_config.get(level, [])
            self.crew_hierarchy[f"{org_id}_{level}"] = []

            for crew_config in level_crews:
                crew = await self._create_level_crew(level, crew_config)
                self.crew_hierarchy[f"{org_id}_{level}"].append(crew)

        # Establish reporting relationships
        await self._establish_reporting_relationships(org_id)

        return org_id

    async def execute_hierarchical_task(self, org_id: str, task: Task) -> Dict[str, Any]:
        """Execute task through hierarchical organization"""
        # Executive level receives and analyzes task
        executive_analysis = await self._executive_analysis(org_id, task)

        # Manager level breaks down into projects
        manager_breakdown = await self._manager_breakdown(org_id, executive_analysis)

        # Worker level executes specific tasks
        worker_execution = await self._worker_execution(org_id, manager_breakdown)

        # Results flow back up the hierarchy
        synthesis = await self._hierarchical_synthesis(org_id, worker_execution)

        return {
            "organization_id": org_id,
            "executive_analysis": executive_analysis,
            "manager_breakdown": manager_breakdown,
            "worker_execution": worker_execution,
            "final_synthesis": synthesis
        }

    async def _executive_analysis(self, org_id: str, task: Task) -> Dict[str, Any]:
        """Executive level analysis and strategic direction"""
        executive_crews = self.crew_hierarchy.get(f"{org_id}_executive", [])

        analysis_tasks = []
        for crew in executive_crews:
            analysis_task = Task(
                description=f"Analyze strategic implications of: {task.description}",
                agent=crew.agents[0],
                expected_output="Strategic analysis and high-level direction"
            )
            analysis_tasks.append(analysis_task)

        # Execute executive analysis
        executive_results = await asyncio.gather(*[
            crew.execute_task(task) for crew, task in zip(executive_crews, analysis_tasks)
        ])

        return {
            "executive_insights": executive_results,
            "strategic_direction": await self._synthesize_executive_insights(executive_results)
        }

Specialized Crew Formations

Research & Development Crew

class ResearchDevelopmentCrew:
    def __init__(self):
        self.research_crew = None
        self.development_crew = None
        self.integration_crew = None

    async def setup_rnd_crew(self, config: Dict[str, Any]) -> str:
        """Set up integrated R&D crew formation"""
        rnd_id = self._generate_rnd_id()

        # Research crew for investigation and analysis
        self.research_crew = await self._create_research_crew(config["research"])

        # Development crew for implementation
        self.development_crew = await self._create_development_crew(config["development"])

        # Integration crew for combining research and development
        self.integration_crew = await self._create_integration_crew(config["integration"])

        return rnd_id

    async def execute_rnd_project(self, rnd_id: str, project: Dict[str, Any]) -> Dict[str, Any]:
        """Execute R&D project through integrated crews"""
        # Research phase
        research_findings = await self.research_crew.execute_task(
            Task(
                description=f"Research phase: {project['research_question']}",
                agent=self.research_crew.agents[0],
                expected_output="Comprehensive research findings and insights"
            )
        )

        # Development phase (parallel with research refinement)
        development_tasks = await self._create_development_tasks(project, research_findings)
        development_results = await asyncio.gather(*[
            self.development_crew.execute_task(task) for task in development_tasks
        ])

        # Integration phase
        integration_result = await self.integration_crew.execute_task(
            Task(
                description=f"Integrate research and development: {project['integration_requirements']}",
                agent=self.integration_crew.agents[0],
                expected_output="Integrated solution combining research and development",
                context=[research_findings] + development_results
            )
        )

        return {
            "rnd_id": rnd_id,
            "research_findings": research_findings,
            "development_results": development_results,
            "integration_result": integration_result,
            "project_status": "completed"
        }

Crisis Management Crew

class CrisisManagementCrew:
    def __init__(self):
        self.assessment_crew = None
        self.response_crew = None
        self.communication_crew = None
        self.recovery_crew = None

    async def activate_crisis_mode(self, crisis_config: Dict[str, Any]) -> str:
        """Activate crisis management crew formation"""
        crisis_id = self._generate_crisis_id()

        # Rapid assessment crew
        self.assessment_crew = await self._create_assessment_crew(crisis_config)

        # Immediate response crew
        self.response_crew = await self._create_response_crew(crisis_config)

        # Communication crew for stakeholders
        self.communication_crew = await self._create_communication_crew(crisis_config)

        # Recovery planning crew
        self.recovery_crew = await self._create_recovery_crew(crisis_config)

        return crisis_id

    async def manage_crisis(self, crisis_id: str, crisis_description: str) -> Dict[str, Any]:
        """Manage crisis through specialized crews"""
        # Immediate assessment
        assessment = await self.assessment_crew.execute_task(
            Task(
                description=f"Crisis assessment: {crisis_description}",
                agent=self.assessment_crew.agents[0],
                expected_output="Crisis severity, impact analysis, immediate actions needed"
            )
        )

        # Parallel execution of response and communication
        response_task = Task(
            description=f"Execute crisis response based on assessment: {assessment}",
            agent=self.response_crew.agents[0],
            expected_output="Immediate response actions and implementation"
        )

        communication_task = Task(
            description=f"Communicate crisis status to stakeholders: {assessment}",
            agent=self.communication_crew.agents[0],
            expected_output="Stakeholder communication plan and execution"
        )

        response_result, communication_result = await asyncio.gather(
            self.response_crew.execute_task(response_task),
            self.communication_crew.execute_task(communication_task)
        )

        # Recovery planning
        recovery_plan = await self.recovery_crew.execute_task(
            Task(
                description=f"Develop recovery plan: {assessment} + {response_result}",
                agent=self.recovery_crew.agents[0],
                expected_output="Comprehensive recovery strategy and timeline"
            )
        )

        return {
            "crisis_id": crisis_id,
            "assessment": assessment,
            "immediate_response": response_result,
            "stakeholder_communication": communication_result,
            "recovery_plan": recovery_plan,
            "crisis_status": "managed"
        }

Dynamic Crew Formation

Adaptive Crew Assembly

class AdaptiveCrewAssembler:
    def __init__(self, agent_pool: List[Agent]):
        self.agent_pool = agent_pool
        self.formation_history = []
        self.performance_patterns = {}

    async def assemble_crew(self, task: Task, context: Dict[str, Any]) -> Crew:
        """Dynamically assemble optimal crew for task"""
        # Analyze task requirements
        requirements = await self._analyze_requirements(task)

        # Identify needed agent types
        needed_types = await self._identify_agent_types(requirements)

        # Select best agents for each type
        selected_agents = await self._select_agents(needed_types, context)

        # Create optimized crew
        crew = await self._create_optimized_crew(selected_agents, task)

        # Record formation for learning
        await self._record_formation(task, selected_agents, requirements)

        return crew

    async def _analyze_requirements(self, task: Task) -> Dict[str, Any]:
        """Analyze task requirements for crew assembly"""
        return {
            "complexity": self._assess_complexity(task),
            "skills_required": await self._extract_skills(task),
            "estimated_duration": task.estimated_duration or 60,
            "collaboration_level": self._assess_collaboration_needs(task),
            "specialized_domains": await self._identify_domains(task)
        }

    async def _select_agents(self, needed_types: List[str], context: Dict[str, Any]) -> List[Agent]:
        """Select best agents based on type requirements and context"""
        selected = []

        for agent_type in needed_types:
            candidates = [a for a in self.agent_pool if await self._matches_type(a, agent_type)]

            if candidates:
                # Select best candidate based on performance history
                best_candidate = await self._select_best_candidate(candidates, agent_type, context)
                selected.append(best_candidate)

        return selected

    async def _select_best_candidate(self, candidates: List[Agent], agent_type: str, context: Dict[str, Any]) -> Agent:
        """Select best candidate from pool"""
        scored_candidates = []

        for candidate in candidates:
            score = await self._calculate_candidate_score(candidate, agent_type, context)
            scored_candidates.append((candidate, score))

        # Return highest scoring candidate
        return max(scored_candidates, key=lambda x: x[1])[0]

    async def _calculate_candidate_score(self, agent: Agent, agent_type: str, context: Dict[str, Any]) -> float:
        """Calculate candidate suitability score"""
        base_score = 0.5  # Default score

        # Performance history bonus
        performance_bonus = await self._calculate_performance_bonus(agent, agent_type)
        base_score += performance_bonus

        # Context relevance bonus
        context_bonus = await self._calculate_context_bonus(agent, context)
        base_score += context_bonus

        # Availability bonus
        availability_bonus = await self._calculate_availability_bonus(agent)
        base_score += availability_bonus

        return min(base_score, 1.0)  # Cap at 1.0

Performance Optimization

Crew Performance Analytics

class CrewPerformanceAnalytics:
    def __init__(self):
        self.crew_metrics = {}
        self.performance_history = []

    async def analyze_crew_performance(self, crew: Crew, task: Task, result: Any) -> Dict[str, Any]:
        """Analyze crew performance after task completion"""
        metrics = {
            "task_completion_time": await self._calculate_completion_time(crew, task),
            "agent_utilization": await self._calculate_agent_utilization(crew),
            "communication_efficiency": await self._calculate_communication_efficiency(crew),
            "quality_score": await self._assess_result_quality(result),
            "collaboration_score": await self._calculate_collaboration_score(crew)
        }

        # Store for historical analysis
        self.performance_history.append({
            "crew_id": crew.id,
            "task": task.id,
            "metrics": metrics,
            "timestamp": self._get_timestamp()
        })

        # Update crew metrics
        self.crew_metrics[crew.id] = metrics

        return metrics

    async def generate_optimization_recommendations(self, crew_id: str) -> List[str]:
        """Generate recommendations for crew optimization"""
        metrics = self.crew_metrics.get(crew_id, {})
        recommendations = []

        if metrics.get("agent_utilization", 1) < 0.7:
            recommendations.append("Consider adding more agents or redistributing workload")

        if metrics.get("communication_efficiency", 1) < 0.8:
            recommendations.append("Improve communication protocols and reduce message overhead")

        if metrics.get("collaboration_score", 0) < 0.6:
            recommendations.append("Enhance collaboration training and team building")

        return recommendations

Scaling Strategies

Horizontal Crew Scaling

class CrewScalingManager:
    def __init__(self):
        self.scaling_policies = {}
        self.scaling_history = []

    async def scale_crew(self, crew: Crew, scaling_config: Dict[str, Any]) -> Crew:
        """Scale crew based on workload and performance metrics"""
        current_metrics = await self._get_current_metrics(crew)

        # Determine scaling needs
        scaling_decision = await self._analyze_scaling_needs(current_metrics, scaling_config)

        if scaling_decision["action"] == "scale_up":
            scaled_crew = await self._scale_up_crew(crew, scaling_decision)
        elif scaling_decision["action"] == "scale_down":
            scaled_crew = await self._scale_down_crew(crew, scaling_decision)
        else:
            scaled_crew = crew  # No scaling needed

        # Record scaling action
        await self._record_scaling_action(crew, scaling_decision, scaled_crew)

        return scaled_crew

    async def _analyze_scaling_needs(self, metrics: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze if crew needs scaling"""
        # Check workload vs capacity
        workload_ratio = metrics.get("current_workload", 0) / config.get("max_capacity", 1)

        if workload_ratio > 0.8:
            return {
                "action": "scale_up",
                "reason": "High workload ratio",
                "scale_factor": min(workload_ratio - 0.7, 1.0)
            }
        elif workload_ratio < 0.3:
            return {
                "action": "scale_down",
                "reason": "Low workload ratio",
                "scale_factor": 0.5
            }
        else:
            return {
                "action": "maintain",
                "reason": "Optimal workload"
            }

What We've Accomplished

Built federated crew systems for distributed collaboration ✅ Created hierarchical organizations with management levels ✅ Developed specialized formations for different domains ✅ Implemented adaptive crew assembly based on task requirements ✅ Established performance analytics and optimization ✅ Implemented scaling strategies for growing workloads

Next Steps

Ready for production deployment? In Chapter 8: Production Deployment, we'll cover deploying CrewAI systems to production with monitoring, scaling, and maintenance strategies.


Key Takeaway: Advanced crew patterns enable sophisticated problem-solving at scale. From federated systems to hierarchical organizations, these patterns allow AI crews to tackle increasingly complex challenges through specialized collaboration and intelligent scaling.

What Problem Does This Solve?

Most teams struggle here because the hard part is not writing more code, but deciding clear boundaries for self, crew, task so behavior stays predictable as complexity grows.

In practical terms, this chapter helps you avoid three common failures:

  • coupling core logic too tightly to one implementation path
  • missing the handoff boundaries between setup, execution, and validation
  • shipping changes without clear rollback or observability strategy

After working through this chapter, you should be able to reason about Chapter 7: Advanced Crew Patterns as an operating subsystem inside CrewAI Tutorial: Building Collaborative AI Agent Teams, with explicit contracts for inputs, state transitions, and outputs.

Use the implementation notes around Dict, Task, agents as your checklist when adapting these patterns to your own repository.

How it Works Under the Hood

Under the hood, Chapter 7: Advanced Crew Patterns usually follows a repeatable control path:

  1. Context bootstrap: initialize runtime config and prerequisites for self.
  2. Input normalization: shape incoming data so crew receives stable contracts.
  3. Core execution: run the main logic branch and propagate intermediate state through task.
  4. Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
  5. Output composition: return canonical result payloads for downstream consumers.
  6. Operational telemetry: emit logs/metrics needed for debugging and performance tuning.

When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.

Source Walkthrough

Use the following upstream sources to verify implementation details while reading this chapter:

  • View Repo Why it matters: authoritative reference on View Repo (github.com).

Suggested trace strategy:

  • search upstream code for self and crew to map concrete implementation paths
  • compare docs claims against actual runtime/config code before reusing patterns in production

Chapter Connections