@AGENTS.md
MUST use @trigger.dev/sdk, NEVER client.defineJob
import { task } from "@trigger.dev/sdk";
export const processData = task({
id: "process-data",
retry: {
maxAttempts: 10,
factor: 1.8,
minTimeoutInMs: 500,
maxTimeoutInMs: 30_000,
randomize: false,
},
run: async (payload: { userId: string; data: any[] }) => {
// Task logic - runs for long time, no timeouts
console.log(`Processing ${payload.data.length} items for user ${payload.userId}`);
return { processed: payload.data.length };
},
});

import { schemaTask } from "@trigger.dev/sdk";
import { z } from "zod";
export const validatedTask = schemaTask({
id: "validated-task",
schema: z.object({
name: z.string(),
age: z.number(),
email: z.string().email(),
}),
run: async (payload) => {
// Payload is automatically validated and typed
return { message: `Hello ${payload.name}, age ${payload.age}` };
},
});

import { tasks } from "@trigger.dev/sdk";
import type { processData } from "./trigger/tasks";
// Single trigger
const handle = await tasks.trigger<typeof processData>("process-data", {
userId: "123",
data: [{ id: 1 }, { id: 2 }],
});
// Batch trigger (up to 1,000 items, 3MB per payload)
const batchHandle = await tasks.batchTrigger<typeof processData>("process-data", [
{ payload: { userId: "123", data: [{ id: 1 }] } },
{ payload: { userId: "456", data: [{ id: 2 }] } },
]);

Consolidate multiple triggers into a single execution:
// Multiple rapid triggers with same key = single execution
await myTask.trigger(
{ userId: "123" },
{
debounce: {
key: "user-123-update", // Unique key for debounce group
delay: "5s", // Wait before executing
},
}
);
// Trailing mode: use payload from LAST trigger
await myTask.trigger(
{ data: "latest-value" },
{
debounce: {
key: "trailing-example",
delay: "10s",
mode: "trailing", // Default is "leading" (first payload)
},
}
);

Debounce modes:
- leading (default): Uses payload from the first trigger; subsequent triggers only reschedule.
- trailing: Uses payload from the most recent trigger.
// Example: parent/child orchestration — fire-and-forget trigger, blocking
// triggerAndWait (Result object), unwrap shortcut, and batch trigger-and-wait.
export const parentTask = task({
id: "parent-task",
run: async (payload) => {
// Trigger and continue
const handle = await childTask.trigger({ data: "value" });
// Trigger and wait - returns Result object, NOT task output
const result = await childTask.triggerAndWait({ data: "value" });
// Always check result.ok before reading result.output or result.error.
if (result.ok) {
console.log("Task output:", result.output); // Actual task return value
} else {
console.error("Task failed:", result.error);
}
// Quick unwrap (throws on error)
const output = await childTask.triggerAndWait({ data: "value" }).unwrap();
// Batch trigger and wait
// NOTE(review): some SDK versions return { runs: [...] } from
// batchTriggerAndWait rather than an iterable — confirm against the
// installed @trigger.dev/sdk version before copying this loop.
const results = await childTask.batchTriggerAndWait([
{ payload: { data: "item1" } },
{ payload: { data: "item2" } },
]);
// Each batch entry carries its own ok/output/error result.
for (const run of results) {
if (run.ok) {
console.log("Success:", run.output);
} else {
console.log("Failed:", run.error);
}
}
},
});
export const childTask = task({
id: "child-task",
run: async (payload: { data: string }) => {
return { processed: payload.data };
},
});

Never wrap triggerAndWait or batchTriggerAndWait calls in a Promise.all or Promise.allSettled as this is not supported in Trigger.dev tasks.
import { task, wait } from "@trigger.dev/sdk";
export const taskWithWaits = task({
id: "task-with-waits",
run: async (payload) => {
console.log("Starting task");
// Wait for specific duration
await wait.for({ seconds: 30 });
await wait.for({ minutes: 5 });
await wait.for({ hours: 1 });
await wait.for({ days: 1 });
// Wait until specific date
await wait.until({ date: new Date("2024-12-25") });
// Wait for token (from external system)
await wait.forToken({
token: "user-approval-token",
timeoutInSeconds: 3600, // 1 hour timeout
});
console.log("All waits completed");
return { status: "completed" };
},
});

