Skip to content

Commit 498c0f7

Browse files
committed
test(sqlite): expand db stress coverage
1 parent 8705214 commit 498c0f7

2 files changed

Lines changed: 288 additions & 13 deletions

File tree

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-stress.ts

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,21 @@ export const dbStressActor = actor({
1212
created_at INTEGER NOT NULL
1313
)
1414
`);
15+
await db.execute(`
16+
CREATE TABLE IF NOT EXISTS stress_meta_kv (
17+
key TEXT PRIMARY KEY,
18+
value TEXT NOT NULL,
19+
updated_at INTEGER NOT NULL
20+
)
21+
`);
22+
await db.execute(`
23+
CREATE TABLE IF NOT EXISTS stress_payloads (
24+
id INTEGER PRIMARY KEY,
25+
tag TEXT NOT NULL,
26+
payload TEXT NOT NULL,
27+
updated_at INTEGER NOT NULL
28+
)
29+
`);
1530
},
1631
}),
1732
actions: {
@@ -29,6 +44,144 @@ export const dbStressActor = actor({
2944
return { count };
3045
},
3146

47+
upsertMetaRows: async (c, count: number) => {
48+
const normalizedCount = Math.max(0, Math.trunc(count));
49+
for (let i = 0; i < normalizedCount; i++) {
50+
await c.db.execute(
51+
"INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)",
52+
`key-${i % 32}`,
53+
`value-${i}`,
54+
Date.now(),
55+
);
56+
}
57+
const results = await c.db.execute<{ count: number }>(
58+
`SELECT COUNT(*) as count FROM stress_meta_kv`,
59+
);
60+
return results[0].count;
61+
},
62+
63+
kitchenSinkSmoke: async (c, rounds: number) => {
64+
const normalizedRounds = Math.max(1, Math.trunc(rounds));
65+
const startedAt = Date.now();
66+
67+
await c.db.execute(
68+
"INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)",
69+
"started-at",
70+
String(startedAt),
71+
startedAt,
72+
);
73+
74+
for (let round = 0; round < normalizedRounds; round++) {
75+
const now = startedAt + round;
76+
77+
await Promise.all([
78+
c.db.execute(
79+
"INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)",
80+
`parallel-key-${round % 17}`,
81+
`parallel-value-${round}`,
82+
now,
83+
),
84+
c.db.execute(
85+
"INSERT INTO stress_data (value, created_at) VALUES (?, ?)",
86+
`parallel-data-${round}`,
87+
now,
88+
),
89+
c.db.execute<{ count: number }>(
90+
"SELECT COUNT(*) as count FROM stress_meta_kv",
91+
),
92+
c.db.execute<Record<string, unknown>>("PRAGMA page_count"),
93+
]);
94+
95+
await c.db.execute("BEGIN");
96+
try {
97+
await c.db.execute(
98+
"INSERT INTO stress_data (value, created_at) VALUES (?, ?)",
99+
`tx-data-${round}`,
100+
now,
101+
);
102+
await c.db.execute(
103+
"INSERT OR REPLACE INTO stress_payloads (id, tag, payload, updated_at) VALUES (?, ?, ?, ?)",
104+
round + 1,
105+
`payload-${round}`,
106+
"x".repeat(1024 + (round % 5) * 2048),
107+
now,
108+
);
109+
await c.db.execute("SAVEPOINT payload_patch");
110+
await c.db.execute(
111+
"UPDATE stress_payloads SET payload = payload || ?, updated_at = ? WHERE id = ?",
112+
`-patch-${round}`,
113+
now + 1,
114+
round + 1,
115+
);
116+
await c.db.execute("ROLLBACK TO payload_patch");
117+
await c.db.execute("RELEASE payload_patch");
118+
await c.db.execute(
119+
"UPDATE stress_meta_kv SET value = ?, updated_at = ? WHERE key = ?",
120+
`tx-value-${round}`,
121+
now + 2,
122+
`parallel-key-${round % 17}`,
123+
);
124+
await c.db.execute("COMMIT");
125+
} catch (error) {
126+
await c.db.execute("ROLLBACK");
127+
throw error;
128+
}
129+
130+
await c.db.execute("BEGIN");
131+
try {
132+
await c.db.execute(
133+
"INSERT INTO stress_data (value, created_at) VALUES (?, ?)",
134+
`rollback-data-${round}`,
135+
now,
136+
);
137+
await c.db.execute("ROLLBACK");
138+
} catch (error) {
139+
await c.db.execute("ROLLBACK");
140+
throw error;
141+
}
142+
143+
if (round % 3 === 0) {
144+
await c.db.execute(
145+
"DELETE FROM stress_data WHERE id IN (SELECT id FROM stress_data WHERE value LIKE 'parallel-data-%' ORDER BY id LIMIT 1)",
146+
);
147+
}
148+
}
149+
150+
await c.db.execute(
151+
"INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)",
152+
"completed-rounds",
153+
String(normalizedRounds),
154+
Date.now(),
155+
);
156+
await c.db.execute("DELETE FROM stress_payloads WHERE id % 11 = 0");
157+
await c.db.execute("VACUUM");
158+
159+
const [metaRows, dataRows, payloadRows, pageRows, integrityRows] =
160+
await Promise.all([
161+
c.db.execute<{ count: number }>(
162+
"SELECT COUNT(*) as count FROM stress_meta_kv",
163+
),
164+
c.db.execute<{ count: number }>(
165+
"SELECT COUNT(*) as count FROM stress_data",
166+
),
167+
c.db.execute<{ count: number }>(
168+
"SELECT COUNT(*) as count FROM stress_payloads",
169+
),
170+
c.db.execute<Record<string, unknown>>("PRAGMA page_count"),
171+
c.db.execute<Record<string, unknown>>(
172+
"PRAGMA integrity_check",
173+
),
174+
]);
175+
176+
return {
177+
metaCount: metaRows[0]?.count ?? 0,
178+
dataCount: dataRows[0]?.count ?? 0,
179+
payloadCount: payloadRows[0]?.count ?? 0,
180+
pageCount: Number(Object.values(pageRows[0] ?? {})[0] ?? 0),
181+
integrity: String(Object.values(integrityRows[0] ?? {})[0] ?? ""),
182+
};
183+
},
184+
32185
getCount: async (c) => {
33186
const results = await c.db.execute<{ count: number }>(
34187
`SELECT COUNT(*) as count FROM stress_data`,
@@ -91,6 +244,8 @@ export const dbStressActor = actor({
91244

92245
reset: async (c) => {
93246
await c.db.execute(`DELETE FROM stress_data`);
247+
await c.db.execute(`DELETE FROM stress_meta_kv`);
248+
await c.db.execute(`DELETE FROM stress_payloads`);
94249
},
95250

96251
destroy: (c) => {

rivetkit-typescript/packages/rivetkit/tests/driver/actor-db-stress.test.ts

Lines changed: 133 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,27 @@
11
import { describeDriverMatrix } from "./shared-matrix";
22
import { describe, expect, test, vi } from "vitest";
3-
import { setupDriverTest } from "./shared-utils";
3+
import { setupDriverTest, waitFor } from "./shared-utils";
44

55
const STRESS_TEST_TIMEOUT_MS = 60_000;
6+
const KITCHEN_SINK_TEST_TIMEOUT_MS = 120_000;
67
const ACTOR_READY_TIMEOUT_MS = 15_000;
8+
const RUNTIME_LOG_TAIL_CHARS = 20_000;
9+
10+
async function withRuntimeLogTail<T>(
11+
getRuntimeOutput: () => string,
12+
fn: () => Promise<T>,
13+
): Promise<T> {
14+
try {
15+
return await fn();
16+
} catch (error) {
17+
const runtimeOutput = getRuntimeOutput();
18+
const runtimeTail = runtimeOutput.slice(-RUNTIME_LOG_TAIL_CHARS);
19+
if (error instanceof Error && runtimeTail) {
20+
error.message = `${error.message}\n\nRuntime log tail:\n${runtimeTail}`;
21+
}
22+
throw error;
23+
}
24+
}
725

826
/**
927
* Stress and resilience tests for the SQLite database subsystem.
@@ -64,7 +82,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
6482
test(
6583
"rapid create-insert-destroy cycles handle DB lifecycle correctly",
6684
async (c) => {
67-
const { client } = await setupDriverTest(c, driverTestConfig);
85+
const { client, getRuntimeOutput } = await setupDriverTest(
86+
c,
87+
driverTestConfig,
88+
);
6889

6990
// Perform rapid cycles of create -> insert -> destroy.
7091
// This exercises the close_database path racing with
@@ -78,7 +99,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
7899
// Poll the first insert because the actor can still be starting when the initial DB action is sent.
79100
await vi.waitFor(
80101
async () => {
81-
await getActor().insertBatch(10);
102+
await withRuntimeLogTail(
103+
getRuntimeOutput,
104+
() => getActor().insertBatch(10),
105+
);
82106
},
83107
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
84108
);
@@ -88,9 +112,13 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
88112
// through sleep teardown under the task model.
89113
await vi.waitFor(
90114
async () => {
91-
const count = await client.dbStressActor
92-
.getOrCreate(actorKey)
93-
.getCount();
115+
const count = await withRuntimeLogTail(
116+
getRuntimeOutput,
117+
() =>
118+
client.dbStressActor
119+
.getOrCreate(actorKey)
120+
.getCount(),
121+
);
94122
expect(count).toBeGreaterThanOrEqual(10);
95123
},
96124
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
@@ -106,7 +134,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
106134
test(
107135
"DB operations complete without excessive blocking",
108136
async (c) => {
109-
const { client } = await setupDriverTest(c, driverTestConfig);
137+
const { client, getRuntimeOutput } = await setupDriverTest(
138+
c,
139+
driverTestConfig,
140+
);
110141

111142
const actorKey = [`stress-health-${crypto.randomUUID()}`];
112143

@@ -117,9 +148,11 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
117148
// expected because the action itself runs on that loop.
118149
const health = await vi.waitFor(
119150
async () =>
120-
client.dbStressActor
121-
.getOrCreate(actorKey)
122-
.measureEventLoopHealth(100),
151+
withRuntimeLogTail(getRuntimeOutput, () =>
152+
client.dbStressActor
153+
.getOrCreate(actorKey)
154+
.measureEventLoopHealth(100),
155+
),
123156
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
124157
);
125158

@@ -132,14 +165,101 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
132165
// Poll the integrity check because the actor may still be finishing the prior async insert loop.
133166
const integrity = await vi.waitFor(
134167
async () =>
135-
client.dbStressActor
136-
.getOrCreate(actorKey)
137-
.integrityCheck(),
168+
withRuntimeLogTail(getRuntimeOutput, () =>
169+
client.dbStressActor
170+
.getOrCreate(actorKey)
171+
.integrityCheck(),
172+
),
138173
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
139174
);
140175
expect(integrity.toLowerCase()).toBe("ok");
141176
},
142177
STRESS_TEST_TIMEOUT_MS,
143178
);
179+
180+
test(
181+
"repeated autocommit upserts keep sqlite head txid consistent",
182+
async (c) => {
183+
const { client, getRuntimeOutput } = await setupDriverTest(
184+
c,
185+
driverTestConfig,
186+
);
187+
const actor = client.dbStressActor.getOrCreate([
188+
`stress-autocommit-upsert-${crypto.randomUUID()}`,
189+
]);
190+
191+
await actor.reset();
192+
193+
const count = await withRuntimeLogTail(
194+
getRuntimeOutput,
195+
() => actor.upsertMetaRows(240),
196+
);
197+
expect(count).toBe(32);
198+
199+
const integrity = await withRuntimeLogTail(
200+
getRuntimeOutput,
201+
() => actor.integrityCheck(),
202+
);
203+
expect(integrity.toLowerCase()).toBe("ok");
204+
},
205+
STRESS_TEST_TIMEOUT_MS,
206+
);
207+
208+
test(
209+
"kitchen sink sqlite smoke survives write churn and wake",
210+
async (c) => {
211+
const { client, getRuntimeOutput } = await setupDriverTest(
212+
c,
213+
driverTestConfig,
214+
);
215+
const actor = client.dbStressActor.getOrCreate([
216+
`stress-kitchen-sink-${crypto.randomUUID()}`,
217+
]);
218+
219+
await actor.reset();
220+
221+
const first = await withRuntimeLogTail(
222+
getRuntimeOutput,
223+
() => actor.kitchenSinkSmoke(320),
224+
);
225+
expect(first.metaCount).toBeGreaterThanOrEqual(19);
226+
expect(first.dataCount).toBeGreaterThan(0);
227+
expect(first.payloadCount).toBeGreaterThan(0);
228+
expect(first.pageCount).toBeGreaterThan(0);
229+
expect(first.integrity.toLowerCase()).toBe("ok");
230+
231+
const burst = await withRuntimeLogTail(getRuntimeOutput, () =>
232+
Promise.all([
233+
actor.upsertMetaRows(320),
234+
actor.kitchenSinkSmoke(96),
235+
actor.upsertMetaRows(320),
236+
]),
237+
);
238+
expect(burst[0]).toBeGreaterThanOrEqual(32);
239+
expect(burst[1].integrity.toLowerCase()).toBe("ok");
240+
expect(burst[2]).toBeGreaterThanOrEqual(32);
241+
242+
await actor.triggerSleep();
243+
await waitFor(driverTestConfig, 250);
244+
245+
// Poll because the actor can still be in the stopping window after triggerSleep.
246+
const afterWake = await vi.waitFor(
247+
async () =>
248+
await withRuntimeLogTail(
249+
getRuntimeOutput,
250+
() => actor.kitchenSinkSmoke(96),
251+
),
252+
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
253+
);
254+
expect(afterWake.metaCount).toBeGreaterThanOrEqual(
255+
first.metaCount,
256+
);
257+
expect(afterWake.dataCount).toBeGreaterThan(first.dataCount);
258+
expect(afterWake.payloadCount).toBeGreaterThan(0);
259+
expect(afterWake.pageCount).toBeGreaterThan(0);
260+
expect(afterWake.integrity.toLowerCase()).toBe("ok");
261+
},
262+
KITCHEN_SINK_TEST_TIMEOUT_MS,
263+
);
144264
});
145265
}, { encodings: ["bare"] });

0 commit comments

Comments
 (0)