diff --git a/.changeset/chat-boot-cursor.md b/.changeset/chat-boot-cursor.md new file mode 100644 index 00000000000..eb1b7a41c98 --- /dev/null +++ b/.changeset/chat-boot-cursor.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/sdk": patch +"@trigger.dev/core": patch +--- + +Continuation chat boots no longer stall for around 10 seconds before the first turn. The `session.in` resume cursor is now found with a non-blocking records read instead of draining an SSE long-poll (which always waited out its full 5 second inactivity window, twice per boot), the boot reads run concurrently, and chat snapshots carry the cursor so subsequent boots skip the scan entirely. diff --git a/.changeset/chat-headstart-hydrate.md b/.changeset/chat-headstart-hydrate.md new file mode 100644 index 00000000000..49e9bb926ee --- /dev/null +++ b/.changeset/chat-headstart-hydrate.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Fix `chat.headStart` when `hydrateMessages` is registered. The warm route's step-1 partial now reaches the agent's accumulator on the hydrate path, so `onTurnComplete` carries the full first turn (the head-start user message included), tool-call handovers resume from step 2 instead of re-running step 1, and the assistant `messageId` stays stable across the handover. diff --git a/.changeset/chat-headstart-reasoning.md b/.changeset/chat-headstart-reasoning.md new file mode 100644 index 00000000000..a53d388c561 --- /dev/null +++ b/.changeset/chat-headstart-reasoning.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Preserve reasoning parts across the `chat.headStart` handover. Extended-thinking models' step-1 reasoning now lands in the durable session history (and `onTurnComplete`) under the same assistant `messageId`, with provider metadata intact so Anthropic thinking signatures survive replays. 
diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index edd9d18ca61..a8a379f9307 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -2051,6 +2051,11 @@ export const ReadSessionStreamRecordsResponseBody = z.object({ data: z.unknown(), id: z.string(), seqNum: z.number(), + // S2 record headers — present on Trigger control records (e.g. + // `trigger-control: turn-complete` plus sibling headers). The + // server has always serialized them; older schemas stripped them + // client-side, so treat as optional. + headers: z.array(z.tuple([z.string(), z.string()])).optional(), }) ), }); diff --git a/packages/core/src/v3/sessionStreams/chatSnapshot.ts b/packages/core/src/v3/sessionStreams/chatSnapshot.ts index f30ea47b46b..84b204a134d 100644 --- a/packages/core/src/v3/sessionStreams/chatSnapshot.ts +++ b/packages/core/src/v3/sessionStreams/chatSnapshot.ts @@ -26,6 +26,14 @@ export type ChatSnapshotV1 = { savedAt: number; messages: TUIMessage[]; lastOutEventId?: string; + /** + * Committed `.in` consume cursor (S2 seq_num, stringified) as of this + * snapshot's turn-complete. Lets the next boot seed the `.in` resume + * cursor without scanning `session.out` for the latest turn-complete + * header. Absent on snapshots written before this field existed — + * readers fall back to the scan. 
+ */ + lastInEventId?: string; }; /** @@ -39,6 +47,7 @@ export const ChatSnapshotV1Schema = z.object({ savedAt: z.number(), messages: z.array(z.unknown()), lastOutEventId: z.string().optional(), + lastInEventId: z.string().optional(), }); /** diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 5b448ed3336..72e0cf1080e 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -183,51 +183,44 @@ const lastTurnCompleteSeqNumKey = locals.create<{ value: number | undefined }>( * the `.in` subscription so already-processed user messages don't get * replayed from S2. * - * Implementation streams the SSE endpoint and listens for `turn-complete` - * via the transport's `onControl` callback; the data-chunk for-await is - * just there to drive the stream. The scan is O(1 turn) because - * `session.out` is bounded to roughly one turn at steady state — every - * successful turn-complete is followed by an S2 trim back to the - * previous one (see `writeTurnCompleteChunk`). + * Implementation is a non-blocking records read (`wait=0`) — the + * endpoint returns everything currently stored (including pre-trim + * records, since S2 trims are eventually consistent) in one shot, and + * we keep the LAST matching header. The previous SSE-based scan had to + * idle-wait a full 5s window to know it reached the tail, which put a + * constant ~6s tax on every continuation boot. * * Returns `undefined` if no `turn-complete` carrying the header has been * written yet — first-turn-ever, first turn post-OOM-with-no-prior-runs, - * or a `turn-complete` written before this header existed (cross-version - * boot). Callers fall back to subscribing `.in` from seq 0 in that case; - * the slim-wire merge handles any dedup against snapshot-restored - * messages. + * a `turn-complete` written before this header existed, or a server old + * enough that the records endpoint doesn't serialize headers. 
Callers + * fall back to subscribing `.in` from seq 0 in that case; the slim-wire + * merge handles any dedup against snapshot-restored messages. * @internal */ async function findLatestSessionInCursor( chatId: string ): Promise<number | undefined> { const apiClient = apiClientManager.clientOrThrow(); + const response = await apiClient.readSessionStreamRecords(chatId, "out"); let latestCursor: number | undefined; - const stream = await apiClient.subscribeToSessionStream(chatId, "out", { - // 5s rather than 1s: S2 trim is eventually-consistent (10-60s - // window), so a worker booting just after a trim could still see - // pre-trim records and need a bit longer to drain them all before - // the SSE long-poll closes. Without enough headroom the scan would - // fall back to `undefined`, the `.in` cursor wouldn't be seeded, - // and the next subscribe would replay messages already processed. - timeoutInSeconds: 5, - onControl: (event) => { - if (event.subtype !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) return; - const raw = headerValue(event.headers, SESSION_IN_EVENT_ID_HEADER); - if (!raw) return; - const parsed = Number.parseInt(raw, 10); - if (Number.isFinite(parsed)) latestCursor = parsed; - }, - }); - // Drain the stream so the underlying SSE reader runs to completion. We - // don't accumulate chunks; `onControl` fires inline as turn-complete - // records arrive. - for await (const _ of stream) { - // intentionally empty + for (const record of response.records) { + if (controlSubtype(record.headers) !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) continue; + const raw = headerValue(record.headers, SESSION_IN_EVENT_ID_HEADER); + if (!raw) continue; + const parsed = Number.parseInt(raw, 10); + if (Number.isFinite(parsed)) latestCursor = parsed; } return latestCursor; } +/** Test-only entry point for the records-based cursor scan. 
@internal */ +export async function __findLatestSessionInCursorForTests( + chatId: string +): Promise<number | undefined> { + return findLatestSessionInCursor(chatId); +} + /** * Versioned blob written to S3 after every turn completes (when no * `hydrateMessages` hook is registered). Read at run boot to seed the @@ -1810,6 +1803,9 @@ const chatHandoverIsFinalKey = locals.create("chat.handoverIsFinal"); * `tool-approval-response` rows are AI-SDK-internal and don't need a * UIMessage representation. We map: * - `text` parts → `{ type: "text", text }` + * - `reasoning` parts → `{ type: "reasoning", text, state: "done" }` + * (provider metadata carried so an Anthropic thinking signature + * survives a UIMessage → ModelMessage round trip) * - `tool-call` parts → `{ type: "tool-${name}", toolCallId, * state: "input-available", input }` * - `tool-approval-request` parts → skipped (AI SDK derives the @@ -1831,9 +1827,17 @@ function synthesizeHandoverUIMessage( toolCallId?: string; toolName?: string; input?: unknown; + providerOptions?: unknown; }>) { if (part.type === "text" && typeof part.text === "string") { parts.push({ type: "text", text: part.text } as UIMessage["parts"][number]); + } else if (part.type === "reasoning" && typeof part.text === "string") { + parts.push({ + type: "reasoning", + text: part.text, + state: "done", + ...(part.providerOptions ? { providerMetadata: part.providerOptions } : {}), + } as unknown as UIMessage["parts"][number]); } else if (part.type === "tool-call" && part.toolCallId && part.toolName) { parts.push({ type: `tool-${part.toolName}`, @@ -5309,6 +5313,12 @@ function chatAgent< const couldHavePriorState = payload.continuation === true || ctx.attempt.number > 1; + // `.in` resume cursor, computed at most once per boot. The boot + // block below resolves it (snapshot field or records scan) and the + // resume-cursor block reuses it instead of re-scanning. 
+ let bootInCursor: number | undefined; + let bootInCursorResolved = false; + if (!hydrateMessages && couldHavePriorState) { // Single parent span for the whole boot read phase — snapshot // read, session.out replay, session.in replay. Per-phase timing @@ -5352,34 +5362,39 @@ function chatAgent< } } - // session.out replay - const replayOutStart = Date.now(); - try { - const replayResult = await replaySessionOutTail( - sessionIdForSnapshot, - { lastEventId: bootSnapshot?.lastOutEventId } + // The `.out` replay and the `.in` cursor + tail read are + // independent (both depend only on the snapshot) — run them + // concurrently. Each phase keeps its own catch + duration + // attribute. + const replayOutPhase = async () => { + const replayOutStart = Date.now(); + try { + const replayResult = await replaySessionOutTail( + sessionIdForSnapshot, + { lastEventId: bootSnapshot?.lastOutEventId } + ); + replayedSettled = replayResult.settled; + replayedPartial = replayResult.partial; + replayedPartialRaw = replayResult.partialRaw; + } catch (error) { + logger.warn( + "chat.agent: session.out replay failed; using snapshot only", + { + error: error instanceof Error ? error.message : String(error), + sessionId: sessionIdForSnapshot, + } + ); + } + bootSpan.setAttribute( + "chat.boot.replay.out.durationMs", + Date.now() - replayOutStart ); - replayedSettled = replayResult.settled; - replayedPartial = replayResult.partial; - replayedPartialRaw = replayResult.partialRaw; - } catch (error) { - logger.warn( - "chat.agent: session.out replay failed; using snapshot only", - { - error: error instanceof Error ? 
error.message : String(error), - sessionId: sessionIdForSnapshot, - } + bootSpan.setAttribute("chat.boot.replay.out.settledCount", replayedSettled.length); + bootSpan.setAttribute( + "chat.boot.replay.out.partialPresent", + replayedPartial !== undefined ); - } - bootSpan.setAttribute( - "chat.boot.replay.out.durationMs", - Date.now() - replayOutStart - ); - bootSpan.setAttribute("chat.boot.replay.out.settledCount", replayedSettled.length); - bootSpan.setAttribute( - "chat.boot.replay.out.partialPresent", - replayedPartial !== undefined - ); + }; // session.in tail read // @@ -5391,28 +5406,54 @@ function chatAgent< // visible via the live SSE subscription — by which point they // would arrive AFTER the partial-assistant orphan and look like // brand-new turns to the model, producing inverted chains. - const replayInStart = Date.now(); - const lastInEventId = await findLatestSessionInCursor(payload.chatId) - .then((cursor) => (cursor !== undefined ? String(cursor) : undefined)) - .catch(() => undefined); - try { - replayedInTail = await replaySessionInTail(payload.chatId, { - lastEventId: lastInEventId, - }); - } catch (error) { - logger.warn( - "chat.agent: session.in replay failed; in-flight users may not be recovered", - { error: error instanceof Error ? error.message : String(error) } + // + // The cursor comes from the snapshot when present (written + // there since `lastInEventId` was added) — otherwise from a + // records scan of `.out`'s latest turn-complete header. + const replayInPhase = async () => { + const replayInStart = Date.now(); + const snapshotInCursor = + bootSnapshot?.lastInEventId !== undefined + ? 
Number.parseInt(bootSnapshot.lastInEventId, 10) + : undefined; + if (snapshotInCursor !== undefined && Number.isFinite(snapshotInCursor)) { + bootInCursor = snapshotInCursor; + bootInCursorResolved = true; + } else { + try { + bootInCursor = await findLatestSessionInCursor(payload.chatId); + bootInCursorResolved = true; + } catch { + // Transient scan failure: leave unresolved so the + // resume-cursor block below retries the lookup. + bootInCursor = undefined; + } + } + bootSpan.setAttribute( + "chat.boot.replay.in.cursorFromSnapshot", + snapshotInCursor !== undefined && Number.isFinite(snapshotInCursor) + ); + try { + replayedInTail = await replaySessionInTail(payload.chatId, { + lastEventId: bootInCursor !== undefined ? String(bootInCursor) : undefined, + }); + } catch (error) { + logger.warn( + "chat.agent: session.in replay failed; in-flight users may not be recovered", + { error: error instanceof Error ? error.message : String(error) } + ); + } - bootSpan.setAttribute( - "chat.boot.replay.in.durationMs", - Date.now() - replayInStart - ); - bootSpan.setAttribute( - "chat.boot.replay.in.userCount", - replayedInTail.length - ); + bootSpan.setAttribute( + "chat.boot.replay.in.durationMs", + Date.now() - replayInStart + ); + bootSpan.setAttribute( + "chat.boot.replay.in.userCount", + replayedInTail.length + ); + }; + + await Promise.all([replayOutPhase(), replayInPhase()]); }, { attributes: { @@ -5458,7 +5499,12 @@ function chatAgent< if (needsResumeCursor) { try { - const cursor = await findLatestSessionInCursor(payload.chatId); + // Reuse the cursor the boot block already resolved (snapshot + // field or records scan) — only scan here when the boot block + // was skipped (hydrateMessages, or snapshot-only signals). + const cursor = bootInCursorResolved + ? 
bootInCursor + : await findLatestSessionInCursor(payload.chatId); if (cursor !== undefined) { sessionStreams.setLastSeqNum(payload.chatId, "in", cursor); sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor); @@ -6362,6 +6408,21 @@ function chatAgent< let cleanedUIMessages: TUIMessage[] = cleanedIncomingMessages; + // Turn-0 head-start with hydrateMessages: the boot seeding from + // `payload.headStartMessages` is non-hydrate-only, so ship the + // route handler's first-turn history to the hydrate hook as + // incoming messages instead (gated on the pending handover). + if ( + turn === 0 && + hydrateMessages && + cleanedUIMessages.length === 0 && + (locals.get(chatHandoverPartialKey)?.length ?? 0) > 0 && + Array.isArray(payload.headStartMessages) && + payload.headStartMessages.length > 0 + ) { + cleanedUIMessages = payload.headStartMessages as TUIMessage[]; + } + // Validate/transform UIMessages before conversion — catches malformed // messages from storage or untrusted input before they reach the model. // Slim wire: triggers like `regenerate-message` carry no incoming @@ -6568,32 +6629,40 @@ function chatAgent< // `preload` / `close` / `handover-prepare` and submits // with no incoming message fall through with the boot- // seeded accumulator unchanged. + } - if (turn === 0) { - // Head-start handover splice (turn 0 only): the - // `chat.handover` route handler signalled a mid-turn - // handover, so splice its partial assistant response - // (text + pending tool-calls + the synthesized - // tool-approval round) onto the accumulator. - // `streamText` then hits AI SDK's initial-tool- - // execution branch, runs the agent-side tool executes, - // and resumes from step 2 — skipping the first model - // call (already done by the handler). 
- // - // We also synthesize a UIMessage form of the partial - // assistant and push it to `accumulatedUIMessages` so - // AI SDK's `processUIMessageStream` (invoked when the - // run loop calls `runResult.toUIMessageStream({ - // onFinish })`) can initialize `state.message` from - // the trailing assistant in `originalMessages`. Without - // that, the `tool-output-available` chunks emitted by - // the initial-tool-execution branch can't find their - // matching tool-call in state and AI SDK throws - // `UIMessageStreamError: No tool invocation found`. - const pendingHandoverPartial = locals.get(chatHandoverPartialKey); - if (pendingHandoverPartial && pendingHandoverPartial.length > 0) { + if (turn === 0) { + // Head-start handover splice (turn 0 only, BOTH + // accumulation branches — hydrate and default): the + // `chat.handover` route handler signalled a mid-turn + // handover, so splice its partial assistant response + // (text + pending tool-calls + the synthesized + // tool-approval round) onto the accumulator. + // `streamText` then hits AI SDK's initial-tool- + // execution branch, runs the agent-side tool executes, + // and resumes from step 2 — skipping the first model + // call (already done by the handler). + // + // We also synthesize a UIMessage form of the partial + // assistant and push it to `accumulatedUIMessages` so + // AI SDK's `processUIMessageStream` (invoked when the + // run loop calls `runResult.toUIMessageStream({ + // onFinish })`) can initialize `state.message` from + // the trailing assistant in `originalMessages`. Without + // that, the `tool-output-available` chunks emitted by + // the initial-tool-execution branch can't find their + // matching tool-call in state and AI SDK throws + // `UIMessageStreamError: No tool invocation found`. 
+ const pendingHandoverPartial = locals.get(chatHandoverPartialKey); + if (pendingHandoverPartial && pendingHandoverPartial.length > 0) { + const handoverMessageId = locals.get(chatHandoverMessageIdKey); + // Skip if the hydrated chain already persisted the + // partial under the handover messageId. + const alreadyInChain = + handoverMessageId !== undefined && + accumulatedUIMessages.some((m) => m.id === handoverMessageId); + if (!alreadyInChain) { accumulatedMessages.push(...pendingHandoverPartial); - const handoverMessageId = locals.get(chatHandoverMessageIdKey); const partialUI = synthesizeHandoverUIMessage( pendingHandoverPartial, handoverMessageId @@ -6601,13 +6670,13 @@ function chatAgent< if (partialUI) { accumulatedUIMessages.push(partialUI as TUIMessage); } - locals.set(chatHandoverPartialKey, []); // consume once } + locals.set(chatHandoverPartialKey, []); // consume once } - - locals.set(chatCurrentUIMessagesKey, accumulatedUIMessages); } + locals.set(chatCurrentUIMessagesKey, accumulatedUIMessages); + } // end if (trigger !== "action") // ── Action result handling ────────────────────────────── @@ -7394,11 +7463,17 @@ function chatAgent< await tracer.startActiveSpan( "snapshot.write", async () => { + const snapshotInCursor = + getChatSession().in.lastDispatchedSeqNum(); await writeChatSnapshot(sessionIdForSnapshot, { version: 1, savedAt: Date.now(), messages: accumulatedUIMessages, lastOutEventId: turnCompleteResult?.lastEventId, + lastInEventId: + snapshotInCursor !== undefined + ? String(snapshotInCursor) + : undefined, }); }, { @@ -7653,11 +7728,17 @@ function chatAgent< // neither the snapshot nor the replayable `.in` tail. if (!hydrateMessages) { try { + const errorSnapshotInCursor = + getChatSession().in.lastDispatchedSeqNum(); await writeChatSnapshot(sessionIdForSnapshot, { version: 1, savedAt: Date.now(), messages: erroredUIMessages, lastOutEventId: errorTurnCompleteResult?.lastEventId, + lastInEventId: + errorSnapshotInCursor !== undefined + ? 
String(errorSnapshotInCursor) + : undefined, }); } catch (error) { logger.warn("chat.agent: error-path snapshot write failed", { diff --git a/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts b/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts index fbcc166d14a..70eae4696d9 100644 --- a/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts +++ b/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts @@ -92,6 +92,12 @@ export type MockChatAgentOptions = { * `sendHandover()` / `sendHandoverSkip()` to dispatch the handover signal. */ mode?: "preload" | "submit-message" | "handover-prepare" | "continuation"; + /** + * First-turn UIMessage history shipped on the BOOT payload. Only + * meaningful with `mode: "handover-prepare"` — mirrors the + * `chat.headStart` route handler's `basePayload.headStartMessages`. + */ + headStartMessages?: UIMessage[]; /** * Pre-seed the snapshot the agent reads at run boot. The runtime's * snapshot read is replaced with one that returns this snapshot @@ -501,6 +507,7 @@ export function mockChatAgent( metadata: clientData, ...(!isContinuationMode && options.continuation ? { continuation: true } : {}), ...(options.previousRunId ? { previousRunId: options.previousRunId } : {}), + ...(options.headStartMessages ? { headStartMessages: options.headStartMessages } : {}), }; sendSessionInput = drivers.sessions.in.send; diff --git a/packages/trigger-sdk/test/chatHandover.test.ts b/packages/trigger-sdk/test/chatHandover.test.ts index 9e0d69ecb04..72b3fa7e448 100644 --- a/packages/trigger-sdk/test/chatHandover.test.ts +++ b/packages/trigger-sdk/test/chatHandover.test.ts @@ -260,6 +260,271 @@ describe("chat.handover", () => { } }); + it("pure-text head-start preserves reasoning parts in the response (TRI-10716)", async () => { + // Extended-thinking models stream a reasoning part in step 1. 
The + // synthesized partial must carry it (with provider metadata, so an + // Anthropic signature survives a UIMessage -> ModelMessage round + // trip) or the durable history loses the step-1 thinking. + let captured: + | { partTypes?: string[]; reasoningText?: string; meta?: unknown } + | undefined; + + const agent = chat.agent({ + id: "chat.handover.reasoning", + onTurnComplete: ({ responseMessage }) => { + const parts = responseMessage?.parts ?? []; + captured = { + partTypes: parts.map((p) => p.type), + reasoningText: parts + .filter((p) => p.type === "reasoning") + .map((p) => (p as { text?: string }).text || "") + .join(""), + meta: (parts.find((p) => p.type === "reasoning") as + | { providerMetadata?: unknown } + | undefined)?.providerMetadata, + }; + }, + run: async ({ messages, signal }) => { + return streamText({ + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("should-not-run") }), + }), + messages, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { + chatId: "test-handover-reasoning", + mode: "handover-prepare", + }); + + try { + await harness.sendHandover({ + partialAssistantMessage: [ + { + role: "assistant", + content: [ + { + type: "reasoning", + text: "thinking about the greeting", + providerOptions: { anthropic: { signature: "sig-abc" } }, + }, + { type: "text", text: "Hello!" 
}, + ], + }, + ], + messageId: "asst-reason-1", + isFinal: true, + }); + await new Promise((r) => setTimeout(r, 30)); + + expect(captured).toBeDefined(); + expect(captured!.partTypes).toEqual(["reasoning", "text"]); + expect(captured!.reasoningText).toBe("thinking about the greeting"); + expect(captured!.meta).toEqual({ anthropic: { signature: "sig-abc" } }); + } finally { + await harness.close(); + } + }); + + it("pure-text head-start (isFinal: true) with hydrateMessages persists the partial (TRI-10715)", async () => { + // Same as the pure-text case above, but the customer registers + // `hydrateMessages` (the documented DB-as-source-of-truth pattern). + // The head-start user message must reach the hydrate hook as + // `incomingMessages`, and the warm route's partial must land in the + // accumulator so `onTurnComplete` carries the full first turn. + const runFn = vi.fn(); + const stored: { id: string; role: string; parts: unknown[] }[] = []; + const hydrateIncomingRoles: string[] = []; + let captured: + | { responseId?: string; responseText?: string; roles?: string[] } + | undefined; + + const agent = chat.agent({ + id: "chat.handover.hydrate-pure-text", + hydrateMessages: async ({ incomingMessages }) => { + hydrateIncomingRoles.push(...incomingMessages.map((m) => m.role)); + for (const m of incomingMessages) { + if (!stored.some((s) => s.id === m.id)) stored.push(m as (typeof stored)[number]); + } + return [...stored] as never; + }, + onTurnComplete: ({ responseMessage, uiMessages }) => { + captured = { + responseId: responseMessage?.id, + responseText: (responseMessage?.parts ?? 
[]) + .filter((p) => p.type === "text") + .map((p) => (p as { text?: string }).text || "") + .join(""), + roles: uiMessages.map((m) => m.role), + }; + }, + run: async ({ messages, signal }) => { + runFn(); + return streamText({ + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("should-not-run") }), + }), + messages, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { + chatId: "test-handover-hydrate-final", + mode: "handover-prepare", + headStartMessages: [ + { id: "hs-user-1", role: "user", parts: [{ type: "text", text: "say hi" }] }, + ], + }); + + try { + await harness.sendHandover({ + partialAssistantMessage: [ + { + role: "assistant", + content: [{ type: "text", text: "Hi there, hope you're well." }], + }, + ], + messageId: "asst-hydrate-1", + isFinal: true, + }); + await new Promise((r) => setTimeout(r, 30)); + + // isFinal — the agent never calls the user's run(). + expect(runFn).not.toHaveBeenCalled(); + + // The head-start user message reached the hydrate hook as incoming. + expect(hydrateIncomingRoles).toContain("user"); + + // onTurnComplete carries the full first turn: user + the warm + // route's assistant, under the handover messageId. + expect(captured).toBeDefined(); + expect(captured!.roles).toEqual(["user", "assistant"]); + expect(captured!.responseId).toBe("asst-hydrate-1"); + expect(captured!.responseText).toBe("Hi there, hope you're well."); + } finally { + await harness.close(); + } + }); + + it("tool-call handover (isFinal: false) with hydrateMessages resumes from step 2 (TRI-10715)", async () => { + // Hydrate variant of the schema-only tool-call case: the spliced + // partial (assistant + approval round) must reach the agent's + // streamText so AI SDK executes the pending tool instead of + // re-running step 1 from scratch against an empty/short prompt. 
+ const toolExecute = vi.fn(async ({ city }: { city: string }) => ({ city, temp: 22 })); + const weatherTool = tool({ + description: "Look up weather", + inputSchema: z.object({ city: z.string() }), + execute: toolExecute, + }); + + const stored: { id: string; role: string; parts: unknown[] }[] = []; + let runMessageRoles: string[] | undefined; + let captured: { roles?: string[]; assistantIds?: (string | undefined)[] } | undefined; + + const agent = chat.agent({ + id: "chat.handover.hydrate-schema-only-tool", + hydrateMessages: async ({ incomingMessages }) => { + for (const m of incomingMessages) { + if (!stored.some((s) => s.id === m.id)) stored.push(m as (typeof stored)[number]); + } + return [...stored] as never; + }, + onTurnComplete: ({ uiMessages }) => { + captured = { + roles: uiMessages.map((m) => m.role), + assistantIds: uiMessages.filter((m) => m.role === "assistant").map((m) => m.id), + }; + }, + run: async ({ messages, signal }) => { + runMessageRoles = messages.map((m) => m.role); + return streamText({ + model: new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("the weather in tokyo is 22°C") }), + }), + messages, + tools: { weather: weatherTool }, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { + chatId: "test-handover-hydrate-tool", + mode: "handover-prepare", + headStartMessages: [ + { id: "hs-user-2", role: "user", parts: [{ type: "text", text: "weather in tokyo?" 
}] }, + ], + }); + + try { + const turn = await harness.sendHandover({ + isFinal: false, + messageId: "asst-hydrate-2", + partialAssistantMessage: [ + { + role: "assistant", + content: [ + { type: "text", text: "let me check the weather" }, + { + type: "tool-call", + toolCallId: "tc-h1", + toolName: "weather", + input: { city: "tokyo" }, + }, + { + type: "tool-approval-request", + approvalId: "handover-approval-h1", + toolCallId: "tc-h1", + }, + ], + }, + { + role: "tool", + content: [ + { + type: "tool-approval-response", + approvalId: "handover-approval-h1", + approved: true, + }, + ], + }, + ], + }); + await new Promise((r) => setTimeout(r, 30)); + + // The resume prompt contained the full splice: user + partial + // assistant + approval round — NOT an empty/user-only prompt. + expect(runMessageRoles).toEqual(["user", "assistant", "tool"]); + + // AI SDK's initial-tool-execution branch ran the agent-side + // execute (no step-1 re-run). + expect(toolExecute).toHaveBeenCalledWith( + expect.objectContaining({ city: "tokyo" }), + expect.anything() + ); + + // Step-2 text streamed through session.out. + const text = turn.chunks + .filter((c) => c.type === "text-delta") + .map((c) => (c as { delta: string }).delta) + .join(""); + expect(text).toContain("tokyo"); + + // One assistant in the final chain, under the handover messageId. + expect(captured).toBeDefined(); + expect(captured!.roles).toEqual(["user", "assistant"]); + expect(captured!.assistantIds).toEqual(["asst-hydrate-2"]); + } finally { + await harness.close(); + } + }); + it("onTurnStart fires after the handover signal arrives (lazy)", async () => { // Hooks should not fire during the wait — only once handover lands // and a real turn begins. 
Verifies the order so customers can diff --git a/packages/trigger-sdk/test/mockChatAgent.test.ts b/packages/trigger-sdk/test/mockChatAgent.test.ts index 5038f21ca24..c2001de723f 100644 --- a/packages/trigger-sdk/test/mockChatAgent.test.ts +++ b/packages/trigger-sdk/test/mockChatAgent.test.ts @@ -1889,6 +1889,11 @@ describe("mockChatAgent", () => { // The snapshot reflects the post-turn accumulator: 1 user + 1 assistant. const roles = snap!.messages.map((m) => m.role); expect(roles).toEqual(["user", "assistant"]); + // `lastInEventId` stays undefined here: TestSessionStreamManager + // deliberately has no seq numbers, so the committed `.in` cursor + // the production write site reads is undefined in harness runs. + // The cursor round-trip is covered by the live smoke instead. + expect(snap!.lastInEventId).toBeUndefined(); } finally { await harness.close(); } diff --git a/packages/trigger-sdk/test/replay-session-in.test.ts b/packages/trigger-sdk/test/replay-session-in.test.ts index 92a1cb6f97c..a90ecc15396 100644 --- a/packages/trigger-sdk/test/replay-session-in.test.ts +++ b/packages/trigger-sdk/test/replay-session-in.test.ts @@ -3,7 +3,10 @@ import "../src/v3/test/index.js"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { apiClientManager } from "@trigger.dev/core/v3"; -import { __replaySessionInTailProductionPathForTests as replaySessionInTail } from "../src/v3/ai.js"; +import { + __findLatestSessionInCursorForTests as findLatestSessionInCursor, + __replaySessionInTailProductionPathForTests as replaySessionInTail, +} from "../src/v3/ai.js"; // ── Helpers ──────────────────────────────────────────────────────────── @@ -135,3 +138,78 @@ describe("replaySessionInTail", () => { expect(result).toEqual([]); }); }); + +// ── Cursor scan (non-blocking records read, not an SSE drain) ────────── + +function stubReadRecordsWithHeaders( + records: Array<{ data?: unknown; headers?: Array<[string, string]> }> +) { + const shaped = 
records.map((r, i) => ({ + data: r.data ?? "", + id: `evt-${i + 1}`, + seqNum: i + 1, + headers: r.headers, + })); + const spy = vi.fn(async () => ({ records: shaped })); + vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue({ + readSessionStreamRecords: spy, + } as never); + return spy; +} + +describe("findLatestSessionInCursor", () => { + it("returns the LAST turn-complete's session-in-event-id", async () => { + const spy = stubReadRecordsWithHeaders([ + { data: { type: "text-delta", delta: "hi" } }, + { + headers: [ + ["trigger-control", "turn-complete"], + ["session-in-event-id", "3"], + ], + }, + { data: { type: "text-delta", delta: "again" } }, + { + headers: [ + ["trigger-control", "turn-complete"], + ["session-in-event-id", "7"], + ], + }, + ]); + + const cursor = await findLatestSessionInCursor("sess"); + expect(cursor).toBe(7); + // Non-blocking records read on `.out`, no SSE subscribe. + expect(spy).toHaveBeenCalledWith("sess", "out"); + }); + + it("ignores other control subtypes and turn-completes without the header", async () => { + stubReadRecordsWithHeaders([ + { + headers: [ + ["trigger-control", "upgrade-required"], + ["session-in-event-id", "99"], + ], + }, + { headers: [["trigger-control", "turn-complete"]] }, + { + headers: [ + ["trigger-control", "turn-complete"], + ["session-in-event-id", "4"], + ], + }, + ]); + + const cursor = await findLatestSessionInCursor("sess"); + expect(cursor).toBe(4); + }); + + it("returns undefined when records carry no headers (older server)", async () => { + stubReadRecordsWithHeaders([ + { data: { type: "text-delta", delta: "hi" } }, + { data: { type: "finish" } }, + ]); + + const cursor = await findLatestSessionInCursor("sess"); + expect(cursor).toBeUndefined(); + }); +});