Never wrap wait calls in a Promise.all or Promise.allSettled as this is not supported in Trigger.dev tasks.
- Result vs Output: triggerAndWait() returns a Result object with ok, output, and error properties - NOT the direct task output
- Type safety: Use import type for task references when triggering from backend
- Waits > 5 seconds: Automatically checkpointed, don't count toward compute usage
- Debounce + idempotency: Idempotency keys take precedence over debounce settings
// BREAKS APPLICATION
client.defineJob({
id: "job-id",
run: async (payload, io) => {
/* ... */
},
});Use SDK (@trigger.dev/sdk), check result.ok before accessing result.output
Advanced patterns and features for writing tasks
import { task, tags } from "@trigger.dev/sdk";
// Example: attach searchable tags to the *current* run while it executes.
// Tags added here are in addition to any passed at trigger time.
export const processUser = task({
id: "process-user",
run: async (payload: { userId: string; orgId: string }, { ctx }) => {
// Add tags during execution
await tags.add(`user_${payload.userId}`);
await tags.add(`org_${payload.orgId}`);
return { processed: true };
},
});
// Trigger with tags
await processUser.trigger(
{ userId: "123", orgId: "abc" },
{ tags: ["priority", "user_123", "org_abc"] } // Max 10 tags per run
);
// Subscribe to tagged runs
for await (const run of runs.subscribeToRunsWithTag("user_123")) {
console.log(`User task ${run.id}: ${run.status}`);
}

Tag Best Practices:
- Use prefixes: user_123, org_abc, video:456
- Max 10 tags per run, 1-64 characters each
- Tags don't propagate to child tasks automatically
Enhanced batch triggering with larger payloads and streaming ingestion.
- Maximum batch size: 1,000 items (increased from 500)
- Payload per item: 3MB each (increased from 1MB combined)
- Payloads > 512KB automatically offload to object storage
| Tier | Bucket Size | Refill Rate |
|---|---|---|
| Free | 1,200 runs | 100 runs/10 sec |
| Hobby | 5,000 runs | 500 runs/5 sec |
| Pro | 5,000 runs | 500 runs/5 sec |
| Tier | Concurrent Batches |
|---|---|
| Free | 1 |
| Hobby | 10 |
| Pro | 10 |
import { myTask } from "./trigger/myTask";
// Basic batch trigger (up to 1,000 items)
const runs = await myTask.batchTrigger([
{ payload: { userId: "user-1" } },
{ payload: { userId: "user-2" } },
{ payload: { userId: "user-3" } },
]);
// Batch trigger with wait
const results = await myTask.batchTriggerAndWait([
{ payload: { userId: "user-1" } },
{ payload: { userId: "user-2" } },
]);
for (const result of results) {
if (result.ok) {
console.log("Result:", result.output);
}
}
// With per-item options
const batchHandle = await myTask.batchTrigger([
{
payload: { userId: "123" },
options: {
idempotencyKey: "user-123-batch",
tags: ["priority"],
},
},
{
payload: { userId: "456" },
options: {
idempotencyKey: "user-456-batch",
},
},
]);

Consolidate multiple triggers into a single execution by debouncing task runs with a unique key and delay window.
- User activity updates: Batch rapid user actions into a single run
- Webhook deduplication: Handle webhook bursts without redundant processing
- Search indexing: Combine document updates instead of processing individually
- Notification batching: Group notifications to prevent user spam
await myTask.trigger(
{ userId: "123" },
{
debounce: {
key: "user-123-update", // Unique identifier for debounce group
delay: "5s", // Wait duration ("5s", "1m", or milliseconds)
},
}
);

Leading Mode (default): Uses payload/options from the first trigger; subsequent triggers only reschedule execution time.
// First trigger sets the payload
await myTask.trigger({ action: "first" }, {
debounce: { key: "my-key", delay: "10s" }
});
// Second trigger only reschedules - payload remains "first"
await myTask.trigger({ action: "second" }, {
debounce: { key: "my-key", delay: "10s" }
});
// Task executes with { action: "first" }

