Skip to content

Commit 54c14dc

Browse files
committed
Add a writer to easily write chunks in callbacks
1 parent 3e2cdb1 commit 54c14dc

File tree

2 files changed

+160
-36
lines changed
  • packages/trigger-sdk/src/v3
  • references/ai-chat/src/trigger

2 files changed

+160
-36
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 157 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,97 @@ export { CHAT_MESSAGES_STREAM_ID, CHAT_STOP_STREAM_ID };
317317
*/
318318
const chatStream = streams.define<UIMessageChunk>({ id: _CHAT_STREAM_KEY });
319319

320+
// ---------------------------------------------------------------------------
321+
// ChatWriter — stream writer for callbacks
322+
// ---------------------------------------------------------------------------
323+
324+
/**
325+
* A stream writer passed to chat lifecycle callbacks (`onPreload`, `onChatStart`,
326+
* `onTurnStart`, `onTurnComplete`, `onCompacted`).
327+
*
328+
* Write custom `UIMessageChunk` parts (e.g. `data-*` parts) directly to the chat
329+
* stream without the ceremony of `chat.stream.writer({ execute })`.
330+
*
331+
* The writer is lazy — no stream overhead if you don't call `write()` or `merge()`.
332+
*
333+
* @example
334+
* ```ts
335+
* onTurnStart: async ({ writer }) => {
336+
* writer.write({ type: "data-status", data: { loading: true } });
337+
* },
338+
* onTurnComplete: async ({ writer, uiMessages }) => {
339+
* writer.write({ type: "data-analytics", data: { messageCount: uiMessages.length } });
340+
* },
341+
* ```
342+
*/
343+
export type ChatWriter = {
344+
/** Write a single UIMessageChunk to the chat stream. */
345+
write(part: UIMessageChunk): void;
346+
/** Merge another stream's chunks into the chat stream. */
347+
merge(stream: ReadableStream<UIMessageChunk>): void;
348+
};
349+
350+
/**
351+
* Creates a lazy ChatWriter that only opens a realtime stream on first use.
352+
* Call `flush()` after the callback returns to await stream completion.
353+
* @internal
354+
*/
355+
function createLazyChatWriter(): { writer: ChatWriter; flush: () => Promise<void> } {
356+
let writeImpl: ((part: UIMessageChunk) => void) | null = null;
357+
let mergeImpl: ((stream: ReadableStream<UIMessageChunk>) => void) | null = null;
358+
let waitPromise: (() => Promise<unknown>) | null = null;
359+
let resolveExecute: (() => void) | null = null;
360+
361+
function ensureInitialized() {
362+
if (writeImpl) return;
363+
364+
const executePromise = new Promise<void>((resolve) => {
365+
resolveExecute = resolve;
366+
});
367+
368+
const { waitUntilComplete } = chatStream.writer({
369+
collapsed: true,
370+
spanName: "callback writer",
371+
execute: ({ write, merge }) => {
372+
writeImpl = write;
373+
mergeImpl = merge;
374+
return executePromise; // Keep execute alive until flush()
375+
},
376+
});
377+
waitPromise = waitUntilComplete;
378+
}
379+
380+
return {
381+
writer: {
382+
write(part: UIMessageChunk) {
383+
ensureInitialized();
384+
writeImpl!(part);
385+
},
386+
merge(stream: ReadableStream<UIMessageChunk>) {
387+
ensureInitialized();
388+
mergeImpl!(stream);
389+
},
390+
},
391+
async flush() {
392+
if (resolveExecute) {
393+
resolveExecute(); // Signal execute to complete
394+
await waitPromise!(); // Wait for stream to finish piping
395+
}
396+
},
397+
};
398+
}
399+
400+
/**
401+
* Runs a callback with a lazy ChatWriter, flushing the stream after completion.
402+
* @internal
403+
*/
404+
async function withChatWriter<T>(fn: (writer: ChatWriter) => Promise<T> | T): Promise<T> {
405+
const { writer, flush } = createLazyChatWriter();
406+
const result = await fn(writer);
407+
await flush();
408+
return result;
409+
}
410+
320411
/**
321412
* The wire payload shape sent by `TriggerChatTransport`.
322413
* Uses `metadata` to match the AI SDK's `ChatRequestOptions` field name.
@@ -748,6 +839,8 @@ export type CompactedEvent = {
748839
chatId?: string;
749840
/** The current turn number (if running inside a chat.task). */
750841
turn?: number;
842+
/** Stream writer — write custom `UIMessageChunk` parts to the chat stream. Lazy: no overhead if unused. */
843+
writer: ChatWriter;
751844
};
752845

