Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/chat-boot-cursor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Continuation chat boots no longer stall for around 10 seconds before the first turn. The `session.in` resume cursor is now found with a non-blocking records read instead of draining an SSE long-poll (which always waited out its full 5 second inactivity window, twice per boot), the boot reads run concurrently, and chat snapshots carry the cursor so subsequent boots skip the scan entirely.
5 changes: 5 additions & 0 deletions .changeset/chat-headstart-hydrate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Fix `chat.headStart` when `hydrateMessages` is registered. The warm route's step-1 partial now reaches the agent's accumulator on the hydrate path, so `onTurnComplete` carries the full first turn (the head-start user message included), tool-call handovers resume from step 2 instead of re-running step 1, and the assistant `messageId` stays stable across the handover.
5 changes: 5 additions & 0 deletions .changeset/chat-headstart-reasoning.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Preserve reasoning parts across the `chat.headStart` handover. Extended-thinking models' step-1 reasoning now lands in the durable session history (and `onTurnComplete`) under the same assistant `messageId`, with provider metadata intact so Anthropic thinking signatures survive replays.
5 changes: 5 additions & 0 deletions packages/core/src/v3/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,11 @@ export const ReadSessionStreamRecordsResponseBody = z.object({
data: z.unknown(),
id: z.string(),
seqNum: z.number(),
// S2 record headers — present on Trigger control records (e.g.
// `trigger-control: turn-complete` plus sibling headers). The
// server has always serialized them; older schemas stripped them
// client-side, so treat as optional.
headers: z.array(z.tuple([z.string(), z.string()])).optional(),
})
),
});
Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/v3/sessionStreams/chatSnapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ export type ChatSnapshotV1<TUIMessage extends UIMessage = UIMessage> = {
savedAt: number;
messages: TUIMessage[];
lastOutEventId?: string;
/**
* Committed `.in` consume cursor (S2 seq_num, stringified) as of this
* snapshot's turn-complete. Lets the next boot seed the `.in` resume
* cursor without scanning `session.out` for the latest turn-complete
* header. Absent on snapshots written before this field existed —
* readers fall back to the scan.
*/
lastInEventId?: string;
};

