-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathchat.ts
More file actions
1568 lines (1441 loc) · 59 KB
/
Copy pathchat.ts
File metadata and controls
1568 lines (1441 loc) · 59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* @module @trigger.dev/sdk/chat
*
* Browser-safe module for AI SDK chat transport integration.
* Use this on the frontend with the AI SDK's `useChat` hook.
*
* For backend helpers (`chatAgent`, `pipeChat`), use `@trigger.dev/sdk/ai` instead.
*
* @example
* ```tsx
* import { useChat } from "@ai-sdk/react";
* import { TriggerChatTransport } from "@trigger.dev/sdk/chat";
*
* function Chat() {
* const { messages, sendMessage, status } = useChat({
* transport: new TriggerChatTransport({
* task: "my-chat-task",
* accessToken: async ({ chatId }) => fetchSessionToken(chatId),
* startSession: async ({ chatId, taskId }) => createChatSession({ chatId, taskId }),
* }),
* });
* }
* ```
*/
import type { ChatTransport, UIMessage, UIMessageChunk, ChatRequestOptions } from "ai";
import {
controlSubtype,
headerValue,
PUBLIC_ACCESS_TOKEN_HEADER,
SSEStreamSubscription,
TRIGGER_CONTROL_SUBTYPE,
} from "@trigger.dev/core/v3";
import { ChatTabCoordinator } from "./chat-tab-coordinator.js";
import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js";
import { resolveChatStreamBaseURL, slimSubmitMessageForWire } from "./ai-shared.js";
/** Trigger.dev Cloud API origin; used when the caller passes no `baseURL`. */
const DEFAULT_BASE_URL = "https://api.trigger.dev";
/** Default `streamTimeoutSeconds`: two minutes. */
const DEFAULT_STREAM_TIMEOUT_SECONDS = 2 * 60;
/**
 * Names the realtime session endpoint a request targets; handed to the
 * per-endpoint `baseURL` and `fetch` callbacks.
 *
 * - `"out"` — `GET /realtime/v1/sessions/{chatId}/out`, the SSE response
 *   stream.
 * - `"in"` — `POST /realtime/v1/sessions/{chatId}/in/append`, which carries
 *   user messages, stops, and actions.
 *
 * The transport itself never calls `/api/v1/sessions` or
 * `/api/v1/auth/jwt/claims`. Those are reached server-side through
 * `chat.createStartSessionAction` and the `accessToken` callback, whose own
 * option objects accept the same callback shape.
 */
export type ChatTransportEndpoint = "out" | "in";
/** Context handed to `baseURL` and `fetch` callbacks. */
export type ChatTransportEndpointContext = {
  /** Conversation the request belongs to. */
  chatId: string;
  /** Which realtime endpoint is being called. */
  endpoint: ChatTransportEndpoint;
};
/** Function form of `baseURL`: choose the base URL for one endpoint/chat pair. */
export type ChatBaseURLResolver = (ctx: ChatTransportEndpointContext) => string;
/**
 * Per-request replacement for `fetch`. It receives the fully resolved URL,
 * the `RequestInit` the transport would have sent, and the endpoint context,
 * so it can rewrite the URL, add headers, or delegate to another transport
 * (for example a Cloudflare worker in front of `api.trigger.dev`).
 *
 * The returned `Response` must be semantically equivalent to what
 * `globalThis.fetch(url, init)` would have produced.
 */
export type ChatFetchOverride = (
  url: string,
  init: RequestInit,
  ctx: ChatTransportEndpointContext
) => Promise<Response>;
/**
 * True when `error` looks like a `TriggerApiError` carrying HTTP 401 or 403.
 *
 * Matches on the `name`/`status` shape instead of `instanceof`, because
 * test runners such as Vitest can load duplicate copies of
 * `@trigger.dev/core`, which breaks subclass checks.
 */
function isAuthError(error: unknown): boolean {
  const candidate =
    typeof error === "object" && error !== null
      ? (error as { name?: string; status?: number })
      : undefined;
  if (candidate === undefined || candidate.name !== "TriggerApiError") {
    return false;
  }
  return [401, 403].includes(candidate.status ?? -1);
}
/**
 * True when `error` looks like a `TriggerApiError` with HTTP 404, meaning the
 * session does not exist in the current environment.
 *
 * This happens when hydrated session state (the `sessions` option or a
 * customer's persisted record) came from a different trigger environment, or
 * predates the move to the sessions model. In either case no Session row
 * backs the cached PAT.
 */
function isSessionNotFoundError(error: unknown): boolean {
  if (typeof error !== "object" || error === null) {
    return false;
  }
  const { name, status } = error as { name?: string; status?: number };
  return status === 404 && name === "TriggerApiError";
}
/**
 * Parses an SSE text stream of `data: <UIMessageChunk JSON>` frames back into
 * `UIMessageChunk` objects. Used by the head-start first-turn path to turn
 * the customer's route handler response (AI-SDK-shaped SSE text) into the
 * chunk form the AI SDK's `useChat` consumes from a transport.
 *
 * Spec-light parser, tuned for our own `chat.handover` SSE writer:
 * - Frames end at a blank line. `\r\n` line endings are normalised to `\n`,
 *   so CRLF-delimited frames are emitted as they arrive rather than only at
 *   end-of-stream.
 * - Each `data:` line, with or without the single space after the colon, is
 *   parsed as one JSON chunk. Multi-line `data` fields are NOT joined.
 * - Comment lines (`:`), other fields, and malformed JSON are dropped.
 * - Trailing data without a closing blank line is parsed on flush.
 */