753846
/**
@@ -988,7 +1081,7 @@ async function chatCompact(
9881081
const { waitUntilComplete } = streams.writer(CHAT_STREAM_KEY, {
9891082
spanName: "stream compaction chunks",
9901083
collapsed: true,
991-
execute: async ({ write }) => {
1084+
execute: async ({ write, merge }) => {
9921085
write({ type: "step-start" });
9931086
write({
9941087
type: "data-compaction",
@@ -1012,7 +1105,8 @@ async function chatCompact(
10121105
{ role: "assistant" as const, content: [{ type: "text" as const, text: `[Conversation summary]\n\n${summary}` }] },
10131106
]);
10141107

1015-
// Fire onCompacted hook
1108+
// Fire onCompacted hook — pass the existing writer so the callback
1109+
// can write custom chunks without creating a separate stream.
10161110
const onCompactedHook = locals.get(chatOnCompactedKey);
10171111
if (onCompactedHook) {
10181112
await onCompactedHook({
@@ -1026,6 +1120,7 @@ async function chatCompact(
10261120
stepNumber,
10271121
chatId: turnCtx?.chatId,
10281122
turn: turnCtx?.turn,
1123+
writer: { write, merge },
10291124
});
10301125
}
10311126

@@ -1576,6 +1671,8 @@ export type PreloadEvent<TClientData = unknown> = {
15761671
chatAccessToken: string;
15771672
/** Custom data from the frontend. */
15781673
clientData?: TClientData;
1674+
/** Stream writer — write custom `UIMessageChunk` parts to the chat stream. Lazy: no overhead if unused. */
1675+
writer: ChatWriter;
15791676
};
15801677

15811678
/**
@@ -1598,6 +1695,8 @@ export type ChatStartEvent<TClientData = unknown> = {
15981695
previousRunId?: string;
15991696
/** Whether this run was preloaded before the first message. */
16001697
preloaded: boolean;
1698+
/** Stream writer — write custom `UIMessageChunk` parts to the chat stream. Lazy: no overhead if unused. */
1699+
writer: ChatWriter;
16011700
};
16021701

16031702
/**
@@ -1628,6 +1727,8 @@ export type TurnStartEvent<TClientData = unknown> = {
16281727
previousTurnUsage?: LanguageModelUsage;
16291728
/** Cumulative token usage across all completed turns so far. */
16301729
totalUsage: LanguageModelUsage;
1730+
/** Stream writer — write custom `UIMessageChunk` parts to the chat stream. Lazy: no overhead if unused. */
1731+
writer: ChatWriter;
16311732
};
16321733

16331734
/**
@@ -1685,6 +1786,15 @@ export type TurnCompleteEvent<TClientData = unknown> = {
16851786
totalUsage: LanguageModelUsage;
16861787
};
16871788

1789+
/**
1790+
* Event passed to the `onBeforeTurnComplete` callback.
1791+
* Same as `TurnCompleteEvent` but includes a `writer` since the stream is still open.
1792+
*/
1793+
export type BeforeTurnCompleteEvent<TClientData = unknown> = TurnCompleteEvent<TClientData> & {
1794+
/** Stream writer — write custom `UIMessageChunk` parts to the chat stream. Lazy: no overhead if unused. */
1795+
writer: ChatWriter;
1796+
};
1797+
16881798
export type ChatTaskOptions<
16891799
TIdentifier extends string,
16901800
TClientDataSchema extends TaskSchema | undefined = undefined,
@@ -1774,17 +1884,17 @@ export type ChatTaskOptions<
17741884
*
17751885
* @example
17761886
* ```ts
1777-
* onBeforeTurnComplete: async ({ messages, uiMessages, usage }) => {
1887+
* onBeforeTurnComplete: async ({ writer, usage }) => {
17781888
* if (usage?.inputTokens && usage.inputTokens > 5000) {
1779-
* await chat.stream.append({ type: "data-compaction", id: generateId(), data: { status: "compacting" } });
1889+
* writer.write({ type: "data-compaction", id: generateId(), data: { status: "compacting" } });
17801890
* // ... compact messages ...
17811891
* chat.setMessages(compactedMessages);
1782-
* await chat.stream.append({ type: "data-compaction", id: generateId(), data: { status: "complete" } });
1892+
* writer.write({ type: "data-compaction", id: generateId(), data: { status: "complete" } });
17831893
* }
17841894
* }
17851895
* ```
17861896
*/
1787-
onBeforeTurnComplete?: (event: TurnCompleteEvent<inferSchemaOut<TClientDataSchema>>) => Promise<void> | void;
1897+
onBeforeTurnComplete?: (event: BeforeTurnCompleteEvent<inferSchemaOut<TClientDataSchema>>) => Promise<void> | void;
17881898

