|
| 1 | +# DAG Pipes — Part 1: CLI Guide |
| 2 | + |
| 3 | +Build and run data pipelines using `stacker pipe` commands. No code, no curl — just the CLI. |
| 4 | + |
| 5 | +> **Other guides:** |
| 6 | +> [Part 2: Visual Editor (Web UI)](./DAG_PIPES_PART2_WEB_EDITOR.md) · |
| 7 | +> [Part 3: REST API Deep Dive](./DAG_PIPES_PART3_API_DEEP_DIVE.md) |
| 8 | +
|
| 9 | +--- |
| 10 | + |
| 11 | +## What is a Pipe? |
| 12 | + |
| 13 | +A pipe connects services in your deployment. Data flows from a **source** (where data comes from) through optional **transforms** and **conditions**, to one or more **targets** (where data goes). |
| 14 | + |
| 15 | +``` |
| 16 | +[Source] → [Transform] → [Target] |
| 17 | +``` |
| 18 | + |
| 19 | +That's it. Stacker handles the wiring, execution, retries, and history. |
| 20 | + |
| 21 | +--- |
| 22 | + |
| 23 | +## Getting Started |
| 24 | + |
| 25 | +```bash |
| 26 | +# 1. Login |
| 27 | +stacker login |
| 28 | + |
| 29 | +# 2. Make sure you have a deployment running |
| 30 | +stacker status |
| 31 | +``` |
| 32 | + |
| 33 | +--- |
| 34 | + |
| 35 | +## Example 1: Contact Form → Telegram + Slack |
| 36 | + |
| 37 | +**Goal**: When someone submits a contact form, notify your team on both Telegram and Slack. |
| 38 | + |
| 39 | +### Step 1 — Scan your services |
| 40 | + |
| 41 | +```bash |
| 42 | +# See what APIs your website exposes |
| 43 | +stacker pipe scan website |
| 44 | + |
| 45 | +# See what APIs are available with sample data |
| 46 | +stacker pipe scan website --capture-samples |
| 47 | +``` |
| 48 | + |
| 49 | +Output: |
| 50 | +``` |
| 51 | +App: website |
| 52 | +Protocols detected: rest |
| 53 | +
|
| 54 | +[rest] http://website:3000 |
| 55 | + POST /api/contact -- Submit contact form |
| 56 | + fields: [name, email, message] |
| 57 | + sample: {"name":"Alice","email":"alice@example.com","message":"Hello"} |
| 58 | +``` |
| 59 | + |
| 60 | +### Step 2 — Create the pipe |
| 61 | + |
| 62 | +```bash |
| 63 | +# Interactive wizard walks you through it |
| 64 | +stacker pipe create website telegram |
| 65 | +``` |
| 66 | + |
| 67 | +The wizard will: |
| 68 | +1. Scan both apps for endpoints |
| 69 | +2. Let you pick source endpoint (POST /api/contact) |
| 70 | +3. Let you pick target endpoint (sendMessage) |
| 71 | +4. Auto-match fields (`name` → text, `email` → text) |
| 72 | +5. Ask for a pipe name |
| 73 | + |
| 74 | +Repeat for Slack: |
| 75 | +```bash |
| 76 | +stacker pipe create website slack |
| 77 | +``` |
| 78 | + |
| 79 | +### Step 3 — Activate |
| 80 | + |
| 81 | +```bash |
| 82 | +# List your pipes to get the IDs |
| 83 | +stacker pipe list |
| 84 | + |
| 85 | +# Activate both — webhook mode triggers on each form submission |
| 86 | +stacker pipe activate <telegram-pipe-id> |
| 87 | +stacker pipe activate <slack-pipe-id> |
| 88 | +``` |
| 89 | + |
| 90 | +### Step 4 — Test it |
| 91 | + |
| 92 | +```bash |
| 93 | +# Manual trigger with test data |
| 94 | +stacker pipe trigger <telegram-pipe-id> \ |
| 95 | + --data '{"name":"Alice","email":"alice@example.com","message":"Hello!"}' |
| 96 | +``` |
| 97 | + |
| 98 | +### Step 5 — Check history |
| 99 | + |
| 100 | +```bash |
| 101 | +stacker pipe history <telegram-pipe-id> |
| 102 | +``` |
| 103 | + |
| 104 | +``` |
| 105 | +EXECUTION ID TRIGGER STATUS DURATION STARTED |
| 106 | +───────────────────────────────────────────────────────────────── |
| 107 | +a1b2c3d4-e5f6-... manual success 342ms 2026-04-16T13:00:00Z |
| 108 | +
|
| 109 | +1 execution(s) shown. |
| 110 | +``` |
| 111 | + |
| 112 | +--- |
| 113 | + |
| 114 | +## Example 2: Contact Form → PostgreSQL CDC → Telegram |
| 115 | + |
| 116 | +**Goal**: Your website saves contact forms to PostgreSQL normally. The pipe watches for new rows and sends a Telegram notification — no changes to your website code needed. |
| 117 | + |
| 118 | +``` |
| 119 | +Website → writes to PostgreSQL (as usual) |
| 120 | + ↓ CDC detects new row |
| 121 | + [cdc_source] → [transform] → [target: telegram] |
| 122 | +``` |
| 123 | + |
| 124 | +### Step 1 — Scan PostgreSQL for CDC |
| 125 | + |
| 126 | +```bash |
| 127 | +stacker pipe scan postgresql --protocols cdc |
| 128 | +``` |
| 129 | + |
| 130 | +Output: |
| 131 | +``` |
| 132 | +App: postgresql |
| 133 | +Protocols detected: cdc |
| 134 | +
|
| 135 | +[cdc] postgresql://postgres:5432 |
| 136 | + TABLE public.contacts -- Contact form submissions |
| 137 | + fields: [id, name, email, message, created_at] |
| 138 | + TABLE public.users -- User accounts |
| 139 | + fields: [id, email, password_hash, created_at] |
| 140 | +``` |
| 141 | + |
| 142 | +### Step 2 — Create the pipe |
| 143 | + |
| 144 | +```bash |
| 145 | +stacker pipe create postgresql telegram |
| 146 | +# Select: TABLE public.contacts → POST sendMessage |
| 147 | +# The wizard maps: name, email, message → text field |
| 148 | +``` |
| 149 | + |
| 150 | +### Step 3 — Activate with webhook trigger |
| 151 | + |
| 152 | +```bash |
| 153 | +stacker pipe activate <pipe-id> --trigger webhook |
| 154 | +``` |
| 155 | + |
| 156 | +Now every INSERT into the `contacts` table automatically sends a Telegram message. No polling, no cron jobs. |
| 157 | + |
| 158 | +### Step 4 — Verify |
| 159 | + |
| 160 | +```bash |
| 161 | +# Insert a test row into PostgreSQL (from your app or directly) |
| 162 | +# Then check pipe history: |
| 163 | +stacker pipe history <pipe-id> |
| 164 | +``` |
| 165 | + |
| 166 | +--- |
| 167 | + |
| 168 | +## Example 3: Contact Form → Email + Slack |
| 169 | + |
| 170 | +**Goal**: Send a confirmation email to the user AND post to your team's Slack channel. |
| 171 | + |
| 172 | +### Step 1 — Create both pipes |
| 173 | + |
| 174 | +```bash |
| 175 | +# Pipe 1: website → email service |
| 176 | +stacker pipe create website email-service |
| 177 | + |
| 178 | +# Pipe 2: website → slack |
| 179 | +stacker pipe create website slack |
| 180 | +``` |
| 181 | + |
| 182 | +### Step 2 — Activate both |
| 183 | + |
| 184 | +```bash |
| 185 | +stacker pipe activate <email-pipe-id> --trigger webhook |
| 186 | +stacker pipe activate <slack-pipe-id> --trigger webhook |
| 187 | +``` |
| 188 | + |
| 189 | +### Step 3 — Test |
| 190 | + |
| 191 | +```bash |
| 192 | +# Trigger both with the same data |
| 193 | +stacker pipe trigger <email-pipe-id> \ |
| 194 | + --data '{"name":"Carol","email":"carol@example.com","message":"Demo please"}' |
| 195 | + |
| 196 | +stacker pipe trigger <slack-pipe-id> \ |
| 197 | + --data '{"name":"Carol","email":"carol@example.com","message":"Demo please"}' |
| 198 | +``` |
| 199 | + |
| 200 | +--- |
| 201 | + |
| 202 | +## Command Reference |
| 203 | + |
| 204 | +| Command | What it does | |
| 205 | +|---------|-------------| |
| 206 | +| `stacker pipe scan <app>` | Discover what APIs an app exposes | |
| 207 | +| `stacker pipe create <source> <target>` | Create a pipe (interactive wizard) | |
| 208 | +| `stacker pipe list` | Show all pipes for your deployment | |
| 209 | +| `stacker pipe activate <id>` | Start the pipe (begin listening) | |
| 210 | +| `stacker pipe deactivate <id>` | Stop the pipe | |
| 211 | +| `stacker pipe trigger <id>` | Run the pipe once manually | |
| 212 | +| `stacker pipe history <id>` | View past executions | |
| 213 | +| `stacker pipe replay <exec-id>` | Re-run a past execution | |
| 214 | + |
| 215 | +### Useful flags |
| 216 | + |
| 217 | +| Flag | Used with | What it does | |
| 218 | +|------|-----------|-------------| |
| 219 | +| `--json` | Any command | Output as JSON (for scripting) | |
| 220 | +| `--trigger webhook` | `activate` | Listen for events in real-time (default) | |
| 221 | +| `--trigger poll` | `activate` | Check for changes periodically | |
| 222 | +| `--poll-interval 60` | `activate` | Poll every N seconds | |
| 223 | +| `--trigger manual` | `activate` | Only run when you call `trigger` | |
| 224 | +| `--data '{...}'` | `trigger` | Pass custom input data | |
| 225 | +| `--capture-samples` | `scan` | Show real response examples | |
| 226 | +| `--ai` | `create` | Use AI for smart field matching | |
| 227 | +| `--no-ai` | `create` | Use deterministic matching only | |
| 228 | +| `--manual` | `create` | Skip auto-matching entirely | |
| 229 | +| `--limit 50` | `history` | Show more results | |
| 230 | + |
| 231 | +--- |
| 232 | + |
| 233 | +## Trigger Types Explained |
| 234 | + |
| 235 | +| Type | How it works | Best for | |
| 236 | +|------|-------------|----------| |
| 237 | +| **webhook** | Fires instantly when data arrives | Real-time notifications | |
| 238 | +| **poll** | Checks for new data every N seconds | Periodic syncs, batch jobs | |
| 239 | +| **manual** | Only runs when you say `pipe trigger` | Testing, one-off transfers | |
| 240 | + |
| 241 | +--- |
| 242 | + |
| 243 | +## Debugging |
| 244 | + |
| 245 | +```bash |
| 246 | +# See what went wrong |
| 247 | +stacker pipe history <id> --json | jq '.[0]' |
| 248 | + |
| 249 | +# Replay a failed execution (retries with same input) |
| 250 | +stacker pipe replay <execution-id> |
| 251 | + |
| 252 | +# Trigger with custom test data |
| 253 | +stacker pipe trigger <id> --data '{"name":"test","email":"test@test.com","message":"debug"}' |
| 254 | +``` |
| 255 | + |
| 256 | +--- |
| 257 | + |
| 258 | +## What's Next? |
| 259 | + |
| 260 | +- **[Part 2: Visual Editor](./DAG_PIPES_PART2_WEB_EDITOR.md)** — Build pipes with drag-and-drop in your browser |
| 261 | +- **[Part 3: REST API Deep Dive](./DAG_PIPES_PART3_API_DEEP_DIVE.md)** — Full API reference, curl scripts, gRPC streaming, advanced DAG features |
0 commit comments