diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-stress.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-stress.ts index 78a92b1fce..ebe07efb5f 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-stress.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-stress.ts @@ -12,6 +12,21 @@ export const dbStressActor = actor({ created_at INTEGER NOT NULL ) `); + await db.execute(` + CREATE TABLE IF NOT EXISTS stress_meta_kv ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at INTEGER NOT NULL + ) + `); + await db.execute(` + CREATE TABLE IF NOT EXISTS stress_payloads ( + id INTEGER PRIMARY KEY, + tag TEXT NOT NULL, + payload TEXT NOT NULL, + updated_at INTEGER NOT NULL + ) + `); }, }), actions: { @@ -29,6 +44,144 @@ export const dbStressActor = actor({ return { count }; }, + upsertMetaRows: async (c, count: number) => { + const normalizedCount = Math.max(0, Math.trunc(count)); + for (let i = 0; i < normalizedCount; i++) { + await c.db.execute( + "INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)", + `key-${i % 32}`, + `value-${i}`, + Date.now(), + ); + } + const results = await c.db.execute<{ count: number }>( + `SELECT COUNT(*) as count FROM stress_meta_kv`, + ); + return results[0].count; + }, + + kitchenSinkSmoke: async (c, rounds: number) => { + const normalizedRounds = Math.max(1, Math.trunc(rounds)); + const startedAt = Date.now(); + + await c.db.execute( + "INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)", + "started-at", + String(startedAt), + startedAt, + ); + + for (let round = 0; round < normalizedRounds; round++) { + const now = startedAt + round; + + await Promise.all([ + c.db.execute( + "INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)", + `parallel-key-${round % 17}`, + `parallel-value-${round}`, + now, + ), + c.db.execute( + "INSERT INTO stress_data (value, created_at) VALUES (?, ?)", + `parallel-data-${round}`, + now, + ), + c.db.execute<{ count: number }>( + "SELECT COUNT(*) as count FROM stress_meta_kv", + ), + c.db.execute>("PRAGMA page_count"), + ]); + + await c.db.execute("BEGIN"); + try { + await c.db.execute( + "INSERT INTO stress_data (value, created_at) VALUES (?, ?)", + `tx-data-${round}`, + now, + ); + await c.db.execute( + "INSERT OR REPLACE INTO stress_payloads (id, tag, payload, updated_at) VALUES (?, ?, ?, ?)", + round + 1, + `payload-${round}`, + "x".repeat(1024 + (round % 5) * 2048), + now, + ); + await c.db.execute("SAVEPOINT payload_patch"); + await c.db.execute( + "UPDATE stress_payloads SET payload = payload || ?, updated_at = ? WHERE id = ?", + `-patch-${round}`, + now + 1, + round + 1, + ); + await c.db.execute("ROLLBACK TO payload_patch"); + await c.db.execute("RELEASE payload_patch"); + await c.db.execute( + "UPDATE stress_meta_kv SET value = ?, updated_at = ? WHERE key = ?", + `tx-value-${round}`, + now + 2, + `parallel-key-${round % 17}`, + ); + await c.db.execute("COMMIT"); + } catch (error) { + await c.db.execute("ROLLBACK"); + throw error; + } + + await c.db.execute("BEGIN"); + try { + await c.db.execute( + "INSERT INTO stress_data (value, created_at) VALUES (?, ?)", + `rollback-data-${round}`, + now, + ); + await c.db.execute("ROLLBACK"); + } catch (error) { + await c.db.execute("ROLLBACK"); + throw error; + } + + if (round % 3 === 0) { + await c.db.execute( + "DELETE FROM stress_data WHERE id IN (SELECT id FROM stress_data WHERE value LIKE 'parallel-data-%' ORDER BY id LIMIT 1)", + ); + } + } + + await c.db.execute( + "INSERT OR REPLACE INTO stress_meta_kv (key, value, updated_at) VALUES (?, ?, ?)", + "completed-rounds", + String(normalizedRounds), + Date.now(), + ); + await c.db.execute("DELETE FROM stress_payloads WHERE id % 11 = 0"); + await c.db.execute("VACUUM"); + + const [metaRows, dataRows, payloadRows, pageRows, integrityRows] = + await Promise.all([ + c.db.execute<{ count: number }>( + "SELECT COUNT(*) as count FROM stress_meta_kv", + ), + c.db.execute<{ count: number }>( + "SELECT COUNT(*) as count FROM stress_data", + ), + c.db.execute<{ count: number }>( + "SELECT COUNT(*) as count FROM stress_payloads", + ), + c.db.execute>("PRAGMA page_count"), + c.db.execute>( + "PRAGMA integrity_check", + ), + ]); + + return { + metaCount: metaRows[0]?.count ?? 0, + dataCount: dataRows[0]?.count ?? 0, + payloadCount: payloadRows[0]?.count ?? 0, + pageCount: Number(Object.values(pageRows[0] ?? {})[0] ?? 0), + integrity: String(Object.values(integrityRows[0] ?? {})[0] ?? ""), + }; + }, + getCount: async (c) => { const results = await c.db.execute<{ count: number }>( `SELECT COUNT(*) as count FROM stress_data`, @@ -91,6 +244,8 @@ export const dbStressActor = actor({ reset: async (c) => { await c.db.execute(`DELETE FROM stress_data`); + await c.db.execute(`DELETE FROM stress_meta_kv`); + await c.db.execute(`DELETE FROM stress_payloads`); }, destroy: (c) => { diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-db-stress.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-db-stress.test.ts index 2a69b3ed7d..2dfd579287 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-db-stress.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-db-stress.test.ts @@ -1,9 +1,27 @@ import { describeDriverMatrix } from "./shared-matrix"; import { describe, expect, test, vi } from "vitest"; -import { setupDriverTest } from "./shared-utils"; +import { setupDriverTest, waitFor } from "./shared-utils"; const STRESS_TEST_TIMEOUT_MS = 60_000; +const KITCHEN_SINK_TEST_TIMEOUT_MS = 120_000; const ACTOR_READY_TIMEOUT_MS = 15_000; +const RUNTIME_LOG_TAIL_CHARS = 20_000; + +async function withRuntimeLogTail( + getRuntimeOutput: () => string, + fn: () => Promise, +): Promise { + try { + return await fn(); + } catch (error) { + const runtimeOutput = getRuntimeOutput(); + const runtimeTail = runtimeOutput.slice(-RUNTIME_LOG_TAIL_CHARS); + if (error instanceof Error && runtimeTail) { + error.message = `${error.message}\n\nRuntime log tail:\n${runtimeTail}`; + } + throw error; + } +} /** * Stress and resilience tests for the SQLite database subsystem. @@ -64,7 +82,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => { test( "rapid create-insert-destroy cycles handle DB lifecycle correctly", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); + const { client, getRuntimeOutput } = await setupDriverTest( + c, + driverTestConfig, + ); // Perform rapid cycles of create -> insert -> destroy. // This exercises the close_database path racing with @@ -78,7 +99,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => { // Poll the first insert because the actor can still be starting when the initial DB action is sent. await vi.waitFor( async () => { - await getActor().insertBatch(10); + await withRuntimeLogTail( + getRuntimeOutput, + () => getActor().insertBatch(10), + ); }, { timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 }, ); @@ -88,9 +112,13 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => { // through sleep teardown under the task model. await vi.waitFor( async () => { - const count = await client.dbStressActor - .getOrCreate(actorKey) - .getCount(); + const count = await withRuntimeLogTail( + getRuntimeOutput, + () => + client.dbStressActor + .getOrCreate(actorKey) + .getCount(), + ); expect(count).toBeGreaterThanOrEqual(10); }, { timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 }, @@ -106,7 +134,10 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => { test( "DB operations complete without excessive blocking", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); + const { client, getRuntimeOutput } = await setupDriverTest( + c, + driverTestConfig, + ); const actorKey = [`stress-health-${crypto.randomUUID()}`]; @@ -117,9 +148,11 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => { // expected because the action itself runs on that loop. const health = await vi.waitFor( async () => - client.dbStressActor - .getOrCreate(actorKey) - .measureEventLoopHealth(100), + withRuntimeLogTail(getRuntimeOutput, () => + client.dbStressActor + .getOrCreate(actorKey) + .measureEventLoopHealth(100), + ), { timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 }, ); @@ -132,14 +165,101 @@ describeDriverMatrix("Actor Db Stress", (driverTestConfig) => { // Poll the integrity check because the actor may still be finishing the prior async insert loop. const integrity = await vi.waitFor( async () => - client.dbStressActor - .getOrCreate(actorKey) - .integrityCheck(), + withRuntimeLogTail(getRuntimeOutput, () => + client.dbStressActor + .getOrCreate(actorKey) + .integrityCheck(), + ), { timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 }, ); expect(integrity.toLowerCase()).toBe("ok"); }, STRESS_TEST_TIMEOUT_MS, ); + + test( + "repeated autocommit upserts keep sqlite head txid consistent", + async (c) => { + const { client, getRuntimeOutput } = await setupDriverTest( + c, + driverTestConfig, + ); + const actor = client.dbStressActor.getOrCreate([ + `stress-autocommit-upsert-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + + const count = await withRuntimeLogTail( + getRuntimeOutput, + () => actor.upsertMetaRows(240), + ); + expect(count).toBe(32); + + const integrity = await withRuntimeLogTail( + getRuntimeOutput, + () => actor.integrityCheck(), + ); + expect(integrity.toLowerCase()).toBe("ok"); + }, + STRESS_TEST_TIMEOUT_MS, + ); + + test( + "kitchen sink sqlite smoke survives write churn and wake", + async (c) => { + const { client, getRuntimeOutput } = await setupDriverTest( + c, + driverTestConfig, + ); + const actor = client.dbStressActor.getOrCreate([ + `stress-kitchen-sink-${crypto.randomUUID()}`, + ]); + + await actor.reset(); + + const first = await withRuntimeLogTail( + getRuntimeOutput, + () => actor.kitchenSinkSmoke(320), + ); + expect(first.metaCount).toBeGreaterThanOrEqual(19); + expect(first.dataCount).toBeGreaterThan(0); + expect(first.payloadCount).toBeGreaterThan(0); + expect(first.pageCount).toBeGreaterThan(0); + expect(first.integrity.toLowerCase()).toBe("ok"); + + const burst = await withRuntimeLogTail(getRuntimeOutput, () => + Promise.all([ + actor.upsertMetaRows(320), + actor.kitchenSinkSmoke(96), + actor.upsertMetaRows(320), + ]), + ); + expect(burst[0]).toBeGreaterThanOrEqual(32); + expect(burst[1].integrity.toLowerCase()).toBe("ok"); + expect(burst[2]).toBeGreaterThanOrEqual(32); + + await actor.triggerSleep(); + await waitFor(driverTestConfig, 250); + + // Poll because the actor can still be in the stopping window after triggerSleep. + const afterWake = await vi.waitFor( + async () => + await withRuntimeLogTail( + getRuntimeOutput, + () => actor.kitchenSinkSmoke(96), + ), + { timeout: ACTOR_READY_TIMEOUT_MS, interval: 100 }, + ); + expect(afterWake.metaCount).toBeGreaterThanOrEqual( + first.metaCount, + ); + expect(afterWake.dataCount).toBeGreaterThan(first.dataCount); + expect(afterWake.payloadCount).toBeGreaterThan(0); + expect(afterWake.pageCount).toBeGreaterThan(0); + expect(afterWake.integrity.toLowerCase()).toBe("ok"); + }, + KITCHEN_SINK_TEST_TIMEOUT_MS, + ); }); }, { encodings: ["bare"] });