Trailing Mode: Uses payload/options from the most recent trigger.
await myTask.trigger(
{ data: "latest-value" },
{
debounce: {
key: "trailing-example",
delay: "10s",
mode: "trailing",
},
}
);

In trailing mode, these options update with each trigger:
- payload — task input data
- metadata — run metadata
- tags — run tags (replaces existing)
- maxAttempts — retry attempts
- maxDuration — maximum compute time
- machine — machine preset

- Idempotency keys take precedence over debounce settings
- Compatible with triggerAndWait() — parent runs block correctly on debounced execution
- Debounce key is scoped to the task
import { task, queue } from "@trigger.dev/sdk";
// Shared queue for related tasks
const emailQueue = queue({
name: "email-processing",
concurrencyLimit: 5, // Max 5 emails processing simultaneously
});
// Task-level concurrency
// Example: serialize all runs of a task by capping its queue at one
// concurrent execution — useful for critical sections over shared state.
export const oneAtATime = task({
id: "sequential-task",
queue: { concurrencyLimit: 1 }, // Process one at a time
run: async (payload) => {
// Critical section - only one instance runs
},
});
// Per-user concurrency
// Example: per-user concurrency — a dynamically named queue is supplied at
// trigger time, so each user gets an independent concurrency limit.
export const processUserData = task({
id: "process-user-data",
run: async (payload: { userId: string }) => {
// Override queue with user-specific concurrency
await childTask.trigger(payload, {
queue: {
// One queue per user; runs for different users don't contend.
name: `user-${payload.userId}`,
concurrencyLimit: 2,
},
});
},
});
export const emailTask = task({
id: "send-email",
queue: emailQueue, // Use shared queue
run: async (payload: { to: string }) => {
// Send email logic
},
});import { task, retry, AbortTaskRunError } from "@trigger.dev/sdk";
export const resilientTask = task({
id: "resilient-task",
retry: {
maxAttempts: 10,
factor: 1.8, // Exponential backoff multiplier
minTimeoutInMs: 500,
maxTimeoutInMs: 30_000,
randomize: false,
},
catchError: async ({ error, ctx }) => {
// Custom error handling
if (error.code === "FATAL_ERROR") {
throw new AbortTaskRunError("Cannot retry this error");
}
// Log error details
console.error(`Task ${ctx.task.id} failed:`, error);
// Allow retry by returning nothing
return { retryAt: new Date(Date.now() + 60000) }; // Retry in 1 minute
},
run: async (payload) => {
// Retry specific operations
const result = await retry.onThrow(
async () => {
return await unstableApiCall(payload);
},
{ maxAttempts: 3 }
);
// Conditional HTTP retries
const response = await retry.fetch("https://api.example.com", {
retry: {
maxAttempts: 5,
condition: (response, error) => {
return response?.status === 429 || response?.status >= 500;
},
},
});
return result;
},
});export const heavyTask = task({
id: "heavy-computation",
machine: { preset: "large-2x" }, // 8 vCPU, 16 GB RAM
maxDuration: 1800, // 30 minutes timeout
run: async (payload, { ctx }) => {
// Resource-intensive computation
if (ctx.machine.preset === "large-2x") {
// Use all available cores
return await parallelProcessing(payload);
}
return await standardProcessing(payload);
},
});
// Override machine when triggering
await heavyTask.trigger(payload, {
machine: { preset: "medium-1x" }, // Override for this run
});