function parseUIMessageSseTransform(): TransformStream<string, UIMessageChunk> {
  let buffer = "";

  // Parse every `data:` line in one frame and enqueue the decoded chunk.
  const emitFrame = (
    frame: string,
    controller: { enqueue(chunk: UIMessageChunk): void }
  ): void => {
    for (const line of frame.split("\n")) {
      if (!line.startsWith("data:")) continue;
      // trim() drops the optional leading space and any stray `\r`.
      const data = line.slice("data:".length).trim();
      if (!data) continue;
      try {
        controller.enqueue(JSON.parse(data) as UIMessageChunk);
      } catch {
        /* drop malformed chunk; the response source is our own writer */
      }
    }
  };

  return new TransformStream<string, UIMessageChunk>({
    transform(chunk, controller) {
      // Normalise CRLF before looking for frame boundaries. A `\r` left at
      // the end of the buffer is kept and pairs with the `\n` that starts
      // the next chunk.
      buffer = (buffer + chunk).replace(/\r\n/g, "\n");
      let idx = buffer.indexOf("\n\n");
      while (idx !== -1) {
        const frame = buffer.slice(0, idx);
        buffer = buffer.slice(idx + 2);
        emitFrame(frame, controller);
        idx = buffer.indexOf("\n\n");
      }
    },
    flush(controller) {
      // Trailing data without a closing blank line — treat as a final frame.
      if (buffer.trim().length > 0) {
        emitFrame(buffer, controller);
      }
      buffer = "";
    },
  });
}
/**
 * Input to the `accessToken` callback. The transport calls it whenever it
 * needs a fresh session-scoped PAT: on initial use, and after a 401 from any
 * session-PAT-authed request.
 *
 * The callback only returns a token; it does not start a run. Customers
 * whose implementation also creates the session (typical for
 * `chat.createStartSessionAction` server actions) own the trigger payload
 * server-side. They already know their own user/context and need nothing
 * from the browser to fill `basePayload.metadata`.
 */
export type AccessTokenParams = {
  /** Conversation id — the same value passed to `sendMessage` / `useChat`. */
  chatId: string;
};
/**
 * Input to the `startSession` callback. The transport calls it when a chatId
 * needs a session:
 * - on `transport.preload(chatId)`;
 * - on `transport.start(chatId)`;
 * - lazily on the first `sendMessage` for a chatId with no cached session.
 *
 * The callback usually wraps a server action that calls
 * `chat.createStartSessionAction(taskId)({ chatId, clientData })`. That action
 * is idempotent on `(env, externalId)`, so concurrent or repeated calls land
 * on the same session.
 *
 * `clientData` is the transport's current `clientData` option — the same
 * value merged into per-turn `metadata` on every `.in` chunk. Forwarding it
 * here makes the first run's `payload.metadata` (seen in `onPreload` /
 * `onChatStart`) match what later turns see.
 *
 * @typeParam TClientData – Type of the agent's `clientDataSchema` (when the
 * transport is parameterised with `useTriggerChatTransport<typeof agent>`).
 */
export type StartSessionParams<TClientData = unknown> = {
  /** Conversation id — the same value passed to `sendMessage` / `useChat`. */
  chatId: string;
  /** The Trigger.dev task ID this transport is bound to. */
  taskId: string;
  /**
   * The transport's current `clientData`. Pass it to the server action's
   * `basePayload.metadata` so the first run's `payload.metadata` matches
   * per-turn `metadata`.
   */
  clientData: TClientData;
};
/**
 * What the `startSession` callback resolves to: the session-scoped PAT the
 * transport caches and then uses for every later `.in/append`, `.out` SSE,
 * and `end-and-continue` call.
 */
export type StartSessionResult = {
  /** Session-scoped PAT — `read:sessions:{chatId} + write:sessions:{chatId}`. */
  publicAccessToken: string;
};
/**
 * Public surface of {@link TriggerChatTransport}'s session state: everything
 * the customer should persist to resume a chat across page reloads.
 *
 * The transport addresses sessions by `chatId`, so this stays small. It
 * holds the session PAT, the last SSE event id, and a single `isStreaming`
 * flag.
 */
export type ChatSessionPersistedState = {
  /** Session-scoped PAT used for the session's `.in` / `.out` requests. */
  publicAccessToken: string;
  /** Last SSE event id seen on `.out`; lets a resumed stream skip events already delivered. */
  lastEventId?: string;
  /** Whether the agent was streaming a response when this state was recorded. */
  isStreaming?: boolean;
};
/**
 * Common options for the {@link TriggerChatTransport}.
 *
 * @typeParam TClientData – Type of the per-call client data merged into
 * the wire payload via `metadata`. When the task uses `clientDataSchema`,
 * pin this to the schema's input type for end-to-end type safety.
 */
