---
layout: default
title: "Phidata Tutorial - Chapter 8: Production Deployment"
nav_order: 8
has_children: false
parent: Phidata Tutorial
---
Welcome to Chapter 8: Production Deployment & Scaling Phidata Agents. In this part of *Phidata Tutorial: Building Autonomous AI Agents*, you will first build an intuitive mental model, then move on to concrete implementation details and practical production trade-offs.
Deploy autonomous agent systems at enterprise scale with high availability, monitoring, and production best practices.
```text
┌─────────────────┐     ┌─────────────────┐
│   API Gateway   │     │  Load Balancer  │
│ (Kong/Traefik)  │     │     (NGINX)     │
└─────────────────┘     └─────────────────┘
         │                       │
         └───────────┬───────────┘
                     │
            ┌─────────────────┐
            │  Agent Router   │
            │  (Phidata API)  │
            └─────────────────┘
                     │
            ┌─────────────────┐
            │   Agent Pool    │
            │  (Kubernetes)   │
            └─────────────────┘
                     │
            ┌─────────────────┐
            │   Model Cache   │
            │     (Redis)     │
            └─────────────────┘
                     │
            ┌─────────────────┐
            │    Vector DB    │
            │ Qdrant/Pinecone │
            │  (for memory)   │
            └─────────────────┘
                     │
            ┌─────────────────┐
            │   Task Queue    │
            │ (Redis/Rabbit)  │
            └─────────────────┘
                     │
            ┌─────────────────┐
            │   Monitoring    │
            │  (Prometheus/   │
            │    Grafana)     │
            └─────────────────┘
```
```yaml
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: phidata
  labels:
    name: phidata
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: phidata-config
  namespace: phidata
data:
  MODEL_NAME: "gpt-4"
  MAX_CONCURRENT_AGENTS: "10"
  AGENT_TIMEOUT: "300"
  CACHE_TTL: "3600"
  LOG_LEVEL: "INFO"
  ENABLE_METRICS: "true"
  REDIS_URL: "redis://redis:6379"
  VECTOR_DB_URL: "http://qdrant:6333"
---
# k8s/secret.yaml
# Placeholder values only: in production create this Secret out of band
# (e.g. from a secrets manager) rather than committing it to Git.
apiVersion: v1
kind: Secret
metadata:
  name: phidata-secrets
  namespace: phidata
type: Opaque
stringData:
  OPENAI_API_KEY: "sk-your-openai-key"
  ANTHROPIC_API_KEY: "sk-ant-your-anthropic-key"
  DATABASE_PASSWORD: "your-db-password"
  # Kept here instead of the ConfigMap because the URL embeds a password
  DATABASE_URL: "postgresql://user:your-db-password@postgres:5432/phidata"
  REDIS_PASSWORD: "your-redis-password"
  API_SECRET_KEY: "your-api-secret"
---
# k8s/agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: phidata-agents
  namespace: phidata
spec:
  # The HPA below manages the replica count; consider omitting `replicas`
  # so that re-applying this manifest does not reset the current scale
  replicas: 5
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 2
      maxSurge: 2
  selector:
    matchLabels:
      app: phidata-agent
  template:
    metadata:
      labels:
        app: phidata-agent
    spec:
      containers:
        - name: phidata-agent
          image: phidata/agents:latest  # pin a specific version tag in production
          ports:
            - containerPort: 8000
          envFrom:
            - configMapRef:
                name: phidata-config
            - secretRef:
                name: phidata-secrets
          volumeMounts:
            - name: agent-logs
              mountPath: /app/logs
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 60
            periodSeconds: 30
            timeoutSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
          # Liveness and readiness probes only start after this probe succeeds
          startupProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 120
            periodSeconds: 30
            timeoutSeconds: 10
            failureThreshold: 6
      volumes:
        - name: agent-logs
          emptyDir: {}
---
# k8s/api-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: phidata-api
  namespace: phidata
spec:
  selector:
    app: phidata-agent
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: phidata-hpa
  namespace: phidata
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: phidata-agents
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Pods metrics need a custom metrics adapter (e.g. prometheus-adapter)
    # that serves http_requests_per_second through the custom metrics API
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: "10"
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: phidata-ingress
  namespace: phidata
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    # ingress-nginx rate limiting: at most 100 requests per minute per client IP
    nginx.ingress.kubernetes.io/limit-rpm: "100"
    nginx.ingress.kubernetes.io/proxy-body-size: "10m"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
spec:
  tls:
    - hosts:
        - agents.company.com
      secretName: phidata-tls
  rules:
    - host: agents.company.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: phidata-api
                port:
                  number: 80
```
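The HPA above scales on three metrics at once. For each metric, Kubernetes documents the core rule as `desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue)`. The largest proposal wins, and the result is clamped to `minReplicas`/`maxReplicas`.

The sketch below reproduces that arithmetic for this manifest's targets, so you can sanity-check how the cluster will react to a given load. Two assumptions apply:

- `desired_replicas` is a hypothetical helper name.
- The sketch deliberately ignores stabilization windows, scaling policies, and not-yet-ready pods, all of which the real controller also takes into account.

```python
import math
from typing import Iterable, Tuple

# Default controller tolerance: ratios within 10% of 1.0 do not trigger scaling
TOLERANCE = 0.1


def desired_replicas(
    current_replicas: int,
    metrics: Iterable[Tuple[float, float]],  # (current value, target value) pairs
    min_replicas: int = 3,   # minReplicas in k8s/hpa.yaml
    max_replicas: int = 20,  # maxReplicas in k8s/hpa.yaml
) -> int:
    """Approximate the HPA replica recommendation across several metrics."""
    proposals = []
    for current_value, target_value in metrics:
        ratio = current_value / target_value
        if abs(ratio - 1.0) <= TOLERANCE:
            # Close enough to the target: keep the current replica count
            proposals.append(current_replicas)
        else:
            proposals.append(math.ceil(current_replicas * current_value / target_value))
    # The metric asking for the most replicas wins, then the bounds apply
    desired = max(proposals, default=current_replicas)
    return max(min_replicas, min(max_replicas, desired))


# 5 pods at 140% CPU (target 70), 40% memory (target 80), 10 req/s (target 10)
print(desired_replicas(5, [(140, 70), (40, 80), (10, 10)]))  # 10
```

Because the highest proposal wins, a low memory reading cannot veto a CPU-driven scale-up. In the real controller, scale-down is additionally damped by a default five-minute stabilization window.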
```yaml
# docker-compose.prod.yml
# ${VAR} values are read from the shell environment or an .env file next to this file.
# The top-level `version` key is obsolete under Docker Compose v2 and only produces a warning.
version: '3.8'

services:
  # Phidata API Service
  phidata-api:
    build:
      context: .
      dockerfile: Dockerfile.production
    container_name: phidata-api-prod
    restart: unless-stopped
    environment:
      - MODEL_NAME=gpt-4
      - MAX_CONCURRENT_AGENTS=20
      - AGENT_TIMEOUT=300
      - CACHE_TTL=3600
      # Reuse the credentials that the postgres and redis services start with
      - DATABASE_URL=postgresql://${DB_USER}:${DB_PASSWORD}@postgres:5432/phidata
      - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379
      - VECTOR_DB_URL=http://qdrant:6333
      - LOG_LEVEL=INFO
      - ENABLE_METRICS=true
    ports:
      - "8000:8000"
    volumes:
      - ./logs:/app/logs
      - agent_data:/app/data
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      qdrant:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s

  # PostgreSQL Database
  postgres:
    image: postgres:15
    container_name: phidata-postgres
    restart: unless-stopped
    environment:
      - POSTGRES_DB=phidata
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./db/init.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${DB_USER} -d phidata"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis Cache & Queue
  redis:
    image: redis:7-alpine
    container_name: phidata-redis
    restart: unless-stopped
    command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD}
    volumes:
      - redis_data:/data
    healthcheck:
      # Authenticate, because --requirepass rejects unauthenticated commands
      test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5

  # Vector Database
  qdrant:
    image: qdrant/qdrant:latest
    container_name: phidata-qdrant
    restart: unless-stopped
    volumes:
      - qdrant_data:/qdrant/storage
      - ./qdrant/config:/qdrant/config
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:6333/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Message Queue (RabbitMQ)
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: phidata-rabbitmq
    restart: unless-stopped
    environment:
      - RABBITMQ_DEFAULT_USER=${MQ_USER}
      - RABBITMQ_DEFAULT_PASS=${MQ_PASSWORD}
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Monitoring - Prometheus
  prometheus:
    image: prom/prometheus:latest
    container_name: phidata-prometheus
    restart: unless-stopped
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'

  # Monitoring - Grafana
  grafana:
    image: grafana/grafana:latest
    container_name: phidata-grafana
    restart: unless-stopped
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning
      - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards
    depends_on:
      - prometheus

volumes:
  postgres_data:
  redis_data:
  qdrant_data:
  rabbitmq_data:
  prometheus_data:
  grafana_data:
  agent_data:
```
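The compose file depends on `${DB_USER}`, `${DB_PASSWORD}`, `${REDIS_PASSWORD}`, `${MQ_USER}`, `${MQ_PASSWORD}` and `${GRAFANA_PASSWORD}`. If one of these is unset, Docker Compose warns and substitutes a blank string, which can leave the stack half-broken. A small pre-flight check before `docker compose up` catches this.

The sketch below makes a few assumptions:

- `parse_env` and `missing_variables` are hypothetical helper names.
- It scans the compose text for `${VAR}` references and reports any that are missing or empty in an `.env`-style mapping.
- It only understands the plain `${VAR}` form, not `${VAR:-default}` or `$VAR`.

```python
import re
from typing import Dict, Set

# Matches the plain ${NAME} form used in docker-compose.prod.yml
VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def parse_env(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    env: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        env[key.strip()] = value.strip()
    return env


def missing_variables(compose_text: str, env: Dict[str, str]) -> Set[str]:
    """Return ${VAR} names referenced in compose_text that are unset or empty."""
    referenced = set(VAR_PATTERN.findall(compose_text))
    return {name for name in referenced if not env.get(name)}


compose_snippet = """
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD}
"""
env = parse_env("DB_USER=phidata\nDB_PASSWORD=s3cret\n# REDIS_PASSWORD not set yet\n")
print(sorted(missing_variables(compose_snippet, env)))  # ['REDIS_PASSWORD']
```

In practice you would read the real `docker-compose.prod.yml` and `.env` files, and merge `os.environ` into the mapping, since Compose also resolves variables from the shell.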
```python
from typing import Dict, List, Any, Optional
import asyncio
import time
from dataclasses import dataclass

# The phidata package installs the `phi` module: agents live in phi.agent,
# model wrappers in phi.model.*
from phi.agent import Agent
from phi.model.openai import OpenAIChat


@dataclass
class AgentInstance:
    agent: Agent
    created_at: float
    last_used: float
    active_tasks: int
    total_tasks: int


class AgentPoolManager:
    """Production-ready agent pool with scaling and monitoring."""

    def __init__(self, agent_configs: List[Dict[str, Any]], max_pool_size: int = 50):
        self.agent_configs = agent_configs
        self.max_pool_size = max_pool_size
        self.pool: Dict[str, List[AgentInstance]] = {}
        self.active_agents = 0
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.metrics = {
            "total_agents_created": 0,
            "total_tasks_processed": 0,
            "avg_task_duration": 0.0,
            "pool_hit_rate": 0,
        }

    def _checkout_idle(self, agent_type: str) -> Optional[Agent]:
        """Mark the first idle agent of this type as busy and return it."""
        for instance in self.pool.get(agent_type, []):
            if instance.active_tasks == 0:
                instance.last_used = time.time()
                instance.active_tasks += 1
                return instance.agent
        return None

    async def get_agent(self, agent_type: str) -> Agent:
        """Get an agent from the pool or create a new one."""
        agent_config = next(
            (config for config in self.agent_configs if config["type"] == agent_type),
            None,
        )
        if not agent_config:
            raise ValueError(f"Unknown agent type: {agent_type}")
        self.pool.setdefault(agent_type, [])

        # Reuse an idle agent if one exists
        agent = self._checkout_idle(agent_type)
        if agent is not None:
            return agent

        # Pool is full: wait for an agent of this type instead of growing past the limit
        if self.active_agents >= self.max_pool_size:
            return await self._wait_for_available_agent(agent_type)

        # Create a new agent; Agent expects a model object, not a model-name string
        agent = Agent(
            name=f"{agent_type}_{self.metrics['total_agents_created']}",
            instructions=agent_config["instructions"],
            model=OpenAIChat(id=agent_config["model"]),
        )
        now = time.time()
        self.pool[agent_type].append(
            AgentInstance(agent=agent, created_at=now, last_used=now,
                          active_tasks=1, total_tasks=0)
        )
        self.active_agents += 1
        self.metrics["total_agents_created"] += 1
        return agent

    async def release_agent(self, agent: Agent, agent_type: str):
        """Release an agent back to the pool."""
        for instance in self.pool.get(agent_type, []):
            if instance.agent.name == agent.name:
                instance.active_tasks = max(0, instance.active_tasks - 1)
                instance.total_tasks += 1
                break

    async def _wait_for_available_agent(self, agent_type: str, timeout: float = 30.0) -> Agent:
        """Poll until an agent of this type is released, then check it out."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            agent = self._checkout_idle(agent_type)
            if agent is not None:
                return agent
            await asyncio.sleep(0.1)  # Small delay between polls
        raise TimeoutError(f"No {agent_type} agent available within {timeout} seconds")

    async def execute_task(self, agent_type: str, task: str) -> Dict[str, Any]:
        """Execute a task using the agent pool."""
        start_time = time.time()
        agent: Optional[Agent] = None
        try:
            agent = await self.get_agent(agent_type)
            # arun() is the async counterpart of run(); it returns a RunResponse
            response = await agent.arun(task)
            duration = time.time() - start_time

            self.metrics["total_tasks_processed"] += 1
            self._update_avg_duration(duration)

            return {
                "success": True,
                "result": response.content,
                "duration": duration,
                "agent_name": agent.name,
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "duration": time.time() - start_time,
            }
        finally:
            # Always return the agent to the pool, even when the run failed
            if agent is not None:
                await self.release_agent(agent, agent_type)

    def _update_avg_duration(self, new_duration: float):
        """Update the exponential moving average of task duration."""
        current_avg = self.metrics["avg_task_duration"]
        if self.metrics["total_tasks_processed"] == 1:
            self.metrics["avg_task_duration"] = new_duration
        else:
            alpha = 0.1
            self.metrics["avg_task_duration"] = alpha * new_duration + (1 - alpha) * current_avg

    def get_pool_stats(self) -> Dict[str, Any]:
        """Get pool statistics."""
        stats = {
            "total_agents": self.active_agents,
            "max_pool_size": self.max_pool_size,
            "pool_utilization": self.active_agents / self.max_pool_size if self.max_pool_size > 0 else 0,
            "agent_types": {},
        }
        for agent_type, instances in self.pool.items():
            stats["agent_types"][agent_type] = {
                "count": len(instances),
                "active": sum(1 for inst in instances if inst.active_tasks > 0),
                "total_tasks": sum(inst.total_tasks for inst in instances),
            }
        stats.update(self.metrics)
        return stats

    async def cleanup_idle_agents(self, idle_timeout: float = 300.0) -> int:
        """Remove agents idle for longer than idle_timeout seconds."""
        current_time = time.time()
        cleaned_count = 0
        for agent_type, instances in list(self.pool.items()):
            kept_instances = []
            for instance in instances:
                if (instance.active_tasks == 0 and
                        current_time - instance.last_used > idle_timeout):
                    self.active_agents -= 1
                    cleaned_count += 1
                    continue
                kept_instances.append(instance)
            self.pool[agent_type] = kept_instances
        return cleaned_count


# Define agent configurations
agent_configs = [
    {
        "type": "general",
        "instructions": "You are a helpful general-purpose AI assistant.",
        "model": "gpt-4",
    },
    {
        "type": "coder",
        "instructions": "You are an expert software developer specializing in Python.",
        "model": "gpt-4",
    },
    {
        "type": "analyst",
        "instructions": "You are a data analyst skilled in interpreting data and providing insights.",
        "model": "gpt-4",
    },
    {
        "type": "writer",
        "instructions": "You are a professional content writer creating engaging written material.",
        "model": "gpt-4",
    },
]

# Create agent pool
agent_pool = AgentPoolManager(agent_configs, max_pool_size=20)


async def demonstrate_agent_pool():
    """Demonstrate agent pool functionality."""
    tasks = [
        ("general", "What is the capital of France?"),
        ("coder", "Write a Python function to calculate fibonacci numbers."),
        ("analyst", "Analyze this data: [1, 2, 3, 4, 5]"),
        ("writer", "Write a short paragraph about artificial intelligence."),
        ("general", "Explain quantum computing in simple terms."),
    ]
    print("Agent Pool Demonstration:")
    for agent_type, task in tasks:
        print(f"\nExecuting {agent_type} task: {task[:50]}...")
        result = await agent_pool.execute_task(agent_type, task)
        if result["success"]:
            print(f"✓ Completed in {result['duration']:.2f}s using {result['agent_name']}")
            print(f"Result: {str(result['result'])[:100]}...")
        else:
            print(f"✗ Failed: {result['error']}")

    # Show pool statistics
    stats = agent_pool.get_pool_stats()
    print("\nPool Statistics:")
    print(f"Total agents created: {stats['total_agents_created']}")
    print(f"Tasks processed: {stats['total_tasks_processed']}")
    print(f"Average task duration: {stats['avg_task_duration']:.2f}s")
    print(f"Pool utilization: {stats['pool_utilization']:.1%}")
    for agent_type, type_stats in stats["agent_types"].items():
        print(f"{agent_type}: {type_stats['count']} agents, {type_stats['total_tasks']} tasks")


asyncio.run(demonstrate_agent_pool())
```
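`_wait_for_available_agent` polls every 100 ms until an agent frees up. An alternative worth considering is to keep idle items in an `asyncio.Queue`, so that a waiting coroutine is woken as soon as an item is returned, with no polling.

The sketch below shows that pattern with a generic factory. `QueuePool`, `acquire`, `release` and `demo` are hypothetical names, not part of phidata. It deliberately leaves out per-type pools, metrics and idle cleanup.

```python
import asyncio
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class QueuePool(Generic[T]):
    """Bounded pool: create up to max_size items lazily, then wait for returns."""

    def __init__(self, factory: Callable[[], T], max_size: int) -> None:
        self._factory = factory
        self._max_size = max_size
        self._created = 0
        self._idle: asyncio.Queue = asyncio.Queue()

    async def acquire(self, timeout: float = 30.0) -> T:
        # Create a new item only if none are idle and capacity remains
        if self._idle.empty() and self._created < self._max_size:
            self._created += 1
            return self._factory()
        # Otherwise block (without polling) until release() returns an item
        return await asyncio.wait_for(self._idle.get(), timeout)

    def release(self, item: T) -> None:
        self._idle.put_nowait(item)

    @property
    def created(self) -> int:
        return self._created


async def demo() -> int:
    pool: QueuePool[object] = QueuePool(object, max_size=2)

    async def worker() -> None:
        item = await pool.acquire()
        try:
            await asyncio.sleep(0.01)  # stand-in for agent.arun(task)
        finally:
            pool.release(item)

    # Ten concurrent workers share at most two pooled items
    await asyncio.gather(*(worker() for _ in range(10)))
    return pool.created


print(asyncio.run(demo()))  # 2
```

Waiters are served in FIFO order as soon as an item is released. Idle-agent cleanup and per-type bookkeeping would still need to be layered on top.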
```python
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from typing import Any, Dict
import asyncio
import time

import psutil

# Assumes AgentPoolManager and agent_configs from the agent pool example
# are defined in the same module.

# Agent runs often outlast prometheus_client's default histogram buckets
# (which stop at 10 seconds), so use wider latency buckets.
LATENCY_BUCKETS = (0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0)


class AgentMetrics:
    """Production monitoring for agent systems."""

    def __init__(self):
        # Agent metrics
        self.agent_requests_total = Counter(
            'phidata_agent_requests_total',
            'Total agent requests',
            ['agent_type', 'model', 'status']
        )
        self.agent_request_duration = Histogram(
            'phidata_agent_request_duration_seconds',
            'Agent request duration',
            ['agent_type', 'model'],
            buckets=LATENCY_BUCKETS
        )
        self.active_agents = Gauge(
            'phidata_active_agents',
            'Number of currently active agents',
            ['agent_type']
        )
        # Pool metrics
        self.pool_size = Gauge(
            'phidata_pool_size',
            'Current pool size',
            ['pool_type']
        )
        self.pool_utilization = Gauge(
            'phidata_pool_utilization_percent',
            'Pool utilization percentage',
            ['pool_type']
        )
        # Task metrics
        self.task_queue_size = Gauge(
            'phidata_task_queue_size',
            'Number of tasks in queue'
        )
        self.task_processing_time = Histogram(
            'phidata_task_processing_time_seconds',
            'Task processing time',
            buckets=LATENCY_BUCKETS
        )
        # System metrics
        self.memory_usage = Gauge(
            'phidata_memory_usage_bytes',
            'Memory usage in bytes'
        )
        self.cpu_usage = Gauge(
            'phidata_cpu_usage_percent',
            'CPU usage percentage'
        )
        # Error metrics
        self.errors_total = Counter(
            'phidata_errors_total',
            'Total errors',
            ['error_type', 'agent_type']
        )

    def record_agent_request(self, agent_type: str, model: str, duration: float, status: str = "success"):
        """Record agent request metrics."""
        self.agent_requests_total.labels(agent_type, model, status).inc()
        self.agent_request_duration.labels(agent_type, model).observe(duration)

    def set_active_agents(self, agent_type: str, count: int):
        """Set active agent count."""
        self.active_agents.labels(agent_type).set(count)

    def set_pool_metrics(self, pool_type: str, size: int, utilization: float):
        """Set pool size and utilization (given as a 0-1 fraction)."""
        self.pool_size.labels(pool_type).set(size)
        self.pool_utilization.labels(pool_type).set(utilization * 100)

    def set_task_queue_size(self, size: int):
        """Set task queue size."""
        self.task_queue_size.set(size)

    def record_task_processing(self, duration: float):
        """Record task processing time."""
        self.task_processing_time.observe(duration)

    def record_error(self, error_type: str, agent_type: str = "unknown"):
        """Record error."""
        self.errors_total.labels(error_type, agent_type).inc()

    def update_system_metrics(self):
        """Update system resource metrics."""
        memory = psutil.virtual_memory()
        self.memory_usage.set(memory.used)
        # interval=None returns immediately (usage since the previous call)
        # instead of blocking the event loop for a full second
        self.cpu_usage.set(psutil.cpu_percent(interval=None))

    def get_metrics_text(self) -> str:
        """Get metrics in Prometheus text format."""
        return generate_latest().decode('utf-8')


# Global metrics instance
agent_metrics = AgentMetrics()


class MonitoredAgentPool(AgentPoolManager):
    """Agent pool with built-in monitoring."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Separate attribute: the parent class already uses self.metrics as its stats dict
        self.prom_metrics = agent_metrics

    async def execute_task(self, agent_type: str, task: str) -> Dict[str, Any]:
        """Execute task with monitoring."""
        start_time = time.time()
        # Update gauges before running the task
        self.prom_metrics.set_task_queue_size(self.task_queue.qsize())
        self.prom_metrics.update_system_metrics()
        try:
            result = await super().execute_task(agent_type, task)
            duration = time.time() - start_time
            status = "success" if result["success"] else "error"

            # Record request metrics
            agent_config = next(
                (config for config in self.agent_configs if config["type"] == agent_type),
                {}
            )
            model = agent_config.get("model", "unknown")
            self.prom_metrics.record_agent_request(agent_type, model, duration, status)
            self.prom_metrics.record_task_processing(duration)

            # Update pool metrics
            pool_stats = self.get_pool_stats()
            self.prom_metrics.set_pool_metrics("agent_pool", pool_stats["total_agents"],
                                               pool_stats["pool_utilization"])
            for agent_type_name, type_stats in pool_stats["agent_types"].items():
                self.prom_metrics.set_active_agents(agent_type_name, type_stats["active"])

            if not result["success"]:
                self.prom_metrics.record_error("task_execution_failed", agent_type)
            return result
        except Exception as e:
            duration = time.time() - start_time
            self.prom_metrics.record_error("unexpected_error", agent_type)
            return {
                "success": False,
                "error": str(e),
                "duration": duration
            }


# Create monitored agent pool
monitored_pool = MonitoredAgentPool(agent_configs, max_pool_size=20)


async def demonstrate_monitoring():
    """Demonstrate monitoring capabilities."""
    tasks = [
        ("general", "Hello, how are you?"),
        ("coder", "Write a Python hello world function."),
        ("analyst", "Analyze this data: [1, 2, 3, 4, 5]"),
    ]
    print("Executing tasks with monitoring...")
    for agent_type, task in tasks:
        result = await monitored_pool.execute_task(agent_type, task)
        print(f"{agent_type} task: {'✓' if result['success'] else '✗'}")

    # Show the first few lines of the exported metrics
    print("\nMonitoring Metrics Sample:")
    metrics_text = agent_metrics.get_metrics_text()
    lines = metrics_text.split('\n')[:20]
    print('\n'.join(lines))
    print("... (truncated)")


asyncio.run(demonstrate_monitoring())
```
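Duration histograms are exported as cumulative `_bucket` series. Dashboards usually turn them into latency percentiles with PromQL such as `histogram_quantile(0.95, sum by (le) (rate(phidata_agent_request_duration_seconds_bucket[5m])))`.

The sketch below mimics the core of that calculation: linear interpolation inside the bucket where the target rank falls. It shows why bucket boundaries matter, since an estimate can never be more precise than the bucket it lands in. It is a simplified illustration, not Prometheus's exact implementation, and `estimate_quantile` is a hypothetical name.

```python
import math
from typing import List, Tuple


def estimate_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    Buckets must be sorted by upper bound and end with +Inf, mirroring the
    `le` buckets a Prometheus histogram exports.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    lower_bound, lower_count = 0.0, 0.0
    for upper_bound, count in buckets:
        if count >= rank:
            if math.isinf(upper_bound):
                # Rank lies beyond the last finite bucket: that bound is the best answer
                return lower_bound
            # Assume observations are spread evenly inside the bucket
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return lower_bound


# Cumulative counts for buckets le=0.5, le=1.0, le=2.5 and le=+Inf (seconds)
buckets = [(0.5, 10), (1.0, 30), (2.5, 40), (math.inf, 40)]
print(estimate_quantile(0.5, buckets))             # 0.75
print(round(estimate_quantile(0.95, buckets), 3))  # 2.2
```

prometheus_client's default buckets stop at 10 seconds. With them, every slower agent call lands in `+Inf`, and a p95 query can only report that top finite bound.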
```python
from typing import Dict, List, Any, Optional
import hashlib
import hmac
import time
from functools import wraps

import jwt  # PyJWT
from fastapi import FastAPI, HTTPException, Request


class EnterpriseSecurityManager:
    """Enterprise-grade security for agent systems."""

    def __init__(self, jwt_secret: str, api_keys: Optional[Dict[str, str]] = None):
        self.jwt_secret = jwt_secret
        # Maps user_id -> SHA-256 hex digest of that user's API key
        self.api_keys = api_keys or {}
        # In-memory state is per process; with several replicas keep it in a shared store (e.g. Redis)
        self.rate_limits: Dict[str, Dict[str, Any]] = {}
        self.audit_log: List[Dict[str, Any]] = []

    def authenticate_jwt(self, token: str) -> Optional[Dict[str, Any]]:
        """Authenticate a JWT with signature and claim validation."""
        try:
            # Verifies the HS256 signature and, when present, the exp claim
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])

            # Validate required claims
            required_claims = ['user_id', 'exp', 'iat']
            for claim in required_claims:
                if claim not in payload:
                    return None

            # Defensive expiration check (jwt.decode already enforces exp)
            if payload.get('exp', 0) < time.time():
                self._audit_event("token_expired", {"user_id": payload.get("user_id")})
                return None

            # Reject tokens issued more than 24 hours ago, whatever their exp says
            max_age = 24 * 60 * 60
            if time.time() - payload.get('iat', 0) > max_age:
                self._audit_event("token_too_old", {"user_id": payload.get("user_id")})
                return None

            self._audit_event("authentication_success", {
                "user_id": payload.get("user_id"),
                "roles": payload.get("roles", [])
            })
            return payload

        except jwt.ExpiredSignatureError:
            self._audit_event("token_expired_signature", {})
            return None
        except jwt.InvalidTokenError:
            self._audit_event("token_invalid", {})
            return None

    def authenticate_api_key(self, api_key: str) -> Optional[str]:
        """Authenticate an API key by comparing SHA-256 digests in constant time."""
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        for user_id, stored_hash in self.api_keys.items():
            if hmac.compare_digest(key_hash, stored_hash):
                self._audit_event("api_key_auth_success", {"user_id": user_id})
                return user_id
        # Log only a short prefix of the digest, never the key itself
        self._audit_event("api_key_auth_failed", {"key_hash": key_hash[:8]})
        return None

    def authorize_request(self, user_info: Dict[str, Any], required_permission: str,
                          resource: Optional[str] = None) -> bool:
        """Authorize using role-based (RBAC) and attribute-based (ABAC) access control."""
        user_permissions = user_info.get('permissions', [])
        user_roles = user_info.get('roles', [])
        user_attributes = user_info.get('attributes', {})

        # Role-based permissions
        role_permissions = {
            'admin': ['*'],  # Admin has all permissions
            'developer': ['agent:create', 'agent:execute', 'agent:read'],
            'analyst': ['agent:execute', 'agent:read', 'data:read'],
            'user': ['agent:execute:basic', 'agent:read:own']
        }

        # Collect explicit and role-derived permissions
        allowed_permissions = set(user_permissions)
        for role in user_roles:
            allowed_permissions.update(role_permissions.get(role, []))

        if required_permission in allowed_permissions or '*' in allowed_permissions:
            self._audit_event("authorization_success", {
                "user_id": user_info.get("user_id"),
                "permission": required_permission,
                "resource": resource
            })
            return True

        # Fall back to attribute-based rules
        if self._check_abac(user_attributes, required_permission, resource):
            self._audit_event("abac_authorization_success", {
                "user_id": user_info.get("user_id"),
                "permission": required_permission,
                "resource": resource
            })
            return True

        self._audit_event("authorization_denied", {
            "user_id": user_info.get("user_id"),
            "permission": required_permission,
            "resource": resource
        })
        return False

    def _check_abac(self, user_attributes: Dict[str, Any], permission: str,
                    resource: Optional[str]) -> bool:
        """Check attribute-based access control rules.

        Deny by default: grant only when a rule explicitly matches. Otherwise
        every authenticated user would pass authorize_request() for any permission.
        """
        department = user_attributes.get('department')
        clearance_level = user_attributes.get('clearance_level', 0)

        # Users may read data that belongs to their own department
        if permission.startswith('data:read') and department and resource:
            return f"dept:{department}" in resource

        # Advanced execution requires clearance level 2 or higher
        if permission == 'agent:execute:advanced':
            return clearance_level >= 2

        return False

    def check_rate_limit(self, user_id: str, endpoint: str, max_requests: int = 100,
                         window: int = 60) -> bool:
        """Sliding-window rate limiting with a short-burst guard."""
        key = f"{user_id}:{endpoint}"
        current_time = int(time.time())
        if key not in self.rate_limits:
            self.rate_limits[key] = {
                'requests': [],
                'blocked_until': 0,
                'burst_count': 0
            }
        limit_data = self.rate_limits[key]

        # Check if still blocked
        if current_time < limit_data['blocked_until']:
            self._audit_event("rate_limit_blocked", {"user_id": user_id, "endpoint": endpoint})
            return False

        # Drop requests that fell out of the window
        window_start = current_time - window
        limit_data['requests'] = [
            req_time for req_time in limit_data['requests']
            if req_time > window_start
        ]

        # Burst guard: at most 10% of the window limit within any 10 seconds
        burst_limit = max(1, max_requests // 10)
        recent_requests = [req for req in limit_data['requests'] if current_time - req < 10]
        if len(recent_requests) >= burst_limit:
            limit_data['blocked_until'] = current_time + 10
            self._audit_event("burst_limit_triggered", {"user_id": user_id, "endpoint": endpoint})
            return False

        # Normal limit: block for a whole window once exceeded
        if len(limit_data['requests']) >= max_requests:
            limit_data['blocked_until'] = current_time + window
            self._audit_event("rate_limit_exceeded", {"user_id": user_id, "endpoint": endpoint})
            return False

        limit_data['requests'].append(current_time)
        return True

    def _audit_event(self, event_type: str, details: Dict[str, Any]):
        """Log security audit event."""
        audit_entry = {
            "timestamp": time.time(),
            "event_type": event_type,
            "details": details,
            "ip_address": "system"  # In production, take this from the request
        }
        self.audit_log.append(audit_entry)
        # Keep only the last 10000 entries
        if len(self.audit_log) > 10000:
            self.audit_log = self.audit_log[-10000:]

    def get_audit_log(self, user_id: Optional[str] = None, event_type: Optional[str] = None,
                      limit: int = 100) -> List[Dict[str, Any]]:
        """Get audit log entries with filtering."""
        filtered_log = self.audit_log
        if user_id:
            filtered_log = [entry for entry in filtered_log
                            if entry["details"].get("user_id") == user_id]
        if event_type:
            filtered_log = [entry for entry in filtered_log
                            if entry["event_type"] == event_type]
        return filtered_log[-limit:]


# Demo credentials only: load the JWT secret and key digests from a secret store in production
security_manager = EnterpriseSecurityManager(
    jwt_secret="your-super-secure-jwt-secret-change-in-production",
    api_keys={
        "user123": hashlib.sha256("prod-api-key-123".encode()).hexdigest(),
        "admin456": hashlib.sha256("admin-key-456".encode()).hexdigest()
    }
)


def require_security(permission: str, rate_limit: bool = True):
    """Decorator for secure FastAPI endpoints.

    Failures raise HTTPException so clients receive 401/403/429 status codes
    instead of a 200 response carrying an error message.
    """
    def decorator(func):
        @wraps(func)  # keeps the endpoint signature visible to FastAPI
        async def wrapper(*args, **kwargs):
            # FastAPI passes endpoint parameters as keyword arguments
            request = kwargs.get('request')
            if request is None and args:
                request = args[0]
            if request is None:
                raise HTTPException(status_code=500, detail="No request object")

            # Authenticate with a Bearer token or an X-API-Key header
            user_info = None
            auth_header = request.headers.get('Authorization', '')
            api_key = request.headers.get('X-API-Key')
            if auth_header.startswith('Bearer '):
                user_info = security_manager.authenticate_jwt(auth_header[7:])
            elif api_key:
                user_id = security_manager.authenticate_api_key(api_key)
                if user_id:
                    user_info = {"user_id": user_id, "roles": ["user"]}
            if not user_info:
                raise HTTPException(status_code=401, detail="Authentication failed")

            # Check rate limit
            if rate_limit and not security_manager.check_rate_limit(
                    user_info['user_id'], request.url.path):
                raise HTTPException(status_code=429, detail="Rate limit exceeded")

            # Authorize request
            if not security_manager.authorize_request(user_info, permission):
                raise HTTPException(status_code=403, detail="Insufficient permissions")

            # Make the authenticated user available to the endpoint
            request.state.user = user_info
            return await func(*args, **kwargs)
        return wrapper
    return decorator


app = FastAPI(title="Secure Phidata API", version="1.0.0")


@app.post("/agent/execute")
@require_security("agent:execute")
async def execute_agent(request: Request, payload: Dict[str, Any]):
    """Secure agent execution endpoint."""
    user = request.state.user
    # monitored_pool comes from the monitoring example above
    result = await monitored_pool.execute_task(
        payload["agent_type"],
        payload["task"]
    )
    # Log the execution
    security_manager._audit_event("agent_execution", {
        "user_id": user["user_id"],
        "agent_type": payload["agent_type"],
        "task_length": len(payload["task"])
    })
    return result


@app.get("/audit/log")
@require_security("audit:read")
async def get_audit_log(request: Request, user_id: Optional[str] = None, limit: int = 50):
    """Secure audit log access."""
    user = request.state.user
    # Only admins can see all logs; other users only see their own
    if "admin" not in user.get("roles", []) and user_id != user["user_id"]:
        user_id = user["user_id"]
    audit_log = security_manager.get_audit_log(user_id=user_id, limit=limit)
    return {"audit_log": audit_log}


@app.get("/security/status")
@require_security("security:read")
async def security_status(request: Request):
    """Security system status."""
    user = request.state.user
    # Only admins can see security status
    if "admin" not in user.get("roles", []):
        raise HTTPException(status_code=403, detail="Admin access required")
    recent_audit = security_manager.get_audit_log(limit=10)
    return {
        "total_audit_entries": len(security_manager.audit_log),
        "recent_events": len(recent_audit),
        "rate_limits_active": len(security_manager.rate_limits)
    }
```

import asyncio
import time
import statistics
from typing import List, Dict, Any
import json
class ProductionBenchmarkSuite:
"""Comprehensive benchmarking for production agent systems."""
def __init__(self, agent_pool: AgentPoolManager):
self.agent_pool = agent_pool
async def run_comprehensive_benchmark(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Run comprehensive production benchmark suite."""
benchmark_results = {
"timestamp": time.time(),
"config": config,
"results": {},
"summary": {}
}
print("Starting Production Benchmark Suite...")
# 1. Latency Benchmark
print("Running latency benchmark...")
latency_results = await self.benchmark_latency(config.get("latency_tests", []))
benchmark_results["results"]["latency"] = latency_results
# 2. Throughput Benchmark
print("Running throughput benchmark...")
throughput_results = await self.benchmark_throughput(config.get("throughput_tests", []))
benchmark_results["results"]["throughput"] = throughput_results
# 3. Memory Usage Benchmark
print("Running memory benchmark...")
memory_results = await self.benchmark_memory_usage(config.get("memory_tests", []))
benchmark_results["results"]["memory"] = memory_results
# 4. Concurrent Load Test
print("Running concurrent load test...")
load_results = await self.benchmark_concurrent_load(config.get("load_tests", {}))
benchmark_results["results"]["load"] = load_results
# 5. Error Handling Benchmark
print("Running error handling benchmark...")
error_results = await self.benchmark_error_handling(config.get("error_tests", []))
benchmark_results["results"]["errors"] = error_results
# Generate summary
benchmark_results["summary"] = self.generate_summary(benchmark_results["results"])
print("Benchmark suite completed!")
return benchmark_results
async def benchmark_latency(self, test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Benchmark response latency."""
results = {}
for test_case in test_cases:
agent_type = test_case["agent_type"]
prompt = test_case["prompt"]
iterations = test_case.get("iterations", 10)
print(f" Testing {agent_type} latency...")
latencies = []
for i in range(iterations):
start_time = time.time()
result = await self.agent_pool.execute_task(agent_type, prompt)
end_time = time.time()
if result["success"]:
latencies.append(end_time - start_time)
if latencies:
results[agent_type] = {
"mean_latency": statistics.mean(latencies),
"median_latency": statistics.median(latencies),
"min_latency": min(latencies),
"max_latency": max(latencies),
"std_dev": statistics.stdev(latencies) if len(latencies) > 1 else 0,
"success_rate": len(latencies) / iterations,
"iterations": iterations
}
return results
```python
    async def benchmark_throughput(self, test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Benchmark sequential (one-at-a-time) task throughput for a fixed duration."""
        results = {}
        for test_case in test_cases:
            agent_type = test_case["agent_type"]
            prompts = test_case["prompts"]
            duration = test_case.get("duration", 60)  # seconds
            if not prompts:
                continue  # nothing to send; also avoids a busy loop below
            print(f"  Testing {agent_type} throughput for {duration}s...")

            start_time = time.perf_counter()
            completed_tasks = 0
            errors = 0
            while time.perf_counter() - start_time < duration:
                for prompt in prompts:
                    # Stop as soon as the time budget is spent, even mid-cycle
                    if time.perf_counter() - start_time >= duration:
                        break
                    try:
                        result = await self.agent_pool.execute_task(agent_type, prompt)
                        if result["success"]:
                            completed_tasks += 1
                        else:
                            errors += 1
                    except Exception:
                        # Not a bare except: KeyboardInterrupt and task cancellation still propagate
                        errors += 1

            total_time = time.perf_counter() - start_time
            attempts = completed_tasks + errors
            results[agent_type] = {
                "total_tasks": completed_tasks,
                "errors": errors,
                "duration": total_time,
                "throughput_tps": completed_tasks / total_time,  # successful tasks per second
                "error_rate": errors / attempts if attempts > 0 else 0,
            }
        return results
```
```python
    async def benchmark_memory_usage(self, test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Benchmark memory usage of this Python process while agent tasks run."""
        import os

        import psutil  # third-party dependency

        process = psutil.Process(os.getpid())

        def rss_mb() -> float:
            # Resident set size of *this* process only; agents running in other
            # processes or pods are not included in these numbers.
            return process.memory_info().rss / 1024 / 1024

        results = {}
        for test_case in test_cases:
            agent_type = test_case["agent_type"]
            prompts = test_case["prompts"]
            print(f"  Testing {agent_type} memory usage...")

            baseline_memory = rss_mb()
            memory_readings = []  # sampled after each task
            for prompt in prompts:
                await self.agent_pool.execute_task(agent_type, prompt)
                memory_readings.append(rss_mb())

            final_memory = rss_mb()
            results[agent_type] = {
                "baseline_memory_mb": baseline_memory,
                "peak_memory_mb": max(memory_readings) if memory_readings else baseline_memory,
                "final_memory_mb": final_memory,
                "memory_increase_mb": final_memory - baseline_memory,
                "memory_growth_percent": (
                    (final_memory - baseline_memory) / baseline_memory * 100
                    if baseline_memory > 0 else 0
                ),
            }
        return results
```
```python
    async def benchmark_concurrent_load(self, load_config: Dict[str, Any]) -> Dict[str, Any]:
        """Benchmark concurrent load handling with simulated users."""
        num_concurrent = load_config.get("concurrent_users", 10)
        duration = load_config.get("duration", 30)
        agent_type = load_config.get("agent_type", "general")
        prompt = load_config.get("prompt", "Hello, how are you?")
        print(f"  Testing {num_concurrent} concurrent users for {duration}s...")

        async def simulate_user(user_id: int) -> Dict[str, Any]:
            """Simulate a single user sending requests back to back."""
            user_results = {
                "requests_made": 0,
                "successful_requests": 0,
                "total_latency": 0.0,
                "errors": 0,
            }
            start_time = time.perf_counter()
            while time.perf_counter() - start_time < duration:
                # Count the attempt first so requests that raise are included in the totals
                user_results["requests_made"] += 1
                try:
                    result = await self.agent_pool.execute_task(agent_type, f"{prompt} (User {user_id})")
                    if result["success"]:
                        user_results["successful_requests"] += 1
                        # Relies on the pool reporting per-task latency under "duration"
                        user_results["total_latency"] += result.get("duration", 0)
                    else:
                        user_results["errors"] += 1
                except Exception:
                    # An unexpected exception ends this simulated user's session
                    user_results["errors"] += 1
                    break
            return user_results

        # Run all simulated users concurrently and measure actual wall-clock time
        wall_start = time.perf_counter()
        user_results = await asyncio.gather(*(simulate_user(i) for i in range(num_concurrent)))
        elapsed = time.perf_counter() - wall_start

        # Aggregate results across users
        total_requests = sum(r["requests_made"] for r in user_results)
        successful_requests = sum(r["successful_requests"] for r in user_results)
        total_errors = sum(r["errors"] for r in user_results)
        total_latency = sum(r["total_latency"] for r in user_results)
        avg_latency = total_latency / successful_requests if successful_requests > 0 else 0

        return {
            "concurrent_users": num_concurrent,
            "duration": duration,
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "errors": total_errors,
            "success_rate": successful_requests / total_requests if total_requests > 0 else 0,
            "avg_latency": avg_latency,
            # Measured wall time: in-flight requests can finish after `duration` expires
            "requests_per_second": total_requests / elapsed if elapsed > 0 else 0,
            "user_results": user_results,
        }
```
```python
    async def benchmark_error_handling(self, error_tests: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Benchmark error handling and recovery."""
        results = {}
        for test_case in error_tests:
            test_name = test_case["name"]
            agent_type = test_case["agent_type"]
            error_prompts = test_case["error_prompts"]
            iterations = test_case.get("iterations", 5)
            print(f"  Testing {test_name} error handling...")

            error_stats = {
                "total_tests": iterations * len(error_prompts),
                "successful_handling": 0,
                "error_types": {},
                "recovery_times": [],
            }
            for _ in range(iterations):
                for error_prompt in error_prompts:
                    start_time = time.perf_counter()
                    try:
                        result = await self.agent_pool.execute_task(agent_type, error_prompt)
                        if not result["success"]:
                            # Graceful handling = a failed result carrying a non-empty error message
                            error_msg = str(result.get("error") or "")
                            if error_msg:
                                error_stats["successful_handling"] += 1
                            error_type = self.categorize_error(error_msg or "unknown")
                            error_stats["error_types"][error_type] = error_stats["error_types"].get(error_type, 0) + 1
                    except Exception as e:
                        # A raised exception means the pool did not handle the error itself
                        error_type = self.categorize_error(str(e))
                        error_stats["error_types"][error_type] = error_stats["error_types"].get(error_type, 0) + 1
                    error_stats["recovery_times"].append(time.perf_counter() - start_time)

            recovery_times = error_stats["recovery_times"]
            error_stats["avg_recovery_time"] = statistics.mean(recovery_times) if recovery_times else 0
            error_stats["error_handling_rate"] = (
                error_stats["successful_handling"] / error_stats["total_tests"]
                if error_stats["total_tests"] > 0 else 0
            )
            results[test_name] = error_stats
        return results
```
```python
    def categorize_error(self, error_msg: str) -> str:
        """Map an error message to a coarse category by keyword matching."""
        error_msg = error_msg.lower()
        # Match "timeout"/"timed out" rather than "time", which also hits words like "runtime"
        if "timeout" in error_msg or "timed out" in error_msg:
            return "timeout"
        elif "rate limit" in error_msg or "quota" in error_msg:
            return "rate_limit"
        elif "authentication" in error_msg or "authorization" in error_msg:
            return "auth"
        elif "network" in error_msg or "connection" in error_msg:
            return "network"
        elif "memory" in error_msg:  # also covers "out of memory"
            return "resource"
        else:
            return "other"
```
```python
    def generate_summary(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a benchmark summary with per-area scores and recommendations."""
        summary = {
            "overall_health": "unknown",
            "performance_score": 0,
            "recommendations": [],
        }

        # Analyze latency (mean seconds per request, averaged across agent types)
        if "latency" in results:
            avg_latencies = [
                agent_results["mean_latency"]
                for agent_results in results["latency"].values()
            ]
            if avg_latencies:
                overall_avg_latency = statistics.mean(avg_latencies)
                summary["avg_latency"] = overall_avg_latency
                if overall_avg_latency < 2.0:
                    summary["latency_score"] = "excellent"
                elif overall_avg_latency < 5.0:
                    summary["latency_score"] = "good"
                elif overall_avg_latency < 10.0:
                    summary["latency_score"] = "acceptable"
                else:
                    summary["latency_score"] = "poor"
                    summary["recommendations"].append(
                        "High latency detected. Consider optimizing model or infrastructure."
                    )

        # Analyze throughput (tasks per second, averaged across agent types)
        if "throughput" in results:
            throughputs = [
                agent_results["throughput_tps"]
                for agent_results in results["throughput"].values()
            ]
            if throughputs:
                avg_throughput = statistics.mean(throughputs)
                summary["avg_throughput"] = avg_throughput
                if avg_throughput > 10:
                    summary["throughput_score"] = "excellent"
                elif avg_throughput > 5:
                    summary["throughput_score"] = "good"
                elif avg_throughput > 1:
                    summary["throughput_score"] = "acceptable"
                else:
                    summary["throughput_score"] = "poor"
                    summary["recommendations"].append(
                        "Low throughput detected. Consider scaling infrastructure."
                    )

        # Analyze error handling (share of error-prompt tests answered with an explicit error message)
        if "errors" in results:
            error_rates = [
                test_results["error_handling_rate"]
                for test_results in results["errors"].values()
            ]
            if error_rates:
                avg_error_handling = statistics.mean(error_rates)
                summary["error_handling_rate"] = avg_error_handling
                if avg_error_handling > 0.9:
                    summary["error_score"] = "excellent"
                elif avg_error_handling > 0.7:
                    summary["error_score"] = "good"
                else:
                    summary["error_score"] = "needs_improvement"
                    summary["recommendations"].append(
                        "Error handling could be improved. Consider adding more robust error recovery."
                    )

        # Overall health: map each label to a 1-4 score and average them.
        # score_map is defined up front so it exists even when latency results are absent,
        # and "needs_improvement" ranks with "poor" as the lowest tier.
        score_map = {"excellent": 4, "good": 3, "acceptable": 2, "poor": 1, "needs_improvement": 1}
        scores = [
            score_map.get(summary[key], 2)
            for key in ("latency_score", "throughput_score", "error_score")
            if key in summary
        ]
        if scores:
            avg_score = statistics.mean(scores)
            summary["performance_score"] = avg_score
            if avg_score >= 3.5:
                summary["overall_health"] = "excellent"
            elif avg_score >= 2.5:
                summary["overall_health"] = "good"
            elif avg_score >= 1.5:
                summary["overall_health"] = "acceptable"
            else:
                summary["overall_health"] = "needs_attention"

        return summary
```
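The latency benchmark in this suite reports mean, median, min, and max, but latency regressions usually show up first in the tail. A minimal sketch of a helper that derives p50, p95, and p99 from a list of per-call latencies, using the standard library's `statistics.quantiles` (Python 3.8+). `latency_percentiles` is a hypothetical name, not part of the suite; if you adopt it, you would call it on the `latencies` list collected in `benchmark_latency`.

```python
import statistics
from typing import Dict, List


def latency_percentiles(latencies: List[float]) -> Dict[str, float]:
    """Return p50/p95/p99 for latency samples in seconds (hypothetical helper)."""
    if len(latencies) < 2:
        # Older Python versions require at least two points for quantiles()
        raise ValueError("need at least two latency samples")
    # n=100 yields 99 cut points; index k-1 is the k-th percentile.
    # method="inclusive" interpolates only within the observed min/max.
    cuts = statistics.quantiles(latencies, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


# Usage with illustrative samples (seconds)
samples = [0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.5, 2.0, 4.5, 9.0]
print(latency_percentiles(samples))
```

With a small sample count like the `iterations: 5` used in the config, p99 is dominated by the single slowest call, so tail percentiles become meaningful only with more iterations.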
```python
import asyncio
import json

# Run comprehensive benchmark
benchmark_config = {
    "latency_tests": [
        {"agent_type": "general", "prompt": "Hello!", "iterations": 5},
        {"agent_type": "coder", "prompt": "Write a function", "iterations": 5},
    ],
    "throughput_tests": [
        {
            "agent_type": "general",
            "prompts": ["Hi", "Hello", "Hey"],
            "duration": 10,  # seconds
        }
    ],
    "memory_tests": [
        {"agent_type": "general", "prompts": ["Test"] * 10},
    ],
    "load_tests": {
        "concurrent_users": 5,
        "duration": 15,  # seconds
        "agent_type": "general",
        "prompt": "Quick test",
    },
    "error_tests": [
        {
            "name": "invalid_requests",
            "agent_type": "general",
            # An empty prompt and an oversized prompt
            "error_prompts": ["", "This prompt is way too long " * 1000],
            "iterations": 3,
        }
    ],
}

benchmark_suite = ProductionBenchmarkSuite(monitored_pool)


async def run_benchmarks():
    results = await benchmark_suite.run_comprehensive_benchmark(benchmark_config)
    summary = results["summary"]

    print("\nBenchmark Summary:")
    print(f"Overall Health: {summary['overall_health']}")
    print(f"Performance Score: {summary['performance_score']:.2f}")
    if summary["recommendations"]:
        print("Recommendations:")
        for rec in summary["recommendations"]:
            print(f"  - {rec}")

    # Save detailed results; default=str serializes non-JSON types such as datetimes
    with open("production_benchmark_results.json", "w") as f:
        json.dump(results, f, indent=2, default=str)
    print("Detailed results saved to production_benchmark_results.json")


asyncio.run(run_benchmarks())
```

This production deployment chapter covers enterprise-scale infrastructure, monitoring, security, and performance optimization for Phidata agent systems. Together, these pieces form a baseline for scaling Phidata agents reliably in production. 🚀
To deploy and verify the stack end to end:

```bash
# Deploy to Kubernetes
kubectl apply -f k8s/

# Or use Docker Compose
docker-compose -f docker-compose.prod.yml up -d

# Check health
curl https://agents.company.com/health

# Monitor metrics
curl https://agents.company.com/metrics

# Run benchmarks
python production_benchmarks.py
```
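The one-shot `curl .../health` check works interactively, but a rollout script usually needs to wait until the endpoint reports healthy or give up after a deadline. A minimal standard-library sketch, assuming the health endpoint returns HTTP 200 when ready; `http_check` and `wait_for_healthy` are hypothetical helpers, and the URL is the same placeholder used in the commands.

```python
import time
import urllib.request
from typing import Callable


def http_check(url: str, timeout: float = 5.0) -> bool:
    """Return True if GET `url` answers HTTP 200 (assumed to mean 'healthy')."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError, HTTPError (error statuses) and socket timeouts are all OSError subclasses
        return False


def wait_for_healthy(check: Callable[[], bool], deadline_s: float = 120.0, interval_s: float = 5.0) -> bool:
    """Poll `check` every `interval_s` seconds until it passes or `deadline_s` elapses."""
    start = time.monotonic()
    while True:
        if check():
            return True
        # Give up if another wait would overshoot the deadline
        if time.monotonic() - start + interval_s > deadline_s:
            return False
        time.sleep(interval_s)


# Usage, with the placeholder URL from the commands above:
# healthy = wait_for_healthy(lambda: http_check("https://agents.company.com/health"))
```

Taking the check as a callable keeps the polling logic independent of HTTP, so the same loop can wrap a `kubectl` readiness query or a queue-depth probe.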
Treat this chapter as the production baseline for agent workloads:
- pin model/provider configs by environment and rotate keys on schedule
- enforce per-tenant rate limits and workload isolation for shared clusters
- alert on token-cost spikes, latency regressions, and downstream tool failures
- run disaster recovery drills for vector stores, session stores, and agent memory backends
- maintain benchmark baselines and rerun after runtime, model, or prompt-stack changes
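Per-tenant rate limits are commonly implemented as one token bucket per tenant: each bucket refills at a fixed rate up to a burst capacity, and a request is admitted only if a token is available. A minimal in-process sketch under those assumptions; `TenantRateLimiter` is a hypothetical class, and a deployment with several replicas would need the bucket state in a shared store (for example the Redis instance in the architecture) rather than a local dict.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class _Bucket:
    tokens: float
    updated: float


@dataclass
class TenantRateLimiter:
    """Token bucket per tenant (hypothetical; state lives in this process only)."""
    rate_per_s: float  # tokens added per second
    burst: float  # maximum tokens a bucket can hold
    clock: Callable[[], float] = time.monotonic
    _buckets: Dict[str, _Bucket] = field(default_factory=dict, init=False, repr=False)

    def allow(self, tenant_id: str, cost: float = 1.0) -> bool:
        now = self.clock()
        bucket = self._buckets.get(tenant_id)
        if bucket is None:
            # New tenants start with a full bucket
            bucket = self._buckets[tenant_id] = _Bucket(tokens=self.burst, updated=now)
        # Refill in proportion to elapsed time, capped at the burst size
        bucket.tokens = min(self.burst, bucket.tokens + (now - bucket.updated) * self.rate_per_s)
        bucket.updated = now
        if bucket.tokens >= cost:
            bucket.tokens -= cost
            return True
        return False


# Usage: admit or reject (e.g. with HTTP 429) before dispatching to the agent pool
limiter = TenantRateLimiter(rate_per_s=2.0, burst=5)
print(limiter.allow("tenant-a"))
```

Because each tenant has its own bucket, a noisy tenant exhausts only its own tokens, which is the workload-isolation property the list calls for.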
With these operational controls, Phidata deployments stay predictable under real production load.
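Benchmark baselines pay off when new runs are compared against them automatically. A minimal sketch that compares `mean_latency` values between two result files written by `run_benchmarks()`, assuming the `results["results"]["latency"][agent_type]` layout the suite produces, and flags agent types that slowed down by more than a tolerance. `find_latency_regressions` is a hypothetical helper and the 20% threshold is an assumption to tune.

```python
import json
from typing import Any, Dict, List


def find_latency_regressions(
    baseline: Dict[str, Any], current: Dict[str, Any], tolerance: float = 0.20
) -> List[str]:
    """Return agent types whose mean latency grew by more than `tolerance` (hypothetical helper)."""
    base_latency = baseline.get("results", {}).get("latency", {})
    curr_latency = current.get("results", {}).get("latency", {})
    regressions = []
    for agent_type, base_stats in base_latency.items():
        curr_stats = curr_latency.get(agent_type)
        if curr_stats is None:
            # Agent missing from the new run: worth investigating before release
            regressions.append(f"{agent_type}: missing from current run")
            continue
        before, after = base_stats["mean_latency"], curr_stats["mean_latency"]
        if before > 0 and (after - before) / before > tolerance:
            regressions.append(f"{agent_type}: {before:.2f}s -> {after:.2f}s")
    return regressions


def load_results(path: str) -> Dict[str, Any]:
    with open(path) as f:
        return json.load(f)


# Usage (file names are placeholders):
# issues = find_latency_regressions(load_results("baseline.json"),
#                                   load_results("production_benchmark_results.json"))
```

A non-empty result can fail a CI job after a runtime, model, or prompt-stack change, turning the baseline into an enforced gate.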
Most teams struggle here not because they need more code, but because they need clear boundaries around the benchmark suite (`self`), the agent under test (`agent_type`), and timing (`time`) so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
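One way to avoid coupling core logic to a single implementation path is to depend only on the narrow contract the benchmark suite actually uses: an async `execute_task(agent_type, prompt)` returning a dict with `success` and, optionally, `error` and `duration`. A sketch using `typing.Protocol`, plus an in-memory stub for dry runs without a model provider; `AgentPool` and `StubAgentPool` are hypothetical names, not Phidata APIs.

```python
import asyncio
import random
from typing import Any, Dict, Optional, Protocol


class AgentPool(Protocol):
    """The only surface the benchmark methods in this chapter call."""

    async def execute_task(self, agent_type: str, prompt: str) -> Dict[str, Any]: ...


class StubAgentPool:
    """Hypothetical offline pool: rejects blank prompts, otherwise succeeds after a short delay."""

    def __init__(self, delay_s: float = 0.01, seed: Optional[int] = None) -> None:
        self.delay_s = delay_s
        self._rng = random.Random(seed)

    async def execute_task(self, agent_type: str, prompt: str) -> Dict[str, Any]:
        # Jitter the simulated latency so statistics such as stdev are non-zero
        duration = self.delay_s * (0.5 + self._rng.random())
        await asyncio.sleep(duration)
        if not prompt.strip():
            return {"success": False, "error": "empty prompt rejected", "duration": duration}
        return {"success": True, "output": f"[{agent_type}] ok", "duration": duration}


async def demo(pool: AgentPool) -> None:
    print(await pool.execute_task("general", "Hello!"))
    print(await pool.execute_task("general", ""))


asyncio.run(demo(StubAgentPool(seed=42)))
```

Since the benchmark methods shown here only call `execute_task`, passing `StubAgentPool()` in place of `monitored_pool` should let the suite run end to end offline, which is useful for testing the harness itself before pointing it at real agents.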
After working through this chapter, you should be able to reason about production deployment and scaling as an operating subsystem of your Phidata agent stack, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around agents, `user_id`, and `duration` as a checklist when adapting these patterns to your own repository.
Under the hood, a production agent deployment like the one in this chapter usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for `self`.
- Input normalization: shape incoming data so `agent_type` receives stable contracts.
- Core execution: run the main logic branch and propagate intermediate state through `time`.
- Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
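Walking the sequence in order can be made mechanical: run each stage as a function that returns the next state or raises, and report which stage failed. A minimal sketch; the stage names mirror the control path, while `run_pipeline`, `StageError`, the toy stage bodies, and the 4000-character limit are hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple

State = Dict[str, Any]
Stage = Tuple[str, Callable[[State], State]]


class StageError(Exception):
    """Raised with the name of the stage that failed."""

    def __init__(self, stage: str, cause: Exception) -> None:
        super().__init__(f"stage '{stage}' failed: {cause}")
        self.stage = stage
        self.cause = cause


def run_pipeline(stages: List[Stage], state: State) -> State:
    """Run stages in order; each receives the previous state and returns the next one."""
    for name, stage in stages:
        try:
            state = stage(state)
        except Exception as exc:
            raise StageError(name, exc) from exc
        # Lightweight telemetry: record which stages completed
        state.setdefault("completed", []).append(name)
    return state


def normalize(state: State) -> State:
    prompt = state.get("prompt", "").strip()
    if not prompt:
        raise ValueError("empty prompt")
    return {**state, "prompt": prompt}


def enforce_policy(state: State) -> State:
    if len(state["prompt"]) > 4000:  # assumed limit
        raise ValueError("prompt too long")
    return state


stages: List[Stage] = [
    ("bootstrap", lambda s: {**s, "config": {"timeout_s": 300}}),
    ("normalize", normalize),
    ("policy", enforce_policy),
    ("execute", lambda s: {**s, "output": f"echo: {s['prompt']}"}),
    ("compose", lambda s: {**s, "result": {"success": True, "output": s["output"]}}),
]
print(run_pipeline(stages, {"prompt": "  hi  "})["result"])
```

When a run fails, `StageError.stage` tells you where to start reading logs, instead of inferring it from a stack trace.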
Use the following upstream sources to verify implementation details while reading this chapter:
- View Repo (github.com): the authoritative reference for upstream implementation details.
Suggested trace strategy:
- search upstream code for `self` and `agent_type` to map concrete implementation paths
- compare docs claims against actual runtime/config code before reusing patterns in production
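The first trace step amounts to a targeted search of a local checkout. A small dependency-free sketch that walks a directory and returns `file:line` hits for whole-word identifier matches; `find_identifiers` is a hypothetical helper and the path is a placeholder (`git grep -n` or `rg` do the same job from a shell).

```python
import re
from pathlib import Path
from typing import Iterable, List, Tuple


def find_identifiers(
    root: str, names: Iterable[str], suffixes: Tuple[str, ...] = (".py",)
) -> List[Tuple[str, int, str]]:
    """Return (path, line_number, line) for whole-word matches of any name under root."""
    pattern = re.compile(r"\b(" + "|".join(re.escape(n) for n in names) + r")\b")
    hits = []
    for path in sorted(Path(root).rglob("*")):
        if not path.is_file() or path.suffix not in suffixes:
            continue
        text = path.read_text(encoding="utf-8", errors="replace")
        for lineno, line in enumerate(text.splitlines(), start=1):
            if pattern.search(line):
                hits.append((str(path), lineno, line.strip()))
    return hits


# Usage (placeholder path to a local clone of the upstream repo):
# for path, lineno, line in find_identifiers("path/to/phidata", ["agent_type"]):
#     print(f"{path}:{lineno}: {line}")
```

Whole-word matching keeps names such as `my_agent_type` out of the results; a bare name like `self` matches nearly every method, so it is most useful combined with a narrower identifier.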