Skip to content

Commit 2905009

Browse files
committed
fix(sdk,core): keep errored chat turns recoverable and bound stream growth
Fire onTurnComplete on errored turns (with the thrown error attached) and persist a snapshot of the failed turn so its user message is not stranded past the resume cursor on the next run. Custom agents and manual chat.writeTurnComplete callers now trim the output stream the same way the built-in agent does, so it no longer grows without bound. Sending a custom action supersedes any in-flight reader instead of leaving two readers racing the resume cursor, and a long-lived watch subscription caps its dedupe set.
1 parent 8c3dff6 commit 2905009

4 files changed

Lines changed: 124 additions & 5 deletions

File tree

.changeset/chat-agent-hardening.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
"@trigger.dev/core": patch
44
---
55

6-
Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated.
6+
Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached) and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream, sending a custom action no longer leaves a second stream reader running, and a long-lived `watch` subscription no longer grows its dedupe set without bound.

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,20 @@ export class SSEStreamSubscription implements StreamSubscription {
371371
this.retryCount = 0; // reset on success
372372
armStall();
373373

374+
// Dedup window for record ids. Bounded with FIFO eviction so a
375+
// long-lived `watch: true` subscription (one connection across many
376+
// turns) doesn't grow this set without bound. The window only needs
377+
// to cover the overlap a reconnect/replay can re-deliver, so a few
378+
// thousand ids is ample.
379+
const SEEN_IDS_CAP = 5000;
374380
const seenIds = new Set<string>();
381+
const rememberSeen = (id: string) => {
382+
seenIds.add(id);
383+
if (seenIds.size > SEEN_IDS_CAP) {
384+
const oldest = seenIds.values().next().value;
385+
if (oldest !== undefined) seenIds.delete(oldest);
386+
}
387+
};
375388

376389
const stream = response.body
377390
.pipeThrough(new TextDecoderStream())
@@ -426,7 +439,7 @@ export class SSEStreamSubscription implements StreamSubscription {
426439
| undefined;
427440
if (parsedBody?.id) {
428441
if (seenIds.has(parsedBody.id)) continue;
429-
seenIds.add(parsedBody.id);
442+
rememberSeen(parsedBody.id);
430443
}
431444
chunkController.enqueue({
432445
id: record.seq_num.toString(),

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4252,6 +4252,13 @@ export type TurnCompleteEvent<TClientData = unknown, TUIM extends UIMessage = UI
42524252
* manual `pipeChat()` or an aborted stream).
42534253
*/
42544254
finishReason?: FinishReason;
4255+
/**
4256+
* Set when the turn failed (the `run()` body or a lifecycle hook threw).
4257+
* On an errored turn `responseMessage` is undefined or partial and
4258+
* `finishReason` is `"error"`. Use this to mark the turn failed in your
4259+
* persistence. Undefined on a successful turn.
4260+
*/
4261+
error?: unknown;
42554262
};
42564263

42574264
/**
@@ -5094,6 +5101,11 @@ function chatCustomAgent<
50945101
// No client-side upsert needed.
50955102
locals.set(chatSessionHandleKey, sessions.open(payload.chatId));
50965103
locals.set(chatAgentRunContextKey, runOptions.ctx);
5104+
// Initialize the turn-complete trim slot so `chat.writeTurnComplete`
5105+
// trims `session.out` back to the previous turn boundary. Without
5106+
// this the slot is undefined and the trim never runs, so `.out`
5107+
// grows without bound for the whole custom-agent surface.
5108+
locals.set(lastTurnCompleteSeqNumKey, { value: undefined });
50975109
markChatAgentRunForStreamsWarning();
50985110
taskContext.setConversationId(payload.chatId);
50995111
stampConversationIdOnActiveSpan(payload.chatId);
@@ -7556,18 +7568,91 @@ function chatAgent<
75567568
throw turnError;
75577569
}
75587570

7571+
let errorTurnCompleteResult: Awaited<
7572+
ReturnType<typeof writeTurnCompleteChunk>
7573+
> | undefined;
75597574
try {
75607575
await withChatWriter(async (writer) => {
75617576
const errorText =
75627577
turnError instanceof Error ? turnError.message : "An unexpected error occurred";
75637578
writer.write({ type: "error", errorText } as any);
75647579
});
75657580
// Signal turn complete so the client knows this turn is done
7566-
await writeTurnCompleteChunk(currentWirePayload.chatId);
7581+
errorTurnCompleteResult = await writeTurnCompleteChunk(currentWirePayload.chatId);
75677582
} catch {
75687583
// Best-effort — if stream write fails, let the run continue anyway
75697584
}
75707585

7586+
// Fire onTurnComplete on the error path too — the docs promise it
7587+
// runs "after every turn, successful or errored" so customers can
7588+
// mark the turn failed. `responseMessage` is undefined/partial and
7589+
// `error` carries the thrown value.
7590+
if (onTurnComplete) {
7591+
try {
7592+
await tracer.startActiveSpan(
7593+
"onTurnComplete()",
7594+
async () => {
7595+
await onTurnComplete({
7596+
ctx,
7597+
chatId: currentWirePayload.chatId,
7598+
messages: accumulatedMessages,
7599+
uiMessages: accumulatedUIMessages,
7600+
newMessages: [],
7601+
newUIMessages: [],
7602+
responseMessage: undefined,
7603+
rawResponseMessage: undefined,
7604+
turn,
7605+
runId: ctx.run.id,
7606+
chatAccessToken: "",
7607+
clientData: currentWirePayload.metadata as inferSchemaIn<TClientDataSchema>,
7608+
stopped: false,
7609+
continuation,
7610+
previousRunId,
7611+
preloaded,
7612+
totalUsage: cumulativeUsage,
7613+
finishReason: "error",
7614+
error: turnError,
7615+
lastEventId: errorTurnCompleteResult?.lastEventId,
7616+
});
7617+
},
7618+
{
7619+
attributes: {
7620+
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onComplete",
7621+
[SemanticInternalAttributes.COLLAPSED]: true,
7622+
"chat.id": currentWirePayload.chatId,
7623+
"chat.turn": turn + 1,
7624+
"chat.errored": true,
7625+
},
7626+
}
7627+
);
7628+
} catch {
7629+
// A throwing onTurnComplete on the error path must not crash
7630+
// the run — keep the conversation alive for the next message.
7631+
}
7632+
}
7633+
7634+
// Persist a snapshot so the failed turn's user message isn't
7635+
// stranded. `writeTurnCompleteChunk` already advanced the `.in`
7636+
// cursor past it (via the session-in-event-id header), and the
7637+
// success-path snapshot write is skipped on error — without this
7638+
// the next boot would resume past a message that exists in
7639+
// neither the snapshot nor the replayable `.in` tail.
7640+
if (!hydrateMessages) {
7641+
try {
7642+
await writeChatSnapshot<TUIMessage>(sessionIdForSnapshot, {
7643+
version: 1,
7644+
savedAt: Date.now(),
7645+
messages: accumulatedUIMessages,
7646+
lastOutEventId: errorTurnCompleteResult?.lastEventId,
7647+
});
7648+
} catch (error) {
7649+
logger.warn("chat.agent: error-path snapshot write failed", {
7650+
error: error instanceof Error ? error.message : String(error),
7651+
sessionId: sessionIdForSnapshot,
7652+
});
7653+
}
7654+
}
7655+
75717656
// chat.requestUpgrade() / chat.endRun() — exit after error turn too
75727657
if (
75737658
locals.get(chatUpgradeRequestedKey) ||
@@ -9873,8 +9958,19 @@ async function writeTurnCompleteChunk(
98739958
// 2. Trim back to the previous turn-complete, if we have one. Skipping on
98749959
// first-turn-ever (or first turn post-OOM without a snapshot seed) is
98759960
// fine — the chain catches up next turn.
9876-
const slot = locals.get(lastTurnCompleteSeqNumKey);
9877-
const prev = slot?.value;
9961+
//
9962+
// Lazily create the slot if a caller reached here without one (a plain
9963+
// `task()` driving `chat.createSession` / `chat.writeTurnComplete`, vs.
9964+
// chatAgent/chatCustomAgent which seed it at boot). The first call then
9965+
// does no trim (nothing before it) and records its seq; later calls trim
9966+
// — so `.out` is bounded for every writeTurnComplete caller, not just the
9967+
// built-in agents.
9968+
let slot = locals.get(lastTurnCompleteSeqNumKey);
9969+
if (!slot) {
9970+
slot = { value: undefined };
9971+
locals.set(lastTurnCompleteSeqNumKey, slot);
9972+
}
9973+
const prev = slot.value;
98789974
if (slot && prev !== undefined) {
98799975
try {
98809976
await session.out.trimTo(prev);

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,16 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
934934

935935
await this.callWithAuthRetry(chatId, state, send);
936936

937+
// Supersede any in-flight reader before subscribing — same as
938+
// `sendMessages`. Two concurrent readers both write `state.lastEventId`
939+
// and the slower one can regress the cursor, replaying records on the
940+
// next reconnect.
941+
const activeStream = this.activeStreams.get(chatId);
942+
if (activeStream) {
943+
activeStream.abort();
944+
this.activeStreams.delete(chatId);
945+
}
946+
937947
return this.subscribeToSessionStream(state, undefined, chatId);
938948
};
939949

0 commit comments

Comments
 (0)