Skip to content

Commit 4a88e92

Browse files
committed
fix(sdk,core): stop continuation chat boots idle-waiting on the session cursor scan
The .in resume cursor was found by draining an SSE subscription that only closes after its 5 second inactivity window, and the scan ran twice per continuation boot (once for the replay cursor, once for the subscribe cursor), stalling every continuation around 10 seconds before the first turn. The scan is now a non-blocking records read of the latest turn-complete header, runs at most once per boot, the snapshot and replay reads run concurrently, and chat snapshots carry the cursor so subsequent boots skip the scan entirely.
1 parent ba8bab3 commit 4a88e92

6 files changed

Lines changed: 225 additions & 80 deletions

File tree

.changeset/chat-boot-cursor.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Continuation chat boots no longer stall for around 10 seconds before the first turn. The `session.in` resume cursor is now found with a non-blocking records read instead of draining an SSE long-poll (which always waited out its full 5 second inactivity window, twice per boot), the boot reads run concurrently, and chat snapshots carry the cursor so subsequent boots skip the scan entirely.

packages/core/src/v3/schemas/api.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,6 +2051,11 @@ export const ReadSessionStreamRecordsResponseBody = z.object({
20512051
data: z.unknown(),
20522052
id: z.string(),
20532053
seqNum: z.number(),
2054+
// S2 record headers — present on Trigger control records (e.g.
2055+
// `trigger-control: turn-complete` plus sibling headers). The
2056+
// server has always serialized them; older schemas stripped them
2057+
// client-side, so treat as optional.
2058+
headers: z.array(z.tuple([z.string(), z.string()])).optional(),
20542059
})
20552060
),
20562061
});

packages/core/src/v3/sessionStreams/chatSnapshot.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ export type ChatSnapshotV1<TUIMessage extends UIMessage = UIMessage> = {
2626
savedAt: number;
2727
messages: TUIMessage[];
2828
lastOutEventId?: string;
29+
/**
30+
* Committed `.in` consume cursor (S2 seq_num, stringified) as of this
31+
* snapshot's turn-complete. Lets the next boot seed the `.in` resume
32+
* cursor without scanning `session.out` for the latest turn-complete
33+
* header. Absent on snapshots written before this field existed —
34+
* readers fall back to the scan.
35+
*/
36+
lastInEventId?: string;
2937
};
3038