Machine Presets:
- micro: 0.25 vCPU, 0.25 GB RAM
- small-1x: 0.5 vCPU, 0.5 GB RAM (default)
- small-2x: 1 vCPU, 1 GB RAM
- medium-1x: 1 vCPU, 2 GB RAM
- medium-2x: 2 vCPU, 4 GB RAM
- large-1x: 4 vCPU, 8 GB RAM
- large-2x: 8 vCPU, 16 GB RAM
import { task, idempotencyKeys } from "@trigger.dev/sdk";
// Example: idempotency keys for critical operations — the child trigger is
// deduplicated even when this task retries, because the key is scoped to
// the task run rather than the attempt.
export const paymentTask = task({
id: "process-payment",
retry: {
maxAttempts: 3,
},
run: async (payload: { orderId: string; amount: number }) => {
// Automatically scoped to this task run, so if the task is retried, the idempotency key will be the same
const idempotencyKey = await idempotencyKeys.create(`payment-${payload.orderId}`);
// Ensure payment is processed only once
await chargeCustomer.trigger(payload, {
idempotencyKey,
idempotencyKeyTTL: "24h", // Key expires in 24 hours
});
},
});
// Payload-based idempotency
import { createHash } from "node:crypto";
/**
 * Derives a deterministic idempotency key from a payload by hashing its
 * JSON serialization.
 *
 * @param payload - Any JSON-serializable value.
 * @returns Lowercase hex SHA-256 digest (64 characters) of
 *   `JSON.stringify(payload)`.
 *
 * NOTE: JSON.stringify is key-order sensitive — two objects with the same
 * entries inserted in different orders may serialize (and therefore hash)
 * differently. Acceptable for dedupe of identical trigger payloads; do not
 * rely on it as a canonical structural hash.
 */
function createPayloadHash(payload: unknown): string {
  // `unknown` instead of `any`: callers may still pass anything, but the
  // body cannot accidentally treat the payload as a known shape.
  return createHash("sha256").update(JSON.stringify(payload)).digest("hex");
}
export const deduplicatedTask = task({
id: "deduplicated-task",
run: async (payload) => {
const payloadHash = createPayloadHash(payload);
const idempotencyKey = await idempotencyKeys.create(payloadHash);
await processData.trigger(payload, { idempotencyKey });
},
});

import { task, metadata } from "@trigger.dev/sdk";
// Example: live progress reporting via run metadata. Metadata setters are
// chainable and the values are readable in real time by Realtime
// subscribers (see the Realtime section).
// NOTE(review): `processItem` is assumed to be defined elsewhere in the
// project — confirm before copying this snippet verbatim.
export const batchProcessor = task({
id: "batch-processor",
run: async (payload: { items: any[] }, { ctx }) => {
const totalItems = payload.items.length;
// Initialize progress metadata
metadata
.set("progress", 0)
.set("totalItems", totalItems)
.set("processedItems", 0)
.set("status", "starting");
const results = [];
for (let i = 0; i < payload.items.length; i++) {
const item = payload.items[i];
// Process item
const result = await processItem(item);
results.push(result);
// Update progress
const progress = ((i + 1) / totalItems) * 100;
metadata
.set("progress", progress)
.increment("processedItems", 1)
.append("logs", `Processed item ${i + 1}/${totalItems}`)
.set("currentItem", item.id);
}
// Final status
metadata.set("status", "completed");
return { results, totalProcessed: results.length };
},
});
// Update parent metadata from child task
export const childTask = task({
id: "child-task",
run: async (payload, { ctx }) => {
// Update parent task metadata
metadata.parent.set("childStatus", "processing");
metadata.root.increment("childrenCompleted", 1);
return { processed: true };
},
});import { task, logger } from "@trigger.dev/sdk";
export const tracedTask = task({
id: "traced-task",
run: async (payload, { ctx }) => {
logger.info("Task started", { userId: payload.userId });
// Custom trace with attributes
const user = await logger.trace(
"fetch-user",
async (span) => {
span.setAttribute("user.id", payload.userId);
span.setAttribute("operation", "database-fetch");
const userData = await database.findUser(payload.userId);
span.setAttribute("user.found", !!userData);
return userData;
},
{ userId: payload.userId }
);
logger.debug("User fetched", { user: user.id });
try {
const result = await processUser(user);
logger.info("Processing completed", { result });
return result;
} catch (error) {
logger.error("Processing failed", {
error: error.message,
userId: payload.userId,
});
throw error;
}
},
});

Hidden Tasks
// Hidden task - not exported, only used internally
// Not exported: the task is still registered and deployed, but it can only
// be triggered by code in this file (see publicWorkflow below).
const internalProcessor = task({
id: "internal-processor",
run: async (payload: { data: string }) => {
return { processed: payload.data.toUpperCase() };
},
});
// Public task that uses hidden task
export const publicWorkflow = task({
id: "public-workflow",
run: async (payload: { input: string }) => {
// Use hidden task internally
const result = await internalProcessor.triggerAndWait({
data: payload.input,
});
if (result.ok) {
return { output: result.output.processed };
}
throw new Error("Internal processing failed");
},
});

