Comprehensive workflow orchestration system using Temporal.io for durable execution of multi-stage workflows involving multiple agents.
- Overview
- Architecture
- Installation
- Quick Start
- Workflow Core
- Workflow Activities
- Workflow Patterns
- Pre-built Workflows
- Workflow Management
- API Reference
- Integration
- Temporal Setup
- Testing
- Known Limitations
The Workflow Engine provides:
- Durable Execution: Workflows survive process restarts and failures
- Multi-Agent Orchestration: Coordinate multiple AI agents in complex workflows
- Temporal.io Integration: Production-grade workflow orchestration
- Lightweight Mode: In-memory engine for development and testing
- Pre-built Workflows: Ready-to-use workflows for common scenarios
- Real-time Monitoring: Track workflow progress via WebSocket events
┌─────────────────────────────────────────────────────────────────┐
│ API Gateway Layer │
│ FastAPI Workflow API │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Workflow Management Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Workflow │ │ Workflow │ │ Workflow │ │
│ │ Manager │ │ Scheduler │ │ Monitor │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Workflow Engine Layer │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ WorkflowEngine │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │
│ │ │ Temporal │ │ Lightweight│ │ Integration │ │ │
│ │ │ Client │ │ Engine │ │ Components │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Workflow Definitions Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Patterns │ │ Pre-built │ │ Custom Workflows │ │
│ │ │ │ Workflows │ │ │ │
│ │ - Sequential│ │ - Document │ │ (User Defined) │ │
│ │ - Parallel │ │ - Research │ │ │ │
│ │ - Conditional│ │ - Content │ │ │ │
│ │ - Pipeline │ │ - Code │ │ │ │
│ │ - Stateful │ │ - Book │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Activities Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Agent │ │ Tool │ │ Validation │ │
│ │ Activity │ │ Activity │ │ Activity │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Parallel │ │ Sequential │ │ Conditional │ │
│ │ Activity │ │ Activity │ │ Activity │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Integration Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Agent │ │ Knowledge │ │ WebSocket │ │
│ │ Integration │ │ Integration │ │ Integration │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Add to pyproject.toml:
[tool.poetry.dependencies]
temporalio = "^1.4.0"
croniter = "^2.0.0"

Install:
cd backend
poetry install

Development (Docker):
docker run --rm --name temporal \
  -p 7233:7233 \
  temporalio/auto-setup:latest

Production:
See Temporal Setup Instructions
Add to .env:
# Temporal Configuration
TEMPORAL_HOST=localhost
TEMPORAL_PORT=7233
TEMPORAL_TASK_QUEUE=workflow-queue

import asyncio

from app.workflow import WorkflowEngine, DocumentAnalysisWorkflow
# Initialize engine
engine = WorkflowEngine(
temporal_host="localhost",
temporal_port=7233,
task_queue="workflow-queue",
)
# Connect to Temporal
await engine.connect()
# Register workflow
engine.register_workflow(DocumentAnalysisWorkflow)
# Start worker (in a separate task)
asyncio.create_task(engine.start_worker())
# Execute workflow
run_id = await engine.execute_workflow(
workflow_id="doc-analysis-001",
workflow_type="DocumentAnalysisWorkflow",
input_data={
"document_id": "doc-123",
"document_path": "/path/to/document.pdf",
"analysis_config": {"extract_entities": True},
},
)
print(f"Workflow started with run_id: {run_id}")

from app.workflow import WorkflowEngine, WorkflowManager
# Initialize
engine = WorkflowEngine()
await engine.connect()
manager = WorkflowManager(engine)
# Register workflow definition
from app.workflow import WorkflowDefinition
definition = WorkflowDefinition(
name="Document Analysis",
description="Analyze documents",
tasks=[...],
)
manager.register_definition(definition)
# Create and start workflow
execution = await manager.create_workflow(
workflow_type="DocumentAnalysisWorkflow",
input_data={"document_id": "doc-123"},
)
run_id = await manager.start_workflow(execution.execution_id)

from app.workflow.lightweight import LightweightWorkflowEngine

# Create engine (no Temporal required)
engine = LightweightWorkflowEngine()

# Register workflow function
async def my_workflow(input_data):
    return {"result": "completed"}

engine.register_workflow("my_workflow", my_workflow)

# Execute
execution_id = await engine.execute(
    workflow_type="my_workflow",
    input_data={"key": "value"},
)

# Get status
status = await engine.get_status(execution_id)
print(f"Status: {status.status.value}")

Main orchestration engine for Temporal.io workflows.
from typing import Any


class WorkflowEngine:
    """Main orchestration engine."""

    def __init__(
        self,
        temporal_host: str = "localhost",
        temporal_port: int = 7233,
        task_queue: str = "workflow-queue",
    ) -> None:
        ...

    async def connect(self) -> None:
        """Connect to Temporal server."""
        ...

    def register_workflow(self, workflow_class: type) -> None:
        """Register a workflow definition."""
        ...

    async def execute_workflow(
        self,
        workflow_id: str,
        workflow_type: str,
        input_data: dict[str, Any],
    ) -> str:
        """Execute a workflow."""
        ...

Workflow structure and configuration.
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


@dataclass
class WorkflowDefinition:
    """Workflow structure and configuration."""

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: Optional[str] = None
    version: str = "1.0.0"
    tasks: list[dict[str, Any]] = field(default_factory=list)
    edges: list[dict[str, Any]] = field(default_factory=list)
    config: dict[str, Any] = field(default_factory=dict)
    retry_policy: Optional[dict[str, Any]] = None
    timeout: Optional[int] = None

State management for workflows.

@dataclass
class WorkflowState:
    """State management for workflows."""

    workflow_id: str
    definition_id: str
    status: WorkflowStatus
    priority: WorkflowPriority
    input_data: dict[str, Any]
    output_data: dict[str, Any]
    current_task: Optional[str]
    completed_tasks: list[str]
    failed_tasks: list[str]
    progress: int
    error_message: Optional[str]

class WorkflowStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    RETRYING = "retrying"

Activities are the building blocks of workflows.
Execute a single agent task.

from typing import Any

from temporalio import activity


@activity.defn
async def agent_activity(input_data: dict[str, Any]) -> dict[str, Any]:
    """Execute a single agent task.

    Input:
    - agent_id: Agent ID to execute
    - task_type: Type of task
    - payload: Task payload

    Output:
    - status: Execution status
    - output_data: Agent result
    """

Execute multiple activities in parallel.

@activity.defn
async def parallel_activity(input_data: dict[str, Any]) -> dict[str, Any]:
    """Execute multiple activities in parallel.

    Input:
    - activities: List of activity definitions
    - fail_fast: Whether to fail on first error
    """

Validate activity outputs.

@activity.defn
async def validation_activity(input_data: dict[str, Any]) -> dict[str, Any]:
    """Validate activity outputs.

    Input:
    - data: Data to validate
    - schema: Validation schema
    - rules: Validation rules
    """

Linear task execution.
from app.workflow import SequentialWorkflow
engine.register_workflow(SequentialWorkflow)
# Execute
await engine.execute_workflow(
workflow_id="seq-001",
workflow_type="SequentialWorkflow",
input_data={
"tasks": [
{"id": "task1", "type": "agent_activity", "config": {...}},
{"id": "task2", "type": "agent_activity", "config": {...}},
],
"initial_data": {...},
},
)

Fan-out/fan-in pattern.
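The fan-out/fan-in idea can be sketched without Temporal, using only `asyncio`. This is a minimal illustration, not the implementation behind `ParallelWorkflow`: the `fan_out_fan_in` helper and the way `fail_fast` maps onto `asyncio.gather` are assumptions made for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def fan_out_fan_in(
    jobs: list[Callable[[], Awaitable[Any]]],
    fail_fast: bool = False,
) -> dict[str, list[Any]]:
    """Start every job concurrently (fan-out), then collect outcomes (fan-in)."""
    # With fail_fast=True the first exception propagates to the caller;
    # otherwise exceptions are returned alongside the successful results.
    outcomes = await asyncio.gather(
        *(job() for job in jobs),
        return_exceptions=not fail_fast,
    )
    results = [o for o in outcomes if not isinstance(o, Exception)]
    errors = [o for o in outcomes if isinstance(o, Exception)]
    return {"results": results, "errors": errors}


async def ok(value: int) -> int:
    await asyncio.sleep(0)  # yield control so the jobs interleave
    return value


async def boom() -> None:
    raise ValueError("task failed")


summary = asyncio.run(fan_out_fan_in([lambda: ok(1), lambda: ok(2), boom]))
```

With `fail_fast=False`, one failing job does not discard the results of the others; the caller decides how to aggregate them.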
from app.workflow import ParallelWorkflow
engine.register_workflow(ParallelWorkflow)
# Execute
await engine.execute_workflow(
workflow_id="parallel-001",
workflow_type="ParallelWorkflow",
input_data={
"tasks": [...],
"aggregation_strategy": "all",
"fail_fast": False,
},
)

Branching based on conditions.
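How condition expressions such as `priority > 5` are evaluated is not documented here. As a hedged sketch, the example below assumes expressions of the form `name operator number` and picks the first branch whose condition holds. The `evaluate` and `select_branch` helpers and the `noop` default branch are hypothetical names for illustration only.

```python
import operator
from typing import Any

# Supported comparison operators (an assumption for this sketch).
_OPERATORS = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    "<": operator.lt,
}


def evaluate(expression: str, context: dict[str, Any]) -> bool:
    """Evaluate a 'name operator number' expression against the context."""
    name, symbol, literal = expression.split()
    return _OPERATORS[symbol](context[name], float(literal))


def select_branch(
    conditions: list[dict[str, str]],
    branches: dict[str, Any],
    default_branch: Any,
    context: dict[str, Any],
) -> Any:
    """Return the branch of the first condition that holds, else the default."""
    for condition in conditions:
        if evaluate(condition["expression"], context):
            return branches[condition["name"]]
    return default_branch


conditions = [{"name": "high_priority", "expression": "priority > 5"}]
branches = {"high_priority": {"type": "sequential"}}
default = {"type": "noop"}  # hypothetical default branch

chosen = select_branch(conditions, branches, default, {"priority": 8})
fallback = select_branch(conditions, branches, default, {"priority": 3})
```

Parsing a fixed grammar rather than calling `eval` keeps user-supplied expressions from executing arbitrary code.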
from app.workflow import ConditionalWorkflow
engine.register_workflow(ConditionalWorkflow)
# Execute
await engine.execute_workflow(
workflow_id="conditional-001",
workflow_type="ConditionalWorkflow",
input_data={
"conditions": [
{"name": "high_priority", "expression": "priority > 5"},
],
"branches": {
"high_priority": {"type": "sequential", "tasks": [...]},
},
"default_branch": {...},
"context": {"priority": 8},
},
)

Data pipeline pattern.
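In a pipeline, each stage consumes the previous stage's output. The sketch below shows that data flow with plain `asyncio` coroutines standing in for agent activities. `run_pipeline` and the three toy stages are hypothetical and are not part of `app.workflow`.

```python
import asyncio
from typing import Any, Awaitable, Callable

Stage = tuple[str, Callable[[Any], Awaitable[Any]]]


async def run_pipeline(stages: list[Stage], input_data: Any) -> dict[str, Any]:
    """Feed each stage's output into the next and keep every intermediate result."""
    outputs: dict[str, Any] = {}
    current = input_data
    for stage_id, stage in stages:
        current = await stage(current)
        outputs[stage_id] = current
    return {"stages": outputs, "final": current}


async def extract(raw: str) -> list[str]:
    return raw.split(",")


async def transform(items: list[str]) -> list[int]:
    return [int(item) for item in items]


async def load(numbers: list[int]) -> int:
    return sum(numbers)


report = asyncio.run(
    run_pipeline(
        [("extract", extract), ("transform", transform), ("load", load)],
        "1,2,3",
    )
)
```

Keeping per-stage outputs next to the final value makes it easier to see which stage produced an unexpected result.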
from app.workflow import PipelineWorkflow
engine.register_workflow(PipelineWorkflow)
# Execute
await engine.execute_workflow(
workflow_id="pipeline-001",
workflow_type="PipelineWorkflow",
input_data={
"stages": [
{"id": "extract", "type": "agent_activity", "config": {...}},
{"id": "transform", "type": "agent_activity", "config": {...}},
{"id": "load", "type": "agent_activity", "config": {...}},
],
"input_data": {...},
},
)

Complete document analysis pipeline.
from app.workflow import DocumentAnalysisWorkflow
engine.register_workflow(DocumentAnalysisWorkflow)
result = await engine.execute_workflow(
workflow_id="doc-analysis-001",
workflow_type="DocumentAnalysisWorkflow",
input_data={
"document_id": "doc-123",
"document_path": "/path/to/document.pdf",
"analysis_config": {"extract_entities": True},
"extraction_types": ["entities", "relationships", "events"],
},
)
# Result structure:
# {
# "document_id": "doc-123",
# "stages": {
# "ingestion": {...},
# "structure": {...},
# "analysis": {...},
# "knowledge": {...},
# "summary": {...},
# "validation": {...},
# },
# "final": {...},
# }

Comprehensive research workflow.
from app.workflow import ResearchWorkflow
engine.register_workflow(ResearchWorkflow)
result = await engine.execute_workflow(
workflow_id="research-001",
workflow_type="ResearchWorkflow",
input_data={
"topic": "AI Safety",
"sources": ["web", "academic", "news"],
"depth": "deep",
"research_config": {"citation_style": "apa"},
},
)

Generate content with multiple sections.
from app.workflow import ContentGenerationWorkflow
engine.register_workflow(ContentGenerationWorkflow)
result = await engine.execute_workflow(
workflow_id="content-001",
workflow_type="ContentGenerationWorkflow",
input_data={
"topic": "Introduction to Machine Learning",
"content_type": "article",
"target_audience": "beginners",
"length": "long",
"tone": "friendly",
},
)

Generate code with multiple modules.
from app.workflow import CodeGenerationWorkflow
engine.register_workflow(CodeGenerationWorkflow)
result = await engine.execute_workflow(
workflow_id="code-001",
workflow_type="CodeGenerationWorkflow",
input_data={
"requirements": {"description": "REST API for user management"},
"tech_stack": {"language": "python", "framework": "fastapi"},
"modules": [
{"name": "models", "description": "Database models"},
{"name": "routes", "description": "API routes"},
{"name": "services", "description": "Business logic"},
],
"coding_standards": {"style": "pep8"},
},
)

Generate complete books.
from app.workflow import BookGenerationWorkflow
engine.register_workflow(BookGenerationWorkflow)
result = await engine.execute_workflow(
workflow_id="book-001",
workflow_type="BookGenerationWorkflow",
input_data={
"book_title": "Introduction to Python",
"genre": "technical",
"target_audience": "beginners",
"chapter_count": 12,
"writing_style": "educational",
},
)

Create, start, pause, resume, cancel workflows.
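These lifecycle operations imply a set of legal status changes. The sketch below models them as a transition table over the `WorkflowStatus` values. The table itself is an assumption made for illustration; the actual `WorkflowManager` may permit a different set of transitions.

```python
from enum import Enum


class WorkflowStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    RETRYING = "retrying"


S = WorkflowStatus

# Assumed transition table: which statuses each status may move to.
ALLOWED: dict[WorkflowStatus, set[WorkflowStatus]] = {
    S.PENDING: {S.RUNNING, S.CANCELLED},
    S.RUNNING: {S.PAUSED, S.COMPLETED, S.FAILED, S.CANCELLED, S.RETRYING},
    S.PAUSED: {S.RUNNING, S.CANCELLED},
    S.RETRYING: {S.RUNNING, S.FAILED, S.CANCELLED},
    # Terminal statuses have no outgoing transitions.
    S.COMPLETED: set(),
    S.FAILED: set(),
    S.CANCELLED: set(),
}


def can_transition(current: WorkflowStatus, target: WorkflowStatus) -> bool:
    """Return True if moving from current to target is allowed by the table."""
    return target in ALLOWED[current]


print(can_transition(S.RUNNING, S.PAUSED))
print(can_transition(S.COMPLETED, S.RUNNING))
```

Checking a table like this before calling pause, resume, or cancel gives a clear error for requests such as resuming an already completed workflow.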
from app.workflow import WorkflowManager, WorkflowEngine
engine = WorkflowEngine()
await engine.connect()
manager = WorkflowManager(engine)
# Create workflow
execution = await manager.create_workflow(
workflow_type="DocumentAnalysisWorkflow",
input_data={"document_id": "doc-123"},
)
# Start workflow
run_id = await manager.start_workflow(execution.execution_id)
# Pause workflow
await manager.pause_workflow(execution.execution_id)
# Resume workflow
await manager.resume_workflow(execution.execution_id)
# Cancel workflow
await manager.cancel_workflow(execution.execution_id)
# Get status
state = await manager.get_workflow_status(execution.execution_id)

Schedule workflows with cron expressions.
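A cron expression such as `0 9 * * *` means minute 0 of hour 9 on every day. The sketch below computes the next fire time for that daily form using only the standard library, to show what the scheduler has to work out. It is deliberately limited to `minute hour * * *` expressions. `next_daily_run` is a hypothetical helper; general expressions need a full cron parser such as the `croniter` dependency listed in the installation section.

```python
from datetime import datetime, timedelta


def next_daily_run(cron_expression: str, now: datetime) -> datetime:
    """Next fire time strictly after `now` for a 'minute hour * * *' expression."""
    minute, hour, day, month, weekday = cron_expression.split()
    if (day, month, weekday) != ("*", "*", "*"):
        raise ValueError("this sketch only handles daily schedules")
    candidate = now.replace(
        hour=int(hour), minute=int(minute), second=0, microsecond=0
    )
    # If today's slot has already passed (or is exactly now), use tomorrow's.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


before_nine = next_daily_run("0 9 * * *", datetime(2024, 1, 1, 8, 0))
after_nine = next_daily_run("0 9 * * *", datetime(2024, 1, 1, 10, 30))
```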
from app.workflow import WorkflowScheduler, WorkflowManager
scheduler = WorkflowScheduler(manager)
await scheduler.start()
# Schedule daily workflow
schedule_id = scheduler.schedule_workflow(
workflow_type="DocumentAnalysisWorkflow",
cron_expression="0 9 * * *", # Daily at 9 AM
input_data={"document_id": "daily-doc"},
)
# Unschedule
scheduler.unschedule_workflow(schedule_id)

Monitor workflow health and progress.
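How the monitor derives its progress figure is not specified. One plausible approach, shown here purely as an assumption, is the share of completed tasks expressed as a whole-number percentage, matching the integer `progress` field on `WorkflowState`. The `progress_percent` helper is hypothetical.

```python
def progress_percent(completed_tasks: list[str], total_tasks: int) -> int:
    """Completed tasks as a whole-number percentage, clamped to 0..100."""
    if total_tasks <= 0:
        # No tasks defined yet: report 0 instead of dividing by zero.
        return 0
    return min(100, round(100 * len(completed_tasks) / total_tasks))


halfway = progress_percent(["ingestion", "structure"], 4)
```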
from app.workflow import WorkflowMonitor, WorkflowManager
monitor = WorkflowMonitor(manager)
await monitor.start()
# Get progress
progress = await monitor.get_workflow_progress(execution_id)
print(f"Progress: {progress['progress']}%")
# Check health
health = await monitor.check_health()
print(f"System healthy: {health['healthy']}")
# Get metrics
metrics = monitor.metrics
print(f"Total executions: {metrics['total_executions']}")

| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/workflows | Create workflow |
| GET | /api/v1/workflows | List workflows |
| GET | /api/v1/workflows/{id} | Get workflow details |
| POST | /api/v1/workflows/{id}/execute | Start execution |
| POST | /api/v1/workflows/{id}/pause | Pause execution |
| POST | /api/v1/workflows/{id}/resume | Resume execution |
| POST | /api/v1/workflows/{id}/cancel | Cancel execution |
| GET | /api/v1/workflows/{id}/history | Get execution history |
| GET | /api/v1/workflows/{id}/progress | Get real-time progress |
| POST | /api/v1/workflows/schedule | Schedule recurring workflow |
| DELETE | /api/v1/workflows/schedule/{id} | Remove schedule |
| GET | /api/v1/workflows/monitor/health | Get health status |
| GET | /api/v1/workflows/monitor/metrics | Get metrics |
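The endpoints in the table can also be called from Python. The sketch below builds the requests with the standard library only and does not send them. The `build_request` helper, the token value, and the `exec-123` identifier are hypothetical; response bodies are not documented here, so the example does not parse them.

```python
import json
import urllib.request
from typing import Any, Optional

BASE_URL = "http://localhost:8000/api/v1/workflows"


def build_request(
    method: str,
    path: str = "",
    token: str = "",
    payload: Optional[dict[str, Any]] = None,
) -> urllib.request.Request:
    """Build an authenticated request for one of the workflow endpoints."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Authorization": f"Bearer {token}"}
    if data is not None:
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        BASE_URL + path, data=data, headers=headers, method=method
    )


# POST /api/v1/workflows: create a workflow
create = build_request(
    "POST",
    token="example-token",
    payload={
        "workflow_type": "DocumentAnalysisWorkflow",
        "input_data": {"document_id": "doc-123"},
    },
)

# GET /api/v1/workflows/{id}/progress: poll progress for a hypothetical ID
progress = build_request("GET", "/exec-123/progress", token="example-token")

# To send a request against a running API server:
#     with urllib.request.urlopen(create) as response:
#         body = response.read()
```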
# Create workflow
curl -X POST "http://localhost:8000/api/v1/workflows" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"workflow_type": "DocumentAnalysisWorkflow",
"input_data": {"document_id": "doc-123"}
}'
# Start execution
curl -X POST "http://localhost:8000/api/v1/workflows/{execution_id}/execute" \
-H "Authorization: Bearer $TOKEN"
# Get progress
curl "http://localhost:8000/api/v1/workflows/{execution_id}/progress" \
  -H "Authorization: Bearer $TOKEN"

from app.workflow.integration import AgentIntegration
agent_integration = AgentIntegration(
agent_orchestrator=orchestrator,
agent_service=agent_service,
)
result = await agent_integration.execute_agent_task(
agent_id="research-agent",
task_type="research_topic",
payload={"topic": "AI Safety"},
timeout=300,
)

from app.workflow.integration import KnowledgeIntegration
knowledge_integration = KnowledgeIntegration(
knowledge_service=knowledge_service,
)
# Store knowledge
entity_id = await knowledge_integration.store_knowledge(
entity_type="document",
entity_data={"title": "AI Safety Report"},
workspace_id="ws-123",
)
# Retrieve knowledge
entities = await knowledge_integration.retrieve_knowledge(
entity_type="document",
query="AI Safety",
limit=10,
)

from app.workflow.integration import WebSocketIntegration
ws_integration = WebSocketIntegration(
websocket_manager=websocket_manager,
event_publisher=event_publisher,
)
# Notify progress
await ws_integration.notify_workflow_progress(
workflow_id="workflow-123",
progress=50,
current_task="analyzing",
user_id="user-456",
)

- Install Docker Desktop (if not already installed)
- Start Temporal:
docker run --rm --name temporal \
-p 7233:7233 \
temporalio/auto-setup:latest
- Access Temporal Web UI:
docker run --rm --name temporal-ui \
-p 8080:8080 \
--env TEMPORAL_ADDRESS=temporal:7233 \
--link temporal \
temporalio/ui:latest
Visit http://localhost:8080

- Use Temporal Cloud (recommended) or a self-hosted cluster
- Configure connection:
TEMPORAL_HOST=your-temporal-host.temporal-cloud.com
TEMPORAL_PORT=7233
TEMPORAL_NAMESPACE=your-namespace
TEMPORAL_API_KEY=your-api-key
- Set up TLS:
from temporalio.client import Client
client = await Client.connect(
"your-host:7233",
namespace="your-namespace",
api_key="your-api-key",
tls=True,
)
- Run the worker as a separate service:
# worker.py
import asyncio

from app.workflow import init_workflow_engine, DocumentAnalysisWorkflow


async def main():
    engine = await init_workflow_engine(
        temporal_host="temporal",
        temporal_port=7233,
        task_queue="workflow-queue",
    )
    # Register all workflows
    engine.register_workflow(DocumentAnalysisWorkflow)
    # ... register other workflows
    # Start worker
    await engine.start_worker()


if __name__ == "__main__":
    asyncio.run(main())
- Deploy with Docker:
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install poetry && poetry install
CMD ["python", "worker.py"]

import pytest

from app.workflow.lightweight import LightweightWorkflowEngine


@pytest.mark.asyncio
async def test_sequential_workflow():
    engine = LightweightWorkflowEngine()

    async def task1(data):
        return {"step": 1}

    async def task2(data):
        return {"step": 2}

    # Register an async workflow that runs the two tasks in order
    async def workflow(data):
        await task1(data)
        await task2(data)
        return {"result": "ok"}

    engine.register_workflow("test", workflow)
    execution_id = await engine.execute("test", {})
    status = await engine.get_status(execution_id)
    assert status.status.value == "completed"

import asyncio

import pytest

from app.workflow import WorkflowEngine, SequentialWorkflow


@pytest.mark.asyncio
async def test_temporal_workflow():
    engine = WorkflowEngine()
    worker_task = None
    try:
        await engine.connect()
        engine.register_workflow(SequentialWorkflow)
        # Start worker in background
        worker_task = asyncio.create_task(engine.start_worker())
        # Give worker time to start
        await asyncio.sleep(1)
        # Execute workflow
        run_id = await engine.execute_workflow(
            workflow_id="test-001",
            workflow_type="SequentialWorkflow",
            input_data={"tasks": []},
        )
        assert run_id is not None
    finally:
        await engine.shutdown()
        # worker_task is still None if connect() failed before the worker started
        if worker_task is not None:
            worker_task.cancel()

- Temporal Dependency: Production workflows require a Temporal server
- Agent Integration: Placeholder implementations need actual agent framework integration
- Message Persistence: Workflow messages not persisted by default
- Distributed Agents: Assumes single-process deployment
- Resource Limits: No built-in resource quota management
- Add message persistence layer
- Implement distributed agent support
- Add LLM provider integration
- Implement resource quota management
- Add workflow versioning and migration
- Implement workflow templates library
- Add comprehensive monitoring dashboard
- Implement workflow collaboration patterns
- Add retry policies configuration
- Implement circuit breaker pattern
- Add workflow analytics and reporting
- Implement workflow debugging tools
- Add workflow simulation mode
- Implement workflow cost tracking
For issues and questions:
- Check existing issues in the repository
- Review Temporal documentation: https://docs.temporal.io
- Contact the development team