Merged
Changes from 2 commits
12 changes: 12 additions & 0 deletions .changeset/chat-slim-wire-merge.md
@@ -0,0 +1,12 @@
---
"@trigger.dev/sdk": patch
---

Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that work together:

- The per-turn merge now overlays the wire copy's tool-part state advancement onto the agent's existing chain — `state` + the matching resolution field (`output` / `errorText` / `approval`) come from the wire, everything else (text, reasoning, tool `input`, provider metadata) stays whatever the snapshot or `hydrateMessages` returned. Previously a full-message replace overwrote those fields with whatever the client shipped, so a slimmed wire copy landed a tool call with no `arguments` on the next LLM call. Covers `output-available` / `output-error` (HITL `addToolOutput`) and `approval-responded` / `output-denied` (approval flow).
- `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim assistant messages that carry advanced tool parts. The wire payload is just `{ id, role, parts: [<state + resolution field>] }` for `submit-message` continuations; everything else passes through. Reasoning blobs and full tool inputs no longer ride the wire on every `addToolOutput` / `addToolApproveResponse`, so continuation payloads stay well under the `.in/append` cap on long agent loops.

Note: `onValidateMessages` receives the slim wire message on HITL turns. Calling `validateUIMessages` from `ai` against the full `messages` array rejects the slim assistant message, so filter to user messages (or skip validation on HITL turns). See the updated docstring on `onValidateMessages` for the recommended pattern.

Net effect: calling `chat.addToolOutput(...)` / `chat.addToolApproveResponse(...)` on multi-step reasoning agents (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) no longer exceeds the `.in/append` cap and no longer corrupts the LLM input.
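
The overlay described in the first bullet can be sketched on a toy message pair. This is a simplified illustration, not the SDK's `mergeIncomingIntoHydrated`: the `ChainPart` / `ChainMessage` types, the `overlayWireOntoHydrated` name, and the sample `tool-askUser` tool are assumptions made for the example.

```typescript
// Illustrative sketch only: these types and names are assumptions, not SDK exports.
type ChainPart = { type: string; [key: string]: unknown };
type ChainMessage = { id: string; role: "user" | "assistant"; parts: ChainPart[] };

const WIRE_ADVANCED_STATES = new Set<unknown>([
  "output-available",
  "output-error",
  "approval-responded",
  "output-denied",
]);
const TERMINAL_STATES = new Set<unknown>(["output-available", "output-error", "output-denied"]);

function overlayWireOntoHydrated(hydrated: ChainMessage, incoming: ChainMessage): ChainMessage {
  // Index the wire's advanced tool parts by toolCallId.
  const advanced = new Map<string, ChainPart>();
  for (const part of incoming.parts) {
    const id = part.toolCallId;
    if (typeof id === "string" && WIRE_ADVANCED_STATES.has(part.state)) {
      advanced.set(id, part);
    }
  }
  const parts = hydrated.parts.map((part) => {
    const id = part.toolCallId;
    const wire = typeof id === "string" ? advanced.get(id) : undefined;
    if (!wire) return part; // text, reasoning, and untouched tool parts stay as hydrated
    if (TERMINAL_STATES.has(part.state)) return part; // a stale wire copy never regresses a terminal state
    // Keep everything from the hydrated part (including `input`), overlay state + resolution.
    const next: ChainPart = { ...part, state: wire.state };
    if (wire.state === "output-available") next.output = wire.output;
    if (wire.state === "output-error") next.errorText = wire.errorText;
    if (wire.approval !== undefined) next.approval = wire.approval;
    return next;
  });
  return { ...hydrated, parts };
}

// Hydrated copy: full content, tool still waiting on the human.
const hydrated: ChainMessage = {
  id: "msg_1",
  role: "assistant",
  parts: [
    { type: "reasoning", text: "(large reasoning blob)" },
    { type: "tool-askUser", toolCallId: "call_1", state: "input-available", input: { question: "Deploy?" } },
  ],
};
// Slim wire copy: only the resolved tool part, with no `input`.
const slimWire: ChainMessage = {
  id: "msg_1",
  role: "assistant",
  parts: [{ type: "tool-askUser", toolCallId: "call_1", state: "output-available", output: "yes" }],
};

const merged = overlayWireOntoHydrated(hydrated, slimWire);
console.log(JSON.stringify(merged.parts[1]));
```

In this sketch the merged tool part keeps `input` from the hydrated copy and gains `state` and `output` from the wire, while the reasoning part is untouched. That is the property that keeps the next LLM call from seeing a tool call without arguments.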
91 changes: 91 additions & 0 deletions packages/trigger-sdk/src/v3/ai-shared.ts
@@ -198,3 +198,94 @@ export type InferChatUIMessage<TTask extends AnyTask> = TTask extends Task<
>
? TUIM
: UIMessage;

/**
* Tool-part states that the client advances and ships back over the wire.
* Covers HITL `addToolOutput` (output-available / output-error) and the
* approval flow (approval-responded / output-denied). `input-streaming` /
* `input-available` / `approval-requested` are server-emitted only — if
* we see them on the wire we treat them as no-ops and skip the slim/merge.
*/
function isWireAdvanceableToolState(
state: unknown
): state is "output-available" | "output-error" | "approval-responded" | "output-denied" {
return (
state === "output-available" ||
state === "output-error" ||
state === "approval-responded" ||
state === "output-denied"
);
}

/** Whether a tool-UI part is a static (`tool-${name}`) or dynamic tool. */
function isToolPartType(type: unknown): boolean {
return typeof type === "string" && (type.startsWith("tool-") || type === "dynamic-tool");
}

/**
* Slim an outgoing assistant message before it ships on `submit-message`.
*
* When the client calls `addToolOutput(...)` to resolve a HITL tool (or
* `addToolApproveResponse(...)` to approve/deny one), the AI SDK turns
* it into a `submit-message` whose `messages.at(-1)` is the existing
* assistant message with the new state stitched onto a single tool
* part. On a reasoning-heavy multi-step turn, that full assistant
* message can be 600 KB – 1 MB (encrypted reasoning blobs, reasoning
* text, full tool `input` JSON, prior tool outputs) — well over the
* `.in/append` cap.
*
* The agent runtime only consumes the wire-advanced fields of those
* tool parts (state + output / errorText / approval). Everything else
* (text, reasoning, tool `input`) is rebuilt server-side from the
* durable snapshot or `hydrateMessages`. So we drop everything but
* the advanced tool parts here, and reduce those to just the fields
* the server overlays.
*
* The slim only fires when the assistant message carries at least one
* wire-advanceable tool part. Plain assistant resends (no resolved /
* approval-responded tool) and non-assistant messages pass through
* untouched.
*
* Pairs with the per-turn merge on the agent side
* (`mergeIncomingIntoHydrated` in `ai.ts`).
*/
export function slimSubmitMessageForWire<TMsg extends UIMessage | undefined>(
message: TMsg
): TMsg {
if (!message) return message;
if (message.role !== "assistant") return message;
const parts = (message.parts ?? []) as any[];
const advancedToolParts = parts.filter(
(p) =>
p &&
typeof p === "object" &&
isToolPartType(p.type) &&
isWireAdvanceableToolState(p.state)
);
if (advancedToolParts.length === 0) return message;
const slimParts = advancedToolParts.map((p: any) => {
const base: Record<string, unknown> = {
type: p.type,
toolCallId: p.toolCallId,
state: p.state,
};
if (p.type === "dynamic-tool" && typeof p.toolName === "string") {
base.toolName = p.toolName;
}
if (p.state === "output-available") {
base.output = p.output;
if (p.approval !== undefined) base.approval = p.approval;
} else if (p.state === "output-error") {
if (p.errorText !== undefined) base.errorText = p.errorText;
if (p.approval !== undefined) base.approval = p.approval;
} else if (p.state === "approval-responded" || p.state === "output-denied") {
if (p.approval !== undefined) base.approval = p.approval;
}
return base;
});
return {
id: message.id,
role: message.role,
parts: slimParts,
} as unknown as TMsg;
}
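
To make the payload reduction concrete, the sketch below applies a condensed restatement of the slimming logic to a made-up assistant message. The `WirePart` / `WireMessage` types, the `slimForWire` name, the `tool-askUser` tool, and the 50,000-character reasoning string are assumptions for illustration; the actual function is `slimSubmitMessageForWire` above.

```typescript
// Illustrative sketch only: a condensed restatement, not the SDK export.
type WirePart = { type: string; [key: string]: unknown };
type WireMessage = { id: string; role: "user" | "assistant"; parts: WirePart[] };

const ADVANCED_STATES = new Set<unknown>([
  "output-available",
  "output-error",
  "approval-responded",
  "output-denied",
]);

function isToolPart(part: WirePart): boolean {
  return part.type.startsWith("tool-") || part.type === "dynamic-tool";
}

function slimForWire(message: WireMessage): WireMessage {
  if (message.role !== "assistant") return message; // non-assistant messages pass through
  const advanced = message.parts.filter((p) => isToolPart(p) && ADVANCED_STATES.has(p.state));
  if (advanced.length === 0) return message; // plain assistant resend passes through
  const parts = advanced.map((p) => {
    // Keep only the identity of the tool part plus its resolution fields.
    const slim: WirePart = { type: p.type, toolCallId: p.toolCallId, state: p.state };
    if (p.type === "dynamic-tool" && typeof p.toolName === "string") slim.toolName = p.toolName;
    if (p.state === "output-available") slim.output = p.output;
    if (p.state === "output-error" && p.errorText !== undefined) slim.errorText = p.errorText;
    if (p.approval !== undefined) slim.approval = p.approval;
    return slim;
  });
  return { id: message.id, role: message.role, parts };
}

// Hypothetical assistant message after the client resolved a HITL tool.
const full: WireMessage = {
  id: "msg_1",
  role: "assistant",
  parts: [
    { type: "reasoning", text: "r".repeat(50000) },
    { type: "text", text: "I need your confirmation before deploying." },
    {
      type: "tool-askUser",
      toolCallId: "call_1",
      state: "output-available",
      input: { question: "Deploy?" },
      output: "yes",
    },
  ],
};

const slimmed = slimForWire(full);
console.log(JSON.stringify(full).length, "->", JSON.stringify(slimmed).length);
```

The reasoning and text parts and the tool `input` are dropped from the wire copy; only `type`, `toolCallId`, `state`, and `output` remain on the single tool part. A user message, or an assistant message with no advanced tool part, comes back as the same object.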
178 changes: 160 additions & 18 deletions packages/trigger-sdk/src/v3/ai.ts
@@ -2128,6 +2128,110 @@ function extractNewToolResultsFromHistory(
return out;
}

/**
* Per-turn merge of an incoming wire `UIMessage` onto the matching entry
* a `hydrateMessages` hook (or the default accumulator) provides. Used
* to fold tool-state advances from the client into the agent's
* authoritative chain without trusting the wire copy for fields the
* LLM consumes.
*
* `hydrated` is treated as the source of truth for everything outside
* tool-state advancement: text, reasoning blobs, provider metadata,
* and tool `input` all stay as hydrated had them. We only overlay
* tool parts whose incoming state is wire-advanced — `output-available`
* / `output-error` (HITL `addToolOutput`) or `approval-responded` /
* `output-denied` (approval flow) — and only the corresponding
* resolution fields (`output` / `errorText` / `approval`). Hydrated
* `input` and everything else stay put.
*
* Without this, a slim wire copy (which `TriggerChatTransport` /
* `AgentChat.sendRaw` ship by default on HITL continuations) would
* clobber the hydrated assistant — the next LLM call would receive a
* tool call with no `input` and fail with a 4xx response.
*
* @internal
*/
function mergeIncomingIntoHydrated<TMsg extends UIMessage>(
hydrated: TMsg,
incoming: UIMessage
): TMsg {
const incomingAdvancedByCallId = new Map<string, any>();
for (const part of (incoming.parts ?? []) as any[]) {
if (!isToolUIPart(part)) continue;
const toolCallId = part.toolCallId;
if (typeof toolCallId !== "string" || toolCallId.length === 0) continue;
if (!isWireAdvanceableToolState(part.state)) continue;
incomingAdvancedByCallId.set(toolCallId, part);
}

if (incomingAdvancedByCallId.size === 0) return hydrated;

let mutated = false;
const hydratedParts = (hydrated.parts ?? []) as any[];
const mergedParts = hydratedParts.map((part) => {
if (!isToolUIPart(part)) return part;
const toolCallId = part.toolCallId;
if (typeof toolCallId !== "string" || toolCallId.length === 0) return part;
const incomingPart = incomingAdvancedByCallId.get(toolCallId);
if (!incomingPart) return part;
// Terminal hydrated states (`output-available`, `output-error`,
// `output-denied`) are authoritative — never regressed by a stale
// wire arrival (replay, retry, out-of-order). `output-denied`
// matters here because the wire's `approval-responded` could
// otherwise overwrite a hydrated denial back to a non-terminal
// state.
if (isResolvedToolState(part.state) || part.state === "output-denied") {
return part;
}
// Same state on both sides — no progression to apply.
if (part.state === incomingPart.state) return part;
mutated = true;
if (incomingPart.state === "output-available") {
return {
...part,
state: incomingPart.state,
output: incomingPart.output,
...(incomingPart.approval !== undefined ? { approval: incomingPart.approval } : {}),
};
}
if (incomingPart.state === "output-error") {
return {
...part,
state: incomingPart.state,
errorText: incomingPart.errorText,
...(incomingPart.approval !== undefined ? { approval: incomingPart.approval } : {}),
};
}
// approval-responded / output-denied — overlay state + approval.
return {
...part,
state: incomingPart.state,
...(incomingPart.approval !== undefined ? { approval: incomingPart.approval } : {}),
};
});

if (!mutated) return hydrated;
return { ...hydrated, parts: mergedParts };
}

/**
* Mirror of `slimSubmitMessageForWire`'s predicate. Kept here so the
* agent runtime doesn't have to import from `ai-shared.ts` for a
* one-liner. See that file for the full state-machine docs.
*
* @internal
*/
function isWireAdvanceableToolState(
state: unknown
): state is "output-available" | "output-error" | "approval-responded" | "output-denied" {
return (
state === "output-available" ||
state === "output-error" ||
state === "approval-responded" ||
state === "output-denied"
);
}

/**
* Imperative API for reading and modifying the accumulated message history.
*
@@ -3876,7 +3980,14 @@ export type HydrateMessagesEvent<TClientData = unknown, TUIM extends UIMessage =
* Event passed to the `onValidateMessages` callback.
*/
export type ValidateMessagesEvent<TUIM extends UIMessage = UIMessage> = {
- /** The incoming UI messages for this turn (after cleanup of aborted tool parts). */
+ /**
+ * The incoming UI messages for this turn (after cleanup of aborted tool parts).
+ *
+ * For HITL continuations the assistant entry is slim: `state` + `output` /
+ * `errorText` / `approval` only, no `input` or other parts. Don't pass the
+ * full `messages` array to `validateUIMessages` from `ai`; filter to user
+ * messages (or your own subset) first.
+ */
messages: TUIM[];
/** The unique identifier for the chat session. */
chatId: string;
@@ -4372,8 +4483,13 @@ export type ChatAgentOptions<
*
* Return the validated messages array. Throw to abort the turn with an error.
*
- * This is the right place to call the AI SDK's `validateUIMessages` to catch
- * malformed messages from storage or untrusted input before they reach the model.
+ * This is the right place to call the AI SDK's `validateUIMessages` on fresh
+ * user input. For HITL continuations (`addToolOutput` /
+ * `addToolApproveResponse`), the wire carries a slim assistant message: only
+ * the resolved tool parts, with `state` + `output` / `errorText` / `approval`
+ * and no `input`. `validateUIMessages` against the AI SDK schema rejects
+ * that shape, so filter to user messages (or skip validation entirely) on
+ * those turns.
*
* @example
* ```ts
@@ -4382,7 +4498,11 @@ export type ChatAgentOptions<
* chat.agent({
* id: "my-chat",
* onValidateMessages: async ({ messages }) => {
- * return validateUIMessages({ messages, tools: chatTools });
+ * const userMessages = messages.filter((m) => m.role === "user");
+ * if (userMessages.length > 0) {
+ * await validateUIMessages({ messages: userMessages, tools: chatTools });
+ * }
+ * return messages;
* },
* run: async ({ messages }) => {
* return streamText({ model, messages, tools: chatTools });
@@ -6071,30 +6191,47 @@ function chatAgent<
}
);

- // Auto-merge tool approval updates: if any incoming wire message
- // has an ID that matches a hydrated message, replace it. This makes
- // tool approvals work transparently with backend hydration.
+ // Per-turn merge of incoming wire messages onto the hydrated
+ // chain. Hydrated stays authoritative for text, reasoning
+ // blobs, provider metadata, and tool `input`; we only
+ // overlay tool-part state/output/errorText/approval for
+ // tool calls the wire copy has just resolved. Apps that
+ // slim the wire copy to fit the .in/append cap (or drop
+ // fields they re-source from their own DB) get the
+ // hydrated copy through unchanged.
const merged = [...hydrated] as TUIMessage[];
for (const incoming of cleanedUIMessages) {
if (!incoming.id) continue;
const idx = merged.findIndex((m) => m.id === incoming.id);
if (idx !== -1) {
- merged[idx] = incoming as TUIMessage;
+ merged[idx] = mergeIncomingIntoHydrated(
+ merged[idx]!,
+ incoming
+ ) as TUIMessage;
}
}

accumulatedUIMessages = merged;
accumulatedMessages = await toModelMessages(merged);
locals.set(chatCurrentUIMessagesKey, accumulatedUIMessages);

- // Track new messages for onTurnComplete.newUIMessages
+ // Track new messages for onTurnComplete.newUIMessages.
+ // Surface the post-merge entry when the wire copy
+ // matched a hydrated message: the wire copy may have
+ // been slimmed (HITL tool-output continuation), and
+ // customers expect `newUIMessages` to carry full
+ // content (text, reasoning, tool `input`).
if (
currentWirePayload.trigger === "submit-message" &&
cleanedUIMessages.length > 0
) {
const lastUI = cleanedUIMessages[cleanedUIMessages.length - 1]!;
- turnNewUIMessages.push(lastUI);
- const lastModel = (await toModelMessages([lastUI]))[0];
+ const mergedEntry = lastUI.id
+ ? merged.find((m) => m.id === lastUI.id)
+ : undefined;
+ const surfaceUI = (mergedEntry ?? lastUI) as TUIMessage;
+ turnNewUIMessages.push(surfaceUI);
+ const lastModel = (await toModelMessages([surfaceUI]))[0];
if (lastModel) turnNewModelMessages.push(lastModel);
}
} else {
@@ -6121,15 +6258,17 @@ function chatAgent<
} else if (cleanedUIMessages.length > 0) {
// Submit-message (and the special-cased
// handover-prepare → submit-message rewrite earlier in
- // this scope): append-or-replace-by-id for the single
- // delta message.
+ // this scope): merge-or-append for the single delta
+ // message.
//
// Tool approval responses arrive as a single assistant
// message whose id collides with the existing assistant
- // in the accumulator; we replace by id. The fallback
- // for HITL `addToolOutput` continuations where AI SDK
- // regenerates the id (TRI-9137) still applies via
- // `rewriteIncomingIdViaToolCallMap`.
+ // in the accumulator; we merge the resolved tool-part
+ // resolutions onto the existing entry, keeping text,
+ // reasoning, and tool `input` from the prior snapshot.
+ // The fallback for HITL `addToolOutput` continuations
+ // where AI SDK regenerates the id (TRI-9137) still
+ // applies via `rewriteIncomingIdViaToolCallMap`.
let replaced = false;
for (const raw of cleanedUIMessages) {
let incoming = raw;
@@ -6146,7 +6285,10 @@ function chatAgent<
}
}
if (idx !== -1) {
- accumulatedUIMessages[idx] = incoming as TUIMessage;
+ accumulatedUIMessages[idx] = mergeIncomingIntoHydrated(
+ accumulatedUIMessages[idx]!,
+ incoming
+ ) as TUIMessage;
replaced = true;
} else {
accumulatedUIMessages.push(incoming as TUIMessage);
11 changes: 8 additions & 3 deletions packages/trigger-sdk/src/v3/chat-client.ts
@@ -26,6 +26,7 @@ import {
TRIGGER_CONTROL_SUBTYPE,
} from "@trigger.dev/core/v3";
import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js";
import { slimSubmitMessageForWire } from "./ai-shared.js";
import { sessions } from "./sessions.js";

// ─── Type inference ────────────────────────────────────────────────
@@ -406,16 +407,20 @@ export class AgentChat<TAgent = unknown> {

// Slim wire — at most ONE message per record. The agent rebuilds prior
// history from its durable S3 snapshot + session.out replay at run
- // boot. `regenerate-message` omits `message` (the agent slices its own
- // history). See plan vivid-humming-bonbon.
+ // boot (or `hydrateMessages` if registered).
+ //
+ // For `submit-message`, assistant messages carrying resolved tool parts
+ // (HITL `addToolOutput` answers) are slimmed to just the resolution
+ // payload; reasoning blobs, text, and tool `input` come from the
+ // agent's authoritative chain. `regenerate-message` omits `message`.
if (triggerType === "submit-message" && messages.length === 0) {
throw new Error(
"AgentChat.sendRaw: 'submit-message' trigger requires at least one message"
);
}
const lastIfSubmit =
triggerType === "submit-message"
- ? (messages.at(-1) as UIMessage | undefined)
+ ? slimSubmitMessageForWire(messages.at(-1) as UIMessage | undefined)
: undefined;
const payload: ChatTaskWirePayload = {
...(lastIfSubmit ? { message: lastIfSubmit } : {}),