|
| 1 | +--- |
| 2 | +name: airflow-hitl |
| 3 | +description: Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai). |
| 4 | +--- |
| 5 | + |
| 6 | +# Airflow Human-in-the-Loop Operators |
| 7 | + |
| 8 | +Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API. |
| 9 | + |
| 10 | +## Implementation Checklist |
| 11 | + |
| 12 | +Execute steps in order. Prefer deferrable HITL operators over custom sensors/polling loops. |
| 13 | + |
| 14 | +> **CRITICAL**: Requires Airflow 3.1+. NOT available in Airflow 2.x. |
| 15 | +> |
| 16 | +> **Deferrable**: All HITL operators are deferrable—they release their worker slot while waiting for human input. |
| 17 | +> |
| 18 | +> **UI Location**: View pending actions at **Browse → Required Actions** in Airflow UI. Respond via the **task instance page's Required Actions tab** or the REST API. |
| 19 | +> |
| 20 | +> **Cross-reference**: For AI/LLM calls, see the **airflow-ai** skill. |
| 21 | +
|
| 22 | +--- |
| 23 | + |
| 24 | +## Step 1: Choose operator |
| 25 | + |
| 26 | +| Operator | Human action | Outcome | |
| 27 | +|----------|--------------|---------| |
| 28 | +| `ApprovalOperator` | Approve or Reject | Reject causes downstream tasks to be skipped (approval task itself succeeds) | |
| 29 | +| `HITLOperator` | Select option(s) + form | Returns selections | |
| 30 | +| `HITLBranchOperator` | Select downstream task(s) | Runs selected, skips others | |
| 31 | +| `HITLEntryOperator` | Submit form | Returns form data | |
| 32 | + |
| 33 | +--- |
| 34 | + |
| 35 | +## Step 2: Implement operator |
| 36 | + |
| 37 | +### ApprovalOperator |
| 38 | + |
| 39 | +```python |
| 40 | +from airflow.providers.standard.operators.hitl import ApprovalOperator |
| 41 | +from airflow.sdk import dag, task, chain, Param |
| 42 | +from pendulum import datetime |
| 43 | + |
| 44 | +@dag(start_date=datetime(2025, 1, 1), schedule="@daily") |
| 45 | +def approval_example(): |
| 46 | + @task |
| 47 | + def prepare(): |
| 48 | + return "Review quarterly report" |
| 49 | + |
| 50 | + approval = ApprovalOperator( |
| 51 | + task_id="approve_report", |
| 52 | + subject="Report Approval", |
| 53 | + body="{{ ti.xcom_pull(task_ids='prepare') }}", |
| 54 | + defaults="Approve", # Optional: auto on timeout |
| 55 | + params={"comments": Param("", type="string")}, |
| 56 | + ) |
| 57 | + |
| 58 | + @task |
| 59 | + def after_approval(result): |
| 60 | + print(f"Decision: {result['chosen_options']}") |
| 61 | + |
| 62 | + chain(prepare(), approval) |
| 63 | + after_approval(approval.output) |
| 64 | + |
| 65 | +approval_example() |
| 66 | +``` |
| 67 | + |
| 68 | +### HITLOperator |
| 69 | + |
| 70 | +> **Required parameters**: `subject` and `options`. |
| 71 | +
|
| 72 | +```python |
| 73 | +from airflow.providers.standard.operators.hitl import HITLOperator |
| 74 | +from airflow.sdk import dag, task, chain, Param |
| 75 | +from datetime import timedelta |
| 76 | +from pendulum import datetime |
| 77 | + |
| 78 | +@dag(start_date=datetime(2025, 1, 1), schedule="@daily") |
| 79 | +def hitl_example(): |
| 80 | + hitl = HITLOperator( |
| 81 | + task_id="select_option", |
| 82 | + subject="Select Payment Method", |
| 83 | + body="Choose how to process payment", |
| 84 | + options=["ACH", "Wire", "Check"], # REQUIRED |
| 85 | + defaults=["ACH"], |
| 86 | + multiple=False, |
| 87 | + execution_timeout=timedelta(hours=4), |
| 88 | + params={"amount": Param(1000, type="number")}, |
| 89 | + ) |
| 90 | + |
| 91 | + @task |
| 92 | + def process(result): |
| 93 | + print(f"Selected: {result['chosen_options']}") |
| 94 | + print(f"Amount: {result['params_input']['amount']}") |
| 95 | + |
| 96 | + process(hitl.output) |
| 97 | + |
| 98 | +hitl_example() |
| 99 | +``` |
| 100 | + |
| 101 | +### HITLBranchOperator |
| 102 | + |
| 103 | +> **IMPORTANT**: Options can either: |
| 104 | +> 1. **Directly match downstream task IDs** - simpler approach |
| 105 | +> 2. **Use `options_mapping`** - for human-friendly labels that map to task IDs |
| 106 | +
|
| 107 | +```python |
| 108 | +from airflow.providers.standard.operators.hitl import HITLBranchOperator |
| 109 | +from airflow.sdk import dag, task, chain |
| 110 | +from pendulum import datetime |
| 111 | + |
| 112 | +DEPTS = ["marketing", "engineering", "sales"] |
| 113 | + |
| 114 | +@dag(start_date=datetime(2025, 1, 1), schedule="@daily") |
| 115 | +def branch_example(): |
| 116 | + branch = HITLBranchOperator( |
| 117 | + task_id="select_dept", |
| 118 | + subject="Select Departments", |
| 119 | + options=[f"Fund {d}" for d in DEPTS], |
| 120 | + options_mapping={f"Fund {d}": d for d in DEPTS}, |
| 121 | + multiple=True, |
| 122 | + ) |
| 123 | + |
| 124 | + for dept in DEPTS: |
| 125 | + @task(task_id=dept) |
| 126 | + def handle(dept_name: str = dept): |
| 127 | + # Bind the loop variable at definition time to avoid late-binding bugs |
| 128 | + print(f"Processing {dept_name}") |
| 129 | + chain(branch, handle()) |
| 130 | + |
| 131 | +branch_example() |
| 132 | +``` |
| 133 | + |
| 134 | +### HITLEntryOperator |
| 135 | + |
| 136 | +```python |
| 137 | +from airflow.providers.standard.operators.hitl import HITLEntryOperator |
| 138 | +from airflow.sdk import dag, task, chain, Param |
| 139 | +from pendulum import datetime |
| 140 | + |
| 141 | +@dag(start_date=datetime(2025, 1, 1), schedule="@daily") |
| 142 | +def entry_example(): |
| 143 | + entry = HITLEntryOperator( |
| 144 | + task_id="get_input", |
| 145 | + subject="Enter Details", |
| 146 | + body="Provide response", |
| 147 | + params={ |
| 148 | + "response": Param("", type="string"), |
| 149 | + "priority": Param("p3", type="string"), |
| 150 | + }, |
| 151 | + ) |
| 152 | + |
| 153 | + @task |
| 154 | + def process(result): |
| 155 | + print(f"Response: {result['params_input']['response']}") |
| 156 | + |
| 157 | + process(entry.output) |
| 158 | + |
| 159 | +entry_example() |
| 160 | +``` |
| 161 | + |
| 162 | +--- |
| 163 | + |
| 164 | +## Step 3: Optional features |
| 165 | + |
| 166 | +### Notifiers |
| 167 | + |
| 168 | +```python |
| 169 | +from airflow.sdk import BaseNotifier, Context |
| 170 | +from airflow.providers.standard.operators.hitl import HITLOperator |
| 171 | + |
| 172 | +class MyNotifier(BaseNotifier): |
| 173 | + template_fields = ("message",) |
| 174 | + def __init__(self, message=""): self.message = message |
| 175 | + def notify(self, context: Context): |
| 176 | + if context["ti"].state == "running": |
| 177 | + url = HITLOperator.generate_link_to_ui_from_context(context, base_url="https://airflow.example.com") |
| 178 | + self.log.info(f"Action needed: {url}") |
| 179 | + |
| 180 | +hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")]) |
| 181 | +``` |
| 182 | + |
| 183 | +### Restrict respondents |
| 184 | + |
| 185 | +Format depends on your auth manager: |
| 186 | + |
| 187 | +| Auth Manager | Format | Example | |
| 188 | +|--------------|--------|--------| |
| 189 | +| SimpleAuthManager | Username | `["admin", "manager"]` | |
| 190 | +| FabAuthManager | Email | `["manager@example.com"]` | |
| 191 | +| Astro | Astro ID | `["cl1a2b3cd456789ef1gh2ijkl3"]` | |
| 192 | + |
| 193 | +> **Astro Users**: Find Astro ID at **Organization → Access Management**. |
| 194 | +
|
| 195 | +```python |
| 196 | +hitl = HITLOperator(..., respondents=["manager@example.com"]) # FabAuthManager |
| 197 | +``` |
| 198 | + |
| 199 | +### Timeout behavior |
| 200 | + |
| 201 | +- **With `defaults`**: Task succeeds, default option(s) selected |
| 202 | +- **Without `defaults`**: Task fails on timeout |
| 203 | + |
| 204 | +```python |
| 205 | +hitl = HITLOperator( |
| 206 | + ..., |
| 207 | + options=["Option A", "Option B"], |
| 208 | + defaults=["Option A"], # Auto-selected on timeout |
| 209 | + execution_timeout=timedelta(hours=4), |
| 210 | +) |
| 211 | +``` |
| 212 | + |
| 213 | +### Markdown in body |
| 214 | + |
| 215 | +The `body` parameter supports **markdown formatting** and is **Jinja templatable**: |
| 216 | + |
| 217 | +```python |
| 218 | +hitl = HITLOperator( |
| 219 | + ..., |
| 220 | + body="""**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }} |
| 221 | +
|
| 222 | +| Category | Amount | |
| 223 | +|----------|--------| |
| 224 | +| Marketing | $1M | |
| 225 | +""", |
| 226 | +) |
| 227 | +``` |
| 228 | + |
| 229 | +### Callbacks |
| 230 | + |
| 231 | +All HITL operators support standard Airflow callbacks: |
| 232 | + |
| 233 | +```python |
| 234 | +def on_hitl_failure(context): |
| 235 | + print(f"HITL task failed: {context['task_instance'].task_id}") |
| 236 | + |
| 237 | +def on_hitl_success(context): |
| 238 | + print(f"HITL task succeeded with: {context['task_instance'].xcom_pull()}") |
| 239 | + |
| 240 | +hitl = HITLOperator( |
| 241 | + task_id="approval_required", |
| 242 | + subject="Review needed", |
| 243 | + options=["Approve", "Reject"], |
| 244 | + on_failure_callback=on_hitl_failure, |
| 245 | + on_success_callback=on_hitl_success, |
| 246 | +) |
| 247 | +``` |
| 248 | + |
| 249 | +--- |
| 250 | + |
| 251 | +## Step 4: API integration |
| 252 | + |
| 253 | +For external responders (Slack, custom app): |
| 254 | + |
| 255 | +```python |
| 256 | +import requests, os |
| 257 | + |
| 258 | +HOST = os.getenv("AIRFLOW_HOST") |
| 259 | +TOKEN = os.getenv("AIRFLOW_API_TOKEN") |
| 260 | + |
| 261 | +# Get pending actions |
| 262 | +r = requests.get(f"{HOST}/api/v2/hitlDetails/?state=pending", |
| 263 | + headers={"Authorization": f"Bearer {TOKEN}"}) |
| 264 | + |
| 265 | +# Respond |
| 266 | +requests.patch( |
| 267 | + f"{HOST}/api/v2/hitlDetails/{dag_id}/{run_id}/{task_id}", |
| 268 | + headers={"Authorization": f"Bearer {TOKEN}"}, |
| 269 | + json={"chosen_options": ["ACH"], "params_input": {"amount": 1500}} |
| 270 | +) |
| 271 | +``` |
| 272 | + |
| 273 | +--- |
| 274 | + |
| 275 | +## Step 5: Safety checks |
| 276 | + |
| 277 | +Before finalizing, verify: |
| 278 | + |
| 279 | +- [ ] Airflow 3.1+ installed |
| 280 | +- [ ] For `HITLBranchOperator`: options map to downstream task IDs |
| 281 | +- [ ] `defaults` values are in `options` list |
| 282 | +- [ ] API token configured if using external responders |
| 283 | + |
| 284 | +--- |
| 285 | + |
| 286 | +## Reference |
| 287 | + |
| 288 | +- Airflow HITL Operators: https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/hitl.html |
| 289 | + |
| 290 | +--- |
| 291 | + |
| 292 | +## Related Skills |
| 293 | + |
| 294 | +- **airflow-ai**: For AI/LLM task decorators and GenAI patterns |
| 295 | +- **authoring-dags**: For general DAG writing best practices |
| 296 | +- **testing-dags**: For testing DAGs with debugging cycles |
0 commit comments