Skip to content

Commit b733908

Browse files
committed
feat(sdk,core,webapp): direct-from-store reads for chat session .out streams
Chat session response streams (.out) read directly from the realtime stream store instead of relaying every chunk through Trigger.dev, removing a network hop from the streamed response. useTriggerChatTransport and AgentChat take the direct path automatically for the active turn, obtain the read grant themselves, and fall back to the relayed path whenever a grant cannot be obtained or used (so failing to get one never breaks the chat). Off by default when a custom baseURL routes .out through your own edge, and gated server-side, so existing setups are unaffected. Set directStreamReads to override.
1 parent ef04cc3 commit b733908

19 files changed

Lines changed: 1288 additions & 68 deletions
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Chat session response streams (`.out`) can now be read directly from the realtime stream store instead of relaying every chunk through Trigger.dev's servers, removing a network hop from the streamed response. `useTriggerChatTransport` and `AgentChat` use the direct path automatically for the active streaming turn. The SDK obtains the read grant on its own (from the session-start response, or from a lightweight grant endpoint if your `startSession` doesn't forward it), refreshes it on turn-complete as it nears expiry, and transparently falls back to the relayed path whenever a grant can't be obtained or used (reconnects, hydrated sessions after a reload, or watch mode). Failing to obtain a grant never breaks the chat. No code changes are required to benefit.
7+
8+
The direct read turns on by default only when you haven't customized `.out` routing. If you set a custom `baseURL`/`streamBaseURL` (e.g. fronting chat traffic with your own edge proxy), it stays off and `.out` follows your routing, so updating changes nothing for those setups. Use `directStreamReads: true`/`false` to override.

apps/webapp/app/env.server.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1740,6 +1740,20 @@ const EnvironmentSchema = z
17401740
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
17411741
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
17421742
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
1743+
// When "true", `POST /api/v1/sessions` hands the client a read-scoped S2
1744+
// token + endpoint so the browser reads the session's `.out` stream
1745+
// straight from S2 instead of proxying through the realtime host. Off
1746+
// keeps every read on the proxy. Requires real S2 (not s2-lite / a custom
1747+
// endpoint, which don't issue scoped tokens).
1748+
REALTIME_STREAMS_SESSIONS_DIRECT_READ_ENABLED: z.enum(["true", "false"]).default("false"),
1749+
// TTL for the browser-held read token (read-only, scoped to one stream).
1750+
// Long by default so a token rarely expires mid-session; the client also
1751+
// proactively refreshes it on turn-complete as it nears expiry, and falls
1752+
// back to the proxy if a refresh ever can't be obtained in time.
1753+
REALTIME_STREAMS_S2_READ_TOKEN_EXPIRATION_IN_MS: z.coerce
1754+
.number()
1755+
.int()
1756+
.default(60_000 * 60 * 24), // 24 hours
17431757
// When "true", provision a dedicated S2 basin per org and stamp
17441758
// `streamBasinName` on new rows. Off keeps everything on the single
17451759
// basin defined by `REALTIME_STREAMS_S2_BASIN`.