3139
/**
@@ -39,6 +47,7 @@ export const ChatSnapshotV1Schema = z.object({
3947
savedAt: z.number(),
4048
messages: z.array(z.unknown()),
4149
lastOutEventId: z.string().optional(),
50+
lastInEventId: z.string().optional(),
4251
});
4352

4453
/**

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 121 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -183,51 +183,44 @@ const lastTurnCompleteSeqNumKey = locals.create<{ value: number | undefined }>(
183183
* the `.in` subscription so already-processed user messages don't get
184184
* replayed from S2.
185185
*
186-
* Implementation streams the SSE endpoint and listens for `turn-complete`
187-
* via the transport's `onControl` callback; the data-chunk for-await is
188-
* just there to drive the stream. The scan is O(1 turn) because
189-
* `session.out` is bounded to roughly one turn at steady state — every
190-
* successful turn-complete is followed by an S2 trim back to the
191-
* previous one (see `writeTurnCompleteChunk`).
186+
* Implementation is a non-blocking records read (`wait=0`) — the
187+
* endpoint returns everything currently stored (including pre-trim
188+
* records, since S2 trims are eventually consistent) in one shot, and
189+
* we keep the LAST matching header. The previous SSE-based scan had to
190+
* idle-wait a full 5s window to know it reached the tail, which put a
191+
* constant ~6s tax on every continuation boot.
192192
*
193193
* Returns `undefined` if no `turn-complete` carrying the header has been
194194
* written yet — first-turn-ever, first turn post-OOM-with-no-prior-runs,
195-
* or a `turn-complete` written before this header existed (cross-version
196-
* boot). Callers fall back to subscribing `.in` from seq 0 in that case;
197-
* the slim-wire merge handles any dedup against snapshot-restored
198-
* messages.
195+
* a `turn-complete` written before this header existed, or a server old
196+
* enough that the records endpoint doesn't serialize headers. Callers
197+
* fall back to subscribing `.in` from seq 0 in that case; the slim-wire
198+
* merge handles any dedup against snapshot-restored messages.
199199
* @internal
200200
*/
201201
async function findLatestSessionInCursor(
202202
chatId: string
203203
): Promise<number | undefined> {
204204
const apiClient = apiClientManager.clientOrThrow();
205+
const response = await apiClient.readSessionStreamRecords(chatId, "out");
205206
let latestCursor: number | undefined;
206-
const stream = await apiClient.subscribeToSessionStream<unknown>(chatId, "out", {
207-
// 5s rather than 1s: S2 trim is eventually-consistent (10-60s
208-
// window), so a worker booting just after a trim could still see
209-
// pre-trim records and need a bit longer to drain them all before
210-
// the SSE long-poll closes. Without enough headroom the scan would
211-
// fall back to `undefined`, the `.in` cursor wouldn't be seeded,
212-
// and the next subscribe would replay messages already processed.
213-
timeoutInSeconds: 5,
214-
onControl: (event) => {
215-
if (event.subtype !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) return;
216-
const raw = headerValue(event.headers, SESSION_IN_EVENT_ID_HEADER);
217-
if (!raw) return;
218-
const parsed = Number.parseInt(raw, 10);
219-
if (Number.isFinite(parsed)) latestCursor = parsed;
220-
},
221-
});
222-
// Drain the stream so the underlying SSE reader runs to completion. We
223-
// don't accumulate chunks; `onControl` fires inline as turn-complete
224-
// records arrive.
225-
for await (const _ of stream) {
226-
// intentionally empty
207+
for (const record of response.records) {
208+
if (controlSubtype(record.headers) !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) continue;
209+
const raw = headerValue(record.headers, SESSION_IN_EVENT_ID_HEADER);
210+
if (!raw) continue;
211+
const parsed = Number.parseInt(raw, 10);
212+
if (Number.isFinite(parsed)) latestCursor = parsed;
227213
}
228214
return latestCursor;
229215
}
230216

217+
/** Test-only entry point for the records-based cursor scan. @internal */
218+
export async function __findLatestSessionInCursorForTests(
219+
chatId: string
220+
): Promise<number | undefined> {
221+
return findLatestSessionInCursor(chatId);
222+
}
223+
231224
/**
232225
* Versioned blob written to S3 after every turn completes (when no
233226
* `hydrateMessages` hook is registered). Read at run boot to seed the
@@ -5320,6 +5313,12 @@ function chatAgent<
53205313
const couldHavePriorState =
53215314
payload.continuation === true || ctx.attempt.number > 1;
53225315

5316+
// `.in` resume cursor, computed at most once per boot. The boot
5317+
// block below resolves it (snapshot field or records scan) and the
5318+
// resume-cursor block reuses it instead of re-scanning.
5319+
let bootInCursor: number | undefined;
5320+
let bootInCursorResolved = false;
5321+
53235322
if (!hydrateMessages && couldHavePriorState) {
53245323
// Single parent span for the whole boot read phase — snapshot
53255324
// read, session.out replay, session.in replay. Per-phase timing
@@ -5363,34 +5362,39 @@ function chatAgent<
53635362
}
53645363
}
53655364

5366-
// session.out replay
5367-
const replayOutStart = Date.now();
5368-
try {
5369-
const replayResult = await replaySessionOutTail<TUIMessage>(
5370-
sessionIdForSnapshot,
5371-
{ lastEventId: bootSnapshot?.lastOutEventId }
5365+
// The `.out` replay and the `.in` cursor + tail read are
5366+
// independent (both depend only on the snapshot) — run them
5367+
// concurrently. Each phase keeps its own catch + duration
5368+
// attribute.
5369+
const replayOutPhase = async () => {
5370+
const replayOutStart = Date.now();
5371+
try {
5372+
const replayResult = await replaySessionOutTail<TUIMessage>(
5373+
sessionIdForSnapshot,
5374+
{ lastEventId: bootSnapshot?.lastOutEventId }
5375+
);
5376+
replayedSettled = replayResult.settled;
5377+
replayedPartial = replayResult.partial;
5378+
replayedPartialRaw = replayResult.partialRaw;
5379+
} catch (error) {
5380+
logger.warn(
5381+
"chat.agent: session.out replay failed; using snapshot only",
5382+
{
5383+
error: error instanceof Error ? error.message : String(error),
5384+
sessionId: sessionIdForSnapshot,
5385+
}
5386+
);
5387+
}
5388+
bootSpan.setAttribute(
5389+
"chat.boot.replay.out.durationMs",
5390+
Date.now() - replayOutStart
53725391
);
5373-
replayedSettled = replayResult.settled;
5374-
replayedPartial = replayResult.partial;
5375-
replayedPartialRaw = replayResult.partialRaw;
5376-
} catch (error) {
5377-
logger.warn(
5378-
"chat.agent: session.out replay failed; using snapshot only",
5379-
{
5380-
error: error instanceof Error ? error.message : String(error),
5381-
sessionId: sessionIdForSnapshot,
5382-
}
5392+
bootSpan.setAttribute("chat.boot.replay.out.settledCount", replayedSettled.length);
5393+
bootSpan.setAttribute(
5394+
"chat.boot.replay.out.partialPresent",
5395+
replayedPartial !== undefined
53835396
);
5384-
}
5385-
bootSpan.setAttribute(
5386-
"chat.boot.replay.out.durationMs",
5387-
Date.now() - replayOutStart
5388-
);
5389-
bootSpan.setAttribute("chat.boot.replay.out.settledCount", replayedSettled.length);
5390-
bootSpan.setAttribute(
5391-
"chat.boot.replay.out.partialPresent",
5392-
replayedPartial !== undefined
5393-
);
5397+
};
53945398

53955399
// session.in tail read
53965400
//
@@ -5402,28 +5406,49 @@ function chatAgent<
54025406
// visible via the live SSE subscription — by which point they
54035407
// would arrive AFTER the partial-assistant orphan and look like
54045408
// brand-new turns to the model, producing inverted chains.
5405-
const replayInStart = Date.now();
5406-
const lastInEventId = await findLatestSessionInCursor(payload.chatId)
5407-
.then((cursor) => (cursor !== undefined ? String(cursor) : undefined))
5408-
.catch(() => undefined);
5409-
try {
5410-
replayedInTail = await replaySessionInTail<TUIMessage>(payload.chatId, {
5411-
lastEventId: lastInEventId,
5412-
});
5413-
} catch (error) {
5414-
logger.warn(
5415-
"chat.agent: session.in replay failed; in-flight users may not be recovered",
5416-
{ error: error instanceof Error ? error.message : String(error) }
5409+
//
5410+
// The cursor comes from the snapshot when present (written
5411+
// there since `lastInEventId` was added) — otherwise from a
5412+
// records scan of `.out`'s latest turn-complete header.
5413+
const replayInPhase = async () => {
5414+
const replayInStart = Date.now();
5415+
const snapshotInCursor =
5416+
bootSnapshot?.lastInEventId !== undefined
5417+
? Number.parseInt(bootSnapshot.lastInEventId, 10)
5418+
: undefined;
5419+
if (snapshotInCursor !== undefined && Number.isFinite(snapshotInCursor)) {
5420+
bootInCursor = snapshotInCursor;
5421+
} else {
5422+
bootInCursor = await findLatestSessionInCursor(payload.chatId).catch(
5423+
() => undefined
5424+
);
5425+
}
5426+
bootInCursorResolved = true;
5427+
bootSpan.setAttribute(
5428+
"chat.boot.replay.in.cursorFromSnapshot",
5429+
snapshotInCursor !== undefined
54175430
);
5418-
}
5419-
bootSpan.setAttribute(
5420-
"chat.boot.replay.in.durationMs",
5421-
Date.now() - replayInStart
5422-
);
5423-
bootSpan.setAttribute(
5424-
"chat.boot.replay.in.userCount",
5425-
replayedInTail.length
5426-
);
5431+
try {
5432+
replayedInTail = await replaySessionInTail<TUIMessage>(payload.chatId, {
5433+
lastEventId: bootInCursor !== undefined ? String(bootInCursor) : undefined,
5434+
});
5435+
} catch (error) {
5436+
logger.warn(
5437+
"chat.agent: session.in replay failed; in-flight users may not be recovered",
5438+
{ error: error instanceof Error ? error.message : String(error) }
5439+
);
5440+
}
5441+
bootSpan.setAttribute(
5442+
"chat.boot.replay.in.durationMs",
5443+
Date.now() - replayInStart
5444+
);
5445+
bootSpan.setAttribute(
5446+
"chat.boot.replay.in.userCount",
5447+
replayedInTail.length
5448+
);
5449+
};
5450+
5451+
await Promise.all([replayOutPhase(), replayInPhase()]);
54275452
},
54285453
{
54295454
attributes: {
@@ -5469,7 +5494,12 @@ function chatAgent<
54695494

54705495
if (needsResumeCursor) {
54715496
try {
5472-
const cursor = await findLatestSessionInCursor(payload.chatId);
5497+
// Reuse the cursor the boot block already resolved (snapshot
5498+
// field or records scan) — only scan here when the boot block
5499+
// was skipped (hydrateMessages, or snapshot-only signals).
5500+
const cursor = bootInCursorResolved
5501+
? bootInCursor
5502+
: await findLatestSessionInCursor(payload.chatId);
54735503
if (cursor !== undefined) {
54745504
sessionStreams.setLastSeqNum(payload.chatId, "in", cursor);
54755505
sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor);
@@ -7428,11 +7458,17 @@ function chatAgent<
74287458
await tracer.startActiveSpan(
74297459
"snapshot.write",
74307460
async () => {
7461+
const snapshotInCursor =
7462+
getChatSession().in.lastDispatchedSeqNum();
74317463
await writeChatSnapshot<TUIMessage>(sessionIdForSnapshot, {
74327464
version: 1,
74337465
savedAt: Date.now(),
74347466
messages: accumulatedUIMessages,
74357467
lastOutEventId: turnCompleteResult?.lastEventId,
7468+
lastInEventId:
7469+
snapshotInCursor !== undefined
7470+
? String(snapshotInCursor)
7471+
: undefined,
74367472
});
74377473
},
74387474
{
@@ -7687,11 +7723,17 @@ function chatAgent<
76877723
// neither the snapshot nor the replayable `.in` tail.
76887724
if (!hydrateMessages) {
76897725
try {
7726+
const errorSnapshotInCursor =
7727+
getChatSession().in.lastDispatchedSeqNum();
76907728
await writeChatSnapshot<TUIMessage>(sessionIdForSnapshot, {
76917729
version: 1,
76927730
savedAt: Date.now(),
76937731
messages: erroredUIMessages,
76947732
lastOutEventId: errorTurnCompleteResult?.lastEventId,
7733+
lastInEventId:
7734+
errorSnapshotInCursor !== undefined
7735+
? String(errorSnapshotInCursor)
7736+
: undefined,
76957737
});
76967738
} catch (error) {
76977739
logger.warn("chat.agent: error-path snapshot write failed", {

packages/trigger-sdk/test/mockChatAgent.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1889,6 +1889,11 @@ describe("mockChatAgent", () => {
18891889
// The snapshot reflects the post-turn accumulator: 1 user + 1 assistant.
18901890
const roles = snap!.messages.map((m) => m.role);
18911891
expect(roles).toEqual(["user", "assistant"]);
1892+
// `lastInEventId` stays undefined here: TestSessionStreamManager
1893+
// deliberately has no seq numbers, so the committed `.in` cursor
1894+
// the production write site reads is undefined in harness runs.
1895+
// The cursor round-trip is covered by the live smoke instead.
1896+
expect(snap!.lastInEventId).toBeUndefined();
18921897
} finally {
18931898
await harness.close();
18941899
}

0 commit comments

Comments
 (0)