Skip to content

Commit 8c3dff6

Browse files
committed
fix(sdk,core): harden chat.agent message delivery and idempotency
Stop delivering a user message twice when it arrives mid-stream: the session stream manager now lets a handler consume a record so it is not also buffered for the next turn, which previously re-ran the message as a duplicate turn. Input appends carry an X-Part-Id idempotency key so a retried send cannot duplicate a message. Stopping a generation clears the streaming state and persists it, so a page reload no longer replays the stopped turn. Promoting a queued message to steering no longer sends inside a React state updater. Runs keep up to the full tag limit instead of being silently truncated. The in-memory test stream manager now mirrors the production consume semantics so this class of bug is covered.
1 parent 459dce2 commit 8c3dff6

12 files changed

Lines changed: 252 additions & 100 deletions

File tree

.changeset/chat-agent-hardening.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated.

packages/core/src/v3/apiClient/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { nanoid } from "nanoid";
12
import { z } from "zod";
23
import { VERSION } from "../../version.js";
34
import { generateJWT } from "../jwt.js";
@@ -1276,12 +1277,16 @@ export class ApiClient {
12761277
part: TBody,
12771278
requestOptions?: ZodFetchOptions
12781279
) {
1280+
// Generated once per logical append, outside zodfetch, so its internal
1281+
// retries reuse the same part id and the server-side dedupe collapses a
1282+
// retried POST whose first attempt actually committed.
1283+
const partId = nanoid(7);
12791284
return zodfetch(
12801285
AppendToStreamResponseBody,
12811286
`${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`,
12821287
{
12831288
method: "POST",
1284-
headers: this.#getHeaders(false),
1289+
headers: { ...this.#getHeaders(false), "X-Part-Id": partId },
12851290
body: part,
12861291
},
12871292
mergeRequestOptions(this.defaultRequestOptions, requestOptions)

packages/core/src/v3/sessionStreams/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class SessionStreamsAPI implements SessionStreamManager {
3434
public on(
3535
sessionId: string,
3636
io: SessionChannelIO,
37-
handler: (data: unknown) => void | Promise<void>
37+
handler: (data: unknown) => void | boolean | Promise<void>
3838
): { off: () => void } {
3939
return this.#getManager().on(sessionId, io, handler);
4040
}

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js";
99
import { SessionChannelIO, SessionStreamManager } from "./types.js";
1010
import { controlSubtype } from "./wireProtocol.js";
1111

12-
type SessionStreamHandler = (data: unknown) => void | Promise<void>;
12+
// A handler that synchronously returns `true` CONSUMES the record: it is
13+
// not buffered for a later `once()` and the committed-consume cursor
14+
// advances past it. Anything else (void, a Promise) leaves the record
15+
// available to other consumers. See `SessionStreamManager.on` in types.ts.
16+
type SessionStreamHandler = (data: unknown) => void | boolean | Promise<void>;
1317

1418
type OnceWaiter = {
1519
resolve: (result: InputStreamOnceResult<unknown>) => void;
@@ -113,30 +117,41 @@ export class StandardSessionStreamManager implements SessionStreamManager {
113117
this.explicitlyDisconnected.delete(key);
114118
this.#ensureTailConnected(sessionId, io);
115119

120+
// Selective drain: offer each buffered record to the new handler and
121+
// remove ONLY the ones it consumed (returned `true` — e.g. the
122+
// messages facade for message-kind records). Consumed records advance
123+
// the committed-consume cursor, so a worker using `messagesInput.on()`
124+
// for user-message delivery persists a `.in` cursor that matches what
125+
// the handler processed. Records the handler did not consume (other
126+
// kinds) STAY buffered for a future `once()` or a different handler —
127+
// a blind drain here either swallowed them (delivered to a handler
128+
// that filtered them out, then deleted) or re-delivered already-
129+
// processed messages into every newly attached per-turn handler,
130+
// duplicating turns.
116131
const buffered = this.buffer.get(key);
117132
if (buffered && buffered.length > 0) {
118-
for (const data of buffered) {
119-
this.#invokeHandler(handler, data);
120-
}
121-
// Advance the committed-consume cursor to the highest seq drained
122-
// into the new handler. `on()`-drain removes the records from the
123-
// buffer, so they're no longer available to a future `once()` —
124-
// from the manager's perspective they've been consumed. Without
125-
// this, a worker that uses `messagesInput.on()` for user-message
126-
// delivery (pendingMessages mode) would persist a `.in` cursor
127-
// that lags behind the records the handler already processed, and
128-
// the next boot would re-deliver them.
129-
const seqList = this.bufferSeqNums.get(key);
130-
if (seqList) {
131-
for (const s of seqList) {
133+
const seqList = this.bufferSeqNums.get(key) ?? [];
134+
const keptRecords: unknown[] = [];
135+
// Kept in lock-step with `keptRecords` — drifting lengths would map
136+
// seq_nums to the wrong records on subsequent shifts.
137+
const keptSeqNums: Array<number | undefined> = [];
138+
for (let i = 0; i < buffered.length; i++) {
139+
const consumed = this.#invokeHandler(handler, buffered[i]);
140+
if (consumed) {
141+
const s = seqList[i];
132142
if (s !== undefined) this.#advanceLastDispatched(key, s);
143+
} else {
144+
keptRecords.push(buffered[i]);
145+
keptSeqNums.push(seqList[i]);
133146
}
134147
}
135-
this.buffer.delete(key);
136-
// Keep `bufferSeqNums` in lock-step with `buffer` — without this,
137-
// the parallel array desyncs and the next `#dispatch` that buffers
138-
// a record would shift a stale seqNum into `lastDispatchedSeqNum`.
139-
this.bufferSeqNums.delete(key);
148+
if (keptRecords.length > 0) {
149+
this.buffer.set(key, keptRecords);
150+
this.bufferSeqNums.set(key, keptSeqNums);
151+
} else {
152+
this.buffer.delete(key);
153+
this.bufferSeqNums.delete(key);
154+
}
140155
}
141156

142157
return {
@@ -509,13 +524,21 @@ export class StandardSessionStreamManager implements SessionStreamManager {
509524
return;
510525
}
511526

512-
// Persistent handlers (e.g. `stopInput.on(...)`) get a copy of the chunk,
513-
// but they don't "consume" it — handlers usually filter by `kind` and
514-
// ignore chunks they don't care about. Buffer the chunk regardless so a
515-
// subsequent `once()` (e.g. `messagesInput.waitWithIdleTimeout` in
516-
// chat.agent's preload) can still pick up the same chunk that arrived
517-
// before its waiter was registered.
518-
this.#invokeHandlers(key, data);
527+
// Persistent handlers get a copy of the chunk. A handler that
528+
// synchronously returns `true` CONSUMES it (e.g. the messages facade
529+
// for message-kind records): the record must not also be buffered, or
530+
// the next `on()` attach / `once()` would deliver it a second time —
531+
// in chat.agent's turn loop that duplicated user messages into a
532+
// second turn. Records no handler consumed (e.g. a message arriving
533+
// while only the stop facade is attached during preload) are buffered
534+
// so a subsequent `once()` can still pick them up.
535+
const consumed = this.#invokeHandlers(key, data);
536+
if (consumed) {
537+
if (seqNum !== undefined) {
538+
this.#advanceLastDispatched(key, seqNum);
539+
}
540+
return;
541+
}
519542

520543
let buffered = this.buffer.get(key);
521544
if (!buffered) {
@@ -535,17 +558,24 @@ export class StandardSessionStreamManager implements SessionStreamManager {
535558
bufferedSeqs.push(seqNum);
536559
}
537560

538-
#invokeHandlers(key: string, data: unknown): void {
561+
/** Returns true when any handler consumed the record. All handlers are invoked regardless. */
562+
#invokeHandlers(key: string, data: unknown): boolean {
539563
const handlers = this.handlers.get(key);
540-
if (!handlers) return;
564+
if (!handlers) return false;
565+
let consumed = false;
541566
for (const handler of handlers) {
542-
this.#invokeHandler(handler, data);
567+
if (this.#invokeHandler(handler, data)) {
568+
consumed = true;
569+
}
543570
}
571+
return consumed;
544572
}
545573

546-
#invokeHandler(handler: SessionStreamHandler, data: unknown): void {
574+
/** Returns true when the handler synchronously consumed the record (returned `true`). */
575+
#invokeHandler(handler: SessionStreamHandler, data: unknown): boolean {
547576
try {
548577
const result = handler(data);
578+
if (result === true) return true;
549579
if (result && typeof result === "object" && "catch" in result) {
550580
(result as Promise<void>).catch((error) => {
551581
if (this.debug) {
@@ -558,6 +588,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
558588
console.error("[SessionStreamManager] Handler error:", error);
559589
}
560590
}
591+
return false;
561592
}
562593

563594
#removeOnceWaiter(key: string, waiter: OnceWaiter): void {

packages/core/src/v3/sessionStreams/noopManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export class NoopSessionStreamManager implements SessionStreamManager {
66
on(
77
_sessionId: string,
88
_io: SessionChannelIO,
9-
_handler: (data: unknown) => void | Promise<void>
9+
_handler: (data: unknown) => void | boolean | Promise<void>
1010
): { off: () => void } {
1111
return { off: () => {} };
1212
}

packages/core/src/v3/sessionStreams/types.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,20 @@ export type SessionChannelIO = "out" | "in";
2222
* `.on` / `.once` / `.peek` / `.wait` / `.waitWithIdleTimeout`.
2323
*/
2424
export interface SessionStreamManager {
25-
/** Register a handler that fires every time data arrives on the given channel. */
25+
/**
26+
* Register a handler that fires every time data arrives on the given channel.
27+
*
28+
* A handler that synchronously returns `true` CONSUMES the record: it is
29+
* not buffered for a later `once()` and the committed-consume cursor
30+
* advances past it. Any other return value (including a Promise) leaves
31+
* the record available to other consumers. Kind-filtering facades return
32+
* `true` for the kinds they own so the same record is never delivered
33+
* twice — once to the handler and again via a buffer drain.
34+
*/
2635
on(
2736
sessionId: string,
2837
io: SessionChannelIO,
29-
handler: (data: unknown) => void | Promise<void>
38+
handler: (data: unknown) => void | boolean | Promise<void>
3039
): { off: () => void };
3140

3241
/** Wait for the next record on the given channel (buffered or live). */

packages/core/src/v3/test/test-session-stream-manager.ts

Lines changed: 92 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ type OnceWaiter = {
1616
abortHandler?: () => void;
1717
};
1818

19-
type Handler = (data: unknown) => void | Promise<void>;
19+
// Same contract as the production manager: a handler that synchronously
20+
// returns `true` CONSUMES the record (not buffered, not re-delivered on a
21+
// future `on()` attach). See `SessionStreamManager.on` in types.ts.
22+
type Handler = (data: unknown) => void | boolean | Promise<void>;
2023

2124
function keyFor(sessionId: string, io: SessionChannelIO): string {
2225
return `${sessionId}:${io}`;
@@ -51,20 +54,32 @@ export class TestSessionStreamManager implements SessionStreamManager {
5154
}
5255
set.add(handler);
5356

54-
// Note: we intentionally do NOT replay buffered records into the
55-
// newly-registered handler, and we do NOT drain the buffer. The
56-
// buffer is owned by `once()` — registering a passive observer
57-
// (`on`) must not consume records destined for a future `once`
58-
// waiter. This matches production SSE semantics where handlers
59-
// observe records as they arrive, not retroactively.
60-
//
61-
// Earlier versions drained the buffer here, which caused user
62-
// messages buffered during the runtime's `runFn` boot phase to be
63-
// silently swallowed by the `stopInput.on()` handler registered at
64-
// ai.ts:4806 (the stop handler ignores `kind: "message"` chunks).
65-
// The next `messagesInput.waitWithIdleTimeout` then waited 30s for
66-
// a record that had already been "delivered" to a handler that
67-
// didn't want it.
57+
// Selective drain, matching the production manager: offer each
58+
// buffered record to the new handler and remove ONLY the ones it
59+
// consumed (returned `true`). Records the handler filtered out (other
60+
// kinds) stay buffered for a future `once()`. This is the corrected
61+
// form of two historical bugs: a blind drain swallowed boot-phase user
62+
// messages into the stop facade (which ignores `kind: "message"`),
63+
// and no-drain-at-all let production re-deliver already-processed
64+
// messages into every newly attached per-turn handler.
65+
const buffered = this.buffer.get(key);
66+
if (buffered && buffered.length > 0) {
67+
const kept: unknown[] = [];
68+
for (const data of buffered) {
69+
let consumed = false;
70+
try {
71+
consumed = handler(data) === true;
72+
} catch {
73+
// Never let a handler error break test state
74+
}
75+
if (!consumed) kept.push(data);
76+
}
77+
if (kept.length > 0) {
78+
this.buffer.set(key, kept);
79+
} else {
80+
this.buffer.delete(key);
81+
}
82+
}
6883

6984
return {
7085
off: () => {
@@ -212,20 +227,20 @@ export class TestSessionStreamManager implements SessionStreamManager {
212227
/**
213228
* Push a record onto the given channel.
214229
*
215-
* Dispatch rules — similar to the production manager, but with a tweak
216-
* that makes unit tests deterministic:
230+
* Dispatch rules — same as the production manager:
231+
*
232+
* 1. **A pending `.once` waiter consumes first.** Handlers still observe
233+
* a copy.
234+
* 2. **Otherwise handlers observe.** A handler that synchronously
235+
* returns `true` consumes the record (kind-filtering facades do this
236+
* for the kinds they own) — it is NOT buffered.
237+
* 3. **Records no one consumed are buffered** for the next `.once` call
238+
* or the next consuming `on()` attach.
217239
*
218-
* 1. **Handlers always observe** (like production). A session-level `.on`
219-
* is a filter-observer — it fires every time a record arrives,
220-
* regardless of whether a `.once` waiter is also active.
221-
* 2. **First waiter consumes** the record if present (like production).
222-
* 3. **If no waiter, the record is buffered for the next `.once` call.**
223-
* Production discards records that only match handlers — but in
224-
* production the SSE tail introduces enough latency that the next
225-
* `.once` is usually registered before the next record arrives. Tests
226-
* send synchronously right after `turn-complete`, so without this
227-
* buffer the next `waitWithIdleTimeout` would race and lose the
228-
* message. The buffer is the only deviation from production semantics.
240+
* Handler promises are awaited before resolving so test code can rely
241+
* on async handler work having settled by the time `__sendFromTest`
242+
* resolves. Consumption is decided on the synchronous return value,
243+
* exactly like production.
229244
*/
230245
async __sendFromTest(
231246
sessionId: string,
@@ -234,23 +249,6 @@ export class TestSessionStreamManager implements SessionStreamManager {
234249
): Promise<void> {
235250
const key = keyFor(sessionId, io);
236251

237-
const handlers = this.handlers.get(key);
238-
if (handlers && handlers.size > 0) {
239-
// Awaited so test code can rely on handlers having completed by the
240-
// time `__sendFromTest` resolves. Wrapped per-handler so a
241-
// throwing/rejecting handler doesn't poison Promise.all and break
242-
// unrelated test state.
243-
await Promise.all(
244-
Array.from(handlers).map(async (h) => {
245-
try {
246-
await h(data);
247-
} catch {
248-
// Never let a handler error break test state
249-
}
250-
})
251-
);
252-
}
253-
254252
const waiters = this.onceWaiters.get(key);
255253
if (waiters && waiters.length > 0) {
256254
const w = waiters.shift()!;
@@ -260,6 +258,27 @@ export class TestSessionStreamManager implements SessionStreamManager {
260258
w.signal.removeEventListener("abort", w.abortHandler);
261259
}
262260
w.resolve({ ok: true, output: data });
261+
await this.#invokeHandlers(key, data);
262+
return;
263+
}
264+
265+
const consumed = await this.#invokeHandlers(key, data);
266+
if (consumed) return;
267+
268+
// Re-check waiters: handler invocation above is awaited (unlike the
269+
// synchronous production dispatch), and the runtime commonly registers
270+
// its next `once()` during that window — e.g. the turn loop reaching
271+
// `waitWithIdleTimeout` while a handler settles. Without this second
272+
// look the record would be buffered while the fresh waiter hangs.
273+
const lateWaiters = this.onceWaiters.get(key);
274+
if (lateWaiters && lateWaiters.length > 0) {
275+
const w = lateWaiters.shift()!;
276+
if (lateWaiters.length === 0) this.onceWaiters.delete(key);
277+
if (w.timer) clearTimeout(w.timer);
278+
if (w.signal && w.abortHandler) {
279+
w.signal.removeEventListener("abort", w.abortHandler);
280+
}
281+
w.resolve({ ok: true, output: data });
263282
return;
264283
}
265284

@@ -271,6 +290,34 @@ export class TestSessionStreamManager implements SessionStreamManager {
271290
buffered.push(data);
272291
}
273292

293+
/**
294+
* Invoke all handlers; resolves once any returned promises settle.
295+
* Returns true when any handler synchronously consumed the record.
296+
* Wrapped per-handler so a throwing/rejecting handler doesn't poison
297+
* Promise.all and break unrelated test state.
298+
*/
299+
async #invokeHandlers(key: string, data: unknown): Promise<boolean> {
300+
const handlers = this.handlers.get(key);
301+
if (!handlers || handlers.size === 0) return false;
302+
303+
let consumed = false;
304+
await Promise.all(
305+
Array.from(handlers).map(async (h) => {
306+
try {
307+
const result = h(data);
308+
if (result === true) {
309+
consumed = true;
310+
return;
311+
}
312+
await result;
313+
} catch {
314+
// Never let a handler error break test state
315+
}
316+
})
317+
);
318+
return consumed;
319+
}
320+
274321
/**
275322
* Immediately resolve every pending `once()` waiter for the given channel
276323
* with a timeout error. Simulates a closed stream (e.g. session closed).

0 commit comments

Comments
 (0)