Skip to content

Commit b82d100

Browse files
authored
fix(webapp): harden the realtime session routes (#3890)
## Summary Reliability and authorization fixes for realtime chat sessions: - Session-stream waitpoint delivery is scoped to the environment, so two environments using the same session `externalId` can no longer complete each other's waitpoints. - The session snapshot-url routes now enforce per-session authorization, and appending to a session's `out` channel requires secret-key auth, so a session-scoped token can't read another session's snapshot or forge assistant output. - Appends that carry an `X-Part-Id` header are deduplicated on retry, so a retried send can't duplicate a message. - Session creation rejects expired sessions (instead of triggering a run that can never receive input), `externalId` is immutable after creation, and the sessions list endpoint returns friendly `run_*` ids to match the single-session routes. ## Rollout The waitpoint cache key gains an environment prefix. To keep waitpoints registered by the previous deploy working across the boundary, the drain reads both the new and the previous key for this release; the legacy read can be removed a release later once no pre-deploy waitpoints remain.
1 parent f9d57d3 commit b82d100

9 files changed

Lines changed: 329 additions & 56 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids.

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,14 @@ const { action, loader } = createActionApiRoute(
106106
});
107107

108108
// Step 2: Register the waitpoint on the session channel so the next
109-
// append fires it. Keyed by (addressingKey, io) — the canonical
110-
// string for the row. The append handler drains by the same
111-
// canonical key, so writers and readers converge regardless of
112-
// which URL form the agent vs. the appending caller used.
109+
// append fires it. Keyed by (environmentId, addressingKey, io) — the
110+
// canonical string for the row, scoped to the environment because
111+
// externalIds are only unique per environment. The append handler
112+
// drains by the same key, so writers and readers converge regardless
113+
// of which URL form the agent vs. the appending caller used.
113114
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
114115
await addSessionStreamWaitpoint(
116+
authentication.environment.id,
115117
addressingKey,
116118
body.io,
117119
result.waitpoint.id,
@@ -152,6 +154,7 @@ const { action, loader } = createActionApiRoute(
152154
});
153155

