Skip to content

Commit 5122b06

Browse files
committed
fix(sdk,core): address review feedback on chat.agent hardening
Fold the failed turn's wire message into the error-path snapshot and onTurnComplete event so an early pre-run throw cannot strand it, and stop passing raw metadata as the parsed clientData on errored turns. Mark the session streaming before subscribing in sendAction so a reload mid-action resumes. Keep the per-append X-Part-Id from being overridden by a transport-wide header, and align the server-side append part id entropy with the browser transport. Adds tests for the error-turn snapshot, sendAction streaming state, and the X-Part-Id header precedence.
1 parent 2905009 commit 5122b06

5 files changed

Lines changed: 145 additions & 9 deletions

File tree

packages/core/src/v3/apiClient/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,8 +1279,9 @@ export class ApiClient {
12791279
) {
12801280
// Generated once per logical append, outside zodfetch, so its internal
12811281
// retries reuse the same part id and the server-side dedupe collapses a
1282-
// retried POST whose first attempt actually committed.
1283-
const partId = nanoid(7);
1282+
// retried POST whose first attempt actually committed. Full-length nanoid
1283+
// (~126 bits) to match the browser transport's randomUUID entropy.
1284+
const partId = nanoid();
12841285
return zodfetch(
12851286
AppendToStreamResponseBody,
12861287
`${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`,

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7583,6 +7583,17 @@ function chatAgent<
75837583
// Best-effort — if stream write fails, let the run continue anyway
75847584
}
75857585

7586+
// The submit-message merge into the accumulator may not have run
7587+
// yet (a pre-run hook threw), so fold the wire message in for the
7588+
// error event + snapshot — the cursor has already advanced past it,
7589+
// so otherwise it survives in neither the snapshot nor the `.in` tail.
7590+
const erroredWireMessage = (currentWirePayload as { message?: TUIMessage }).message;
7591+
const erroredUIMessages =
7592+
erroredWireMessage &&
7593+
!accumulatedUIMessages.some((m) => m.id === erroredWireMessage.id)
7594+
? [...accumulatedUIMessages, erroredWireMessage]
7595+
: accumulatedUIMessages;
7596+
75867597
// Fire onTurnComplete on the error path too — the docs promise it
75877598
// runs "after every turn, successful or errored" so customers can
75887599
// mark the turn failed. `responseMessage` is undefined/partial and
@@ -7596,15 +7607,18 @@ function chatAgent<
75967607
ctx,
75977608
chatId: currentWirePayload.chatId,
75987609
messages: accumulatedMessages,
7599-
uiMessages: accumulatedUIMessages,
7610+
uiMessages: erroredUIMessages,
76007611
newMessages: [],
7601-
newUIMessages: [],
7612+
newUIMessages: erroredWireMessage ? [erroredWireMessage] : [],
76027613
responseMessage: undefined,
76037614
rawResponseMessage: undefined,
76047615
turn,
76057616
runId: ctx.run.id,
76067617
chatAccessToken: "",
7607-
clientData: currentWirePayload.metadata as inferSchemaIn<TClientDataSchema>,
7618+
// Parsed `clientData` isn't reliably in scope here (parsing
7619+
// may itself be the failure), and the raw metadata is the
7620+
// wrong shape — leave it undefined on the error path.
7621+
clientData: undefined,
76087622
stopped: false,
76097623
continuation,
76107624
previousRunId,
@@ -7642,7 +7656,7 @@ function chatAgent<
76427656
await writeChatSnapshot<TUIMessage>(sessionIdForSnapshot, {
76437657
version: 1,
76447658
savedAt: Date.now(),
7645-
messages: accumulatedUIMessages,
7659+
messages: erroredUIMessages,
76467660
lastOutEventId: errorTurnCompleteResult?.lastEventId,
76477661
});
76487662
} catch (error) {

packages/trigger-sdk/src/v3/chat.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,65 @@ describe("TriggerChatTransport", () => {
10051005
expect(actionBody.payload.trigger).toBe("action");
10061006
expect(actionBody.payload.action).toEqual({ type: "undo" });
10071007
});
1008+
1009+
it("marks the session streaming and notifies before subscribing", async () => {
1010+
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
1011+
const urlStr = typeof url === "string" ? url : url.toString();
1012+
if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse();
1013+
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
1014+
throw new Error(`Unexpected URL: ${urlStr}`);
1015+
});
1016+
1017+
const onSessionChange = vi.fn();
1018+
const transport = new TriggerChatTransport({
1019+
task: "my-chat-task",
1020+
accessToken: () => "pat",
1021+
onSessionChange,
1022+
sessions: { "chat-act-stream": { publicAccessToken: "p" } },
1023+
});
1024+
1025+
const stream = await transport.sendAction("chat-act-stream", { type: "undo" });
1026+
// isStreaming:true must be observed during the action — otherwise a reload
1027+
// mid-action sees a persisted isStreaming:false and never resumes.
1028+
expect(
1029+
onSessionChange.mock.calls.some(([, session]) => session && session.isStreaming === true)
1030+
).toBe(true);
1031+
await drainChunks(stream);
1032+
});
1033+
});
1034+
1035+
describe("append idempotency header", () => {
1036+
it("the per-append X-Part-Id wins over a transport-wide headers override", async () => {
1037+
let appendPartId: string | undefined;
1038+
global.fetch = vi.fn().mockImplementation(async (url: string | URL, init?: RequestInit) => {
1039+
const urlStr = typeof url === "string" ? url : url.toString();
1040+
if (isSessionStreamAppendUrl(urlStr)) {
1041+
appendPartId = (init!.headers as Record<string, string>)["X-Part-Id"];
1042+
return defaultAppendResponse();
1043+
}
1044+
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
1045+
throw new Error(`Unexpected URL: ${urlStr}`);
1046+
});
1047+
1048+
const transport = new TriggerChatTransport({
1049+
task: "my-chat-task",
1050+
accessToken: () => "pat",
1051+
headers: { "X-Part-Id": "STATIC-OVERRIDE" },
1052+
sessions: { "chat-hdr": { publicAccessToken: "p" } },
1053+
});
1054+
1055+
const stream = await transport.sendMessages({
1056+
trigger: "submit-message",
1057+
chatId: "chat-hdr",
1058+
messageId: undefined,
1059+
messages: [createUserMessage("hi")],
1060+
abortSignal: undefined,
1061+
});
1062+
await drainChunks(stream);
1063+
1064+
expect(appendPartId).toBeDefined();
1065+
expect(appendPartId).not.toBe("STATIC-OVERRIDE");
1066+
});
10081067
});
10091068

