Commit 2b6d249

fix(sdk,core): head-start handover correctness and continuation boot latency (#3907)
## Summary

Three related fixes for `chat.headStart` and continuation boots, found while investigating customer reports.

**1. `chat.headStart` now works with `hydrateMessages`.** The turn-0 handover splice only ran on the default accumulation path, so agents registering `hydrateMessages` silently lost the warm route's step-1 response: pure-text turns fired `onTurnComplete` with no assistant message (and an empty durable write), tool-call turns re-ran step 1 from scratch under a fresh `messageId`, and the head-start user message never reached the hydrate hook at all. The first-turn history now reaches `hydrateMessages` as `incomingMessages`, and the splice runs after both accumulation branches, deduplicated by the handover `messageId`.

**2. Reasoning parts survive the handover.** The synthesized partial only mapped text and tool-call parts, so an extended-thinking model's step-1 reasoning streamed to the browser but never reached durable history. Reasoning parts now map through with provider metadata, so Anthropic thinking signatures survive a UIMessage round trip on hydrate replays.

**3. Continuation boots no longer stall for ~10 seconds.** The `.in` resume cursor was found by draining an SSE subscription that only closes after its 5 second inactivity window, and the scan ran twice per boot. It is now a non-blocking records read of the latest turn-complete header, runs at most once per boot, the boot reads run concurrently, and chat snapshots carry the cursor so subsequent boots skip the scan entirely. Measured locally on a cancel-then-continue repro: pre-turn continuation latency dropped from ~11s to ~0.5s.

Every fix was verified red-green: new unit tests reproduced each failure before the fix, and end-to-end smoke tests against a live local stack covered both handover legs, reasoning persistence with extended thinking (including a follow-up turn that round-trips the persisted signed reasoning back to the provider), and the boot timing comparison.

## Rollout

SDK-only; no server change required. A new SDK against a server that does not serialize record headers degrades to the existing no-cursor fallback. Old SDKs ignore the new snapshot field, and new SDKs fall back to the records scan on snapshots written before it existed.
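
The compatibility story above amounts to a three-step fallback when picking the `.in` resume cursor. A minimal sketch, assuming a synchronous `scanRecords` callback in place of the real (asynchronous) records read; `resolveInCursor`, `SnapshotLike`, and `CursorSource` are hypothetical names, and only `lastInEventId` appears in this commit:

```typescript
// Hypothetical shapes for illustration; only `lastInEventId` is from the diff.
type SnapshotLike = { lastInEventId?: string };

type CursorSource = "snapshot" | "records-scan" | "none";

type CursorResolution = { cursor: string | undefined; source: CursorSource };

/**
 * Pick the `.in` resume cursor, preferring the cheapest source.
 * `scanRecords` stands in for the records read and returns undefined when
 * no turn-complete header is available (e.g. headers are not serialized).
 */
function resolveInCursor(
  snapshot: SnapshotLike | undefined,
  scanRecords: () => string | undefined
): CursorResolution {
  // 1. Snapshots written by a new SDK carry the cursor: skip the scan.
  if (snapshot?.lastInEventId !== undefined) {
    return { cursor: snapshot.lastInEventId, source: "snapshot" };
  }

  // 2. Older snapshots lack the field: fall back to the records scan.
  const scanned = scanRecords();
  if (scanned !== undefined) {
    return { cursor: scanned, source: "records-scan" };
  }

  // 3. Nothing found: the existing no-cursor behaviour.
  return { cursor: undefined, source: "none" };
}
```

The ordering mirrors the rollout notes: the snapshot field is an optimization, so its absence changes only which source is consulted, not whether the boot succeeds.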
1 parent 93b4715 commit 2b6d249

10 files changed

Lines changed: 573 additions & 107 deletions

.changeset/chat-boot-cursor.md

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+"@trigger.dev/sdk": patch
+"@trigger.dev/core": patch
+---
+
+Continuation chat boots no longer stall for around 10 seconds before the first turn. The `session.in` resume cursor is now found with a non-blocking records read instead of draining an SSE long-poll (which always waited out its full 5 second inactivity window, twice per boot), the boot reads run concurrently, and chat snapshots carry the cursor so subsequent boots skip the scan entirely.
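
One way to get "at most once per boot" together with concurrent boot reads is to memoize the scan's promise and start both reads before awaiting either. The sketch below is illustrative only; `createBootReader`, `readSnapshot`, and `scanCursor` are hypothetical names, not functions from the SDK:

```typescript
// Hypothetical boot state for illustration only.
type BootState = { snapshot: string | undefined; cursor: string | undefined };

function createBootReader(
  readSnapshot: () => Promise<string | undefined>,
  scanCursor: () => Promise<string | undefined>
) {
  let cursorPromise: Promise<string | undefined> | undefined;

  // Memoize the promise rather than the value, so callers that arrive
  // while the scan is still in flight share the same single scan.
  const getCursor = (): Promise<string | undefined> => {
    if (cursorPromise === undefined) {
      cursorPromise = scanCursor();
    }
    return cursorPromise;
  };

  const boot = async (): Promise<BootState> => {
    // Both reads are started here, before either one is awaited, so the
    // total wait is the slower of the two rather than their sum.
    const [snapshot, cursor] = await Promise.all([readSnapshot(), getCursor()]);
    return { snapshot, cursor };
  };

  return { getCursor, boot };
}
```

Caching the promise instead of the resolved value matters here: a second caller during the first scan would otherwise see an empty cache and start a duplicate scan.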
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/sdk": patch
+---
+
+Fix `chat.headStart` when `hydrateMessages` is registered. The warm route's step-1 partial now reaches the agent's accumulator on the hydrate path, so `onTurnComplete` carries the full first turn (the head-start user message included), tool-call handovers resume from step 2 instead of re-running step 1, and the assistant `messageId` stays stable across the handover.
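
The deduplication by handover `messageId` can be pictured as follows. This is a sketch, not the code in `ai.ts` (that diff is not rendered on this page); `spliceHandoverPartial` and `UIMessageLike` are hypothetical, and it assumes a message already present under the handover id is kept rather than replaced:

```typescript
// Simplified stand-in for a UI message; not the SDK's actual type.
type UIMessageLike = {
  id: string;
  role: "system" | "user" | "assistant";
  parts: unknown[];
};

/**
 * Append the warm route's step-1 partial to the accumulated history unless
 * a message with the same id is already there (for example because the
 * hydrate hook returned it). Returns a new array; the input is not mutated.
 */
function spliceHandoverPartial(
  accumulated: readonly UIMessageLike[],
  partial: UIMessageLike
): UIMessageLike[] {
  const alreadyPresent = accumulated.some((m) => m.id === partial.id);
  return alreadyPresent ? [...accumulated] : [...accumulated, partial];
}
```

Because the check is keyed on the id alone, running the splice after either accumulation branch is safe: a second pass over the same history is a no-op.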
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/sdk": patch
+---
+
+Preserve reasoning parts across the `chat.headStart` handover. Extended-thinking models' step-1 reasoning now lands in the durable session history (and `onTurnComplete`) under the same assistant `messageId`, with provider metadata intact so Anthropic thinking signatures survive replays.
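
As an illustration of the mapping this changeset describes, the sketch below turns model-message content parts into UI parts and carries `providerOptions` across as `providerMetadata`, matching the field names the new test in this commit asserts on. The types and the `toUIParts` helper are simplified stand-ins rather than the SDK's implementation, and tool-call mapping is left out:

```typescript
// Simplified, hypothetical shapes; the real AI SDK types are richer.
type ProviderData = { [provider: string]: { [key: string]: unknown } };

type ModelPart =
  | { type: "text"; text: string }
  | { type: "reasoning"; text: string; providerOptions?: ProviderData };

type TextUIPart = { type: "text"; text: string };
type ReasoningUIPart = { type: "reasoning"; text: string; providerMetadata?: ProviderData };
type UIPart = TextUIPart | ReasoningUIPart;

/** Map model-message content parts to UI parts, preserving their order. */
function toUIParts(content: readonly ModelPart[]): UIPart[] {
  const parts: UIPart[] = [];
  for (const part of content) {
    if (part.type === "reasoning") {
      const mapped: ReasoningUIPart = { type: "reasoning", text: part.text };
      // Carry provider data (e.g. a thinking signature) through untouched.
      if (part.providerOptions !== undefined) {
        mapped.providerMetadata = part.providerOptions;
      }
      parts.push(mapped);
    } else {
      parts.push({ type: "text", text: part.text });
    }
  }
  return parts;
}
```

Dropping the provider data at this step would still stream the reasoning text, but a later replay to the provider would lack the signature that came with it.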

packages/core/src/v3/schemas/api.ts

Lines changed: 5 additions & 0 deletions
@@ -2051,6 +2051,11 @@ export const ReadSessionStreamRecordsResponseBody = z.object({
       data: z.unknown(),
       id: z.string(),
       seqNum: z.number(),
+      // S2 record headers — present on Trigger control records (e.g.
+      // `trigger-control: turn-complete` plus sibling headers). The
+      // server has always serialized them; older schemas stripped them
+      // client-side, so treat as optional.
+      headers: z.array(z.tuple([z.string(), z.string()])).optional(),
     })
   ),
 });
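
With `headers` exposed on each record, finding the latest turn-complete record is a backwards scan. A minimal sketch, assuming records arrive in ascending `seqNum` order and using only the fields visible in this hunk; `StreamRecord`, `getHeader`, and `findLatestTurnComplete` are hypothetical names:

```typescript
// Mirrors only the record fields visible in the hunk above.
type StreamRecord = {
  data: unknown;
  id: string;
  seqNum: number;
  headers?: [string, string][];
};

/** Return the value of the first header named `name`, if any. */
function getHeader(record: StreamRecord, name: string): string | undefined {
  return record.headers?.find(([key]) => key === name)?.[1];
}

/**
 * Walk the records from newest to oldest and return the first one tagged
 * `trigger-control: turn-complete`. Records without headers are skipped.
 */
function findLatestTurnComplete(
  records: readonly StreamRecord[]
): StreamRecord | undefined {
  for (let i = records.length - 1; i >= 0; i--) {
    const record = records[i];
    if (record !== undefined && getHeader(record, "trigger-control") === "turn-complete") {
      return record;
    }
  }
  return undefined;
}
```

The cursor itself would then be read from one of the sibling headers on the returned record; the name of that header is not shown in this diff, so the sketch stops at locating the record.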

packages/core/src/v3/sessionStreams/chatSnapshot.ts

Lines changed: 9 additions & 0 deletions
@@ -26,6 +26,14 @@ export type ChatSnapshotV1<TUIMessage extends UIMessage = UIMessage> = {
   savedAt: number;
   messages: TUIMessage[];
   lastOutEventId?: string;
+  /**
+   * Committed `.in` consume cursor (S2 seq_num, stringified) as of this
+   * snapshot's turn-complete. Lets the next boot seed the `.in` resume
+   * cursor without scanning `session.out` for the latest turn-complete
+   * header. Absent on snapshots written before this field existed —
+   * readers fall back to the scan.
+   */
+  lastInEventId?: string;
 };
 
 /**
@@ -39,6 +47,7 @@ export const ChatSnapshotV1Schema = z.object({
   savedAt: z.number(),
   messages: z.array(z.unknown()),
   lastOutEventId: z.string().optional(),
+  lastInEventId: z.string().optional(),
 });
 
 /**

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 187 additions & 106 deletions
Large diffs are not rendered by default.

packages/trigger-sdk/src/v3/test/mock-chat-agent.ts

Lines changed: 7 additions & 0 deletions
@@ -92,6 +92,12 @@ export type MockChatAgentOptions = {
    * `sendHandover()` / `sendHandoverSkip()` to dispatch the handover signal.
    */
   mode?: "preload" | "submit-message" | "handover-prepare" | "continuation";
+  /**
+   * First-turn UIMessage history shipped on the BOOT payload. Only
+   * meaningful with `mode: "handover-prepare"` — mirrors the
+   * `chat.headStart` route handler's `basePayload.headStartMessages`.
+   */
+  headStartMessages?: UIMessage[];
   /**
    * Pre-seed the snapshot the agent reads at run boot. The runtime's
    * snapshot read is replaced with one that returns this snapshot
@@ -501,6 +507,7 @@ export function mockChatAgent(
     metadata: clientData,
     ...(!isContinuationMode && options.continuation ? { continuation: true } : {}),
     ...(options.previousRunId ? { previousRunId: options.previousRunId } : {}),
+    ...(options.headStartMessages ? { headStartMessages: options.headStartMessages } : {}),
   };
 
   sendSessionInput = drivers.sessions.in.send;

packages/trigger-sdk/test/chatHandover.test.ts

Lines changed: 265 additions & 0 deletions
@@ -260,6 +260,271 @@ describe("chat.handover", () => {
     }
   });
 
+  it("pure-text head-start preserves reasoning parts in the response (TRI-10716)", async () => {
+    // Extended-thinking models stream a reasoning part in step 1. The
+    // synthesized partial must carry it (with provider metadata, so an
+    // Anthropic signature survives a UIMessage -> ModelMessage round
+    // trip) or the durable history loses the step-1 thinking.
+    let captured:
+      | { partTypes?: string[]; reasoningText?: string; meta?: unknown }
+      | undefined;
+
+    const agent = chat.agent({
+      id: "chat.handover.reasoning",
+      onTurnComplete: ({ responseMessage }) => {
+        const parts = responseMessage?.parts ?? [];
+        captured = {
+          partTypes: parts.map((p) => p.type),
+          reasoningText: parts
+            .filter((p) => p.type === "reasoning")
+            .map((p) => (p as { text?: string }).text || "")
+            .join(""),
+          meta: (parts.find((p) => p.type === "reasoning") as
+            | { providerMetadata?: unknown }
+            | undefined)?.providerMetadata,
+        };
+      },
+      run: async ({ messages, signal }) => {
+        return streamText({
+          model: new MockLanguageModelV3({
+            doStream: async () => ({ stream: textStream("should-not-run") }),
+          }),
+          messages,
+          abortSignal: signal,
+        });
+      },
+    });
+
+    const harness = mockChatAgent(agent, {
+      chatId: "test-handover-reasoning",
+      mode: "handover-prepare",
+    });
+
+    try {
+      await harness.sendHandover({
+        partialAssistantMessage: [
+          {
+            role: "assistant",
+            content: [
+              {
+                type: "reasoning",
+                text: "thinking about the greeting",
+                providerOptions: { anthropic: { signature: "sig-abc" } },
+              },
+              { type: "text", text: "Hello!" },
+            ],
+          },
+        ],
+        messageId: "asst-reason-1",
+        isFinal: true,
+      });
+      await new Promise((r) => setTimeout(r, 30));
+
+      expect(captured).toBeDefined();
+      expect(captured!.partTypes).toEqual(["reasoning", "text"]);
+      expect(captured!.reasoningText).toBe("thinking about the greeting");
+      expect(captured!.meta).toEqual({ anthropic: { signature: "sig-abc" } });
+    } finally {
+      await harness.close();
+    }
+  });
+
+  it("pure-text head-start (isFinal: true) with hydrateMessages persists the partial (TRI-10715)", async () => {
+    // Same as the pure-text case above, but the customer registers
+    // `hydrateMessages` (the documented DB-as-source-of-truth pattern).
+    // The head-start user message must reach the hydrate hook as
+    // `incomingMessages`, and the warm route's partial must land in the
+    // accumulator so `onTurnComplete` carries the full first turn.
+    const runFn = vi.fn();
+    const stored: { id: string; role: string; parts: unknown[] }[] = [];
+    const hydrateIncomingRoles: string[] = [];
+    let captured:
+      | { responseId?: string; responseText?: string; roles?: string[] }
+      | undefined;
+
+    const agent = chat.agent({
+      id: "chat.handover.hydrate-pure-text",
+      hydrateMessages: async ({ incomingMessages }) => {
+        hydrateIncomingRoles.push(...incomingMessages.map((m) => m.role));
+        for (const m of incomingMessages) {
+          if (!stored.some((s) => s.id === m.id)) stored.push(m as (typeof stored)[number]);
+        }
+        return [...stored] as never;
+      },
+      onTurnComplete: ({ responseMessage, uiMessages }) => {
+        captured = {
+          responseId: responseMessage?.id,
+          responseText: (responseMessage?.parts ?? [])
+            .filter((p) => p.type === "text")
+            .map((p) => (p as { text?: string }).text || "")
+            .join(""),
+          roles: uiMessages.map((m) => m.role),
+        };
+      },
+      run: async ({ messages, signal }) => {
+        runFn();
+        return streamText({
+          model: new MockLanguageModelV3({
+            doStream: async () => ({ stream: textStream("should-not-run") }),
+          }),
+          messages,
+          abortSignal: signal,
+        });
+      },
+    });
+
+    const harness = mockChatAgent(agent, {
+      chatId: "test-handover-hydrate-final",
+      mode: "handover-prepare",
+      headStartMessages: [
+        { id: "hs-user-1", role: "user", parts: [{ type: "text", text: "say hi" }] },
+      ],
+    });
+
+    try {
+      await harness.sendHandover({
+        partialAssistantMessage: [
+          {
+            role: "assistant",
+            content: [{ type: "text", text: "Hi there, hope you're well." }],
+          },
+        ],
+        messageId: "asst-hydrate-1",
+        isFinal: true,
+      });
+      await new Promise((r) => setTimeout(r, 30));
+
+      // isFinal — the agent never calls the user's run().
+      expect(runFn).not.toHaveBeenCalled();
+
+      // The head-start user message reached the hydrate hook as incoming.
+      expect(hydrateIncomingRoles).toContain("user");
+
+      // onTurnComplete carries the full first turn: user + the warm
+      // route's assistant, under the handover messageId.
+      expect(captured).toBeDefined();
+      expect(captured!.roles).toEqual(["user", "assistant"]);
+      expect(captured!.responseId).toBe("asst-hydrate-1");
+      expect(captured!.responseText).toBe("Hi there, hope you're well.");
+    } finally {
+      await harness.close();
+    }
+  });
+
+  it("tool-call handover (isFinal: false) with hydrateMessages resumes from step 2 (TRI-10715)", async () => {
+    // Hydrate variant of the schema-only tool-call case: the spliced
+    // partial (assistant + approval round) must reach the agent's
+    // streamText so AI SDK executes the pending tool instead of
+    // re-running step 1 from scratch against an empty/short prompt.
+    const toolExecute = vi.fn(async ({ city }: { city: string }) => ({ city, temp: 22 }));
+    const weatherTool = tool({
+      description: "Look up weather",
+      inputSchema: z.object({ city: z.string() }),
+      execute: toolExecute,
+    });
+
+    const stored: { id: string; role: string; parts: unknown[] }[] = [];
+    let runMessageRoles: string[] | undefined;
+    let captured: { roles?: string[]; assistantIds?: (string | undefined)[] } | undefined;
+
+    const agent = chat.agent({
+      id: "chat.handover.hydrate-schema-only-tool",
+      hydrateMessages: async ({ incomingMessages }) => {
+        for (const m of incomingMessages) {
+          if (!stored.some((s) => s.id === m.id)) stored.push(m as (typeof stored)[number]);
+        }
+        return [...stored] as never;
+      },
+      onTurnComplete: ({ uiMessages }) => {
+        captured = {
+          roles: uiMessages.map((m) => m.role),
+          assistantIds: uiMessages.filter((m) => m.role === "assistant").map((m) => m.id),
+        };
+      },
+      run: async ({ messages, signal }) => {
+        runMessageRoles = messages.map((m) => m.role);
+        return streamText({
+          model: new MockLanguageModelV3({
+            doStream: async () => ({ stream: textStream("the weather in tokyo is 22°C") }),
+          }),
+          messages,
+          tools: { weather: weatherTool },
+          abortSignal: signal,
+        });
+      },
+    });
+
+    const harness = mockChatAgent(agent, {
+      chatId: "test-handover-hydrate-tool",
+      mode: "handover-prepare",
+      headStartMessages: [
+        { id: "hs-user-2", role: "user", parts: [{ type: "text", text: "weather in tokyo?" }] },
+      ],
+    });
+
+    try {
+      const turn = await harness.sendHandover({
+        isFinal: false,
+        messageId: "asst-hydrate-2",
+        partialAssistantMessage: [
+          {
+            role: "assistant",
+            content: [
+              { type: "text", text: "let me check the weather" },
+              {
+                type: "tool-call",
+                toolCallId: "tc-h1",
+                toolName: "weather",
+                input: { city: "tokyo" },
+              },
+              {
+                type: "tool-approval-request",
+                approvalId: "handover-approval-h1",
+                toolCallId: "tc-h1",
+              },
+            ],
+          },
+          {
+            role: "tool",
+            content: [
+              {
+                type: "tool-approval-response",
+                approvalId: "handover-approval-h1",
+                approved: true,
+              },
+            ],
+          },
+        ],
+      });
+      await new Promise((r) => setTimeout(r, 30));
+
+      // The resume prompt contained the full splice: user + partial
+      // assistant + approval round — NOT an empty/user-only prompt.
+      expect(runMessageRoles).toEqual(["user", "assistant", "tool"]);
+
+      // AI SDK's initial-tool-execution branch ran the agent-side
+      // execute (no step-1 re-run).
+      expect(toolExecute).toHaveBeenCalledWith(
+        expect.objectContaining({ city: "tokyo" }),
+        expect.anything()
+      );
+
+      // Step-2 text streamed through session.out.
+      const text = turn.chunks
+        .filter((c) => c.type === "text-delta")
+        .map((c) => (c as { delta: string }).delta)
+        .join("");
+      expect(text).toContain("tokyo");
+
+      // One assistant in the final chain, under the handover messageId.
+      expect(captured).toBeDefined();
+      expect(captured!.roles).toEqual(["user", "assistant"]);
+      expect(captured!.assistantIds).toEqual(["asst-hydrate-2"]);
+    } finally {
+      await harness.close();
+    }
+  });
+
   it("onTurnStart fires after the handover signal arrives (lazy)", async () => {
     // Hooks should not fire during the wait — only once handover lands
     // and a real turn begins. Verifies the order so customers can

packages/trigger-sdk/test/mockChatAgent.test.ts

Lines changed: 5 additions & 0 deletions
@@ -1889,6 +1889,11 @@ describe("mockChatAgent", () => {
       // The snapshot reflects the post-turn accumulator: 1 user + 1 assistant.
       const roles = snap!.messages.map((m) => m.role);
       expect(roles).toEqual(["user", "assistant"]);
+      // `lastInEventId` stays undefined here: TestSessionStreamManager
+      // deliberately has no seq numbers, so the committed `.in` cursor
+      // the production write site reads is undefined in harness runs.
+      // The cursor round-trip is covered by the live smoke instead.
+      expect(snap!.lastInEventId).toBeUndefined();
     } finally {
       await harness.close();
     }
