Skip to content

Commit 1075894

Browse files
claudeericallam
authored andcommitted
feat: add streams.input() for inbound realtime data to running tasks
Adds a new `streams.input<T>()` API that enables external callers to send typed data into running tasks. Inside tasks, data is received via `.on()`, `.once()`, and `.peek()` methods. Outside tasks, `.send(runId, data)` pushes data through a new platform API route. Architecture: - All input streams for a run multiplex onto a single `__input` stream - Worker demuxes by stream ID and routes to registered handlers - Lazy stream creation on first `.send()` call - New IPC message `INPUT_STREAM_CREATED` triggers worker tail connection New files: - packages/core/src/v3/inputStreams/ (manager, types, noop, singleton) - apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts - DB migration for hasInputStream field on TaskRun https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
1 parent 8003923 commit 1075894

File tree

18 files changed

+655
-1
lines changed

18 files changed

+655
-1
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { $replica, prisma } from "~/db.server";
4+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
6+
7+
const ParamsSchema = z.object({
8+
runId: z.string(),
9+
streamId: z.string(),
10+
});
11+
12+
const BodySchema = z.object({
13+
data: z.unknown(),
14+
});
15+
16+
const { action } = createActionApiRoute(
17+
{
18+
params: ParamsSchema,
19+
maxContentLength: 1024 * 1024, // 1MB max
20+
},
21+
async ({ request, params, authentication }) => {
22+
const run = await $replica.taskRun.findFirst({
23+
where: {
24+
friendlyId: params.runId,
25+
runtimeEnvironmentId: authentication.environment.id,
26+
},
27+
select: {
28+
id: true,
29+
friendlyId: true,
30+
completedAt: true,
31+
hasInputStream: true,
32+
realtimeStreamsVersion: true,
33+
},
34+
});
35+
36+
if (!run) {
37+
return json({ ok: false, error: "Run not found" }, { status: 404 });
38+
}
39+
40+
if (run.completedAt) {
41+
return json(
42+
{ ok: false, error: "Cannot send to input stream on a completed run" },
43+
{ status: 400 }
44+
);
45+
}
46+
47+
const body = BodySchema.safeParse(await request.json());
48+
49+
if (!body.success) {
50+
return json({ ok: false, error: "Invalid request body" }, { status: 400 });
51+
}
52+
53+
const realtimeStream = getRealtimeStreamInstance(
54+
authentication.environment,
55+
run.realtimeStreamsVersion
56+
);
57+
58+
// Lazily create the input stream on first send
59+
if (!run.hasInputStream) {
60+
await prisma.taskRun.update({
61+
where: { id: run.id },
62+
data: { hasInputStream: true },
63+
});
64+
65+
await realtimeStream.initializeStream(run.friendlyId, "__input");
66+
}
67+
68+
// Build the input stream record
69+
const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`;
70+
const record = JSON.stringify({
71+
stream: params.streamId,
72+
data: body.data.data,
73+
ts: Date.now(),
74+
id: recordId,
75+
});
76+
77+
// Append the record to the multiplexed __input stream
78+
await realtimeStream.appendPart(record, recordId, run.friendlyId, "__input");
79+
80+
return json({ ok: true });
81+
}
82+
);
83+
84+
export { action };
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "TaskRun" ADD COLUMN "hasInputStream" BOOLEAN NOT NULL DEFAULT false;

internal-packages/database/prisma/schema.prisma

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,9 @@ model TaskRun {
778778
/// Store the stream keys that are being used by the run
779779
realtimeStreams String[] @default([])
780780
781+
/// Whether this run has an active input stream (created lazily on first streams.input send)
782+
hasInputStream Boolean @default(false)
783+
781784
@@unique([oneTimeUseToken])
782785
@@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey])
783786
// Finding child runs

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
traceContext,
3434
heartbeats,
3535
realtimeStreams,
36+
inputStreams,
3637
} from "@trigger.dev/core/v3";
3738
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
3839
import {
@@ -59,6 +60,7 @@ import {
5960
StandardTraceContextManager,
6061
StandardHeartbeatsManager,
6162
StandardRealtimeStreamsManager,
63+
StandardInputStreamManager,
6264
} from "@trigger.dev/core/v3/workers";
6365
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
6466
import { readFile } from "node:fs/promises";
@@ -160,6 +162,14 @@ const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager(
160162
);
161163
realtimeStreams.setGlobalManager(standardRealtimeStreamsManager);
162164

165+
const standardInputStreamManager = new StandardInputStreamManager(
166+
apiClientManager.clientOrThrow(),
167+
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
168+
(getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
169+
false
170+
);
171+
inputStreams.setGlobalManager(standardInputStreamManager);
172+
163173
const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000);
164174
const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs);
165175
waitUntil.setGlobalManager(waitUntilManager);
@@ -333,6 +343,7 @@ function resetExecutionEnvironment() {
333343
usageTimeoutManager.reset();
334344
runMetadataManager.reset();
335345
standardRealtimeStreamsManager.reset();
346+
standardInputStreamManager.reset();
336347
waitUntilManager.reset();
337348
_sharedWorkerRuntime?.reset();
338349
durableClock.reset();
@@ -634,6 +645,9 @@ const zodIpc = new ZodIpcConnection({
634645
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
635646
_sharedWorkerRuntime?.resolveWaitpoints([waitpoint]);
636647
},
648+
INPUT_STREAM_CREATED: async ({ runId }) => {
649+
standardInputStreamManager.connectTail(runId);
650+
},
637651
},
638652
});
639653

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {
3232
traceContext,
3333
heartbeats,
3434
realtimeStreams,
35+
inputStreams,
3536
} from "@trigger.dev/core/v3";
3637
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
3738
import {
@@ -59,6 +60,7 @@ import {
5960
StandardTraceContextManager,
6061
StandardHeartbeatsManager,
6162
StandardRealtimeStreamsManager,
63+
StandardInputStreamManager,
6264
} from "@trigger.dev/core/v3/workers";
6365
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
6466
import { readFile } from "node:fs/promises";
@@ -140,6 +142,14 @@ const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager(
140142
);
141143
realtimeStreams.setGlobalManager(standardRealtimeStreamsManager);
142144

145+
const standardInputStreamManager = new StandardInputStreamManager(
146+
apiClientManager.clientOrThrow(),
147+
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
148+
(getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
149+
false
150+
);
151+
inputStreams.setGlobalManager(standardInputStreamManager);
152+
143153
const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000);
144154
const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs);
145155
waitUntil.setGlobalManager(waitUntilManager);
@@ -313,6 +323,7 @@ function resetExecutionEnvironment() {
313323
runMetadataManager.reset();
314324
waitUntilManager.reset();
315325
standardRealtimeStreamsManager.reset();
326+
standardInputStreamManager.reset();
316327
_sharedWorkerRuntime?.reset();
317328
durableClock.reset();
318329
taskContext.disable();
@@ -627,6 +638,9 @@ const zodIpc = new ZodIpcConnection({
627638
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
628639
_sharedWorkerRuntime?.resolveWaitpoints([waitpoint]);
629640
},
641+
INPUT_STREAM_CREATED: async ({ runId }) => {
642+
standardInputStreamManager.connectTail(runId);
643+
},
630644
},
631645
});
632646

packages/core/src/v3/apiClient/index.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import {
4141
RetrieveRunResponse,
4242
RetrieveRunTraceResponseBody,
4343
ScheduleObject,
44+
SendInputStreamResponseBody,
4445
StreamBatchItemsResponse,
4546
TaskRunExecutionResult,
4647
TriggerTaskRequestBody,
@@ -1385,6 +1386,24 @@ export class ApiClient {
13851386
);
13861387
}
13871388

1389+
async sendInputStream(
1390+
runId: string,
1391+
streamId: string,
1392+
data: unknown,
1393+
requestOptions?: ZodFetchOptions
1394+
) {
1395+
return zodfetch(
1396+
SendInputStreamResponseBody,
1397+
`${this.baseUrl}/realtime/v1/streams/${runId}/input/${streamId}`,
1398+
{
1399+
method: "POST",
1400+
headers: this.#getHeaders(false),
1401+
body: JSON.stringify({ data }),
1402+
},
1403+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
1404+
);
1405+
}
1406+
13881407
async generateJWTClaims(requestOptions?: ZodFetchOptions): Promise<Record<string, any>> {
13891408
return zodfetch(
13901409
z.record(z.any()),

packages/core/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export * from "./lifecycle-hooks-api.js";
2020
export * from "./locals-api.js";
2121
export * from "./heartbeats-api.js";
2222
export * from "./realtime-streams-api.js";
23+
export * from "./input-streams-api.js";
2324
export * from "./schemas/index.js";
2425
export { SemanticInternalAttributes } from "./semanticInternalAttributes.js";
2526
export * from "./resource-catalog-api.js";
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Split module-level variable definition into separate files to allow
2+
// tree-shaking on each api instance.
3+
import { InputStreamsAPI } from "./inputStreams/index.js";
4+
5+
export const inputStreams = InputStreamsAPI.getInstance();
6+
7+
export * from "./inputStreams/types.js";
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { getGlobal, registerGlobal } from "../utils/globals.js";
2+
import { NoopInputStreamManager } from "./noopManager.js";
3+
import { InputStreamManager } from "./types.js";
4+
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
5+
6+
const API_NAME = "input-streams";
7+
8+
const NOOP_MANAGER = new NoopInputStreamManager();
9+
10+
export class InputStreamsAPI implements InputStreamManager {
11+
private static _instance?: InputStreamsAPI;
12+
13+
private constructor() {}
14+
15+
public static getInstance(): InputStreamsAPI {
16+
if (!this._instance) {
17+
this._instance = new InputStreamsAPI();
18+
}
19+
20+
return this._instance;
21+
}
22+
23+
setGlobalManager(manager: InputStreamManager): boolean {
24+
return registerGlobal(API_NAME, manager);
25+
}
26+
27+
#getManager(): InputStreamManager {
28+
return getGlobal(API_NAME) ?? NOOP_MANAGER;
29+
}
30+
31+
public on(
32+
streamId: string,
33+
handler: (data: unknown) => void | Promise<void>
34+
): { off: () => void } {
35+
return this.#getManager().on(streamId, handler);
36+
}
37+
38+
public once(streamId: string, options?: InputStreamOnceOptions): Promise<unknown> {
39+
return this.#getManager().once(streamId, options);
40+
}
41+
42+
public peek(streamId: string): unknown | undefined {
43+
return this.#getManager().peek(streamId);
44+
}
45+
46+
public reset(): void {
47+
this.#getManager().reset();
48+
}
49+
50+
public disconnect(): void {
51+
this.#getManager().disconnect();
52+
}
53+
54+
public connectTail(runId: string, fromSeq?: number): void {
55+
this.#getManager().connectTail(runId, fromSeq);
56+
}
57+
}

0 commit comments

Comments
 (0)