export type TriggerChatTransportOptions<TClientData = unknown> = {
  /**
   * The Trigger.dev task ID this transport drives. Sessions created by
   * `transport.start(chatId)` are bound to this task — every run the
   * Session schedules invokes it. Threaded into `startSession` so the
   * customer's server action knows which task to bind.
   */
  task: string;
  /**
   * Returns a fresh session-scoped PAT for an existing chat session.
   * The transport invokes this on a 401/403 from any session-PAT-authed
   * request. It is a pure refresh and never creates a session.
   *
   * A typical customer implementation calls
   * `auth.createPublicToken({ scopes: { read: { sessions: chatId },
   * write: { sessions: chatId } } })` server-side and returns the token.
   *
   * Required so the transport can always recover from PAT expiry and never
   * leaves the consumer in an unrecoverable state.
   */
  accessToken: (params: AccessTokenParams) => string | Promise<string>;
  /**
   * Creates a session for the given chatId (or no-ops if one exists) and
   * returns the session-scoped PAT the transport will use afterward.
   *
   * Wraps a server action that calls
   * `chat.createStartSessionAction(taskId)({ chatId, clientData })`.
   * The customer's server controls authorization, the rest of the
   * triggerConfig, and any atomic DB writes paired with session creation.
   *
   * The transport invokes this:
   * - when `transport.start(chatId)` / `transport.preload(chatId)` is called
   * - lazily on the first `sendMessage` for a chatId with no cached PAT
   *
   * Concurrent and repeat calls dedupe via an in-flight promise plus the
   * customer-side idempotency on `(env, externalId)`.
   *
   * It is optional only when the customer fully manages the session
   * lifecycle externally, by hydrating `sessions: { ... }` and never calling
   * `start` / `preload`. Most customers should provide it.
   */
  startSession?: (
    params: StartSessionParams<
      TClientData extends Record<string, unknown> ? TClientData : Record<string, unknown>
    >
  ) => Promise<StartSessionResult>;
  /**
   * Base URL for the Trigger.dev API. Either a single string applied to every
   * endpoint, or a function called per request that picks a base URL from the
   * endpoint discriminator and chat ID. @default "https://api.trigger.dev"
   *
   * When a string base URL points at Trigger.dev Cloud (`https://api.trigger.dev`),
   * the realtime session endpoints (`in`/`out`) are routed automatically to the
   * dedicated realtime host (`https://realtime.trigger.dev`). Passing a resolver
   * function opts out of this: its return value is used verbatim.
   *
   * @example Route appends through a proxy, SSE direct:
   * ```ts
   * baseURL: ({ endpoint }) =>
   *   endpoint === "out" ? "https://api.trigger.dev" : "https://proxy.example.com",
   * ```
   */
  baseURL?: string | ChatBaseURLResolver;
  /**
   * Base URL for the SSE stream subscription only (`GET .../sessions/{chatId}/out`).
   * @deprecated Pass a function for `baseURL` instead and branch on
   * `endpoint === "out"`. `streamBaseURL` still works for backwards
   * compatibility, and wins over `baseURL` for the SSE endpoint when both
   * are set.
   */
  streamBaseURL?: string;
  /**
   * Optional fetch override for the realtime session requests (the `in` and
   * `out` endpoints, see {@link ChatTransportEndpoint}). It is called with the
   * resolved URL, the RequestInit the transport built, and the endpoint
   * context. Use it to inject custom headers (e.g. distributed tracing),
   * redirect via a proxy, or wrap fetch with retries/logging.
   *
   * Not applied to the first-turn `headStart` POST, which is sent with the
   * global `fetch`.
   *
   * @example Add a tracing header to every chat request:
   * ```ts
   * fetch: (url, init, ctx) => {
   *   init.headers = new Headers(init.headers);
   *   init.headers.set("traceparent", currentTraceparent());
   *   return globalThis.fetch(url, init);
   * },
   * ```
   */
  fetch?: ChatFetchOverride;
  /**
   * Additional headers included in every API request. Also sent on the
   * first-turn `headStart` POST.
   */
  headers?: Record<string, string>;
  /**
   * Seconds to wait for the realtime stream to produce data before timing
   * out. @default 120
   */
  streamTimeoutSeconds?: number;
  /**
   * Default client data merged into every wire `metadata`. Per-call
   * `metadata` overrides these transport-level defaults key by key.
   */
  clientData?: TClientData extends Record<string, unknown> ? TClientData : Record<string, unknown>;
  /**
   * Restores active session state from external storage (e.g. localStorage)
   * after a page refresh. Hydrated entries skip the start round-trip and
   * use their `publicAccessToken` directly. On 401, the transport
   * invokes `accessToken` to refresh.
   *
   * A hydrated chatId also skips the `headStart` route, because head-start
   * only applies while the transport holds no session state for the chat.
   */
  sessions?: Record<string, ChatSessionPersistedState>;
  /**
   * Called whenever a chat session's state changes. Use it to persist
   * state for reconnection after a page refresh. `null` is passed when
   * the session is removed.
   */
  onSessionChange?: (chatId: string, session: ChatSessionPersistedState | null) => void;
  /**
   * Enables multi-tab coordination. When `true`, only one tab at a time
   * can send messages to a given chatId; other tabs go read-only.
   *
   * Ignored when `watch` is `true`. A no-op when `BroadcastChannel` is
   * unavailable. @default false
   */
  multiTab?: boolean;
  /**
   * Read-only "watch" mode for observing an existing chat run from the
   * outside (e.g. a dashboard viewer). When `true`, the SSE subscription
   * stays open across `trigger:turn-complete`, so consumers see turns 2, 3,
   * … through one long-lived stream. Pair with `sessions` hydration and
   * `reconnectToStream` for the typical viewer flow. @default false
   */
  watch?: boolean;
  /**
   * Opt-in URL that gives a brand-new chat a head start. Without it, the
   * first LLM call waits for the trigger.dev agent run to dequeue and boot.
   * With it, the transport POSTs the first user message to a route handler
   * in your warm process (Next.js, etc.) that exports
   * `chat.handover({ agentId, run })` from `@trigger.dev/sdk/chat-server`.
   * That handler runs `streamText` step 1 right away while the agent boots
   * in parallel. It then hands off mid-turn for tool execution, or exits
   * cleanly for pure-text turns.
   *
   * The POST goes through the global `fetch` (not the `fetch` option), with
   * `Content-Type: application/json` plus `headers`. The handler must return
   * the session PAT in the `X-Trigger-Chat-Access-Token` response header;
   * the transport throws if it is missing.
   *
   * First turn only. Head-start is used only while the transport holds no
   * session state for the chatId. Subsequent turns, and chats that are
   * hydrated or already started, write directly to `session.in` — the same
   * direct-trigger path used when `headStart` is unset. Customers using
   * `headStart` still need `accessToken` and (optionally) `startSession`
   * for those subsequent turns.
   *
   * This is NOT a stock `useChat` "endpoint": it is not the canonical
   * request URL for every turn, just the warm first-turn shortcut.
   *
   * In benchmarks, head-starting drops first-turn TTFC roughly in half
   * versus the direct-trigger flow. Cold-start agent boot and the
   * onTurnStart hook overlap with the LLM TTFB instead of stacking before
   * it.
   *
   * @default undefined (direct-trigger flow on every turn)
   */
  headStart?: string;
};
/**
 * Per-chat bookkeeping held inside the transport.
 *
 * Sessions are bound to a task, and the server manages runs. The transport
 * therefore only tracks what it needs to address `.in/append`, `.out`,
 * `end-and-continue`, etc., plus a little streaming state.
 * @internal
 */