apps/webapp/app/routes/api.v1.sessions.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@ import {
44
type CreatedSessionResponseBody,
55
ListSessionsQueryParams,
66
type ListSessionsResponseBody,
7+
type SessionDirectReadGrant,
78
type SessionItem,
89
type SessionStatus,
910
} from "@trigger.dev/core/v3";
1011
import { SessionId } from "@trigger.dev/core/v3/isomorphic";
1112
import type { Prisma, Session } from "@trigger.dev/database";
1213
import { $replica, prisma, type PrismaClient } from "~/db.server";
14+
import { env } from "~/env.server";
1315
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
1416
import { logger } from "~/services/logger.server";
1517
import { mintSessionToken } from "~/services/realtime/mintSessionToken.server";
18+
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
19+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
1620
import {
1721
ensureRunForSession,
1822
type SessionTriggerConfig,
@@ -257,6 +261,35 @@ const { action } = createActionApiRoute(
257261
addressingKey
258262
);
259263

264+
// When direct session reads are enabled, hand the client a read-scoped
265+
// S2 token + endpoint so it reads `.out` straight from S2 instead of
266+
// proxying through the realtime host. Best-effort: a failure here only
267+
// means the client falls back to the proxy, so it must never fail the
268+
// create.
269+
let directReadOut: SessionDirectReadGrant | undefined;
270+
if (env.REALTIME_STREAMS_SESSIONS_DIRECT_READ_ENABLED === "true") {
271+
try {
272+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
273+
session,
274+
});
275+
if (realtimeStream instanceof S2RealtimeStreams) {
276+
// The `.out` stream is keyed by the canonical addressing key
277+
// (externalId if set, else friendlyId) — the same key the worker
278+
// writes with and the `.out` SSE route reads with. Mint the grant
279+
// against that, NOT the friendlyId, or it points at an empty stream.
280+
const grant = await realtimeStream.issueSessionOutReadGrant(addressingKey);
281+
if (grant) {
282+
directReadOut = { provider: "s2", ...grant };
283+
}
284+
}
285+
} catch (error) {
286+
logger.warn("Failed to mint session direct-read grant; client will use the proxy", {
287+
sessionId: session.id,
288+
error,
289+
});
290+
}
291+
}
292+
260293
const sessionItem: SessionItem = {
261294
...serializeSession(session),
262295
triggerConfig: session.triggerConfig as unknown as SessionTriggerConfig,
@@ -268,6 +301,7 @@ const { action } = createActionApiRoute(
268301
runId: run.friendlyId,
269302
publicAccessToken,
270303
isCached,
304+
...(directReadOut ? { directReadOut } : {}),
271305
};
272306

273307
return json<CreatedSessionResponseBody>(responseBody, {
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { type SessionDirectReadGrant } from "@trigger.dev/core/v3";
3+
import { z } from "zod";
4+
import { $replica } from "~/db.server";
5+
import { env } from "~/env.server";
6+
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
7+
import {
8+
canonicalSessionAddressingKey,
9+
isSessionFriendlyIdForm,
10+
resolveSessionByIdOrExternalId,
11+
} from "~/services/realtime/sessions.server";
12+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
13+
import {
14+
anyResource,
15+
createLoaderApiRoute,
16+
} from "~/services/routeBuilders/apiBuilder.server";
17+
18+
const ParamsSchema = z.object({
19+
session: z.string(),
20+
});
21+
22+
const SearchParamsSchema = z.object({
23+
// `peek=1` is sent on the reconnect handshake: do the settled-peek
24+
// server-side (next to S2) and return the verdict alongside the grant so the
25+
// client doesn't make its own client→S2 peek round-trip. Absent on the
26+
// active send path (no verdict needed there).
27+
peek: z.string().optional(),
28+
});
29+
30+
/**
31+
* GET /realtime/v1/sessions/:session/out/grant
32+
*
33+
* Mint a fresh direct-read grant for a session's `.out` stream so the client
34+
* can keep reading directly from S2 without the token expiring mid-session.
35+
* Lightweight: authed by the session PAT the client already holds, no run
36+
* trigger, no DB write. Returns `{ directReadOut: null }` when direct reads
37+
* aren't available (flag off, or a backend that can't mint scoped tokens) so
38+
* the client transparently stays on the proxied read.
39+
*/
40+
const loader = createLoaderApiRoute(
41+
{
42+
params: ParamsSchema,
43+
searchParams: SearchParamsSchema,
44+
allowJWT: true,
45+
corsStrategy: "all",
46+
findResource: async (params, auth) => {
47+
const row = await resolveSessionByIdOrExternalId(
48+
$replica,
49+
auth.environment.id,
50+
params.session
51+
);
52+
if (!row && isSessionFriendlyIdForm(params.session)) {
53+
return undefined; // 404 — opaque friendlyId must reference a real row
54+
}
55+
return {
56+
row,
57+
addressingKey: canonicalSessionAddressingKey(row, params.session),
58+
};
59+
},
60+
authorization: {
61+
action: "read",
62+
resource: ({ row, addressingKey }) => {
63+
const ids = new Set<string>([addressingKey]);
64+
if (row) {
65+
ids.add(row.friendlyId);
66+
if (row.externalId) ids.add(row.externalId);
67+
}
68+
return anyResource([...ids].map((id) => ({ type: "sessions", id })));
69+
},
70+
},
71+
},
72+
async ({ authentication, resource, searchParams }) => {
73+
let directReadOut: SessionDirectReadGrant | null = null;
74+
// Reconnect settled-peek verdict, returned only when the client asks for
75+
// it (`?peek=1`). `settled` undefined means "not peeked / unavailable" —
76+
// the client then does its own peek.
77+
let settled: boolean | undefined;
78+
let tailSeq: number | undefined;
79+
80+
if (env.REALTIME_STREAMS_SESSIONS_DIRECT_READ_ENABLED === "true") {
81+
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
82+
session: resource.row,
83+
organization: resource.row ? null : authentication.environment.organization,
84+
});
85+
86+
if (realtimeStream instanceof S2RealtimeStreams) {
87+
const grant = await realtimeStream.issueSessionOutReadGrant(resource.addressingKey);
88+
if (grant) {
89+
directReadOut = { provider: "s2", ...grant };
90+
91+
// Piggyback the settled-peek on the grant fetch the reconnecting
92+
// client is already making — done here next to S2 (sub-ms) so the
93+
// client can pick direct-drain vs direct-stream without its own peek.
94+
if (searchParams.peek === "1") {
95+
const verdict = await realtimeStream.peekSessionOutSettled(resource.addressingKey);
96+
settled = verdict.settled;
97+
tailSeq = verdict.tailSeq;
98+
}
99+
}
100+
}
101+
}
102+
103+
return json({ directReadOut, settled, tailSeq });
104+
}
105+
);
106+
107+
export { loader };

0 commit comments

Comments
 (0)