Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/session-route-hardening.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids.
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ const { action, loader } = createActionApiRoute(
});

// Step 2: Register the waitpoint on the session channel so the next
// append fires it. Keyed by (addressingKey, io) — the canonical
// string for the row. The append handler drains by the same
// canonical key, so writers and readers converge regardless of
// which URL form the agent vs. the appending caller used.
// append fires it. Keyed by (environmentId, addressingKey, io) — the
// canonical string for the row, scoped to the environment because
// externalIds are only unique per environment. The append handler
// drains by the same key, so writers and readers converge regardless
// of which URL form the agent vs. the appending caller used.
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
await addSessionStreamWaitpoint(
authentication.environment.id,
addressingKey,
body.io,
result.waitpoint.id,
Expand Down Expand Up @@ -152,6 +154,7 @@ const { action, loader } = createActionApiRoute(
});

await removeSessionStreamWaitpoint(
authentication.environment.id,
addressingKey,
body.io,
result.waitpoint.id
Expand Down
18 changes: 18 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.$session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ const { action } = createActionApiRoute(
return json({ error: "Session not found" }, { status: 404 });
}

// The externalId is the canonical addressing key once set: the S2
// stream names, the waitpoint cache key, and the minted session PAT
// scope all derive from it. Re-keying a session would orphan its
// streams (the chat goes silent) and invalidate the PAT's scope, so
// reject any change. Same-value PATCHes stay idempotent.
if (
body.externalId !== undefined &&
body.externalId !== existing.externalId
) {
return json(
{
error:
"externalId cannot be changed after creation; close this session and create a new one with the desired externalId",
},
{ status: 422 }
);
}
Comment thread
ericallam marked this conversation as resolved.

try {
const updated = await prisma.session.update({
where: { id: existing.id },
Expand Down
36 changes: 34 additions & 2 deletions apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { $replica } from "~/db.server";
import { chatSnapshotStorageKey } from "~/services/realtime/chatSnapshot.server";
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
import {
anyResource,
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";
Expand All @@ -21,8 +22,31 @@ const routeConfig = {
resolveSessionByIdOrExternalId($replica, auth.environment.id, params.sessionId),
};

// Authorize against the union of the URL form, friendlyId, and externalId —
// same shape as the sibling session routes. Without an authorization block
// the route builder skips scope checks entirely, so any session-scoped JWT
// in the environment could presign URLs for any other session's snapshot.
function sessionResource(
paramId: string,
session: { friendlyId: string; externalId: string | null } | null | undefined
) {
const ids = new Set<string>([paramId]);
if (session) {
ids.add(session.friendlyId);
if (session.externalId) ids.add(session.externalId);
}
return anyResource([...ids].map((id) => ({ type: "sessions" as const, id })));
}

export const { action } = createActionApiRoute(
{ ...routeConfig, method: "PUT" },
{
...routeConfig,
method: "PUT",
authorization: {
action: "write",
resource: (params, _, __, ___, session) => sessionResource(params.sessionId, session),
},
},
async ({ authentication, resource: session }) => {
if (!session) {
return json({ error: "Session not found" }, { status: 404 });
Expand All @@ -42,7 +66,15 @@ export const { action } = createActionApiRoute(
}
);

export const loader = createLoaderApiRoute(routeConfig, async ({ authentication, resource: session }) => {
export const loader = createLoaderApiRoute(
{
...routeConfig,
authorization: {
action: "read",
resource: (session, params) => sessionResource(params.sessionId, session),
},
Comment thread
ericallam marked this conversation as resolved.
},
async ({ authentication, resource: session }) => {
if (!session) {
return json({ error: "Session not found" }, { status: 404 });
}
Expand Down
44 changes: 33 additions & 11 deletions apps/webapp/app/routes/api.v1.sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import {
type SessionTriggerConfig,
} from "~/services/realtime/sessionRunManager.server";
import { chatSnapshotStoragePathForSession } from "~/services/realtime/chatSnapshot.server";
import { serializeSession } from "~/services/realtime/sessions.server";
import {
serializeSession,
serializeSessionsWithFriendlyRunIds,
} from "~/services/realtime/sessions.server";
import { SessionsRepository } from "~/services/sessionsRepository/sessionsRepository.server";
import {
anyResource,
Expand Down Expand Up @@ -91,17 +94,25 @@ export const loader = createLoaderApiRoute(
},
});

// Batched friendlyId translation: `currentRunId` on the wire is the
// public `run_*` form, matching the single-session routes. One `IN`
// lookup per page.
const data = await serializeSessionsWithFriendlyRunIds(
rows.map(
(row) =>
({
...row,
// Columns the list query doesn't select — filled so the
// serializer can operate on a narrowed payload without type errors.
projectId: authentication.environment.projectId,
environmentType: authentication.environment.type,
organizationId: authentication.environment.organizationId,
}) as Session
)
);

return json<ListSessionsResponseBody>({
data: rows.map((row) =>
serializeSession({
...row,
// Columns the list query doesn't select — filled so `serializeSession`
// can operate on a narrowed payload without type errors.
projectId: authentication.environment.projectId,
environmentType: authentication.environment.type,
organizationId: authentication.environment.organizationId,
} as Session)
),
data,
pagination: {
...(pagination.nextCursor ? { next: pagination.nextCursor } : {}),
...(pagination.previousCursor ? { previous: pagination.previousCursor } : {}),
Expand Down Expand Up @@ -225,6 +236,17 @@ const { action } = createActionApiRoute(
);
}

// Same guard as the append / end-and-continue handlers: an expired
// row must not spawn a run, because every subsequent `.in/append`
// would 400 on the expiry check — a run boots but the chat can
// never receive input.
if (session.expiresAt && session.expiresAt.getTime() < Date.now()) {
return json(
{ error: "Session is expired; use a different externalId to create a new session" },
{ status: 409 }
);
}

// Session is task-bound — every session has a live run by
// construction. `ensureRunForSession` is idempotent: on the
// cached path it sees `currentRunId` is alive and returns it
Expand Down
49 changes: 46 additions & 3 deletions apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import {
resolveSessionByIdOrExternalId,
} from "~/services/realtime/sessions.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { drainSessionStreamWaitpoints } from "~/services/sessionStreamWaitpointCache.server";
import {
drainSessionStreamWaitpoints,
markSessionStreamPartAppended,
wasSessionStreamPartAppended,
} from "~/services/sessionStreamWaitpointCache.server";
import {
anyResource,
createActionApiRoute,
Expand Down Expand Up @@ -91,6 +95,17 @@ const { action, loader } = createActionApiRoute(
);
}

// `.out` is the agent→client channel. Only PRIVATE (secret key) auth —
// i.e. the agent run itself — may write to it. Session-scoped JWTs carry
// `write:sessions:<key>` for `.in`; without this gate they could forge
// assistant chunks and complete `.out` waitpoints on their own session.
if (params.io === "out" && authentication.type !== "PRIVATE") {
return json(
{ ok: false, error: "Appending to the out channel requires secret key authentication" },
{ status: 403 }
);
}

const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
session,
});
Expand Down Expand Up @@ -132,7 +147,26 @@ const { action, loader } = createActionApiRoute(
const addressingKey = canonicalSessionAddressingKey(session, params.session);

const part = await request.text();
const partId = request.headers.get("X-Part-Id") ?? nanoid(7);
const clientPartId = request.headers.get("X-Part-Id");
const partId = clientPartId ?? nanoid(7);

// Idempotency on client-supplied part ids: a retried POST whose first
// attempt committed is acknowledged without a second append (which
// would duplicate the record and double-fire the waitpoint drain).
// The marker is only written after a successful append, so retries of
// genuinely failed appends still go through. Server-generated ids are
// per-request and carry no dedupe meaning.
if (
clientPartId &&
(await wasSessionStreamPartAppended(
authentication.environment.id,
addressingKey,
params.io,
clientPartId
))
) {
return json({ ok: true }, { status: 200 });
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

const [appendError] = await tryCatch(
realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io)
Expand All @@ -153,14 +187,23 @@ const { action, loader } = createActionApiRoute(
return json({ ok: false, error: "Something went wrong, please try again." }, { status: 500 });
}

if (clientPartId) {
await markSessionStreamPartAppended(
authentication.environment.id,
addressingKey,
params.io,
clientPartId
);
}

// Fire any run-scoped waitpoints registered against this channel. Best
// effort — a failure here must not fail the append (the record is
// durable in S2; the SSE tail will still deliver it). Waitpoints are
// keyed on the canonical addressing key the agent registered with via
// `sessions.open(...).in.wait()`, so writers and readers converge
// regardless of which URL form they used.
const [drainError, waitpointIds] = await tryCatch(
drainSessionStreamWaitpoints(addressingKey, params.io)
drainSessionStreamWaitpoints(authentication.environment.id, addressingKey, params.io)
);
if (drainError) {
logger.error("Failed to drain session stream waitpoints", {
Expand Down
Comment thread
ericallam marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
// Drain any waitpoints registered for this channel — same as the
// public append. Best-effort; failure doesn't fail the append.
const [drainError, waitpointIds] = await tryCatch(
drainSessionStreamWaitpoints(addressingKey, io)
drainSessionStreamWaitpoints(environment.id, addressingKey, io)
);
if (drainError) {
logger.error("Failed to drain session stream waitpoints (playground)", {
Expand Down
36 changes: 32 additions & 4 deletions apps/webapp/app/services/realtime/sessions.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ export function canonicalSessionAddressingKey(
*
* Note: `currentRunId` is left as-is — Prisma stores the internal run id
* (cuid), but `SessionItem.currentRunId` is the *friendly* form. Routes
* that emit a single `SessionItem` should use
* {@link serializeSessionWithFriendlyRunId} instead, which resolves the
* friendlyId via a TaskRun lookup. List endpoints stay on this raw form
* to avoid N+1 lookups when paginating.
* that emit `SessionItem`s must translate: single-row endpoints via
* {@link serializeSessionWithFriendlyRunId}, list endpoints via the
* batched {@link serializeSessionsWithFriendlyRunIds}. Never put this
* raw form on the wire directly.
*/
export function serializeSession(session: Session): SessionItem {
return {
Expand Down Expand Up @@ -125,3 +125,31 @@ export async function serializeSessionWithFriendlyRunId(
currentRunId: run?.friendlyId ?? null,
};
}

/**
* Batched form of {@link serializeSessionWithFriendlyRunId} for list
* endpoints: one `IN` lookup per page instead of N+1. `currentRunId` on
* the wire is always the public `run_*` friendlyId — the raw
* {@link serializeSession} form leaks the internal cuid, which customers
* can't use with `runs.retrieve(...)`.
*/
export async function serializeSessionsWithFriendlyRunIds(
sessions: Session[]
): Promise<SessionItem[]> {
const runIds = [...new Set(sessions.map((s) => s.currentRunId).filter((id): id is string => !!id))];

const runs = runIds.length
? await $replica.taskRun.findMany({
where: { id: { in: runIds } },
select: { id: true, friendlyId: true },
})
Comment thread
ericallam marked this conversation as resolved.
: [];
const friendlyIdByRunId = new Map(runs.map((run) => [run.id, run.friendlyId]));

return sessions.map((session) => ({
...serializeSession(session),
currentRunId: session.currentRunId
? friendlyIdByRunId.get(session.currentRunId) ?? null
: null,
}));
}
Loading