Skip to content

Commit 4c8cc8b

Browse files
committed
input stream waitpoints and tests in the hello world reference project
1 parent 7805a39 commit 4c8cc8b

File tree

31 files changed

+960
-56
lines changed

31 files changed

+960
-56
lines changed

.changeset/input-stream-wait.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/core": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Add `.wait()` method to input streams for suspending tasks while waiting for data. Unlike `.once()` which keeps the task process alive, `.wait()` suspends the task entirely, freeing compute resources. The task resumes when data arrives via `.send()`.

.claude/skills/trigger-dev-tasks/realtime.md

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,40 @@ export const approval = streams.input<{ approved: boolean; reviewer: string }>({
128128

129129
### Receiving Data Inside a Task
130130

131-
#### `once()` — Wait for the next value
131+
#### `wait()` — Suspend until data arrives (recommended for long waits)
132+
133+
Suspends the task entirely, freeing compute. Returns `ManualWaitpointPromise<TData>` (same as `wait.forToken()`).
134+
135+
```ts
136+
import { task } from "@trigger.dev/sdk";
137+
import { approval } from "./streams";
138+
139+
export const publishPost = task({
140+
id: "publish-post",
141+
run: async (payload: { postId: string }) => {
142+
const draft = await prepareDraft(payload.postId);
143+
await notifyReviewer(draft);
144+
145+
// Suspend — no compute cost while waiting
146+
const result = await approval.wait({ timeout: "7d" });
147+
148+
if (result.ok) {
149+
return { published: result.output.approved };
150+
}
151+
return { published: false, timedOut: true };
152+
},
153+
});
154+
```
155+
156+
Options: `timeout` (period string), `idempotencyKey`, `idempotencyKeyTTL`, `tags`.
157+
158+
Use `.unwrap()` to throw on timeout: `const data = await approval.wait({ timeout: "24h" }).unwrap();`
159+
160+
**Use `.wait()` when:** nothing to do until data arrives, wait could be long, want zero compute cost.
161+
162+
#### `once()` — Wait for the next value (non-suspending)
163+
164+
Keeps the task process alive. Use for short waits or when doing concurrent work.
132165

133166
```ts
134167
import { task } from "@trigger.dev/sdk";
@@ -150,6 +183,8 @@ export const draftEmailTask = task({
150183

151184
Options: `once({ timeoutMs: 300_000 })` or `once({ signal: controller.signal })`.
152185

186+
**Use `.once()` when:** wait is short, doing concurrent work, need AbortSignal support.
187+
153188
#### `on()` — Listen for every value
154189

155190
```ts
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add input stream `.wait()` support: new API route for creating input-stream-linked waitpoints, Redis cache for fast waitpoint lookup from `.send()`, and waitpoint completion bridging in the send route.
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import {
4+
CreateInputStreamWaitpointRequestBody,
5+
type CreateInputStreamWaitpointResponseBody,
6+
} from "@trigger.dev/core/v3";
7+
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
8+
import { $replica } from "~/db.server";
9+
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
10+
import {
11+
deleteInputStreamWaitpoint,
12+
setInputStreamWaitpoint,
13+
} from "~/services/inputStreamWaitpointCache.server";
14+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
15+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
16+
import { parseDelay } from "~/utils/delays";
17+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
18+
import { engine } from "~/v3/runEngine.server";
19+
import { ServiceValidationError } from "~/v3/services/baseService.server";
20+
21+
const ParamsSchema = z.object({
22+
runFriendlyId: z.string(),
23+
});
24+
25+
const { action, loader } = createActionApiRoute(
26+
{
27+
params: ParamsSchema,
28+
body: CreateInputStreamWaitpointRequestBody,
29+
maxContentLength: 1024 * 10, // 10KB
30+
method: "POST",
31+
allowJWT: true,
32+
corsStrategy: "all",
33+
authorization: {
34+
action: "write",
35+
resource: (params) => ({ inputStreams: params.runFriendlyId }),
36+
superScopes: ["write:inputStreams", "write:all", "admin"],
37+
},
38+
},
39+
async ({ authentication, body, params }) => {
40+
try {
41+
const run = await $replica.taskRun.findFirst({
42+
where: {
43+
friendlyId: params.runFriendlyId,
44+
runtimeEnvironmentId: authentication.environment.id,
45+
},
46+
select: {
47+
id: true,
48+
friendlyId: true,
49+
realtimeStreamsVersion: true,
50+
},
51+
});
52+
53+
if (!run) {
54+
return json({ error: "Run not found" }, { status: 404 });
55+
}
56+
57+
const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
58+
? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
59+
: undefined;
60+
61+
const timeout = await parseDelay(body.timeout);
62+
63+
// Process tags (same pattern as api.v1.waitpoints.tokens.ts)
64+
const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags;
65+
66+
if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) {
67+
throw new ServiceValidationError(
68+
`Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.`
69+
);
70+
}
71+
72+
if (bodyTags && bodyTags.length > 0) {
73+
for (const tag of bodyTags) {
74+
await createWaitpointTag({
75+
tag,
76+
environmentId: authentication.environment.id,
77+
projectId: authentication.environment.projectId,
78+
});
79+
}
80+
}
81+
82+
// Step 1: Create the waitpoint
83+
const result = await engine.createManualWaitpoint({
84+
environmentId: authentication.environment.id,
85+
projectId: authentication.environment.projectId,
86+
idempotencyKey: body.idempotencyKey,
87+
idempotencyKeyExpiresAt,
88+
timeout,
89+
tags: bodyTags,
90+
inputStreamRunFriendlyId: run.friendlyId,
91+
inputStreamId: body.streamId,
92+
});
93+
94+
// Step 2: Cache the mapping in Redis for fast lookup from .send()
95+
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
96+
await setInputStreamWaitpoint(
97+
run.friendlyId,
98+
body.streamId,
99+
result.waitpoint.id,
100+
ttlMs && ttlMs > 0 ? ttlMs : undefined
101+
);
102+
103+
// Step 3: Check if data was already sent to this input stream (race condition handling).
104+
// If .send() landed before .wait(), the data is in the S2 stream but no waitpoint
105+
// existed to complete. We check from the client's last known position.
106+
if (!result.isCached) {
107+
try {
108+
const realtimeStream = getRealtimeStreamInstance(
109+
authentication.environment,
110+
run.realtimeStreamsVersion
111+
);
112+
113+
if (realtimeStream.readRecords) {
114+
const records = await realtimeStream.readRecords(
115+
run.friendlyId,
116+
"__input",
117+
body.lastSeqNum
118+
);
119+
120+
// Find the first record matching this input stream ID
121+
for (const record of records) {
122+
try {
123+
const parsed = JSON.parse(record.data) as {
124+
stream: string;
125+
data: unknown;
126+
};
127+
128+
if (parsed.stream === body.streamId) {
129+
// Data exists — complete the waitpoint immediately
130+
await engine.completeWaitpoint({
131+
id: result.waitpoint.id,
132+
output: {
133+
value: JSON.stringify(parsed.data),
134+
type: "application/json",
135+
isError: false,
136+
},
137+
});
138+
139+
// Clean up the Redis cache since we completed it ourselves
140+
await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
141+
break;
142+
}
143+
} catch {
144+
// Skip malformed records
145+
}
146+
}
147+
}
148+
} catch {
149+
// Non-fatal: if the S2 check fails, the waitpoint is still PENDING.
150+
// The next .send() will complete it via the Redis cache path.
151+
}
152+
}
153+
154+
return json<CreateInputStreamWaitpointResponseBody>({
155+
waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
156+
isCached: result.isCached,
157+
});
158+
} catch (error) {
159+
if (error instanceof ServiceValidationError) {
160+
return json({ error: error.message }, { status: 422 });
161+
} else if (error instanceof Error) {
162+
return json({ error: error.message }, { status: 500 });
163+
}
164+
165+
return json({ error: "Something went wrong" }, { status: 500 });
166+
}
167+
}
168+
);
169+
170+
export { action, loader };

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ async function responseHeaders(
166166
const claims = {
167167
sub: environment.id,
168168
pub: true,
169-
scopes: [`read:runs:${run.friendlyId}`],
169+
scopes: [`read:runs:${run.friendlyId}`, `write:inputStreams:${run.friendlyId}`],
170170
realtime,
171171
};
172172

apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
3-
import { $replica, prisma } from "~/db.server";
3+
import { $replica } from "~/db.server";
4+
import { getAndDeleteInputStreamWaitpoint } from "~/services/inputStreamWaitpointCache.server";
45
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
56
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
7+
import { engine } from "~/v3/runEngine.server";
68

79
const ParamsSchema = z.object({
810
runId: z.string(),
@@ -13,10 +15,17 @@ const BodySchema = z.object({
1315
data: z.unknown(),
1416
});
1517

16-
const { action } = createActionApiRoute(
18+
const { action, loader } = createActionApiRoute(
1719
{
1820
params: ParamsSchema,
1921
maxContentLength: 1024 * 1024, // 1MB max
22+
allowJWT: true,
23+
corsStrategy: "all",
24+
authorization: {
25+
action: "write",
26+
resource: (params) => ({ inputStreams: params.runId }),
27+
superScopes: ["write:inputStreams", "write:all", "admin"],
28+
},
2029
},
2130
async ({ request, params, authentication }) => {
2231
const run = await $replica.taskRun.findFirst({
@@ -28,7 +37,6 @@ const { action } = createActionApiRoute(
2837
id: true,
2938
friendlyId: true,
3039
completedAt: true,
31-
hasInputStream: true,
3240
realtimeStreamsVersion: true,
3341
},
3442
});
@@ -55,16 +63,6 @@ const { action } = createActionApiRoute(
5563
run.realtimeStreamsVersion
5664
);
5765

58-
// Lazily create the input stream on first send
59-
if (!run.hasInputStream) {
60-
await prisma.taskRun.update({
61-
where: { id: run.id },
62-
data: { hasInputStream: true },
63-
});
64-
65-
await realtimeStream.initializeStream(run.friendlyId, "__input");
66-
}
67-
6866
// Build the input stream record
6967
const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`;
7068
const record = JSON.stringify({
@@ -74,11 +72,24 @@ const { action } = createActionApiRoute(
7472
id: recordId,
7573
});
7674

77-
// Append the record to the multiplexed __input stream
75+
// Append the record to the multiplexed __input stream (auto-creates on first write)
7876
await realtimeStream.appendPart(record, recordId, run.friendlyId, "__input");
7977

78+
// Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none)
79+
const waitpointId = await getAndDeleteInputStreamWaitpoint(params.runId, params.streamId);
80+
if (waitpointId) {
81+
await engine.completeWaitpoint({
82+
id: waitpointId,
83+
output: {
84+
value: JSON.stringify(body.data.data),
85+
type: "application/json",
86+
isError: false,
87+
},
88+
});
89+
}
90+
8091
return json({ ok: true });
8192
}
8293
);
8394

84-
export { action };
95+
export { action, loader };

apps/webapp/app/services/authorization.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
export type AuthorizationAction = "read" | "write" | string; // Add more actions as needed
22

3-
const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints", "deployments"] as const;
3+
const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints", "deployments", "inputStreams"] as const;
44

55
export type AuthorizationResources = {
66
[key in (typeof ResourceTypes)[number]]?: string | string[];

0 commit comments

Comments
 (0)