- Concurrency: Use queues to prevent overwhelming external services
- Retries: Configure exponential backoff for transient failures
- Idempotency: Always use for payment/critical operations
- Metadata: Track progress for long-running tasks
- Machines: Match machine size to computational requirements
- Tags: Use consistent naming patterns for filtering
- Debouncing: Use for user activity, webhooks, and notification batching
- Batch triggering: Use for bulk operations up to 1,000 items
- Error Handling: Distinguish between retryable and fatal errors
Design tasks to be stateless, idempotent, and resilient to failures. Use metadata for state tracking and queues for resource management.
Complete guide to configuring trigger.config.ts with build extensions
import { defineConfig } from "@trigger.dev/sdk";
export default defineConfig({
project: "<project-ref>", // Required: Your project reference
dirs: ["./trigger"], // Task directories
runtime: "node", // "node", "node-22", or "bun"
logLevel: "info", // "debug", "info", "warn", "error"
// Default retry settings
retries: {
enabledInDev: false,
default: {
maxAttempts: 3,
minTimeoutInMs: 1000,
maxTimeoutInMs: 10000,
factor: 2,
randomize: true,
},
},
// Build configuration
build: {
autoDetectExternal: true,
keepNames: true,
minify: false,
extensions: [], // Build extensions go here
},
// Global lifecycle hooks
onStartAttempt: async ({ payload, ctx }) => {
console.log("Global task start");
},
onSuccess: async ({ payload, output, ctx }) => {
console.log("Global task success");
},
onFailure: async ({ payload, error, ctx }) => {
console.log("Global task failure");
},
});import { prismaExtension } from "@trigger.dev/build/extensions/prisma";
extensions: [
prismaExtension({
schema: "prisma/schema.prisma",
version: "5.19.0", // Optional: specify version
migrate: true, // Run migrations during build
directUrlEnvVarName: "DIRECT_DATABASE_URL",
typedSql: true, // Enable TypedSQL support
}),
];import { emitDecoratorMetadata } from "@trigger.dev/build/extensions/typescript";
extensions: [
emitDecoratorMetadata(), // Enables decorator metadata
];import { pythonExtension } from "@trigger.dev/build/extensions/python";
extensions: [
pythonExtension({
scripts: ["./python/**/*.py"], // Copy Python files
requirementsFile: "./requirements.txt", // Install packages
devPythonBinaryPath: ".venv/bin/python", // Dev mode binary
}),
];
// Usage in tasks
const result = await python.runInline(`print("Hello, world!")`);
const output = await python.runScript("./python/script.py", ["arg1"]);import { playwright } from "@trigger.dev/build/extensions/playwright";
extensions: [
playwright({
browsers: ["chromium", "firefox", "webkit"], // Default: ["chromium"]
headless: true, // Default: true
}),
];import { puppeteer } from "@trigger.dev/build/extensions/puppeteer";
extensions: [puppeteer()];
// Environment variable needed:
// PUPPETEER_EXECUTABLE_PATH: "/usr/bin/google-chrome-stable"import { lightpanda } from "@trigger.dev/build/extensions/lightpanda";
extensions: [
lightpanda({
version: "latest", // or "nightly"
disableTelemetry: false,
}),
];import { ffmpeg } from "@trigger.dev/build/extensions/core";
extensions: [
ffmpeg({ version: "7" }), // Static build, or omit for Debian version
];
// Automatically sets FFMPEG_PATH and FFPROBE_PATH
// Add fluent-ffmpeg to external packages if usingimport { audioWaveform } from "@trigger.dev/build/extensions/audioWaveform";
extensions: [
audioWaveform(), // Installs Audio Waveform 1.1.0
];import { aptGet } from "@trigger.dev/build/extensions/core";
extensions: [
aptGet({
packages: ["ffmpeg", "imagemagick", "curl=7.68.0-1"], // Can specify versions
}),
];Only use this for installing CLI tools, NOT packages you import in your code.
import { additionalPackages } from "@trigger.dev/build/extensions/core";
extensions: [
additionalPackages({
packages: ["wrangler"], // CLI tools and specific versions
}),
];import { additionalFiles } from "@trigger.dev/build/extensions/core";
extensions: [
additionalFiles({
files: ["wrangler.toml", "./assets/**", "./fonts/**"], // Glob patterns supported
}),
];import { syncEnvVars } from "@trigger.dev/build/extensions/core";
extensions: [
syncEnvVars(async (ctx) => {
// ctx contains: environment, projectRef, env
return [
{ name: "SECRET_KEY", value: await getSecret(ctx.environment) },
{ name: "API_URL", value: ctx.environment === "prod" ? "api.prod.com" : "api.dev.com" },
];
}),
];import { esbuildPlugin } from "@trigger.dev/build/extensions";
import { sentryEsbuildPlugin } from "@sentry/esbuild-plugin";
extensions: [
esbuildPlugin(
sentryEsbuildPlugin({
org: process.env.SENTRY_ORG,
project: process.env.SENTRY_PROJECT,
authToken: process.env.SENTRY_AUTH_TOKEN,
}),
{ placement: "last", target: "deploy" } // Optional config
),
];import { defineConfig } from "@trigger.dev/sdk";
// Example: hand-rolled build extension object. The visible hooks are
// externalsForTarget (adds external deps), onBuildStart, and
// onBuildComplete (which can add deployment layers via context.addLayer).
// NOTE(review): hook signatures are dictated by @trigger.dev/build —
// confirm against the installed version.
const customExtension = {
name: "my-custom-extension",
externalsForTarget: (target) => {
return ["some-native-module"]; // Add external dependencies
},
onBuildStart: async (context) => {
console.log(`Build starting for ${context.target}`);
// Register esbuild plugins, modify build context
},
onBuildComplete: async (context, manifest) => {
console.log("Build complete, adding layers");
// Add build layers, modify deployment
context.addLayer({
id: "my-layer",
files: [{ source: "./custom-file", destination: "/app/custom" }],
commands: ["chmod +x /app/custom"],
});
},
};
export default defineConfig({
project: "my-project",
build: {
extensions: [customExtension],
},
});import { PrismaInstrumentation } from "@prisma/instrumentation";
import { OpenAIInstrumentation } from "@langfuse/openai";
export default defineConfig({
// ... other config
telemetry: {
instrumentations: [new PrismaInstrumentation(), new OpenAIInstrumentation()],
exporters: [customExporter], // Optional custom exporters
},
});export default defineConfig({
// ... other config
defaultMachine: "large-1x", // Default machine for all tasks
maxDuration: 300, // Default max duration (seconds)
enableConsoleLogging: true, // Console logging in development
});extensions: [
prismaExtension({ schema: "prisma/schema.prisma", migrate: true }),
additionalFiles({ files: ["./public/**", "./assets/**"] }),
syncEnvVars(async (ctx) => [...envVars]),
];extensions: [
pythonExtension({
scripts: ["./ai/**/*.py"],
requirementsFile: "./requirements.txt",
}),
ffmpeg({ version: "7" }),
additionalPackages({ packages: ["wrangler"] }),
];extensions: [
playwright({ browsers: ["chromium"] }),
puppeteer(),
additionalFiles({ files: ["./selectors.json", "./proxies.txt"] }),
];