type ChatSessionState = {
  /** Session-scoped PAT — `read:sessions:{chatId} + write:sessions:{chatId}`. */
  publicAccessToken: string;
  /** True while the agent is streaming a response: set on the first chunk, cleared on turn-complete. */
  isStreaming?: boolean;
  /** Last SSE event ID, so a resumed stream does not replay old events. */
  lastEventId?: string;
  /** Set when a stream was aborted mid-turn (stop); on reconnect, chunks are skipped until trigger:turn-complete. */
  skipToTurnComplete?: boolean;
};
/**
* A custom AI SDK `ChatTransport` that runs chat completions as durable
* Trigger.dev tasks via the Sessions primitive.
*
* Lifecycle:
* 1. Customer pre-creates the session server-side OR calls
* `transport.start(chatId)`, which invokes the `startSession` callback
* (typically a server action wrapping `chat.createStartSessionAction`).
* 2. The server triggers the first run as part of session create and
* returns a session-scoped PAT.
* 3. `sendMessages` appends to `.in` and subscribes to `.out`. When a
* run dies (idle, cancel, end-and-continue), the server's
* append-time probe triggers a fresh run for the same session —
* transport keeps streaming.
* 4. `stop()` posts a `{kind:"stop"}` chunk; the agent's turn aborts
* but the run keeps reading `.in` for the next message.
* 5. PAT expiry: transport invokes `accessToken` to refresh and
* retries the failing request once.
*/
export class TriggerChatTransport implements ChatTransport<UIMessage> {
/** Task ID from `options.task`. */
private readonly taskId: string;
/** The `accessToken` option: returns a session-scoped PAT for a chatId. */
private readonly resolveAccessToken: (params: AccessTokenParams) => string | Promise<string>;
/** The `startSession` option, widened to `Record<string, unknown>` client data; `undefined` when not configured. */
private readonly resolveStartSession:
| ((params: StartSessionParams<Record<string, unknown>>) => Promise<StartSessionResult>)
| undefined;
/** Picks the base URL per endpoint; built in the constructor from `baseURL` / `streamBaseURL`. */
private readonly resolveBaseURLFn: ChatBaseURLResolver;
/** The `fetch` option, if provided. */
private readonly fetchOverride: ChatFetchOverride | undefined;
/** The `headers` option (empty object when omitted). */
private readonly extraHeaders: Record<string, string>;
/** Stream timeout in seconds (`streamTimeoutSeconds`, default 120). */
private readonly streamTimeoutSeconds: number;
/** Transport-level `clientData`, merged under per-call `metadata` on outgoing payloads. */
private defaultMetadata: Record<string, unknown> | undefined;
/** The `watch` option (default `false`). */
private readonly watchMode: boolean;
/** The `headStart` route URL, if configured. */
private readonly headStart: string | undefined;
/** Multi-tab coordinator; non-null only when `multiTab` is set and `watch` is not. */
private coordinator: ChatTabCoordinator | null = null;
/** The `onSessionChange` persistence callback, if provided. */
private _onSessionChange:
| ((chatId: string, session: ChatSessionPersistedState | null) => void)
| undefined;
/** Known session state keyed by chatId (hydrated from `sessions` and filled on start / head-start). */
private sessions: Map<string, ChatSessionState> = new Map();
/** Abort controllers for open `.out` streams, keyed by chatId; a new turn aborts the previous one. */
private activeStreams: Map<string, AbortController> = new Map();
/** In-flight `start` promises keyed by chatId, used to dedupe concurrent starts. */
private pendingStarts: Map<string, Promise<ChatSessionState>> = new Map();
/**
 * Builds a transport from {@link TriggerChatTransportOptions}.
 *
 * Only realtime session endpoints (`in`/`out`) are called directly.
 * - A string `baseURL` is passed through `resolveChatStreamBaseURL`, so a
 *   Cloud URL is routed to the dedicated realtime host and long-lived SSE
 *   reads and input appends don't load the api service.
 * - A `baseURL` resolver function is used verbatim.
 * - The deprecated `streamBaseURL` still overrides the SSE (`out`) endpoint
 *   in both cases.
 *
 * Multi-tab coordination is wired up only outside watch mode. Entries from
 * `sessions` are copied into the in-memory session map.
 */
constructor(options: TriggerChatTransportOptions) {
  const {
    task,
    accessToken,
    startSession,
    baseURL,
    streamBaseURL,
    fetch: fetchOverride,
    headers,
    streamTimeoutSeconds,
    clientData,
    onSessionChange,
    watch,
    headStart,
    multiTab,
    sessions,
  } = options;

  this.taskId = task;
  this.resolveAccessToken = accessToken;
  this.resolveStartSession = startSession as
    | ((params: StartSessionParams<Record<string, unknown>>) => Promise<StartSessionResult>)
    | undefined;

  const base = baseURL ?? DEFAULT_BASE_URL;
  const baseFor: ChatBaseURLResolver =
    typeof base === "function" ? base : () => resolveChatStreamBaseURL(base);
  this.resolveBaseURLFn = (ctx) =>
    ctx.endpoint === "out" && streamBaseURL ? streamBaseURL : baseFor(ctx);

  this.fetchOverride = fetchOverride;
  this.extraHeaders = headers ?? {};
  this.streamTimeoutSeconds = streamTimeoutSeconds ?? DEFAULT_STREAM_TIMEOUT_SECONDS;
  this.defaultMetadata = clientData;
  this._onSessionChange = onSessionChange;
  this.watchMode = watch ?? false;
  this.headStart = headStart;

  if (multiTab && !this.watchMode) {
    const coordinator = new ChatTabCoordinator();
    this.coordinator = coordinator;
    // Keep our resume cursor in sync with cursors broadcast by other tabs.
    coordinator.addSessionListener((chatId, sessionUpdate) => {
      const tracked = this.sessions.get(chatId);
      if (!tracked) return;
      const nextEventId = sessionUpdate.lastEventId;
      if (nextEventId) {
        tracked.lastEventId = nextEventId;
      }
    });
  }

  for (const [chatId, persisted] of Object.entries(sessions ?? {})) {
    this.sessions.set(chatId, {
      publicAccessToken: persisted.publicAccessToken,
      lastEventId: persisted.lastEventId,
      isStreaming: persisted.isStreaming,
    });
  }
}
// -------------------------------------------------------------------------
// Public lifecycle
// -------------------------------------------------------------------------
/**
 * Eagerly creates a Session and triggers its first run. Use it as a
 * "the user might be about to send a message — boot the agent now"
 * preload, or to take ownership of the session before any sendMessage.
 *
 * Idempotent:
 * - If session state with a PAT is already cached, it is returned without
 *   a round-trip.
 * - Concurrent calls for the same chatId share one in-flight promise.
 * - Repeat calls converge on the same session via the server's
 *   `(env, externalId)` upsert.
 *
 * Relies on the `startSession` option, which the transport invokes for
 * `start` / `preload`. Customers who pre-create sessions server-side (and
 * hydrate them via `sessions`) don't need to call this.
 *
 * @param chatId - Conversation id to start.
 * @returns The persistable form of the session state.
 */
async start(chatId: string): Promise<ChatSessionPersistedState> {
  const existing = this.sessions.get(chatId);
  if (existing?.publicAccessToken) {
    return this.toPersisted(existing);
  }
  // Call through an arrow so `this` stays bound however `toPersisted` is
  // declared (a plain method passed bare to `.then` would lose it).
  const persist = (state: ChatSessionState) => this.toPersisted(state);
  const inflight = this.pendingStarts.get(chatId);
  if (inflight) return inflight.then(persist);
  const promise = this.doStart(chatId).finally(() => {
    this.pendingStarts.delete(chatId);
  });
  this.pendingStarts.set(chatId, promise);
  return promise.then(persist);
}
/**
 * Alias for {@link start} that discards the returned state. It exists
 * because the AI SDK Chat hook calls `preload` rather than `start`; the
 * eager-create semantics are identical.
 */
preload(chatId: string): Promise<void> {
  return this.start(chatId).then(() => undefined);
}
/**
 * Delivers one turn through the session's `.in` channel and returns the
 * agent's reply as a stream read from `.out` over SSE. On append, the
 * server probes `currentRunId` and, if it is terminal or null, triggers a
 * fresh run on the same session before the append lands.
 *
 * When `headStart` is configured and no session state exists for the chat
 * yet, the turn goes through the `chat.handover` route instead.
 *
 * @throws Error when multi-tab mode has made this tab read-only for the
 *   chat, or when a `"submit-message"` call carries no messages.
 */
sendMessages = async (
  options: {
    trigger: "submit-message" | "regenerate-message";
    chatId: string;
    messageId: string | undefined;
    messages: UIMessage[];
    abortSignal: AbortSignal | undefined;
  } & ChatRequestOptions
): Promise<ReadableStream<UIMessageChunk>> => {
  const { trigger, chatId, messageId, messages, abortSignal, body, metadata } = options;

  // Multi-tab guard: a read-only tab may not write; otherwise take ownership.
  const tabs = this.coordinator;
  if (tabs !== null) {
    if (tabs.isReadOnly(chatId)) {
      throw new Error("This chat is active in another tab");
    }
    tabs.claim(chatId);
  }

  // Per-call metadata overrides the transport-level `clientData` defaults.
  let mergedMetadata: Record<string, unknown> | undefined;
  if (this.defaultMetadata || metadata) {
    mergedMetadata = {
      ...this.defaultMetadata,
      ...(metadata as Record<string, unknown> | undefined),
    };
  }

  // First turn with head-start: no session state yet, so hand the turn to
  // the customer's `chat.handover` route. That route creates the session,
  // runs `streamText` step 1 in its warm process, and returns the SSE
  // response plus the session PAT. Later turns take the direct path below.
  if (this.headStart && !this.sessions.has(chatId)) {
    return this.sendMessagesViaHandover({
      trigger,
      chatId,
      messageId,
      messages,
      abortSignal,
      body,
      metadata: mergedMetadata,
    });
  }

  // Slim wire: at most ONE message per record. The agent rebuilds earlier
  // history from its durable S3 snapshot plus session.out replay at run
  // boot (or from `hydrateMessages`, if registered).
  // - "submit-message" ships only the latest message, slimmed so
  //   already-resolved tool parts carry just their resolution payload.
  //   This keeps continuation payloads under the `.in/append` cap on
  //   reasoning-heavy turns.
  // - "regenerate-message" sends no message; the agent drops its trailing
  //   assistant message itself and re-runs.
  const isSubmit = trigger === "submit-message";
  if (isSubmit && messages.length === 0) {
    throw new Error(
      "TriggerChatTransport.sendMessages: 'submit-message' trigger requires at least one message"
    );
  }
  const latestMessage = isSubmit
    ? { message: slimSubmitMessageForWire(messages.at(-1)) }
    : {};
  const wirePayload: ChatTaskWirePayload = {
    ...((body as Record<string, unknown>) ?? {}),
    ...latestMessage,
    chatId,
    trigger,
    messageId,
    metadata: mergedMetadata,
  };

  const state = await this.ensureSessionState(chatId);

  // A single part id for every auth retry, so the server's dedupe treats
  // them as one logical append.
  const partId = crypto.randomUUID();
  await this.callWithAuthRetry(chatId, state, async (token: string) => {
    const record = this.serializeInputChunk({ kind: "message", payload: wirePayload });
    await this.appendInputChunk(chatId, token, record, partId);
  });

  // The new turn supersedes any stream still open for this chat.
  const previous = this.activeStreams.get(chatId);
  if (previous !== undefined) {
    previous.abort();
    this.activeStreams.delete(chatId);
  }

  state.isStreaming = true;
  this.notifySessionChange(chatId, state);
  return this.subscribeToSessionStream(state, abortSignal, chatId);
};
/**
 * First-turn-only path used when `headStart` is configured. POSTs the full
 * wire payload to the customer's `chat.handover` route handler and pipes
 * its SSE response back as a `UIMessageChunk` stream. Session state is
 * hydrated from the `X-Trigger-Chat-Access-Token` response header, so later
 * turns bypass the route and use the direct `session.in` path.
 *
 * The request uses the global `fetch` (not the `fetch` option). It sends
 * `Content-Type: application/json` plus the transport's extra `headers`.
 *
 * @throws Error when:
 * - `headStart` is unset;
 * - the route responds with a non-2xx status;
 * - the response has no body;
 * - the access-token header is missing.
 * On the non-2xx and missing-header paths, the unread response body is
 * cancelled (best-effort) before throwing.
 */
private async sendMessagesViaHandover(args: {
  trigger: "submit-message" | "regenerate-message";
  chatId: string;
  messageId: string | undefined;
  messages: UIMessage[];
  abortSignal: AbortSignal | undefined;
  body: ChatRequestOptions["body"];
  metadata: Record<string, unknown> | undefined;
}): Promise<ReadableStream<UIMessageChunk>> {
  if (!this.headStart) {
    throw new Error("sendMessagesViaHandover called without headStart configured");
  }
  // Head-start ships the full UIMessage history via `headStartMessages`.
  // The route handler is the customer's own HTTP endpoint (NOT
  // `/realtime/v1/sessions/{id}/in/append`), so the 512 KiB `.in/append`
  // body cap doesn't apply. At run boot, the agent consumes
  // `headStartMessages` only when no snapshot exists yet (the very first
  // turn).
  const wirePayload: ChatTaskWirePayload = {
    ...((args.body as Record<string, unknown>) ?? {}),
    headStartMessages: args.messages,
    chatId: args.chatId,
    trigger: args.trigger,
    messageId: args.messageId,
    metadata: args.metadata,
  };
  const response = await fetch(this.headStart, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...this.extraHeaders,
    },
    body: JSON.stringify(wirePayload),
    signal: args.abortSignal,
  });

  // Best-effort release of a response body we are not going to read, so
  // the unread stream doesn't linger after we throw.
  const discardBody = (): void => {
    void response.body?.cancel().catch(() => {
      /* already closed or errored — nothing to release */
    });
  };

  if (!response.ok) {
    discardBody();
    throw new Error(
      `chat.handover endpoint returned ${response.status} ${response.statusText}`
    );
  }
  if (!response.body) {
    throw new Error("chat.handover endpoint returned no response body");
  }
  // Hydrate session state from the response headers so later turns skip
  // the route and write directly to session.in. Failing fast when the
  // header is missing avoids a quietly degraded state in which every later
  // turn re-runs the handover route instead of taking the slim-wire path.
  const accessToken = response.headers.get("X-Trigger-Chat-Access-Token");
  const chatId = args.chatId;
  if (!accessToken) {
    discardBody();
    throw new Error(
      "chat.handover response is missing the X-Trigger-Chat-Access-Token header. chat.agent's handover endpoint must echo the session PAT so the transport can hydrate."
    );
  }
  const state: ChatSessionState = {
    publicAccessToken: accessToken,
    isStreaming: true,
  };
  this.sessions.set(chatId, state);
  this.notifySessionChange(chatId, state);
  // Filter the parsed UIMessage stream:
  // - Drop control chunks (`trigger:turn-complete`,
  //   `trigger:session-state`) before they reach the AI SDK. They aren't
  //   valid UIMessageChunks, and the AI SDK chunk parser would reject them.
  // - On `trigger:turn-complete`, clear `isStreaming`. Otherwise the
  //   useChat resume / reconnectToStream path would open a second
  //   `session.out` subscription on top of our stitched response.
  // - On `trigger:session-state`, record the agent's final S2 event id in
  //   `state.lastEventId`. Without it, turn 2's `session.out` subscribe
  //   reads from the start and replays turn 1's chunks into the UI.
  // - On stream end (the handover-skip case: no `trigger:turn-complete`
  //   arrives and the customer's stream just ends), also clear
  //   `isStreaming`, for the same reason.
  const sessions = this.sessions;
  const notifyChange = (id: string, state: ChatSessionState) =>
    this.notifySessionChange(id, state);
  const TRIGGER_TURN_COMPLETE = "trigger:turn-complete";
  const TRIGGER_SESSION_STATE = "trigger:session-state";
  const clearStreaming = () => {
    const state = sessions.get(chatId);
    if (state && state.isStreaming) {
      state.isStreaming = false;
      notifyChange(chatId, state);
    }
  };
  const setLastEventId = (lastEventId: string) => {
    const state = sessions.get(chatId);
    if (state) {
      state.lastEventId = lastEventId;
      notifyChange(chatId, state);
    }
  };
  return response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(parseUIMessageSseTransform())
    .pipeThrough(
      new TransformStream<UIMessageChunk, UIMessageChunk>({
        transform(chunk, controller) {
          if (chunk && typeof chunk === "object") {
            const type = (chunk as { type?: unknown }).type;
            if (type === TRIGGER_TURN_COMPLETE) {
              clearStreaming();
              return; // drop — not a real UIMessageChunk
            }
            if (type === TRIGGER_SESSION_STATE) {
              const lastEventId = (chunk as { lastEventId?: unknown }).lastEventId;
              if (typeof lastEventId === "string") {
                setLastEventId(lastEventId);
              }
              return; // drop
            }
          }
          controller.enqueue(chunk);
        },
        flush() {
          clearStreaming();
        },
      })
    );
}
/**
 * Deliver a steering message on the session's `.in` channel while a
 * stream is already running, leaving the active reader untouched. The
 * agent's `pendingMessages` config decides whether the message is
 * injected between tool-call steps or buffered for the next turn.
 *
 * @param chatId - Chat whose session receives the message.
 * @param message - UI message to send as a `submit-message` payload.
 * @param metadata - Optional per-message metadata, spread over the
 *   transport's `defaultMetadata` (keys from `metadata` win).
 * @returns `false` when no session is known for `chatId` or the append
 *   fails (the error is swallowed); `true` once the chunk was appended.
 */
