diff --git a/engine/sdks/rust/envoy-client/src/commands.rs b/engine/sdks/rust/envoy-client/src/commands.rs index 0e98fcd335..a1262af880 100644 --- a/engine/sdks/rust/envoy-client/src/commands.rs +++ b/engine/sdks/rust/envoy-client/src/commands.rs @@ -34,6 +34,7 @@ pub async fn handle_commands(ctx: &mut EnvoyContext, commands: Vec, pub last_command_idx: i64, + pub received_stop: bool, } pub enum ToEnvoyMessage { diff --git a/engine/sdks/rust/envoy-client/src/events.rs b/engine/sdks/rust/envoy-client/src/events.rs index 3408728791..313fec221f 100644 --- a/engine/sdks/rust/envoy-client/src/events.rs +++ b/engine/sdks/rust/envoy-client/src/events.rs @@ -10,6 +10,18 @@ pub async fn handle_send_events(ctx: &mut EnvoyContext, events: Vec checkpoint.index); - - gen_entry.event_history.is_empty() && gen_entry.handle.is_closed() - } else { - false - }; - - // Clean up fully acked stopped actors - if remove { - actor_entry.remove(&checkpoint.generation); - if actor_entry.is_empty() { - ctx.actors.remove(&checkpoint.actor_id); - } - } + let entry = ctx.get_actor_entry_mut(&checkpoint.actor_id, checkpoint.generation); + if let Some(entry) = entry { + entry + .event_history + .retain(|event| event.checkpoint.index > checkpoint.index); } } } +// TODO: If the envoy disconnects, actor stops, then envoy reconnects, we will send the stop event but there +// is no mechanism to remove the actor entry afterwards. We only remove the actor entry if rivet stops the actor. pub async fn resend_unacknowledged_events(ctx: &EnvoyContext) { let mut events: Vec = Vec::new(); diff --git a/rivetkit-typescript/packages/rivetkit-native/wrapper.js b/rivetkit-typescript/packages/rivetkit-native/wrapper.js index a264578561..8a01d4ed97 100644 --- a/rivetkit-typescript/packages/rivetkit-native/wrapper.js +++ b/rivetkit-typescript/packages/rivetkit-native/wrapper.js @@ -8,6 +8,18 @@ const native = require("./index"); +// CloseEvent was added to Node.js in v22. Polyfill for older versions. +if (typeof CloseEvent === "undefined") { + global.CloseEvent = class CloseEvent extends Event { + constructor(type, init = {}) { + super(type); + this.code = init.code ?? 0; + this.reason = init.reason ?? ""; + this.wasClean = init.wasClean ?? false; + } + }; +} + // Re-export protocol for consumers that need protocol types at runtime let _protocol; try { diff --git a/rivetkit-typescript/packages/rivetkit/src/serverless/router.ts b/rivetkit-typescript/packages/rivetkit/src/serverless/router.ts index 3989ef8e32..bb66730b79 100644 --- a/rivetkit-typescript/packages/rivetkit/src/serverless/router.ts +++ b/rivetkit-typescript/packages/rivetkit/src/serverless/router.ts @@ -75,7 +75,7 @@ export function buildServerlessRouter(config: RegistryConfig) { }; const runnerConfig: RegistryConfig = { ...sharedConfig, - token, + token: config.token ?? token, }; const clientConfig: RegistryConfig = { ...sharedConfig, diff --git a/rivetkit-typescript/packages/sqlite-native/Cargo.toml b/rivetkit-typescript/packages/sqlite-native/Cargo.toml index c038bbd962..9e3ba82c38 100644 --- a/rivetkit-typescript/packages/sqlite-native/Cargo.toml +++ b/rivetkit-typescript/packages/sqlite-native/Cargo.toml @@ -14,6 +14,3 @@ tokio = { version = "1", features = ["rt"] } tracing = "0.1" async-trait = "0.1" getrandom = "0.2" - -[profile.release] -lto = true diff --git a/scripts/tests/actor_spam.ts b/scripts/tests/actor_spam.ts deleted file mode 100755 index ec04a19f1e..0000000000 --- a/scripts/tests/actor_spam.ts +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env tsx - -import { - createActor, - destroyActor, - RIVET_ENDPOINT, - RIVET_NAMESPACE, - RIVET_TOKEN, -} from "./utils"; - -const ACTORS = parseInt(process.argv[2]) || 15; - -async function main() { - console.log(`Starting ${ACTORS} actor E2E tests...`); - - const promises = [...Array(ACTORS)].map((_, i) => testActor(i)); - - await Promise.all(promises); - - console.log("E2E test completed"); - - // HACK: This script does not exit by itself for some reason - process.exit(0); -} - -async function testActor(i: number) { - let actorId; - try { - // Create an actor - console.log(`Creating actor ${i}...`); - const actorResponse = await createActor(RIVET_NAMESPACE, "test-runner", false); - console.log("Actor created:", actorResponse.actor); - - actorId = actorResponse.actor.actor_id; - - // Make a request to the actor - console.log(`Making request to actor ${i}...`); - const actorPingResponse = await fetch(`${RIVET_ENDPOINT}/ping`, { - method: "GET", - headers: { - "X-Rivet-Token": RIVET_TOKEN, - "X-Rivet-Target": "actor", - "X-Rivet-Actor": actorResponse.actor.actor_id, - }, - }); - - const pingResult = await actorPingResponse.text(); - - if (!actorPingResponse.ok) { - throw new Error( - `Failed to ping actor ${i}: ${actorPingResponse.status} ${actorPingResponse.statusText}\n${pingResult}`, - ); - } - - console.log(`Actor ${i} ping response:`, pingResult); - - await testWebSocket(actorResponse.actor.actor_id); - } catch (error) { - console.error(`Actor ${i} test failed:`, error); - } finally { - if (actorId) { - console.log(`Destroying actor ${i}...`); - await destroyActor(RIVET_NAMESPACE, actorId); - } - } -} - -function testWebSocket(actorId: string): Promise { - console.log(`Testing WebSocket connection to actor ${actorId}...`); - - return new Promise((resolve, reject) => { - // Parse the RIVET_ENDPOINT to get WebSocket URL - const wsEndpoint = RIVET_ENDPOINT.replace("http://", "ws://").replace( - "https://", - "wss://", - ); - const wsUrl = `${wsEndpoint}/ws`; - - console.log(`Connecting WebSocket to: ${wsUrl}`); - - const protocols = [ - "rivet", - "rivet_target.actor", - `rivet_actor.${actorId}`, - `rivet_token.${RIVET_TOKEN}`, - ]; - const ws = new WebSocket(wsUrl, protocols); - - let pingReceived = false; - let echoReceived = false; - const timeout = setTimeout(() => { - console.log( - "No websocket response received within timeout, but connection was established", - ); - // Connection was established, that's enough for the test - ws.close(); - resolve(); - }, 2000); - - ws.addEventListener("open", () => { - console.log("WebSocket connected"); - - // Test ping-pong - console.log("Sending 'ping' message..."); - ws.send("ping"); - }); - - ws.addEventListener("message", (ev) => { - const message = ev.data.toString(); - console.log(`WebSocket received raw data:`, ev.data); - console.log(`WebSocket received message: "${message}"`); - - if ( - (message === "Echo: ping" || message === "pong") && - !pingReceived - ) { - pingReceived = true; - console.log("Ping test successful!"); - - // Test echo - console.log("Sending 'hello' message..."); - ws.send("hello"); - } else if (message === "Echo: hello" && !echoReceived) { - echoReceived = true; - console.log("Echo test successful!"); - - // All tests passed - clearTimeout(timeout); - ws.close(); - resolve(); - } - }); - - ws.addEventListener("error", (error) => { - clearTimeout(timeout); - reject(new Error(`WebSocket error: ${(error as any)?.message || "Unknown error"}`)); - }); - - ws.addEventListener("close", () => { - clearTimeout(timeout); - if (!pingReceived || !echoReceived) { - reject(new Error("WebSocket closed before completing tests")); - } - }); - }); -} - -main(); diff --git a/scripts/tests/load-test/.gitignore b/scripts/tests/load-test/.gitignore deleted file mode 100644 index 01c98168e1..0000000000 --- a/scripts/tests/load-test/.gitignore +++ /dev/null @@ -1,6 +0,0 @@ -# k6 output files -results.json -*.log - -# k6 cloud results -.k6cloud/ diff --git a/scripts/tests/load-test/QUICKSTART.md b/scripts/tests/load-test/QUICKSTART.md deleted file mode 100644 index 8bbc689d13..0000000000 --- a/scripts/tests/load-test/QUICKSTART.md +++ /dev/null @@ -1,147 +0,0 @@ -# Quick Start Guide - -Get started with Rivet Actor load testing in 5 minutes. - -## Prerequisites Check - -```bash -# 1. Check if k6 is installed -k6 version - -# If not installed: -# macOS: brew install k6 -# Linux: See README.md for installation instructions -# Windows: choco install k6 -``` - -## Step 1: Start Test Runner - -Open a terminal and start the test runner: - -```bash -cd engine/sdks/typescript/test-runner -pnpm install # First time only -pnpm dev -``` - -Keep this terminal open. You should see: -``` -Starting runner -Runner started -``` - -## Step 2: Run Your First Load Test - -Open a new terminal and run: - -```bash -cd scripts/tests -tsx load-test/run.ts --stages "1m:5" -``` - -This will: -- โœ“ Check if k6 is installed -- โœ“ Verify test runner is healthy -- ๐Ÿš€ Start a 1-minute test with 5 virtual users -- ๐Ÿ“Š Show real-time metrics -- โœ… Display summary with pass/fail thresholds - -## Step 3: View Results - -After the test completes, you'll see a summary like: - -``` - โœ“ actor_create_success.........: 98.50% โœ“ 197 โœ— 3 - โœ“ actor_destroy_success........: 99.00% โœ“ 198 โœ— 2 - โœ“ actor_ping_success...........: 98.00% โœ“ 196 โœ— 4 - โœ“ websocket_success............: 95.00% โœ“ 190 โœ— 10 - โœ“ http_req_duration............: avg=234ms min=120ms med=210ms max=890ms p(90)=345ms p(95)=456ms p(99)=678ms -``` - -โœ“ means the test passed all thresholds! - -## Next Steps - -### Run Different Test Scenarios - -```bash -# Quick test (30 seconds) -tsx load-test/run.ts --stages "30s:5" - -# Stress test (ramp up to 50 users) -tsx load-test/run.ts --stages "2m:10,5m:50,2m:0" - -# Save results to file -tsx load-test/run.ts --stages "1m:10" --out json=results.json -``` - -### Use pnpm Scripts - -```bash -cd scripts/tests - -# Quick 1-minute test -pnpm load-test:quick - -# Stress test -pnpm load-test:stress - -# Custom options -pnpm load-test -- --stages "2m:15" --quiet -``` - -### View All Options - -```bash -tsx load-test/run.ts --help -``` - -## Common Issues - -### "k6 is not installed" -Install k6: https://k6.io/docs/get-started/installation/ - -### "Test Runner Not Running" -Make sure the test runner is started: -```bash -cd engine/sdks/typescript/test-runner && pnpm dev -``` - -### High Failure Rates -- Reduce the number of VUs (e.g., `--stages "1m:3"`) -- Check test runner logs for errors -- Verify Rivet engine is running - -## Understanding the Output - -### Key Metrics - -- **actor_create_success**: % of actors successfully created -- **actor_destroy_success**: % of actors successfully destroyed -- **websocket_success**: % of successful WebSocket connections -- **http_req_duration**: HTTP request latency (p95 should be < 5s) -- **actor_create_duration**: Time to create an actor (p95 should be < 3s) - -### Thresholds - -Tests fail if: -- Actor operations < 95% success rate -- WebSocket connections < 90% success rate -- HTTP p95 latency > 5 seconds -- HTTP p99 latency > 10 seconds - -## What Gets Tested - -Each virtual user: -1. Creates a unique actor -2. Pings the actor via HTTP -3. Connects via WebSocket and exchanges messages -4. Puts the actor to sleep -5. Wakes the actor with a ping -6. Destroys the actor - -This tests the complete actor lifecycle under load. - -## More Information - -See [README.md](./README.md) for comprehensive documentation. diff --git a/scripts/tests/load-test/actor-lifecycle.js b/scripts/tests/load-test/actor-lifecycle.js deleted file mode 100644 index b26e81f52a..0000000000 --- a/scripts/tests/load-test/actor-lifecycle.js +++ /dev/null @@ -1,727 +0,0 @@ -import http from 'k6/http'; -import ws from 'k6/ws'; -import { check, sleep } from 'k6'; -import { Counter, Rate, Trend } from 'k6/metrics'; -import encoding from 'k6/encoding'; -import exec from 'k6/execution'; - -// Custom metrics for detailed tracking -const actorCreateSuccessRate = new Rate('actor_create_success'); -const actorDestroySuccessRate = new Rate('actor_destroy_success'); -const actorPingSuccessRate = new Rate('actor_ping_success'); -const actorSleepSuccessRate = new Rate('actor_sleep_success'); -const actorWakeSuccessRate = new Rate('actor_wake_success'); -const websocketSuccessRate = new Rate('websocket_success'); - -const actorCreateDuration = new Trend('actor_create_duration'); -const actorDestroyDuration = new Trend('actor_destroy_duration'); -const websocketMessageDuration = new Trend('websocket_message_duration'); - -const activeActors = new Counter('active_actors_count'); -const chattyRequestsCount = new Counter('chatty_requests_sent'); -const chattyMessagesCount = new Counter('chatty_websocket_messages_sent'); - -// Get environment variables with defaults -const RIVET_ENDPOINT = __ENV.RIVET_ENDPOINT || 'http://localhost:6420'; -const RIVET_TOKEN = __ENV.RIVET_TOKEN || 'dev'; -const RIVET_NAMESPACE = __ENV.RIVET_NAMESPACE || 'default'; -const RUNNER_NAME_SELECTOR = __ENV.RUNNER_NAME_SELECTOR || 'test-runner'; -const VARIATION = __ENV.VARIATION || 'sporadic'; // sporadic, idle, chatty -const VARIATION_DURATION = parseInt(__ENV.VARIATION_DURATION || '120'); // seconds - -// Calculate total stage duration for ramp-down detection -function calculateStageDuration(stagesStr) { - const stages = stagesStr.split(','); - let totalSeconds = 0; - - for (const stage of stages) { - const [duration] = stage.split(':'); - totalSeconds += parseDuration(duration); - } - - return totalSeconds; -} - -// Parse duration string (e.g., "1m", "30s", "1h30m") to seconds -function parseDuration(duration) { - let totalSeconds = 0; - let currentNumber = ''; - - for (let i = 0; i < duration.length; i++) { - const char = duration[i]; - - if (char >= '0' && char <= '9') { - currentNumber += char; - } else if (char === 'h') { - totalSeconds += parseInt(currentNumber || '0') * 60 * 60; - currentNumber = ''; - } else if (char === 'm') { - totalSeconds += parseInt(currentNumber || '0') * 60; - currentNumber = ''; - } else if (char === 's') { - totalSeconds += parseInt(currentNumber || '0'); - currentNumber = ''; - } - } - - // If there's a trailing number without unit, assume seconds - if (currentNumber) { - totalSeconds += parseInt(currentNumber); - } - - return totalSeconds; -} - -// Test configuration via environment variables -export const options = { - scenarios: { - actor_lifecycle: { - executor: __ENV.EXECUTOR || 'ramping-vus', - startVUs: parseInt(__ENV.START_VUS || '0'), - stages: parseStages(__ENV.STAGES || '1m:10,2m:20,1m:0'), - gracefulRampDown: __ENV.GRACEFUL_RAMPDOWN || '30s', - }, - }, - thresholds: { - 'actor_create_success': ['rate>0.95'], - 'actor_destroy_success': ['rate>0.95'], - 'actor_ping_success': ['rate>0.95'], - 'websocket_success': ['rate>0.90'], - }, - noConnectionReuse: false, - userAgent: 'k6-actor-lifecycle-test', -}; - -// Parse stages from string format: "1m:10,2m:20,1m:0" -function parseStages(stagesStr) { - return stagesStr.split(',').map(stage => { - const [duration, target] = stage.split(':'); - return { duration, target: parseInt(target) }; - }); -} - -// Generate a unique actor key for this VU and iteration -function generateActorKey() { - return `load-test-${__VU}-${__ITER}-${Date.now()}`; -} - -// Create an actor -function createActor() { - const key = generateActorKey(); - const startTime = Date.now(); - - const payload = JSON.stringify({ - name: 'load-test-actor', - key, - input: encoding.b64encode('load-test'), - runner_name_selector: RUNNER_NAME_SELECTOR, - crash_policy: 'sleep', - }); - - let response; - try { - response = http.post( - `${RIVET_ENDPOINT}/actors?namespace=${RIVET_NAMESPACE}`, - payload, - { - headers: { - 'Authorization': `Bearer ${RIVET_TOKEN}`, - 'Content-Type': 'application/json', - }, - tags: { name: 'create_actor' }, - } - ); - } catch (error) { - console.error(`[CreateActor] Request failed: ${error}`); - actorCreateSuccessRate.add(false); - return null; - } - - const duration = Date.now() - startTime; - actorCreateDuration.add(duration); - - const success = check(response, { - 'actor created': (r) => r.status === 200, - 'actor has id': (r) => { - try { - const body = JSON.parse(r.body); - return body.actor && body.actor.actor_id; - } catch (parseError) { - console.error(`[CreateActor] Failed to parse response: ${parseError}`); - return false; - } - }, - }); - - actorCreateSuccessRate.add(success); - - if (!success) { - console.error(`[CreateActor] Failed with status ${response.status}: ${response.body}`); - return null; - } - - const body = JSON.parse(response.body); - activeActors.add(1); - - return { - actorId: body.actor.actor_id, - key, - }; -} - -// Ping the actor via HTTP -function pingActor(actorId) { - let response; - try { - response = http.get( - `${RIVET_ENDPOINT}/ping`, - { - headers: { - 'X-Rivet-Token': RIVET_TOKEN, - 'X-Rivet-Target': 'actor', - 'X-Rivet-Actor': actorId, - }, - tags: { name: 'ping_actor' }, - } - ); - } catch (error) { - console.error(`[PingActor ${actorId}] Request failed: ${error}`); - actorPingSuccessRate.add(false); - return false; - } - - const success = check(response, { - 'ping successful': (r) => r.status === 200, - 'ping has response': (r) => r.body && r.body.length > 0, - }); - - actorPingSuccessRate.add(success); - - if (!success) { - console.error(`[PingActor ${actorId}] Failed with status ${response.status}: ${response.body}`); - } - - return success; -} - -// Test WebSocket connection to actor -function testWebSocket(actorId) { - const wsEndpoint = RIVET_ENDPOINT.replace('http://', 'ws://').replace('https://', 'wss://'); - const wsUrl = `${wsEndpoint}/gateway/${actorId}@${RIVET_TOKEN}/ws`; - - let success = false; - let messagesReceived = 0; - let messageSentAt = 0; - - let response; - try { - response = ws.connect(wsUrl, (socket) => { - socket.on('open', () => { - try { - // Send a ping message - messageSentAt = Date.now(); - socket.send('ping'); - } catch (error) { - console.error(`[WebSocket ${actorId}] Failed to send ping: ${error}`); - socket.close(); - } - }); - - socket.on('message', (data) => { - messagesReceived++; - if (messageSentAt > 0) { - const duration = Date.now() - messageSentAt; - websocketMessageDuration.add(duration); - } - - const message = data.toString(); - - if (message === 'Echo: ping' || message === 'pong') { - try { - // Send hello message - messageSentAt = Date.now(); - socket.send('hello'); - } catch (error) { - console.error(`[WebSocket ${actorId}] Failed to send hello: ${error}`); - socket.close(); - } - } else if (message === 'Echo: hello') { - success = true; - socket.close(); - } - }); - - socket.on('error', (e) => { - console.error(`[WebSocket ${actorId}] Socket error: ${e}`); - socket.close(); - }); - - // Set timeout to close connection if messages take too long - socket.setTimeout(() => { - if (messagesReceived === 0) { - console.error(`[WebSocket ${actorId}] Timeout - no messages received`); - } - socket.close(); - }, 5000); - }); - } catch (error) { - console.error(`[WebSocket ${actorId}] Connection failed: ${error}`); - websocketSuccessRate.add(false); - return false; - } - - const connected = check(response, { - 'websocket connected': (r) => r && r.status === 101, - }); - - if (!connected) { - console.error(`[WebSocket ${actorId}] Failed to connect - status: ${response?.status || 'unknown'}`); - } - - websocketSuccessRate.add(success); - return success; -} - -// Keep WebSocket open during idle period without sending messages -function idleWebSocket(actorId, durationSeconds) { - const wsEndpoint = RIVET_ENDPOINT.replace('http://', 'ws://').replace('https://', 'wss://'); - const wsUrl = `${wsEndpoint}/gateway/${actorId}@${RIVET_TOKEN}/ws`; - - let success = false; - let closedEarly = false; - - // Calculate how long until ramp-down starts - const stageDuration = calculateStageDuration(__ENV.STAGES || '1m:10,2m:20,1m:0'); - const elapsed = (Date.now() - exec.scenario.startTime) / 1000; - const timeUntilRampDown = Math.max(0, stageDuration - elapsed); - - // Use the shorter of: requested duration or time until ramp-down - const actualDuration = Math.min(durationSeconds, timeUntilRampDown); - const durationMs = actualDuration * 1000; - - if (actualDuration < durationSeconds) { - console.log(`[IdleWS ${actorId}] Limiting idle to ${Math.floor(actualDuration)}s (ramp-down starts soon)`); - } - - try { - const response = ws.connect(wsUrl, (socket) => { - socket.on('open', () => { - console.log(`[IdleWS ${actorId}] Connection opened, keeping idle for ${Math.floor(actualDuration)}s`); - success = true; - }); - - socket.on('message', (data) => { - // Log any messages received from the server during idle period - console.log(`[IdleWS ${actorId}] Received: ${data.toString()}`); - }); - - socket.on('error', (e) => { - console.error(`[IdleWS ${actorId}] Socket error: ${e}`); - closedEarly = true; - }); - - socket.on('close', () => { - if (closedEarly) { - console.warn(`[IdleWS ${actorId}] Connection closed early`); - } else { - console.log(`[IdleWS ${actorId}] Connection closed after ${Math.floor(actualDuration)}s`); - } - }); - - // Set timeout to close the connection after the actual duration - socket.setTimeout(() => { - console.log(`[IdleWS ${actorId}] Idle period complete, closing connection`); - socket.close(); - }, durationMs); - }); - - websocketSuccessRate.add(success); - } catch (error) { - console.error(`[IdleWS ${actorId}] Connection failed: ${error}`); - websocketSuccessRate.add(false); - } - - return success; -} - -// Sleep the actor -function sleepActor(actorId) { - let response; - try { - response = http.get( - `${RIVET_ENDPOINT}/sleep`, - { - headers: { - 'X-Rivet-Token': RIVET_TOKEN, - 'X-Rivet-Target': 'actor', - 'X-Rivet-Actor': actorId, - }, - tags: { name: 'sleep_actor' }, - } - ); - } catch (error) { - console.error(`[SleepActor ${actorId}] Request failed: ${error}`); - actorSleepSuccessRate.add(false); - return false; - } - - const success = check(response, { - 'sleep successful': (r) => r.status === 200, - }); - - actorSleepSuccessRate.add(success); - - if (!success) { - console.error(`[SleepActor ${actorId}] Failed with status ${response.status}: ${response.body}`); - } - - return success; -} - -// Wake the actor with a ping request -function wakeActor(actorId) { - let response; - try { - response = http.get( - `${RIVET_ENDPOINT}/ping`, - { - headers: { - 'X-Rivet-Token': RIVET_TOKEN, - 'X-Rivet-Target': 'actor', - 'X-Rivet-Actor': actorId, - }, - tags: { name: 'wake_actor' }, - } - ); - } catch (error) { - console.error(`[WakeActor ${actorId}] Request failed: ${error}`); - actorWakeSuccessRate.add(false); - return false; - } - - const success = check(response, { - 'wake successful': (r) => r.status === 200, - }); - - actorWakeSuccessRate.add(success); - - if (!success) { - console.error(`[WakeActor ${actorId}] Failed with status ${response.status}: ${response.body}`); - } - - return success; -} - -// Destroy the actor -function destroyActor(actorId) { - const startTime = Date.now(); - - let response; - try { - response = http.del( - `${RIVET_ENDPOINT}/actors/${actorId}?namespace=${RIVET_NAMESPACE}`, - null, - { - headers: { - 'Authorization': `Bearer ${RIVET_TOKEN}`, - }, - tags: { name: 'destroy_actor' }, - } - ); - } catch (error) { - console.error(`[DestroyActor ${actorId}] Request failed: ${error}`); - actorDestroySuccessRate.add(false); - return false; - } - - const duration = Date.now() - startTime; - actorDestroyDuration.add(duration); - - const success = check(response, { - 'actor destroyed': (r) => r.status === 200, - }); - - actorDestroySuccessRate.add(success); - - if (!success) { - console.error(`[DestroyActor ${actorId}] Failed with status ${response.status}: ${response.body}`); - } - - if (success) { - activeActors.add(-1); - } - - return success; -} - -// Main test function - executed by each VU -export default function () { - if (VARIATION === 'sporadic') { - runSporadicTest(); - } else if (VARIATION === 'idle') { - runIdleTest(); - } else if (VARIATION === 'chatty') { - runChattyTest(); - } else { - console.error(`Unknown variation: ${VARIATION}`); - } -} - -// Sporadic test - create, test, destroy immediately (original behavior) -function runSporadicTest() { - let actorId = null; - - try { - // 1. Create an actor - const actor = createActor(); - if (!actor) { - return; - } - actorId = actor.actorId; - - console.log(`[Sporadic] Created actor ${actor.key} ${actorId}`); - - // Small delay to let actor fully initialize - sleep(0.5); - - // 2. Ping the actor via HTTP - if (!pingActor(actorId)) { - console.warn(`Ping failed for actor ${actorId}`); - } - - // Small delay between operations - sleep(0.2); - - // 3. Test WebSocket connection - if (!testWebSocket(actorId)) { - console.warn(`WebSocket test failed for actor ${actorId}`); - } - - // Small delay between operations - sleep(0.2); - - // 4. Sleep the actor - if (!sleepActor(actorId)) { - console.warn(`Sleep failed for actor ${actorId}`); - } - - // Wait a bit while actor is sleeping - sleep(1); - - // 5. Wake the actor with a ping - if (!wakeActor(actorId)) { - console.warn(`Wake failed for actor ${actorId}`); - } - - // Small delay before destruction - sleep(0.2); - - // 6. Destroy the actor - if (!destroyActor(actorId)) { - console.error(`Failed to destroy actor ${actorId}`); - } - actorId = null; - - } catch (error) { - console.error(`Error in sporadic test: ${error}`); - } finally { - if (actorId) { - destroyActor(actorId); - } - } - - sleep(1); -} - -// Idle test - create, test, keep WebSocket open during sleep, then destroy -function runIdleTest() { - let actorId = null; - - try { - // 1. Create an actor - const actor = createActor(); - if (!actor) { - return; - } - actorId = actor.actorId; - - console.log(`[Idle] Created actor ${actor.key} ${actorId}`); - - // Small delay to let actor fully initialize - sleep(0.5); - - // 2. Basic lifecycle test - initial ping - pingActor(actorId); - sleep(0.2); - - // 3. Sleep the actor - if (!sleepActor(actorId)) { - console.warn(`Sleep failed for actor ${actorId}`); - } - - // Calculate how long until ramp-down starts - const stageDuration = calculateStageDuration(__ENV.STAGES || '1m:10,2m:20,1m:0'); - const elapsed = (Date.now() - exec.scenario.startTime) / 1000; - const timeUntilRampDown = Math.max(0, stageDuration - elapsed); - - // Use the shorter of: requested duration or time until ramp-down - const actualDuration = Math.min(VARIATION_DURATION, timeUntilRampDown); - - if (actualDuration === 0) { - console.log(`[Idle] Test in graceful ramp-down period, skipping idle period for graceful shutdown`); - } else { - if (actualDuration < VARIATION_DURATION) { - console.log(`[Idle] Limiting idle to ${Math.floor(actualDuration)}s (ramp-down starts soon)`); - } else { - console.log(`[Idle] Actor ${actorId} sleeping for ${VARIATION_DURATION}s with WebSocket kept alive`); - } - - // 4. Keep WebSocket open during the actual idle duration - idleWebSocket(actorId, actualDuration); - } - - // 5. Wake and destroy - wakeActor(actorId); - sleep(0.5); - - if (!destroyActor(actorId)) { - console.error(`Failed to destroy idle actor ${actorId}`); - } - actorId = null; - - } catch (error) { - console.error(`Error in idle test: ${error}`); - } finally { - if (actorId) { - destroyActor(actorId); - } - } - - sleep(1); -} - -// Chatty test - create, continuously send requests and websocket messages for duration -function runChattyTest() { - let actorId = null; - - try { - // 1. Create an actor - const actor = createActor(); - if (!actor) { - return; - } - actorId = actor.actorId; - - console.log(`[Chatty] Created actor ${actor.key} ${actorId}, will be chatty for ${VARIATION_DURATION}s`); - - // Small delay to let actor fully initialize - sleep(0.5); - - // Calculate how long until ramp-down starts - const stageDuration = calculateStageDuration(__ENV.STAGES || '1m:10,2m:20,1m:0'); - const elapsed = (Date.now() - exec.scenario.startTime) / 1000; - const timeUntilRampDown = Math.max(0, stageDuration - elapsed); - - // Use the shorter of: requested duration or time until ramp-down - const actualDuration = Math.min(VARIATION_DURATION, timeUntilRampDown); - - if (actualDuration === 0) { - console.log(`[Chatty] Test in graceful ramp-down period, skipping chatty period for graceful shutdown`); - } else { - if (actualDuration < VARIATION_DURATION) { - console.log(`[Chatty] Limiting chatty to ${Math.floor(actualDuration)}s (ramp-down starts soon)`); - } - - // 2. Be chatty for the actual duration - const endTime = Date.now() + (actualDuration * 1000); - let requestCount = 0; - - // Start a WebSocket connection and keep it open, sending messages - // We'll run this in parallel with HTTP requests - const wsEndpoint = RIVET_ENDPOINT.replace('http://', 'ws://').replace('https://', 'wss://'); - const wsUrl = `${wsEndpoint}/gateway/${actorId}@${RIVET_TOKEN}/ws`; - - // For chatty mode, we alternate between HTTP and WebSocket - while (Date.now() < endTime) { - - // Send HTTP ping - try { - const pingResponse = http.get( - `${RIVET_ENDPOINT}/ping`, - { - headers: { - 'X-Rivet-Token': RIVET_TOKEN, - 'X-Rivet-Target': 'actor', - 'X-Rivet-Actor': actorId, - }, - tags: { name: 'chatty_ping' }, - } - ); - - if (pingResponse.status === 200) { - chattyRequestsCount.add(1); - requestCount++; - } else { - console.error(`[Chatty ${actorId}] Ping failed with status ${pingResponse.status}: ${pingResponse.body}`); - } - } catch (error) { - console.error(`[Chatty ${actorId}] Ping request failed: ${error}`); - } - - // Send a few WebSocket messages - try { - ws.connect(wsUrl, (socket) => { - socket.on('open', () => { - try { - for (let i = 0; i < 3; i++) { - socket.send(`chatty-msg-${requestCount}-${i}`); - chattyMessagesCount.add(1); - } - socket.close(); - } catch (error) { - console.error(`[Chatty ${actorId}] Failed to send WS messages: ${error}`); - socket.close(); - } - }); - - socket.on('error', (e) => { - console.error(`[Chatty ${actorId}] WS error: ${e}`); - socket.close(); - }); - - socket.setTimeout(() => { - socket.close(); - }, 1000); - }); - } catch (error) { - console.error(`[Chatty ${actorId}] WS connection failed: ${error}`); - } - - // Small delay between bursts - sleep(0.5); - } - - console.log(`[Chatty] Actor ${actorId} sent ${requestCount} HTTP requests`); - } - - // 3. Destroy the actor - if (!destroyActor(actorId)) { - console.error(`Failed to destroy chatty actor ${actorId}`); - } - actorId = null; - - } catch (error) { - console.error(`Error in chatty test: ${error}`); - } finally { - if (actorId) { - destroyActor(actorId); - } - } - - sleep(1); -} - -// Teardown function - runs once at the end -export function teardown(data) { - console.log('Load test completed'); -} diff --git a/scripts/tests/load-test/run.ts b/scripts/tests/load-test/run.ts deleted file mode 100755 index 387af425c3..0000000000 --- a/scripts/tests/load-test/run.ts +++ /dev/null @@ -1,619 +0,0 @@ -#!/usr/bin/env tsx - -import { spawn } from "node:child_process"; -import { parseArgs } from "node:util"; - -const RIVET_ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://localhost:6420"; -const RIVET_TOKEN = process.env.RIVET_TOKEN ?? "dev"; -const RIVET_NAMESPACE = process.env.RIVET_NAMESPACE ?? "default"; -const RUNNER_NAME_SELECTOR = - process.env.RUNNER_NAME_SELECTOR ?? "test-runner"; - -interface LoadTestConfig { - executor: string; - startVUs: number; - stages: string; - gracefulRampDown: string; - rivetEndpoint: string; - rivetToken: string; - rivetNamespace: string; - runnerNameSelector: string; - variation: string; - variationDuration: number; - out?: string; - summaryTrendStats: string; - quiet: boolean; - skipHealthCheck: boolean; - cleanup: boolean; -} - -// Parse command line arguments -function parseArguments(): LoadTestConfig { - const { values } = parseArgs({ - options: { - help: { type: "boolean", short: "h" }, - executor: { - type: "string", - default: "ramping-vus", - }, - "start-vus": { - type: "string", - default: "0", - }, - stages: { - type: "string", - default: "30s:5,1m:10,30s:0", - }, - "graceful-rampdown": { - type: "string", - default: "30s", - }, - endpoint: { - type: "string", - default: RIVET_ENDPOINT, - }, - token: { - type: "string", - default: RIVET_TOKEN, - }, - namespace: { - type: "string", - default: RIVET_NAMESPACE, - }, - runner: { - type: "string", - default: RUNNER_NAME_SELECTOR, - }, - variation: { - type: "string", - default: "sporadic", - }, - "variation-duration": { - type: "string", - default: "120", - }, - out: { - type: "string", - }, - "summary-trend-stats": { - type: "string", - default: "avg,min,med,max,p(90),p(95),p(99)", - }, - quiet: { - type: "boolean", - short: "q", - default: false, - }, - "skip-health-check": { - type: "boolean", - default: false, - }, - cleanup: { - type: "boolean", - default: false, - }, - }, - strict: true, - allowPositionals: false, - }); - - if (values.help) { - printHelp(); - process.exit(0); - } - - return { - executor: values.executor as string, - startVUs: parseInt(values["start-vus"] as string), - stages: values.stages as string, - gracefulRampDown: values["graceful-rampdown"] as string, - rivetEndpoint: values.endpoint as string, - rivetToken: values.token as string, - rivetNamespace: values.namespace as string, - runnerNameSelector: values.runner as string, - variation: values.variation as string, - variationDuration: parseInt(values["variation-duration"] as string), - out: values.out as string | undefined, - summaryTrendStats: values["summary-trend-stats"] as string, - quiet: values.quiet as boolean, - skipHealthCheck: values["skip-health-check"] as boolean, - cleanup: values.cleanup as boolean, - }; -} - -// Calculate total test duration from stages string -function calculateTestDuration(stagesStr: string): number { - const stages = stagesStr.split(","); - let totalMs = 0; - - for (const stage of stages) { - const [duration] = stage.split(":"); - totalMs += parseDuration(duration); - } - - return totalMs; -} - -// Parse duration string (e.g., "1m", "30s", "1h30m") to milliseconds -function parseDuration(duration: string): number { - let totalMs = 0; - let currentNumber = ""; - - for (let i = 0; i < duration.length; i++) { - const char = duration[i]; - - if (char >= "0" && char <= "9") { - currentNumber += char; - } else if (char === "h") { - totalMs += parseInt(currentNumber || "0") * 60 * 60 * 1000; - currentNumber = ""; - } else if (char === "m") { - totalMs += parseInt(currentNumber || "0") * 60 * 1000; - currentNumber = ""; - } else if (char === "s") { - totalMs += parseInt(currentNumber || "0") * 1000; - currentNumber = ""; - } - } - - // If there's a trailing number without unit, assume seconds - if (currentNumber) { - totalMs += parseInt(currentNumber) * 1000; - } - - return totalMs; -} - -function printHelp() { - console.log(` -Rivet Actor Lifecycle Load Test - -A comprehensive load testing tool for Rivet Actors using k6. Tests the complete -actor lifecycle including creation, HTTP routes, WebSocket connections, sleep/wake -cycles, and destruction. - -USAGE: - tsx run.ts [OPTIONS] - -PREREQUISITES: - 1. Install k6: https://k6.io/docs/get-started/installation/ - 2. Start/configure engine: --endpoint or cd engine/docker/dev && docker-compose up -d - 3. Start test-runner: cd engine/sdks/typescript/test-runner && pnpm start - -OPTIONS: - -h, --help Show this help message - - Load Test Configuration: - --executor k6 executor type (default: ramping-vus) - Options: ramping-vus, constant-vus, shared-iterations - --start-vus Initial number of virtual users (default: 0) - --stages Test stages in format "duration:target,..." - (default: "30s:5,1m:10,30s:0") - Example: "1m:10,2m:20,1m:0" means: - - Ramp up to 10 VUs over 1 minute - - Ramp up to 20 VUs over 2 minutes - - Ramp down to 0 VUs over 1 minute - --graceful-rampdown