- Use specific versions: Pin extension versions for reproducible builds
- External packages: Add modules with native addons to the build.external array
- Environment sync: Use syncEnvVars for dynamic secrets
- File paths: Use glob patterns for flexible file inclusion
- Debug builds: Use --log-level debug --dry-run for troubleshooting
Extensions only affect deployment, not local development. Use external array for packages that shouldn't be bundled.
Recurring tasks using cron. For one-off future runs, use the delay option.
import { schedules } from "@trigger.dev/sdk";
export const task = schedules.task({
id: "first-scheduled-task",
run: async (payload) => {
payload.timestamp; // Date (scheduled time, UTC)
payload.lastTimestamp; // Date | undefined
payload.timezone; // IANA, e.g. "America/New_York" (default "UTC")
payload.scheduleId; // string
payload.externalId; // string | undefined
payload.upcoming; // Date[]
payload.timestamp.toLocaleString("en-US", { timeZone: payload.timezone });
},
});Scheduled tasks need at least one schedule attached to run.
Declarative (sync on dev/deploy):
schedules.task({
id: "every-2h",
cron: "0 */2 * * *", // UTC
run: async () => {},
});
schedules.task({
id: "tokyo-5am",
cron: { pattern: "0 5 * * *", timezone: "Asia/Tokyo", environments: ["PRODUCTION", "STAGING"] },
run: async () => {},
});Imperative (SDK or dashboard):
await schedules.create({
task: task.id,
cron: "0 0 * * *",
timezone: "America/New_York", // DST-aware
externalId: "user_123",
deduplicationKey: "user_123-daily", // updates if reused
});// /trigger/reminder.ts
export const reminderTask = schedules.task({
id: "todo-reminder",
run: async (p) => {
if (!p.externalId) throw new Error("externalId is required");
const user = await db.getUser(p.externalId);
await sendReminderEmail(user);
},
});// app/reminders/route.ts
export async function POST(req: Request) {
const data = await req.json();
return Response.json(
await schedules.create({
task: reminderTask.id,
cron: "0 8 * * *",
timezone: data.timezone,
externalId: data.userId,
deduplicationKey: `${data.userId}-reminder`,
})
);
}* * * * *
| | | | └ day of week (0–7 or 1L–7L; 0/7=Sun; L=last)
| | | └── month (1–12)
| | └──── day of month (1–31 or L)
| └────── hour (0–23)
└──────── minute (0–59)
- Dev: only when the dev CLI is running.
- Staging/Production: only for tasks in the latest deployment.
await schedules.retrieve(id);
await schedules.list();
await schedules.update(id, { cron: "0 0 1 * *", externalId: "ext", deduplicationKey: "key" });
await schedules.deactivate(id);
await schedules.activate(id);
await schedules.del(id);
await schedules.timezones(); // list of IANA timezonesCreate/attach schedules visually (Task, Cron pattern, Timezone, Optional: External ID, Dedup key, Environments). Test scheduled tasks from the Test page.
Real-time monitoring and updates for runs
Realtime allows you to:
- Subscribe to run status changes, metadata updates, and streams
- Build real-time dashboards and UI updates
- Monitor task progress from frontend and backend
import { auth } from "@trigger.dev/sdk";
// Read-only token for specific runs
const publicToken = await auth.createPublicToken({
scopes: {
read: {
runs: ["run_123", "run_456"],
tasks: ["my-task-1", "my-task-2"],
},
},
expirationTime: "1h", // Default: 15 minutes
});// Single-use token for triggering tasks
const triggerToken = await auth.createTriggerPublicToken("my-task", {
expirationTime: "30m",
});import { runs, tasks } from "@trigger.dev/sdk";
// Trigger and subscribe
const handle = await tasks.trigger("my-task", { data: "value" });
// Subscribe to specific run
for await (const run of runs.subscribeToRun<typeof myTask>(handle.id)) {
console.log(`Status: ${run.status}, Progress: ${run.metadata?.progress}`);
if (run.status === "COMPLETED") break;
}
// Subscribe to runs with tag
for await (const run of runs.subscribeToRunsWithTag("user-123")) {
console.log(`Tagged run ${run.id}: ${run.status}`);
}
// Subscribe to batch
for await (const run of runs.subscribeToBatch(batchId)) {
console.log(`Batch run ${run.id}: ${run.status}`);
}import { streams, InferStreamType } from "@trigger.dev/sdk";
// 1. Define streams (shared location)
export const aiStream = streams.define<string>({
id: "ai-output",
});
export type AIStreamPart = InferStreamType<typeof aiStream>;
// 2. Pipe from task
export const streamingTask = task({
id: "streaming-task",
run: async (payload) => {
const completion = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: payload.prompt }],
stream: true,
});
const { waitUntilComplete } = aiStream.pipe(completion);
await waitUntilComplete();
},
});
// 3. Read from backend
const stream = await aiStream.read(runId, {
timeoutInSeconds: 300,
startIndex: 0, // Resume from specific chunk
});
for await (const chunk of stream) {
console.log("Chunk:", chunk); // Fully typed
}Enable v2 by upgrading to 4.1.0 or later.
npm add @trigger.dev/react-hooks"use client";
import { useTaskTrigger, useRealtimeTaskTrigger } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";
function TriggerComponent({ accessToken }: { accessToken: string }) {
// Basic trigger
const { submit, handle, isLoading } = useTaskTrigger<typeof myTask>("my-task", {
accessToken,
});
// Trigger with realtime updates
const {
submit: realtimeSubmit,
run,
isLoading: isRealtimeLoading,
} = useRealtimeTaskTrigger<typeof myTask>("my-task", { accessToken });
return (
<div>
<button onClick={() => submit({ data: "value" })} disabled={isLoading}>
Trigger Task
</button>
<button onClick={() => realtimeSubmit({ data: "realtime" })} disabled={isRealtimeLoading}>
Trigger with Realtime
</button>
{run && <div>Status: {run.status}</div>}
</div>
);
}"use client";
import { useRealtimeRun, useRealtimeRunsWithTag } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";
function SubscribeComponent({ runId, accessToken }: { runId: string; accessToken: string }) {
// Subscribe to specific run
const { run, error } = useRealtimeRun<typeof myTask>(runId, {
accessToken,
onComplete: (run) => {
console.log("Task completed:", run.output);
},
});
// Subscribe to tagged runs
const { runs } = useRealtimeRunsWithTag("user-123", { accessToken });
if (error) return <div>Error: {error.message}</div>;
if (!run) return <div>Loading...</div>;
return (
<div>
<div>Status: {run.status}</div>
<div>Progress: {run.metadata?.progress || 0}%</div>
{run.output && <div>Result: {JSON.stringify(run.output)}</div>}
<h3>Tagged Runs:</h3>
{runs.map((r) => (
<div key={r.id}>
{r.id}: {r.status}
</div>
))}
</div>
);
}"use client";
import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "../trigger/streams";
function StreamComponent({ runId, accessToken }: { runId: string; accessToken: string }) {
// Pass defined stream directly for type safety
const { parts, error } = useRealtimeStream(aiStream, runId, {
accessToken,
timeoutInSeconds: 300,
throttleInMs: 50, // Control re-render frequency
});
if (error) return <div>Error: {error.message}</div>;
if (!parts) return <div>Loading...</div>;
const text = parts.join(""); // parts is typed as AIStreamPart[]
return <div>Streamed Text: {text}</div>;
}"use client";
import { useWaitToken } from "@trigger.dev/react-hooks";
function WaitTokenComponent({ tokenId, accessToken }: { tokenId: string; accessToken: string }) {
const { complete } = useWaitToken(tokenId, { accessToken });
return <button onClick={() => complete({ approved: true })}>Approve Task</button>;
}"use client";
import { useRun } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";
function SWRComponent({ runId, accessToken }: { runId: string; accessToken: string }) {
const { run, error, isLoading } = useRun<typeof myTask>(runId, {
accessToken,
refreshInterval: 0, // Disable polling (recommended)
});
if (isLoading) return <div>Loading...</div>;
if (error) return <div>Error: {error.message}</div>;
return <div>Run: {run?.status}</div>;
}Key properties available in run subscriptions:
- id: Unique run identifier
- status: QUEUED, EXECUTING, COMPLETED, FAILED, CANCELED, etc.
- payload: Task input data (typed)
- output: Task result (typed, when completed)
- metadata: Real-time updatable data
- createdAt, updatedAt: Timestamps
- costInCents: Execution cost
- Use Realtime over SWR: Recommended for most use cases due to rate limits
- Scope tokens properly: Only grant necessary read/trigger permissions
- Handle errors: Always check for errors in hooks and subscriptions
- Type safety: Use task types for proper payload/output typing
- Cleanup subscriptions: Backend subscriptions auto-complete, frontend hooks auto-cleanup