sendPendingMessage = async (
  chatId: string,
  message: UIMessage,
  metadata?: Record<string, unknown>
): Promise<boolean> => {
  const session = this.sessions.get(chatId);
  if (!session) {
    return false;
  }

  // Only send a metadata object when at least one source provided one.
  const hasAnyMetadata = Boolean(this.defaultMetadata || metadata);
  const combinedMetadata = hasAnyMetadata
    ? { ...(this.defaultMetadata ?? {}), ...(metadata ?? {}) }
    : undefined;

  const payload: ChatTaskWirePayload = {
    message,
    chatId,
    trigger: "submit-message",
    metadata: combinedMetadata,
  };

  // One part id shared across auth retries of the same append.
  const chunkPartId = crypto.randomUUID();

  try {
    await this.callWithAuthRetry(chatId, session, async (token: string) => {
      const serialized = this.serializeInputChunk({ kind: "message", payload });
      await this.appendInputChunk(chatId, token, serialized, chunkPartId);
    });
  } catch {
    return false;
  }
  return true;
};
/**
 * Resume reading a known session's output stream, e.g. after a page
 * reload where the customer hydrated `sessions` via the constructor and
 * the AI SDK now asks to pick the stream back up.
 *
 * Resolves to `null` ("nothing to resume") when there is no session for
 * the chat, when its state says `isStreaming: false`, or when this
 * transport already tracks an active reader for it.
 */
