Skip to content

Commit 57f2488

Browse files
committed
No more input stream multiplexing, no naming collision risk, removed unnecessary waitpoint added columns.
1 parent 4c8cc8b commit 57f2488

File tree

14 files changed

+146
-153
lines changed

14 files changed

+146
-153
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,5 @@ apps/**/public/build
6767
**/.claude/settings.local.json
6868
.mcp.log
6969
.mcp.json
70-
.cursor/debug.log
70+
.cursor/debug.log
71+
ailogger-output.log

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,6 @@ const { action, loader } = createActionApiRoute(
2828
body: CreateInputStreamWaitpointRequestBody,
2929
maxContentLength: 1024 * 10, // 10KB
3030
method: "POST",
31-
allowJWT: true,
32-
corsStrategy: "all",
33-
authorization: {
34-
action: "write",
35-
resource: (params) => ({ inputStreams: params.runFriendlyId }),
36-
superScopes: ["write:inputStreams", "write:all", "admin"],
37-
},
3831
},
3932
async ({ authentication, body, params }) => {
4033
try {
@@ -87,8 +80,6 @@ const { action, loader } = createActionApiRoute(
8780
idempotencyKeyExpiresAt,
8881
timeout,
8982
tags: bodyTags,
90-
inputStreamRunFriendlyId: run.friendlyId,
91-
inputStreamId: body.streamId,
9283
});
9384

9485
// Step 2: Cache the mapping in Redis for fast lookup from .send()
@@ -113,33 +104,27 @@ const { action, loader } = createActionApiRoute(
113104
if (realtimeStream.readRecords) {
114105
const records = await realtimeStream.readRecords(
115106
run.friendlyId,
116-
"__input",
107+
`$trigger.input:${body.streamId}`,
117108
body.lastSeqNum
118109
);
119110

120-
// Find the first record matching this input stream ID
121-
for (const record of records) {
111+
if (records.length > 0) {
112+
const record = records[0]!;
122113
try {
123-
const parsed = JSON.parse(record.data) as {
124-
stream: string;
125-
data: unknown;
126-
};
127-
128-
if (parsed.stream === body.streamId) {
129-
// Data exists — complete the waitpoint immediately
130-
await engine.completeWaitpoint({
131-
id: result.waitpoint.id,
132-
output: {
133-
value: JSON.stringify(parsed.data),
134-
type: "application/json",
135-
isError: false,
136-
},
137-
});
138-
139-
// Clean up the Redis cache since we completed it ourselves
140-
await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
141-
break;
142-
}
114+
const parsed = JSON.parse(record.data) as { data: unknown };
115+
116+
// Data exists — complete the waitpoint immediately
117+
await engine.completeWaitpoint({
118+
id: result.waitpoint.id,
119+
output: {
120+
value: JSON.stringify(parsed.data),
121+
type: "application/json",
122+
isError: false,
123+
},
124+
});
125+
126+
// Clean up the Redis cache since we completed it ourselves
127+
await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
143128
} catch {
144129
// Skip malformed records
145130
}

apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
44
import { getAndDeleteInputStreamWaitpoint } from "~/services/inputStreamWaitpointCache.server";
5-
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
import {
6+
createActionApiRoute,
7+
createLoaderApiRoute,
8+
} from "~/services/routeBuilders/apiBuilder.server";
69
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
710
import { engine } from "~/v3/runEngine.server";
811

@@ -15,7 +18,8 @@ const BodySchema = z.object({
1518
data: z.unknown(),
1619
});
1720

18-
const { action, loader } = createActionApiRoute(
21+
// POST: Send data to an input stream
22+
const { action } = createActionApiRoute(
1923
{
2024
params: ParamsSchema,
2125
maxContentLength: 1024 * 1024, // 1MB max
@@ -66,14 +70,18 @@ const { action, loader } = createActionApiRoute(
6670
// Build the input stream record
6771
const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`;
6872
const record = JSON.stringify({
69-
stream: params.streamId,
7073
data: body.data.data,
7174
ts: Date.now(),
7275
id: recordId,
7376
});
7477

75-
// Append the record to the multiplexed __input stream (auto-creates on first write)
76-
await realtimeStream.appendPart(record, recordId, run.friendlyId, "__input");
78+
// Append the record to the per-stream S2 stream (auto-creates on first write)
79+
await realtimeStream.appendPart(
80+
record,
81+
recordId,
82+
run.friendlyId,
83+
`$trigger.input:${params.streamId}`
84+
);
7785

7886
// Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none)
7987
const waitpointId = await getAndDeleteInputStreamWaitpoint(params.runId, params.streamId);
@@ -92,4 +100,73 @@ const { action, loader } = createActionApiRoute(
92100
}
93101
);
94102

103+
// GET: SSE stream for reading input stream data (used by the in-task SSE tail)
104+
const loader = createLoaderApiRoute(
105+
{
106+
params: ParamsSchema,
107+
allowJWT: true,
108+
corsStrategy: "all",
109+
findResource: async (params, auth) => {
110+
return $replica.taskRun.findFirst({
111+
where: {
112+
friendlyId: params.runId,
113+
runtimeEnvironmentId: auth.environment.id,
114+
},
115+
include: {
116+
batch: {
117+
select: {
118+
friendlyId: true,
119+
},
120+
},
121+
},
122+
});
123+
},
124+
authorization: {
125+
action: "read",
126+
resource: (run) => ({
127+
runs: run.friendlyId,
128+
tags: run.runTags,
129+
batch: run.batch?.friendlyId,
130+
tasks: run.taskIdentifier,
131+
}),
132+
superScopes: ["read:runs", "read:all", "admin"],
133+
},
134+
},
135+
async ({ params, request, resource: run, authentication }) => {
136+
const lastEventId = request.headers.get("Last-Event-ID") || undefined;
137+
138+
const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined;
139+
const timeoutInSeconds = timeoutInSecondsRaw ? parseInt(timeoutInSecondsRaw) : undefined;
140+
141+
if (timeoutInSeconds && isNaN(timeoutInSeconds)) {
142+
return new Response("Invalid timeout seconds", { status: 400 });
143+
}
144+
145+
if (timeoutInSeconds && timeoutInSeconds < 1) {
146+
return new Response("Timeout seconds must be greater than 0", { status: 400 });
147+
}
148+
149+
if (timeoutInSeconds && timeoutInSeconds > 600) {
150+
return new Response("Timeout seconds must be less than 600", { status: 400 });
151+
}
152+
153+
const realtimeStream = getRealtimeStreamInstance(
154+
authentication.environment,
155+
run.realtimeStreamsVersion
156+
);
157+
158+
// Read from the internal S2 stream name (prefixed to avoid user stream collisions)
159+
return realtimeStream.streamResponse(
160+
request,
161+
run.friendlyId,
162+
`$trigger.input:${params.streamId}`,
163+
request.signal,
164+
{
165+
lastEventId,
166+
timeoutInSeconds,
167+
}
168+
);
169+
}
170+
);
171+
95172
export { action, loader };

internal-packages/database/prisma/migrations/20260222130000_add_input_stream_waitpoint_columns/migration.sql

Lines changed: 0 additions & 3 deletions
This file was deleted.

internal-packages/database/prisma/migrations/20260222130001_add_input_stream_waitpoint_index/migration.sql

Lines changed: 0 additions & 4 deletions
This file was deleted.

internal-packages/database/prisma/schema.prisma

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,11 +1074,6 @@ model Waitpoint {
10741074
createdAt DateTime @default(now())
10751075
updatedAt DateTime @updatedAt
10761076
1077-
/// If this waitpoint is linked to an input stream .wait(), this is the run's friendlyId
1078-
inputStreamRunFriendlyId String?
1079-
/// If this waitpoint is linked to an input stream .wait(), this is the stream ID
1080-
inputStreamId String?
1081-
10821077
/// Denormized column that holds the raw tags
10831078
/// Denormalized column that holds the raw tags
10841079
tags String[]

internal-packages/run-engine/src/engine/index.ts

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,20 +1481,6 @@ export class RunEngine {
14811481
});
14821482
}
14831483

1484-
/** Finds and completes a PENDING waitpoint linked to an input stream (DB fallback). */
1485-
async completeInputStreamWaitpoint(params: {
1486-
environmentId: string;
1487-
runFriendlyId: string;
1488-
streamId: string;
1489-
output?: {
1490-
value: string;
1491-
type?: string;
1492-
isError: boolean;
1493-
};
1494-
}): Promise<Waitpoint | null> {
1495-
return this.waitpointSystem.completeInputStreamWaitpoint(params);
1496-
}
1497-
14981484
/**
14991485
* This gets called AFTER the checkpoint has been created
15001486
* The CPU/Memory checkpoint at this point exists in our snapshot storage

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -266,17 +266,13 @@ export class WaitpointSystem {
266266
idempotencyKeyExpiresAt,
267267
timeout,
268268
tags,
269-
inputStreamRunFriendlyId,
270-
inputStreamId,
271269
}: {
272270
environmentId: string;
273271
projectId: string;
274272
idempotencyKey?: string;
275273
idempotencyKeyExpiresAt?: Date;
276274
timeout?: Date;
277275
tags?: string[];
278-
inputStreamRunFriendlyId?: string;
279-
inputStreamId?: string;
280276
}): Promise<{ waitpoint: Waitpoint; isCached: boolean }> {
281277
const existingWaitpoint = idempotencyKey
282278
? await this.$.prisma.waitpoint.findFirst({
@@ -332,8 +328,6 @@ export class WaitpointSystem {
332328
projectId,
333329
completedAfter: timeout,
334330
tags,
335-
inputStreamRunFriendlyId,
336-
inputStreamId,
337331
},
338332
update: {},
339333
});
@@ -370,40 +364,6 @@ export class WaitpointSystem {
370364
throw new Error(`Failed to create waitpoint after ${maxRetries} attempts due to conflicts.`);
371365
}
372366

373-
/**
374-
* Finds and completes a PENDING waitpoint linked to an input stream.
375-
* This is the DB fallback path used when the Redis cache misses.
376-
* Returns the completed waitpoint, or null if no matching waitpoint exists.
377-
*/
378-
async completeInputStreamWaitpoint({
379-
environmentId,
380-
runFriendlyId,
381-
streamId,
382-
output,
383-
}: {
384-
environmentId: string;
385-
runFriendlyId: string;
386-
streamId: string;
387-
output?: {
388-
value: string;
389-
type?: string;
390-
isError: boolean;
391-
};
392-
}): Promise<Waitpoint | null> {
393-
const waitpoint = await this.$.prisma.waitpoint.findFirst({
394-
where: {
395-
environmentId,
396-
inputStreamRunFriendlyId: runFriendlyId,
397-
inputStreamId: streamId,
398-
status: "PENDING",
399-
},
400-
});
401-
402-
if (!waitpoint) return null;
403-
404-
return this.completeWaitpoint({ id: waitpoint.id, output });
405-
}
406-
407367
/**
408368
* Prevents a run from continuing until the waitpoint is completed.
409369
*

packages/core/src/v3/inputStreams/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ export class InputStreamsAPI implements InputStreamManager {
4747
return this.#getManager().peek(streamId);
4848
}
4949

50-
public get lastSeqNum(): number | undefined {
51-
return this.#getManager().lastSeqNum;
50+
public lastSeqNum(streamId: string): number | undefined {
51+
return this.#getManager().lastSeqNum(streamId);
5252
}
5353

5454
public reset(): void {

0 commit comments

Comments
 (0)