Difficulty: Intermediate
Status: ✅ Complete
Prerequisites: Complete 02_workflows first
This section demonstrates data flow and inter-task communication in Graflow: how tasks share data, state, and results through channels and the execution context.
- 📡 Using channels for inter-task communication
- 🔒 Thread-safe channel operations (`atomic_add`, `lock`)
- 🏷️ Type-safe channels with TypedDict
- 💾 Storing and retrieving task results
- 🔄 Data flow patterns in workflows
- 📊 Sharing state across task boundaries
Concept: Basic channel operations
Learn how to use channels for sharing data between tasks without direct parameter passing.
```bash
uv run python examples/03_data_flow/channels_basic.py
```

Key Concepts:
- Getting channel instances
- Setting and getting values
- Channel keys and existence checks
- Default values
- Channel lifecycle
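To make the get/set/default/keys semantics concrete, here is a minimal sketch of a process-local key-value channel in plain Python. `ToyChannel` is a hypothetical stand-in, not Graflow's `MemoryChannel`; it only mirrors the calls listed in the API reference below (`set`, `get` with a default, `keys`).

```python
from typing import Any


class ToyChannel:
    """Hypothetical process-local channel: a thin wrapper around a dict."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        # Missing keys return the default instead of raising KeyError
        return self._data.get(key, default)

    def keys(self) -> list[str]:
        return list(self._data.keys())


channel = ToyChannel()
channel.set("config", {"batch_size": 100})

batch_size = channel.get("config")["batch_size"]  # 100
retries = channel.get("retries", default=3)       # key never set, so 3
has_config = "config" in channel.keys()           # True
```

The existence check and the default both avoid surprises when a producer task has not run yet.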
Concept: Thread-safe channel operations
Learn how to safely update shared channel data when tasks run in parallel using ParallelGroup.
```bash
uv run python examples/03_data_flow/channel_concurrency.py
```

Key Concepts:
- Race conditions with naive `get`/`set` under concurrency
- Atomic counter updates with `channel.atomic_add()`
- Advisory locking with `channel.lock()` for compound operations
- Threshold-based reset pattern with multi-key updates
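The difference between a naive read-modify-write and an atomic update can be reproduced with plain `threading`. The sketch below is not Graflow code: `ToyChannel` and its `atomic_add` are hypothetical stand-ins that mimic the documented behavior with a `threading.RLock`. The naive counter may lose updates (the exact value varies between runs), while the atomic counter always reaches the expected total.

```python
import threading
import time
from typing import Any


class ToyChannel:
    """Hypothetical in-memory channel; NOT Graflow's MemoryChannel."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._lock = threading.RLock()

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def atomic_add(self, key: str, amount: int = 1) -> int:
        # Read, add, and write while holding the lock, so no update is lost
        with self._lock:
            new_value = self._data.get(key, 0) + amount
            self._data[key] = new_value
            return new_value


N_THREADS = 8
N_ITERS = 200
EXPECTED = N_THREADS * N_ITERS
channel = ToyChannel()


def naive_worker() -> None:
    for _ in range(N_ITERS):
        total = channel.get("naive", 0)
        time.sleep(0)  # yield to other threads, widening the race window
        channel.set("naive", total + 1)


def atomic_worker() -> None:
    for _ in range(N_ITERS):
        channel.atomic_add("atomic", 1)


def run_parallel(worker) -> None:
    threads = [threading.Thread(target=worker) for _ in range(N_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


run_parallel(naive_worker)
run_parallel(atomic_worker)

# "naive" usually ends below EXPECTED (lost updates); "atomic" always equals it
print(channel.get("naive"), channel.get("atomic"), EXPECTED)
```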
Concept: Type-safe channels
Master TypedChannels for type-safe inter-task communication using TypedDict schemas.
```bash
uv run python examples/03_data_flow/typed_channels.py
```

Key Concepts:
- Defining message schemas with TypedDict
- Creating TypedChannel instances
- Type checking and validation
- IDE autocomplete support
- Schema evolution
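TypedDict annotations are checked by static tools (mypy, Pyright, IDEs); at runtime a TypedDict instance is a plain `dict`. If you also want a runtime guard, a wrapper can compare each message against the schema's type hints. `CheckedTypedChannel` below is a hypothetical illustration, not how Graflow's `TypedChannel` is implemented, and it only handles plain (non-generic) field types such as `int`, `str`, and `float`.

```python
from typing import Any, Generic, Optional, TypedDict, TypeVar, get_type_hints

T = TypeVar("T")


class CheckedTypedChannel(Generic[T]):
    """Hypothetical typed channel that validates messages on set()."""

    def __init__(self, schema: type) -> None:
        self._hints = get_type_hints(schema)            # field name -> type
        self._required = set(schema.__required_keys__)  # Python 3.9+
        self._data: dict[str, Any] = {}

    def set(self, key: str, message: T) -> None:
        missing = self._required - set(message)
        if missing:
            raise ValueError(f"{key}: missing fields {sorted(missing)}")
        for field, value in message.items():
            expected = self._hints.get(field)
            if expected is None:
                raise ValueError(f"{key}: unknown field {field!r}")
            # Only plain classes (int, str, float, ...) are supported here
            if not isinstance(value, expected):
                raise TypeError(
                    f"{key}: field {field!r} expects {expected.__name__}, "
                    f"got {type(value).__name__}"
                )
        self._data[key] = message

    def get(self, key: str) -> Optional[T]:
        return self._data.get(key)


class MetricsMessage(TypedDict):
    count: int
    status: str
    timestamp: float


metrics_channel: CheckedTypedChannel[MetricsMessage] = CheckedTypedChannel(MetricsMessage)
metrics_channel.set("metrics", {"count": 100, "status": "success", "timestamp": 1.5})

try:
    # Deliberately wrong: count is a str, not an int
    metrics_channel.set("bad", {"count": "100", "status": "success", "timestamp": 1.5})
except TypeError as exc:
    error_message = str(exc)
```

Rejected messages are never stored, so `metrics_channel.get("bad")` stays `None`.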
Concept: Task results and dependency data
Learn how to store task results and access them from dependent tasks.
```bash
uv run python examples/03_data_flow/results_storage.py
```

Key Concepts:
- Task result storage
- Retrieving results by task ID
- Building data pipelines
- Result propagation
- Error handling
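Result passing follows a simple contract: each task's return value is stored under its task ID, and a downstream task reads it back with `get_result`. The plain-Python sketch below models that contract with a hypothetical `ToyContext` (including a hypothetical `set_result` method) and a sequential `run_pipeline` helper; it is not Graflow's scheduler and assumes the task list is already in dependency order.

```python
from typing import Any, Callable


class ToyContext:
    """Hypothetical execution context that keeps results keyed by task ID."""

    def __init__(self) -> None:
        self._results: dict[str, Any] = {}

    def set_result(self, task_id: str, value: Any) -> None:
        self._results[task_id] = value

    def get_result(self, task_id: str, default: Any = None) -> Any:
        return self._results.get(task_id, default)


def run_pipeline(ctx: ToyContext, tasks: list[tuple[str, Callable[[ToyContext], Any]]]) -> None:
    # Run tasks in the given (dependency) order; store each return value under its ID
    for task_id, fn in tasks:
        ctx.set_result(task_id, fn(ctx))


def fetch(ctx: ToyContext) -> dict:
    return {"data": [1, 2, 3]}


def process(ctx: ToyContext) -> dict:
    data = ctx.get_result("fetch")  # result of the upstream task
    return {"processed": len(data["data"])}


ctx = ToyContext()
run_pipeline(ctx, [("fetch", fetch), ("process", process)])
print(ctx.get_result("process"))  # {'processed': 3}
```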
Sharing data through a channel:

```python
@task(inject_context=True)
def setup(ctx: TaskExecutionContext):
    channel = ctx.get_channel()
    channel.set("config", {"batch_size": 100})


@task(inject_context=True)
def process(ctx: TaskExecutionContext):
    channel = ctx.get_channel()
    config = channel.get("config")
    batch_size = config["batch_size"]
```

Accumulating state across tasks (a naive read-modify-write: fine for sequential tasks, but it can lose updates when tasks run in parallel):

```python
@task(inject_context=True)
def process_batch(ctx: TaskExecutionContext):
    channel = ctx.get_channel()
    # Get current state (0 if no task has written it yet)
    total = channel.get("total_processed", 0)
    # Update state; a parallel task may write between this get and set
    total += 100
    channel.set("total_processed", total)
```

Atomic counter updates for parallel tasks:

```python
@task(inject_context=True)
def count_items(ctx: TaskExecutionContext):
    channel = ctx.get_channel()
    # Atomic: safe for parallel tasks
    channel.atomic_add("processed_count", 1)
```

Compound read-modify-write under a lock (threshold-based reset):

```python
THRESHOLD = 100  # example value; choose what fits your workflow


@task(inject_context=True)
def check_and_reset(ctx: TaskExecutionContext):
    channel = ctx.get_channel()
    # Hold the "counter" lock so the check and the update happen together
    with channel.lock("counter"):
        val = channel.get("counter", 0)
        if val >= THRESHOLD:
            channel.set("counter", 0)
            channel.atomic_add("overflow_count", 1)
        else:
            channel.set("counter", val + 1)
```

Type-safe messages with a TypedChannel:

```python
import time
from typing import TypedDict


class MetricsMessage(TypedDict):
    count: int
    status: str
    timestamp: float


@task(inject_context=True)
def collect_metrics(ctx: TaskExecutionContext):
    typed_channel = ctx.get_typed_channel(MetricsMessage)
    metrics: MetricsMessage = {
        "count": 100,
        "status": "success",
        "timestamp": time.time(),
    }
    typed_channel.set("metrics", metrics)
```

Passing results between tasks:

```python
with workflow("pipeline") as ctx:

    @task
    def fetch():
        return {"data": [1, 2, 3]}

    @task(inject_context=True)
    def process(ctx: TaskExecutionContext):
        # Get result from previous task
        data = ctx.get_result("fetch")
        return {"processed": len(data["data"])}

    fetch >> process
```

Graflow supports different channel backends:

Memory backend (`channel_backend="memory"`):
- Use: Single-process workflows
- Scope: Process-local
- Persistence: No (in-memory only)

```python
context = ExecutionContext.create(
    graph, "start",
    channel_backend="memory",
)
```

Redis backend (`channel_backend="redis"`):
- Use: Distributed workflows
- Scope: Across processes/workers
- Persistence: Stored in the Redis server

```python
# redis_client: a connected redis.Redis instance,
# e.g. redis.Redis(host="localhost", port=6379)
context = ExecutionContext.create(
    graph, "start",
    channel_backend="redis",
    config={"redis_client": redis_client},
)
```
Best practices:

- Use channels for shared state:

  ```python
  channel.set("shared_config", config)
  ```

- Use TypedChannels for complex data:

  ```python
  typed_channel = ctx.get_typed_channel(MySchema)
  ```

- Provide default values:

  ```python
  value = channel.get("key", default=0)
  ```

- Use descriptive keys:

  ```python
  channel.set("user_auth_token", token)
  ```

- Don't store large objects:

  ```python
  # Bad: a 100MB dataset in the channel
  channel.set("data", huge_dataset)

  # Good: store a reference or path instead
  channel.set("data_path", "/path/to/data")
  ```

- Don't use channels for task parameters:

  ```python
  # Bad: reading a direct input from the channel
  @task(inject_context=True)
  def process(ctx):
      x = ctx.get_channel().get("x")

  # Good: use function parameters
  @task
  def process(x: int):
      return x * 2
  ```

- Don't assume key existence:

  ```python
  # Bad: if "count" was never set, get() returns None (its default) and "+ 1" fails
  count = channel.get("count") + 1

  # Good: supply a default that matches the expected type
  count = channel.get("count", default=0) + 1
  ```
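Following the "store a reference" advice, a producer can write the heavy payload to storage and put only its location in the channel. This sketch uses a plain dict as a stand-in for the channel and a temporary directory as storage; `produce` and `consume` are hypothetical task bodies. In a distributed (Redis) setup, the path would need to point to storage that every worker can reach, such as a shared filesystem or object store.

```python
import json
import tempfile
from pathlib import Path

channel: dict[str, str] = {}  # stand-in for a channel: only small values go in


def produce(out_dir: Path) -> None:
    big_dataset = [{"id": i, "value": i * i} for i in range(10_000)]
    data_path = out_dir / "dataset.json"
    data_path.write_text(json.dumps(big_dataset))
    # Store the reference, not the data itself
    channel["data_path"] = str(data_path)


def consume() -> int:
    # Load the payload on demand from the stored location
    dataset = json.loads(Path(channel["data_path"]).read_text())
    return sum(row["value"] for row in dataset)


with tempfile.TemporaryDirectory() as tmp:
    produce(Path(tmp))
    total = consume()
```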
Channel operations:

```python
channel = ctx.get_channel()

# Set value
channel.set("key", "value")

# Get value
value = channel.get("key")

# Get with default
value = channel.get("key", default="default")

# Check existence
if "key" in channel.keys():
    value = channel.get("key")

# List all keys
all_keys = channel.keys()
```

Typed channel operations:

```python
from typing import TypedDict


class MyMessage(TypedDict):
    count: int
    status: str


typed_channel = ctx.get_typed_channel(MyMessage)

# Set typed message
message: MyMessage = {"count": 42, "status": "ok"}
typed_channel.set("msg", message)

# Get typed message (with IDE support)
msg = typed_channel.get("msg")
print(msg["count"])  # IDE knows this is int
```

Use channels for:
- ✅ Shared configuration across many tasks
- ✅ Accumulating state (counters, logs)
- ✅ Broadcasting data to multiple consumers
- ✅ Decoupling task implementations

Use function parameters for:
- ✅ Direct task-to-task data passing
- ✅ Required inputs for task execution
- ✅ Simple, linear data flow
- ✅ Type-checked function arguments
Configuration sharing:

```python
@task(inject_context=True)
def load_config(ctx: TaskExecutionContext):
    config = {"db": "postgres", "batch": 100}
    ctx.get_channel().set("config", config)


@task(inject_context=True)
def use_config(ctx: TaskExecutionContext):
    config = ctx.get_channel().get("config")
    # All tasks can access config
```

Metrics tracking:

```python
@task(inject_context=True)
def track_metrics(ctx: TaskExecutionContext):
    channel = ctx.get_channel()
    # Thread-safe counter: works in parallel tasks
    channel.atomic_add("processed", 1)
```

Error collection:

```python
@task(inject_context=True)
def handle_error(ctx: TaskExecutionContext):
    channel = ctx.get_channel()
    # Lock the key so parallel tasks don't overwrite each other's appends
    with channel.lock("errors"):
        errors = channel.get("errors", [])
        errors.append({"task": ctx.task_id, "error": "..."})
        channel.set("errors", errors)
```

Resource references:

```python
@task(inject_context=True)
def init_resources(ctx: TaskExecutionContext):
    # Store connection string (not the connection itself!)
    ctx.get_channel().set("db_url", "postgresql://...")


@task(inject_context=True)
def use_resource(ctx: TaskExecutionContext):
    db_url = ctx.get_channel().get("db_url")
    # Create connection from URL
```

Debugging tips:

- Inspect channel state:

  ```python
  channel = ctx.get_channel()
  print(f"Channel keys: {channel.keys()}")
  for key in channel.keys():
      print(f"{key}: {channel.get(key)}")
  ```

- Log channel operations:

  ```python
  channel.set("key", value)
  print(f"[{ctx.task_id}] Set channel key 'key' = {value}")
  ```

- Check for key existence:

  ```python
  if "expected_key" not in channel.keys():
      print("Warning: Expected key not found in channel")
  ```
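If you find yourself adding these prints to many tasks, one option is to wrap the channel once. The sketch below wraps a plain dict (standing in for a channel) with a hypothetical `LoggingChannel` that records each `set`/`get` through the standard `logging` module, tagged with a task ID you pass in, and warns when a missing key falls back to its default.

```python
import logging
from typing import Any

logger = logging.getLogger("channel_debug")


class LoggingChannel:
    """Hypothetical wrapper that logs every set/get with the calling task's ID."""

    def __init__(self, inner: dict[str, Any], task_id: str) -> None:
        self._inner = inner
        self._task_id = task_id

    def set(self, key: str, value: Any) -> None:
        logger.debug("[%s] set %r = %r", self._task_id, key, value)
        self._inner[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        if key not in self._inner:
            logger.warning("[%s] get %r: missing, using default %r",
                           self._task_id, key, default)
            return default
        value = self._inner[key]
        logger.debug("[%s] get %r -> %r", self._task_id, key, value)
        return value

    def keys(self) -> list[str]:
        return list(self._inner)


logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
store: dict[str, Any] = {}  # shared backing storage for all wrappers

LoggingChannel(store, "setup").set("config", {"batch_size": 100})
cfg = LoggingChannel(store, "process").get("config")
limit = LoggingChannel(store, "process").get("limit", default=10)  # logs a warning
```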
After mastering data flow:
- 04_execution: Learn custom execution handlers
- 05_distributed: Scale workflows with distributed execution
- 06_advanced: Explore advanced patterns like cycles and dynamic tasks
Channel:
- `channel.set(key, value)` - Store value
- `channel.get(key, default=None)` - Retrieve value
- `channel.keys()` - List all keys
- `channel.atomic_add(key, amount=1)` - Atomic numeric add/subtract (thread-safe for `MemoryChannel`; single-command atomic for Redis)
- `channel.lock(key, timeout=10.0)` - Advisory per-key lock for compound read-modify-write operations (thread-safe via `threading.RLock`)
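The `lock(key, timeout=10.0)` entry describes an advisory per-key lock built on `threading.RLock`. One way such a lock could behave is sketched below with a hypothetical `KeyLockChannel`; raising `TimeoutError` when the timeout expires is an assumption of this sketch, not documented Graflow behavior. The first part replays the threshold-reset pattern across 60 threads; the second shows an acquire that times out while another thread holds the lock.

```python
import threading
from contextlib import contextmanager
from typing import Any, Iterator


class KeyLockChannel:
    """Hypothetical in-memory channel with advisory per-key locks."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._locks: dict[str, Any] = {}     # key -> threading.RLock()
        self._table_guard = threading.Lock()  # protects self._locks

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def _lock_for(self, key: str) -> Any:
        with self._table_guard:
            if key not in self._locks:
                self._locks[key] = threading.RLock()
            return self._locks[key]

    @contextmanager
    def lock(self, key: str, timeout: float = 10.0) -> Iterator[None]:
        # Advisory: only code that also calls lock(key) is serialized
        key_lock = self._lock_for(key)
        if not key_lock.acquire(timeout=timeout):
            raise TimeoutError(f"could not lock {key!r} within {timeout}s")
        try:
            yield
        finally:
            key_lock.release()


THRESHOLD = 5  # hypothetical reset threshold
channel = KeyLockChannel()


def check_and_reset() -> None:
    # Check and update as one step under the "counter" lock
    with channel.lock("counter"):
        val = channel.get("counter", 0)
        if val >= THRESHOLD:
            channel.set("counter", 0)
            channel.set("overflow_count", channel.get("overflow_count", 0) + 1)
        else:
            channel.set("counter", val + 1)


workers = [threading.Thread(target=check_and_reset) for _ in range(60)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(channel.get("counter"), channel.get("overflow_count"))  # 0 10

# While another thread holds the lock, a short-timeout acquire fails
held, release = threading.Event(), threading.Event()


def holder() -> None:
    with channel.lock("counter"):
        held.set()
        release.wait()


holder_thread = threading.Thread(target=holder)
holder_thread.start()
held.wait()
try:
    with channel.lock("counter", timeout=0.05):
        timed_out = False
except TimeoutError:
    timed_out = True
finally:
    release.set()
holder_thread.join()
```

Because every call goes through the same lock, each cycle of six calls (five increments, one reset) runs without interleaving, so 60 calls always end with the counter at 0 and ten overflows.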
TypedChannel:
- `typed_channel = ctx.get_typed_channel(SchemaClass)` - Create typed channel
- `typed_channel.set(key, message)` - Store typed message
- `typed_channel.get(key)` - Retrieve typed message
ExecutionContext:
- `ctx.get_channel()` - Get channel instance
- `ctx.get_typed_channel(MessageType)` - Get typed channel
- `ctx.get_result(task_id, default=None)` - Get task result
Ready to master data flow? Start with channels_basic.py! 🚀