Skip to content

Commit cef3d00

Browse files
claudeericallam
authored andcommitted
docs: add SDK design proposal for input stream .wait() method
Documents the proposed .wait() method on input streams that uses waitpoint tokens internally to suspend task execution. Covers API surface, options, return types, examples, and behavioral differences from .once(). https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
1 parent 7a749de commit cef3d00

File tree

1 file changed

+370
-0
lines changed

1 file changed

+370
-0
lines changed

docs/designs/input-stream-wait.md

Lines changed: 370 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,370 @@
1+
# Input Stream `.wait()` — SDK Design
2+
3+
## Problem
4+
5+
The existing input stream methods (`.on()`, `.once()`, `.peek()`) are all **non-suspending**. When a task calls `await approval.once()`, the task process stays alive with an open SSE tail connection, consuming compute the entire time it waits for data.
6+
7+
This is fine for short-lived waits or cases where the task is doing other work concurrently (like streaming AI output while listening for a cancel signal). But for use cases where the task genuinely has nothing to do until data arrives — approval gates, human-in-the-loop decisions, waiting for external webhooks — keeping the process alive wastes compute and money.
8+
9+
`wait.forToken()` already solves this for arbitrary waitpoints: the task suspends, the process is freed, and execution resumes when the token is completed. Input stream `.wait()` brings that same suspension behavior to input streams, so you get the ergonomics of typed input streams with the efficiency of waitpoint-based suspension.
10+
11+
## API Surface
12+
13+
### `.wait()` method on input streams
14+
15+
```ts
16+
const approval = streams.input<{ approved: boolean; reviewer: string }>({
17+
id: "approval",
18+
});
19+
20+
// Inside a task — suspends execution until data arrives
21+
const result = await approval.wait();
22+
```
23+
24+
#### Signature
25+
26+
```ts
27+
type RealtimeDefinedInputStream<TData> = {
28+
// ... existing methods ...
29+
30+
/**
31+
* Suspend the task until data arrives on this input stream.
32+
*
33+
* Unlike `.once()` which keeps the task process alive while waiting,
34+
* `.wait()` suspends the task entirely — freeing compute resources.
35+
* The task resumes when data is sent via `.send()`.
36+
*
37+
* Uses a waitpoint token internally. Can only be called inside a task.run().
38+
*/
39+
wait: (options?: InputStreamWaitOptions) => ManualWaitpointPromise<TData>;
40+
};
41+
```
42+
43+
#### Options
44+
45+
```ts
46+
type InputStreamWaitOptions = {
47+
/**
48+
* Maximum time to wait before the waitpoint times out.
49+
* Uses the same period format as `wait.createToken()`.
50+
* If the timeout is reached, the result will be `{ ok: false, error }`.
51+
*
52+
* @example "30s", "5m", "1h", "24h", "7d"
53+
*/
54+
timeout?: string;
55+
56+
/**
57+
* Idempotency key for the underlying waitpoint token.
58+
* If the same key is used again (and hasn't expired), the existing
59+
* waitpoint is reused. This means if the task retries, it will
60+
* resume waiting on the same waitpoint rather than creating a new one.
61+
*/
62+
idempotencyKey?: string;
63+
64+
/**
65+
* TTL for the idempotency key. After this period, the same key
66+
* will create a new waitpoint.
67+
*/
68+
idempotencyKeyTTL?: string;
69+
70+
/**
71+
* Tags for the underlying waitpoint token, useful for querying
72+
* and filtering waitpoints via `wait.listTokens()`.
73+
*/
74+
tags?: string[];
75+
};
76+
```
77+
78+
#### Return type
79+
80+
Returns `ManualWaitpointPromise<TData>` — the same type returned by `wait.forToken()`. This gives you two ways to handle the result:
81+
82+
**Check `ok` explicitly:**
83+
84+
```ts
85+
const result = await approval.wait({ timeout: "24h" });
86+
87+
if (result.ok) {
88+
console.log(result.output.approved); // TData, fully typed
89+
} else {
90+
console.log("Timed out:", result.error.message);
91+
}
92+
```
93+
94+
**Use `.unwrap()` to throw on timeout:**
95+
96+
```ts
97+
// Throws WaitpointTimeoutError if the timeout is reached
98+
const data = await approval.wait({ timeout: "24h" }).unwrap();
99+
console.log(data.approved); // TData directly
100+
```
101+
102+
## When to Use `.wait()` vs `.once()` vs `.on()`
103+
104+
| Method | Task suspended? | Compute cost while waiting | Best for |
105+
|--------|----------------|---------------------------|----------|
106+
| `.on(handler)` | No | Full — process stays alive | Continuous listening (cancel signals, live updates) |
107+
| `.once()` | No | Full — process stays alive | Short waits, or when doing concurrent work |
108+
| `.wait()` | **Yes** | **None** — process freed | Approval gates, human-in-the-loop, long waits |
109+
110+
### Use `.wait()` when:
111+
112+
- The task has **nothing else to do** until data arrives
113+
- The wait could be **long** (minutes, hours, days) — e.g., waiting for a human to review something
114+
- You want to **minimize compute cost** — the task suspends and doesn't burn resources
115+
- You want **timeout behavior** matching `wait.forToken()` (auto-timeout with `ok: false`)
116+
- You need **idempotency** for retries — the same idempotency key resumes the same wait
117+
118+
### Use `.once()` when:
119+
120+
- The wait is **short** (seconds) and suspending would add unnecessary overhead
121+
- You're doing **concurrent work** while waiting — e.g., streaming AI output and waiting for the next user message at the same time
122+
- You want **AbortSignal support**`.once()` accepts a signal for cancellation from within the task
123+
- You need to **check a buffer**`.once()` resolves immediately if data was already sent before the call
124+
125+
### Use `.on()` when:
126+
127+
- You need to **react to every value**, not just the first one
128+
- You're implementing **event-driven patterns** like cancel signals
129+
- The handler runs **alongside other task work** (e.g., abort an AI stream when cancel arrives)
130+
131+
## Examples
132+
133+
### Approval gate — the core use case
134+
135+
The simplest case: a task does some work, then waits for human approval before continuing. With `.once()` this burns compute for the entire review period. With `.wait()` the task suspends.
136+
137+
```ts trigger/streams.ts
138+
import { streams } from "@trigger.dev/sdk";
139+
140+
export const approval = streams.input<{
141+
approved: boolean;
142+
reviewer: string;
143+
comment?: string;
144+
}>({ id: "approval" });
145+
```
146+
147+
```ts trigger/publish-post.ts
148+
import { task } from "@trigger.dev/sdk";
149+
import { approval } from "./streams";
150+
151+
export const publishPost = task({
152+
id: "publish-post",
153+
run: async (payload: { postId: string }) => {
154+
const draft = await prepareDraft(payload.postId);
155+
156+
// Notify reviewer (email, Slack, etc.)
157+
await notifyReviewer(draft);
158+
159+
// Suspend until reviewer responds — no compute cost while waiting
160+
const result = await approval.wait({ timeout: "7d" }).unwrap();
161+
162+
if (result.approved) {
163+
await publish(draft);
164+
return { published: true, reviewer: result.reviewer };
165+
}
166+
167+
return { published: false, reason: result.comment };
168+
},
169+
});
170+
```
171+
172+
```ts app/api/review/route.ts
173+
import { approval } from "@/trigger/streams";
174+
175+
export async function POST(req: Request) {
176+
const { runId, approved, comment } = await req.json();
177+
178+
await approval.send(runId, {
179+
approved,
180+
reviewer: "alice@example.com",
181+
comment,
182+
});
183+
184+
return Response.json({ ok: true });
185+
}
186+
```
187+
188+
### Idempotent waits for retries
189+
190+
If a task retries after a failure, you don't want to create a duplicate waitpoint. Use `idempotencyKey` to ensure the retry resumes the same wait.
191+
192+
```ts
193+
export const processOrder = task({
194+
id: "process-order",
195+
retry: { maxAttempts: 3 },
196+
run: async (payload: { orderId: string }) => {
197+
await prepareOrder(payload.orderId);
198+
199+
// Same idempotency key across retries — won't create duplicate waitpoints
200+
const result = await approval.wait({
201+
timeout: "48h",
202+
idempotencyKey: `order-approval-${payload.orderId}`,
203+
tags: [`order:${payload.orderId}`],
204+
});
205+
206+
if (!result.ok) {
207+
throw new Error("Approval timed out after 48 hours");
208+
}
209+
210+
await fulfillOrder(payload.orderId, result.output);
211+
},
212+
});
213+
```
214+
215+
### Multi-step conversation with an AI agent
216+
217+
An AI agent that pauses to ask the user for clarification. Each step suspends until the user responds.
218+
219+
```ts trigger/streams.ts
220+
import { streams } from "@trigger.dev/sdk";
221+
222+
export const userMessage = streams.input<{
223+
text: string;
224+
attachments?: string[];
225+
}>({ id: "user-message" });
226+
227+
export const agentOutput = streams.define<string>({ id: "agent" });
228+
```
229+
230+
```ts trigger/agent.ts
231+
import { task } from "@trigger.dev/sdk";
232+
import { streamText } from "ai";
233+
import { openai } from "@ai-sdk/openai";
234+
import { userMessage, agentOutput } from "./streams";
235+
236+
export const agentTask = task({
237+
id: "ai-agent",
238+
run: async (payload: { initialPrompt: string }) => {
239+
const messages: Array<{ role: string; content: string }> = [
240+
{ role: "user", content: payload.initialPrompt },
241+
];
242+
243+
for (let turn = 0; turn < 10; turn++) {
244+
// Generate a response
245+
const result = streamText({
246+
model: openai("gpt-4o"),
247+
messages,
248+
});
249+
250+
const { waitUntilComplete } = agentOutput.pipe(result.textStream);
251+
await waitUntilComplete();
252+
253+
const text = await result.text;
254+
messages.push({ role: "assistant", content: text });
255+
256+
// Check if the agent wants to ask the user something
257+
if (!needsUserInput(text)) {
258+
break;
259+
}
260+
261+
// Suspend and wait for the user to respond — zero compute cost
262+
const reply = await userMessage.wait({ timeout: "1h" }).unwrap();
263+
messages.push({ role: "user", content: reply.text });
264+
}
265+
266+
return { messages };
267+
},
268+
});
269+
```
270+
271+
### Timeout handling
272+
273+
When a `.wait()` times out, you get `{ ok: false, error }` — just like `wait.forToken()`.
274+
275+
```ts
276+
const result = await approval.wait({ timeout: "24h" });
277+
278+
if (!result.ok) {
279+
// WaitpointTimeoutError — the 24 hours elapsed
280+
await escalate(payload.ticketId);
281+
return { escalated: true };
282+
}
283+
284+
// result.output is the typed data
285+
await processApproval(result.output);
286+
```
287+
288+
Or let it throw with `.unwrap()`:
289+
290+
```ts
291+
try {
292+
const data = await approval.wait({ timeout: "24h" }).unwrap();
293+
await processApproval(data);
294+
} catch (error) {
295+
if (error instanceof WaitpointTimeoutError) {
296+
await escalate(payload.ticketId);
297+
}
298+
throw error;
299+
}
300+
```
301+
302+
### Combining `.wait()` and `.on()` in the same task
303+
304+
A task that waits for structured user input (suspending) but also listens for a cancel signal (non-suspending) during the active work phases.
305+
306+
```ts trigger/streams.ts
307+
import { streams } from "@trigger.dev/sdk";
308+
309+
export const cancelSignal = streams.input<{ reason?: string }>({ id: "cancel" });
310+
export const userInput = streams.input<{ choice: "a" | "b" | "c" }>({ id: "user-input" });
311+
```
312+
313+
```ts trigger/interactive-task.ts
314+
import { task } from "@trigger.dev/sdk";
315+
import { cancelSignal, userInput } from "./streams";
316+
317+
export const interactiveTask = task({
318+
id: "interactive",
319+
run: async (payload: { question: string }) => {
320+
// Phase 1: Suspend and wait for user choice (no compute cost)
321+
const { choice } = await userInput.wait({ timeout: "1h" }).unwrap();
322+
323+
// Phase 2: Do expensive work with cancel support (compute is running)
324+
const controller = new AbortController();
325+
const sub = cancelSignal.on(() => controller.abort());
326+
327+
try {
328+
const result = await doExpensiveWork(choice, controller.signal);
329+
return result;
330+
} finally {
331+
sub.off();
332+
}
333+
},
334+
});
335+
```
336+
337+
## Sending data — no changes
338+
339+
`.send()` works exactly the same whether the task is waiting via `.wait()`, `.once()`, or `.on()`. The caller doesn't need to know how the task is listening:
340+
341+
```ts
342+
// This works regardless of whether the task used .wait(), .once(), or .on()
343+
await approval.send(runId, { approved: true, reviewer: "alice" });
344+
```
345+
346+
## Behavioral differences from `.once()`
347+
348+
| Behavior | `.once()` | `.wait()` |
349+
|----------|-----------|-----------|
350+
| Task process | Stays alive | Suspended |
351+
| Buffered data | Resolves immediately from buffer | N/A — creates waitpoint before checking buffer |
352+
| AbortSignal | Supported via `options.signal` | Not supported — use `timeout` instead |
353+
| Timeout format | `timeoutMs` (milliseconds) | `timeout` (period string: `"24h"`, `"7d"`) |
354+
| Timeout result | Rejects with `Error` | Resolves with `{ ok: false, error }` (or throws via `.unwrap()`) |
355+
| Return type | `Promise<TData>` | `ManualWaitpointPromise<TData>` |
356+
| Idempotency | None | Supported via `idempotencyKey` |
357+
| Tags | None | Supported via `tags` |
358+
| Multiple calls | Each `.once()` waits for the next value | Each `.wait()` creates a new waitpoint |
359+
| Can use outside task | No (needs SSE tail) | No (needs `runtime.waitUntil()`) |
360+
361+
## How it works (conceptual)
362+
363+
Under the hood, `.wait()` bridges input streams with the waitpoint token system:
364+
365+
1. **Task calls `approval.wait()`** — creates a waitpoint token internally and tells the platform to link it to this input stream
366+
2. **Task suspends** via `runtime.waitUntil(tokenId)` — process is freed, zero compute cost
367+
3. **Caller sends data** via `approval.send(runId, data)` — the platform sees the linked waitpoint and completes it with the sent data
368+
4. **Task resumes** — the waitpoint resolves and `.wait()` returns the typed data
369+
370+
The key insight is that the platform handles the bridging: it knows which waitpoint token is associated with which input stream, so when data arrives on the stream, it completes the corresponding waitpoint. The task doesn't need a running process to receive the data.

0 commit comments

Comments
 (0)