| layout | default |
|---|---|
| title | Chapter 8: Production Deployment |
| parent | CrewAI Tutorial |
| nav_order | 8 |
Welcome to Chapter 8: Production Deployment. In this part of CrewAI Tutorial: Building Collaborative AI Agent Teams, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
Congratulations! You've mastered CrewAI from basic agents to advanced multi-crew architectures. Now it's time to deploy your AI crew systems to production with robust monitoring, scaling, and maintenance strategies.
from crewai import Crew, Agent, Task
from typing import Dict, List, Any, Optional
import asyncio
import logging
class ProductionCrewManager:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.crews = {}
self.monitoring = ProductionMonitoring()
self.scaling = AutoScalingManager()
self.backup = BackupManager()
self.logger = self._setup_logging()
async def deploy_crew_system(self, system_config: Dict[str, Any]) -> str:
"""Deploy complete crew system to production"""
system_id = self._generate_system_id()
try:
# Create production crews
await self._create_production_crews(system_config, system_id)
# Set up monitoring
await self.monitoring.setup_monitoring(system_id, self.crews)
# Configure auto-scaling
await self.scaling.setup_auto_scaling(system_id, system_config)
# Initialize backup systems
await self.backup.setup_backups(system_id, system_config)
# Health checks
await self._perform_health_checks(system_id)
self.logger.info(f"Successfully deployed crew system {system_id}")
return system_id
except Exception as e:
self.logger.error(f"Failed to deploy crew system: {e}")
await self._rollback_deployment(system_id)
raise
async def _create_production_crews(self, config: Dict[str, Any], system_id: str):
"""Create optimized crews for production"""
for crew_config in config["crews"]:
crew = await self._create_optimized_crew(crew_config)
crew_id = f"{system_id}_{crew_config['name']}"
self.crews[crew_id] = crew
async def _create_optimized_crew(self, crew_config: Dict[str, Any]) -> Crew:
"""Create crew optimized for production performance"""
agents = []
for agent_config in crew_config["agents"]:
agent = Agent(
role=agent_config["role"],
goal=agent_config["goal"],
backstory=agent_config["backstory"],
tools=agent_config.get("tools", []),
verbose=self.config.get("debug_mode", False),
max_execution_time=agent_config.get("timeout", 300),
memory=agent_config.get("memory_enabled", True)
)
agents.append(agent)
return Crew(
agents=agents,
tasks=[], # Tasks assigned dynamically
verbose=self.config.get("debug_mode", False),
process=crew_config.get("process", "sequential")
)class ProductionMonitoring:
def __init__(self):
self.metrics = {}
self.alerts = AlertManager()
self.logger = logging.getLogger(__name__)
async def setup_monitoring(self, system_id: str, crews: Dict[str, Crew]):
"""Set up comprehensive monitoring for production system"""
# Performance monitoring
await self._setup_performance_monitoring(system_id, crews)
# Health monitoring
await self._setup_health_monitoring(system_id, crews)
# Error tracking
await self._setup_error_tracking(system_id)
# Resource monitoring
await self._setup_resource_monitoring(system_id)
async def _setup_performance_monitoring(self, system_id: str, crews: Dict[str, Crew]):
"""Monitor crew performance metrics"""
for crew_id, crew in crews.items():
# Task completion metrics
self.metrics[f"{crew_id}_completion_rate"] = 0
# Response time metrics
self.metrics[f"{crew_id}_avg_response_time"] = 0
# Error rate metrics
self.metrics[f"{crew_id}_error_rate"] = 0
async def collect_metrics(self, system_id: str) -> Dict[str, Any]:
"""Collect current system metrics"""
metrics = {}
for metric_key, current_value in self.metrics.items():
metrics[metric_key] = {
"current": current_value,
"timestamp": self._get_timestamp(),
"thresholds": self._get_metric_thresholds(metric_key)
}
# Check for alerts
alerts = await self._check_alerts(metrics)
if alerts:
await self.alerts.send_alerts(alerts)
return metrics
async def _check_alerts(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Check metrics against thresholds and generate alerts"""
alerts = []
for metric_key, metric_data in metrics.items():
thresholds = metric_data["thresholds"]
current_value = metric_data["current"]
if current_value > thresholds["critical"]:
alerts.append({
"level": "critical",
"metric": metric_key,
"value": current_value,
"threshold": thresholds["critical"]
})
elif current_value > thresholds["warning"]:
alerts.append({
"level": "warning",
"metric": metric_key,
"value": current_value,
"threshold": thresholds["warning"]
})
return alerts
def _get_metric_thresholds(self, metric_key: str) -> Dict[str, float]:
"""Get alert thresholds for metric"""
thresholds = {
"completion_rate": {"warning": 0.8, "critical": 0.6},
"avg_response_time": {"warning": 60, "critical": 120},
"error_rate": {"warning": 0.05, "critical": 0.1}
}
metric_type = "_".join(metric_key.split("_")[-2:])
return thresholds.get(metric_type, {"warning": 0.8, "critical": 0.6})class AutoScalingManager:
def __init__(self):
self.scaling_policies = {}
self.current_capacity = {}
async def setup_auto_scaling(self, system_id: str, config: Dict[str, Any]):
"""Set up auto-scaling policies for crew system"""
for crew_config in config["crews"]:
crew_id = f"{system_id}_{crew_config['name']}"
policy = {
"min_agents": crew_config.get("min_agents", 1),
"max_agents": crew_config.get("max_agents", 10),
"scale_up_threshold": crew_config.get("scale_up_threshold", 0.8),
"scale_down_threshold": crew_config.get("scale_down_threshold", 0.3),
"cooldown_period": crew_config.get("cooldown_period", 300)
}
self.scaling_policies[crew_id] = policy
self.current_capacity[crew_id] = len(crew_config["agents"])
async def evaluate_scaling(self, system_id: str, metrics: Dict[str, Any]):
"""Evaluate and execute scaling decisions"""
scaling_decisions = []
for crew_id, policy in self.scaling_policies.items():
if not crew_id.startswith(f"{system_id}_"):
continue
# Get relevant metrics
workload = metrics.get(f"{crew_id}_workload", 0)
current_capacity = self.current_capacity[crew_id]
# Calculate utilization
utilization = workload / max(current_capacity, 1)
# Make scaling decision
decision = await self._make_scaling_decision(
crew_id, utilization, policy, current_capacity
)
if decision:
scaling_decisions.append(decision)
# Execute scaling decisions
for decision in scaling_decisions:
await self._execute_scaling_decision(decision)
async def _make_scaling_decision(self, crew_id: str, utilization: float,
policy: Dict[str, Any], current_capacity: int) -> Optional[Dict[str, Any]]:
"""Make scaling decision based on utilization and policy"""
decision = None
if utilization > policy["scale_up_threshold"]:
new_capacity = min(current_capacity + 1, policy["max_agents"])
if new_capacity > current_capacity:
decision = {
"crew_id": crew_id,
"action": "scale_up",
"current_capacity": current_capacity,
"new_capacity": new_capacity,
"reason": f"High utilization: {utilization:.2f}"
}
elif utilization < policy["scale_down_threshold"]:
new_capacity = max(current_capacity - 1, policy["min_agents"])
if new_capacity < current_capacity:
decision = {
"crew_id": crew_id,
"action": "scale_down",
"current_capacity": current_capacity,
"new_capacity": new_capacity,
"reason": f"Low utilization: {utilization:.2f}"
}
return decision
async def _execute_scaling_decision(self, decision: Dict[str, Any]):
"""Execute scaling decision"""
crew_id = decision["crew_id"]
action = decision["action"]
new_capacity = decision["new_capacity"]
# Implementation would add/remove agents from crew
self.logger.info(f"Scaling {crew_id}: {action} to {new_capacity} agents")
self.current_capacity[crew_id] = new_capacityclass BackupManager:
def __init__(self):
self.backup_configs = {}
self.backup_history = []
async def setup_backups(self, system_id: str, config: Dict[str, Any]):
"""Set up backup system for crew data and configurations"""
backup_config = {
"frequency": config.get("backup_frequency", "daily"),
"retention_days": config.get("retention_days", 30),
"storage_type": config.get("backup_storage", "cloud"),
"encryption": config.get("encryption_enabled", True),
"automated_testing": config.get("test_restores", True)
}
self.backup_configs[system_id] = backup_config
# Schedule backups
await self._schedule_backups(system_id, backup_config)
async def perform_backup(self, system_id: str) -> Dict[str, Any]:
"""Perform backup of crew system"""
config = self.backup_configs[system_id]
backup_data = {
"crew_configs": await self._backup_crew_configs(system_id),
"agent_memories": await self._backup_agent_memories(system_id),
"task_histories": await self._backup_task_histories(system_id),
"system_metrics": await self._backup_system_metrics(system_id),
"timestamp": self._get_timestamp()
}
# Encrypt backup if enabled
if config["encryption"]:
backup_data = await self._encrypt_backup(backup_data)
# Store backup
backup_id = await self._store_backup(backup_data, config)
# Record backup
self.backup_history.append({
"backup_id": backup_id,
"system_id": system_id,
"timestamp": backup_data["timestamp"],
"size": len(str(backup_data))
})
return {
"backup_id": backup_id,
"status": "completed",
"size": len(str(backup_data))
}
async def restore_backup(self, system_id: str, backup_id: str) -> Dict[str, Any]:
"""Restore crew system from backup"""
# Retrieve backup
backup_data = await self._retrieve_backup(backup_id)
# Decrypt if needed
if self.backup_configs[system_id]["encryption"]:
backup_data = await self._decrypt_backup(backup_data)
# Restore components
await self._restore_crew_configs(system_id, backup_data)
await self._restore_agent_memories(system_id, backup_data)
await self._restore_task_histories(system_id, backup_data)
await self._restore_system_metrics(system_id, backup_data)
return {
"status": "restored",
"backup_id": backup_id,
"timestamp": backup_data["timestamp"]
}
async def _schedule_backups(self, system_id: str, config: Dict[str, Any]):
"""Schedule automated backups"""
import schedule
import time
def backup_job():
asyncio.create_task(self.perform_backup(system_id))
if config["frequency"] == "daily":
schedule.every().day.at("02:00").do(backup_job)
elif config["frequency"] == "hourly":
schedule.every().hour.do(backup_job)
# Start scheduler in background
asyncio.create_task(self._run_scheduler())
async def _run_scheduler(self):
"""Run backup scheduler"""
while True:
schedule.run_pending()
await asyncio.sleep(60) # Check every minutefrom fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
class CrewAPI:
def __init__(self, crew_manager: ProductionCrewManager):
self.crew_manager = crew_manager
self.app = FastAPI(title="CrewAI Production API")
self._setup_routes()
def _setup_routes(self):
@self.app.post("/crews/{system_id}/tasks")
async def execute_task(system_id: str, task: TaskRequest):
try:
result = await self.crew_manager.execute_task(system_id, task)
return {"status": "success", "result": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/crews/{system_id}/metrics")
async def get_metrics(system_id: str):
metrics = await self.crew_manager.monitoring.collect_metrics(system_id)
return {"metrics": metrics}
@self.app.post("/crews/{system_id}/scale")
async def scale_crew(system_id: str, scaling: ScalingRequest):
await self.crew_manager.scaling.scale_crew(system_id, scaling)
return {"status": "scaled"}
@self.app.post("/crews/{system_id}/backup")
async def create_backup(system_id: str):
result = await self.crew_manager.backup.perform_backup(system_id)
return {"backup": result}
def start_server(self, host: str = "0.0.0.0", port: int = 8000):
"""Start the API server"""
uvicorn.run(self.app, host=host, port=port)
class TaskRequest(BaseModel):
description: str
crew_name: str
priority: str = "medium"
timeout: int = 300
class ScalingRequest(BaseModel):
crew_name: str
action: str # "scale_up" or "scale_down"
factor: int = 1# .github/workflows/deploy-crewai.yml
name: Deploy CrewAI System
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run tests
run: python -m pytest tests/
- name: Test crew configurations
run: python -m pytest tests/test_crews.py
build:
needs: test
runs-on: ubuntu-latest
steps:
- name: Build Docker image
run: docker build -t crewai-system:${{ github.sha }} .
deploy-staging:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Deploy to staging
run: |
kubectl set image deployment/crewai-system crewai-system=crewai-system:${{ github.sha }}
kubectl rollout status deployment/crewai-system
deploy-production:
needs: deploy-staging
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Deploy to production
run: |
kubectl set image deployment/crewai-system crewai-system=crewai-system:${{ github.sha }}
kubectl rollout status deployment/crewai-systemclass ProductionConfig:
def __init__(self):
self.environments = {
"development": {
"debug_mode": True,
"log_level": "DEBUG",
"max_concurrent_tasks": 5,
"backup_frequency": "hourly"
},
"staging": {
"debug_mode": False,
"log_level": "INFO",
"max_concurrent_tasks": 20,
"backup_frequency": "daily"
},
"production": {
"debug_mode": False,
"log_level": "WARNING",
"max_concurrent_tasks": 100,
"backup_frequency": "daily"
}
}
def get_config(self, environment: str) -> Dict[str, Any]:
"""Get configuration for specific environment"""
if environment not in self.environments:
raise ValueError(f"Unknown environment: {environment}")
return self.environments[environment]
def validate_config(self, config: Dict[str, Any]) -> List[str]:
"""Validate configuration"""
errors = []
if config.get("max_concurrent_tasks", 0) <= 0:
errors.append("max_concurrent_tasks must be positive")
if config.get("log_level") not in ["DEBUG", "INFO", "WARNING", "ERROR"]:
errors.append("Invalid log_level")
return errors✅ Built production-ready crew infrastructure with monitoring and scaling ✅ Implemented comprehensive monitoring system with alerts ✅ Created auto-scaling capabilities for dynamic workloads ✅ Established backup and recovery systems ✅ Developed production API for crew management ✅ Set up CI/CD pipeline for automated deployment ✅ Implemented configuration management for different environments
You've successfully completed the CrewAI tutorial! Here's what you've accomplished:
- Crew Fundamentals: Built your first collaborative AI agents
- Agent Roles: Created specialized agents with distinct capabilities
- Task Planning: Mastered complex task decomposition and planning
- Tool Integration: Extended agent capabilities with external tools
- Communication: Implemented effective inter-agent communication
- Process Management: Built different execution patterns and monitoring
- Advanced Patterns: Created sophisticated multi-crew architectures
- Production Deployment: Deployed scalable, monitored crew systems
Your CrewAI journey continues with:
- Experiment with different crew configurations for your use cases
- Scale your implementations based on real-world requirements
- Contribute to the CrewAI community and share your experiences
- Explore advanced features and integrations as they become available
- Build production applications that leverage collaborative AI
Final Thought: CrewAI represents the future of AI collaboration—where specialized agents work together like a well-coordinated team to solve complex problems. You've mastered the fundamentals and are ready to build the next generation of intelligent systems!
Welcome to the era of collaborative AI! 🚀
Most teams struggle here because the hard part is not writing more code, but deciding clear boundaries for self, system_id, config so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about Chapter 8: Production Deployment as an operating subsystem inside CrewAI Tutorial: Building Collaborative AI Agent Teams, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around metrics, crew_id, Dict as your checklist when adapting these patterns to your own repository.
Under the hood, Chapter 8: Production Deployment usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for
self. - Input normalization: shape incoming data so
system_idreceives stable contracts. - Core execution: run the main logic branch and propagate intermediate state through
config. - Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
Use the following upstream sources to verify implementation details while reading this chapter:
- View Repo
Why it matters: authoritative reference on
View Repo(github.com).
Suggested trace strategy:
- search upstream code for
selfandsystem_idto map concrete implementation paths - compare docs claims against actual runtime/config code before reusing patterns in production