/**
Expand All @@ -39,6 +47,7 @@ export const ChatSnapshotV1Schema = z.object({
savedAt: z.number(),
messages: z.array(z.unknown()),
lastOutEventId: z.string().optional(),
lastInEventId: z.string().optional(),
});

/**
Expand Down
293 changes: 187 additions & 106 deletions packages/trigger-sdk/src/v3/ai.ts

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions packages/trigger-sdk/src/v3/test/mock-chat-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ export type MockChatAgentOptions = {
* `sendHandover()` / `sendHandoverSkip()` to dispatch the handover signal.
*/
mode?: "preload" | "submit-message" | "handover-prepare" | "continuation";
/**
* First-turn UIMessage history shipped on the BOOT payload. Only
* meaningful with `mode: "handover-prepare"` — mirrors the
* `chat.headStart` route handler's `basePayload.headStartMessages`.
*/
headStartMessages?: UIMessage[];
/**
* Pre-seed the snapshot the agent reads at run boot. The runtime's
* snapshot read is replaced with one that returns this snapshot
Expand Down Expand Up @@ -501,6 +507,7 @@ export function mockChatAgent(
metadata: clientData,
...(!isContinuationMode && options.continuation ? { continuation: true } : {}),
...(options.previousRunId ? { previousRunId: options.previousRunId } : {}),
...(options.headStartMessages ? { headStartMessages: options.headStartMessages } : {}),
};

sendSessionInput = drivers.sessions.in.send;
Expand Down
265 changes: 265 additions & 0 deletions packages/trigger-sdk/test/chatHandover.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,271 @@ describe("chat.handover", () => {
}
});

it("pure-text head-start preserves reasoning parts in the response (TRI-10716)", async () => {
// Extended-thinking models stream a reasoning part in step 1. The
// synthesized partial must carry it (with provider metadata, so an
// Anthropic signature survives a UIMessage -> ModelMessage round
// trip) or the durable history loses the step-1 thinking.
let captured:
| { partTypes?: string[]; reasoningText?: string; meta?: unknown }
| undefined;

const agent = chat.agent({
id: "chat.handover.reasoning",
onTurnComplete: ({ responseMessage }) => {
const parts = responseMessage?.parts ?? [];
captured = {
partTypes: parts.map((p) => p.type),
reasoningText: parts
.filter((p) => p.type === "reasoning")
.map((p) => (p as { text?: string }).text || "")
.join(""),
meta: (parts.find((p) => p.type === "reasoning") as
| { providerMetadata?: unknown }
| undefined)?.providerMetadata,
};
},
run: async ({ messages, signal }) => {
return streamText({
model: new MockLanguageModelV3({
doStream: async () => ({ stream: textStream("should-not-run") }),
}),
messages,
abortSignal: signal,
});
},
});

const harness = mockChatAgent(agent, {
chatId: "test-handover-reasoning",
mode: "handover-prepare",
});

try {
await harness.sendHandover({
partialAssistantMessage: [
{
role: "assistant",
content: [
{
type: "reasoning",
text: "thinking about the greeting",
providerOptions: { anthropic: { signature: "sig-abc" } },
},
{ type: "text", text: "Hello!" },
],
},
],
messageId: "asst-reason-1",
isFinal: true,
});
await new Promise((r) => setTimeout(r, 30));

expect(captured).toBeDefined();
expect(captured!.partTypes).toEqual(["reasoning", "text"]);
expect(captured!.reasoningText).toBe("thinking about the greeting");
expect(captured!.meta).toEqual({ anthropic: { signature: "sig-abc" } });
} finally {
await harness.close();
}
});

it("pure-text head-start (isFinal: true) with hydrateMessages persists the partial (TRI-10715)", async () => {
// Same as the pure-text case above, but the customer registers
// `hydrateMessages` (the documented DB-as-source-of-truth pattern).
// The head-start user message must reach the hydrate hook as
// `incomingMessages`, and the warm route's partial must land in the
// accumulator so `onTurnComplete` carries the full first turn.
const runFn = vi.fn();
const stored: { id: string; role: string; parts: unknown[] }[] = [];
const hydrateIncomingRoles: string[] = [];
let captured:
| { responseId?: string; responseText?: string; roles?: string[] }
| undefined;

const agent = chat.agent({
id: "chat.handover.hydrate-pure-text",
hydrateMessages: async ({ incomingMessages }) => {
hydrateIncomingRoles.push(...incomingMessages.map((m) => m.role));
for (const m of incomingMessages) {
if (!stored.some((s) => s.id === m.id)) stored.push(m as (typeof stored)[number]);
}
return [...stored] as never;
},
onTurnComplete: ({ responseMessage, uiMessages }) => {
captured = {
responseId: responseMessage?.id,
responseText: (responseMessage?.parts ?? [])
.filter((p) => p.type === "text")
.map((p) => (p as { text?: string }).text || "")
.join(""),
roles: uiMessages.map((m) => m.role),
};
},
run: async ({ messages, signal }) => {
runFn();
return streamText({
model: new MockLanguageModelV3({
doStream: async () => ({ stream: textStream("should-not-run") }),
}),
messages,
abortSignal: signal,
});
},
});

const harness = mockChatAgent(agent, {
chatId: "test-handover-hydrate-final",
mode: "handover-prepare",
headStartMessages: [
{ id: "hs-user-1", role: "user", parts: [{ type: "text", text: "say hi" }] },
],
});

try {
await harness.sendHandover({
partialAssistantMessage: [
{
role: "assistant",
content: [{ type: "text", text: "Hi there, hope you're well." }],
},
],
messageId: "asst-hydrate-1",
isFinal: true,
});
await new Promise((r) => setTimeout(r, 30));

// isFinal — the agent never calls the user's run().
expect(runFn).not.toHaveBeenCalled();

// The head-start user message reached the hydrate hook as incoming.
expect(hydrateIncomingRoles).toContain("user");

// onTurnComplete carries the full first turn: user + the warm
// route's assistant, under the handover messageId.
expect(captured).toBeDefined();
expect(captured!.roles).toEqual(["user", "assistant"]);
expect(captured!.responseId).toBe("asst-hydrate-1");
expect(captured!.responseText).toBe("Hi there, hope you're well.");
} finally {
await harness.close();
}
});

it("tool-call handover (isFinal: false) with hydrateMessages resumes from step 2 (TRI-10715)", async () => {
// Hydrate variant of the schema-only tool-call case: the spliced
// partial (assistant + approval round) must reach the agent's
// streamText so AI SDK executes the pending tool instead of
// re-running step 1 from scratch against an empty/short prompt.
const toolExecute = vi.fn(async ({ city }: { city: string }) => ({ city, temp: 22 }));
const weatherTool = tool({
description: "Look up weather",
inputSchema: z.object({ city: z.string() }),
execute: toolExecute,
});

const stored: { id: string; role: string; parts: unknown[] }[] = [];
let runMessageRoles: string[] | undefined;
let captured: { roles?: string[]; assistantIds?: (string | undefined)[] } | undefined;

const agent = chat.agent({
id: "chat.handover.hydrate-schema-only-tool",
hydrateMessages: async ({ incomingMessages }) => {
for (const m of incomingMessages) {
if (!stored.some((s) => s.id === m.id)) stored.push(m as (typeof stored)[number]);
}
return [...stored] as never;
},
onTurnComplete: ({ uiMessages }) => {
captured = {
roles: uiMessages.map((m) => m.role),
assistantIds: uiMessages.filter((m) => m.role === "assistant").map((m) => m.id),
};
},
run: async ({ messages, signal }) => {
runMessageRoles = messages.map((m) => m.role);
return streamText({
model: new MockLanguageModelV3({
doStream: async () => ({ stream: textStream("the weather in tokyo is 22°C") }),
}),
messages,
tools: { weather: weatherTool },
abortSignal: signal,
});
},
});

const harness = mockChatAgent(agent, {
chatId: "test-handover-hydrate-tool",
mode: "handover-prepare",
headStartMessages: [
{ id: "hs-user-2", role: "user", parts: [{ type: "text", text: "weather in tokyo?" }] },
],
});

try {
const turn = await harness.sendHandover({
isFinal: false,
messageId: "asst-hydrate-2",
partialAssistantMessage: [
{
role: "assistant",
content: [
{ type: "text", text: "let me check the weather" },
{
type: "tool-call",
toolCallId: "tc-h1",
toolName: "weather",
input: { city: "tokyo" },
},
{
type: "tool-approval-request",
approvalId: "handover-approval-h1",
toolCallId: "tc-h1",
},
],
},
{
role: "tool",
content: [
{
type: "tool-approval-response",
approvalId: "handover-approval-h1",
approved: true,
},
],
},
],
});
await new Promise((r) => setTimeout(r, 30));

// The resume prompt contained the full splice: user + partial
// assistant + approval round — NOT an empty/user-only prompt.
expect(runMessageRoles).toEqual(["user", "assistant", "tool"]);

// AI SDK's initial-tool-execution branch ran the agent-side
// execute (no step-1 re-run).
expect(toolExecute).toHaveBeenCalledWith(
expect.objectContaining({ city: "tokyo" }),
expect.anything()
);

// Step-2 text streamed through session.out.
const text = turn.chunks
.filter((c) => c.type === "text-delta")
.map((c) => (c as { delta: string }).delta)
.join("");
expect(text).toContain("tokyo");

// One assistant in the final chain, under the handover messageId.
expect(captured).toBeDefined();
expect(captured!.roles).toEqual(["user", "assistant"]);
expect(captured!.assistantIds).toEqual(["asst-hydrate-2"]);
} finally {
await harness.close();
}
});

it("onTurnStart fires after the handover signal arrives (lazy)", async () => {
// Hooks should not fire during the wait — only once handover lands
// and a real turn begins. Verifies the order so customers can
Expand Down
5 changes: 5 additions & 0 deletions packages/trigger-sdk/test/mockChatAgent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,11 @@ describe("mockChatAgent", () => {
// The snapshot reflects the post-turn accumulator: 1 user + 1 assistant.
const roles = snap!.messages.map((m) => m.role);
expect(roles).toEqual(["user", "assistant"]);
// `lastInEventId` stays undefined here: TestSessionStreamManager
// deliberately has no seq numbers, so the committed `.in` cursor
// the production write site reads is undefined in harness runs.
// The cursor round-trip is covered by the live smoke instead.
expect(snap!.lastInEventId).toBeUndefined();
Comment thread
ericallam marked this conversation as resolved.
} finally {
await harness.close();
}
Expand Down
Loading
Loading