|
| 1 | +# Advanced Workflow Patterns Reference |
| 2 | + |
| 3 | +Nested workflows, dynamic nodes, retry configuration, custom node types, and |
| 4 | +graph construction. |
| 5 | + |
| 6 | +## 📋 Agent Verification Checklist (Advanced Patterns) |
| 7 | + |
| 8 | +Use this checklist when implementing complex workflows: |
| 9 | + |
| 10 | +- [ ] **Validation**: Does your graph follow all 7 validation rules? (e.g., no |
| 11 | + unconditional cycles) |
| 12 | +- [ ] **Custom Nodes**: If creating a custom node, did you override |
| 13 | + `get_name()` and `run()`? |
| 14 | +- [ ] **Dynamic Execution**: If using `run_node`, did you follow the rules in |
| 15 | + the dedicated dynamic-nodes reference? |
| 16 | +- [ ] **Waiting State**: Did you use `wait_for_output=True` if the node should |
| 17 | + stay in WAITING state until output is yielded? |
| 18 | + |
| 19 | +## 💡 Quick Reference |
| 20 | + |
| 21 | +- **Retry**: `RetryConfig(max_attempts=5, initial_delay=1.0)` |
| 22 | +- **Custom Node Fields**: `rerun_on_resume`, `wait_for_output`, |
| 23 | + `retry_config`, `timeout` |
| 24 | + |
| 25 | +## Nested Workflows |
| 26 | + |
| 27 | +A `Workflow` is both an agent and a node. Use one workflow inside another: |
| 28 | + |
| 29 | +```python |
| 30 | +from google.adk.workflow import Workflow |
| 31 | + |
| 32 | +# Inner workflow |
| 33 | +inner = Workflow( |
| 34 | + name="inner_pipeline", |
| 35 | + edges=[ |
| 36 | + ('START', step_a), |
| 37 | + (step_a, step_b), |
| 38 | + ], |
| 39 | +) |
| 40 | + |
| 41 | +# Outer workflow using inner as a node |
| 42 | +outer = Workflow( |
| 43 | + name="outer_pipeline", |
| 44 | + edges=[ |
| 45 | + ('START', pre_process), |
| 46 | + (pre_process, inner), # Nested workflow |
| 47 | + (inner, post_process), |
| 48 | + ], |
| 49 | +) |
| 50 | +``` |
| 51 | + |
| 52 | +The inner workflow receives the predecessor's output as its START input and its |
| 53 | +terminal output flows to the next node in the outer workflow. |
| 54 | + |
| 55 | +## Dynamic Node Scheduling |
| 56 | + |
| 57 | +Schedule nodes at runtime using `ctx.run_node()`. |
| 58 | + |
| 59 | +See the dedicated |
| 60 | +[Dynamic Node Scheduling Reference](file:///Users/deanchen/Desktop/adk-workflow/.agents/skills/adk-workflow/references/dynamic-nodes.md) |
| 61 | +for detailed rules, examples, and best practices. |
| 62 | + |
| 63 | +## Retry Configuration |
| 64 | + |
| 65 | +Configure automatic retry for nodes that may fail: |
| 66 | + |
| 67 | +```python |
| 68 | +from google.adk.workflow import RetryConfig |
| 69 | +from google.adk.workflow import FunctionNode |
| 70 | + |
| 71 | +retry = RetryConfig( |
| 72 | + max_attempts=5, # Max attempts (default: 5). 0 or 1 = no retry |
| 73 | + initial_delay=1.0, # Seconds before first retry (default: 1.0) |
| 74 | + max_delay=60.0, # Max seconds between retries (default: 60.0) |
| 75 | + backoff_factor=2.0, # Delay multiplier per attempt (default: 2.0) |
| 76 | + jitter=1.0, # Randomness factor (default: 1.0, 0.0 = none) |
| 77 | + exceptions=None, # Exception types to retry (None = all) |
| 78 | +) |
| 79 | + |
| 80 | +node = FunctionNode( |
| 81 | + flaky_api_call, |
| 82 | + name="api_call", |
| 83 | + retry_config=retry, |
| 84 | +) |
| 85 | +``` |
| 86 | + |
| 87 | +### Retry delay formula |
| 88 | + |
| 89 | +``` |
| 90 | +delay = initial_delay * (backoff_factor ^ attempt) |
| 91 | +delay = min(delay, max_delay) |
| 92 | +delay = delay * (1 + random(0, jitter)) |
| 93 | +``` |
| 94 | + |
| 95 | +### Accessing retry count |
| 96 | + |
| 97 | +```python |
| 98 | +def my_node(ctx: Context, node_input: str) -> str: |
| 99 | + if ctx.retry_count > 0: |
| 100 | + print(f"Retry attempt {ctx.retry_count}") |
| 101 | + return "result" |
| 102 | +``` |
| 103 | + |
| 104 | +## Custom Node Types |
| 105 | + |
| 106 | +Subclass `BaseNode` for custom behavior: |
| 107 | + |
| 108 | +```python |
| 109 | +from google.adk.workflow import BaseNode |
| 110 | +from google.adk.events.event import Event |
| 111 | +from google.adk.agents.context import Context |
| 112 | +from pydantic import ConfigDict, Field |
| 113 | +from typing import Any, AsyncGenerator |
| 114 | +from typing_extensions import override |
| 115 | + |
| 116 | +class BatchProcessorNode(BaseNode): |
| 117 | + """Processes items in batches.""" |
| 118 | + model_config = ConfigDict(arbitrary_types_allowed=True) |
| 119 | + |
| 120 | + name: str = Field(default="batch_processor") |
| 121 | + batch_size: int = Field(default=10) |
| 122 | + |
| 123 | + def __init__(self, *, name: str = "batch_processor", batch_size: int = 10): |
| 124 | + super().__init__() |
| 125 | + object.__setattr__(self, 'name', name) |
| 126 | + object.__setattr__(self, 'batch_size', batch_size) |
| 127 | + |
| 128 | + @override |
| 129 | + def get_name(self) -> str: |
| 130 | + return self.name |
| 131 | + |
| 132 | + @override |
| 133 | + async def run( |
| 134 | + self, |
| 135 | + *, |
| 136 | + ctx: Context, |
| 137 | + node_input: Any, |
| 138 | + ) -> AsyncGenerator[Any, None]: |
| 139 | + items = node_input if isinstance(node_input, list) else [node_input] |
| 140 | + results = [] |
| 141 | + for i in range(0, len(items), self.batch_size): |
| 142 | + batch = items[i:i + self.batch_size] |
| 143 | + batch_result = await process_batch(batch) |
| 144 | + results.extend(batch_result) |
| 145 | + yield Event(output=results) |
| 146 | +``` |
| 147 | + |
| 148 | +### BaseNode Fields |
| 149 | + |
| 150 | +| Field | Default | Description | |
| 151 | +| ----------------- | ------- | ------------------------------------------- | |
| 152 | +| `rerun_on_resume` | `False` | Whether to rerun after HITL interrupt | |
| 153 | +| `wait_for_output` | `False` | Node stays in WAITING state until it yields | |
| 154 | +: : : output (see below) : |
| 155 | +| `retry_config` | `None` | Retry configuration on failure | |
| 156 | +| `timeout` | `None` | Max seconds for node to complete | |
| 157 | + |
| 158 | +### wait_for_output |
| 159 | + |
| 160 | +When `wait_for_output=True`, a node that finishes without yielding an `Event` |
| 161 | +with output moves to **WAITING** state instead of COMPLETED. Downstream nodes |
| 162 | +are **not** triggered. The node can then be re-triggered by upstream |
| 163 | +predecessors. |
| 164 | + |
| 165 | +This is how `JoinNode` works internally — it runs once per predecessor, storing |
| 166 | +partial inputs, and only yields output (triggering downstream) when all |
| 167 | +predecessors have completed. `LlmAgentWrapper` in `task` mode also sets |
| 168 | +`wait_for_output=True` automatically. |
| 169 | + |
| 170 | +```python |
| 171 | +from google.adk.workflow import BaseNode |
| 172 | + |
| 173 | +class CollectorNode(BaseNode): |
| 174 | + wait_for_output: bool = True # Stay in WAITING until output is yielded |
| 175 | + |
| 176 | + async def run(self, *, ctx, node_input): |
| 177 | + # Store partial input, don't yield output yet |
| 178 | + collected = ctx.state.get("collected", []) |
| 179 | + collected.append(node_input) |
| 180 | + yield Event(state={"collected": collected}) |
| 181 | + |
| 182 | + # Only yield output when we have enough |
| 183 | + if len(collected) >= 3: |
| 184 | + yield Event(output=collected) |
| 185 | + # Now node transitions to COMPLETED and triggers downstream |
| 186 | +``` |
| 187 | + |
| 188 | +Nodes with `wait_for_output=True` default: |
| 189 | + |
| 190 | +- `JoinNode`: `True` (waits for all predecessors) |
| 191 | +- `LlmAgentWrapper` (task mode): `True` (set in `model_post_init`) |
| 192 | +- All other nodes: `False` |
| 193 | + |
| 194 | +### Required Methods |
| 195 | + |
| 196 | +Method | Description |
| 197 | +------------------------------------------- | ------------------------------ |
| 198 | +`get_name() -> str` | Return the node name |
| 199 | +`run(*, ctx, node_input) -> AsyncGenerator` | Execute the node, yield events |
| 200 | + |
| 201 | +## ToolNode |
| 202 | + |
| 203 | +Wrap an ADK tool as a workflow node: |
| 204 | + |
| 205 | +```python |
| 206 | +from google.adk.workflow._tool_node import _ToolNode as ToolNode |
| 207 | +from google.adk.tools.function_tool import FunctionTool |
| 208 | + |
| 209 | +def search(query: str) -> str: |
| 210 | + """Search for information.""" |
| 211 | + return f"Results for: {query}" |
| 212 | + |
| 213 | +tool = FunctionTool(search) |
| 214 | +tool_node = ToolNode(tool, name="search_node") |
| 215 | + |
| 216 | +agent = Workflow( |
| 217 | + name="with_tool", |
| 218 | + edges=[ |
| 219 | + ('START', prepare_query), |
| 220 | + (prepare_query, tool_node), # Input must be dict (tool args) or None |
| 221 | + (tool_node, process_results), |
| 222 | + ], |
| 223 | +) |
| 224 | +``` |
| 225 | + |
| 226 | +**Important**: ToolNode input must be a dictionary of tool arguments or None. |
| 227 | + |
| 228 | +## AgentNode |
| 229 | + |
| 230 | +Wrap any `BaseAgent` (not just LlmAgent) as a workflow node: |
| 231 | + |
| 232 | +```python |
| 233 | +from google.adk.workflow._agent_node import AgentNode |
| 234 | +from google.adk.agents.loop_agent import LoopAgent |
| 235 | + |
| 236 | +loop = LoopAgent( |
| 237 | + name="refine_loop", |
| 238 | + sub_agents=[writer, reviewer], |
| 239 | + max_iterations=3, |
| 240 | +) |
| 241 | + |
| 242 | +loop_node = AgentNode(agent=loop, name="refinement") |
| 243 | + |
| 244 | +agent = Workflow( |
| 245 | + name="with_loop", |
| 246 | + edges=[ |
| 247 | + ('START', loop_node), |
| 248 | + (loop_node, final_step), |
| 249 | + ], |
| 250 | +) |
| 251 | +``` |
| 252 | + |
| 253 | +## Graph Validation Rules |
| 254 | + |
| 255 | +The workflow graph is validated on construction. These rules are enforced: |
| 256 | + |
| 257 | +1. START node must exist |
| 258 | +2. START node must not have incoming edges |
| 259 | +3. All non-START nodes must be reachable (appear as `to_node` in some edge) |
| 260 | +4. No duplicate node names |
| 261 | +5. No duplicate edges |
| 262 | +6. At most one `__DEFAULT__` route per node |
| 263 | +7. No unconditional cycles (cycles must have at least one routed edge) |
| 264 | + |
| 265 | +## Edge Construction Patterns |
| 266 | + |
| 267 | +```python |
| 268 | +from google.adk.workflow import Edge |
| 269 | +from google.adk.workflow._workflow_graph import WorkflowGraph |
| 270 | + |
| 271 | +# Tuple syntax (most common) |
| 272 | +edges = [ |
| 273 | + ('START', node_a), # Simple edge |
| 274 | + (node_a, node_b, "route"), # Routed edge |
| 275 | + (node_a, (node_b, node_c)), # Fan-out |
| 276 | + ((node_b, node_c), join_node), # Fan-in |
| 277 | +] |
| 278 | + |
| 279 | +# Sequence shorthand (tuple with 3+ elements creates chain) |
| 280 | +edges = [('START', node_a, node_b, node_c)] |
| 281 | +# Equivalent to: [('START', node_a), (node_a, node_b), (node_b, node_c)] |
| 282 | + |
| 283 | +# Routing map (dict syntax) |
| 284 | +edges = [ |
| 285 | + (classifier, {"success": handler_a, "error": handler_b}), |
| 286 | +] |
| 287 | + |
| 288 | +# Edge objects (explicit) |
| 289 | +edges = [ |
| 290 | + Edge(START, node_a), |
| 291 | + Edge(node_a, node_b, route="success"), |
| 292 | +] |
| 293 | + |
| 294 | +# Edge.chain helper |
| 295 | +edges = Edge.chain('START', node_a, node_b, node_c) |
| 296 | +# Returns: [(START, node_a), (node_a, node_b), (node_b, node_c)] |
| 297 | + |
| 298 | +# WorkflowGraph.from_edge_items |
| 299 | +graph = WorkflowGraph.from_edge_items([ |
| 300 | + ('START', node_a), |
| 301 | + (node_a, node_b), |
| 302 | +]) |
| 303 | +agent = Workflow(name="my_workflow", graph=graph) |
| 304 | +``` |
| 305 | + |
| 306 | +## Source File Locations |
| 307 | + |
| 308 | +Component | File |
| 309 | +------------------- | ----------------------------------------------- |
| 310 | +Workflow | `src/google/adk/workflow/_workflow.py` |
| 311 | +WorkflowGraph, Edge | `src/google/adk/workflow/_workflow_graph.py` |
| 312 | +Context | `src/google/adk/agents/context.py` |
| 313 | +FunctionNode | `src/google/adk/workflow/_function_node.py` |
| 314 | +_LlmAgentWrapper | `src/google/adk/workflow/_llm_agent_wrapper.py` |
| 315 | +AgentNode | `src/google/adk/workflow/_agent_node.py` |
| 316 | +_ToolNode | `src/google/adk/workflow/_tool_node.py` |
| 317 | +JoinNode | `src/google/adk/workflow/_join_node.py` |
| 318 | +ParallelWorker | `src/google/adk/workflow/_parallel_worker.py` |
| 319 | +BaseNode, START | `src/google/adk/workflow/_base_node.py` |
| 320 | +@node decorator | `src/google/adk/workflow/_node.py` |
| 321 | +RetryConfig | `src/google/adk/workflow/_retry_config.py` |
| 322 | +Event | `src/google/adk/events/event.py` |
| 323 | +RequestInput | `src/google/adk/events/request_input.py` |
0 commit comments