Skip to content

Commit f5f29ce

Browse files
authored
fix(sdk,core): chat.agent delivery, idempotency, and recovery fixes (#3891)
## Summary A batch of reliability fixes for `chat.agent`: - A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn). - Input appends carry an idempotency key (`X-Part-Id`) so a retried send can't duplicate a message. - `onTurnComplete` now fires on errored turns with the thrown error attached, and the failed turn's user message is persisted so it isn't lost on the next run. - Stopping a generation clears the streaming state, so a page reload doesn't replay the stopped turn. - Custom agents and manual `chat.writeTurnComplete` callers trim the output stream, sending a custom action no longer leaves a second stream reader running, a long-lived `watch` subscription no longer grows its dedupe set without bound, promoting a queued message to steering no longer risks a double-send, and runs keep the full set of dashboard tags. The `X-Part-Id` header is accepted by current servers (they just don't dedupe on it yet), so this is safe to ship ahead of the matching server change.
1 parent b82d100 commit f5f29ce

15 files changed

Lines changed: 512 additions & 105 deletions

File tree

.changeset/chat-agent-hardening.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached) and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream, sending a custom action no longer leaves a second stream reader running, and a long-lived `watch` subscription no longer grows its dedupe set without bound.

packages/core/src/v3/apiClient/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { nanoid } from "nanoid";
12
import { z } from "zod";
23
import { VERSION } from "../../version.js";
34
import { generateJWT } from "../jwt.js";
@@ -1276,12 +1277,17 @@ export class ApiClient {
12761277
part: TBody,
12771278
requestOptions?: ZodFetchOptions
12781279
) {
1280+
// Generated once per logical append, outside zodfetch, so its internal
1281+
// retries reuse the same part id and the server-side dedupe collapses a
1282+
// retried POST whose first attempt actually committed. Full-length nanoid
1283+
// (~126 bits) to match the browser transport's randomUUID entropy.
1284+
const partId = nanoid();
12791285
return zodfetch(
12801286
AppendToStreamResponseBody,
12811287
`${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`,
12821288
{
12831289
method: "POST",
1284-
headers: this.#getHeaders(false),
1290+
headers: { ...this.#getHeaders(false), "X-Part-Id": partId },
12851291
body: part,
12861292
},
12871293
mergeRequestOptions(this.defaultRequestOptions, requestOptions)

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,20 @@ export class SSEStreamSubscription implements StreamSubscription {
371371
this.retryCount = 0; // reset on success
372372
armStall();
373373

374+
// Dedup window for record ids. Bounded with FIFO eviction so a
375+
// long-lived `watch: true` subscription (one connection across many
376+
// turns) doesn't grow this set without bound. The window only needs
377+
// to cover the overlap a reconnect/replay can re-deliver, so a few
378+
// thousand ids is ample.
379+
const SEEN_IDS_CAP = 5000;
374380
const seenIds = new Set<string>();
381+
const rememberSeen = (id: string) => {
382+
seenIds.add(id);
383+
if (seenIds.size > SEEN_IDS_CAP) {
384+
const oldest = seenIds.values().next().value;
385+
if (oldest !== undefined) seenIds.delete(oldest);
386+
}
387+
};
375388

376389
const stream = response.body
377390
.pipeThrough(new TextDecoderStream())
@@ -426,7 +439,7 @@ export class SSEStreamSubscription implements StreamSubscription {
426439
| undefined;
427440
if (parsedBody?.id) {
428441
if (seenIds.has(parsedBody.id)) continue;
429-
seenIds.add(parsedBody.id);
442+
rememberSeen(parsedBody.id);
430443
}
431444
chunkController.enqueue({
432445
id: record.seq_num.toString(),

packages/core/src/v3/sessionStreams/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class SessionStreamsAPI implements SessionStreamManager {
3434
public on(
3535
sessionId: string,
3636
io: SessionChannelIO,
37-
handler: (data: unknown) => void | Promise<void>
37+
handler: (data: unknown) => void | boolean | Promise<void>
3838
): { off: () => void } {
3939
return this.#getManager().on(sessionId, io, handler);
4040
}

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js";
99
import { SessionChannelIO, SessionStreamManager } from "./types.js";
1010
import { controlSubtype } from "./wireProtocol.js";
1111

12-
type SessionStreamHandler = (data: unknown) => void | Promise<void>;
12+
// A handler that synchronously returns `true` CONSUMES the record: it is
13+
// not buffered for a later `once()` and the committed-consume cursor
14+
// advances past it. Anything else (void, a Promise) leaves the record
15+
// available to other consumers. See `SessionStreamManager.on` in types.ts.
16+
type SessionStreamHandler = (data: unknown) => void | boolean | Promise<void>;
1317

1418
type OnceWaiter = {
1519
resolve: (result: InputStreamOnceResult<unknown>) => void;
@@ -113,30 +117,41 @@ export class StandardSessionStreamManager implements SessionStreamManager {
113117
this.explicitlyDisconnected.delete(key);
114118
this.#ensureTailConnected(sessionId, io);
115119

120+
// Selective drain: offer each buffered record to the new handler and
121+
// remove ONLY the ones it consumed (returned `true` — e.g. the
122+
// messages facade for message-kind records). Consumed records advance
123+
// the committed-consume cursor, so a worker using `messagesInput.on()`
124+
// for user-message delivery persists a `.in` cursor that matches what
125+
// the handler processed. Records the handler did not consume (other
126+
// kinds) STAY buffered for a future `once()` or a different handler —
127+
// a blind drain here either swallowed them (delivered to a handler
128+
// that filtered them out, then deleted) or re-delivered already-
129+
// processed messages into every newly attached per-turn handler,
130+
// duplicating turns.
116131
const buffered = this.buffer.get(key);
117132
if (buffered && buffered.length > 0) {
118-
for (const data of buffered) {
119-
this.#invokeHandler(handler, data);
120-
}
121-
// Advance the committed-consume cursor to the highest seq drained
122-
// into the new handler. `on()`-drain removes the records from the
123-
// buffer, so they're no longer available to a future `once()` —
124-
// from the manager's perspective they've been consumed. Without
125-
// this, a worker that uses `messagesInput.on()` for user-message
126-
// delivery (pendingMessages mode) would persist a `.in` cursor
127-
// that lags behind the records the handler already processed, and
128-
// the next boot would re-deliver them.
129-
const seqList = this.bufferSeqNums.get(key);
130-
if (seqList) {
131-
for (const s of seqList) {
133+
const seqList = this.bufferSeqNums.get(key) ?? [];
134+
const keptRecords: unknown[] = [];
135+
// Kept in lock-step with `keptRecords` — drifting lengths would map
136+
// seq_nums to the wrong records on subsequent shifts.
137+
const keptSeqNums: Array<number | undefined> = [];
138+
for (let i = 0; i < buffered.length; i++) {
139+
const consumed = this.#invokeHandler(handler, buffered[i]);
140+
if (consumed) {
141+
const s = seqList[i];
132142
if (s !== undefined) this.#advanceLastDispatched(key, s);
143+
} else {
144+
keptRecords.push(buffered[i]);
145+
keptSeqNums.push(seqList[i]);
133146
}
134147
}
135-
this.buffer.delete(key);
136-
// Keep `bufferSeqNums` in lock-step with `buffer` — without this,
137-
// the parallel array desyncs and the next `#dispatch` that buffers
138-
// a record would shift a stale seqNum into `lastDispatchedSeqNum`.
139-
this.bufferSeqNums.delete(key);
148+
if (keptRecords.length > 0) {
149+
this.buffer.set(key, keptRecords);
150+
this.bufferSeqNums.set(key, keptSeqNums);
151+
} else {
152+
this.buffer.delete(key);
153+
this.bufferSeqNums.delete(key);
154+
}
140155
}
141156

142157
return {
@@ -509,13 +524,21 @@ export class StandardSessionStreamManager implements SessionStreamManager {
509524
return;
510525
}
511526

512-
// Persistent handlers (e.g. `stopInput.on(...)`) get a copy of the chunk,
513-
// but they don't "consume" it — handlers usually filter by `kind` and
514-
// ignore chunks they don't care about. Buffer the chunk regardless so a
515-
// subsequent `once()` (e.g. `messagesInput.waitWithIdleTimeout` in
516-
// chat.agent's preload) can still pick up the same chunk that arrived
517-
// before its waiter was registered.
518-
this.#invokeHandlers(key, data);
527+
// Persistent handlers get a copy of the chunk. A handler that
528+
// synchronously returns `true` CONSUMES it (e.g. the messages facade
529+
// for message-kind records): the record must not also be buffered, or
530+
// the next `on()` attach / `once()` would deliver it a second time —
531+
// in chat.agent's turn loop that duplicated user messages into a
532+
// second turn. Records no handler consumed (e.g. a message arriving
533+
// while only the stop facade is attached during preload) are buffered
534+
// so a subsequent `once()` can still pick them up.
535+
const consumed = this.#invokeHandlers(key, data);
536+
if (consumed) {
537+
if (seqNum !== undefined) {
538+
this.#advanceLastDispatched(key, seqNum);
539+
}
540+
return;
541+
}
519542

520543
let buffered = this.buffer.get(key);
521544
if (!buffered) {
@@ -535,17 +558,24 @@ export class StandardSessionStreamManager implements SessionStreamManager {
535558
bufferedSeqs.push(seqNum);
536559
}
537560

538-
#invokeHandlers(key: string, data: unknown): void {
561+
/** Returns true when any handler consumed the record. All handlers are invoked regardless. */
562+
#invokeHandlers(key: string, data: unknown): boolean {
539563
const handlers = this.handlers.get(key);
540-
if (!handlers) return;
564+
if (!handlers) return false;
565+
let consumed = false;
541566
for (const handler of handlers) {
542-
this.#invokeHandler(handler, data);
567+
if (this.#invokeHandler(handler, data)) {
568+
consumed = true;
569+
}
543570
}
571+
return consumed;
544572
}
545573

546-
#invokeHandler(handler: SessionStreamHandler, data: unknown): void {
574+
/** Returns true when the handler synchronously consumed the record (returned `true`). */
575+
#invokeHandler(handler: SessionStreamHandler, data: unknown): boolean {
547576
try {
548577
const result = handler(data);
578+
if (result === true) return true;
549579
if (result && typeof result === "object" && "catch" in result) {
550580
(result as Promise<void>).catch((error) => {
551581
if (this.debug) {
@@ -558,6 +588,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
558588
console.error("[SessionStreamManager] Handler error:", error);
559589
}
560590
}
591+
return false;
561592
}
562593

563594
#removeOnceWaiter(key: string, waiter: OnceWaiter): void {

packages/core/src/v3/sessionStreams/noopManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export class NoopSessionStreamManager implements SessionStreamManager {
66
on(
77
_sessionId: string,
88
_io: SessionChannelIO,
9-
_handler: (data: unknown) => void | Promise<void>
9+
_handler: (data: unknown) => void | boolean | Promise<void>
1010
): { off: () => void } {
1111
return { off: () => {} };
1212
}

packages/core/src/v3/sessionStreams/types.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,20 @@ export type SessionChannelIO = "out" | "in";
2222
* `.on` / `.once` / `.peek` / `.wait` / `.waitWithIdleTimeout`.
2323
*/
2424
export interface SessionStreamManager {
25-
/** Register a handler that fires every time data arrives on the given channel. */
25+
/**
26+
* Register a handler that fires every time data arrives on the given channel.
27+
*
28+
* A handler that synchronously returns `true` CONSUMES the record: it is
29+
* not buffered for a later `once()` and the committed-consume cursor
30+
* advances past it. Any other return value (including a Promise) leaves
31+
* the record available to other consumers. Kind-filtering facades return
32+
* `true` for the kinds they own so the same record is never delivered
33+
* twice — once to the handler and again via a buffer drain.
34+
*/
2635
on(
2736
sessionId: string,
2837
io: SessionChannelIO,
29-
handler: (data: unknown) => void | Promise<void>
38+
handler: (data: unknown) => void | boolean | Promise<void>
3039
): { off: () => void };
3140

3241
/** Wait for the next record on the given channel (buffered or live). */

0 commit comments

Comments
 (0)