Difficulty: Advanced
Status: ✅ Complete
Prerequisites: Complete 04_execution first
This section demonstrates distributed execution in Graflow: scaling workflows across multiple workers, with Redis as the coordination backend. Distributed execution lets you:
- Process tasks across multiple machines
- Scale horizontally by adding workers
- Handle long-running workflows
- Implement reliable task processing
What you'll learn:
- 🔴 Redis-based task queues
- 👷 Worker processes and task execution
- 🌐 Distributed workflow coordination
- 📡 Redis channels for inter-process communication
- ⚙️ Worker pool management
Concept: Worker process setup
Learn how to create and manage worker processes that execute tasks from Redis queues.
uv run python examples/05_distributed/redis_worker.py
Key Concepts:
- TaskWorker initialization
- Worker lifecycle management
- Task dequeuing and execution
- Graceful worker shutdown
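The worker lifecycle above (start, dequeue, execute, graceful shutdown) can be sketched with the standard library alone. This is a minimal, hypothetical stand-in, not Graflow's `TaskWorker`: an in-process `queue.Queue` plays the role of the Redis task queue, a dict plays the role of the result store, and a `threading.Event` signals shutdown. The `SimpleWorker` class and the `(name, func, args)` task tuple are illustrative assumptions.

```python
import queue
import threading


class SimpleWorker:
    """Hypothetical worker: polls a queue, runs tasks, stores results."""

    def __init__(self, worker_id: str, tasks: "queue.Queue", results: dict) -> None:
        self.worker_id = worker_id
        self.tasks = tasks
        self.results = results
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> None:
        # Graceful shutdown: the current task finishes, then the loop exits
        self._stop.set()
        self._thread.join(timeout)

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                # Poll with a timeout so the stop flag is re-checked regularly
                name, func, args = self.tasks.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self.results[name] = func(*args)
            except Exception as exc:
                # Record the failure instead of crashing the worker
                self.results[name] = exc
            finally:
                self.tasks.task_done()


tasks: "queue.Queue" = queue.Queue()
results: dict = {}
worker = SimpleWorker("worker-1", tasks, results)
worker.start()
tasks.put(("square", lambda x: x * x, (7,)))
tasks.put(("fail", lambda: 1 / 0, ()))
tasks.join()  # wait until both tasks have been processed
worker.stop()
print(results["square"])
```

Polling with a short timeout, rather than blocking forever, is what lets `stop()` take effect without killing a task mid-execution.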
Concept: Complete distributed workflow
Build a complete ETL workflow that executes across multiple workers.
uv run python examples/05_distributed/distributed_workflow.py
Key Concepts:
- Distributed workflow setup
- Multiple worker coordination
- Task result sharing via Redis
- Complete pipeline execution
┌─────────────┐
│    Main     │  1. Enqueues tasks
│   Process   │  2. Monitors completion
└──────┬──────┘
       │
       │ Redis Queue
       ▼
┌──────────────────────────────┐
│        Redis Server          │
│  - Task Queue (FIFO)         │
│  - Result Channel            │
│  - Coordination State        │
└──────────────────────────────┘
       │              ▲
       │              │
       ▼              │
┌─────────┐  ┌─────────┐  ┌─────────┐
│ Worker  │  │ Worker  │  │ Worker  │
│    1    │  │    2    │  │    3    │
└─────────┘  └─────────┘  └─────────┘
  Dequeue      Execute       Store
   tasks        tasks       results
Main Process:
- Creates ExecutionContext with Redis backend
- Enqueues tasks to Redis queue
- Monitors workflow execution
- Retrieves final results
Redis Server:
- Stores task queue (FIFO order)
- Stores execution results
- Coordinates worker communication
- Provides atomic operations
Worker Processes:
- Poll Redis queue for tasks
- Execute task functions
- Store results back to Redis
- Handle errors gracefully
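The division of labor described above (the main process enqueues and monitors, Redis holds the queue and results, workers dequeue, execute, and store) can be simulated in a single process. In this hypothetical sketch, which is not Graflow's implementation, a `queue.Queue` stands in for the Redis FIFO queue, a lock-protected dict stands in for Redis result storage and its atomic operations, and the main thread polls the result store until every task has reported.

```python
import queue
import threading
import time

task_queue: queue.Queue = queue.Queue()  # stand-in for the Redis task queue (FIFO)
result_store: dict = {}                  # stand-in for Redis result storage
store_lock = threading.Lock()            # stand-in for Redis atomic operations


def worker(worker_id: str) -> None:
    while True:
        item = task_queue.get()  # dequeue, blocking like a Redis blocking pop
        if item is None:         # sentinel value: shut this worker down
            break
        task_id, payload = item
        value = payload * 2      # "execute" the task
        with store_lock:         # store the result atomically
            result_store[task_id] = {"value": value, "worker": worker_id}


def main(num_tasks: int = 9, num_workers: int = 3) -> dict:
    threads = [threading.Thread(target=worker, args=(f"worker-{i}",))
               for i in range(1, num_workers + 1)]
    for t in threads:
        t.start()
    # 1. Enqueue tasks
    for n in range(num_tasks):
        task_queue.put((f"task-{n}", n))
    # 2. Monitor completion by polling the result store
    while True:
        with store_lock:
            if len(result_store) == num_tasks:
                break
        time.sleep(0.01)
    # Stop the workers: one sentinel per worker
    for _ in threads:
        task_queue.put(None)
    for t in threads:
        t.join()
    # 3. Retrieve final results
    return dict(result_store)


results = main()
print(sorted(r["value"] for r in results.values()))
```

Each result records which worker produced it, which makes it easy to see the work being spread across the pool.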
from graflow.core.context import ExecutionContext
from graflow.queue.factory import QueueBackend
import redis

# Create Redis client
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    decode_responses=True
)

# Create context with Redis backend
# (`graph` is the task graph defined for your workflow)
context = ExecutionContext.create(
    graph=graph,
    start_node="start",
    queue_backend=QueueBackend.REDIS,
    channel_backend="redis",
    config={"redis_client": redis_client}
)

Creating a worker:
from graflow.worker.worker import TaskWorker
# Create worker
worker = TaskWorker(
    redis_host='localhost',
    redis_port=6379,
    worker_id="worker-1"
)

# Start processing
worker.start()  # Blocks until stopped

Running the examples
First, start Redis. Using Docker:
docker run -p 6379:6379 redis:7.2
Using Homebrew (macOS):
brew services start redis
Using apt (Ubuntu/Debian):
sudo service redis-server start
Then start workers in separate terminals:
# Terminal 1
uv run python -m graflow.worker.main --worker-id worker-1
# Terminal 2
uv run python -m graflow.worker.main --worker-id worker-2
# Terminal 3
uv run python -m graflow.worker.main --worker-id worker-3
Finally, run the workflow:
uv run python examples/05_distributed/distributed_workflow.py

Common distributed patterns
Fan-out:
with workflow("fanout") as ctx:
    @task
    def split_data():
        return {"batches": [1, 2, 3, 4, 5]}

    @task
    def process_batch(batch_id: int):
        # Each batch may be processed by a different worker
        return f"batch_{batch_id}_processed"

    # Create parallel tasks
    split_data >> (process_batch | process_batch | process_batch)

Map-reduce:
with workflow("mapreduce") as ctx:
    @task
    def map_task(data: list):
        # Distribute processing (`process` stands for your per-item function)
        return [process(item) for item in data]

    @task
    def reduce_task(results: list):
        # Aggregate results
        return sum(results)

    map_task >> reduce_task

Pipeline:
with workflow("pipeline") as ctx:
    @task
    def extract():
        return {"data": "raw"}

    @task
    def transform(data: dict):
        return {"data": "transformed"}

    @task
    def load(data: dict):
        return "loaded"

    extract >> transform >> load
    # Each task can execute on a different worker

Benefits of distributed execution
Scalability:
- ✅ Add more workers to handle increased load
- ✅ Process tasks in parallel across machines
- ✅ No single-worker bottleneck
Fault tolerance:
- ✅ Workers can fail and restart
- ✅ Tasks can be retried on failure
- ✅ Redis can persist queue and result state (when RDB or AOF persistence is enabled)
Flexibility:
- ✅ Workers can run on different machines
- ✅ Scale workers independently
- ✅ Mix local and remote execution
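The fan-out and map-reduce patterns shown earlier have the same data flow as a local thread pool that fans out over batches and then reduces the partial results. This stdlib sketch illustrates that flow only: `concurrent.futures.ThreadPoolExecutor` stands in for Graflow's distributed workers, and `split_data`, `process_batch`, and `reduce_results` are hypothetical names that mirror the examples rather than Graflow tasks.

```python
from concurrent.futures import ThreadPoolExecutor


def split_data() -> list:
    # Fan-out source: split the input into batches of up to 3 items
    data = list(range(1, 11))
    return [data[i:i + 3] for i in range(0, len(data), 3)]


def process_batch(batch: list) -> int:
    # Map step: in a distributed run, each batch could go to a different worker
    return sum(x * x for x in batch)


def reduce_results(partials: list) -> int:
    # Reduce step: aggregate the partial results
    return sum(partials)


def run(max_workers: int = 3) -> int:
    batches = split_data()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the input batches
        partials = list(pool.map(process_batch, batches))
    return reduce_results(partials)


print(run())
```

The reduce step only starts once every mapped batch has finished, which is the same guarantee `map_task >> reduce_task` expresses with the `>>` operator.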
Performance considerations
Network latency:
- Redis communication adds overhead (roughly 1-5 ms per operation)
- Use a local Redis for development
- Use a remote Redis for production
Task granularity:
- Too small: network overhead dominates
- Too large: poor parallelization
- Just right: balance network cost against computation
Worker count:
- Too few: underutilized capacity
- Too many: Redis contention
- Optimal: match the available CPU cores
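The granularity trade-off can be made concrete with a back-of-the-envelope cost model. This sketch is hypothetical: it assumes a fixed per-task overhead (3 ms, inside the ~1-5 ms range quoted above), a constant per-item compute cost, and workers that process tasks in parallel rounds. It ignores Redis contention and uneven last batches, so treat its numbers as estimates rather than benchmarks.

```python
import math


def estimated_runtime(n_items: int, batch_size: int, workers: int,
                      item_cost_s: float, task_overhead_s: float = 0.003) -> float:
    """Rough wall-clock estimate: tasks run in rounds of `workers` at a time."""
    n_tasks = math.ceil(n_items / batch_size)
    rounds = math.ceil(n_tasks / workers)
    per_task = batch_size * item_cost_s + task_overhead_s
    return rounds * per_task


def best_batch_size(n_items: int, workers: int, item_cost_s: float,
                    task_overhead_s: float = 0.003) -> int:
    """Brute-force the batch size with the lowest estimated runtime."""
    return min(range(1, n_items + 1),
               key=lambda b: estimated_runtime(n_items, b, workers,
                                               item_cost_s, task_overhead_s))


# 10,000 items at 1 ms each on 4 workers
for size in (1, 2_500, 10_000):
    print(size, round(estimated_runtime(10_000, size, 4, 0.001), 3))
print("best:", best_batch_size(10_000, 4, 0.001))
```

Under these assumptions, one item per task takes roughly 10 s (overhead dominates), a single 10,000-item task also takes roughly 10 s (no parallelism), and 2,500-item batches (one per worker) take roughly 2.5 s.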
Best practices
- Use appropriate task granularity
# Good: meaningful unit of work
@task
def process_batch(items: list):
    return [heavy_computation(item) for item in items]
- Handle worker failures
# Log the error, then re-raise so the failure propagates and the task can be retried
@task
def reliable_task():
    try:
        return risky_operation()
    except Exception as e:
        log_error(e)
        raise
- Monitor worker health
# Use worker heartbeats
worker = TaskWorker(
    redis_host='localhost',
    heartbeat_interval=5  # seconds
)
- Don't create too many small tasks
# Bad: too much overhead
for i in range(10000):
    tiny_task(i)  # Each task adds ~1-5 ms of overhead
- Don't assume task execution order
# Bad: race condition
task1()  # Executed by worker-1
task2()  # Might execute before task1 completes
# Good: use the >> operator to declare dependencies
task1 >> task2
- Don't store large objects in Redis
# Bad: returns a 100 MB dataset through Redis
@task
def produce_data():
    return huge_dataset

# Good: store the data externally and return a reference
@task
def produce_data():
    save_to_s3(huge_dataset, "key")
    return {"s3_key": "key"}
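The "Handle worker failures" practice pairs naturally with a bounded retry policy. This section does not show how Graflow itself configures retries, so the sketch below is a hypothetical, framework-independent decorator (`retry` is not a Graflow API): it re-runs a function up to `max_attempts` times with exponential backoff and re-raises the last error once attempts run out.

```python
import functools
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(max_attempts: int = 3, base_delay_s: float = 0.1,
          exceptions: tuple = (Exception,)) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Hypothetical retry decorator with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: propagate the last error
                    # Back off: base, 2x base, 4x base, ... between attempts
                    time.sleep(base_delay_s * 2 ** (attempt - 1))
            raise RuntimeError("unreachable")
        return wrapper
    return decorator


calls = {"n": 0}


@retry(max_attempts=3, base_delay_s=0.01)
def flaky_operation() -> str:
    # Hypothetical operation: fails twice with a transient error, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


print(flaky_operation(), calls["n"])
```

Retries only help with transient failures such as dropped connections; restricting `exceptions` to those types avoids re-running tasks that fail deterministically.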
Error: Error connecting to Redis at localhost:6379
Solution: Ensure Redis is running
# Check if Redis is running
redis-cli ping # Should return PONG
# Start Redis if not running
docker run -p 6379:6379 redis:7.2
Symptom: workers are idle while tasks remain in the queue
Solution: Check worker registration
# Verify workers are registered
redis-cli SMEMBERS graflow:workers
Error: cannot pickle 'module' object
Solution: Use cloudpickle-compatible code
# Import inside task, not at module level
@task
def my_task():
    import numpy as np  # Inside task
    return np.array([1, 2, 3])
Error: OOM command not allowed when used memory > 'maxmemory'
Solution: Configure Redis maxmemory policy
# In redis.conf
maxmemory 1gb
maxmemory-policy allkeys-lru

Advanced features
Route specific tasks to specific workers:
@task(handler="redis", worker_pool="gpu")
def gpu_task():
    # Only executed by GPU workers
    pass
Implement priority-based task execution:
# High priority tasks
queue.enqueue(task, priority=10)
# Low priority tasks
queue.enqueue(task, priority=1)
Set TTL for task results:
context.set_result(
    task_id,
    result,
    ttl=3600  # 1 hour
)

Monitoring
# Monitor Redis operations
redis-cli MONITOR
# Check queue size
redis-cli LLEN graflow:queue
# Check worker count
redis-cli SCARD graflow:workers
# Access worker metrics
worker.get_metrics()
# Returns: {
#     "tasks_processed": 100,
#     "tasks_failed": 2,
#     "uptime_seconds": 3600
# }

Next steps
After mastering distributed execution:
- 06_advanced: Learn cycles, dynamic tasks, error handling
- 07_real_world: Apply concepts to real-world problems
ExecutionContext:
- ExecutionContext.create(..., queue_backend=QueueBackend.REDIS): use Redis for the task queue
- ExecutionContext.create(..., channel_backend="redis"): use Redis for channels
TaskWorker:
- TaskWorker(redis_host, redis_port, worker_id): create a worker
- worker.start(): start processing tasks
- worker.stop(): graceful shutdown
Redis Configuration:
config={"redis_client": redis.Redis(...)}
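The priority-based execution shown under advanced features (`priority=10` for high-priority tasks, `priority=1` for low-priority ones) can be modeled with a binary heap. This is a hypothetical in-process sketch using `heapq`, not Graflow's queue API. It assumes, as that example suggests, that higher numbers mean higher priority, and it keeps FIFO order among equal priorities by using a monotonically increasing counter as a tie-breaker.

```python
import heapq
import itertools
from typing import Any


class PriorityTaskQueue:
    """Hypothetical queue: highest priority first, FIFO within a priority."""

    def __init__(self) -> None:
        self._heap: list = []
        self._counter = itertools.count()  # tie-breaker preserving insertion order

    def enqueue(self, task: Any, priority: int = 0) -> None:
        # heapq is a min-heap, so negate the priority to pop the largest first
        heapq.heappush(self._heap, (-priority, next(self._counter), task))

    def dequeue(self) -> Any:
        if not self._heap:
            raise IndexError("dequeue from empty queue")
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


q = PriorityTaskQueue()
q.enqueue("low-a", priority=1)
q.enqueue("high", priority=10)
q.enqueue("low-b", priority=1)
order = [q.dequeue() for _ in range(len(q))]
print(order)  # ['high', 'low-a', 'low-b']
```

The counter also means task objects are never compared with each other, so tasks do not need to define an ordering.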
Ready to scale workflows? Start with redis_worker.py! 🚀