reconnectToStream = async (
  options: {
    chatId: string;
    abortSignal?: AbortSignal | undefined;
  } & ChatRequestOptions
): Promise<ReadableStream<UIMessageChunk> | null> => {
  const { chatId, abortSignal: callerSignal } = options;
  const session = this.sessions.get(chatId);

  if (!session || session.isStreaming === false || this.activeStreams.has(chatId)) {
    return null;
  }

  // Track our own controller so other paths can tear this reader down;
  // fold in the caller's signal when one was supplied.
  const readerController = new AbortController();
  this.activeStreams.set(chatId, readerController);
  const effectiveSignal = callerSignal
    ? AbortSignal.any([callerSignal, readerController.signal])
    : readerController.signal;

  return this.subscribeToSessionStream(session, effectiveSignal, chatId, {
    sendStopOnAbort: Boolean(callerSignal),
    // Reload-time reconnects opt into the server's settled-peek shortcut
    // so the SSE does not hang for 60s when no turn is in flight. Paths
    // that just sent a message must keep wait=60 so they don't race the
    // freshly-triggered turn's first chunk.
    peekSettled: true,
  });
};
/**
 * Stop the current generation by appending a `{ kind: "stop" }` chunk
 * to the session's `.in` channel. The agent aborts its `streamText` call
 * but stays alive for the next message.
 *
 * @returns `false` if the chat has no session or the stop chunk could not
 *   be appended (error swallowed); otherwise `true` after the local
 *   reader is aborted and the session is marked as not streaming.
 */