10101069
describe("reconnectToStream", () => {

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,11 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
944944
this.activeStreams.delete(chatId);
945945
}
946946

947+
// Mark streaming + persist so a reload mid-action resumes (reconnectToStream
948+
// no-ops when the persisted session says isStreaming: false).
949+
state.isStreaming = true;
950+
this.notifySessionChange(chatId, state);
951+
947952
return this.subscribeToSessionStream(state, undefined, chatId);
948953
};
949954

@@ -1123,14 +1128,14 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
11231128
): Promise<void> {
11241129
const ctx: ChatTransportEndpointContext = { endpoint: "in", chatId };
11251130
const url = `${this.resolveBaseURL(ctx)}/realtime/v1/sessions/${encodeURIComponent(chatId)}/in/append`;
1131+
// extraHeaders first so the fixed headers below win — a transport-wide
1132+
// X-Part-Id must not override the per-append idempotency key.
11261133
const headers: Record<string, string> = {
1134+
...this.extraHeaders,
11271135
"Content-Type": "application/json",
11281136
Authorization: `Bearer ${token}`,
11291137
"x-trigger-source": "sdk",
1130-
// Idempotency key: the server skips appends whose part id it has
1131-
// already committed, so a retried POST can't duplicate the record.
11321138
"X-Part-Id": partId ?? crypto.randomUUID(),
1133-
...this.extraHeaders,
11341139
};
11351140
const response = await this.doFetch(ctx, url, { method: "POST", headers, body });
11361141
if (!response.ok) {

packages/trigger-sdk/test/mockChatAgent.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,6 +1140,63 @@ describe("mockChatAgent", () => {
11401140
}
11411141
});
11421142

1143+
it("error turn: onTurnComplete fires with the error and the failed message is snapshotted", async () => {
1144+
const onTurnComplete = vi.fn();
1145+
const agent = chat.agent({
1146+
id: "mockChatAgent.error-turn-run",
1147+
onTurnComplete,
1148+
run: async () => {
1149+
throw new Error("boom in run");
1150+
},
1151+
});
1152+
const harness = mockChatAgent(agent, { chatId: "err-run" });
1153+
try {
1154+
await harness.sendMessage(userMessage("hello", "u-err-run"));
1155+
await new Promise((r) => setTimeout(r, 50));
1156+
expect(onTurnComplete).toHaveBeenCalledTimes(1);
1157+
const evt = onTurnComplete.mock.calls[0]![0];
1158+
expect(evt.error).toBeInstanceOf(Error);
1159+
expect(evt.finishReason).toBe("error");
1160+
expect(evt.responseMessage).toBeUndefined();
1161+
expect(evt.uiMessages.some((m: any) => m.id === "u-err-run")).toBe(true);
1162+
const snap = harness.getSnapshot();
1163+
expect(snap?.messages.some((m) => m.id === "u-err-run")).toBe(true);
1164+
} finally {
1165+
await harness.close();
1166+
}
1167+
});
1168+
1169+
it("error turn: a pre-merge hook throw still snapshots the failed user message", async () => {
1170+
const model = new MockLanguageModelV3({
1171+
doStream: async () => ({ stream: textStream("never reached") }),
1172+
});
1173+
const onTurnComplete = vi.fn();
1174+
const agent = chat.agent({
1175+
id: "mockChatAgent.error-turn-prehook",
1176+
// onValidateMessages fires BEFORE the wire message is merged into the
1177+
// accumulator, so the message lands in the snapshot only because the
1178+
// error path folds the wire message back in.
1179+
onValidateMessages: async () => {
1180+
throw new Error("boom in validate");
1181+
},
1182+
onTurnComplete,
1183+
run: async ({ messages, signal }) => streamText({ model, messages, abortSignal: signal }),
1184+
});
1185+
const harness = mockChatAgent(agent, { chatId: "err-prehook" });
1186+
try {
1187+
await harness.sendMessage(userMessage("validate me", "u-err-prehook"));
1188+
await new Promise((r) => setTimeout(r, 50));
1189+
expect(onTurnComplete).toHaveBeenCalledTimes(1);
1190+
const evt = onTurnComplete.mock.calls[0]![0];
1191+
expect(evt.error).toBeInstanceOf(Error);
1192+
expect(evt.uiMessages.some((m: any) => m.id === "u-err-prehook")).toBe(true);
1193+
const snap = harness.getSnapshot();
1194+
expect(snap?.messages.some((m) => m.id === "u-err-prehook")).toBe(true);
1195+
} finally {
1196+
await harness.close();
1197+
}
1198+
});
1199+
11431200
it("seeds locals before run() via setupLocals (DI pattern)", async () => {
11441201
type FakeDb = { findUser(id: string): Promise<{ id: string; name: string }> };
11451202
const dbKey = locals.create<FakeDb>("test-db");

0 commit comments

Comments
 (0)