---
layout: default
title: "Chapter 5: Error Handling"
parent: Deer Flow Tutorial
nav_order: 5
---
Welcome to Chapter 5: Error Handling. In this part of the Deer Flow Tutorial: Distributed Workflow Orchestration Platform, you will build an intuitive mental model first, then move into concrete implementation details and practical production trade-offs.
Implement fault-tolerant workflows with retries, fallbacks, and recovery mechanisms.
Production workflows must handle failures gracefully. Deer Flow provides comprehensive error handling mechanisms including retries, fallbacks, timeouts, and alerting to ensure workflow reliability.
The simplest policy is a fixed-delay retry on a single task:

```json
{
  "id": "retryable_task",
  "type": "http",
  "config": {
    "url": "https://api.example.com/data"
  },
  "retry": {
    "max_attempts": 3,
    "delay": 5
  }
}
```

For rate-limited or intermittently failing services, exponential backoff spreads retries out instead of hammering the endpoint at a fixed interval:

```json
{
"id": "backoff_task",
"type": "http",
"config": {
"url": "https://api.example.com/data"
},
"retry": {
"max_attempts": 5,
"initial_delay": 1,
"max_delay": 60,
"backoff": "exponential",
"multiplier": 2
}
}Attempt 1: immediate
Attempt 2: wait 1s
Attempt 3: wait 2s
Attempt 4: wait 4s
Attempt 5: wait 8s (delays are capped at max_delay)
```
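You can sanity-check this schedule with a few lines of Python. This is a sketch of the standard exponential-backoff formula, assuming Deer Flow computes delays the usual way; it is not taken from Deer Flow's source:

```python
def backoff_delay(attempt: int, initial_delay: float = 1,
                  multiplier: float = 2, max_delay: float = 60) -> float:
    """Seconds to wait before the given attempt (attempt 1 runs immediately)."""
    if attempt <= 1:
        return 0
    # Geometric growth from initial_delay, clamped at max_delay.
    return min(initial_delay * multiplier ** (attempt - 2), max_delay)

for attempt in range(1, 6):
    print(f"Attempt {attempt}: wait {backoff_delay(attempt):g}s")
# Prints 0, 1, 2, 4, 8, matching the schedule above.
```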
Not every failure is worth retrying. Conditional retry policies let transient errors retry while permanent ones fail fast:

```json
{
  "id": "conditional_retry",
  "type": "http",
  "config": {
    "url": "https://api.example.com/data"
  },
  "retry": {
    "max_attempts": 3,
    "retry_on": {
      "exceptions": ["TimeoutError", "ConnectionError"],
      "status_codes": [429, 500, 502, 503, 504],
      "conditions": ["${output.retry_requested == true}"]
    },
    "no_retry_on": {
      "exceptions": ["AuthenticationError"],
      "status_codes": [400, 401, 403, 404]
    }
  }
}
```

The Python SDK exposes the same retry policy as a decorator:

```python
from deerflow import Workflow, retry

workflow = Workflow(name="retry_example")

@workflow.task(id="flaky_operation")
@retry(
    max_attempts=3,
    backoff="exponential",
    retry_on=[TimeoutError, ConnectionError]
)
def flaky_operation(context):
    # This call will be retried on failure
    response = make_api_call()
    return response
```
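To make the interaction between retry_on and no_retry_on concrete, here is one plausible decision function. The precedence shown (deny-list first, then the allow-list, then the attempt budget) is an assumption to verify against Deer Flow's docs, and all names here are illustrative:

```python
def should_retry(attempt, max_attempts,
                 exception=None, status_code=None,
                 retry_exceptions=(), retry_statuses=(),
                 no_retry_exceptions=(), no_retry_statuses=()):
    """Decide whether a failed attempt should be retried (hypothetical semantics)."""
    if attempt >= max_attempts:
        return False  # retry budget exhausted
    # no_retry_on acts as a deny-list: these failures are treated as permanent.
    if exception is not None and isinstance(exception, tuple(no_retry_exceptions)):
        return False
    if status_code in no_retry_statuses:
        return False
    # retry_on acts as an allow-list: only explicitly transient failures retry.
    if exception is not None and isinstance(exception, tuple(retry_exceptions)):
        return True
    return status_code in retry_statuses

# Example: a 503 is retried, a 401 is not.
assert should_retry(1, 3, status_code=503,
                    retry_statuses=(429, 500, 502, 503, 504),
                    no_retry_statuses=(400, 401, 403, 404))
assert not should_retry(1, 3, status_code=401,
                        retry_statuses=(429, 500, 502, 503, 504),
                        no_retry_statuses=(400, 401, 403, 404))
```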
"id": "bounded_task",
"type": "python",
"config": {
"script": "long_running.py"
},
"timeout": {
"execution": 3600,
"idle": 300,
"queue": 600
}
}- execution: Maximum total execution time
- `idle`: Maximum time without output
- `queue`: Maximum time waiting in queue
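For intuition, here is one way an execution timeout can be enforced client-side in plain Python. This is an illustrative sketch, not how Deer Flow's scheduler does it; a real scheduler would terminate the task process rather than abandon a thread:

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeout

def run_with_execution_timeout(fn, timeout_s):
    """Run fn in a worker thread; raise TimeoutError if it exceeds timeout_s."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return future.result(timeout=timeout_s)
    except FuturesTimeout:
        raise TimeoutError(f"execution exceeded {timeout_s}s")
    finally:
        # Don't block on a still-running task. Note the thread itself is not
        # killed; real schedulers enforce timeouts at the process level.
        pool.shutdown(wait=False)
```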
A timeout can also be budgeted at the workflow level, alongside per-task limits:

```json
{
  "name": "timed_workflow",
  "timeout": 7200,
  "tasks": [
    {"id": "task1", "timeout": 1800, "...": "..."},
    {"id": "task2", "timeout": 1800, "...": "..."},
    {"id": "task3", "timeout": 1800, "...": "..."}
  ]
}
```

Tasks can also catch a timeout to save partial progress before failing:

```python
from deerflow import Workflow, TimeoutError
@workflow.task(id="timeout_aware")
def process_with_timeout(context):
    try:
        result = long_operation()
        return result
    except TimeoutError:
        # Save partial progress before re-raising
        save_checkpoint(context.checkpoint_path)
        raise
```

When a primary task fails, a fallback task can take over transparently:

```json
{
"id": "primary_task",
"type": "http",
"config": {
"url": "https://primary-api.com/data"
},
"fallback": {
"task": {
"type": "http",
"config": {
"url": "https://backup-api.com/data"
}
}
}
}{
"id": "resilient_fetch",
"type": "http",
"config": {"url": "https://api1.example.com"},
"fallbacks": [
{
"type": "http",
"config": {"url": "https://api2.example.com"}
},
{
"type": "http",
"config": {"url": "https://api3.example.com"}
},
{
"type": "python",
"config": {
"script": "load_cached_data.py"
}
}
]
}from deerflow import Workflow, fallback_value
@workflow.task(id="with_default")
@fallback_value({"status": "unknown", "data": []})
def fetch_data(context):
    # If this fails, the decorator returns the default value instead
    return fetch_from_api()
```
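The fallbacks list above implies sequential semantics: each alternative runs only after the previous one fails. A minimal sketch of that loop, with illustrative names (this is not Deer Flow's executor code):

```python
def run_with_fallbacks(primary, fallbacks):
    """Try primary, then each fallback in order; re-raise the last error if all fail."""
    last_error = None
    for candidate in (primary, *fallbacks):
        try:
            return candidate()  # each candidate is a zero-arg callable here
        except Exception as err:  # real code would catch narrower exception types
            last_error = err
    raise last_error
```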
"id": "monitored_task",
"type": "python",
"config": {"script": "process.py"},
"on_failure": {
"tasks": [
{
"id": "send_alert",
"type": "http",
"config": {
"method": "POST",
"url": "https://alerts.example.com/webhook",
"body": {
"message": "Task ${task.id} failed",
"error": "${task.error}",
"workflow": "${workflow.name}"
}
}
},
{
"id": "cleanup",
"type": "shell",
"command": "rm -rf /tmp/task_${task.id}/*"
}
]
}
}{
"name": "monitored_workflow",
"on_failure": {
"notify": {
"type": "slack",
"channel": "#alerts",
"message": "Workflow ${workflow.name} failed: ${workflow.error}"
},
"cleanup": {
"type": "shell",
"command": "cleanup.sh ${execution.id}"
}
},
"on_success": {
"notify": {
"type": "slack",
"channel": "#success",
"message": "Workflow ${workflow.name} completed successfully"
}
},
"tasks": [...]
}from deerflow import Workflow, on_failure, on_success
workflow = Workflow(name="callback_example")

@workflow.on_failure
def handle_workflow_failure(context, error):
    send_alert(
        channel="#alerts",
        message=f"Workflow failed: {error}",
        execution_id=context.execution_id
    )
    cleanup_resources(context)

@workflow.on_success
def handle_workflow_success(context):
    update_dashboard(context.execution_id, status="success")

@workflow.task(id="risky_task")
@on_failure(lambda ctx, err: log_failure(ctx, err))
def risky_task(context):
    # Task implementation
    pass
```

A circuit breaker stops calling a dependency that keeps failing, so one outage doesn't cascade through the workflow:

```python
from deerflow import Workflow, CircuitBreaker
import requests

workflow = Workflow(name="circuit_breaker_example")

# Configure the circuit breaker
breaker = CircuitBreaker(
    failure_threshold=5,   # Open after 5 failures
    reset_timeout=60,      # Try again after 60s
    half_open_requests=3   # Test with 3 requests
)

@workflow.task(id="protected_call")
@breaker.protect
def call_external_service(context):
    response = requests.get("https://unreliable-api.com")
    return response.json()
```

The breaker cycles through three states:

```
Circuit Breaker States

┌─────────┐   failures >= threshold   ┌─────────┐
│ CLOSED  │ ─────────────────────────▶│  OPEN   │
│ (normal)│                           │ (fail   │
└────┬────┘                           │  fast)  │
     ▲                                └────┬────┘
     │                                     │ timeout
     │                                     ▼
     │                            ┌────────────────┐
     └────────────────────────────│   HALF-OPEN    │
              success             │ (test requests)│
                                  └────────────────┘
```
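For intuition about what breaker.protect is doing, here is a standalone sketch of that state machine. It is a simplified model under assumed semantics (e.g., that any failure while HALF-OPEN re-opens the circuit), not Deer Flow's actual CircuitBreaker implementation:

```python
import time

class SketchCircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60, half_open_requests=3):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_requests = half_open_requests
        self.state = "closed"
        self.failures = 0
        self.successes = 0          # consecutive successes while half-open
        self.opened_at = 0.0

    def call(self, fn, *args, **kwargs):
        if self.state == "open":
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.state, self.successes = "half-open", 0   # timeout elapsed: probe
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            # Any failure while probing, or too many while closed, opens the circuit.
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state, self.opened_at = "open", time.monotonic()
            raise
        if self.state == "half-open":
            self.successes += 1
            if self.successes >= self.half_open_requests:
                self.state, self.failures = "closed", 0   # dependency recovered
        else:
            self.failures = 0       # a success in CLOSED resets the failure count
        return result
```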
Tasks that exhaust their retries can be parked in a dead letter queue for later inspection and replay:

```json
{
  "name": "dlq_workflow",
  "dead_letter_queue": {
    "enabled": true,
    "queue": "workflow-dlq",
    "retention": "7d",
    "include_context": true
  },
  "tasks": [...]
}
```

Failures in the queue can then be listed, retried, or purged programmatically:

```python
from deerflow import DLQProcessor
processor = DLQProcessor(queue="workflow-dlq")

# List failed tasks
failed = processor.list(
    workflow="my_workflow",
    since="2024-01-01"
)

# Retry a specific failure
processor.retry(failure_id="abc123")

# Retry all failures for a workflow
processor.retry_all(workflow="my_workflow")

# Purge old failures
processor.purge(older_than="7d")
```

For long-running tasks, checkpoints let a retried execution resume where the last attempt stopped instead of starting over:

```python
from deerflow import Workflow, checkpoint
workflow = Workflow(name="checkpoint_example")

@workflow.task(id="long_process")
@checkpoint(interval=100)  # Checkpoint every 100 items
def process_large_dataset(context):
    dataset = load_dataset()
    # Resume from a checkpoint if one exists
    start_idx = context.checkpoint.get("last_index", 0)
    for i, item in enumerate(dataset[start_idx:], start=start_idx):
        process_item(item)
        # Save a checkpoint periodically
        if i % 100 == 0:
            context.save_checkpoint({"last_index": i})
    return {"processed": len(dataset)}
```

Interrupted executions can be resumed from the CLI:

```bash
# Resume a failed workflow from its last checkpoint
deerflow resume execution_id

# Resume from a specific task
deerflow resume execution_id --from-task task_id

# Resume with modified parameters
deerflow resume execution_id --param key=new_value
```

Alert channels and routing rules are configured centrally:

```yaml
# config/alerts.yaml
alerts:
  channels:
    slack:
      webhook_url: https://hooks.slack.com/...
      default_channel: "#workflow-alerts"
    email:
      smtp_host: smtp.example.com
      from_address: alerts@example.com
      recipients:
        - team@example.com
    pagerduty:
      api_key: ${PAGERDUTY_KEY}
      service_id: P123ABC
  rules:
    - name: critical_failure
      condition: "workflow.status == 'failed' && workflow.tags.critical"
      channels: [slack, pagerduty]
      severity: critical
    - name: task_timeout
      condition: "task.status == 'timeout'"
      channels: [slack]
      severity: warning
    - name: retry_exhausted
      condition: "task.retries_exhausted"
      channels: [email]
      severity: high
```
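Conceptually, the alert router evaluates each rule's condition against workflow and task events and fans the alert out to the listed channels. The sketch below models that dispatch with plain Python predicates standing in for the condition expression language; the shapes and names are illustrative, not Deer Flow's internals:

```python
# Hypothetical rule shape: a predicate plus routing metadata.
RULES = [
    {"name": "task_timeout",
     "condition": lambda ev: ev.get("task_status") == "timeout",
     "channels": ["slack"], "severity": "warning"},
    {"name": "retry_exhausted",
     "condition": lambda ev: ev.get("retries_exhausted", False),
     "channels": ["email"], "severity": "high"},
]

def route_alerts(event, rules=RULES):
    """Yield (channel, payload) pairs for every rule the event matches."""
    for rule in rules:
        if rule["condition"](event):
            for channel in rule["channels"]:
                yield channel, {"rule": rule["name"],
                                "severity": rule["severity"],
                                "event": event}

# Example: a timed-out task produces one Slack alert.
list(route_alerts({"task_status": "timeout"}))
```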
In this chapter, you've learned:
- Retries: Basic, exponential backoff, conditional
- Timeouts: Task and workflow level
- Fallbacks: Task fallbacks and defaults
- Callbacks: Failure and success handlers
- Circuit Breaker: Protect against cascading failures
- Recovery: Checkpoints and workflow resume
- Alerting: Notifications for failures
Best practices to keep in mind:
- Retry Intelligently: Use backoff and conditions
- Set Timeouts: Prevent infinite waits
- Plan Fallbacks: Have alternatives ready
- Checkpoint Progress: Enable partial recovery
- Alert Early: Catch failures before impact
Ready to scale your workflows? Let's explore distributed execution in Chapter 6: Scaling.
Most teams struggle here because the hard part is not writing more code, but drawing clear boundaries between `workflow`, `task`, and `context` so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without a clear rollback or observability strategy
After working through this chapter, you should be able to reason about error handling as an operating subsystem inside the Deer Flow platform, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around `config`, `Workflow`, and `name` as your checklist when adapting these patterns to your own repository.
Under the hood, Chapter 5: Error Handling usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for `workflow`.
- Input normalization: shape incoming data so `task` receives stable contracts.
- Core execution: run the main logic branch and propagate intermediate state through `context`.
- Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
Use the following upstream sources to verify implementation details while reading this chapter:
- Official Documentation
- GitHub Repository
- API Reference
- Community & Issues
- Workflow Examples
- AI Codebase Knowledge Builder
Suggested trace strategy:
- search upstream code for `workflow` and `task` to map concrete implementation paths
- compare docs claims against actual runtime/config code before reusing patterns in production