|
| 1 | +# Trigger.dev Events (v4) |
| 2 | + |
| 3 | +**Pub/sub event system for fan-out, event-driven workflows, and task coordination** |
| 4 | + |
| 5 | +## Defining Events |
| 6 | + |
| 7 | +```ts |
| 8 | +import { event } from "@trigger.dev/sdk"; |
| 9 | +import { z } from "zod"; |
| 10 | + |
| 11 | +// Event with typed schema |
| 12 | +export const orderCreated = event({ |
| 13 | + id: "order.created", |
| 14 | + schema: z.object({ |
| 15 | + orderId: z.string(), |
| 16 | + amount: z.number(), |
| 17 | + customerId: z.string(), |
| 18 | + }), |
| 19 | +}); |
| 20 | + |
| 21 | +// Event without schema (payload is `unknown`) |
| 22 | +export const systemAlert = event({ |
| 23 | + id: "system.alert", |
| 24 | + description: "Generic system alert", |
| 25 | +}); |
| 26 | + |
| 27 | +// Event with rate limiting |
| 28 | +export const userActivity = event({ |
| 29 | + id: "user.activity", |
| 30 | + schema: z.object({ userId: z.string(), action: z.string() }), |
| 31 | + rateLimit: { |
| 32 | + limit: 500, |
| 33 | + window: "1m", // "10s", "1m", "1h" |
| 34 | + }, |
| 35 | +}); |
| 36 | +``` |
| 37 | + |
| 38 | +> Events MUST be exported from your task files. The schema supports Zod, Valibot, ArkType, and any schema library compatible with `@standard-schema`. |
| 39 | +
|
| 40 | +## Subscribing Tasks to Events |
| 41 | + |
| 42 | +```ts |
| 43 | +import { task } from "@trigger.dev/sdk"; |
| 44 | +import { orderCreated } from "./events"; |
| 45 | + |
| 46 | +// Subscribe a task to an event — payload is typed from schema |
| 47 | +export const sendOrderEmail = task({ |
| 48 | + id: "send-order-email", |
| 49 | + on: orderCreated, |
| 50 | + run: async (payload) => { |
| 51 | + // payload is typed: { orderId: string, amount: number, customerId: string } |
| 52 | + await sendEmail(payload.customerId, `Order ${payload.orderId} confirmed!`); |
| 53 | + }, |
| 54 | +}); |
| 55 | + |
| 56 | +// Multiple tasks can subscribe to the same event (fan-out) |
| 57 | +export const updateInventory = task({ |
| 58 | + id: "update-inventory", |
| 59 | + on: orderCreated, |
| 60 | + run: async (payload) => { |
| 61 | + await adjustStock(payload.orderId); |
| 62 | + }, |
| 63 | +}); |
| 64 | +``` |
| 65 | + |
| 66 | +## Publishing Events |
| 67 | + |
| 68 | +```ts |
| 69 | +import { orderCreated } from "./events"; |
| 70 | + |
| 71 | +// From inside a task |
| 72 | +export const checkoutTask = task({ |
| 73 | + id: "checkout", |
| 74 | + run: async (payload: { orderId: string; amount: number; customerId: string }) => { |
| 75 | + // Process checkout... |
| 76 | + |
| 77 | + // Publish event — triggers all subscribed tasks |
| 78 | + const result = await orderCreated.publish({ |
| 79 | + orderId: payload.orderId, |
| 80 | + amount: payload.amount, |
| 81 | + customerId: payload.customerId, |
| 82 | + }); |
| 83 | + |
| 84 | + console.log(`Published ${result.id}, triggered ${result.runs.length} tasks`); |
| 85 | + }, |
| 86 | +}); |
| 87 | +``` |
| 88 | + |
| 89 | +### Publish Options |
| 90 | + |
| 91 | +```ts |
| 92 | +await orderCreated.publish(payload, { |
| 93 | + idempotencyKey: `order-${orderId}`, // Prevent duplicate publishes |
| 94 | + delay: "30s", // Delay before triggering subscribers |
| 95 | + tags: ["priority", "vip"], // Tags on generated runs |
| 96 | + metadata: { source: "checkout" }, // Metadata on generated runs |
| 97 | + orderingKey: customerId, // Sequential processing per key |
| 98 | +}); |
| 99 | +``` |
| 100 | + |
| 101 | +### Batch Publish |
| 102 | + |
| 103 | +```ts |
| 104 | +const results = await orderCreated.batchPublish([ |
| 105 | + { payload: { orderId: "1", amount: 50, customerId: "a" } }, |
| 106 | + { payload: { orderId: "2", amount: 100, customerId: "b" }, options: { tags: ["bulk"] } }, |
| 107 | +]); |
| 108 | +``` |
| 109 | + |
| 110 | +## Content-based Filtering |
| 111 | + |
| 112 | +Subscribe only to events that match a filter: |
| 113 | + |
| 114 | +```ts |
| 115 | +export const highValueHandler = task({ |
| 116 | + id: "high-value-order", |
| 117 | + on: orderCreated, |
| 118 | + filter: { |
| 119 | + amount: [{ $gte: 1000 }], |
| 120 | + }, |
| 121 | + run: async (payload) => { |
| 122 | + // Only receives orders with amount >= 1000 |
| 123 | + await notifyVipTeam(payload); |
| 124 | + }, |
| 125 | +}); |
| 126 | +``` |
| 127 | + |
| 128 | +## Wildcard Pattern Subscriptions |
| 129 | + |
| 130 | +Subscribe to multiple event types using wildcard patterns: |
| 131 | + |
| 132 | +```ts |
| 133 | +import { events, task } from "@trigger.dev/sdk"; |
| 134 | + |
| 135 | +// * matches exactly one segment |
| 136 | +export const orderHandler = task({ |
| 137 | + id: "order-handler", |
| 138 | + on: events.match("order.*"), // matches order.created, order.updated, etc. |
| 139 | + run: async (payload) => { |
| 140 | + // payload is `unknown` for pattern subscriptions |
| 141 | + }, |
| 142 | +}); |
| 143 | + |
| 144 | +// # matches zero or more segments |
| 145 | +export const allHandler = task({ |
| 146 | + id: "audit-logger", |
| 147 | + on: events.match("order.#"), // matches order, order.created, order.status.changed |
| 148 | + run: async (payload) => { |
| 149 | + await logAuditEvent(payload); |
| 150 | + }, |
| 151 | +}); |
| 152 | +``` |
| 153 | + |
| 154 | +## Publish and Wait (Fan-out / Fan-in) |
| 155 | + |
| 156 | +Publish an event and wait for all subscriber tasks to complete: |
| 157 | + |
| 158 | +```ts |
| 159 | +export const orchestrator = task({ |
| 160 | + id: "orchestrator", |
| 161 | + run: async (payload) => { |
| 162 | + const result = await orderCreated.publishAndWait({ |
| 163 | + orderId: "123", |
| 164 | + amount: 500, |
| 165 | + customerId: "abc", |
| 166 | + }); |
| 167 | + |
| 168 | + // result.results is Record<taskSlug, TaskRunExecutionResult> |
| 169 | + for (const [taskSlug, runResult] of Object.entries(result.results)) { |
| 170 | + console.log(`${taskSlug}: ${runResult.ok ? "success" : "failed"}`); |
| 171 | + } |
| 172 | + }, |
| 173 | +}); |
| 174 | +``` |
| 175 | + |
| 176 | +> `publishAndWait` can only be called from inside a `task.run()`. It blocks until all subscribers finish. |
| 177 | +
|
| 178 | +## Ordering Keys |
| 179 | + |
| 180 | +Ensure events with the same key are processed sequentially per consumer: |
| 181 | + |
| 182 | +```ts |
| 183 | +await orderCreated.publish(payload, { |
| 184 | + orderingKey: payload.customerId, // All events for same customer processed in order |
| 185 | +}); |
| 186 | +``` |
| 187 | + |
| 188 | +## Consumer Groups |
| 189 | + |
| 190 | +Within a consumer group, only one task receives each event (load balancing): |
| 191 | + |
| 192 | +```ts |
| 193 | +export const workerA = task({ |
| 194 | + id: "order-processor-a", |
| 195 | + on: orderCreated, |
| 196 | + consumerGroup: "order-processors", |
| 197 | + run: async (payload) => { /* ... */ }, |
| 198 | +}); |
| 199 | + |
| 200 | +export const workerB = task({ |
| 201 | + id: "order-processor-b", |
| 202 | + on: orderCreated, |
| 203 | + consumerGroup: "order-processors", |
| 204 | + run: async (payload) => { /* ... */ }, |
| 205 | +}); |
| 206 | + |
| 207 | +// Each published event goes to either workerA OR workerB, not both |
| 208 | +``` |
| 209 | + |
| 210 | +## Validation |
| 211 | + |
| 212 | +Pre-validate a payload before publishing: |
| 213 | + |
| 214 | +```ts |
| 215 | +try { |
| 216 | + const validated = await orderCreated.validate({ orderId: "123", amount: -1 }); |
| 217 | +} catch (error) { |
| 218 | + console.error("Invalid payload:", error); |
| 219 | +} |
| 220 | +``` |
| 221 | + |
| 222 | +## Dead Letter Queue |
| 223 | + |
| 224 | +Events that fail after all retries are captured in a DLQ. The DLQ is managed via API: |
| 225 | + |
| 226 | +- `GET /api/v1/events/dlq` — list failed events |
| 227 | +- `POST /api/v1/events/dlq/:id/retry` — retry a failed event |
| 228 | +- `POST /api/v1/events/dlq/:id/discard` — discard a failed event |
| 229 | +- `POST /api/v1/events/dlq/retry-all` — retry all pending failures |
| 230 | + |
| 231 | +## Event History & Replay |
| 232 | + |
| 233 | +Published events are persisted and can be replayed: |
| 234 | + |
| 235 | +- `GET /api/v1/events/:eventId/history` — view event history |
| 236 | +- `POST /api/v1/events/:eventId/replay` — replay events in a date range |
| 237 | + |
| 238 | +## Best Practices |
| 239 | + |
| 240 | +- **Schema everything**: Define schemas for type safety and validation at publish time |
| 241 | +- **Idempotency keys**: Use for critical events to prevent duplicate processing |
| 242 | +- **Ordering keys**: Use when event order matters per entity (e.g., per customer) |
| 243 | +- **Consumer groups**: Use when you want load balancing instead of fan-out |
| 244 | +- **Filters**: Use to reduce unnecessary task invocations |
| 245 | +- **Rate limits**: Configure per-event to protect downstream systems |
| 246 | +- **publishAndWait**: Use for orchestration patterns (saga, scatter-gather) |
| 247 | +- **DLQ**: Monitor and retry failed events, don't let them accumulate |
0 commit comments