154156
await removeSessionStreamWaitpoint(
157+
authentication.environment.id,
155158
addressingKey,
156159
body.io,
157160
result.waitpoint.id

apps/webapp/app/routes/api.v1.sessions.$session.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,24 @@ const { action } = createActionApiRoute(
7474
return json({ error: "Session not found" }, { status: 404 });
7575
}
7676

77+
// The externalId is the canonical addressing key once set: the S2
78+
// stream names, the waitpoint cache key, and the minted session PAT
79+
// scope all derive from it. Re-keying a session would orphan its
80+
// streams (the chat goes silent) and invalidate the PAT's scope, so
81+
// reject any change. Same-value PATCHes stay idempotent.
82+
if (
83+
body.externalId !== undefined &&
84+
body.externalId !== existing.externalId
85+
) {
86+
return json(
87+
{
88+
error:
89+
"externalId cannot be changed after creation; close this session and create a new one with the desired externalId",
90+
},
91+
{ status: 422 }
92+
);
93+
}
94+
7795
try {
7896
const updated = await prisma.session.update({
7997
where: { id: existing.id },

apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { $replica } from "~/db.server";
44
import { chatSnapshotStorageKey } from "~/services/realtime/chatSnapshot.server";
55
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
66
import {
7+
anyResource,
78
createActionApiRoute,
89
createLoaderApiRoute,
910
} from "~/services/routeBuilders/apiBuilder.server";
@@ -21,8 +22,31 @@ const routeConfig = {
2122
resolveSessionByIdOrExternalId($replica, auth.environment.id, params.sessionId),
2223
};
2324

25+
// Authorize against the union of the URL form, friendlyId, and externalId —
26+
// same shape as the sibling session routes. Without an authorization block
27+
// the route builder skips scope checks entirely, so any session-scoped JWT
28+
// in the environment could presign URLs for any other session's snapshot.
29+
function sessionResource(
30+
paramId: string,
31+
session: { friendlyId: string; externalId: string | null } | null | undefined
32+
) {
33+
const ids = new Set<string>([paramId]);
34+
if (session) {
35+
ids.add(session.friendlyId);
36+
if (session.externalId) ids.add(session.externalId);
37+
}
38+
return anyResource([...ids].map((id) => ({ type: "sessions" as const, id })));
39+
}
40+
2441
export const { action } = createActionApiRoute(
25-
{ ...routeConfig, method: "PUT" },
42+
{
43+
...routeConfig,
44+
method: "PUT",
45+
authorization: {
46+
action: "write",
47+
resource: (params, _, __, ___, session) => sessionResource(params.sessionId, session),
48+
},
49+
},
2650
async ({ authentication, resource: session }) => {
2751
if (!session) {
2852
return json({ error: "Session not found" }, { status: 404 });
@@ -42,7 +66,15 @@ export const { action } = createActionApiRoute(
4266
}
4367
);
4468

45-
export const loader = createLoaderApiRoute(routeConfig, async ({ authentication, resource: session }) => {
69+
export const loader = createLoaderApiRoute(
70+
{
71+
...routeConfig,
72+
authorization: {
73+
action: "read",
74+
resource: (session, params) => sessionResource(params.sessionId, session),
75+
},
76+
},
77+
async ({ authentication, resource: session }) => {
4678
if (!session) {
4779
return json({ error: "Session not found" }, { status: 404 });
4880
}

apps/webapp/app/routes/api.v1.sessions.ts

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ import {
1818
type SessionTriggerConfig,
1919
} from "~/services/realtime/sessionRunManager.server";
2020
import { chatSnapshotStoragePathForSession } from "~/services/realtime/chatSnapshot.server";
21-
import { serializeSession } from "~/services/realtime/sessions.server";
21+
import {
22+
serializeSession,
23+
serializeSessionsWithFriendlyRunIds,
24+
} from "~/services/realtime/sessions.server";
2225
import { SessionsRepository } from "~/services/sessionsRepository/sessionsRepository.server";
2326
import {
2427
anyResource,
@@ -91,17 +94,29 @@ export const loader = createLoaderApiRoute(
9194
},
9295
});
9396

94-
return json<ListSessionsResponseBody>({
95-
data: rows.map((row) =>
96-
serializeSession({
97-
...row,
98-
// Columns the list query doesn't select — filled so `serializeSession`
99-
// can operate on a narrowed payload without type errors.
100-
projectId: authentication.environment.projectId,
101-
environmentType: authentication.environment.type,
102-
organizationId: authentication.environment.organizationId,
103-
} as Session)
97+
// Batched friendlyId translation: `currentRunId` on the wire is the
98+
// public `run_*` form, matching the single-session routes. One `IN`
99+
// lookup per page.
100+
const data = await serializeSessionsWithFriendlyRunIds(
101+
rows.map(
102+
(row) =>
103+
({
104+
...row,
105+
// Columns the list query doesn't select — filled so the
106+
// serializer can operate on a narrowed payload without type errors.
107+
projectId: authentication.environment.projectId,
108+
environmentType: authentication.environment.type,
109+
organizationId: authentication.environment.organizationId,
110+
}) as Session
104111
),
112+
{
113+
projectId: authentication.environment.projectId,
114+
runtimeEnvironmentId: authentication.environment.id,
115+
}
116+
);
117+
118+
return json<ListSessionsResponseBody>({
119+
data,
105120
pagination: {
106121
...(pagination.nextCursor ? { next: pagination.nextCursor } : {}),
107122
...(pagination.previousCursor ? { previous: pagination.previousCursor } : {}),
@@ -225,6 +240,17 @@ const { action } = createActionApiRoute(
225240
);
226241
}
227242

243+
// Same guard as the append / end-and-continue handlers: an expired
244+
// row must not spawn a run, because every subsequent `.in/append`
245+
// would 400 on the expiry check — a run boots but the chat can
246+
// never receive input.
247+
if (session.expiresAt && session.expiresAt.getTime() < Date.now()) {
248+
return json(
249+
{ error: "Session is expired; use a different externalId to create a new session" },
250+
{ status: 409 }
251+
);
252+
}
253+
228254
// Session is task-bound — every session has a live run by
229255
// construction. `ensureRunForSession` is idempotent: on the
230256
// cached path it sees `currentRunId` is alive and returns it

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ import {
1111
resolveSessionByIdOrExternalId,
1212
} from "~/services/realtime/sessions.server";
1313
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
14-
import { drainSessionStreamWaitpoints } from "~/services/sessionStreamWaitpointCache.server";
14+
import {
15+
claimSessionStreamPart,
16+
drainSessionStreamWaitpoints,
17+
releaseSessionStreamPart,
18+
} from "~/services/sessionStreamWaitpointCache.server";
1519
import {
1620
anyResource,
1721
createActionApiRoute,
@@ -91,6 +95,17 @@ const { action, loader } = createActionApiRoute(
9195
);
9296
}
9397

98+
// `.out` is the agent→client channel. Only PRIVATE (secret key) auth —
99+
// i.e. the agent run itself — may write to it. Session-scoped JWTs carry
100+
// `write:sessions:<key>` for `.in`; without this gate they could forge
101+
// assistant chunks and complete `.out` waitpoints on their own session.
102+
if (params.io === "out" && authentication.type !== "PRIVATE") {
103+
return json(
104+
{ ok: false, error: "Appending to the out channel requires secret key authentication" },
105+
{ status: 403 }
106+
);
107+
}
108+
94109
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
95110
session,
96111
});
@@ -132,25 +147,54 @@ const { action, loader } = createActionApiRoute(
132147
const addressingKey = canonicalSessionAddressingKey(session, params.session);
133148

134149
const part = await request.text();
135-
const partId = request.headers.get("X-Part-Id") ?? nanoid(7);
150+
const clientPartId = request.headers.get("X-Part-Id");
151+
const partId = clientPartId ?? nanoid(7);
136152

137-
const [appendError] = await tryCatch(
138-
realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io)
139-
);
153+
// Idempotency on client-supplied part ids: atomically claim the id before
154+
// appending. A concurrent or retried POST that loses the claim skips the
155+
// append (no duplicate record) but still falls through to the drain below,
156+
// so a retry whose first attempt died before waking the waitpoint can still
157+
// recover it. The claim is released on append failure so a genuine retry
158+
// can re-claim and proceed.
159+
const wonClaim = clientPartId
160+
? await claimSessionStreamPart(
161+
authentication.environment.id,
162+
addressingKey,
163+
params.io,
164+
clientPartId
165+
)
166+
: true;
167+
168+
if (wonClaim) {
169+
const [appendError] = await tryCatch(
170+
realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io)
171+
);
140172

141-
if (appendError) {
142-
if (appendError instanceof ServiceValidationError) {
173+
if (appendError) {
174+
if (clientPartId) {
175+
await releaseSessionStreamPart(
176+
authentication.environment.id,
177+
addressingKey,
178+
params.io,
179+
clientPartId
180+
);
181+
}
182+
if (appendError instanceof ServiceValidationError) {
183+
return json(
184+
{ ok: false, error: appendError.message },
185+
{ status: appendError.status ?? 422 }
186+
);
187+
}
188+
logger.error("Failed to append to session stream", {
189+
sessionId: session.id,
190+
io: params.io,
191+
error: appendError,
192+
});
143193
return json(
144-
{ ok: false, error: appendError.message },
145-
{ status: appendError.status ?? 422 }
194+
{ ok: false, error: "Something went wrong, please try again." },
195+
{ status: 500 }
146196
);
147197
}
148-
logger.error("Failed to append to session stream", {
149-
sessionId: session.id,
150-
io: params.io,
151-
error: appendError,
152-
});
153-
return json({ ok: false, error: "Something went wrong, please try again." }, { status: 500 });
154198
}
155199

156200
// Fire any run-scoped waitpoints registered against this channel. Best
@@ -160,7 +204,7 @@ const { action, loader } = createActionApiRoute(
160204
// `sessions.open(...).in.wait()`, so writers and readers converge
161205
// regardless of which URL form they used.
162206
const [drainError, waitpointIds] = await tryCatch(
163-
drainSessionStreamWaitpoints(addressingKey, params.io)
207+
drainSessionStreamWaitpoints(authentication.environment.id, addressingKey, params.io)
164208
);
165209
if (drainError) {
166210
logger.error("Failed to drain session stream waitpoints", {

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
128128
// Drain any waitpoints registered for this channel — same as the
129129
// public append. Best-effort; failure doesn't fail the append.
130130
const [drainError, waitpointIds] = await tryCatch(
131-
drainSessionStreamWaitpoints(addressingKey, io)
131+
drainSessionStreamWaitpoints(environment.id, addressingKey, io)
132132
);
133133
if (drainError) {
134134
logger.error("Failed to drain session stream waitpoints (playground)", {

apps/webapp/app/services/realtime/sessions.server.ts

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ export function canonicalSessionAddressingKey(
7575
*
7676
* Note: `currentRunId` is left as-is — Prisma stores the internal run id
7777
* (cuid), but `SessionItem.currentRunId` is the *friendly* form. Routes
78-
* that emit a single `SessionItem` should use
79-
* {@link serializeSessionWithFriendlyRunId} instead, which resolves the
80-
* friendlyId via a TaskRun lookup. List endpoints stay on this raw form
81-
* to avoid N+1 lookups when paginating.
78+
* that emit `SessionItem`s must translate: single-row endpoints via
79+
* {@link serializeSessionWithFriendlyRunId}, list endpoints via the
80+
* batched {@link serializeSessionsWithFriendlyRunIds}. Never put this
81+
* raw form on the wire directly.
8282
*/
8383
export function serializeSession(session: Session): SessionItem {
8484
return {
@@ -125,3 +125,38 @@ export async function serializeSessionWithFriendlyRunId(
125125
currentRunId: run?.friendlyId ?? null,
126126
};
127127
}
128+
129+
/**
130+
* Batched form of {@link serializeSessionWithFriendlyRunId} for list
131+
* endpoints: one `IN` lookup per page instead of N+1. `currentRunId` on
132+
* the wire is always the public `run_*` friendlyId — the raw
133+
* {@link serializeSession} form leaks the internal cuid, which customers
134+
* can't use with `runs.retrieve(...)`.
135+
*/
136+
export async function serializeSessionsWithFriendlyRunIds(
137+
sessions: Session[],
138+
scope: { projectId: string; runtimeEnvironmentId: string }
139+
): Promise<SessionItem[]> {
140+
const runIds = [...new Set(sessions.map((s) => s.currentRunId).filter((id): id is string => !!id))];
141+
142+
// `currentRunId` is a plain string pointer (no FK), so scope the lookup to
143+
// the caller's tenant — a stale value must not resolve a run in another env.
144+
const runs = runIds.length
145+
? await $replica.taskRun.findMany({
146+
where: {
147+
id: { in: runIds },
148+
projectId: scope.projectId,
149+
runtimeEnvironmentId: scope.runtimeEnvironmentId,
150+
},
151+
select: { id: true, friendlyId: true },
152+
})
153+
: [];
154+
const friendlyIdByRunId = new Map(runs.map((run) => [run.id, run.friendlyId]));
155+
156+
return sessions.map((session) => ({
157+
...serializeSession(session),
158+
currentRunId: session.currentRunId
159+
? friendlyIdByRunId.get(session.currentRunId) ?? null
160+
: null,
161+
}));
162+
}

0 commit comments

Comments
 (0)