stopGeneration = async (chatId: string): Promise<boolean> => {
  const session = this.sessions.get(chatId);
  if (!session) return false;

  const stopPartId = crypto.randomUUID();
  let stopDelivered = true;
  try {
    await this.callWithAuthRetry(chatId, session, async (token: string) => {
      await this.appendInputChunk(
        chatId,
        token,
        this.serializeInputChunk({ kind: "stop" }),
        stopPartId
      );
    });
  } catch {
    stopDelivered = false;
  }
  if (!stopDelivered) return false;

  // NOTE(review): flag is consumed elsewhere — presumably makes the next
  // reader discard the interrupted turn up to its turn-complete; confirm.
  session.skipToTurnComplete = true;

  // Abort and forget any tracked reader (deleting a missing key is a no-op).
  this.activeStreams.get(chatId)?.abort();
  this.activeStreams.delete(chatId);

  // This client will never see the aborted turn's turn-complete, so drop
  // the streaming flag and persist it; otherwise a reload would resume
  // mid-turn and replay chunks the user explicitly stopped.
  session.isStreaming = false;
  this.notifySessionChange(chatId, session);
  return true;
};
/**
 * Send a custom action chunk (for `chat.agent`'s `actionSchema` /
 * `onAction` hook). Actions are not turns — only `hydrateMessages`
 * and `onAction` fire on the agent side. The returned stream
 * carries any model response `onAction` produced (when it returns a
 * `StreamTextResult`); for `void`-returning side-effect-only actions
 * the stream completes immediately with `trigger:turn-complete`.
 *
 * @param chatId - Chat whose session receives the action.
 * @param action - Opaque action value forwarded in the wire payload.
 * @returns The `session.out` stream for the action's response.
 * @throws Error("This chat is active in another tab") when a tab
 *   coordinator reports the chat as read-only here; also rethrows
 *   failures from `ensureSessionState` and the input append.
 */
