Skip to content

Commit f02a956

Browse files
claudeericallam
authored andcommitted
docs: add input streams documentation to realtime guides
Cover the problems input streams solve (cancelling AI SDK streamText mid-stream, human-in-the-loop workflows, interactive agents), the full API surface (on, once, peek, send), and a complete cancellable AI streaming example. Updates rules/4.3.0/realtime.md, manifest.json, and the Claude Code skill docs. https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
1 parent cefa7cc commit f02a956

File tree

2 files changed

+603
-0
lines changed

2 files changed

+603
-0
lines changed

.claude/skills/trigger-dev-tasks/realtime.md

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Realtime allows you to:
99
- Subscribe to run status changes, metadata updates, and streams
1010
- Build real-time dashboards and UI updates
1111
- Monitor task progress from frontend and backend
12+
- Send data into running tasks with input streams
1213

1314
## Authentication
1415

@@ -103,6 +104,143 @@ for await (const chunk of stream) {
103104
}
104105
```
105106

107+
## Input Streams
108+
109+
Input streams let you send data **into** a running task from your backend or frontend. Output streams send data out of tasks; input streams complete the loop.
110+
111+
### Problems Input Streams Solve
112+
113+
**Cancelling AI streams mid-generation.** When you use AI SDK's `streamText` inside a task, the LLM keeps generating tokens until it's done — even if the user has navigated away or clicked "Stop generating." Without input streams, there's no way to tell the running task to abort. With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately.
114+
115+
**Human-in-the-loop workflows.** A task generates a draft, then pauses and waits for user approval before proceeding.
116+
117+
**Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution.
118+
119+
### Defining Input Streams
120+
121+
```ts
122+
// trigger/streams.ts
123+
import { streams } from "@trigger.dev/sdk";
124+
125+
export const cancelSignal = streams.input<{ reason?: string }>({ id: "cancel" });
126+
export const approval = streams.input<{ approved: boolean; reviewer: string }>({ id: "approval" });
127+
```
128+
129+
### Receiving Data Inside a Task
130+
131+
#### `once()` — Wait for the next value
132+
133+
```ts
134+
import { task } from "@trigger.dev/sdk";
135+
import { approval } from "./streams";
136+
137+
export const draftEmailTask = task({
138+
id: "draft-email",
139+
run: async (payload: { to: string; subject: string }) => {
140+
const draft = await generateDraft(payload);
141+
const result = await approval.once(); // Blocks until data arrives
142+
143+
if (result.approved) {
144+
await sendEmail(draft);
145+
}
146+
return { sent: result.approved, reviewer: result.reviewer };
147+
},
148+
});
149+
```
150+
151+
Options: `once({ timeoutMs: 300_000 })` or `once({ signal: controller.signal })`.
152+
153+
#### `on()` — Listen for every value
154+
155+
```ts
156+
import { task } from "@trigger.dev/sdk";
157+
import { cancelSignal } from "./streams";
158+
159+
export const streamingTask = task({
160+
id: "streaming-task",
161+
run: async (payload: { prompt: string }) => {
162+
const controller = new AbortController();
163+
164+
const sub = cancelSignal.on(() => {
165+
controller.abort();
166+
});
167+
168+
try {
169+
const result = await streamText({
170+
model: openai("gpt-4o"),
171+
prompt: payload.prompt,
172+
abortSignal: controller.signal,
173+
});
174+
return result;
175+
} finally {
176+
sub.off();
177+
}
178+
},
179+
});
180+
```
181+
182+
#### `peek()` — Non-blocking check
183+
184+
```ts
185+
const latest = cancelSignal.peek(); // undefined if nothing received yet
186+
```
187+
188+
### Sending Data to a Running Task
189+
190+
```ts
191+
import { cancelSignal, approval } from "./trigger/streams";
192+
193+
await cancelSignal.send(runId, { reason: "User clicked stop" });
194+
await approval.send(runId, { approved: true, reviewer: "alice@example.com" });
195+
```
196+
197+
### Full Example: Cancellable AI Streaming
198+
199+
```ts
200+
// trigger/streams.ts
201+
import { streams } from "@trigger.dev/sdk";
202+
203+
export const aiOutput = streams.define<string>({ id: "ai" });
204+
export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" });
205+
```
206+
207+
```ts
208+
// trigger/ai-task.ts
209+
import { task } from "@trigger.dev/sdk";
210+
import { streamText } from "ai";
211+
import { openai } from "@ai-sdk/openai";
212+
import { aiOutput, cancelStream } from "./streams";
213+
214+
export const aiTask = task({
215+
id: "ai-chat",
216+
run: async (payload: { prompt: string }) => {
217+
const controller = new AbortController();
218+
const sub = cancelStream.on(() => controller.abort());
219+
220+
try {
221+
const result = streamText({
222+
model: openai("gpt-4o"),
223+
prompt: payload.prompt,
224+
abortSignal: controller.signal,
225+
});
226+
227+
const { waitUntilComplete } = aiOutput.pipe(result.textStream);
228+
await waitUntilComplete();
229+
return { text: await result.text };
230+
} finally {
231+
sub.off();
232+
}
233+
},
234+
});
235+
```
236+
237+
### Important Notes
238+
239+
- Input streams require v2 realtime streams (SDK 4.1.0+). Calling `.on()` or `.once()` without v2 throws an error.
240+
- Cannot send data to completed/failed/canceled runs.
241+
- Max 1MB per `.send()` call.
242+
- Data sent before a listener is registered gets buffered and delivered when a listener attaches.
243+
106244
## React Frontend Usage
107245

108246
### Installation
@@ -242,3 +380,5 @@ Key properties available in run subscriptions:
242380
- **Handle errors**: Always check for errors in hooks and subscriptions
243381
- **Type safety**: Use task types for proper payload/output typing
244382
- **Cleanup subscriptions**: Backend subscriptions auto-complete, frontend hooks auto-cleanup
383+
- **Clean up input stream listeners**: Always call `.off()` in a `finally` block to avoid leaks
384+
- **Use timeouts with `once()`**: Avoid hanging indefinitely if the signal never arrives

0 commit comments

Comments
 (0)