rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-stress.ts
@@ -1,4 +1,4 @@
import { actor } from "rivetkit";

import { db } from "@/common/database/mod";

export const dbStressActor = actor({
@@ -12,6 +12,21 @@
created_at INTEGER NOT NULL
)
`);
await db.execute(`
CREATE TABLE IF NOT EXISTS stress_meta_kv (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at INTEGER NOT NULL
)
`);
await db.execute(`
CREATE TABLE IF NOT EXISTS stress_payloads (
id INTEGER PRIMARY KEY,
tag TEXT NOT NULL,
payload TEXT NOT NULL,
updated_at INTEGER NOT NULL
)
`);
},
}),
actions: {
@@ -29,6 +44,144 @@
return { count };
},

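// Hammers INSERT OR REPLACE over a fixed 32-key space so most iterations
// rewrite existing rows in autocommit mode, then returns the row count.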
upsertMetaRows: async (c, count: number) => {
const normalizedCount = Math.max(0, Math.trunc(count));
for (let i = 0; i < normalizedCount; i++) {
await c.db.execute(
"INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)",
`key-${i % 32}`,
`value-${i}`,
Date.now(),
);
}
const results = await c.db.execute<{ count: number }>(
`SELECT COUNT(*) as count FROM stress_meta_kv`,
);
return results[0].count;
},

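// Mixed per-round workload: interleaved parallel statements on one
// connection, an explicit transaction with a savepoint rollback, a
// deliberately aborted transaction, and periodic deletes; ends with a
// VACUUM and parallel count/pragma/integrity reads.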
kitchenSinkSmoke: async (c, rounds: number) => {
const normalizedRounds = Math.max(1, Math.trunc(rounds));
const startedAt = Date.now();

await c.db.execute(
"INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)",
"started-at",
String(startedAt),
startedAt,
);

for (let round = 0; round < normalizedRounds; round++) {
const now = startedAt + round;

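// Fire writes and reads concurrently to exercise statement
// serialization on the shared connection.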
await Promise.all([
c.db.execute(
"INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)",
`parallel-key-${round % 17}`,
`parallel-value-${round}`,
now,
),
c.db.execute(
"INSERT INTO stress_data (value, created_at) VALUES (?, ?)",
`parallel-data-${round}`,
now,
),
c.db.execute<{ count: number }>(
"SELECT COUNT(*) as count FROM stress_meta_kv",
),
c.db.execute<Record<string, unknown>>("PRAGMA page_count"),
]);

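// Explicit transaction: two inserts, a savepoint patch that is rolled
// back, and a final update, all committed together.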
await c.db.execute("BEGIN");
try {
await c.db.execute(
"INSERT INTO stress_data (value, created_at) VALUES (?, ?)",
`tx-data-${round}`,
now,
);
await c.db.execute(
"INSERT OR REPLACE INTO stress_payloads (id, tag, payload, updated_at) VALUES (?, ?, ?, ?)",
round + 1,
`payload-${round}`,
"x".repeat(1024 + (round % 5) * 2048),
now,
);
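// Patch the payload under a savepoint, then roll the patch back and
// release, so only the pre-patch payload is committed.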
await c.db.execute("SAVEPOINT payload_patch");
await c.db.execute(
"UPDATE stress_payloads SET payload = payload || ?, updated_at = ? WHERE id = ?",
`-patch-${round}`,
now + 1,
round + 1,
);
await c.db.execute("ROLLBACK TO payload_patch");
await c.db.execute("RELEASE payload_patch");
await c.db.execute(
"UPDATE stress_meta_kv SET value = ?, updated_at = ? WHERE key = ?",
`tx-value-${round}`,
now + 2,
`parallel-key-${round % 17}`,
);
await c.db.execute("COMMIT");
} catch (error) {
await c.db.execute("ROLLBACK");
throw error;
}

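// This transaction is aborted on purpose; rollback-data rows must
// never persist.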
await c.db.execute("BEGIN");
try {
await c.db.execute(
"INSERT INTO stress_data (value, created_at) VALUES (?, ?)",
`rollback-data-${round}`,
now,
);
await c.db.execute("ROLLBACK");
} catch (error) {
await c.db.execute("ROLLBACK");
throw error;
}

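// Every third round, trim the oldest parallel-data row to mix
// deletes into the churn.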
if (round % 3 === 0) {
await c.db.execute(
"DELETE FROM stress_data WHERE id IN (SELECT id FROM stress_data WHERE value LIKE 'parallel-data-%' ORDER BY id LIMIT 1)",
);
}
}

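// Final bookkeeping: record the completed round count, delete a
// slice of payload rows, then compact the file with VACUUM.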
await c.db.execute(
"INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)",
"completed-rounds",
String(normalizedRounds),
Date.now(),
);
await c.db.execute("DELETE FROM stress_payloads WHERE id % 11 = 0");
await c.db.execute("VACUUM");

const [metaRows, dataRows, payloadRows, pageRows, integrityRows] =
await Promise.all([
c.db.execute<{ count: number }>(
"SELECT COUNT(*) as count FROM stress_meta_kv",
),
c.db.execute<{ count: number }>(
"SELECT COUNT(*) as count FROM stress_data",
),
c.db.execute<{ count: number }>(
"SELECT COUNT(*) as count FROM stress_payloads",
),
c.db.execute<Record<string, unknown>>("PRAGMA page_count"),
c.db.execute<Record<string, unknown>>(
"PRAGMA integrity_check",
),
]);

return {
metaCount: metaRows[0]?.count ?? 0,
dataCount: dataRows[0]?.count ?? 0,
payloadCount: payloadRows[0]?.count ?? 0,
pageCount: Number(Object.values(pageRows[0] ?? {})[0] ?? 0),
integrity: String(Object.values(integrityRows[0] ?? {})[0] ?? ""),
};
},

getCount: async (c) => {
const results = await c.db.execute<{ count: number }>(
`SELECT COUNT(*) as count FROM stress_data`,
@@ -91,6 +244,8 @@

reset: async (c) => {
await c.db.execute(`DELETE FROM stress_data`);
await c.db.execute(`DELETE FROM stress_meta_kv`);
await c.db.execute(`DELETE FROM stress_payloads`);
},

destroy: (c) => {
@@ -1,9 +1,27 @@
import { describeDriverMatrix } from "./shared-matrix";
import { describe, expect, test, vi } from "vitest";
import { setupDriverTest } from "./shared-utils";
import { setupDriverTest, waitFor } from "./shared-utils";

const STRESS_TEST_TIMEOUT_MS = 60_000;
const KITCHEN_SINK_TEST_TIMEOUT_MS = 120_000;
const ACTOR_READY_TIMEOUT_MS = 15_000;
const RUNTIME_LOG_TAIL_CHARS = 20_000;

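/**
 * Runs `fn` and, on failure, appends the tail of the runtime log to the
 * error message so test failures carry driver-side context.
 */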
async function withRuntimeLogTail<T>(
getRuntimeOutput: () => string,
fn: () => Promise<T>,
): Promise<T> {
try {
return await fn();
} catch (error) {
const runtimeOutput = getRuntimeOutput();
const runtimeTail = runtimeOutput.slice(-RUNTIME_LOG_TAIL_CHARS);
if (error instanceof Error && runtimeTail) {
error.message = `${error.message}\n\nRuntime log tail:\n${runtimeTail}`;
}
throw error;
}
}

/**
* Stress and resilience tests for the SQLite database subsystem.
@@ -64,7 +82,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
test(
"rapid create-insert-destroy cycles handle DB lifecycle correctly",
async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const { client, getRuntimeOutput } = await setupDriverTest(
c,
driverTestConfig,
);

// Perform rapid cycles of create -> insert -> destroy.
// This exercises the close_database path racing with
@@ -78,7 +99,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
// Poll the first insert because the actor can still be starting when the initial DB action is sent.
await vi.waitFor(
async () => {
await getActor().insertBatch(10);
await withRuntimeLogTail(
getRuntimeOutput,
() => getActor().insertBatch(10),
);
},
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
);
@@ -88,9 +112,13 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
// through sleep teardown under the task model.
await vi.waitFor(
async () => {
const count = await client.dbStressActor
.getOrCreate(actorKey)
.getCount();
const count = await withRuntimeLogTail(
getRuntimeOutput,
() =>
client.dbStressActor
.getOrCreate(actorKey)
.getCount(),
);
expect(count).toBeGreaterThanOrEqual(10);
},
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
@@ -106,7 +134,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
test(
"DB operations complete without excessive blocking",
async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const { client, getRuntimeOutput } = await setupDriverTest(
c,
driverTestConfig,
);

const actorKey = [`stress-health-${crypto.randomUUID()}`];

@@ -117,9 +148,11 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
// expected because the action itself runs on that loop.
const health = await vi.waitFor(
async () =>
client.dbStressActor
.getOrCreate(actorKey)
.measureEventLoopHealth(100),
withRuntimeLogTail(getRuntimeOutput, () =>
client.dbStressActor
.getOrCreate(actorKey)
.measureEventLoopHealth(100),
),
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
);

@@ -132,14 +165,101 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => {
// Poll the integrity check because the actor may still be finishing the prior async insert loop.
const integrity = await vi.waitFor(
async () =>
client.dbStressActor
.getOrCreate(actorKey)
.integrityCheck(),
withRuntimeLogTail(getRuntimeOutput, () =>
client.dbStressActor
.getOrCreate(actorKey)
.integrityCheck(),
),
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
);
expect(integrity.toLowerCase()).toBe("ok");
},
STRESS_TEST_TIMEOUT_MS,
);

test(
"repeated autocommit upserts keep sqlite head txid consistent",
async (c) => {
const { client, getRuntimeOutput } = await setupDriverTest(
c,
driverTestConfig,
);
const actor = client.dbStressActor.getOrCreate([
`stress-autocommit-upsert-${crypto.randomUUID()}`,
]);

await actor.reset();

const count = await withRuntimeLogTail(
getRuntimeOutput,
() => actor.upsertMetaRows(240),
);
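// 240 upserts cycle through `key-${i % 32}`, so exactly 32 rows remain.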
expect(count).toBe(32);

const integrity = await withRuntimeLogTail(
getRuntimeOutput,
() => actor.integrityCheck(),
);
expect(integrity.toLowerCase()).toBe("ok");
},
STRESS_TEST_TIMEOUT_MS,
);

test(
"kitchen sink sqlite smoke survives write churn and wake",
async (c) => {
const { client, getRuntimeOutput } = await setupDriverTest(
c,
driverTestConfig,
);
const actor = client.dbStressActor.getOrCreate([
`stress-kitchen-sink-${crypto.randomUUID()}`,
]);

await actor.reset();

const first = await withRuntimeLogTail(
getRuntimeOutput,
() => actor.kitchenSinkSmoke(320),
);
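// 17 parallel keys (round % 17) plus the started-at and
// completed-rounds entries account for at least 19 meta rows.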
expect(first.metaCount).toBeGreaterThanOrEqual(19);
expect(first.dataCount).toBeGreaterThan(0);
expect(first.payloadCount).toBeGreaterThan(0);
expect(first.pageCount).toBeGreaterThan(0);
expect(first.integrity.toLowerCase()).toBe("ok");

const burst = await withRuntimeLogTail(getRuntimeOutput, () =>
Promise.all([
actor.upsertMetaRows(320),
actor.kitchenSinkSmoke(96),
actor.upsertMetaRows(320),
]),
);
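// Each upsert pass touches all 32 keys, and the interleaved smoke run
// must still leave the database consistent.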
expect(burst[0]).toBeGreaterThanOrEqual(32);
expect(burst[1].integrity.toLowerCase()).toBe("ok");
expect(burst[2]).toBeGreaterThanOrEqual(32);

await actor.triggerSleep();
await waitFor(driverTestConfig, 250);

// Poll because the actor can still be in the stopping window after triggerSleep.
const afterWake = await vi.waitFor(
async () =>
await withRuntimeLogTail(
getRuntimeOutput,
() => actor.kitchenSinkSmoke(96),
),
{ timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 },
);
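// Meta rows are upserts, so the count can only hold or grow; data rows
// strictly accumulate across the sleep/wake cycle.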
expect(afterWake.metaCount).toBeGreaterThanOrEqual(
first.metaCount,
);
expect(afterWake.dataCount).toBeGreaterThan(first.dataCount);
expect(afterWake.payloadCount).toBeGreaterThan(0);
expect(afterWake.pageCount).toBeGreaterThan(0);
expect(afterWake.integrity.toLowerCase()).toBe("ok");
},
KITCHEN_SINK_TEST_TIMEOUT_MS,
);
});
}, { encodings: ["bare"] });