sendAction = async (
  chatId: string,
  action: unknown
): Promise<ReadableStream<UIMessageChunk>> => {
  if (this.coordinator) {
    if (this.coordinator.isReadOnly(chatId)) {
      throw new Error("This chat is active in another tab");
    }
    this.coordinator.claim(chatId);
  }
  const state = await this.ensureSessionState(chatId);
  const wirePayload: ChatTaskWirePayload = {
    chatId,
    trigger: "action" as const,
    action,
    metadata: this.defaultMetadata ?? undefined,
  };
  const body = this.serializeInputChunk({ kind: "message", payload: wirePayload });
  const partId = crypto.randomUUID();
  const send = async (token: string) => {
    await this.appendInputChunk(chatId, token, body, partId);
  };
  await this.callWithAuthRetry(chatId, state, send);
  // Supersede any in-flight reader before subscribing — same as
  // `sendMessages`. Two concurrent readers both write `state.lastEventId`
  // and the slower one can regress the cursor, replaying records on the
  // next reconnect.
  const activeStream = this.activeStreams.get(chatId);
  if (activeStream) {
    activeStream.abort();
    this.activeStreams.delete(chatId);
  }
  // Register a controller for the action's own reader (as
  // `reconnectToStream` does). Without it, `stopGeneration` could not
  // abort this reader and a later send would find nothing to supersede,
  // leaving two readers racing on `lastEventId`.
  const abortController = new AbortController();
  this.activeStreams.set(chatId, abortController);
  // Mark streaming + persist so a reload mid-action resumes (reconnectToStream
  // no-ops when the persisted session says isStreaming: false).
  state.isStreaming = true;
  this.notifySessionChange(chatId, state);
  return this.subscribeToSessionStream(state, abortController.signal, chatId);
};
// -------------------------------------------------------------------------
// External-state surface
// -------------------------------------------------------------------------
/**
 * Persistable snapshot of the session for `chatId`, produced by
 * `toPersisted`, or `undefined` when the transport holds no session for
 * that chat.
 */
getSession = (chatId: string): ChatSessionPersistedState | undefined => {
  const current = this.sessions.get(chatId);
  return current ? this.toPersisted(current) : undefined;
};
/**
 * Install (or overwrite) the in-memory session for `chatId` from a
 * persisted snapshot, copying only `publicAccessToken`, `lastEventId`
 * and `isStreaming`. Then emit a session-change notification carrying
 * the persisted view of the newly stored state.
 */
setSession(chatId: string, session: ChatSessionPersistedState): void {
  const { publicAccessToken, lastEventId, isStreaming } = session;
  const hydrated: ChatSessionState = { publicAccessToken, lastEventId, isStreaming };
  this.sessions.set(chatId, hydrated);
  // `hydrated` is exactly what was just stored, so no re-lookup is needed.
  this.notifySessionChange(chatId, this.toPersisted(hydrated));
}
/**
 * Register the session-change listener. There is a single slot: each
 * call replaces the previous listener, and passing `undefined` detaches
 * it. The callback type admits `null` for the session argument
 * (presumably signalling a cleared session — confirm at the emit site).
 */
setOnSessionChange(
  callback:
    | ((chatId: string, session: ChatSessionPersistedState | null) => void)
    | undefined
): void {
  this._onSessionChange = callback;
}
/**
* Update the transport's `clientData`. Used by `useTriggerChatTransport`
* to keep the latest value reachable from inside `startSession` and
* the per-turn `metadata` merge without recreating the transport.
*
* Reads always go through the live field — closures around the