Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/chat-agent-hardening.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached) and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream, sending a custom action no longer leaves a second stream reader running, and a long-lived `watch` subscription no longer grows its dedupe set without bound.
7 changes: 6 additions & 1 deletion packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { nanoid } from "nanoid";
import { z } from "zod";
import { VERSION } from "../../version.js";
import { generateJWT } from "../jwt.js";
Expand Down Expand Up @@ -1276,12 +1277,16 @@ export class ApiClient {
part: TBody,
requestOptions?: ZodFetchOptions
) {
// Generated once per logical append, outside zodfetch, so its internal
// retries reuse the same part id and the server-side dedupe collapses a
// retried POST whose first attempt actually committed.
const partId = nanoid(7);
Comment thread
ericallam marked this conversation as resolved.
Outdated
return zodfetch(
AppendToStreamResponseBody,
`${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`,
{
method: "POST",
headers: this.#getHeaders(false),
headers: { ...this.#getHeaders(false), "X-Part-Id": partId },
body: part,
},
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
Expand Down
15 changes: 14 additions & 1 deletion packages/core/src/v3/apiClient/runStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,20 @@ export class SSEStreamSubscription implements StreamSubscription {
this.retryCount = 0; // reset on success
armStall();

// Dedup window for record ids. Bounded with FIFO eviction so a
// long-lived `watch: true` subscription (one connection across many
// turns) doesn't grow this set without bound. The window only needs
// to cover the overlap a reconnect/replay can re-deliver, so a few
// thousand ids is ample.
const SEEN_IDS_CAP = 5000;
const seenIds = new Set<string>();
const rememberSeen = (id: string) => {
seenIds.add(id);
if (seenIds.size > SEEN_IDS_CAP) {
const oldest = seenIds.values().next().value;
if (oldest !== undefined) seenIds.delete(oldest);
}
};

const stream = response.body
.pipeThrough(new TextDecoderStream())
Expand Down Expand Up @@ -426,7 +439,7 @@ export class SSEStreamSubscription implements StreamSubscription {
| undefined;
if (parsedBody?.id) {
if (seenIds.has(parsedBody.id)) continue;
seenIds.add(parsedBody.id);
rememberSeen(parsedBody.id);
}
chunkController.enqueue({
id: record.seq_num.toString(),
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/sessionStreams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class SessionStreamsAPI implements SessionStreamManager {
public on(
sessionId: string,
io: SessionChannelIO,
handler: (data: unknown) => void | Promise<void>
handler: (data: unknown) => void | boolean | Promise<void>
): { off: () => void } {
return this.#getManager().on(sessionId, io, handler);
}
Expand Down
93 changes: 62 additions & 31 deletions packages/core/src/v3/sessionStreams/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js";
import { SessionChannelIO, SessionStreamManager } from "./types.js";
import { controlSubtype } from "./wireProtocol.js";

type SessionStreamHandler = (data: unknown) => void | Promise<void>;
// A handler that synchronously returns `true` CONSUMES the record: it is
// not buffered for a later `once()` and the committed-consume cursor
// advances past it. Anything else (void, a Promise) leaves the record
// available to other consumers. See `SessionStreamManager.on` in types.ts.
type SessionStreamHandler = (data: unknown) => void | boolean | Promise<void>;

type OnceWaiter = {
resolve: (result: InputStreamOnceResult<unknown>) => void;
Expand Down Expand Up @@ -113,30 +117,41 @@ export class StandardSessionStreamManager implements SessionStreamManager {
this.explicitlyDisconnected.delete(key);
this.#ensureTailConnected(sessionId, io);

// Selective drain: offer each buffered record to the new handler and
// remove ONLY the ones it consumed (returned `true` — e.g. the
// messages facade for message-kind records). Consumed records advance
// the committed-consume cursor, so a worker using `messagesInput.on()`
// for user-message delivery persists a `.in` cursor that matches what
// the handler processed. Records the handler did not consume (other
// kinds) STAY buffered for a future `once()` or a different handler —
// a blind drain here either swallowed them (delivered to a handler
// that filtered them out, then deleted) or re-delivered already-
// processed messages into every newly attached per-turn handler,
// duplicating turns.
const buffered = this.buffer.get(key);
if (buffered && buffered.length > 0) {
for (const data of buffered) {
this.#invokeHandler(handler, data);
}
// Advance the committed-consume cursor to the highest seq drained
// into the new handler. `on()`-drain removes the records from the
// buffer, so they're no longer available to a future `once()` —
// from the manager's perspective they've been consumed. Without
// this, a worker that uses `messagesInput.on()` for user-message
// delivery (pendingMessages mode) would persist a `.in` cursor
// that lags behind the records the handler already processed, and
// the next boot would re-deliver them.
const seqList = this.bufferSeqNums.get(key);
if (seqList) {
for (const s of seqList) {
const seqList = this.bufferSeqNums.get(key) ?? [];
const keptRecords: unknown[] = [];
// Kept in lock-step with `keptRecords` — drifting lengths would map
// seq_nums to the wrong records on subsequent shifts.
const keptSeqNums: Array<number | undefined> = [];
for (let i = 0; i < buffered.length; i++) {
const consumed = this.#invokeHandler(handler, buffered[i]);
if (consumed) {
const s = seqList[i];
if (s !== undefined) this.#advanceLastDispatched(key, s);
} else {
keptRecords.push(buffered[i]);
keptSeqNums.push(seqList[i]);
}
}
this.buffer.delete(key);
// Keep `bufferSeqNums` in lock-step with `buffer` — without this,
// the parallel array desyncs and the next `#dispatch` that buffers
// a record would shift a stale seqNum into `lastDispatchedSeqNum`.
this.bufferSeqNums.delete(key);
if (keptRecords.length > 0) {
this.buffer.set(key, keptRecords);
this.bufferSeqNums.set(key, keptSeqNums);
} else {
this.buffer.delete(key);
this.bufferSeqNums.delete(key);
}
}

return {
Expand Down Expand Up @@ -509,13 +524,21 @@ export class StandardSessionStreamManager implements SessionStreamManager {
return;
}

// Persistent handlers (e.g. `stopInput.on(...)`) get a copy of the chunk,
// but they don't "consume" it — handlers usually filter by `kind` and
// ignore chunks they don't care about. Buffer the chunk regardless so a
// subsequent `once()` (e.g. `messagesInput.waitWithIdleTimeout` in
// chat.agent's preload) can still pick up the same chunk that arrived
// before its waiter was registered.
this.#invokeHandlers(key, data);
// Persistent handlers get a copy of the chunk. A handler that
// synchronously returns `true` CONSUMES it (e.g. the messages facade
// for message-kind records): the record must not also be buffered, or
// the next `on()` attach / `once()` would deliver it a second time —
// in chat.agent's turn loop that duplicated user messages into a
// second turn. Records no handler consumed (e.g. a message arriving
// while only the stop facade is attached during preload) are buffered
// so a subsequent `once()` can still pick them up.
const consumed = this.#invokeHandlers(key, data);
if (consumed) {
if (seqNum !== undefined) {
this.#advanceLastDispatched(key, seqNum);
}
return;
}

let buffered = this.buffer.get(key);
if (!buffered) {
Expand All @@ -535,17 +558,24 @@ export class StandardSessionStreamManager implements SessionStreamManager {
bufferedSeqs.push(seqNum);
}

#invokeHandlers(key: string, data: unknown): void {
/** Returns true when any handler consumed the record. All handlers are invoked regardless. */
#invokeHandlers(key: string, data: unknown): boolean {
const handlers = this.handlers.get(key);
if (!handlers) return;
if (!handlers) return false;
let consumed = false;
for (const handler of handlers) {
this.#invokeHandler(handler, data);
if (this.#invokeHandler(handler, data)) {
consumed = true;
}
}
return consumed;
}

#invokeHandler(handler: SessionStreamHandler, data: unknown): void {
/** Returns true when the handler synchronously consumed the record (returned `true`). */
#invokeHandler(handler: SessionStreamHandler, data: unknown): boolean {
try {
const result = handler(data);
if (result === true) return true;
if (result && typeof result === "object" && "catch" in result) {
(result as Promise<void>).catch((error) => {
if (this.debug) {
Expand All @@ -558,6 +588,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
console.error("[SessionStreamManager] Handler error:", error);
}
}
return false;
}

#removeOnceWaiter(key: string, waiter: OnceWaiter): void {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/sessionStreams/noopManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export class NoopSessionStreamManager implements SessionStreamManager {
on(
_sessionId: string,
_io: SessionChannelIO,
_handler: (data: unknown) => void | Promise<void>
_handler: (data: unknown) => void | boolean | Promise<void>
): { off: () => void } {
return { off: () => {} };
}
Expand Down
13 changes: 11 additions & 2 deletions packages/core/src/v3/sessionStreams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@ export type SessionChannelIO = "out" | "in";
* `.on` / `.once` / `.peek` / `.wait` / `.waitWithIdleTimeout`.
*/
export interface SessionStreamManager {
/** Register a handler that fires every time data arrives on the given channel. */
/**
* Register a handler that fires every time data arrives on the given channel.
*
* A handler that synchronously returns `true` CONSUMES the record: it is
* not buffered for a later `once()` and the committed-consume cursor
* advances past it. Any other return value (including a Promise) leaves
* the record available to other consumers. Kind-filtering facades return
* `true` for the kinds they own so the same record is never delivered
* twice — once to the handler and again via a buffer drain.
*/
on(
sessionId: string,
io: SessionChannelIO,
handler: (data: unknown) => void | Promise<void>
handler: (data: unknown) => void | boolean | Promise<void>
): { off: () => void };

/** Wait for the next record on the given channel (buffered or live). */
Expand Down
Loading
Loading