-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathreplay-after-crash.test.ts
More file actions
317 lines (284 loc) · 13.2 KB
/
Copy pathreplay-after-crash.test.ts
File metadata and controls
317 lines (284 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
// Plan F.3: integration test for the crash-recovery boot path. The
// scenario it locks down:
//
// 1. Run A streams chunks to `session.out` and `onTurnComplete` fires.
// 2. Run A crashes BEFORE `writeChatSnapshot` lands the post-turn
// blob (or the write fails silently — both have the same effect).
// 3. Run B boots: `readChatSnapshot` returns either `undefined` (no
// snapshot yet) or a stale snapshot left over from a prior turn.
// Replay then drains `session.out` from the snapshot's
// `lastOutEventId` (or from seq 0 when there is no snapshot) and
// reduces the chunks back into UIMessage[].
// 4. The accumulator is consistent — Run A's completed chunks reach
// Run B's run loop without losing data.
//
// Plan section H.1 / H.4 spell out the "snapshot didn't make it before
// crash" path; this test is the integration safety net behind the
// unit tests in `packages/trigger-sdk/test/replay-session-out.test.ts`.
//
// We exercise the SDK's `__replaySessionOutTailProductionPathForTests`
// against a stubbed `apiClient.readSessionStreamRecords` — the new
// non-SSE records endpoint introduced in plan task #22. The replay path
// is a single GET that returns whatever's already on the stream; no
// long-poll. MinIO is provisioned to keep parity with
// `chat-snapshot-integration.test.ts` (the snapshot read path runs
// through it), even though the replay path itself doesn't read from S3.
import { postgresAndMinioTest } from "@internal/testcontainers";
import { apiClientManager } from "@trigger.dev/core/v3";
import {
__readChatSnapshotProductionPathForTests as readChatSnapshot,
__replaySessionOutTailProductionPathForTests as replaySessionOutTail,
type ChatSnapshotV1,
} from "@trigger.dev/sdk/ai";
import type { UIMessageChunk } from "ai";
import { afterEach, describe, expect, vi } from "vitest";
import { env } from "~/env.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
vi.setConfig({ testTimeout: 60_000 });
// ── Helpers ────────────────────────────────────────────────────────────
/**
 * Builds the chunk sequence for one complete assistant text turn:
 * `start`, a single text part (`text-start` / `text-delta` / `text-end`)
 * and a closing `finish`.
 *
 * @param id - Used as the `messageId` of the `start` chunk; the text part
 *   id is derived from it as `${id}.t1`.
 * @param text - Emitted as the payload of the single `text-delta` chunk.
 * @returns The five chunks, in stream order.
 */
function textTurn(id: string, text: string): UIMessageChunk[] {
  // Every text-* chunk of the turn shares the same part id.
  const textPartId = `${id}.t1`;
  return [
    { type: "start", messageId: id, messageMetadata: { role: "assistant" } },
    { type: "text-start", id: textPartId },
    { type: "text-delta", id: textPartId, delta: text },
    { type: "text-end", id: textPartId },
    { type: "finish" },
  ] as UIMessageChunk[];
}
/**
 * Replaces `apiClientManager.clientOrThrow()` with a fake client exposing
 * three methods:
 *
 * - `getPayloadUrl` / `createUploadPayloadUrl` mint presigned URLs (GET
 *   and PUT respectively) through the webapp's real
 *   `generatePresignedUrl`, using `opts.projectRef` / `opts.envSlug`, and
 *   throw when presigning reports failure.
 * - `readSessionStreamRecords` resolves to
 *   `{ records: [{ data, id, seqNum }] }`, one record per entry of
 *   `opts.sessionOutChunks`. `data` is the chunk OBJECT itself (no JSON
 *   string round-trip); `id` is `evt-<n>` and `seqNum` is `<n>`, both
 *   1-based.
 *
 * The records stub ignores its arguments: it always returns every record,
 * whatever `afterEventId` it is called with.
 *
 * @returns The `vi.fn` spy behind `readSessionStreamRecords`, so callers
 *   can assert on the arguments it received.
 */
function stubApiClient(opts: {
  projectRef: string;
  envSlug: string;
  sessionOutChunks: unknown[];
}) {
  // Wrap each chunk in the record envelope.
  const records = opts.sessionOutChunks.map((chunk, index) => {
    const seqNum = index + 1;
    return { data: chunk, id: `evt-${seqNum}`, seqNum };
  });

  const readRecordsSpy = vi.fn(
    async (_id: string, _io: "in" | "out", _options?: { afterEventId?: string }) => ({
      records,
    })
  );

  // Shared presign path for both payload-URL methods.
  const presign = async (filename: string, method: "GET" | "PUT") => {
    const result = await generatePresignedUrl(opts.projectRef, opts.envSlug, filename, method);
    if (!result.success) throw new Error(result.error);
    return { presignedUrl: result.url };
  };

  const fakeClient = {
    getPayloadUrl: (filename: string) => presign(filename, "GET"),
    createUploadPayloadUrl: (filename: string) => presign(filename, "PUT"),
    readSessionStreamRecords: readRecordsSpy,
  };

  // Only the three methods above exist on the fake, hence the `never` cast.
  vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue(fakeClient as never);
  return readRecordsSpy;
}
// Handle to the `console.warn` spy that some tests install; it stays
// unassigned until the first such test runs.
let warnSpy: ReturnType<typeof vi.spyOn>;

// Undo every spy/mock after each test, then explicitly restore the
// `console.warn` spy if one has been installed.
afterEach(() => {
  vi.restoreAllMocks();
  if (warnSpy != null) {
    warnSpy.mockRestore();
  }
});
// ── Tests ──────────────────────────────────────────────────────────────
describe("replay after crash (MinIO + SDK helpers)", () => {
postgresAndMinioTest(
  "boot reconstructs accumulator from session.out replay when no snapshot exists",
  async ({ minioConfig }) => {
    // Point the object-store settings at the MinIO instance supplied by
    // the test fixture.
    Object.assign(env, {
      OBJECT_STORE_BASE_URL: minioConfig.baseUrl,
      OBJECT_STORE_ACCESS_KEY_ID: minioConfig.accessKeyId,
      OBJECT_STORE_SECRET_ACCESS_KEY: minioConfig.secretAccessKey,
      OBJECT_STORE_REGION: minioConfig.region,
      OBJECT_STORE_DEFAULT_PROTOCOL: undefined,
    });
    // Silence `console.warn` for this test.
    warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});

    // The crashed run's session.out holds two completed assistant turns
    // and no snapshot was ever written; boot must recover both via replay.
    const firstTurn = textTurn("a-1", "first turn");
    const secondTurn = textTurn("a-2", "second turn");
    stubApiClient({
      projectRef: "proj_replay_crash",
      envSlug: "dev",
      sessionOutChunks: firstTurn.concat(secondTurn),
    });

    const sessionId = "sess_no_snap";

    // Step 1: the snapshot read comes back empty (fresh boot, no snapshot).
    expect(await readChatSnapshot(sessionId)).toBeUndefined();

    // Step 2: replaying the tail rebuilds both assistant messages, in order.
    const replayed = await replaySessionOutTail(sessionId);
    expect(replayed).toHaveLength(2);
    const replayedIds = replayed.map((message) => message.id);
    expect(replayedIds).toEqual(["a-1", "a-2"]);

    // Collect the text of every text part, message by message.
    const texts: Array<string | undefined> = [];
    for (const message of replayed) {
      for (const part of message.parts as Array<{ type: string; text?: string }>) {
        if (part.type === "text") {
          texts.push(part.text);
        }
      }
    }
    expect(texts).toEqual(["first turn", "second turn"]);
  }
);
postgresAndMinioTest(
  "boot replays only chunks AFTER snapshot.lastOutEventId (resume cursor)",
  async ({ minioConfig }) => {
    // Point the object-store settings at the MinIO instance supplied by
    // the test fixture.
    Object.assign(env, {
      OBJECT_STORE_BASE_URL: minioConfig.baseUrl,
      OBJECT_STORE_ACCESS_KEY_ID: minioConfig.accessKeyId,
      OBJECT_STORE_SECRET_ACCESS_KEY: minioConfig.secretAccessKey,
      OBJECT_STORE_REGION: minioConfig.region,
      OBJECT_STORE_DEFAULT_PROTOCOL: undefined,
    });

    const sessionId = "sess_resume";
    const cursor = "evt-snapped";

    // The replay helper takes the snapshot's cursor as `lastEventId` and
    // is expected to forward it as `afterEventId` on the records
    // endpoint. The stub does not filter by cursor, so we hand it only
    // the post-snapshot chunks (modeling what the server would return
    // for `afterEventId=evt-snapped`) and verify the cursor is threaded
    // through.
    const readRecordsSpy = stubApiClient({
      projectRef: "proj_replay_resume",
      envSlug: "dev",
      sessionOutChunks: textTurn("a-after-snap", "post-snapshot turn"),
    });

    const result = await replaySessionOutTail(sessionId, { lastEventId: cursor });

    const expectedOptions = expect.objectContaining({ afterEventId: cursor });
    expect(readRecordsSpy).toHaveBeenCalledWith(sessionId, "out", expectedOptions);

    expect(result).toHaveLength(1);
    const [onlyMessage] = result;
    expect(onlyMessage!.id).toBe("a-after-snap");
  }
);
postgresAndMinioTest(
  "boot returns [] when session.out is empty (first-ever turn, no snapshot)",
  async ({ minioConfig }) => {
    // Point the object-store settings at the MinIO instance supplied by
    // the test fixture.
    Object.assign(env, {
      OBJECT_STORE_BASE_URL: minioConfig.baseUrl,
      OBJECT_STORE_ACCESS_KEY_ID: minioConfig.accessKeyId,
      OBJECT_STORE_SECRET_ACCESS_KEY: minioConfig.secretAccessKey,
      OBJECT_STORE_REGION: minioConfig.region,
      OBJECT_STORE_DEFAULT_PROTOCOL: undefined,
    });
    // Silence `console.warn` for this test.
    warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});

    // Nothing has ever been written to session.out.
    const noChunks: unknown[] = [];
    stubApiClient({
      projectRef: "proj_replay_empty",
      envSlug: "dev",
      sessionOutChunks: noChunks,
    });

    const sessionId = "sess_empty";

    // No snapshot and no stream records: both reads come back empty.
    expect(await readChatSnapshot(sessionId)).toBeUndefined();
    expect(await replaySessionOutTail(sessionId)).toEqual([]);
  }
);
postgresAndMinioTest(
  "boot drops orphaned trailing tool parts (cleanupAbortedParts) — partial crash",
  async ({ minioConfig }) => {
    // Simulates a true mid-turn crash: the assistant finished one turn,
    // then started a tool call, but the run died before the call
    // resolved. Replay must surface the completed turn and must NOT
    // surface any tool part still in the `input-streaming` state.
    env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
    env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
    env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
    env.OBJECT_STORE_REGION = minioConfig.region;
    env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;

    // Partial tool turn — no `tool-input-available`, no `finish`.
    //
    // UI message chunks key tool input by `toolCallId` / `inputTextDelta`
    // (not `id` / `delta`, which are the text-part field names). The
    // array is typed rather than cast per element so the compiler checks
    // the shapes; with the wrong field names the reducer never sees the
    // tool call id and the assertion below cannot detect a leaked part.
    const orphanedToolTurn: UIMessageChunk[] = [
      { type: "start", messageId: "a-orphan", messageMetadata: { role: "assistant" } },
      { type: "tool-input-start", toolCallId: "tc-cut", toolName: "search" },
      { type: "tool-input-delta", toolCallId: "tc-cut", inputTextDelta: '{"q":"x"}' },
    ];

    stubApiClient({
      projectRef: "proj_replay_partial",
      envSlug: "dev",
      sessionOutChunks: [...textTurn("a-complete", "I finished step 1"), ...orphanedToolTurn],
    });

    const replayed = await replaySessionOutTail("sess_partial_crash");

    // Completed turn always present.
    expect(replayed.some((m) => m.id === "a-complete")).toBe(true);

    // No part of any replayed message may still be `input-streaming`.
    // Checking every message (instead of only an `a-orphan` message, if
    // present, filtered by tool call id) keeps the assertion from
    // passing vacuously when the orphan message is dropped, kept under a
    // different id, or its part carries no `toolCallId`.
    const stillStreaming = replayed.flatMap((m) =>
      (m.parts as Array<{ toolCallId?: string; state?: string }>).filter(
        (p) => p.state === "input-streaming"
      )
    );
    expect(stillStreaming).toEqual([]);
  }
);
postgresAndMinioTest(
  "snapshot+replay merge: snapshot supplies user msgs, replay supplies assistants",
  async ({ minioConfig }) => {
    // The boot orchestration calls
    // `mergeByIdReplaceWins(snapshot.messages, replayed)`. The runtime
    // contract is that user messages live in the snapshot only
    // (session.in never goes through replay) and assistants come from
    // replay, which carries the freshest representation. We model the
    // realistic split: the snapshot has [u-1, a-1-stale], the replay has
    // [a-1-fresh, a-2-new]. After a merge the accumulator should hold
    // the fresh assistant plus the new assistant, with the user message
    // preserved.
    //
    // Note: this is a pre-merge round-trip — the read and the replay are
    // driven through real MinIO + stubbed S2 to confirm both arrive
    // intact for the orchestration to merge. The merge itself is not
    // invoked here.
    Object.assign(env, {
      OBJECT_STORE_BASE_URL: minioConfig.baseUrl,
      OBJECT_STORE_ACCESS_KEY_ID: minioConfig.accessKeyId,
      OBJECT_STORE_SECRET_ACCESS_KEY: minioConfig.secretAccessKey,
      OBJECT_STORE_REGION: minioConfig.region,
      OBJECT_STORE_DEFAULT_PROTOCOL: undefined,
    });

    const sessionId = "sess_merge_round_trip";
    const stubTarget = { projectRef: "proj_merge", envSlug: "dev" };

    // The snapshot to pre-write: one user message and a stale assistant.
    const snapshot: ChatSnapshotV1 = {
      version: 1,
      savedAt: 1_700_000_000_000,
      messages: [
        { id: "u-1", role: "user", parts: [{ type: "text", text: "hi" }] },
        { id: "a-1", role: "assistant", parts: [{ type: "text", text: "stale-assistant" }] },
      ],
      lastOutEventId: "evt-prev",
    };

    // Write phase: lay the snapshot down with the SDK's own writer,
    // against a stub that serves no replay chunks.
    stubApiClient({ ...stubTarget, sessionOutChunks: [] });
    const sdk = await import("@trigger.dev/sdk/ai");
    await sdk.__writeChatSnapshotProductionPathForTests(sessionId, snapshot);

    // Boot phase: drop the write-phase stub and install one whose replay
    // tail carries the fresh `a-1` plus a brand-new `a-2`. The
    // orchestration's merge would replace `a-1` and append `a-2` after
    // `u-1`.
    vi.restoreAllMocks();
    const freshTurn = textTurn("a-1", "fresh-assistant");
    const nextTurn = textTurn("a-2", "next-assistant");
    stubApiClient({ ...stubTarget, sessionOutChunks: freshTurn.concat(nextTurn) });

    const readBack = await readChatSnapshot(sessionId);
    const snapshotIds = readBack?.messages.map((message) => message.id);
    expect(snapshotIds).toEqual(["u-1", "a-1"]);

    const replayed = await replaySessionOutTail(sessionId, {
      lastEventId: readBack?.lastOutEventId,
    });
    const replayedIds = replayed.map((message) => message.id);
    expect(replayedIds).toEqual(["a-1", "a-2"]);

    // Replay's `a-1` carries the fresh content — when the merge runs in
    // the runtime, this version would replace the snapshot's stale `a-1`.
    const [replayedA1] = replayed;
    let replayedA1Text = "";
    for (const part of replayedA1!.parts as Array<{ type: string; text?: string }>) {
      if (part.type === "text") {
        // A missing `text` contributes nothing to the joined string.
        replayedA1Text += part.text ?? "";
      }
    }
    expect(replayedA1Text).toBe("fresh-assistant");
  }
);
});