-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathchannels_basic.py
More file actions
235 lines (200 loc) · 7.47 KB
/
channels_basic.py
File metadata and controls
235 lines (200 loc) · 7.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Basic Channels Example
=======================
This example demonstrates the fundamental operations of channels in Graflow.
Channels provide a way for tasks to communicate and share data without direct
parameter passing, which is especially useful for:
- Shared configuration
- Accumulating state
- Broadcasting data
- Decoupling task dependencies
Concepts Covered:
-----------------
1. Getting channel instances from TaskExecutionContext
2. Setting values in channels
3. Getting values from channels (with defaults)
4. Checking key existence
5. Listing all channel keys
6. Channel data persistence across tasks
Expected Output:
----------------
=== Basic Channels Demo ===
Starting execution from: producer_1
📤 Producer 1: Writing data to channel
Set 'config' = {'batch_size': 100, 'timeout': 30}
Set 'counter' = 1
📤 Producer 2: Writing more data to channel
Current counter: 1
Set 'counter' = 2
Set 'status' = initialized
📥 Consumer: Reading data from channel
Available keys: ['config', 'counter', 'status']
Config: {'batch_size': 100, 'timeout': 30}
Counter: 2
Status: initialized
Missing key (with default): default_value
Execution completed after 3 steps
All tasks successfully communicated via channels! 🎉
"""
from graflow.core.context import TaskExecutionContext
from graflow.core.decorators import task
from graflow.core.workflow import workflow
def main():
    """Demonstrate basic channel operations.

    Builds a three-task workflow (producer_1 -> producer_2 -> consumer) whose
    tasks exchange data only through the channel returned by
    ``context.get_channel()``. It then executes the workflow starting from
    ``producer_1`` and prints what each task writes and reads.
    """
    print("=== Basic Channels Demo ===\n")

    with workflow("channel_demo") as ctx:

        @task(inject_context=True)
        def producer_1(context: TaskExecutionContext):
            """First task that writes data to the channel."""
            print("📤 Producer 1: Writing data to channel")

            # Get the channel instance
            channel = context.get_channel()

            # Set configuration data
            config = {"batch_size": 100, "timeout": 30}
            channel.set("config", config)
            print(f" Set 'config' = {config}")

            # Set a counter
            channel.set("counter", 1)
            print(" Set 'counter' = 1\n")

        @task(inject_context=True)
        def producer_2(context: TaskExecutionContext):
            """Second task that reads and updates channel data."""
            print("📤 Producer 2: Writing more data to channel")
            channel = context.get_channel()

            # Read the existing counter. Default to 0 so the increment below
            # still works if "counter" was never set (e.g. this task runs on
            # its own), instead of failing on ``None + 1``.
            counter = channel.get("counter", default=0)
            print(f" Current counter: {counter}")

            # Increment once and reuse the result for both the write and the log
            new_counter = counter + 1
            channel.set("counter", new_counter)
            print(f" Set 'counter' = {new_counter}")

            # Add status information
            channel.set("status", "initialized")
            print(" Set 'status' = initialized\n")

        @task(inject_context=True)
        def consumer(context: TaskExecutionContext):
            """Consumer task that reads all data from the channel."""
            print("📥 Consumer: Reading data from channel")
            channel = context.get_channel()

            # List all available keys (sorted so the output order is stable)
            keys = channel.keys()
            print(f" Available keys: {sorted(keys)}")

            # Read configuration
            config = channel.get("config")
            print(f" Config: {config}")

            # Read counter
            counter = channel.get("counter")
            print(f" Counter: {counter}")

            # Read status
            status = channel.get("status")
            print(f" Status: {status}")

            # Try to read a non-existent key with a default value
            missing = channel.get("non_existent_key", default="default_value")
            print(f" Missing key (with default): {missing}\n")

        # Define the workflow: producer_1 -> producer_2 -> consumer
        producer_1 >> producer_2 >> consumer

        # Execute the workflow, starting from the first task
        ctx.execute("producer_1")

    print("All tasks successfully communicated via channels! 🎉")
if __name__ == "__main__":
    # Run the demo only when this file is executed as a script, not on import.
    main()
# ============================================================================
# Key Takeaways:
# ============================================================================
#
# 1. **Getting the Channel**
# channel = context.get_channel()
# - Every task with inject_context=True can access the channel
# - The same channel instance is shared across all tasks in a workflow
#
# 2. **Setting Values**
# channel.set(key, value)
# - Store any Python object (dict, list, int, str, custom objects)
# - Keys are strings
# - Values persist for the entire workflow execution
#
# 3. **Getting Values**
# value = channel.get(key, default=None)
# - Retrieve previously stored values
# - Always provide a default for optional keys
# - Returns None (or default) if key doesn't exist
#
# 4. **Checking Key Existence**
#    if "key" in channel.keys():
#        value = channel.get("key")
#    - Use channel.keys() to list all available keys
#    - Check membership first when you must distinguish a missing key
#      from a key that was explicitly stored as None
#
# 5. **Channel Lifecycle**
# - Channel data persists for the entire workflow execution
# - All tasks within the same workflow share the same channel
# - Channel is cleared when workflow completes
#
# 6. **When to Use Channels**
# ✅ Shared configuration across multiple tasks
# ✅ Accumulating state (counters, metrics, logs)
# ✅ Broadcasting data to multiple consumers
# ✅ Decoupling producer and consumer tasks
#
# 7. **When NOT to Use Channels**
# ❌ Direct task-to-task parameter passing (use function params)
# ❌ Storing large datasets (use references/paths instead)
# ❌ Required task inputs (use function parameters)
#
# ============================================================================
# Try Experimenting:
# ============================================================================
#
# 1. Store different data types:
# channel.set("list_data", [1, 2, 3])
# channel.set("dict_data", {"a": 1, "b": 2})
# channel.set("string_data", "hello")
# channel.set("number_data", 42)
#
# 2. Implement a counter pattern:
#    @task(inject_context=True)
#    def increment(ctx: TaskExecutionContext):
#        channel = ctx.get_channel()
#        count = channel.get("count", 0)
#        channel.set("count", count + 1)
#
# 3. Accumulate logs (inside a task, where ctx is the injected
#    TaskExecutionContext as in the counter example above):
#    logs = channel.get("logs", [])
#    logs.append(f"Task {ctx.task_id} executed")
#    channel.set("logs", logs)
#
# 4. Share configuration:
# # Setup task
# channel.set("config", load_config())
#
# # All other tasks
# config = channel.get("config")
# use_config(config)
#
# 5. Implement feature flags:
#    channel.set("debug_mode", True)
#    if channel.get("debug_mode", False):
#        print("Debug information...")
#
# ============================================================================
# Real-World Use Cases:
# ============================================================================
#
# **Configuration Management**:
# Setup task loads config once, all tasks read from channel
#
# **Metrics Collection**:
# Each task increments counters, final task reports all metrics
#
# **Error Tracking**:
# Tasks append errors to a list, error handler processes all errors
#
# **Progress Tracking**:
# Tasks update progress percentage, monitoring task displays progress
#
# **Resource Pooling**:
# Tasks share connection strings, each creates its own connection
#
# **State Machine**:
# Tasks read/write workflow state, enabling conditional execution
#
# ============================================================================