Go from a single-process workflow to a multi-worker distributed system in four steps. Each step adds one capability by setting an environment variable -- your workflow code never changes.
- Python 3.10+
- Docker (for PostgreSQL and Redis in Steps 2-4)
pip install kailash
Run a workflow with no configuration at all. Kailash uses SQLite and in-memory stores by default.
Create app.py:
from kailash import WorkflowBuilder, LocalRuntime

# Build a workflow that processes text
builder = WorkflowBuilder()
builder.add_node("PythonCodeNode", "transform", {
    "code": "output = text.upper().replace(' ', '_')",
    "inputs": {"text": "str"},
    "output_type": "str",
})
wf = builder.build()

# Execute
with LocalRuntime() as runtime:
    results, run_id = runtime.execute(wf, parameters={
        "transform": {"text": "hello world"}
    })
    print(f"Result: {results['transform']}")
    print(f"Run ID: {run_id}")

Run it:
python app.py
# Result: {'output': 'HELLO_WORLD'}
# Run ID: abc123...

No environment variables, no databases, no configuration files. Everything works.
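Under the hood, a code-string node of this kind boils down to executing the snippet against a namespace built from its declared inputs. A minimal sketch of that idea (illustrative only -- `run_code_node` is a hypothetical helper, not Kailash's actual PythonCodeNode implementation):

```python
# Sketch: how a code-string node can be modeled. NOT Kailash's source --
# just the underlying idea: run the snippet in a namespace seeded with
# the declared inputs, then collect the 'output' name it assigns.

def run_code_node(code: str, inputs: dict) -> dict:
    namespace = dict(inputs)                     # inputs become local names
    exec(compile(code, "<node>", "exec"), namespace)
    return {"output": namespace["output"]}       # node exposes 'output'

result = run_code_node("output = text.upper().replace(' ', '_')",
                       {"text": "hello world"})
print(result)  # {'output': 'HELLO_WORLD'}
```

This matches the shape of the result printed above: the node's declared `output_type` names a single `output` value keyed under the node ID.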
Start a PostgreSQL instance and set one environment variable. Your workflow code is unchanged -- stores automatically switch to database-backed persistence.
docker run -d \
--name kailash-pg \
-e POSTGRES_USER=kailash \
-e POSTGRES_PASSWORD=kailash \
-e POSTGRES_DB=kailash \
-p 5432:5432 \
postgres:16

The PostgreSQL driver (asyncpg) is included in the base `pip install kailash`.
export KAILASH_DATABASE_URL=postgresql://kailash:kailash@localhost:5432/kailash

python app.py
# Result: {'output': 'HELLO_WORLD'}
# Run ID: def456...

The output is identical. Behind the scenes, events, checkpoints, execution metadata, and the dead letter queue now persist to PostgreSQL. Restart the process and the data survives.
Connect to PostgreSQL and check:
docker exec -it kailash-pg psql -U kailash -c "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_name LIKE 'kailash_%';"

You should see: `kailash_meta`, `kailash_events`, `kailash_checkpoints`, `kailash_dlq`, `kailash_executions`, `kailash_idempotency`.
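The switch in this step is driven entirely by the environment. Conceptually, backend selection behaves like the sketch below (assumed logic for illustration; `select_store_backend` is a hypothetical helper, not part of the Kailash API):

```python
# Sketch of the progressive-configuration idea: choose a persistence
# backend from the environment, falling back to SQLite when
# KAILASH_DATABASE_URL is unset. Assumed logic, not the actual source.
from urllib.parse import urlparse

def select_store_backend(env: dict) -> str:
    url = env.get("KAILASH_DATABASE_URL")
    if not url:
        return "sqlite"                    # Step 1 default: zero config
    scheme = urlparse(url).scheme
    if scheme.startswith("postgresql"):
        return "postgresql"                # Step 2: database-backed stores
    raise ValueError(f"unsupported database URL scheme: {scheme}")

print(select_store_backend({}))  # sqlite
print(select_store_backend(
    {"KAILASH_DATABASE_URL": "postgresql://kailash:kailash@localhost:5432/kailash"}
))  # postgresql
```

The point is that the decision happens at store-creation time, which is why the workflow code itself never changes between steps.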
Add a Redis instance and set a second environment variable. Tasks are now distributed across workers.
docker run -d \
--name kailash-redis \
-p 6379:6379 \
redis:7

export KAILASH_QUEUE_URL=redis://localhost:6379/0

Create producer.py:
import asyncio

from kailash.infrastructure import create_task_queue

async def main():
    queue = await create_task_queue()  # auto-detects from KAILASH_QUEUE_URL
    for i in range(5):
        task_id = await queue.enqueue(
            payload={
                "workflow_id": "text-transform",
                "code": "output = text.upper().replace(' ', '_')",
                "inputs": {"text": "str"},
                "output_type": "str",
                "parameters": {"transform": {"text": f"message number {i}"}},
            },
            queue_name="default",
            max_attempts=3,
        )
        print(f"Enqueued task {task_id}")
    stats = await queue.get_stats()
    print(f"Queue stats: {stats}")

asyncio.run(main())

Create worker.py:
import asyncio

from kailash import WorkflowBuilder, LocalRuntime
from kailash.infrastructure import create_task_queue

async def run_worker(worker_id: str):
    queue = await create_task_queue()
    runtime = LocalRuntime()  # reused across tasks; call runtime.close() on worker shutdown
    print(f"[{worker_id}] Worker started, polling for tasks...")
    while True:
        task = await queue.dequeue(queue_name="default", worker_id=worker_id)
        if task is None:
            await asyncio.sleep(1)
            continue
        print(f"[{worker_id}] Processing task {task.task_id}")
        try:
            builder = WorkflowBuilder()
            builder.add_node("PythonCodeNode", "transform", {
                "code": task.payload["code"],
                "inputs": task.payload["inputs"],
                "output_type": task.payload["output_type"],
            })
            wf = builder.build()
            results, run_id = runtime.execute(
                wf, parameters=task.payload.get("parameters", {})
            )
            await queue.complete(task.task_id)
            print(f"[{worker_id}] Task {task.task_id} completed: {results}")
        except Exception as e:
            await queue.fail(task.task_id, error=str(e))
            print(f"[{worker_id}] Task {task.task_id} failed: {e}")
        await queue.requeue_stale()

asyncio.run(run_worker("worker-1"))

Terminal 1 -- start the worker:
python worker.py
# [worker-1] Worker started, polling for tasks...

Terminal 2 -- submit tasks:
python producer.py
# Enqueued task abc-123
# Enqueued task def-456
# ...

The worker picks up tasks and processes them:
[worker-1] Processing task abc-123
[worker-1] Task abc-123 completed: {'transform': {'output': 'MESSAGE_NUMBER_0'}}
[worker-1] Processing task def-456
[worker-1] Task def-456 completed: {'transform': {'output': 'MESSAGE_NUMBER_1'}}
Start more workers in additional terminals to process tasks in parallel.
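The queue contract the worker relies on -- `dequeue` leases a task, `complete` drops it, `fail` retries until `max_attempts` then dead-letters, `requeue_stale` recovers tasks from crashed workers -- can be modeled in a few lines. A toy in-memory stand-in for illustration, not the Redis-backed implementation:

```python
# Toy model of at-least-once queue semantics (hypothetical stand-in).
import time, uuid

class ToyQueue:
    def __init__(self, lease_seconds=30):
        self.pending, self.in_flight, self.dlq = [], {}, []
        self.lease_seconds = lease_seconds

    def enqueue(self, payload, max_attempts=3):
        task = {"task_id": str(uuid.uuid4()), "payload": payload,
                "attempts": 0, "max_attempts": max_attempts}
        self.pending.append(task)
        return task["task_id"]

    def dequeue(self, worker_id):
        if not self.pending:
            return None
        task = self.pending.pop(0)
        task["attempts"] += 1
        self.in_flight[task["task_id"]] = (task, time.monotonic(), worker_id)
        return task

    def complete(self, task_id):
        self.in_flight.pop(task_id)            # done: drop the lease

    def fail(self, task_id, error):
        task, _, _ = self.in_flight.pop(task_id)
        if task["attempts"] >= task["max_attempts"]:
            self.dlq.append((task, error))     # retries exhausted: dead-letter
        else:
            self.pending.append(task)          # retry later

    def requeue_stale(self):
        now = time.monotonic()
        for tid, (task, started, _) in list(self.in_flight.items()):
            if now - started > self.lease_seconds:  # worker likely crashed
                del self.in_flight[tid]
                self.pending.append(task)

q = ToyQueue()
q.enqueue({"text": "hi"}, max_attempts=2)
task = q.dequeue("worker-1")
q.fail(task["task_id"], "boom")      # attempt 1 failed -> requeued
task = q.dequeue("worker-1")
q.fail(task["task_id"], "boom")      # attempt 2 failed -> DLQ
print(len(q.dlq))  # 1

q2 = ToyQueue(lease_seconds=0.01)
q2.enqueue({"text": "bye"})
q2.dequeue("worker-2")
time.sleep(0.05)                     # lease expires (worker presumed dead)
q2.requeue_stale()
print(len(q2.pending))  # 1
```

This is why `worker.py` calls `queue.requeue_stale()` each loop: a task leased by a crashed worker is eventually returned to pending instead of being lost.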
Wrap your runtime with IdempotentExecutor to ensure exactly-once execution. Duplicate requests with the same idempotency key return cached results.
Create idempotent_app.py:
import asyncio

from kailash import WorkflowBuilder, LocalRuntime
from kailash.infrastructure import StoreFactory, IdempotentExecutor

async def main():
    # Create the idempotency store
    factory = StoreFactory()  # auto-detects from KAILASH_DATABASE_URL
    idempotency_store = await factory.create_idempotency_store()
    if idempotency_store is None:
        print("Idempotency requires KAILASH_DATABASE_URL to be set")
        return
    executor = IdempotentExecutor(idempotency_store, ttl_seconds=3600)

    # Build a workflow
    builder = WorkflowBuilder()
    builder.add_node("PythonCodeNode", "transform", {
        "code": "output = text.upper()",
        "inputs": {"text": "str"},
        "output_type": "str",
    })
    wf = builder.build()

    with LocalRuntime() as runtime:
        # First call -- executes the workflow
        print("Call 1 (fresh execution):")
        results, run_id = await executor.execute(
            runtime, wf,
            parameters={"transform": {"text": "hello"}},
            idempotency_key="unique-request-001",
        )
        print(f"  Results: {results}")
        print(f"  Run ID: {run_id}")

        # Second call with same key -- returns cached result
        print("\nCall 2 (cached, no re-execution):")
        results2, run_id2 = await executor.execute(
            runtime, wf,
            parameters={"transform": {"text": "hello"}},
            idempotency_key="unique-request-001",
        )
        print(f"  Results: {results2}")
        print(f"  Run ID: {run_id2}")
        print(f"\nSame result? {results == results2}")

    await factory.close()

asyncio.run(main())

Run it (with KAILASH_DATABASE_URL still set):
python idempotent_app.py
# Call 1 (fresh execution):
# Results: {'transform': {'output': 'HELLO'}}
# Run ID: abc123...
#
# Call 2 (cached, no re-execution):
# Results: {'transform': {'output': 'HELLO'}}
# Run ID: abc123...
#
# Same result? True

The second call returns instantly from cache. No workflow re-execution.
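The behavior above can be pictured as a keyed result cache with a TTL. A toy sketch of the mechanism (assumed behavior for illustration, not the actual IdempotentExecutor internals):

```python
# Toy idempotent executor: first execution under a key is stored; later
# calls with the same key return the stored result until the TTL lapses.
import time

class ToyIdempotentExecutor:
    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self.cache = {}            # idempotency_key -> (result, stored_at)

    def execute(self, fn, *, idempotency_key):
        hit = self.cache.get(idempotency_key)
        if hit and time.monotonic() - hit[1] < self.ttl:
            return hit[0]          # cached: no re-execution
        result = fn()
        self.cache[idempotency_key] = (result, time.monotonic())
        return result

calls = []
def workflow():                    # stand-in for a workflow execution
    calls.append(1)
    return {"transform": {"output": "HELLO"}}

ex = ToyIdempotentExecutor()
r1 = ex.execute(workflow, idempotency_key="unique-request-001")
r2 = ex.execute(workflow, idempotency_key="unique-request-001")
print(r1 == r2, len(calls))  # True 1
```

The real executor persists the keyed results in the `kailash_idempotency` table, which is why duplicates are suppressed even across process restarts and multiple workers.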
Before deploying to production:
- PostgreSQL: Use a managed instance (AWS RDS, Cloud SQL, etc.) with connection pooling
- Redis: Use a managed instance (ElastiCache, Memorystore, etc.) with TLS (`rediss://`)
- Credentials: Store `KAILASH_DATABASE_URL` and `KAILASH_QUEUE_URL` in your secret manager (not in code or `.env` files committed to git)
- Workers: Run multiple worker processes behind a process manager (systemd, supervisord, Kubernetes)
- Stale task recovery: Run `queue.requeue_stale()` periodically (every 60s) from each worker or a dedicated supervisor
- Idempotency cleanup: Run `idempotency_store.cleanup()` periodically to purge expired entries
- Monitoring: Track queue depth (`queue.get_stats()`) and DLQ growth (`dlq.get_stats()`) with your metrics system
- Backups: Standard PostgreSQL backup strategy for the `kailash_*` tables
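The stale-task recovery and idempotency cleanup items can share one periodic loop per worker or supervisor process. A sketch of that pattern, with stub coroutines standing in for `queue.requeue_stale()` and `idempotency_store.cleanup()`:

```python
# Periodic maintenance loop sketch. The stubs below stand in for the real
# kailash calls so the pattern is self-contained and runnable.
import asyncio

async def maintenance_loop(tasks, interval_seconds, *, max_cycles=None):
    """Run each maintenance coroutine every interval_seconds."""
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        for task in tasks:
            try:
                await task()              # e.g. queue.requeue_stale()
            except Exception as e:        # keep the loop alive on errors
                print(f"maintenance task failed: {e}")
        cycles += 1
        await asyncio.sleep(interval_seconds)

ran = []
async def requeue_stale():                # stub for queue.requeue_stale()
    ran.append("requeue")
async def cleanup():                      # stub for idempotency_store.cleanup()
    ran.append("cleanup")

# Short interval and bounded cycles for the demo; production would use
# interval_seconds=60 and no max_cycles.
asyncio.run(maintenance_loop([requeue_stale, cleanup], 0.01, max_cycles=3))
print(ran.count("requeue"), ran.count("cleanup"))  # 3 3
```

Running this as its own asyncio task inside each worker (or as a dedicated supervisor process) keeps recovery going even when the workflow loop is busy.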
Stop the Docker containers:
docker rm -f kailash-pg kailash-redis

Unset environment variables:
unset KAILASH_DATABASE_URL
unset KAILASH_QUEUE_URL

- Progressive Infrastructure Overview -- architecture and design principles
- Store Backends Reference -- table schemas and API details
- Task Queue Reference -- Redis vs SQL queue comparison
- Idempotency Reference -- exactly-once execution details
- Migration Guide -- upgrading between levels