|
| 1 | +# Dynamic Node Scheduling |
| 2 | + |
| 3 | +Dynamic node scheduling allows you to execute workflow nodes dynamically at runtime using `ctx.run_node()`. This enables imperative workflow construction using standard Python control flow instead of static graph edges. |
| 4 | + |
| 5 | +## Introduction |
| 6 | + |
| 7 | +While static graph definitions (`Workflow(edges=[...])`) are suitable for many structured tasks, some scenarios require more flexibility. For example, you might need to: |
| 8 | +- Loop a set of nodes until a condition is met (e.g., generator-evaluator loops). |
| 9 | +- Run a variable number of tasks in parallel based on runtime input (dynamic fan-out). |
| 10 | +- Conditionally execute nodes based on complex logic that is difficult to express in static edges. |
| 11 | + |
| 12 | +`ctx.run_node()` allows a parent node to execute a child node (which can be a function, an Agent, or another Workflow) and await its result. |
| 13 | + |
| 14 | +## Get started |
| 15 | + |
| 16 | +The following example demonstrates how to dynamically execute a child agent from a parent node. |
| 17 | + |
| 18 | +```python |
| 19 | +from google.adk import Agent, Context, Event, Workflow |
| 20 | +from google.adk.workflow import node |
| 21 | + |
| 22 | +# Define a child agent |
| 23 | +generate_headline = Agent( |
| 24 | + name="generate_headline", |
| 25 | + instruction="Write a catchy headline about the topic in the user message.", |
| 26 | +) |
| 27 | + |
| 28 | + |
| 29 | +# Define the parent orchestrator node (MUST have rerun_on_resume=True) |
| 30 | +@node(rerun_on_resume=True) |
| 31 | +async def orchestrate(ctx: Context, node_input: str) -> str: |
| 32 | + # Dynamically execute the child agent and await its output |
| 33 | + headline = await ctx.run_node(generate_headline, node_input=node_input) |
| 34 | + |
| 35 | + yield Event(output=headline) |
| 36 | + |
| 37 | +# Build the workflow |
| 38 | +root_agent = Workflow( |
| 39 | + name="root_agent", |
| 40 | + edges=[("START", orchestrate)], |
| 41 | +) |
| 42 | +``` |
| 43 | + |
| 44 | + |
| 45 | +## How it works |
| 46 | + |
| 47 | +When `await ctx.run_node(node_like, ...)` is called: |
| 48 | + |
| 49 | +1. **Orchestrator Registration**: The workflow's `DynamicNodeScheduler` registers the child node execution. |
| 50 | +2. **State Tracking**: The execution state and events of the child node are tracked under the parent node's path (e.g., `parent_node@1/child_node@1`). |
| 51 | +3. **Resumption Support**: If the child node interrupts (e.g., waiting for user input), the parent node is also paused. When the workflow resumes, the parent node is re-run from the beginning (`rerun_on_resume=True`), but previous successful `ctx.run_node()` calls are replayed from history (cached outputs are returned) to avoid re-executing completed steps. |
| 52 | + |
| 53 | +### Input Mapping |
| 54 | + |
| 55 | +The `node_input` passed to `ctx.run_node(node, node_input=value)` is delivered differently depending on the type of the child node: |
| 56 | + |
| 57 | +- **Python Functions / FunctionNodes**: The `value` is passed directly to the function parameter named `node_input`. Other parameters are bound from the session state (default mode). |
| 58 | +- **Agents (Single-Turn Mode)**: The `value` is converted to a user-role message (`types.Content`) and appended to the session events history. The agent receives it as the incoming user message. |
| 59 | +- **Agents (Task Mode)**: The `value` is set as `user_content` in the `InvocationContext`, serving as the fallback first user turn for the task agent if it wasn't triggered by a tool call. |
| 60 | + |
| 61 | +## Requirements & Rules |
| 62 | + |
| 63 | +### 1. `rerun_on_resume=True` is Mandatory for Parents |
| 64 | + |
| 65 | +Any node that calls `ctx.run_node()` **must** be configured with `rerun_on_resume=True`. |
| 66 | +If the parent node does not have this setting, calling `ctx.run_node()` will raise a `ValueError` at runtime. |
| 67 | + |
| 68 | +### 2. Function Parameter Mapping (`node_input` vs. Dict Binding) |
| 69 | + |
| 70 | +By default, functions wrapped as nodes look up their arguments in the session state (state binding). However, the `node_input` argument passed to `ctx.run_node(..., node_input=value)` is passed directly to the node. |
| 71 | + |
| 72 | +How you receive this input depends on how you define your function: |
| 73 | + |
| 74 | +#### Pass-through `node_input` (Default) |
| 75 | +To receive the raw `value` directly, the function's parameter must be named exactly `node_input`. |
| 76 | + |
| 77 | +```python |
| 78 | +# Correct: receives the raw value passed to node_input |
| 79 | +def my_worker(node_input: str): |
| 80 | + return f"Done: {node_input}" |
| 81 | + |
| 82 | +# Incorrect: will fail because it tries to look up 'data' in session state |
| 83 | +def my_worker(data: str): |
| 84 | + return f"Done: {data}" |
| 85 | +``` |
| 86 | + |
| 87 | +#### Binding Dictionary Keys to Parameters (`parameter_binding='node_input'`) |
| 88 | +If you pass a dictionary to `node_input` (e.g., `node_input={'foo': 'bar'}`) and want to bind its keys to individual function parameters (e.g., `def my_worker(foo: str)`), you must configure the node with `parameter_binding='node_input'`. |
| 89 | + |
| 90 | +You can configure this using the `@node` decorator with `parameter_binding='node_input'`: |
| 91 | + |
| 92 | +```python |
| 93 | +from google.adk.workflow import node |
| 94 | + |
| 95 | +# Decorate with parameter_binding='node_input' |
| 96 | +@node(parameter_binding='node_input') |
| 97 | +def my_worker(foo: str): |
| 98 | + return f"Done: {foo}" |
| 99 | + |
| 100 | +# Call via ctx.run_node |
| 101 | +result = await ctx.run_node(my_worker, node_input={'foo': 'bar'}) # foo gets 'bar' |
| 102 | +``` |
| 103 | + |
| 104 | + |
| 105 | +### 3. Nested Dynamic Nodes |
| 106 | + |
| 107 | +If a dynamically scheduled node *itself* calls `ctx.run_node()`, it becomes a parent and must also have `rerun_on_resume=True`. |
| 108 | +You should decorate the nested function with `@node(rerun_on_resume=True)` to ensure it has this property when executed: |
| 109 | + |
| 110 | +```python |
| 111 | +from google.adk.workflow import node |
| 112 | + |
| 113 | +@node(rerun_on_resume=True) |
| 114 | +async def inner_parent(ctx: Context): |
| 115 | + # Calls another dynamic node internally |
| 116 | + result = await ctx.run_node(some_child) |
| 117 | + yield Event(output=result) |
| 118 | + |
| 119 | +# In the outer parent: |
| 120 | +await ctx.run_node(inner_parent) |
| 121 | +``` |
| 122 | + |
| 123 | + |
| 124 | +### 4. Generator Returns |
| 125 | + |
| 126 | +In nodes that use `yield` (generators), you cannot use `return value` to produce the final output of the node due to Python syntax constraints. You must yield `Event(output=value)` instead. |
| 127 | + |
| 128 | +## Method Signature |
| 129 | + |
| 130 | +```python |
| 131 | +async def run_node( |
| 132 | + self, |
| 133 | + node: NodeLike, |
| 134 | + node_input: Any = None, |
| 135 | + *, |
| 136 | + use_as_output: bool = False, |
| 137 | + run_id: str | None = None, |
| 138 | + use_sub_branch: bool = False, |
| 139 | + override_branch: str | None = None, |
| 140 | +) -> Any: |
| 141 | +``` |
| 142 | + |
| 143 | +### Parameters |
| 144 | + |
| 145 | +| Parameter | Type | Default | Description | |
| 146 | +| :--- | :--- | :--- | :--- | |
| 147 | +| `node` | `NodeLike` | *Required* | The node to execute (Function, Agent, or Workflow). | |
| 148 | +| `node_input` | `Any` | `None` | Input data to pass to the dynamic node. | |
| 149 | +| `use_as_output` | `bool` | `False` | If `True`, the child node's output is used as the calling parent node's output. The parent's own output event is suppressed. Can only be set once per parent execution. | |
| 150 | +| `run_id` | `str \| None` | `None` | Optional custom run ID. If provided, **must contain non-numeric characters** (e.g., `"run_a"`) to prevent collision with auto-generated IDs. | |
| 151 | +| `use_sub_branch` | `bool` | `False` | If `True`, executes the node in a sub-branch (appending `node_name@run_id` to the branch path). Essential for parallel runs to isolate events. | |
| 152 | +| `override_branch` | `str \| None` | `None` | Explicitly overrides the branch name for the execution context. | |
| 153 | + |
| 154 | +## Advanced Applications |
| 155 | + |
| 156 | +### Dynamic Fan-Out (Parallel Execution) |
| 157 | + |
| 158 | +You can perform dynamic fan-out by scheduling multiple tasks in parallel using `asyncio.gather`. When doing this, you **must** set `use_sub_branch=True` to isolate the events of each parallel execution. |
| 159 | + |
| 160 | +```python |
| 161 | +import asyncio |
| 162 | +from google.adk import Context, Event, Agent |
| 163 | +from google.adk.workflow import node |
| 164 | + |
| 165 | +worker = Agent(name="worker", instruction="Process {node_input}") |
| 166 | + |
| 167 | +@node(rerun_on_resume=True) |
| 168 | +async def parallel_orchestrator(ctx: Context, node_input: list[str]): |
| 169 | + tasks = [] |
| 170 | + for topic in node_input: |
| 171 | + tasks.append( |
| 172 | + ctx.run_node( |
| 173 | + worker, |
| 174 | + node_input=topic, |
| 175 | + use_sub_branch=True, # Critical for parallel isolation |
| 176 | + ) |
| 177 | + ) |
| 178 | + |
| 179 | + # Await all tasks concurrently |
| 180 | + results = await asyncio.gather(*tasks) |
| 181 | + yield Event(output=results) |
| 182 | +``` |
| 183 | + |
| 184 | +## Best Practices |
| 185 | + |
| 186 | +- **Avoid Unsupervised Tasks**: Always `await` `ctx.run_node()` directly (or via `asyncio.gather`). Do **not** wrap it in `asyncio.create_task()` without awaiting it, as errors will be swallowed, and tasks won't be cancelled if the workflow is interrupted. |
| 187 | +- **Manage Side Effects and Resumption**: Because a parent node with `rerun_on_resume=True` is executed from the beginning on resumption, any code with side effects (e.g., database writes, API calls) in the parent node will run again. |
| 188 | + - *Best Practice*: Keep the parent orchestrator node's logic as light as possible, containing mostly control flow and `ctx.run_node` calls. |
| 189 | + - *Best Practice*: Move any logic with side effects into dedicated child nodes and execute them via `ctx.run_node`. Since completed child nodes are cached and replayed, their side effects will *not* be executed again on resumption. |
| 190 | + |
| 191 | + |
| 192 | +## Limitations |
| 193 | + |
| 194 | +- **Replay Overhead**: Because the parent node is re-run from the beginning on resume, long-running parent node logic (outside of `ctx.run_node` calls) will be re-executed. Keep the orchestrator node logic light and delegate heavy lifting to child nodes. |
| 195 | + |
| 196 | +## Related samples |
| 197 | + |
| 198 | +- [Dynamic Nodes Sample](../../../../contributing/samples/workflows/dynamic_nodes/) |
| 199 | +- [Dynamic Fan-Out / Fan-In Sample](../../../../contributing/samples/workflows/dynamic_fan_out_fan_in/) |
0 commit comments