17891899
/**
17901900
* Called when conversation compaction occurs (via `chat.compact()` or
@@ -2120,11 +2230,14 @@ function chatTask<
21202230
await tracer.startActiveSpan(
21212231
"onPreload()",
21222232
async () => {
2123-
await onPreload({
2124-
chatId: payload.chatId,
2125-
runId: currentRunId,
2126-
chatAccessToken: preloadAccessToken,
2127-
clientData: preloadClientData,
2233+
await withChatWriter(async (writer) => {
2234+
await onPreload({
2235+
chatId: payload.chatId,
2236+
runId: currentRunId,
2237+
chatAccessToken: preloadAccessToken,
2238+
clientData: preloadClientData,
2239+
writer,
2240+
});
21282241
});
21292242
},
21302243
{
@@ -2333,15 +2446,18 @@ function chatTask<
23332446
await tracer.startActiveSpan(
23342447
"onChatStart()",
23352448
async () => {
2336-
await onChatStart({
2337-
chatId: currentWirePayload.chatId,
2338-
messages: accumulatedMessages,
2339-
clientData,
2340-
runId: currentRunId,
2341-
chatAccessToken: turnAccessToken,
2342-
continuation,
2343-
previousRunId,
2344-
preloaded,
2449+
await withChatWriter(async (writer) => {
2450+
await onChatStart({
2451+
chatId: currentWirePayload.chatId,
2452+
messages: accumulatedMessages,
2453+
clientData,
2454+
runId: currentRunId,
2455+
chatAccessToken: turnAccessToken,
2456+
continuation,
2457+
previousRunId,
2458+
preloaded,
2459+
writer,
2460+
});
23452461
});
23462462
},
23472463
{
@@ -2364,19 +2480,22 @@ function chatTask<
23642480
await tracer.startActiveSpan(
23652481
"onTurnStart()",
23662482
async () => {
2367-
await onTurnStart({
2368-
chatId: currentWirePayload.chatId,
2369-
messages: accumulatedMessages,
2370-
uiMessages: accumulatedUIMessages,
2371-
turn,
2372-
runId: currentRunId,
2373-
chatAccessToken: turnAccessToken,
2374-
clientData,
2375-
continuation,
2376-
previousRunId,
2377-
preloaded,
2378-
previousTurnUsage,
2379-
totalUsage: cumulativeUsage,
2483+
await withChatWriter(async (writer) => {
2484+
await onTurnStart({
2485+
chatId: currentWirePayload.chatId,
2486+
messages: accumulatedMessages,
2487+
uiMessages: accumulatedUIMessages,
2488+
turn,
2489+
runId: currentRunId,
2490+
chatAccessToken: turnAccessToken,
2491+
clientData,
2492+
continuation,
2493+
previousRunId,
2494+
preloaded,
2495+
previousTurnUsage,
2496+
totalUsage: cumulativeUsage,
2497+
writer,
2498+
});
23802499
});
23812500

23822501
// Check if onTurnStart replaced messages (compaction)
@@ -2617,7 +2736,7 @@ function chatTask<
26172736
const { waitUntilComplete } = streams.writer(CHAT_STREAM_KEY, {
26182737
spanName: "stream compaction chunks",
26192738
collapsed: true,
2620-
execute: async ({ write }) => {
2739+
execute: async ({ write, merge }) => {
26212740
write({
26222741
type: "data-compaction",
26232742
id: compactionId,
@@ -2670,6 +2789,7 @@ function chatTask<
26702789
stepNumber: -1, // outer loop, not a step
26712790
chatId: currentWirePayload.chatId,
26722791
turn,
2792+
writer: { write, merge },
26732793
});
26742794
}
26752795

@@ -2733,7 +2853,9 @@ function chatTask<
27332853
await tracer.startActiveSpan(
27342854
"onBeforeTurnComplete()",
27352855
async () => {
2736-
await onBeforeTurnComplete(turnCompleteEvent);
2856+
await withChatWriter(async (writer) => {
2857+
await onBeforeTurnComplete({ ...turnCompleteEvent, writer });
2858+
});
27372859

27382860
// Check if the hook replaced messages (compaction)
27392861
const override = locals.get(chatOverrideMessagesKey);

references/ai-chat/src/trigger/chat.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,9 @@ export const aiChat = chat.task({
424424
summaryLength: summary.length,
425425
});
426426
},
427-
onTurnStart: async ({ chatId, uiMessages }) => {
427+
onTurnStart: async ({ chatId, uiMessages, writer }) => {
428+
writer.write({ type: "data-turn-status", data: { status: "preparing" } });
429+
428430
// Persist messages so mid-stream refresh still shows the user message.
429431
// Deferred — runs in parallel with streaming, awaited before onTurnComplete.
430432
chat.defer(prisma.chat.update({ where: { id: chatId }, data: { messages: uiMessages as any } }));

0 commit comments

Comments
 (0)