From f4e7fb27477590a753d72e827e389068819d6aeb Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 11 Jun 2026 15:03:02 +0100 Subject: [PATCH 1/4] fix(sdk): deliver the head-start handover partial when hydrateMessages is registered The turn-0 handover splice only ran on the default accumulation path, so agents registering hydrateMessages lost the warm route's step-1 response: pure-text turns fired onTurnComplete with no assistant message, tool-call turns re-ran step 1 from scratch under a fresh messageId, and the head-start user message never reached the hydrate hook. The first-turn history now reaches hydrateMessages as incoming messages, and the splice runs after both accumulation branches, deduped by the handover messageId. --- .changeset/chat-headstart-hydrate.md | 5 + packages/trigger-sdk/src/v3/ai.ts | 77 ++++--- .../src/v3/test/mock-chat-agent.ts | 7 + .../trigger-sdk/test/chatHandover.test.ts | 196 ++++++++++++++++++ 4 files changed, 258 insertions(+), 27 deletions(-) create mode 100644 .changeset/chat-headstart-hydrate.md diff --git a/.changeset/chat-headstart-hydrate.md b/.changeset/chat-headstart-hydrate.md new file mode 100644 index 00000000000..49e9bb926ee --- /dev/null +++ b/.changeset/chat-headstart-hydrate.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Fix `chat.headStart` when `hydrateMessages` is registered. The warm route's step-1 partial now reaches the agent's accumulator on the hydrate path, so `onTurnComplete` carries the full first turn (the head-start user message included), tool-call handovers resume from step 2 instead of re-running step 1, and the assistant `messageId` stays stable across the handover. diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 5b448ed3336..eb33b0c37b7 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -6362,6 +6362,21 @@ function chatAgent< let cleanedUIMessages: TUIMessage[] = cleanedIncomingMessages; + // Turn-0 head-start with hydrateMessages: the boot seeding from + // `payload.headStartMessages` is non-hydrate-only, so ship the + // route handler's first-turn history to the hydrate hook as + // incoming messages instead (gated on the pending handover). + if ( + turn === 0 && + hydrateMessages && + cleanedUIMessages.length === 0 && + (locals.get(chatHandoverPartialKey)?.length ?? 0) > 0 && + Array.isArray(payload.headStartMessages) && + payload.headStartMessages.length > 0 + ) { + cleanedUIMessages = payload.headStartMessages as TUIMessage[]; + } + // Validate/transform UIMessages before conversion — catches malformed // messages from storage or untrusted input before they reach the model. // Slim wire: triggers like `regenerate-message` carry no incoming @@ -6568,32 +6583,40 @@ function chatAgent< // `preload` / `close` / `handover-prepare` and submits // with no incoming message fall through with the boot- // seeded accumulator unchanged. + } - if (turn === 0) { - // Head-start handover splice (turn 0 only): the - // `chat.handover` route handler signalled a mid-turn - // handover, so splice its partial assistant response - // (text + pending tool-calls + the synthesized - // tool-approval round) onto the accumulator. - // `streamText` then hits AI SDK's initial-tool- - // execution branch, runs the agent-side tool executes, - // and resumes from step 2 — skipping the first model - // call (already done by the handler). - // - // We also synthesize a UIMessage form of the partial - // assistant and push it to `accumulatedUIMessages` so - // AI SDK's `processUIMessageStream` (invoked when the - // run loop calls `runResult.toUIMessageStream({ - // onFinish })`) can initialize `state.message` from - // the trailing assistant in `originalMessages`. Without - // that, the `tool-output-available` chunks emitted by - // the initial-tool-execution branch can't find their - // matching tool-call in state and AI SDK throws - // `UIMessageStreamError: No tool invocation found`. - const pendingHandoverPartial = locals.get(chatHandoverPartialKey); - if (pendingHandoverPartial && pendingHandoverPartial.length > 0) { + if (turn === 0) { + // Head-start handover splice (turn 0 only, BOTH + // accumulation branches — hydrate and default): the + // `chat.handover` route handler signalled a mid-turn + // handover, so splice its partial assistant response + // (text + pending tool-calls + the synthesized + // tool-approval round) onto the accumulator. + // `streamText` then hits AI SDK's initial-tool- + // execution branch, runs the agent-side tool executes, + // and resumes from step 2 — skipping the first model + // call (already done by the handler). + // + // We also synthesize a UIMessage form of the partial + // assistant and push it to `accumulatedUIMessages` so + // AI SDK's `processUIMessageStream` (invoked when the + // run loop calls `runResult.toUIMessageStream({ + // onFinish })`) can initialize `state.message` from + // the trailing assistant in `originalMessages`. Without + // that, the `tool-output-available` chunks emitted by + // the initial-tool-execution branch can't find their + // matching tool-call in state and AI SDK throws + // `UIMessageStreamError: No tool invocation found`. + const pendingHandoverPartial = locals.get(chatHandoverPartialKey); + if (pendingHandoverPartial && pendingHandoverPartial.length > 0) { + const handoverMessageId = locals.get(chatHandoverMessageIdKey); + // Skip if the hydrated chain already persisted the + // partial under the handover messageId. + const alreadyInChain = + handoverMessageId !== undefined && + accumulatedUIMessages.some((m) => m.id === handoverMessageId); + if (!alreadyInChain) { accumulatedMessages.push(...pendingHandoverPartial); - const handoverMessageId = locals.get(chatHandoverMessageIdKey); const partialUI = synthesizeHandoverUIMessage( pendingHandoverPartial, handoverMessageId @@ -6601,13 +6624,13 @@ function chatAgent< if (partialUI) { accumulatedUIMessages.push(partialUI as TUIMessage); } - locals.set(chatHandoverPartialKey, []); // consume once } + locals.set(chatHandoverPartialKey, []); // consume once } - - locals.set(chatCurrentUIMessagesKey, accumulatedUIMessages); } + locals.set(chatCurrentUIMessagesKey, accumulatedUIMessages); + } // end if (trigger !== "action") // ── Action result handling ────────────────────────────── diff --git a/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts b/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts index fbcc166d14a..70eae4696d9 100644 --- a/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts +++ b/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts @@ -92,6 +92,12 @@ export type MockChatAgentOptions = { * `sendHandover()` / `sendHandoverSkip()` to dispatch the handover signal. */ mode?: "preload" | "submit-message" | "handover-prepare" | "continuation"; + /** + * First-turn UIMessage history shipped on the BOOT payload. Only + * meaningful with `mode: "handover-prepare"` — mirrors the + * `chat.headStart` route handler's `basePayload.headStartMessages`. + */ + headStartMessages?: UIMessage[]; /** * Pre-seed the snapshot the agent reads at run boot. The runtime's * snapshot read is replaced with one that returns this snapshot @@ -501,6 +507,7 @@ export function mockChatAgent( metadata: clientData, ...(!isContinuationMode && options.continuation ? { continuation: true } : {}), ...(options.previousRunId ? { previousRunId: options.previousRunId } : {}), + ...(options.headStartMessages ? { headStartMessages: options.headStartMessages } : {}), }; sendSessionInput = drivers.sessions.in.send; diff --git a/packages/trigger-sdk/test/chatHandover.test.ts b/packages/trigger-sdk/test/chatHandover.test.ts index 9e0d69ecb04..8b1b75da93a 100644 --- a/packages/trigger-sdk/test/chatHandover.test.ts +++ b/packages/trigger-sdk/test/chatHandover.test.ts @@ -260,6 +260,202 @@ describe("chat.handover", () => { } }); + it("pure-text head-start (isFinal: true) with hydrateMessages persists the partial (TRI-10715)", async () => { + // Same as the pure-text case above, but the customer registers + // `hydrateMessages` (the documented DB-as-source-of-truth pattern). + // The head-start user message must reach the hydrate hook as + // `incomingMessages`, and the warm route's partial must land in the + // accumulator so `onTurnComplete` carries the full first turn. + const runFn = vi.fn(); + const stored: { id: string; role: string; parts: unknown[] }[] = []; + const hydrateIncomingRoles: string[] = []; + let captured: + | { responseId?: string; responseText?: string; roles?: string[] } + | undefined; + + const agent = chat.agent({ + id: "chat.handover.hydrate-pure-text", + hydrateMessages: async ({ incomingMessages }) => { + hydrateIncomingRoles.push(...incomingMessages.map((m) => m.role)); + for (const m of incomingMessages) { + if (!stored.some((s) => s.id === m.id)) stored.push(m as (typeof stored)[number]); + } + return [...stored] as never; + }, + onTurnComplete: ({ responseMessage, uiMessages }) => { + captured = { + responseId: responseMessage?.id, + responseText: (responseMessage?.parts ?? []) + .filter((p) => p.type === "text") + .map((p) => (p as { text?: string }).text || "") + .join(""), + roles: uiMessages.map((m) => m.role), + }; + }, + run: async ({ messages, signal }) => { + runFn(); + return streamText({ + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("should-not-run") }), + }), + messages, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { + chatId: "test-handover-hydrate-final", + mode: "handover-prepare", + headStartMessages: [ + { id: "hs-user-1", role: "user", parts: [{ type: "text", text: "say hi" }] }, + ], + }); + + try { + await harness.sendHandover({ + partialAssistantMessage: [ + { + role: "assistant", + content: [{ type: "text", text: "Hi there, hope you're well." }], + }, + ], + messageId: "asst-hydrate-1", + isFinal: true, + }); + await new Promise((r) => setTimeout(r, 30)); + + // isFinal — the agent never calls the user's run(). + expect(runFn).not.toHaveBeenCalled(); + + // The head-start user message reached the hydrate hook as incoming. + expect(hydrateIncomingRoles).toContain("user"); + + // onTurnComplete carries the full first turn: user + the warm + // route's assistant, under the handover messageId. + expect(captured).toBeDefined(); + expect(captured!.roles).toEqual(["user", "assistant"]); + expect(captured!.responseId).toBe("asst-hydrate-1"); + expect(captured!.responseText).toBe("Hi there, hope you're well."); + } finally { + await harness.close(); + } + }); + + it("tool-call handover (isFinal: false) with hydrateMessages resumes from step 2 (TRI-10715)", async () => { + // Hydrate variant of the schema-only tool-call case: the spliced + // partial (assistant + approval round) must reach the agent's + // streamText so AI SDK executes the pending tool instead of + // re-running step 1 from scratch against an empty/short prompt. + const toolExecute = vi.fn(async ({ city }: { city: string }) => ({ city, temp: 22 })); + const weatherTool = tool({ + description: "Look up weather", + inputSchema: z.object({ city: z.string() }), + execute: toolExecute, + }); + + const stored: { id: string; role: string; parts: unknown[] }[] = []; + let runMessageRoles: string[] | undefined; + let captured: { roles?: string[]; assistantIds?: (string | undefined)[] } | undefined; + + const agent = chat.agent({ + id: "chat.handover.hydrate-schema-only-tool", + hydrateMessages: async ({ incomingMessages }) => { + for (const m of incomingMessages) { + if (!stored.some((s) => s.id === m.id)) stored.push(m as (typeof stored)[number]); + } + return [...stored] as never; + }, + onTurnComplete: ({ uiMessages }) => { + captured = { + roles: uiMessages.map((m) => m.role), + assistantIds: uiMessages.filter((m) => m.role === "assistant").map((m) => m.id), + }; + }, + run: async ({ messages, signal }) => { + runMessageRoles = messages.map((m) => m.role); + return streamText({ + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("the weather in tokyo is 22°C") }), + }), + messages, + tools: { weather: weatherTool }, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { + chatId: "test-handover-hydrate-tool", + mode: "handover-prepare", + headStartMessages: [ + { id: "hs-user-2", role: "user", parts: [{ type: "text", text: "weather in tokyo?" }] }, + ], + }); + + try { + const turn = await harness.sendHandover({ + isFinal: false, + messageId: "asst-hydrate-2", + partialAssistantMessage: [ + { + role: "assistant", + content: [ + { type: "text", text: "let me check the weather" }, + { + type: "tool-call", + toolCallId: "tc-h1", + toolName: "weather", + input: { city: "tokyo" }, + }, + { + type: "tool-approval-request", + approvalId: "handover-approval-h1", + toolCallId: "tc-h1", + }, + ], + }, + { + role: "tool", + content: [ + { + type: "tool-approval-response", + approvalId: "handover-approval-h1", + approved: true, + }, + ], + }, + ], + }); + await new Promise((r) => setTimeout(r, 30)); + + // The resume prompt contained the full splice: user + partial + // assistant + approval round — NOT an empty/user-only prompt. + expect(runMessageRoles).toEqual(["user", "assistant", "tool"]); + + // AI SDK's initial-tool-execution branch ran the agent-side + // execute (no step-1 re-run). + expect(toolExecute).toHaveBeenCalledWith( + expect.objectContaining({ city: "tokyo" }), + expect.anything() + ); + + // Step-2 text streamed through session.out. + const text = turn.chunks + .filter((c) => c.type === "text-delta") + .map((c) => (c as { delta: string }).delta) + .join(""); + expect(text).toContain("tokyo"); + + // One assistant in the final chain, under the handover messageId. + expect(captured).toBeDefined(); + expect(captured!.roles).toEqual(["user", "assistant"]); + expect(captured!.assistantIds).toEqual(["asst-hydrate-2"]); + } finally { + await harness.close(); + } + }); + it("onTurnStart fires after the handover signal arrives (lazy)", async () => { // Hooks should not fire during the wait — only once handover lands // and a real turn begins. Verifies the order so customers can From ba8bab39d0c126298d20f1c885d02a841413011b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 11 Jun 2026 15:10:51 +0100 Subject: [PATCH 2/4] fix(sdk): preserve reasoning parts across the head-start handover synthesizeHandoverUIMessage only mapped text and tool-call parts, so an extended-thinking model's step-1 reasoning streamed to the browser but never reached the durable session history: onTurnComplete, chat.history, and reloads all lost it. Reasoning parts now map through with provider metadata so Anthropic thinking signatures survive the UIMessage round trip on hydrate replays. --- .changeset/chat-headstart-reasoning.md | 5 ++ packages/trigger-sdk/src/v3/ai.ts | 11 +++ .../trigger-sdk/test/chatHandover.test.ts | 69 +++++++++++++++++++ 3 files changed, 85 insertions(+) create mode 100644 .changeset/chat-headstart-reasoning.md diff --git a/.changeset/chat-headstart-reasoning.md b/.changeset/chat-headstart-reasoning.md new file mode 100644 index 00000000000..a53d388c561 --- /dev/null +++ b/.changeset/chat-headstart-reasoning.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Preserve reasoning parts across the `chat.headStart` handover. Extended-thinking models' step-1 reasoning now lands in the durable session history (and `onTurnComplete`) under the same assistant `messageId`, with provider metadata intact so Anthropic thinking signatures survive replays. diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index eb33b0c37b7..e5263324646 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -1810,6 +1810,9 @@ const chatHandoverIsFinalKey = locals.create("chat.handoverIsFinal"); * `tool-approval-response` rows are AI-SDK-internal and don't need a * UIMessage representation. We map: * - `text` parts → `{ type: "text", text }` + * - `reasoning` parts → `{ type: "reasoning", text, state: "done" }` + * (provider metadata carried so an Anthropic thinking signature + * survives a UIMessage → ModelMessage round trip) * - `tool-call` parts → `{ type: "tool-${name}", toolCallId, * state: "input-available", input }` * - `tool-approval-request` parts → skipped (AI SDK derives the @@ -1831,9 +1834,17 @@ function synthesizeHandoverUIMessage( toolCallId?: string; toolName?: string; input?: unknown; + providerOptions?: unknown; }>) { if (part.type === "text" && typeof part.text === "string") { parts.push({ type: "text", text: part.text } as UIMessage["parts"][number]); + } else if (part.type === "reasoning" && typeof part.text === "string") { + parts.push({ + type: "reasoning", + text: part.text, + state: "done", + ...(part.providerOptions ? { providerMetadata: part.providerOptions } : {}), + } as unknown as UIMessage["parts"][number]); } else if (part.type === "tool-call" && part.toolCallId && part.toolName) { parts.push({ type: `tool-${part.toolName}`, diff --git a/packages/trigger-sdk/test/chatHandover.test.ts b/packages/trigger-sdk/test/chatHandover.test.ts index 8b1b75da93a..72b3fa7e448 100644 --- a/packages/trigger-sdk/test/chatHandover.test.ts +++ b/packages/trigger-sdk/test/chatHandover.test.ts @@ -260,6 +260,75 @@ describe("chat.handover", () => { } }); + it("pure-text head-start preserves reasoning parts in the response (TRI-10716)", async () => { + // Extended-thinking models stream a reasoning part in step 1. The + // synthesized partial must carry it (with provider metadata, so an + // Anthropic signature survives a UIMessage -> ModelMessage round + // trip) or the durable history loses the step-1 thinking. + let captured: + | { partTypes?: string[]; reasoningText?: string; meta?: unknown } + | undefined; + + const agent = chat.agent({ + id: "chat.handover.reasoning", + onTurnComplete: ({ responseMessage }) => { + const parts = responseMessage?.parts ?? []; + captured = { + partTypes: parts.map((p) => p.type), + reasoningText: parts + .filter((p) => p.type === "reasoning") + .map((p) => (p as { text?: string }).text || "") + .join(""), + meta: (parts.find((p) => p.type === "reasoning") as + | { providerMetadata?: unknown } + | undefined)?.providerMetadata, + }; + }, + run: async ({ messages, signal }) => { + return streamText({ + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("should-not-run") }), + }), + messages, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { + chatId: "test-handover-reasoning", + mode: "handover-prepare", + }); + + try { + await harness.sendHandover({ + partialAssistantMessage: [ + { + role: "assistant", + content: [ + { + type: "reasoning", + text: "thinking about the greeting", + providerOptions: { anthropic: { signature: "sig-abc" } }, + }, + { type: "text", text: "Hello!" }, + ], + }, + ], + messageId: "asst-reason-1", + isFinal: true, + }); + await new Promise((r) => setTimeout(r, 30)); + + expect(captured).toBeDefined(); + expect(captured!.partTypes).toEqual(["reasoning", "text"]); + expect(captured!.reasoningText).toBe("thinking about the greeting"); + expect(captured!.meta).toEqual({ anthropic: { signature: "sig-abc" } }); + } finally { + await harness.close(); + } + }); + it("pure-text head-start (isFinal: true) with hydrateMessages persists the partial (TRI-10715)", async () => { // Same as the pure-text case above, but the customer registers // `hydrateMessages` (the documented DB-as-source-of-truth pattern). From 4a88e921e0d50720b8ae357a6f54e2e82d57a537 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 11 Jun 2026 16:04:36 +0100 Subject: [PATCH 3/4] fix(sdk,core): stop continuation chat boots idle-waiting on the session cursor scan The .in resume cursor was found by draining an SSE subscription that only closes after its 5 second inactivity window, and the scan ran twice per continuation boot (once for the replay cursor, once for the subscribe cursor), stalling every continuation around 10 seconds before the first turn. The scan is now a non-blocking records read of the latest turn-complete header, runs at most once per boot, the snapshot and replay reads run concurrently, and chat snapshots carry the cursor so subsequent boots skip the scan entirely. --- .changeset/chat-boot-cursor.md | 6 + packages/core/src/v3/schemas/api.ts | 5 + .../src/v3/sessionStreams/chatSnapshot.ts | 9 + packages/trigger-sdk/src/v3/ai.ts | 200 +++++++++++------- .../trigger-sdk/test/mockChatAgent.test.ts | 5 + .../test/replay-session-in.test.ts | 80 ++++++- 6 files changed, 225 insertions(+), 80 deletions(-) create mode 100644 .changeset/chat-boot-cursor.md diff --git a/.changeset/chat-boot-cursor.md b/.changeset/chat-boot-cursor.md new file mode 100644 index 00000000000..eb1b7a41c98 --- /dev/null +++ b/.changeset/chat-boot-cursor.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/sdk": patch +"@trigger.dev/core": patch +--- + +Continuation chat boots no longer stall for around 10 seconds before the first turn. The `session.in` resume cursor is now found with a non-blocking records read instead of draining an SSE long-poll (which always waited out its full 5 second inactivity window, twice per boot), the boot reads run concurrently, and chat snapshots carry the cursor so subsequent boots skip the scan entirely. diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index edd9d18ca61..a8a379f9307 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -2051,6 +2051,11 @@ export const ReadSessionStreamRecordsResponseBody = z.object({ data: z.unknown(), id: z.string(), seqNum: z.number(), + // S2 record headers — present on Trigger control records (e.g. + // `trigger-control: turn-complete` plus sibling headers). The + // server has always serialized them; older schemas stripped them + // client-side, so treat as optional. + headers: z.array(z.tuple([z.string(), z.string()])).optional(), }) ), }); diff --git a/packages/core/src/v3/sessionStreams/chatSnapshot.ts b/packages/core/src/v3/sessionStreams/chatSnapshot.ts index f30ea47b46b..84b204a134d 100644 --- a/packages/core/src/v3/sessionStreams/chatSnapshot.ts +++ b/packages/core/src/v3/sessionStreams/chatSnapshot.ts @@ -26,6 +26,14 @@ export type ChatSnapshotV1 = { savedAt: number; messages: TUIMessage[]; lastOutEventId?: string; + /** + * Committed `.in` consume cursor (S2 seq_num, stringified) as of this + * snapshot's turn-complete. Lets the next boot seed the `.in` resume + * cursor without scanning `session.out` for the latest turn-complete + * header. Absent on snapshots written before this field existed — + * readers fall back to the scan. + */ + lastInEventId?: string; }; /** @@ -39,6 +47,7 @@ export const ChatSnapshotV1Schema = z.object({ savedAt: z.number(), messages: z.array(z.unknown()), lastOutEventId: z.string().optional(), + lastInEventId: z.string().optional(), }); /** diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index e5263324646..47ce138dbee 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -183,51 +183,44 @@ const lastTurnCompleteSeqNumKey = locals.create<{ value: number | undefined }>( * the `.in` subscription so already-processed user messages don't get * replayed from S2. * - * Implementation streams the SSE endpoint and listens for `turn-complete` - * via the transport's `onControl` callback; the data-chunk for-await is - * just there to drive the stream. The scan is O(1 turn) because - * `session.out` is bounded to roughly one turn at steady state — every - * successful turn-complete is followed by an S2 trim back to the - * previous one (see `writeTurnCompleteChunk`). + * Implementation is a non-blocking records read (`wait=0`) — the + * endpoint returns everything currently stored (including pre-trim + * records, since S2 trims are eventually consistent) in one shot, and + * we keep the LAST matching header. The previous SSE-based scan had to + * idle-wait a full 5s window to know it reached the tail, which put a + * constant ~6s tax on every continuation boot. * * Returns `undefined` if no `turn-complete` carrying the header has been * written yet — first-turn-ever, first turn post-OOM-with-no-prior-runs, - * or a `turn-complete` written before this header existed (cross-version - * boot). Callers fall back to subscribing `.in` from seq 0 in that case; - * the slim-wire merge handles any dedup against snapshot-restored - * messages. + * a `turn-complete` written before this header existed, or a server old + * enough that the records endpoint doesn't serialize headers. Callers + * fall back to subscribing `.in` from seq 0 in that case; the slim-wire + * merge handles any dedup against snapshot-restored messages. * @internal */ async function findLatestSessionInCursor( chatId: string ): Promise { const apiClient = apiClientManager.clientOrThrow(); + const response = await apiClient.readSessionStreamRecords(chatId, "out"); let latestCursor: number | undefined; - const stream = await apiClient.subscribeToSessionStream(chatId, "out", { - // 5s rather than 1s: S2 trim is eventually-consistent (10-60s - // window), so a worker booting just after a trim could still see - // pre-trim records and need a bit longer to drain them all before - // the SSE long-poll closes. Without enough headroom the scan would - // fall back to `undefined`, the `.in` cursor wouldn't be seeded, - // and the next subscribe would replay messages already processed. - timeoutInSeconds: 5, - onControl: (event) => { - if (event.subtype !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) return; - const raw = headerValue(event.headers, SESSION_IN_EVENT_ID_HEADER); - if (!raw) return; - const parsed = Number.parseInt(raw, 10); - if (Number.isFinite(parsed)) latestCursor = parsed; - }, - }); - // Drain the stream so the underlying SSE reader runs to completion. We - // don't accumulate chunks; `onControl` fires inline as turn-complete - // records arrive. - for await (const _ of stream) { - // intentionally empty + for (const record of response.records) { + if (controlSubtype(record.headers) !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) continue; + const raw = headerValue(record.headers, SESSION_IN_EVENT_ID_HEADER); + if (!raw) continue; + const parsed = Number.parseInt(raw, 10); + if (Number.isFinite(parsed)) latestCursor = parsed; } return latestCursor; } +/** Test-only entry point for the records-based cursor scan. @internal */ +export async function __findLatestSessionInCursorForTests( + chatId: string +): Promise { + return findLatestSessionInCursor(chatId); +} + /** * Versioned blob written to S3 after every turn completes (when no * `hydrateMessages` hook is registered). Read at run boot to seed the @@ -5320,6 +5313,12 @@ function chatAgent< const couldHavePriorState = payload.continuation === true || ctx.attempt.number > 1; + // `.in` resume cursor, computed at most once per boot. The boot + // block below resolves it (snapshot field or records scan) and the + // resume-cursor block reuses it instead of re-scanning. + let bootInCursor: number | undefined; + let bootInCursorResolved = false; + if (!hydrateMessages && couldHavePriorState) { // Single parent span for the whole boot read phase — snapshot // read, session.out replay, session.in replay. Per-phase timing @@ -5363,34 +5362,39 @@ function chatAgent< } } - // session.out replay - const replayOutStart = Date.now(); - try { - const replayResult = await replaySessionOutTail( - sessionIdForSnapshot, - { lastEventId: bootSnapshot?.lastOutEventId } + // The `.out` replay and the `.in` cursor + tail read are + // independent (both depend only on the snapshot) — run them + // concurrently. Each phase keeps its own catch + duration + // attribute. + const replayOutPhase = async () => { + const replayOutStart = Date.now(); + try { + const replayResult = await replaySessionOutTail( + sessionIdForSnapshot, + { lastEventId: bootSnapshot?.lastOutEventId } + ); + replayedSettled = replayResult.settled; + replayedPartial = replayResult.partial; + replayedPartialRaw = replayResult.partialRaw; + } catch (error) { + logger.warn( + "chat.agent: session.out replay failed; using snapshot only", + { + error: error instanceof Error ? error.message : String(error), + sessionId: sessionIdForSnapshot, + } + ); + } + bootSpan.setAttribute( + "chat.boot.replay.out.durationMs", + Date.now() - replayOutStart ); - replayedSettled = replayResult.settled; - replayedPartial = replayResult.partial; - replayedPartialRaw = replayResult.partialRaw; - } catch (error) { - logger.warn( - "chat.agent: session.out replay failed; using snapshot only", - { - error: error instanceof Error ? error.message : String(error), - sessionId: sessionIdForSnapshot, - } + bootSpan.setAttribute("chat.boot.replay.out.settledCount", replayedSettled.length); + bootSpan.setAttribute( + "chat.boot.replay.out.partialPresent", + replayedPartial !== undefined ); - } - bootSpan.setAttribute( - "chat.boot.replay.out.durationMs", - Date.now() - replayOutStart - ); - bootSpan.setAttribute("chat.boot.replay.out.settledCount", replayedSettled.length); - bootSpan.setAttribute( - "chat.boot.replay.out.partialPresent", - replayedPartial !== undefined - ); + }; // session.in tail read // @@ -5402,28 +5406,49 @@ function chatAgent< // visible via the live SSE subscription — by which point they // would arrive AFTER the partial-assistant orphan and look like // brand-new turns to the model, producing inverted chains. - const replayInStart = Date.now(); - const lastInEventId = await findLatestSessionInCursor(payload.chatId) - .then((cursor) => (cursor !== undefined ? String(cursor) : undefined)) - .catch(() => undefined); - try { - replayedInTail = await replaySessionInTail(payload.chatId, { - lastEventId: lastInEventId, - }); - } catch (error) { - logger.warn( - "chat.agent: session.in replay failed; in-flight users may not be recovered", - { error: error instanceof Error ? error.message : String(error) } + // + // The cursor comes from the snapshot when present (written + // there since `lastInEventId` was added) — otherwise from a + // records scan of `.out`'s latest turn-complete header. + const replayInPhase = async () => { + const replayInStart = Date.now(); + const snapshotInCursor = + bootSnapshot?.lastInEventId !== undefined + ? Number.parseInt(bootSnapshot.lastInEventId, 10) + : undefined; + if (snapshotInCursor !== undefined && Number.isFinite(snapshotInCursor)) { + bootInCursor = snapshotInCursor; + } else { + bootInCursor = await findLatestSessionInCursor(payload.chatId).catch( + () => undefined + ); + } + bootInCursorResolved = true; + bootSpan.setAttribute( + "chat.boot.replay.in.cursorFromSnapshot", + snapshotInCursor !== undefined ); - } - bootSpan.setAttribute( - "chat.boot.replay.in.durationMs", - Date.now() - replayInStart - ); - bootSpan.setAttribute( - "chat.boot.replay.in.userCount", - replayedInTail.length - ); + try { + replayedInTail = await replaySessionInTail(payload.chatId, { + lastEventId: bootInCursor !== undefined ? String(bootInCursor) : undefined, + }); + } catch (error) { + logger.warn( + "chat.agent: session.in replay failed; in-flight users may not be recovered", + { error: error instanceof Error ? error.message : String(error) } + ); + } + bootSpan.setAttribute( + "chat.boot.replay.in.durationMs", + Date.now() - replayInStart + ); + bootSpan.setAttribute( + "chat.boot.replay.in.userCount", + replayedInTail.length + ); + }; + + await Promise.all([replayOutPhase(), replayInPhase()]); }, { attributes: { @@ -5469,7 +5494,12 @@ function chatAgent< if (needsResumeCursor) { try { - const cursor = await findLatestSessionInCursor(payload.chatId); + // Reuse the cursor the boot block already resolved (snapshot + // field or records scan) — only scan here when the boot block + // was skipped (hydrateMessages, or snapshot-only signals). + const cursor = bootInCursorResolved + ? bootInCursor + : await findLatestSessionInCursor(payload.chatId); if (cursor !== undefined) { sessionStreams.setLastSeqNum(payload.chatId, "in", cursor); sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor); @@ -7428,11 +7458,17 @@ function chatAgent< await tracer.startActiveSpan( "snapshot.write", async () => { + const snapshotInCursor = + getChatSession().in.lastDispatchedSeqNum(); await writeChatSnapshot(sessionIdForSnapshot, { version: 1, savedAt: Date.now(), messages: accumulatedUIMessages, lastOutEventId: turnCompleteResult?.lastEventId, + lastInEventId: + snapshotInCursor !== undefined + ? String(snapshotInCursor) + : undefined, }); }, { @@ -7687,11 +7723,17 @@ function chatAgent< // neither the snapshot nor the replayable `.in` tail. if (!hydrateMessages) { try { + const errorSnapshotInCursor = + getChatSession().in.lastDispatchedSeqNum(); await writeChatSnapshot(sessionIdForSnapshot, { version: 1, savedAt: Date.now(), messages: erroredUIMessages, lastOutEventId: errorTurnCompleteResult?.lastEventId, + lastInEventId: + errorSnapshotInCursor !== undefined + ? String(errorSnapshotInCursor) + : undefined, }); } catch (error) { logger.warn("chat.agent: error-path snapshot write failed", { diff --git a/packages/trigger-sdk/test/mockChatAgent.test.ts b/packages/trigger-sdk/test/mockChatAgent.test.ts index 5038f21ca24..c2001de723f 100644 --- a/packages/trigger-sdk/test/mockChatAgent.test.ts +++ b/packages/trigger-sdk/test/mockChatAgent.test.ts @@ -1889,6 +1889,11 @@ describe("mockChatAgent", () => { // The snapshot reflects the post-turn accumulator: 1 user + 1 assistant. const roles = snap!.messages.map((m) => m.role); expect(roles).toEqual(["user", "assistant"]); + // `lastInEventId` stays undefined here: TestSessionStreamManager + // deliberately has no seq numbers, so the committed `.in` cursor + // the production write site reads is undefined in harness runs. + // The cursor round-trip is covered by the live smoke instead. + expect(snap!.lastInEventId).toBeUndefined(); } finally { await harness.close(); } diff --git a/packages/trigger-sdk/test/replay-session-in.test.ts b/packages/trigger-sdk/test/replay-session-in.test.ts index 92a1cb6f97c..a90ecc15396 100644 --- a/packages/trigger-sdk/test/replay-session-in.test.ts +++ b/packages/trigger-sdk/test/replay-session-in.test.ts @@ -3,7 +3,10 @@ import "../src/v3/test/index.js"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { apiClientManager } from "@trigger.dev/core/v3"; -import { __replaySessionInTailProductionPathForTests as replaySessionInTail } from "../src/v3/ai.js"; +import { + __findLatestSessionInCursorForTests as findLatestSessionInCursor, + __replaySessionInTailProductionPathForTests as replaySessionInTail, +} from "../src/v3/ai.js"; // ── Helpers ──────────────────────────────────────────────────────────── @@ -135,3 +138,78 @@ describe("replaySessionInTail", () => { expect(result).toEqual([]); }); }); + +// ── Cursor scan (non-blocking records read, not an SSE drain) ────────── + +function stubReadRecordsWithHeaders( + records: Array<{ data?: unknown; headers?: Array<[string, string]> }> +) { + const shaped = records.map((r, i) => ({ + data: r.data ?? "", + id: `evt-${i + 1}`, + seqNum: i + 1, + headers: r.headers, + })); + const spy = vi.fn(async () => ({ records: shaped })); + vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue({ + readSessionStreamRecords: spy, + } as never); + return spy; +} + +describe("findLatestSessionInCursor", () => { + it("returns the LAST turn-complete's session-in-event-id", async () => { + const spy = stubReadRecordsWithHeaders([ + { data: { type: "text-delta", delta: "hi" } }, + { + headers: [ + ["trigger-control", "turn-complete"], + ["session-in-event-id", "3"], + ], + }, + { data: { type: "text-delta", delta: "again" } }, + { + headers: [ + ["trigger-control", "turn-complete"], + ["session-in-event-id", "7"], + ], + }, + ]); + + const cursor = await findLatestSessionInCursor("sess"); + expect(cursor).toBe(7); + // Non-blocking records read on `.out`, no SSE subscribe. + expect(spy).toHaveBeenCalledWith("sess", "out"); + }); + + it("ignores other control subtypes and turn-completes without the header", async () => { + stubReadRecordsWithHeaders([ + { + headers: [ + ["trigger-control", "upgrade-required"], + ["session-in-event-id", "99"], + ], + }, + { headers: [["trigger-control", "turn-complete"]] }, + { + headers: [ + ["trigger-control", "turn-complete"], + ["session-in-event-id", "4"], + ], + }, + ]); + + const cursor = await findLatestSessionInCursor("sess"); + expect(cursor).toBe(4); + }); + + it("returns undefined when records carry no headers (older server)", async () => { + stubReadRecordsWithHeaders([ + { data: { type: "text-delta", delta: "hi" } }, + { data: { type: "finish" } }, + ]); + + const cursor = await findLatestSessionInCursor("sess"); + expect(cursor).toBeUndefined(); + }); +}); From 9042df6f2a8c80275f8d8f77b75c2b440b8f95c4 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 11 Jun 2026 17:20:17 +0100 Subject: [PATCH 4/4] fix(sdk): retry the resume-cursor scan when the boot lookup fails transiently A scan that threw was treated the same as one that found no cursor, so the resume-cursor block skipped its retry and the live subscription could replay from the start. Only a successful lookup (including a genuine no-cursor-yet answer) is shared now; a throw leaves the retry available. --- packages/trigger-sdk/src/v3/ai.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 47ce138dbee..72e0cf1080e 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -5418,12 +5418,17 @@ function chatAgent< : undefined; if (snapshotInCursor !== undefined && Number.isFinite(snapshotInCursor)) { bootInCursor = snapshotInCursor; + bootInCursorResolved = true; } else { - bootInCursor = await findLatestSessionInCursor(payload.chatId).catch( - () => undefined - ); + try { + bootInCursor = await findLatestSessionInCursor(payload.chatId); + bootInCursorResolved = true; + } catch { + // Transient scan failure: leave unresolved so the + // resume-cursor block below retries the lookup. + bootInCursor = undefined; + } } - bootInCursorResolved = true; bootSpan.setAttribute( "chat.boot.replay.in.cursorFromSnapshot", snapshotInCursor !== undefined