title Input Streams
sidebarTitle Input Streams
description Send data into running tasks from your backend code

The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while output streams let you read data from tasks, input streams let you push data into them.

To learn how to receive input stream data inside your tasks, see [Input Streams](/tasks/streams#input-streams) in the Streams doc.

Sending data to a running task

Using defined input streams (Recommended)

The recommended approach is to use defined input streams for full type safety:

import { cancelSignal, approval } from "./trigger/streams";

// Cancel a running AI stream
await cancelSignal.send(runId, { reason: "User clicked stop" });

// Approve a draft
await approval.send(runId, { approved: true, reviewer: "alice@example.com" });

The .send() method is fully typed — the data parameter must match the generic type you defined on the input stream.
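
To make the typing constraint concrete, the sketch below uses a local stand-in rather than the Trigger.dev SDK. The `TypedSender` interface, the `makeFakeStream` helper, and the two payload types are hypothetical; only the `.send(runId, data)` call shape is taken from the examples above.

```typescript
// Hypothetical stand-in for a defined input stream; NOT the Trigger.dev SDK.
// Only the `.send(runId, data)` call shape is taken from the examples above.
interface TypedSender<T> {
  send(runId: string, data: T): Promise<void>;
}

// Assumed payload shapes, inferred from the calls shown above.
type CancelPayload = { reason: string };
type ApprovalPayload = { approved: boolean; reviewer: string };

// In-memory fake that records what was sent, so the sketch runs without a backend.
function makeFakeStream<T>(): TypedSender<T> & { sent: Array<{ runId: string; data: T }> } {
  const sent: Array<{ runId: string; data: T }> = [];
  return {
    sent,
    async send(runId: string, data: T): Promise<void> {
      sent.push({ runId, data });
    },
  };
}

const fakeCancel = makeFakeStream<CancelPayload>();
const fakeApproval = makeFakeStream<ApprovalPayload>();

async function demo(): Promise<void> {
  await fakeCancel.send("run_123", { reason: "User clicked stop" });
  await fakeApproval.send("run_123", { approved: true, reviewer: "alice@example.com" });

  // Both of these calls would be rejected by the TypeScript compiler:
  // await fakeApproval.send("run_123", { approved: "yes" }); // wrong type, missing `reviewer`
  // await fakeCancel.send("run_123", { approved: true });    // wrong payload shape
}
```

Because the generic parameter is fixed when the stream is created, a mismatched payload fails at compile time rather than at the call to the running task.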

`.send()` works the same regardless of how the task is listening — whether it uses `.wait()` (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know how the task is consuming the data. See [Input Streams](/tasks/streams#input-streams) for details on each receiving method.

Practical examples

Cancel from a Next.js API route

import { cancelSignal } from "@/trigger/streams";

export async function POST(req: Request) {
  const { runId } = await req.json();

  await cancelSignal.send(runId, { reason: "User clicked stop" });

  return Response.json({ cancelled: true });
}

Approval workflow API

import { approval } from "@/trigger/streams";

export async function POST(req: Request) {
  const { runId, approved, reviewer } = await req.json();

  await approval.send(runId, {
    approved,
    reviewer,
  });

  return Response.json({ success: true });
}

Remix action handler

import { json, type ActionFunctionArgs } from "@remix-run/node";
import { approval } from "~/trigger/streams";

export async function action({ request }: ActionFunctionArgs) {
  const formData = await request.formData();
  const runId = formData.get("runId") as string;
  const approved = formData.get("approved") === "true";
  const reviewer = formData.get("reviewer") as string;

  await approval.send(runId, { approved, reviewer });

  return json({ success: true });
}
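
The `as string` casts in the handler above assume every form field is present. A minimal sketch of validating the fields first, using only the standard `FormData` API; the `parseApprovalForm` helper, the `ApprovalForm` type, and the error messages are hypothetical and not part of Remix or Trigger.dev.

```typescript
// Hypothetical helper: validates the form fields before they reach `.send()`.
type ApprovalForm = { runId: string; approved: boolean; reviewer: string };

function parseApprovalForm(formData: FormData): ApprovalForm {
  const runId = formData.get("runId");
  const reviewer = formData.get("reviewer");

  // `FormData.get` returns a string, a File, or null, so narrow before use.
  if (typeof runId !== "string" || runId.length === 0) {
    throw new Error("Missing form field: runId");
  }
  if (typeof reviewer !== "string" || reviewer.length === 0) {
    throw new Error("Missing form field: reviewer");
  }

  // Same rule as the handler above: only the literal string "true" approves.
  const approved = formData.get("approved") === "true";

  return { runId, approved, reviewer };
}
```

Inside the action, `parseApprovalForm(await request.formData())` could replace the three casts, so a malformed submission fails before anything is sent to the run.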

Express handler

import express from "express";
import { cancelSignal } from "./trigger/streams";

const app = express();
app.use(express.json());

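app.post("/api/cancel", async (req, res) => {
  const { runId, reason } = req.body;

  try {
    await cancelSignal.send(runId, { reason });
    res.json({ cancelled: true });
  } catch (error) {
    // Express 4 does not catch rejections from async handlers, so handle the failure here.
    console.error("Failed to send cancel signal:", error);
    res.status(500).json({ cancelled: false });
  }
});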

Sending from another task

You can send input stream data from one task to another running task:

import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const reviewerTask = task({
  id: "auto-reviewer",
  run: async (payload: { targetRunId: string }) => {
    // Perform automated review logic...
    // `performReview` is a placeholder for your own review function; it should resolve to a boolean.
    const isApproved = await performReview();

    // Send approval to the waiting task
    await approval.send(payload.targetRunId, {
      approved: isApproved,
      reviewer: "auto-reviewer",
    });
  },
});

Error handling

The .send() method will throw if:

  • The run has already completed, failed, or been canceled
  • The payload exceeds the 1MB size limit
  • The run ID is invalid

import { cancelSignal } from "./trigger/streams";

try {
  await cancelSignal.send(runId, { reason: "User clicked stop" });
} catch (error) {
  console.error("Failed to send:", error);
  // Handle the error — the run may have already completed
}
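
Since `.send()` can throw for several distinct reasons, some callers may prefer a result value over an exception. The sketch below wraps any object that has the `.send(runId, data)` shape; `Sendable`, `SendResult`, and `trySend` are hypothetical names, and nothing here assumes which error classes or messages the SDK actually throws.

```typescript
// Hypothetical structural type: anything with the `.send(runId, data)` shape shown above.
interface Sendable<T> {
  send(runId: string, data: T): Promise<unknown>;
}

type SendResult = { ok: true } | { ok: false; error: Error };

// Converts a thrown error into a value so callers can branch without try/catch.
async function trySend<T>(stream: Sendable<T>, runId: string, data: T): Promise<SendResult> {
  try {
    await stream.send(runId, data);
    return { ok: true };
  } catch (err) {
    // Normalize non-Error throwables; the SDK's real error types are not assumed here.
    const error = err instanceof Error ? err : new Error(String(err));
    return { ok: false, error };
  }
}
```

A route handler could then check `result.ok` and return an error response of its choosing when the run has already finished, instead of letting the exception propagate.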

Important notes

  • Maximum payload size per .send() call is 1MB
  • You cannot send data to a completed, failed, or canceled run
  • Data sent before a listener is registered inside the task is buffered and delivered when a listener attaches
  • Input streams require the current streams implementation (v2 is the default in SDK 4.1.0+). See Streams for details.
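
The 1MB limit can be checked before calling `.send()`. This sketch assumes the limit applies to the UTF-8 byte length of the JSON-serialized payload and that 1MB means 1,048,576 bytes; neither detail is stated above, so treat both as assumptions. `MAX_PAYLOAD_BYTES`, `payloadByteLength`, and `fitsInPayloadLimit` are hypothetical names.

```typescript
// Assumption: 1MB = 1024 * 1024 bytes, measured on the JSON-serialized payload.
const MAX_PAYLOAD_BYTES = 1024 * 1024;

// UTF-8 byte length of the JSON form of `data`.
function payloadByteLength(data: unknown): number {
  const json = JSON.stringify(data);
  // JSON.stringify returns undefined for values such as `undefined` or functions.
  if (json === undefined) {
    throw new TypeError("Payload is not JSON-serializable");
  }
  return new TextEncoder().encode(json).byteLength;
}

// True when the serialized payload is within the (assumed) limit.
function fitsInPayloadLimit(data: unknown, limit: number = MAX_PAYLOAD_BYTES): boolean {
  return payloadByteLength(data) <= limit;
}
```

Checking the size up front lets a backend reject an oversized request with a clear message, or split the data across several sends, rather than relying on the thrown error.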