---
layout: default
title: "Chapter 2: Workflow Basics"
parent: Deer Flow Tutorial
nav_order: 2
---

Chapter 2: Workflow Basics

Welcome to Chapter 2: Workflow Basics. In this part of the Deer Flow tutorial, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.

Learn to create and manage basic workflows with Deer Flow's workflow definition system.

Overview

Workflows are the core abstraction in Deer Flow. They define a series of tasks, their relationships, and execution parameters. This chapter covers fundamental workflow concepts and creation patterns.

Workflow Structure

Anatomy of a Workflow

┌─────────────────────────────────────────────────────────────────┐
│                      Deer Flow Workflow                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    Workflow Metadata                     │   │
│  │  - Name, ID, Description                                 │   │
│  │  - Version, Tags, Labels                                 │   │
│  │  - Schedule, Triggers                                    │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                  │
│                              ▼                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                     Task Definitions                     │   │
│  │  ┌─────────┐   ┌─────────┐   ┌─────────┐               │   │
│  │  │ Task A  │──▶│ Task B  │──▶│ Task C  │               │   │
│  │  └─────────┘   └─────────┘   └─────────┘               │   │
│  │       │                            │                     │   │
│  │       └────────────┬───────────────┘                     │   │
│  │                    ▼                                     │   │
│  │              ┌─────────┐                                 │   │
│  │              │ Task D  │                                 │   │
│  │              └─────────┘                                 │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                  │
│                              ▼                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                  Execution Configuration                 │   │
│  │  - Retry policies, Timeouts                              │   │
│  │  - Resource requirements                                 │   │
│  │  - Notifications, Callbacks                              │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Workflow Definition Format

{
  "name": "data_pipeline",
  "version": "1.0.0",
  "description": "Daily data processing pipeline",
  "metadata": {
    "owner": "data-team",
    "tags": ["etl", "daily"]
  },
  "tasks": [
    {
      "id": "extract",
      "type": "python",
      "config": {
        "script": "extract_data.py"
      }
    },
    {
      "id": "transform",
      "type": "python",
      "depends_on": ["extract"],
      "config": {
        "script": "transform_data.py"
      }
    },
    {
      "id": "load",
      "type": "python",
      "depends_on": ["transform"],
      "config": {
        "script": "load_data.py"
      }
    }
  ],
  "schedule": "0 2 * * *"
}
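A definition like this is just a task DAG, so it can be sanity-checked before registration: every `depends_on` entry must name a real task, and the graph must be acyclic. A minimal stand-alone sketch in plain Python (illustrative only, not part of the Deer Flow SDK):

```python
from collections import deque

def validate_workflow(definition):
    """Check that depends_on references resolve and the task graph is acyclic."""
    tasks = {t["id"]: t.get("depends_on", []) for t in definition["tasks"]}
    for task_id, deps in tasks.items():
        for dep in deps:
            if dep not in tasks:
                raise ValueError(f"{task_id} depends on unknown task {dep}")
    # Kahn's algorithm: if we cannot order every task, there is a cycle.
    indegree = {t: len(deps) for t, deps in tasks.items()}
    queue = deque(t for t, d in indegree.items() if d == 0)
    ordered = []
    while queue:
        node = queue.popleft()
        ordered.append(node)
        for t, deps in tasks.items():
            if node in deps:
                indegree[t] -= 1
                if indegree[t] == 0:
                    queue.append(t)
    if len(ordered) != len(tasks):
        raise ValueError("cycle detected in task graph")
    return ordered  # one valid execution order

definition = {
    "tasks": [
        {"id": "extract", "type": "python"},
        {"id": "transform", "type": "python", "depends_on": ["extract"]},
        {"id": "load", "type": "python", "depends_on": ["transform"]},
    ]
}
print(validate_workflow(definition))  # ['extract', 'transform', 'load']
```

Running this check locally catches wiring mistakes before the scheduler ever sees the workflow.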

Creating Workflows

Using the CLI

# Create workflow from file
deerflow create -f workflow.json

# Create workflow with inline definition
deerflow create --name "my_workflow" \
    --task "step1:shell:echo Hello" \
    --task "step2:shell:echo World" \
    --depends "step2:step1"

# List workflows
deerflow list

# Get workflow details
deerflow get my_workflow

# Delete workflow
deerflow delete my_workflow

Using the Python SDK

from deerflow import Workflow, Task, ShellTask, PythonTask

# Create workflow
workflow = Workflow(
    name="data_pipeline",
    description="Daily data processing"
)

# Add tasks
extract = ShellTask(
    id="extract",
    command="python extract.py"
)

transform = PythonTask(
    id="transform",
    script="transform.py",
    depends_on=["extract"]
)

load = PythonTask(
    id="load",
    script="load.py",
    depends_on=["transform"]
)

workflow.add_tasks([extract, transform, load])

# Register workflow
workflow.register()

Using the REST API

# Create workflow via API
curl -X POST http://localhost:8080/api/workflows \
    -H "Content-Type: application/json" \
    -d '{
        "name": "api_workflow",
        "tasks": [
            {"id": "task1", "type": "shell", "command": "echo Hello"}
        ]
    }'

# Get workflow
curl http://localhost:8080/api/workflows/api_workflow

# Update workflow
curl -X PUT http://localhost:8080/api/workflows/api_workflow \
    -H "Content-Type: application/json" \
    -d @updated_workflow.json

Task Types

Shell Tasks

{
  "id": "shell_task",
  "type": "shell",
  "config": {
    "command": "python script.py",
    "working_dir": "/app/scripts",
    "env": {
      "ENV_VAR": "value"
    },
    "timeout": 3600
  }
}
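The shell task fields map almost directly onto a `subprocess` call. A hedged sketch of how a worker might execute this config (illustrative only, not Deer Flow's actual executor; assumes a POSIX shell):

```python
import os
import subprocess

def run_shell_task(config):
    """Execute a shell task config and return the completed process."""
    env = {**os.environ, **config.get("env", {})}  # overlay task env vars
    return subprocess.run(
        config["command"],
        shell=True,
        cwd=config.get("working_dir"),
        env=env,
        timeout=config.get("timeout"),  # seconds; raises TimeoutExpired on overrun
        capture_output=True,
        text=True,
        check=True,                     # non-zero exit code -> task failure
    )

result = run_shell_task({"command": "echo $GREETING", "env": {"GREETING": "hello"}})
print(result.stdout.strip())  # hello
```

Note how `timeout` and `env` from the config become first-class arguments, and `check=True` turns a non-zero exit code into an exception the orchestrator can treat as a task failure.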

Python Tasks

{
  "id": "python_task",
  "type": "python",
  "config": {
    "script": "process_data.py",
    "function": "main",
    "args": ["arg1", "arg2"],
    "kwargs": {"key": "value"},
    "requirements": ["pandas", "numpy"]
  }
}
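A Python task names a script, an entry function, and `args`/`kwargs`. One way a worker could resolve and invoke that entry point, sketched with the standard `importlib` machinery (not the real executor, which would also handle the `requirements` field):

```python
import importlib.util
import pathlib
import tempfile

def run_python_task(config):
    """Load the task's script as a module and call the configured function."""
    spec = importlib.util.spec_from_file_location("task_module", config["script"])
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    fn = getattr(module, config.get("function", "main"))
    return fn(*config.get("args", []), **config.get("kwargs", {}))

# Demo: write a tiny script and run it through the sketch above.
script = pathlib.Path(tempfile.mkdtemp()) / "process_data.py"
script.write_text("def main(x, scale=1):\n    return x * scale\n")
print(run_python_task({"script": str(script), "function": "main",
                       "args": [21], "kwargs": {"scale": 2}}))  # 42
```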

HTTP Tasks

{
  "id": "api_call",
  "type": "http",
  "config": {
    "method": "POST",
    "url": "https://api.example.com/webhook",
    "headers": {
      "Authorization": "Bearer ${API_TOKEN}"
    },
    "body": {
      "data": "${task.previous.output}"
    },
    "timeout": 30,
    "retry": {
      "max_attempts": 3,
      "backoff": "exponential"
    }
  }
}
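The retry block means "up to 3 attempts, doubling the wait between them". That policy can be sketched independently of any HTTP client (the 1-second base delay is an assumption; the real scheduler may configure it differently):

```python
import time

def with_retries(call, max_attempts=3, base_delay=1.0, backoff="exponential"):
    """Retry `call` on exception; exponential backoff doubles the wait each attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the failure to the orchestrator
            delay = base_delay * 2 ** (attempt - 1) if backoff == "exponential" else base_delay
            time.sleep(delay)

attempts = []
def flaky():
    """Simulated endpoint that fails twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(with_retries(flaky, max_attempts=3, base_delay=0.01))  # ok
```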

Docker Tasks

{
  "id": "docker_task",
  "type": "docker",
  "config": {
    "image": "python:3.11",
    "command": ["python", "script.py"],
    "volumes": [
      "/data:/app/data"
    ],
    "environment": {
      "ENV": "production"
    },
    "resources": {
      "memory": "2Gi",
      "cpu": "1"
    }
  }
}
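A Docker task config translates mechanically into a `docker run` invocation. A sketch that only builds the argument vector (actually running it requires a Docker daemon; this is not Deer Flow's real worker code):

```python
def docker_run_args(config):
    """Translate a Docker task config into a `docker run` argument vector."""
    args = ["docker", "run", "--rm"]
    for volume in config.get("volumes", []):
        args += ["-v", volume]
    for key, value in config.get("environment", {}).items():
        args += ["-e", f"{key}={value}"]
    resources = config.get("resources", {})
    if "memory" in resources:
        # Note: Kubernetes-style "2Gi" would need mapping to Docker's "2g" syntax.
        args += ["--memory", resources["memory"]]
    if "cpu" in resources:
        args += ["--cpus", resources["cpu"]]
    args.append(config["image"])
    args += config.get("command", [])
    return args

config = {
    "image": "python:3.11",
    "command": ["python", "script.py"],
    "volumes": ["/data:/app/data"],
    "environment": {"ENV": "production"},
    "resources": {"memory": "2Gi", "cpu": "1"},
}
print(docker_run_args(config))
```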

Workflow Execution

Running Workflows

# Run workflow immediately
deerflow run my_workflow

# Run with parameters
deerflow run my_workflow --param date=2024-01-15 --param env=prod

# Run specific tasks only
deerflow run my_workflow --task transform --task load

# Dry run (validate without executing)
deerflow run my_workflow --dry-run

Execution States

┌─────────────────────────────────────────────────────────────────┐
│                   Workflow Execution States                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐        │
│  │ PENDING │──▶│ RUNNING │──▶│ SUCCESS │   │         │        │
│  └─────────┘   └────┬────┘   └─────────┘   │         │        │
│                     │                       │ SKIPPED │        │
│                     ▼                       │         │        │
│               ┌─────────┐                  └─────────┘        │
│               │ FAILED  │                                      │
│               └────┬────┘                                      │
│                    │                                           │
│                    ▼                                           │
│               ┌─────────┐   ┌─────────┐                       │
│               │ RETRY   │──▶│ SUCCESS │                       │
│               └─────────┘   │ /FAILED │                       │
│                             └─────────┘                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
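The transitions in the diagram amount to a small lookup table; anything outside it is an illegal transition. A sketch using the state names from the diagram (the platform may track more states internally):

```python
# Legal transitions, taken from the state diagram above.
ALLOWED = {
    "PENDING": {"RUNNING", "SKIPPED"},
    "RUNNING": {"SUCCESS", "FAILED"},
    "FAILED": {"RETRY"},
    "RETRY": {"SUCCESS", "FAILED"},
    "SUCCESS": set(),  # terminal
    "SKIPPED": set(),  # terminal
}

def transition(state, new_state):
    """Move to new_state if the diagram allows it; otherwise raise."""
    if new_state not in ALLOWED[state]:
        raise ValueError(f"illegal transition {state} -> {new_state}")
    return new_state

state = "PENDING"
for nxt in ["RUNNING", "FAILED", "RETRY", "SUCCESS"]:
    state = transition(state, nxt)
print(state)  # SUCCESS
```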

Monitoring Execution

# Watch execution in real-time
deerflow watch execution_id

# Get execution status
deerflow status execution_id

# Get execution logs
deerflow logs execution_id
deerflow logs execution_id --task transform

# List recent executions
deerflow executions --workflow my_workflow --limit 10

Input and Output

Task Parameters

{
  "id": "parameterized_task",
  "type": "python",
  "config": {
    "script": "process.py",
    "args": [
      "${params.date}",
      "${params.environment}"
    ]
  }
}

Passing Data Between Tasks

{
  "tasks": [
    {
      "id": "fetch_data",
      "type": "python",
      "config": {
        "script": "fetch.py"
      },
      "outputs": ["data_path", "record_count"]
    },
    {
      "id": "process_data",
      "type": "python",
      "depends_on": ["fetch_data"],
      "config": {
        "script": "process.py",
        "args": [
          "${tasks.fetch_data.outputs.data_path}"
        ]
      }
    }
  ]
}
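Placeholders like `${tasks.fetch_data.outputs.data_path}` are resolved before the downstream task starts. The interpolation step can be sketched as a regex substitution over a nested context dict (illustrative; Deer Flow's real template syntax may support more than dotted paths):

```python
import re

def interpolate(value, context):
    """Replace ${dotted.path} placeholders with values from a nested context."""
    def resolve(match):
        node = context
        for part in match.group(1).split("."):
            node = node[part]
        return str(node)
    return re.sub(r"\$\{([^}]+)\}", resolve, value)

context = {
    "params": {"date": "2024-01-15"},
    "tasks": {"fetch_data": {"outputs": {"data_path": "/tmp/data.csv"}}},
}
print(interpolate("${tasks.fetch_data.outputs.data_path}", context))  # /tmp/data.csv
print(interpolate("run for ${params.date}", context))                 # run for 2024-01-15
```

The same mechanism covers `${params.*}` references in the parameterized-task example above.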

Using Python SDK for Data Flow

from deerflow import Workflow, PythonTask, Output

workflow = Workflow(name="data_flow_example")

@workflow.task(id="producer")
def produce_data():
    data = {"records": 100, "file": "/tmp/data.csv"}
    return Output(data)

@workflow.task(id="consumer", depends_on=["producer"])
def consume_data(producer_output):
    print(f"Processing {producer_output['records']} records")
    print(f"File: {producer_output['file']}")

Scheduling

Cron Schedules

{
  "name": "scheduled_workflow",
  "schedule": {
    "type": "cron",
    "expression": "0 2 * * *",
    "timezone": "UTC"
  },
  "tasks": [...]
}
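`0 2 * * *` means minute 0, hour 2, every day of every month. A simplified matcher for plain numeric and `*` fields shows how a scheduler decides whether a given minute fires (real cron also supports ranges, steps, and lists, which this sketch omits):

```python
from datetime import datetime

def cron_matches(expression, when):
    """True if `when` matches a 5-field cron expression (numbers and * only)."""
    minute, hour, dom, month, dow = expression.split()
    fields = [
        (minute, when.minute),
        (hour, when.hour),
        (dom, when.day),
        (month, when.month),
        (dow, when.weekday()),  # caveat: cron's 0=Sunday differs from weekday()'s 0=Monday
    ]
    return all(f == "*" or int(f) == actual for f, actual in fields)

print(cron_matches("0 2 * * *", datetime(2024, 1, 15, 2, 0)))   # True
print(cron_matches("0 2 * * *", datetime(2024, 1, 15, 14, 0)))  # False
```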

Interval Schedules

{
  "name": "interval_workflow",
  "schedule": {
    "type": "interval",
    "every": "1h",
    "start_time": "2024-01-01T00:00:00Z"
  },
  "tasks": [...]
}
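An interval schedule fires every fixed duration from `start_time`. Computing the next fire time after an arbitrary instant is simple grid arithmetic (a sketch; the `"1h"` parser here handles only `s`/`m`/`h` suffixes):

```python
import math
from datetime import datetime, timedelta, timezone

UNITS = {"s": 1, "m": 60, "h": 3600}

def next_run(start_time, every, now):
    """First scheduled instant strictly after `now` on the start_time + n*every grid."""
    seconds = int(every[:-1]) * UNITS[every[-1]]
    elapsed = (now - start_time).total_seconds()
    n = max(0, math.floor(elapsed / seconds) + 1)
    return start_time + timedelta(seconds=n * seconds)

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 2, 30, tzinfo=timezone.utc)
print(next_run(start, "1h", now))  # 2024-01-01 03:00:00+00:00
```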

Event Triggers

{
  "name": "event_triggered",
  "triggers": [
    {
      "type": "webhook",
      "path": "/trigger/my_workflow"
    },
    {
      "type": "file",
      "path": "/data/incoming/*.csv",
      "event": "created"
    },
    {
      "type": "queue",
      "queue": "workflow-triggers",
      "filter": {"type": "process_request"}
    }
  ],
  "tasks": [...]
}
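The file trigger above fires when a path matching `/data/incoming/*.csv` sees a `created` event. The matching step is plain glob logic, sketched here with the standard `fnmatch` module (the real watcher would also subscribe to filesystem events):

```python
from fnmatch import fnmatch

def trigger_fires(trigger, event_type, path):
    """True if a file event should fire this file-trigger config."""
    return event_type == trigger.get("event", "created") and fnmatch(path, trigger["path"])

trigger = {"type": "file", "path": "/data/incoming/*.csv", "event": "created"}
print(trigger_fires(trigger, "created", "/data/incoming/report.csv"))  # True
print(trigger_fires(trigger, "deleted", "/data/incoming/report.csv"))  # False
```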

Summary

In this chapter, you've learned:

  • Workflow Structure: Metadata, tasks, and execution configuration
  • Creating Workflows: CLI, SDK, and API methods
  • Task Types: Shell, Python, HTTP, and Docker tasks
  • Execution: Running, monitoring, and managing workflows
  • Data Flow: Parameters and task outputs
  • Scheduling: Cron, interval, and event triggers

Key Takeaways

  1. JSON Definitions: Workflows are declaratively defined
  2. Multiple Task Types: Choose the right task type for each job
  3. Flexible Execution: Run immediately or schedule
  4. Data Passing: Tasks can share outputs
  5. Event-Driven: Trigger workflows from various sources

Next Steps

Ready to explore different task types in depth? Continue to Chapter 3: Task Management.

Generated for Awesome Code Docs

What Problem Does This Solve?

Most teams struggle here because the hard part is not writing more code, but drawing clear boundaries between workflow definitions, task types, and execution configuration so behavior stays predictable as complexity grows.

In practical terms, this chapter helps you avoid three common failures:

  • coupling core logic too tightly to one implementation path
  • missing the handoff boundaries between setup, execution, and validation
  • shipping changes without clear rollback or observability strategy

After working through this chapter, you should be able to reason about workflow definitions as an operating subsystem of the Deer Flow platform, with explicit contracts for inputs, state transitions, and outputs.

Use the implementation notes around `script`, `config`, and `depends_on` as your checklist when adapting these patterns to your own repository.

How it Works Under the Hood

Under the hood, Chapter 2: Workflow Basics usually follows a repeatable control path:

  1. Context bootstrap: initialize runtime config and prerequisites for the workflow.
  2. Input normalization: shape incoming data so each task receives a stable contract.
  3. Core execution: run the main logic branch and propagate intermediate state through task outputs.
  4. Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
  5. Output composition: return canonical result payloads for downstream consumers.
  6. Operational telemetry: emit logs/metrics needed for debugging and performance tuning.

When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.

Source Walkthrough

Use the following upstream sources to verify implementation details while reading this chapter:

  • Official Documentation (github.com)
  • GitHub Repository (github.com)
  • API Reference (github.com)
  • Community & Issues (github.com)
  • Workflow Examples (github.com)
  • AI Codebase Knowledge Builder (github.com)

Suggested trace strategy:

  • search upstream code for workflow and deerflow to map concrete implementation paths
  • compare docs claims against actual runtime/config code before reusing patterns in production

Chapter Connections