---
layout: default
title: "Chapter 2: Workflow Basics"
parent: Deer Flow Tutorial
nav_order: 2
---
Welcome to Chapter 2: Workflow Basics. In this part of Deer Flow Tutorial: Distributed Workflow Orchestration Platform, you will first build an intuitive mental model, then move into concrete implementation details and practical production trade-offs.
Learn to create and manage basic workflows with Deer Flow's workflow definition system.
Workflows are the core abstraction in Deer Flow. They define a series of tasks, their relationships, and execution parameters. This chapter covers fundamental workflow concepts and creation patterns.
```
┌───────────────────────────────────────────────┐
│              Deer Flow Workflow               │
├───────────────────────────────────────────────┤
│                                               │
│  ┌─────────────────────────────────────────┐  │
│  │            Workflow Metadata            │  │
│  │ - Name, ID, Description                 │  │
│  │ - Version, Tags, Labels                 │  │
│  │ - Schedule, Triggers                    │  │
│  └─────────────────────────────────────────┘  │
│                       │                       │
│                       ▼                       │
│  ┌─────────────────────────────────────────┐  │
│  │            Task Definitions             │  │
│  │  ┌────────┐   ┌────────┐   ┌────────┐   │  │
│  │  │ Task A │──▶│ Task B │──▶│ Task C │   │  │
│  │  └────────┘   └────────┘   └────────┘   │  │
│  │      │                         │        │  │
│  │      └────────────┬────────────┘        │  │
│  │                   ▼                     │  │
│  │              ┌────────┐                 │  │
│  │              │ Task D │                 │  │
│  │              └────────┘                 │  │
│  └─────────────────────────────────────────┘  │
│                       │                       │
│                       ▼                       │
│  ┌─────────────────────────────────────────┐  │
│  │        Execution Configuration          │  │
│  │ - Retry policies, Timeouts              │  │
│  │ - Resource requirements                 │  │
│  │ - Notifications, Callbacks              │  │
│  └─────────────────────────────────────────┘  │
│                                               │
└───────────────────────────────────────────────┘
```
A complete workflow definition in JSON:

```json
{
  "name": "data_pipeline",
  "version": "1.0.0",
  "description": "Daily data processing pipeline",
  "metadata": {
    "owner": "data-team",
    "tags": ["etl", "daily"]
  },
  "tasks": [
    {
      "id": "extract",
      "type": "python",
      "config": {
        "script": "extract_data.py"
      }
    },
    {
      "id": "transform",
      "type": "python",
      "depends_on": ["extract"],
      "config": {
        "script": "transform_data.py"
      }
    },
    {
      "id": "load",
      "type": "python",
      "depends_on": ["transform"],
      "config": {
        "script": "load_data.py"
      }
    }
  ],
  "schedule": "0 2 * * *"
}
```

Creating and managing workflows from the CLI:

```bash
# Create workflow from file
deerflow create -f workflow.json

# Create workflow with inline definition
deerflow create --name "my_workflow" \
  --task "step1:shell:echo Hello" \
  --task "step2:shell:echo World" \
  --depends "step2:step1"

# List workflows
deerflow list

# Get workflow details
deerflow get my_workflow

# Delete workflow
deerflow delete my_workflow
```

The same workflow built with the Python SDK:

```python
from deerflow import Workflow, Task, ShellTask, PythonTask

# Create workflow
workflow = Workflow(
    name="data_pipeline",
    description="Daily data processing"
)

# Add tasks
extract = ShellTask(
    id="extract",
    command="python extract.py"
)
transform = PythonTask(
    id="transform",
    script="transform.py",
    depends_on=["extract"]
)
load = PythonTask(
    id="load",
    script="load.py",
    depends_on=["transform"]
)

workflow.add_tasks([extract, transform, load])

# Register workflow
workflow.register()
```

Creating and managing workflows over the REST API:

```bash
# Create workflow via API
curl -X POST http://localhost:8080/api/workflows \
  -H "Content-Type: application/json" \
  -d '{
    "name": "api_workflow",
    "tasks": [
      {"id": "task1", "type": "shell", "command": "echo Hello"}
    ]
  }'

# Get workflow
curl http://localhost:8080/api/workflows/api_workflow

# Update workflow
curl -X PUT http://localhost:8080/api/workflows/api_workflow \
  -H "Content-Type: application/json" \
  -d @updated_workflow.json
```

Shell task:

```json
{
  "id": "shell_task",
  "type": "shell",
  "config": {
    "command": "python script.py",
    "working_dir": "/app/scripts",
    "env": {
      "ENV_VAR": "value"
    },
    "timeout": 3600
  }
}
```

Python task:

```json
{
  "id": "python_task",
  "type": "python",
  "config": {
    "script": "process_data.py",
    "function": "main",
    "args": ["arg1", "arg2"],
    "kwargs": {"key": "value"},
    "requirements": ["pandas", "numpy"]
  }
}
```

HTTP task:

```json
{
  "id": "api_call",
  "type": "http",
  "config": {
    "method": "POST",
    "url": "https://api.example.com/webhook",
    "headers": {
      "Authorization": "Bearer ${API_TOKEN}"
    },
    "body": {
      "data": "${task.previous.output}"
    },
    "timeout": 30,
    "retry": {
      "max_attempts": 3,
      "backoff": "exponential"
    }
  }
}
```

Docker task:

```json
{
  "id": "docker_task",
  "type": "docker",
  "config": {
    "image": "python:3.11",
    "command": ["python", "script.py"],
    "volumes": [
      "/data:/app/data"
    ],
    "environment": {
      "ENV": "production"
    },
    "resources": {
      "memory": "2Gi",
      "cpu": "1"
    }
  }
}
```

Running workflows:

```bash
# Run workflow immediately
deerflow run my_workflow

# Run with parameters
deerflow run my_workflow --param date=2024-01-15 --param env=prod

# Run specific tasks only
deerflow run my_workflow --task transform --task load

# Dry run (validate without executing)
deerflow run my_workflow --dry-run
```

A workflow execution moves through these states:

```
┌────────────────────────────────────────────────────────┐
│                Workflow Execution States               │
├────────────────────────────────────────────────────────┤
│                                                        │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐  ┌─────────┐  │
│  │ PENDING │──▶│ RUNNING │──▶│ SUCCESS │  │         │  │
│  └─────────┘   └────┬────┘   └─────────┘  │ SKIPPED │  │
│                     │                     │         │  │
│                     ▼                     └─────────┘  │
│                ┌─────────┐                             │
│                │ FAILED  │                             │
│                └────┬────┘                             │
│                     │                                  │
│                     ▼                                  │
│                ┌─────────┐   ┌─────────┐               │
│                │  RETRY  │──▶│ SUCCESS │               │
│                └─────────┘   │ /FAILED │               │
│                              └─────────┘               │
│                                                        │
└────────────────────────────────────────────────────────┘
```
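The transitions in the state diagram above can be sketched as a small state machine. This is an illustrative model only, not Deer Flow's implementation; the function name and retry policy here are assumptions.

```python
from enum import Enum

class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRY = "retry"
    SKIPPED = "skipped"

def run_with_retries(task, max_attempts=3):
    """Drive a callable through PENDING -> RUNNING -> SUCCESS/FAILED,
    taking the FAILED -> RETRY edge until attempts are exhausted."""
    state = TaskState.PENDING
    for attempt in range(1, max_attempts + 1):
        state = TaskState.RUNNING
        try:
            task()  # an exception marks this attempt FAILED
            return TaskState.SUCCESS
        except Exception:
            state = TaskState.FAILED
            if attempt < max_attempts:
                state = TaskState.RETRY  # re-enter RUNNING on next loop
    return TaskState.FAILED

# Usage: a task that fails twice, then succeeds on the third attempt
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient error")

print(run_with_retries(flaky))  # TaskState.SUCCESS
```

Note that SKIPPED is a terminal state a scheduler assigns before running a task (for example, when an upstream dependency failed), so it does not appear in this per-task retry loop.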
Monitoring executions from the CLI:

```bash
# Watch execution in real-time
deerflow watch execution_id

# Get execution status
deerflow status execution_id

# Get execution logs
deerflow logs execution_id
deerflow logs execution_id --task transform

# List recent executions
deerflow executions --workflow my_workflow --limit 10
```

Passing runtime parameters into a task:

```json
{
  "id": "parameterized_task",
  "type": "python",
  "config": {
    "script": "process.py",
    "args": [
      "${params.date}",
      "${params.environment}"
    ]
  }
}
```

Declaring task outputs and referencing them downstream:

```json
{
  "tasks": [
    {
      "id": "fetch_data",
      "type": "python",
      "config": {
        "script": "fetch.py"
      },
      "outputs": ["data_path", "record_count"]
    },
    {
      "id": "process_data",
      "type": "python",
      "depends_on": ["fetch_data"],
      "config": {
        "script": "process.py",
        "args": [
          "${tasks.fetch_data.outputs.data_path}"
        ]
      }
    }
  ]
}
```

The same producer/consumer data flow with the Python SDK:

```python
from deerflow import Workflow, PythonTask, Output

workflow = Workflow(name="data_flow_example")

@workflow.task(id="producer")
def produce_data():
    data = {"records": 100, "file": "/tmp/data.csv"}
    return Output(data)

@workflow.task(id="consumer", depends_on=["producer"])
def consume_data(producer_output):
    print(f"Processing {producer_output['records']} records")
    print(f"File: {producer_output['file']}")
```

Cron schedule:

```json
{
  "name": "scheduled_workflow",
  "schedule": {
    "type": "cron",
    "expression": "0 2 * * *",
    "timezone": "UTC"
  },
  "tasks": [...]
}
```

Interval schedule:

```json
{
  "name": "interval_workflow",
  "schedule": {
    "type": "interval",
    "every": "1h",
    "start_time": "2024-01-01T00:00:00Z"
  },
  "tasks": [...]
}
```

Event triggers:

```json
{
  "name": "event_triggered",
  "triggers": [
    {
      "type": "webhook",
      "path": "/trigger/my_workflow"
    },
    {
      "type": "file",
      "path": "/data/incoming/*.csv",
      "event": "created"
    },
    {
      "type": "queue",
      "queue": "workflow-triggers",
      "filter": {"type": "process_request"}
    }
  ],
  "tasks": [...]
}
```

In this chapter, you've learned:
- Workflow Structure: metadata, tasks, and execution configuration
- Creating Workflows: CLI, SDK, and API methods
- Task Types: shell, Python, HTTP, and Docker tasks
- Execution: running, monitoring, and managing workflows
- Data Flow: parameters and task outputs
- Scheduling: cron, interval, and event triggers

Key takeaways:
- JSON Definitions: workflows are defined declaratively
- Multiple Task Types: choose the right task type for each job
- Flexible Execution: run immediately or on a schedule
- Data Passing: tasks can share outputs
- Event-Driven: workflows can be triggered from many sources

Ready to explore different task types in depth? Let's dive into Chapter 3: Task Management.
Most teams struggle here not because they need to write more code, but because they need to draw clear boundaries between workflow definitions, the Deer Flow runtime, and their own task scripts so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without a clear rollback or observability strategy
After working through this chapter, you should be able to reason about workflows as an operating subsystem of Deer Flow, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around the `script`, `config`, and `tasks` fields as your checklist when adapting these patterns to your own repository.
Under the hood, a workflow run usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for the workflow.
- Input normalization: shape incoming data so each task receives stable contracts.
- Core execution: run the main logic branch and propagate intermediate state between tasks.
- Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit the logs and metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
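As a mental model, the control path above can be sketched as a thin pipeline. Everything here (the function name, the config shape, the record-limit policy) is hypothetical scaffolding for reasoning about stage boundaries, not Deer Flow's API.

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("workflow")

def execute(raw_input, config):
    # 1. Context bootstrap: validate prerequisites before doing any work
    if "name" not in config:
        raise ValueError("workflow config must include a name")

    # 2. Input normalization: give downstream stages a stable contract
    records = [str(r).strip() for r in raw_input]

    # 3. Core execution: the main logic branch
    processed = [r.upper() for r in records if r]

    # 4. Policy and safety checks: enforce limits and failure boundaries
    limit = config.get("max_records", 1000)
    if len(processed) > limit:
        raise RuntimeError(f"record limit exceeded: {len(processed)} > {limit}")

    # 5. Output composition: canonical payload for downstream consumers
    result = {"workflow": config["name"], "count": len(processed), "records": processed}

    # 6. Operational telemetry: emit what you need to debug this run later
    log.info("workflow=%s stage=done count=%d", config["name"], result["count"])
    return result

print(execute(["  a", "b ", ""], {"name": "demo"}))
# → {'workflow': 'demo', 'count': 2, 'records': ['A', 'B']}
```

Each numbered comment marks a boundary where, while debugging, you can assert an explicit success or failure condition before moving on.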
Use the following upstream sources (all hosted on github.com) to verify implementation details while reading this chapter; each is the authoritative reference for its area:
- Official Documentation
- GitHub Repository
- API Reference
- Community & Issues
- Workflow Examples
- AI Codebase Knowledge Builder
Suggested trace strategy:
- search upstream code for `workflow` and `deerflow` to map concrete implementation paths
- compare documentation claims against the actual runtime and config code before reusing patterns in production