diff --git a/CLAUDE.md b/CLAUDE.md index f79f679850..88f9cfe954 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -22,18 +22,6 @@ The `rivet.gg` domain is deprecated and should never be used in this codebase. - Add a new versioned schema instead, then migrate `versioned.rs` and related compatibility code to bridge old versions forward. - When bumping the protocol version, update `PROTOCOL_MK2_VERSION` in `engine/packages/runner-protocol/src/lib.rs` and `PROTOCOL_VERSION` in `rivetkit-typescript/packages/engine-runner/src/mod.ts` together. Both must match the latest schema version. -**Keep the KV API in sync between the runner protocol and the KV channel protocol.** - -- The runner protocol (`engine/sdks/schemas/runner-protocol/`) and KV channel protocol (`engine/sdks/schemas/kv-channel-protocol/`) both expose KV operations. When adding, removing, or changing KV request/response types in one protocol, update the other to match. - -**Keep KV channel protocol versions in sync.** - -- When bumping the KV channel protocol version, update these two locations together: - - `engine/sdks/rust/kv-channel-protocol/src/lib.rs` (`PROTOCOL_VERSION`) - - `engine/sdks/rust/kv-channel-protocol/build.rs` (TypeScript `PROTOCOL_VERSION` in post-processing) -- All consumers (pegboard-kv-channel, sqlite-native, TS manager) get the version from the shared crate. -- The TypeScript SDK at `engine/sdks/typescript/kv-channel-protocol/src/index.ts` is auto-generated from the BARE schema during the Rust build. Do not edit it by hand. - ## Commands ### Build Commands @@ -274,9 +262,6 @@ let error_with_meta = ApiRateLimited { limit: 100, reset_at: 1234567890 }.build( - If you need to add a dependency and can't find it in the Cargo.toml of the workspace, add it to the workspace dependencies in Cargo.toml (`[workspace.dependencies]`) and then add it to the package you need with `{dependency}.workspace = true` **Native SQLite & KV Channel** -- Native SQLite (`rivetkit-typescript/packages/sqlite-native/`) is a napi-rs addon that statically links SQLite and uses a custom VFS backed by KV over a WebSocket KV channel. The WASM implementation (`@rivetkit/sqlite-vfs`) is the fallback. -- The KV channel (`engine/sdks/schemas/kv-channel-protocol/`) is independent of the runner protocol. It authenticates with `admin_token` (engine) or `config.token` (manager), not the runner key. -- The KV channel enforces single-writer locks per actor. Open/close are optimistic (no round-trip wait). - The native VFS uses the same 4 KiB chunk layout and KV key encoding as the WASM VFS. Data is compatible between backends. - **The native Rust VFS and the WASM TypeScript VFS must match 1:1.** This includes: KV key layout and encoding, chunk size, PRAGMA settings, VFS callback-to-KV-operation mapping, delete/truncate strategy (both must use `deleteRange`), and journal mode. When changing any VFS behavior in one implementation, update the other. The relevant files are: - Native: `rivetkit-typescript/packages/sqlite-native/src/vfs.rs`, `kv.rs` diff --git a/Cargo.toml b/Cargo.toml index 616ecde13a..98a13bf0f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,6 @@ members = [ "engine/packages/pegboard-envoy", "engine/packages/pegboard-gateway", "engine/packages/pegboard-gateway2", - "engine/packages/pegboard-kv-channel", "engine/packages/pegboard-outbound", "engine/packages/pegboard-runner", "engine/packages/pools", @@ -53,7 +52,6 @@ members = [ "engine/sdks/rust/data", "engine/sdks/rust/envoy-protocol", "engine/sdks/rust/epoxy-protocol", - "engine/sdks/rust/kv-channel-protocol", "engine/packages/runner-protocol", "engine/sdks/rust/test-envoy", "engine/sdks/typescript/test-envoy-native", @@ -436,9 +434,6 @@ members = [ [workspace.dependencies.pegboard-gateway2] path = "engine/packages/pegboard-gateway2" - [workspace.dependencies.pegboard-kv-channel] - path = "engine/packages/pegboard-kv-channel" - [workspace.dependencies.pegboard-outbound] path = "engine/packages/pegboard-outbound" @@ -506,9 +501,6 @@ members = [ [workspace.dependencies.epoxy-protocol] path = "engine/sdks/rust/epoxy-protocol" - [workspace.dependencies.rivet-kv-channel-protocol] - path = "engine/sdks/rust/kv-channel-protocol" - [workspace.dependencies.rivet-envoy-protocol] path = "engine/sdks/rust/envoy-protocol" diff --git a/engine/packages/guard/Cargo.toml b/engine/packages/guard/Cargo.toml index 61ff4b0190..0b550dcf4f 100644 --- a/engine/packages/guard/Cargo.toml +++ b/engine/packages/guard/Cargo.toml @@ -32,7 +32,6 @@ once_cell.workspace = true pegboard-envoy.workspace = true pegboard-gateway.workspace = true pegboard-gateway2.workspace = true -pegboard-kv-channel.workspace = true pegboard-runner.workspace = true pegboard.workspace = true regex.workspace = true diff --git a/engine/packages/guard/src/routing/kv_channel.rs b/engine/packages/guard/src/routing/kv_channel.rs deleted file mode 100644 index 7f56971b0a..0000000000 --- a/engine/packages/guard/src/routing/kv_channel.rs +++ /dev/null @@ -1,58 +0,0 @@ -use anyhow::*; -use gas::prelude::*; -use rivet_guard_core::{RoutingOutput, request_context::RequestContext}; -use std::sync::Arc; -use subtle::ConstantTimeEq; - -use super::validate_regional_host; - -/// Route requests to the KV channel service using path-based routing. -/// Matches path: /kv/connect -#[tracing::instrument(skip_all)] -pub async fn route_request_path_based( - ctx: &StandaloneCtx, - req_ctx: &RequestContext, - handler: &Arc, -) -> Result> { - let path_without_query = req_ctx.path().split('?').next().unwrap_or(req_ctx.path()); - if path_without_query != "/kv/connect" && path_without_query != "/kv/connect/" { - return Ok(None); - } - - tracing::debug!( - hostname = %req_ctx.hostname(), - path = %req_ctx.path(), - "routing to kv channel via path" - ); - - validate_regional_host(ctx, req_ctx)?; - - // Check auth (if enabled). - if let Some(auth) = &ctx.config().auth { - // Extract token from query params. - let url = url::Url::parse(&format!("ws://placeholder{}", req_ctx.path())) - .context("failed to parse URL for auth")?; - let token = url - .query_pairs() - .find(|(k, _)| k == "token") - .map(|(_, v)| v.to_string()) - .ok_or_else(|| { - crate::errors::MissingQueryParameter { - parameter: "token".to_string(), - } - .build() - })?; - - if token - .as_bytes() - .ct_ne(auth.admin_token.read().as_bytes()) - .into() - { - return Err(rivet_api_builder::ApiForbidden.build()); - } - - tracing::debug!("authenticated kv channel connection"); - } - - Ok(Some(RoutingOutput::CustomServe(handler.clone()))) -} diff --git a/engine/packages/guard/src/routing/mod.rs b/engine/packages/guard/src/routing/mod.rs index f8a26a7aca..5eede5eaa7 100644 --- a/engine/packages/guard/src/routing/mod.rs +++ b/engine/packages/guard/src/routing/mod.rs @@ -10,7 +10,6 @@ use crate::{errors, metrics, shared_state::SharedState}; pub mod actor_path; mod api_public; mod envoy; -mod kv_channel; pub mod pegboard_gateway; mod runner; mod ws_health; @@ -27,13 +26,9 @@ pub(crate) const WS_PROTOCOL_TOKEN: &str = "rivet_token."; #[tracing::instrument(skip_all)] pub fn create_routing_function(ctx: &StandaloneCtx, shared_state: SharedState) -> RoutingFn { let ctx = ctx.clone(); - let kv_channel_handler = Arc::new(pegboard_kv_channel::PegboardKvChannelCustomServe::new( - ctx.clone(), - )); Arc::new(move |req_ctx| { let ctx = ctx.with_ray(req_ctx.ray_id(), req_ctx.req_id()).unwrap(); let shared_state = shared_state.clone(); - let kv_channel_handler = kv_channel_handler.clone(); let hostname = req_ctx.hostname().to_string(); let path = req_ctx.path().to_string(); @@ -83,17 +78,6 @@ pub fn create_routing_function(ctx: &StandaloneCtx, shared_state: SharedState) - return Ok(routing_output); } - // Route KV channel - if let Some(routing_output) = - kv_channel::route_request_path_based(&ctx, req_ctx, &kv_channel_handler).await? - { - metrics::ROUTE_TOTAL - .with_label_values(&["kv_channel"]) - .inc(); - - return Ok(routing_output); - } - // MARK: Header- & protocol-based routing (X-Rivet-Target) // Determine target let target = if req_ctx.is_websocket() { diff --git a/engine/packages/pegboard-kv-channel/Cargo.toml b/engine/packages/pegboard-kv-channel/Cargo.toml deleted file mode 100644 index 5c0ef864e3..0000000000 --- a/engine/packages/pegboard-kv-channel/Cargo.toml +++ /dev/null @@ -1,35 +0,0 @@ -[package] -name = "pegboard-kv-channel" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -anyhow.workspace = true -async-trait.workspace = true -lazy_static.workspace = true -bytes.workspace = true -futures-util.workspace = true -gas.workspace = true -http-body.workspace = true -http-body-util.workspace = true -# TODO: Make this use workspace version -hyper = "1.6" -hyper-tungstenite.workspace = true -rivet-config.workspace = true -rivet-error.workspace = true -rivet-guard-core.workspace = true -rivet-metrics.workspace = true -rivet-runtime.workspace = true -rivet-kv-channel-protocol.workspace = true -tokio.workspace = true -tokio-tungstenite.workspace = true -tracing.workspace = true -universaldb.workspace = true -url.workspace = true -uuid.workspace = true - -pegboard.workspace = true -namespace.workspace = true -util.workspace = true diff --git a/engine/packages/pegboard-kv-channel/src/lib.rs b/engine/packages/pegboard-kv-channel/src/lib.rs deleted file mode 100644 index 3a13b319c2..0000000000 --- a/engine/packages/pegboard-kv-channel/src/lib.rs +++ /dev/null @@ -1,804 +0,0 @@ -//! KV channel WebSocket handler for the engine. -//! -//! Serves the KV channel protocol at /kv/connect for native SQLite to route -//! page-level KV operations over WebSocket. See -//! docs-internal/engine/NATIVE_SQLITE_DATA_CHANNEL.md for the full spec. - -mod metrics; - -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::time::{Duration, Instant}; - -use anyhow::{Context, Result}; -use async_trait::async_trait; -use bytes::Bytes; -use futures_util::TryStreamExt; -use gas::prelude::*; -use http_body_util::Full; -use hyper::{Response, StatusCode}; -use hyper_tungstenite::tungstenite::Message; -use pegboard::actor_kv; -use rivet_guard_core::{ - ResponseBody, WebSocketHandle, custom_serve::CustomServeTrait, request_context::RequestContext, -}; -use tokio::sync::{Mutex, mpsc, watch}; -use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; - -pub use rivet_kv_channel_protocol as protocol; - -use actor_kv::{MAX_KEY_SIZE, MAX_KEYS, MAX_PUT_PAYLOAD_SIZE, MAX_VALUE_SIZE}; - -/// Overhead added by KeyWrapper tuple packing (NESTED prefix byte + NIL suffix -/// byte). Must match `KeyWrapper::tuple_len` in -/// `engine/packages/pegboard/src/keys/actor_kv.rs`. -const KEY_WRAPPER_OVERHEAD: usize = 2; - -/// Maximum number of actors a single connection can have open simultaneously. -/// Prevents a malicious client from exhausting memory via unbounded actor_channels. -const MAX_ACTORS_PER_CONNECTION: usize = 1000; - -pub struct PegboardKvChannelCustomServe { - ctx: StandaloneCtx, -} - -impl PegboardKvChannelCustomServe { - pub fn new(ctx: StandaloneCtx) -> Self { - Self { ctx } - } -} - -#[async_trait] -impl CustomServeTrait for PegboardKvChannelCustomServe { - #[tracing::instrument(skip_all)] - async fn handle_request( - &self, - _req: hyper::Request>, - _req_ctx: &mut RequestContext, - ) -> Result> { - let response = Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "text/plain") - .body(ResponseBody::Full(Full::new(Bytes::from( - "kv-channel WebSocket endpoint", - ))))?; - Ok(response) - } - - #[tracing::instrument(skip_all)] - async fn handle_websocket( - &self, - req_ctx: &mut RequestContext, - ws_handle: WebSocketHandle, - _after_hibernation: bool, - ) -> Result> { - let ctx = self.ctx.with_ray(req_ctx.ray_id(), req_ctx.req_id())?; - - // Parse URL params. - let url = url::Url::parse(&format!("ws://placeholder{}", req_ctx.path())) - .context("failed to parse WebSocket URL")?; - - // Validate protocol version. - let protocol_version: u32 = url - .query_pairs() - .find_map(|(n, v)| (n == "protocol_version").then_some(v)) - .context("missing protocol_version query param")? - .parse() - .context("invalid protocol_version")?; - anyhow::ensure!( - protocol_version == protocol::PROTOCOL_VERSION, - "unsupported protocol version: {protocol_version}, expected {}", - protocol::PROTOCOL_VERSION - ); - - // Resolve namespace. - let namespace_name = url - .query_pairs() - .find_map(|(n, v)| (n == "namespace").then_some(v)) - .context("missing namespace query param")? - .to_string(); - let namespace = ctx - .op(namespace::ops::resolve_for_name_global::Input { - name: namespace_name.clone(), - }) - .await - .with_context(|| format!("failed to resolve namespace: {namespace_name}"))? - .ok_or_else(|| namespace::errors::Namespace::NotFound.build()) - .with_context(|| format!("namespace not found: {namespace_name}"))?; - - let namespace_id = namespace.namespace_id; - - tracing::info!(%namespace_id, "kv channel connection established"); - - // Track actors opened by this connection. - let open_actors: Arc>> = Arc::new(Mutex::new(HashSet::new())); - let last_pong_ts = Arc::new(AtomicI64::new(util::timestamp::now())); - - let result = run_connection( - ctx.clone(), - ws_handle, - namespace_id, - open_actors, - last_pong_ts, - ) - .await; - - tracing::info!("kv channel connection closed"); - - result.map(|_| None) - } -} - -// MARK: Connection lifecycle - -async fn run_connection( - ctx: StandaloneCtx, - ws_handle: WebSocketHandle, - namespace_id: Id, - open_actors: Arc>>, - last_pong_ts: Arc, -) -> Result<()> { - let ping_interval = - Duration::from_millis(ctx.config().pegboard().runner_update_ping_interval_ms()); - let ping_timeout_ms = ctx.config().pegboard().runner_ping_timeout_ms(); - - let (ping_abort_tx, ping_abort_rx) = watch::channel(()); - - // Spawn ping task. - let ping_ws = ws_handle.clone(); - let ping_last_pong = last_pong_ts.clone(); - let ping = tokio::spawn(async move { - ping_task( - ping_ws, - ping_last_pong, - ping_abort_rx, - ping_interval, - ping_timeout_ms, - ) - .await - }); - - // Run message loop. - let msg_result = message_loop( - &ctx, - &ws_handle, - namespace_id, - &open_actors, - &last_pong_ts, - ) - .await; - - // Signal ping task to stop and wait for it. - let _ = ping_abort_tx.send(()); - let _ = ping.await; - - msg_result -} - -// MARK: Ping task - -async fn ping_task( - ws_handle: WebSocketHandle, - last_pong_ts: Arc, - mut abort_rx: watch::Receiver<()>, - interval: Duration, - timeout_ms: i64, -) -> Result<()> { - loop { - tokio::select! { - _ = tokio::time::sleep(interval) => {} - _ = abort_rx.changed() => return Ok(()), - } - - // Check pong timeout. - let last = last_pong_ts.load(Ordering::Relaxed); - let now = util::timestamp::now(); - if now - last > timeout_ms { - tracing::warn!("kv channel ping timed out, closing connection"); - return Err(anyhow::anyhow!("ping timed out")); - } - - // Send ping. - let ping = protocol::ToClient::ToClientPing(protocol::ToClientPing { ts: now }); - let data = protocol::encode_to_client(&ping)?; - ws_handle.send(Message::Binary(data.into())).await?; - } -} - -// MARK: Message loop - -async fn message_loop( - ctx: &StandaloneCtx, - ws_handle: &WebSocketHandle, - namespace_id: Id, - open_actors: &Arc>>, - last_pong_ts: &AtomicI64, -) -> Result<()> { - let ws_rx = ws_handle.recv(); - let mut ws_rx = ws_rx.lock().await; - let mut term_signal = rivet_runtime::TermSignal::get(); - - // Per-actor channel routing for concurrent cross-actor request processing. - // Each actor gets its own mpsc channel and a spawned task that drains it - // sequentially, preserving intra-actor ordering while allowing inter-actor - // parallelism. Do not use tokio::spawn per request as that would break - // optimistic pipelining and journal write ordering. - // See docs-internal/engine/NATIVE_SQLITE_REVIEW_FINDINGS.md Finding 2. - let mut actor_channels: HashMap> = - HashMap::new(); - let mut actor_tasks = tokio::task::JoinSet::new(); - - // Use an async block so that early returns (via ?) still run cleanup below. - let result = async { - loop { - let msg = tokio::select! { - res = ws_rx.try_next() => { - match res? { - Some(msg) => msg, - None => { - tracing::debug!("websocket closed"); - return Ok(()); - } - } - } - _ = term_signal.recv() => { - // Send ToClientClose before shutting down. - let close_msg = protocol::ToClient::ToClientClose; - let data = protocol::encode_to_client(&close_msg)?; - let _ = ws_handle.send(Message::Binary(data.into())).await; - return Ok(()); - } - }; - - match msg { - Message::Binary(data) => { - handle_binary_message( - ctx, - ws_handle, - namespace_id, - open_actors, - last_pong_ts, - &data, - &mut actor_channels, - &mut actor_tasks, - ) - .await?; - } - Message::Close(_) => { - tracing::debug!("websocket close frame received"); - return Ok(()); - } - _ => {} - } - } - } - .await; - - // Drop all senders to signal per-actor tasks to stop, then wait for them - // to finish draining any in-flight requests. - actor_channels.clear(); - while actor_tasks.join_next().await.is_some() {} - - result -} - -async fn handle_binary_message( - ctx: &StandaloneCtx, - ws_handle: &WebSocketHandle, - namespace_id: Id, - open_actors: &Arc>>, - last_pong_ts: &AtomicI64, - data: &[u8], - actor_channels: &mut HashMap>, - actor_tasks: &mut tokio::task::JoinSet<()>, -) -> Result<()> { - let msg = match protocol::decode_to_server(data) { - Ok(msg) => msg, - Err(err) => { - tracing::warn!( - ?err, - data_len = data.len(), - "failed to deserialize kv channel message" - ); - return Ok(()); - } - }; - - match msg { - protocol::ToRivet::ToRivetPong(pong) => { - last_pong_ts.store(util::timestamp::now(), Ordering::Relaxed); - tracing::trace!(ts = pong.ts, "received pong"); - } - protocol::ToRivet::ToRivetRequest(req) => { - let is_close = matches!(req.data, protocol::RequestData::ActorCloseRequest); - let actor_id = req.actor_id.clone(); - let request_id = req.request_id; - - // Create a per-actor channel and task on first request for this actor. - if !actor_channels.contains_key(&actor_id) { - let (tx, rx) = mpsc::channel(64); - actor_tasks.spawn(actor_request_task( - Clone::clone(ctx), - Clone::clone(ws_handle), - namespace_id, - Clone::clone(open_actors), - rx, - )); - actor_channels.insert(actor_id.clone(), tx); - } - - // Route request to the actor's channel for sequential processing. - if let Some(tx) = actor_channels.get(&actor_id) { - match tx.try_send(req) { - Ok(()) => {} - Err(mpsc::error::TrySendError::Full(_)) => { - tracing::warn!(%actor_id, "per-actor channel full, applying backpressure"); - send_response( - ws_handle, - request_id, - error_response( - "backpressure", - "too many in-flight requests for this actor", - ), - ) - .await; - } - Err(mpsc::error::TrySendError::Closed(_)) => { - tracing::warn!(%actor_id, "per-actor task channel closed, removing dead entry"); - actor_channels.remove(&actor_id); - send_response( - ws_handle, - request_id, - error_response("internal_error", "internal error"), - ) - .await; - } - } - } - - // Remove the channel entry on close so the task exits after draining - // remaining requests and resources are freed. - if is_close { - actor_channels.remove(&actor_id); - } - } - } - - Ok(()) -} - -/// Processes requests for a single actor sequentially, preserving intra-actor -/// ordering. Spawned once per actor per connection. Exits when the sender is -/// dropped (connection end) or after processing an ActorCloseRequest. -async fn actor_request_task( - ctx: StandaloneCtx, - ws_handle: WebSocketHandle, - namespace_id: Id, - open_actors: Arc>>, - mut rx: mpsc::Receiver, -) { - // Cache keyed by actor id since a single connection multiplexes many actors. - let mut cached_actors: HashMap = HashMap::new(); - - while let Some(req) = rx.recv().await { - let is_close = matches!(req.data, protocol::RequestData::ActorCloseRequest); - - let response_data = match &req.data { - // Open/close are lifecycle ops that don't need a resolved actor. - protocol::RequestData::ActorOpenRequest - | protocol::RequestData::ActorCloseRequest => { - handle_request(&open_actors, &req).await - } - // KV ops: resolve once, cache, reuse. - _ => { - let is_open = open_actors.lock().await.contains(&req.actor_id); - if !is_open { - error_response( - "actor_not_open", - "actor is not opened on this connection", - ) - } else { - if !cached_actors.contains_key(&req.actor_id) { - match resolve_actor(&ctx, &req.actor_id, namespace_id).await { - Ok(v) => { - cached_actors.insert(req.actor_id.clone(), v); - } - Err(resp) => { - // Don't cache failures. Next request will retry. - send_response(&ws_handle, req.request_id, resp).await; - if is_close { - break; - } - continue; - } - } - } - let (parsed_id, actor_name) = - cached_actors.get(&req.actor_id).unwrap(); - - let recipient = actor_kv::Recipient { - actor_id: *parsed_id, - namespace_id, - name: actor_name.clone(), - }; - - match &req.data { - protocol::RequestData::KvGetRequest(body) => { - handle_kv_get(&ctx, &recipient, body).await - } - protocol::RequestData::KvPutRequest(body) => { - handle_kv_put(&ctx, &recipient, body).await - } - protocol::RequestData::KvDeleteRequest(body) => { - handle_kv_delete(&ctx, &recipient, body).await - } - protocol::RequestData::KvDeleteRangeRequest(body) => { - handle_kv_delete_range(&ctx, &recipient, body).await - } - _ => unreachable!(), - } - } - } - }; - - send_response(&ws_handle, req.request_id, response_data).await; - - // Stop processing after a close request. The sender is also removed - // from actor_channels by the message loop so no new requests arrive. - if is_close { - break; - } - } -} - -/// Encode and send a response to the client. Logs warnings on failure. -async fn send_response(ws_handle: &WebSocketHandle, request_id: u32, data: protocol::ResponseData) { - let response = - protocol::ToClient::ToClientResponse(protocol::ToClientResponse { request_id, data }); - - match protocol::encode_to_client(&response) { - Ok(encoded) => { - if let Err(err) = ws_handle.send(Message::Binary(encoded.into())).await { - tracing::warn!(?err, "failed to send kv channel response from actor task"); - } - } - Err(err) => { - tracing::warn!(?err, "failed to encode kv channel response"); - } - } -} - -// MARK: Request handling - -/// Handles actor lifecycle requests (open/close). KV operations are handled -/// directly in `actor_request_task` with cached actor resolution. -async fn handle_request( - open_actors: &Arc>>, - req: &protocol::ToRivetRequest, -) -> protocol::ResponseData { - match &req.data { - protocol::RequestData::ActorOpenRequest => { - handle_actor_open(open_actors, &req.actor_id).await - } - protocol::RequestData::ActorCloseRequest => { - handle_actor_close(open_actors, &req.actor_id).await - } - _ => unreachable!("KV operations are handled in actor_request_task"), - } -} - -// MARK: Actor open/close - -async fn handle_actor_open( - open_actors: &Arc>>, - actor_id: &str, -) -> protocol::ResponseData { - // Reject if this connection already has too many actors open. - { - let current_count = open_actors.lock().await.len(); - if current_count >= MAX_ACTORS_PER_CONNECTION { - return error_response( - "too_many_actors", - &format!("connection has too many open actors (max {MAX_ACTORS_PER_CONNECTION})"), - ); - } - } - - open_actors.lock().await.insert(actor_id.to_string()); - tracing::debug!(%actor_id, "actor opened"); - protocol::ResponseData::ActorOpenResponse -} - -async fn handle_actor_close( - open_actors: &Arc>>, - actor_id: &str, -) -> protocol::ResponseData { - open_actors.lock().await.remove(actor_id); - tracing::debug!(%actor_id, "actor closed"); - protocol::ResponseData::ActorCloseResponse -} - -// MARK: KV operations - -async fn handle_kv_get( - ctx: &StandaloneCtx, - recipient: &actor_kv::Recipient, - body: &protocol::KvGetRequest, -) -> protocol::ResponseData { - let start = Instant::now(); - metrics::KV_CHANNEL_REQUESTS_TOTAL - .with_label_values(&["get"]) - .inc(); - metrics::KV_CHANNEL_REQUEST_KEYS - .with_label_values(&["get"]) - .observe(body.keys.len() as f64); - - if let Err(resp) = validate_keys(&body.keys) { - return resp; - } - - let udb = match ctx.udb() { - Ok(udb) => udb, - Err(err) => return internal_error(&err), - }; - - let result = match actor_kv::get(&*udb, recipient, body.keys.clone()).await { - Ok((keys, values, _metadata)) => { - protocol::ResponseData::KvGetResponse(protocol::KvGetResponse { keys, values }) - } - Err(err) => internal_error(&err), - }; - metrics::KV_CHANNEL_REQUEST_DURATION - .with_label_values(&["get"]) - .observe(start.elapsed().as_secs_f64()); - result -} - -async fn handle_kv_put( - ctx: &StandaloneCtx, - recipient: &actor_kv::Recipient, - body: &protocol::KvPutRequest, -) -> protocol::ResponseData { - let start = Instant::now(); - metrics::KV_CHANNEL_REQUESTS_TOTAL - .with_label_values(&["put"]) - .inc(); - metrics::KV_CHANNEL_REQUEST_KEYS - .with_label_values(&["put"]) - .observe(body.keys.len() as f64); - - // Validate keys/values length match. - if body.keys.len() != body.values.len() { - return error_response( - "keys_values_length_mismatch", - "keys and values must have the same length", - ); - } - - // Validate batch size. - if body.keys.len() > MAX_KEYS { - return error_response( - "batch_too_large", - &format!("a maximum of {MAX_KEYS} entries is allowed"), - ); - } - - for key in &body.keys { - if key.len() + KEY_WRAPPER_OVERHEAD > MAX_KEY_SIZE { - return error_response( - "key_too_large", - &format!( - "key is too long (max {} bytes)", - MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD - ), - ); - } - } - for value in &body.values { - if value.len() > MAX_VALUE_SIZE { - return error_response( - "value_too_large", - &format!("value is too large (max {} KiB)", MAX_VALUE_SIZE / 1024), - ); - } - } - - let payload_size: usize = body - .keys - .iter() - .map(|k| k.len() + KEY_WRAPPER_OVERHEAD) - .sum::() - + body.values.iter().map(|v| v.len()).sum::(); - if payload_size > MAX_PUT_PAYLOAD_SIZE { - return error_response( - "payload_too_large", - &format!( - "total payload is too large (max {} KiB)", - MAX_PUT_PAYLOAD_SIZE / 1024 - ), - ); - } - - let udb = match ctx.udb() { - Ok(udb) => udb, - Err(err) => return internal_error(&err), - }; - - let result = match actor_kv::put(&*udb, recipient, body.keys.clone(), body.values.clone()).await - { - Ok(()) => protocol::ResponseData::KvPutResponse, - Err(err) => { - let rivet_err = rivet_error::RivetError::extract(&err); - if rivet_err.code() == "kv_storage_quota_exceeded" { - error_response("storage_quota_exceeded", rivet_err.message()) - } else { - internal_error(&err) - } - } - }; - metrics::KV_CHANNEL_REQUEST_DURATION - .with_label_values(&["put"]) - .observe(start.elapsed().as_secs_f64()); - result -} - -async fn handle_kv_delete( - ctx: &StandaloneCtx, - recipient: &actor_kv::Recipient, - body: &protocol::KvDeleteRequest, -) -> protocol::ResponseData { - let start = Instant::now(); - metrics::KV_CHANNEL_REQUESTS_TOTAL - .with_label_values(&["delete"]) - .inc(); - metrics::KV_CHANNEL_REQUEST_KEYS - .with_label_values(&["delete"]) - .observe(body.keys.len() as f64); - - if let Err(resp) = validate_keys(&body.keys) { - return resp; - } - - let udb = match ctx.udb() { - Ok(udb) => udb, - Err(err) => return internal_error(&err), - }; - - let result = match actor_kv::delete(&*udb, recipient, body.keys.clone()).await { - Ok(()) => protocol::ResponseData::KvDeleteResponse, - Err(err) => internal_error(&err), - }; - metrics::KV_CHANNEL_REQUEST_DURATION - .with_label_values(&["delete"]) - .observe(start.elapsed().as_secs_f64()); - result -} - -async fn handle_kv_delete_range( - ctx: &StandaloneCtx, - recipient: &actor_kv::Recipient, - body: &protocol::KvDeleteRangeRequest, -) -> protocol::ResponseData { - let start = Instant::now(); - metrics::KV_CHANNEL_REQUESTS_TOTAL - .with_label_values(&["delete_range"]) - .inc(); - if body.start.len() + KEY_WRAPPER_OVERHEAD > MAX_KEY_SIZE { - return error_response( - "key_too_large", - &format!( - "start key is too long (max {} bytes)", - MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD - ), - ); - } - if body.end.len() + KEY_WRAPPER_OVERHEAD > MAX_KEY_SIZE { - return error_response( - "key_too_large", - &format!( - "end key is too long (max {} bytes)", - MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD - ), - ); - } - - let udb = match ctx.udb() { - Ok(udb) => udb, - Err(err) => return internal_error(&err), - }; - - let result = match actor_kv::delete_range( - &*udb, - recipient, - body.start.clone(), - body.end.clone(), - ) - .await - { - Ok(()) => protocol::ResponseData::KvDeleteResponse, - Err(err) => internal_error(&err), - }; - metrics::KV_CHANNEL_REQUEST_DURATION - .with_label_values(&["delete_range"]) - .observe(start.elapsed().as_secs_f64()); - result -} - -// MARK: Helpers - -/// Look up an actor by ID and return the parsed ID and actor name. -/// -/// Verifies the actor belongs to the authenticated namespace. -async fn resolve_actor( - ctx: &StandaloneCtx, - actor_id: &str, - expected_namespace_id: Id, -) -> std::result::Result<(Id, String), protocol::ResponseData> { - let parsed_id = Id::parse(actor_id) - .map_err(|err| error_response("actor_not_found", &format!("invalid actor id: {err}")))?; - - let actor = ctx - .op(pegboard::ops::actor::get::Input { - actor_ids: vec![parsed_id], - fetch_error: false, - }) - .await - .map_err(|err| internal_error(&err))? - .actors - .into_iter() - .next(); - - match actor { - Some(actor) => { - if actor.namespace_id != expected_namespace_id { - return Err(error_response( - "actor_not_found", - "actor does not exist or is not running", - )); - } - Ok((parsed_id, actor.name)) - } - None => Err(error_response( - "actor_not_found", - "actor does not exist or is not running", - )), - } -} - -/// Validate a list of KV keys against size and count limits. -fn validate_keys(keys: &[protocol::KvKey]) -> std::result::Result<(), protocol::ResponseData> { - if keys.len() > MAX_KEYS { - return Err(error_response( - "batch_too_large", - &format!("a maximum of {MAX_KEYS} keys is allowed"), - )); - } - for key in keys { - if key.len() + KEY_WRAPPER_OVERHEAD > MAX_KEY_SIZE { - return Err(error_response( - "key_too_large", - &format!( - "key is too long (max {} bytes)", - MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD - ), - )); - } - } - Ok(()) -} - -fn error_response(code: &str, message: &str) -> protocol::ResponseData { - protocol::ResponseData::ErrorResponse(protocol::ErrorResponse { - code: code.to_string(), - message: message.to_string(), - }) -} - -/// Log an internal error with full details server-side and return a generic -/// error message to the client. Prevents leaking stack traces, database errors, -/// or other internal state over the wire. -fn internal_error(err: &anyhow::Error) -> protocol::ResponseData { - tracing::error!(?err, "kv channel internal error"); - error_response("internal_error", "internal error") -} diff --git a/engine/packages/pegboard-kv-channel/src/metrics.rs b/engine/packages/pegboard-kv-channel/src/metrics.rs deleted file mode 100644 index 6c71756979..0000000000 --- a/engine/packages/pegboard-kv-channel/src/metrics.rs +++ /dev/null @@ -1,26 +0,0 @@ -use rivet_metrics::{BUCKETS, REGISTRY, prometheus::*}; - -lazy_static::lazy_static! { - pub static ref KV_CHANNEL_REQUEST_DURATION: HistogramVec = register_histogram_vec_with_registry!( - "pegboard_kv_channel_request_duration_seconds", - "Duration of KV channel handler requests.", - &["op"], - BUCKETS.to_vec(), - *REGISTRY - ).unwrap(); - - pub static ref KV_CHANNEL_REQUEST_KEYS: HistogramVec = register_histogram_vec_with_registry!( - "pegboard_kv_channel_request_keys", - "Number of keys per KV channel request.", - &["op"], - vec![1.0, 2.0, 5.0, 10.0, 25.0, 50.0, 100.0, 128.0], - *REGISTRY - ).unwrap(); - - pub static ref KV_CHANNEL_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( - "pegboard_kv_channel_requests_total", - "Total KV channel requests handled.", - &["op"], - *REGISTRY - ).unwrap(); -} diff --git a/engine/sdks/rust/kv-channel-protocol/Cargo.toml b/engine/sdks/rust/kv-channel-protocol/Cargo.toml deleted file mode 100644 index 7991d5772b..0000000000 --- a/engine/sdks/rust/kv-channel-protocol/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "rivet-kv-channel-protocol" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -serde_bare.workspace = true -serde.workspace = true -vbare.workspace = true - -[build-dependencies] -vbare-compiler.workspace = true diff --git a/engine/sdks/rust/kv-channel-protocol/build.rs b/engine/sdks/rust/kv-channel-protocol/build.rs deleted file mode 100644 index 7867d7183a..0000000000 --- a/engine/sdks/rust/kv-channel-protocol/build.rs +++ /dev/null @@ -1,157 +0,0 @@ -use std::path::Path; - -fn main() -> Result<(), Box> { - let manifest_dir = std::env::var("CARGO_MANIFEST_DIR")?; - let workspace_root = Path::new(&manifest_dir) - .parent() - .and_then(|p| p.parent()) - .and_then(|p| p.parent()) - .ok_or("Failed to find workspace root")?; - - let schema_dir = workspace_root - .join("sdks") - .join("schemas") - .join("kv-channel-protocol"); - - // Rust type generation from BARE schemas. - let cfg = vbare_compiler::Config::with_hashable_map(); - vbare_compiler::process_schemas_with_config(&schema_dir, &cfg)?; - - // TypeScript SDK generation. - let cli_js_path = workspace_root - .parent() - .unwrap() - .join("node_modules/@bare-ts/tools/dist/bin/cli.js"); - if cli_js_path.exists() { - typescript::generate_sdk(&schema_dir, workspace_root); - } else { - println!( - "cargo:warning=TypeScript SDK generation skipped: cli.js not found at {}. Run `pnpm install` to install.", - cli_js_path.display() - ); - } - - Ok(()) -} - -mod typescript { - use super::*; - use std::{fs, path::PathBuf, process::Command}; - - pub fn generate_sdk(schema_dir: &Path, workspace_root: &Path) { - let sdk_dir = workspace_root - .join("sdks") - .join("typescript") - .join("kv-channel-protocol"); - let src_dir = sdk_dir.join("src"); - - let highest_version_path = find_highest_version(schema_dir); - - let _ = fs::remove_dir_all(&src_dir); - if let Err(e) = fs::create_dir_all(&src_dir) { - panic!("Failed to create SDK directory: {}", e); - } - - let output_path = src_dir.join("index.ts"); - - let output = Command::new( - workspace_root - .parent() - .unwrap() - .join("node_modules/@bare-ts/tools/dist/bin/cli.js"), - ) - .arg("compile") - .arg("--generator") - .arg("ts") - .arg(&highest_version_path) - .arg("-o") - .arg(&output_path) - .output() - .expect("Failed to execute bare compiler for TypeScript"); - - if !output.status.success() { - panic!( - "BARE TypeScript generation failed: {}", - String::from_utf8_lossy(&output.stderr), - ); - } - - // Post-process the generated TypeScript file. - // IMPORTANT: Keep this in sync with rivetkit-typescript/packages/rivetkit/scripts/compile-bare.ts - post_process_generated_ts(&output_path); - } - - const POST_PROCESS_MARKER: &str = "// @generated - post-processed by build.rs\n"; - - /// Post-process the generated TypeScript file to: - /// 1. Replace @bare-ts/lib import with @rivetkit/bare-ts - /// 2. Replace Node.js assert import with a custom assert function - /// - /// IMPORTANT: Keep this in sync with rivetkit-typescript/packages/rivetkit/scripts/compile-bare.ts - fn post_process_generated_ts(path: &Path) { - let content = fs::read_to_string(path).expect("Failed to read generated TypeScript file"); - - // Skip if already post-processed. - if content.starts_with(POST_PROCESS_MARKER) { - return; - } - - // Add PROTOCOL_VERSION constant at the top (not in the BARE schema). - let content = format!("export const PROTOCOL_VERSION = 1;\n\n{content}"); - - // Replace @bare-ts/lib with @rivetkit/bare-ts. - let content = content.replace("@bare-ts/lib", "@rivetkit/bare-ts"); - - // Replace Node.js assert import with custom assert function. - let content = content.replace("import assert from \"assert\"", ""); - let content = content.replace("import assert from \"node:assert\"", ""); - - // Append custom assert function. - let assert_function = r#" -function assert(condition: boolean, message?: string): asserts condition { - if (!condition) throw new Error(message ?? "Assertion failed") -} -"#; - let content = format!("{}{}\n{}", POST_PROCESS_MARKER, content, assert_function); - - // Validate post-processing succeeded. - assert!( - !content.contains("@bare-ts/lib"), - "Failed to replace @bare-ts/lib import" - ); - assert!( - !content.contains("import assert from"), - "Failed to remove Node.js assert import" - ); - - fs::write(path, content).expect("Failed to write post-processed TypeScript file"); - } - - fn find_highest_version(schema_dir: &Path) -> PathBuf { - let mut highest_version = 0; - let mut highest_version_path = PathBuf::new(); - - for entry in fs::read_dir(schema_dir).unwrap().flatten() { - if !entry.path().is_dir() { - let path = entry.path(); - let bare_name = path - .file_name() - .unwrap() - .to_str() - .unwrap() - .split_once('.') - .unwrap() - .0; - - if let Ok(version) = bare_name[1..].parse::() { - if version > highest_version { - highest_version = version; - highest_version_path = path; - } - } - } - } - - highest_version_path - } -} diff --git a/engine/sdks/rust/kv-channel-protocol/src/generated.rs b/engine/sdks/rust/kv-channel-protocol/src/generated.rs deleted file mode 100644 index 84801af8dc..0000000000 --- a/engine/sdks/rust/kv-channel-protocol/src/generated.rs +++ /dev/null @@ -1 +0,0 @@ -include!(concat!(env!("OUT_DIR"), "/combined_imports.rs")); diff --git a/engine/sdks/rust/kv-channel-protocol/src/lib.rs b/engine/sdks/rust/kv-channel-protocol/src/lib.rs deleted file mode 100644 index 8851f713aa..0000000000 --- a/engine/sdks/rust/kv-channel-protocol/src/lib.rs +++ /dev/null @@ -1,141 +0,0 @@ -pub mod generated; - -// Re-export latest version. -pub use generated::v1::*; - -pub const PROTOCOL_VERSION: u32 = 1; - -/// Serialize a ToRivet message to BARE bytes. -pub fn encode_to_server(msg: &ToRivet) -> Result, serde_bare::error::Error> { - serde_bare::to_vec(msg) -} - -/// Deserialize a ToRivet message from BARE bytes. -pub fn decode_to_server(bytes: &[u8]) -> Result { - serde_bare::from_slice(bytes) -} - -/// Serialize a ToClient message to BARE bytes. -pub fn encode_to_client(msg: &ToClient) -> Result, serde_bare::error::Error> { - serde_bare::to_vec(msg) -} - -/// Deserialize a ToClient message from BARE bytes. -pub fn decode_to_client(bytes: &[u8]) -> Result { - serde_bare::from_slice(bytes) -} - -#[cfg(test)] -mod tests { - use super::*; - - // MARK: Round-trip tests - - #[test] - fn round_trip_to_server_request_actor_open() { - let msg = ToRivet::ToRivetRequest(ToRivetRequest { - request_id: 1, - actor_id: "abc".into(), - data: RequestData::ActorOpenRequest, - }); - let bytes = encode_to_server(&msg).unwrap(); - let decoded = decode_to_server(&bytes).unwrap(); - assert_eq!(msg, decoded); - } - - #[test] - fn round_trip_to_server_request_kv_get() { - let msg = ToRivet::ToRivetRequest(ToRivetRequest { - request_id: 3, - actor_id: "actor1".into(), - data: RequestData::KvGetRequest(KvGetRequest { - keys: vec![vec![1, 2, 3], vec![4, 5]], - }), - }); - let bytes = encode_to_server(&msg).unwrap(); - let decoded = decode_to_server(&bytes).unwrap(); - assert_eq!(msg, decoded); - } - - #[test] - fn round_trip_to_client_response_error() { - let msg = ToClient::ToClientResponse(ToClientResponse { - request_id: 10, - data: ResponseData::ErrorResponse(ErrorResponse { - code: "actor_locked".into(), - message: "actor is locked by another connection".into(), - }), - }); - let bytes = encode_to_client(&msg).unwrap(); - let decoded = decode_to_client(&bytes).unwrap(); - assert_eq!(msg, decoded); - } - - #[test] - fn round_trip_to_client_ping() { - let msg = ToClient::ToClientPing(ToClientPing { ts: 9876543210 }); - let bytes = encode_to_client(&msg).unwrap(); - let decoded = decode_to_client(&bytes).unwrap(); - assert_eq!(msg, decoded); - } - - #[test] - fn round_trip_to_client_close() { - let msg = ToClient::ToClientClose; - let bytes = encode_to_client(&msg).unwrap(); - let decoded = decode_to_client(&bytes).unwrap(); - assert_eq!(msg, decoded); - } - - // MARK: Cross-language byte compatibility tests - - #[test] - fn bytes_to_server_request_actor_open() { - let msg = ToRivet::ToRivetRequest(ToRivetRequest { - request_id: 1, - actor_id: "abc".into(), - data: RequestData::ActorOpenRequest, - }); - let bytes = encode_to_server(&msg).unwrap(); - assert_eq!( - bytes, - [0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x61, 0x62, 0x63, 0x00] - ); - } - - #[test] - fn bytes_to_server_pong() { - let msg = ToRivet::ToRivetPong(ToRivetPong { ts: 1234567890 }); - let bytes = encode_to_server(&msg).unwrap(); - assert_eq!( - bytes, - [0x01, 0xD2, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00] - ); - } - - #[test] - fn bytes_to_client_close() { - let msg = ToClient::ToClientClose; - let bytes = encode_to_client(&msg).unwrap(); - assert_eq!(bytes, [0x02]); - } - - #[test] - fn bytes_to_client_response_kv_get() { - let msg = ToClient::ToClientResponse(ToClientResponse { - request_id: 42, - data: ResponseData::KvGetResponse(KvGetResponse { - keys: vec![vec![1, 2]], - values: vec![vec![3, 4, 5]], - }), - }); - let bytes = encode_to_client(&msg).unwrap(); - assert_eq!( - bytes, - [ - 0x00, 0x2A, 0x00, 0x00, 0x00, 0x03, 0x01, 0x02, 0x01, 0x02, 0x01, 0x03, 0x03, 0x04, - 0x05 - ] - ); - } -} diff --git a/engine/sdks/schemas/kv-channel-protocol/v1.bare b/engine/sdks/schemas/kv-channel-protocol/v1.bare deleted file mode 100644 index 1130a406da..0000000000 --- a/engine/sdks/schemas/kv-channel-protocol/v1.bare +++ /dev/null @@ -1,146 +0,0 @@ -# KV Channel Protocol v1 - -# MARK: Core - -# Id is a 30-character base36 string encoding the V1 format from -# engine/packages/util-id/. Use the util-id library for parsing -# and validation. Do not hand-roll Id parsing. -type Id str - -# MARK: Actor Session -# -# ActorOpen acquires a single-writer lock on an actor's KV data. -# ActorClose releases the lock. These are optimistic: the client -# does not wait for a response before sending KV requests. The -# server processes messages in WebSocket order, so the open is -# always processed before any KV requests that follow it. -# -# If the lock cannot be acquired (another connection holds it), -# the server sends an error response for the open and rejects -# subsequent KV requests for that actor with "actor_locked". - -# actorId is on ToRivetRequest, not on open/close. The outer -# actorId is the single source of truth for routing. -type ActorOpenRequest void - -type ActorCloseRequest void - -type ActorOpenResponse void - -type ActorCloseResponse void - -# MARK: KV -# -# These types mirror the runner protocol KV types -# (engine/sdks/schemas/runner-protocol/). Changes to KV types in -# either protocol must be mirrored in the other. -# -# Omitted from the runner protocol (not needed by the VFS): -# - KvListRequest/KvListResponse (prefix scan) -# - KvDropRequest/KvDropResponse (drop all KV data) -# - KvMetadata on responses (update timestamps) -# -# The same engine KV limits apply to both protocols. See the -# "KV Limits" section below. - -type KvKey data -type KvValue data - -type KvGetRequest struct { - keys: list -} - -type KvPutRequest struct { - # keys and values are parallel lists. keys.len() must equal values.len(). - keys: list - values: list -} - -type KvDeleteRequest struct { - keys: list -} - -type KvDeleteRangeRequest struct { - start: KvKey - end: KvKey -} - -# MARK: Request/Response - -type RequestData union { - ActorOpenRequest | - ActorCloseRequest | - KvGetRequest | - KvPutRequest | - KvDeleteRequest | - KvDeleteRangeRequest -} - -type ErrorResponse struct { - code: str - message: str -} - -type KvGetResponse struct { - # Only keys that exist are returned. Missing keys are omitted. - # The client infers missing keys by comparing request keys to - # response keys. This matches the runner protocol behavior - # (engine/packages/pegboard/src/actor_kv/mod.rs). - keys: list - values: list -} - -type KvPutResponse void - -# KvDeleteResponse is used for both KvDeleteRequest and -# KvDeleteRangeRequest, same as the runner protocol. -type KvDeleteResponse void - -type ResponseData union { - ErrorResponse | - ActorOpenResponse | - ActorCloseResponse | - KvGetResponse | - KvPutResponse | - KvDeleteResponse -} - -# MARK: To Rivet - -type ToRivetRequest struct { - requestId: u32 - actorId: Id - data: RequestData -} - -type ToRivetPong struct { - ts: i64 -} - -type ToRivet union { - ToRivetRequest | - ToRivetPong -} - -# MARK: To Client - -type ToClientResponse struct { - requestId: u32 - data: ResponseData -} - -type ToClientPing struct { - ts: i64 -} - -# Server-initiated close. Sent when the server is shutting down -# or draining connections. The client should close all actors -# and reconnect with backoff. Same pattern as the runner -# protocol's ToRunnerClose. -type ToClientClose void - -type ToClient union { - ToClientResponse | - ToClientPing | - ToClientClose -} diff --git a/engine/sdks/typescript/kv-channel-protocol/package.json b/engine/sdks/typescript/kv-channel-protocol/package.json deleted file mode 100644 index 79cb0822b9..0000000000 --- a/engine/sdks/typescript/kv-channel-protocol/package.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "@rivetkit/engine-kv-channel-protocol", - "version": "2.1.6", - "license": "Apache-2.0", - "type": "module", - "exports": { - ".": { - "import": { - "types": "./dist/index.d.ts", - "default": "./dist/index.js" - }, - "require": { - "types": "./dist/index.d.cts", - "default": "./dist/index.cjs" - } - } - }, - "files": [ - "dist/**/*.js", - "dist/**/*.d.ts" - ], - "scripts": { - "build": "tsup src/index.ts", - "clean": "rm -rf dist", - "check-types": "tsc --noEmit" - }, - "types": "dist/index.d.ts", - "dependencies": { - "@rivetkit/bare-ts": "^0.6.2" - }, - "devDependencies": { - "@types/node": "^20.19.13", - "tsup": "^8.5.0", - "typescript": "^5.9.2" - } -} diff --git a/engine/sdks/typescript/kv-channel-protocol/src/index.ts b/engine/sdks/typescript/kv-channel-protocol/src/index.ts deleted file mode 100644 index fad75ec166..0000000000 --- a/engine/sdks/typescript/kv-channel-protocol/src/index.ts +++ /dev/null @@ -1,524 +0,0 @@ -// @generated - post-processed by build.rs -export const PROTOCOL_VERSION = 1; - -import * as bare from "@rivetkit/bare-ts" - -const DEFAULT_CONFIG = /* @__PURE__ */ bare.Config({}) - -export type i64 = bigint -export type u32 = number - -/** - * Id is a 30-character base36 string encoding the V1 format from - * engine/packages/util-id/. Use the util-id library for parsing - * and validation. Do not hand-roll Id parsing. - */ -export type Id = string - -export function readId(bc: bare.ByteCursor): Id { - return bare.readString(bc) -} - -export function writeId(bc: bare.ByteCursor, x: Id): void { - bare.writeString(bc, x) -} - -/** - * actorId is on ToRivetRequest, not on open/close. The outer - * actorId is the single source of truth for routing. - */ -export type ActorOpenRequest = null - -export type ActorCloseRequest = null - -export type ActorOpenResponse = null - -export type ActorCloseResponse = null - -export type KvKey = ArrayBuffer - -export function readKvKey(bc: bare.ByteCursor): KvKey { - return bare.readData(bc) -} - -export function writeKvKey(bc: bare.ByteCursor, x: KvKey): void { - bare.writeData(bc, x) -} - -export type KvValue = ArrayBuffer - -export function readKvValue(bc: bare.ByteCursor): KvValue { - return bare.readData(bc) -} - -export function writeKvValue(bc: bare.ByteCursor, x: KvValue): void { - bare.writeData(bc, x) -} - -function read0(bc: bare.ByteCursor): readonly KvKey[] { - const len = bare.readUintSafe(bc) - if (len === 0) { - return [] - } - const result = [readKvKey(bc)] - for (let i = 1; i < len; i++) { - result[i] = readKvKey(bc) - } - return result -} - -function write0(bc: bare.ByteCursor, x: readonly KvKey[]): void { - bare.writeUintSafe(bc, x.length) - for (let i = 0; i < x.length; i++) { - writeKvKey(bc, x[i]) - } -} - -export type KvGetRequest = { - readonly keys: readonly KvKey[] -} - -export function readKvGetRequest(bc: bare.ByteCursor): KvGetRequest { - return { - keys: read0(bc), - } -} - -export function writeKvGetRequest(bc: bare.ByteCursor, x: KvGetRequest): void { - write0(bc, x.keys) -} - -function read1(bc: bare.ByteCursor): readonly KvValue[] { - const len = bare.readUintSafe(bc) - if (len === 0) { - return [] - } - const result = [readKvValue(bc)] - for (let i = 1; i < len; i++) { - result[i] = readKvValue(bc) - } - return result -} - -function write1(bc: bare.ByteCursor, x: readonly KvValue[]): void { - bare.writeUintSafe(bc, x.length) - for (let i = 0; i < x.length; i++) { - writeKvValue(bc, x[i]) - } -} - -export type KvPutRequest = { - /** - * keys and values are parallel lists. keys.len() must equal values.len(). - */ - readonly keys: readonly KvKey[] - readonly values: readonly KvValue[] -} - -export function readKvPutRequest(bc: bare.ByteCursor): KvPutRequest { - return { - keys: read0(bc), - values: read1(bc), - } -} - -export function writeKvPutRequest(bc: bare.ByteCursor, x: KvPutRequest): void { - write0(bc, x.keys) - write1(bc, x.values) -} - -export type KvDeleteRequest = { - readonly keys: readonly KvKey[] -} - -export function readKvDeleteRequest(bc: bare.ByteCursor): KvDeleteRequest { - return { - keys: read0(bc), - } -} - -export function writeKvDeleteRequest(bc: bare.ByteCursor, x: KvDeleteRequest): void { - write0(bc, x.keys) -} - -export type KvDeleteRangeRequest = { - readonly start: KvKey - readonly end: KvKey -} - -export function readKvDeleteRangeRequest(bc: bare.ByteCursor): KvDeleteRangeRequest { - return { - start: readKvKey(bc), - end: readKvKey(bc), - } -} - -export function writeKvDeleteRangeRequest(bc: bare.ByteCursor, x: KvDeleteRangeRequest): void { - writeKvKey(bc, x.start) - writeKvKey(bc, x.end) -} - -export type RequestData = - | { readonly tag: "ActorOpenRequest"; readonly val: ActorOpenRequest } - | { readonly tag: "ActorCloseRequest"; readonly val: ActorCloseRequest } - | { readonly tag: "KvGetRequest"; readonly val: KvGetRequest } - | { readonly tag: "KvPutRequest"; readonly val: KvPutRequest } - | { readonly tag: "KvDeleteRequest"; readonly val: KvDeleteRequest } - | { readonly tag: "KvDeleteRangeRequest"; readonly val: KvDeleteRangeRequest } - -export function readRequestData(bc: bare.ByteCursor): RequestData { - const offset = bc.offset - const tag = bare.readU8(bc) - switch (tag) { - case 0: - return { tag: "ActorOpenRequest", val: null } - case 1: - return { tag: "ActorCloseRequest", val: null } - case 2: - return { tag: "KvGetRequest", val: readKvGetRequest(bc) } - case 3: - return { tag: "KvPutRequest", val: readKvPutRequest(bc) } - case 4: - return { tag: "KvDeleteRequest", val: readKvDeleteRequest(bc) } - case 5: - return { tag: "KvDeleteRangeRequest", val: readKvDeleteRangeRequest(bc) } - default: { - bc.offset = offset - throw new bare.BareError(offset, "invalid tag") - } - } -} - -export function writeRequestData(bc: bare.ByteCursor, x: RequestData): void { - switch (x.tag) { - case "ActorOpenRequest": { - bare.writeU8(bc, 0) - break - } - case "ActorCloseRequest": { - bare.writeU8(bc, 1) - break - } - case "KvGetRequest": { - bare.writeU8(bc, 2) - writeKvGetRequest(bc, x.val) - break - } - case "KvPutRequest": { - bare.writeU8(bc, 3) - writeKvPutRequest(bc, x.val) - break - } - case "KvDeleteRequest": { - bare.writeU8(bc, 4) - writeKvDeleteRequest(bc, x.val) - break - } - case "KvDeleteRangeRequest": { - bare.writeU8(bc, 5) - writeKvDeleteRangeRequest(bc, x.val) - break - } - } -} - -export type ErrorResponse = { - readonly code: string - readonly message: string -} - -export function readErrorResponse(bc: bare.ByteCursor): ErrorResponse { - return { - code: bare.readString(bc), - message: bare.readString(bc), - } -} - -export function writeErrorResponse(bc: bare.ByteCursor, x: ErrorResponse): void { - bare.writeString(bc, x.code) - bare.writeString(bc, x.message) -} - -export type KvGetResponse = { - /** - * Only keys that exist are returned. Missing keys are omitted. - * The client infers missing keys by comparing request keys to - * response keys. This matches the runner protocol behavior - * (engine/packages/pegboard/src/actor_kv/mod.rs). - */ - readonly keys: readonly KvKey[] - readonly values: readonly KvValue[] -} - -export function readKvGetResponse(bc: bare.ByteCursor): KvGetResponse { - return { - keys: read0(bc), - values: read1(bc), - } -} - -export function writeKvGetResponse(bc: bare.ByteCursor, x: KvGetResponse): void { - write0(bc, x.keys) - write1(bc, x.values) -} - -export type KvPutResponse = null - -/** - * KvDeleteResponse is used for both KvDeleteRequest and - * KvDeleteRangeRequest, same as the runner protocol. - */ -export type KvDeleteResponse = null - -export type ResponseData = - | { readonly tag: "ErrorResponse"; readonly val: ErrorResponse } - | { readonly tag: "ActorOpenResponse"; readonly val: ActorOpenResponse } - | { readonly tag: "ActorCloseResponse"; readonly val: ActorCloseResponse } - | { readonly tag: "KvGetResponse"; readonly val: KvGetResponse } - | { readonly tag: "KvPutResponse"; readonly val: KvPutResponse } - | { readonly tag: "KvDeleteResponse"; readonly val: KvDeleteResponse } - -export function readResponseData(bc: bare.ByteCursor): ResponseData { - const offset = bc.offset - const tag = bare.readU8(bc) - switch (tag) { - case 0: - return { tag: "ErrorResponse", val: readErrorResponse(bc) } - case 1: - return { tag: "ActorOpenResponse", val: null } - case 2: - return { tag: "ActorCloseResponse", val: null } - case 3: - return { tag: "KvGetResponse", val: readKvGetResponse(bc) } - case 4: - return { tag: "KvPutResponse", val: null } - case 5: - return { tag: "KvDeleteResponse", val: null } - default: { - bc.offset = offset - throw new bare.BareError(offset, "invalid tag") - } - } -} - -export function writeResponseData(bc: bare.ByteCursor, x: ResponseData): void { - switch (x.tag) { - case "ErrorResponse": { - bare.writeU8(bc, 0) - writeErrorResponse(bc, x.val) - break - } - case "ActorOpenResponse": { - bare.writeU8(bc, 1) - break - } - case "ActorCloseResponse": { - bare.writeU8(bc, 2) - break - } - case "KvGetResponse": { - bare.writeU8(bc, 3) - writeKvGetResponse(bc, x.val) - break - } - case "KvPutResponse": { - bare.writeU8(bc, 4) - break - } - case "KvDeleteResponse": { - bare.writeU8(bc, 5) - break - } - } -} - -export type ToRivetRequest = { - readonly requestId: u32 - readonly actorId: Id - readonly data: RequestData -} - -export function readToRivetRequest(bc: bare.ByteCursor): ToRivetRequest { - return { - requestId: bare.readU32(bc), - actorId: readId(bc), - data: readRequestData(bc), - } -} - -export function writeToRivetRequest(bc: bare.ByteCursor, x: ToRivetRequest): void { - bare.writeU32(bc, x.requestId) - writeId(bc, x.actorId) - writeRequestData(bc, x.data) -} - -export type ToRivetPong = { - readonly ts: i64 -} - -export function readToRivetPong(bc: bare.ByteCursor): ToRivetPong { - return { - ts: bare.readI64(bc), - } -} - -export function writeToRivetPong(bc: bare.ByteCursor, x: ToRivetPong): void { - bare.writeI64(bc, x.ts) -} - -export type ToRivet = - | { readonly tag: "ToRivetRequest"; readonly val: ToRivetRequest } - | { readonly tag: "ToRivetPong"; readonly val: ToRivetPong } - -export function readToRivet(bc: bare.ByteCursor): ToRivet { - const offset = bc.offset - const tag = bare.readU8(bc) - switch (tag) { - case 0: - return { tag: "ToRivetRequest", val: readToRivetRequest(bc) } - case 1: - return { tag: "ToRivetPong", val: readToRivetPong(bc) } - default: { - bc.offset = offset - throw new bare.BareError(offset, "invalid tag") - } - } -} - -export function writeToRivet(bc: bare.ByteCursor, x: ToRivet): void { - switch (x.tag) { - case "ToRivetRequest": { - bare.writeU8(bc, 0) - writeToRivetRequest(bc, x.val) - break - } - case "ToRivetPong": { - bare.writeU8(bc, 1) - writeToRivetPong(bc, x.val) - break - } - } -} - -export function encodeToRivet(x: ToRivet, config?: Partial): Uint8Array { - const fullConfig = config != null ? bare.Config(config) : DEFAULT_CONFIG - const bc = new bare.ByteCursor( - new Uint8Array(fullConfig.initialBufferLength), - fullConfig, - ) - writeToRivet(bc, x) - return new Uint8Array(bc.view.buffer, bc.view.byteOffset, bc.offset) -} - -export function decodeToRivet(bytes: Uint8Array): ToRivet { - const bc = new bare.ByteCursor(bytes, DEFAULT_CONFIG) - const result = readToRivet(bc) - if (bc.offset < bc.view.byteLength) { - throw new bare.BareError(bc.offset, "remaining bytes") - } - return result -} - -export type ToClientResponse = { - readonly requestId: u32 - readonly data: ResponseData -} - -export function readToClientResponse(bc: bare.ByteCursor): ToClientResponse { - return { - requestId: bare.readU32(bc), - data: readResponseData(bc), - } -} - -export function writeToClientResponse(bc: bare.ByteCursor, x: ToClientResponse): void { - bare.writeU32(bc, x.requestId) - writeResponseData(bc, x.data) -} - -export type ToClientPing = { - readonly ts: i64 -} - -export function readToClientPing(bc: bare.ByteCursor): ToClientPing { - return { - ts: bare.readI64(bc), - } -} - -export function writeToClientPing(bc: bare.ByteCursor, x: ToClientPing): void { - bare.writeI64(bc, x.ts) -} - -/** - * Server-initiated close. Sent when the server is shutting down - * or draining connections. The client should close all actors - * and reconnect with backoff. Same pattern as the runner - * protocol's ToRunnerClose. - */ -export type ToClientClose = null - -export type ToClient = - | { readonly tag: "ToClientResponse"; readonly val: ToClientResponse } - | { readonly tag: "ToClientPing"; readonly val: ToClientPing } - | { readonly tag: "ToClientClose"; readonly val: ToClientClose } - -export function readToClient(bc: bare.ByteCursor): ToClient { - const offset = bc.offset - const tag = bare.readU8(bc) - switch (tag) { - case 0: - return { tag: "ToClientResponse", val: readToClientResponse(bc) } - case 1: - return { tag: "ToClientPing", val: readToClientPing(bc) } - case 2: - return { tag: "ToClientClose", val: null } - default: { - bc.offset = offset - throw new bare.BareError(offset, "invalid tag") - } - } -} - -export function writeToClient(bc: bare.ByteCursor, x: ToClient): void { - switch (x.tag) { - case "ToClientResponse": { - bare.writeU8(bc, 0) - writeToClientResponse(bc, x.val) - break - } - case "ToClientPing": { - bare.writeU8(bc, 1) - writeToClientPing(bc, x.val) - break - } - case "ToClientClose": { - bare.writeU8(bc, 2) - break - } - } -} - -export function encodeToClient(x: ToClient, config?: Partial): Uint8Array { - const fullConfig = config != null ? bare.Config(config) : DEFAULT_CONFIG - const bc = new bare.ByteCursor( - new Uint8Array(fullConfig.initialBufferLength), - fullConfig, - ) - writeToClient(bc, x) - return new Uint8Array(bc.view.buffer, bc.view.byteOffset, bc.offset) -} - -export function decodeToClient(bytes: Uint8Array): ToClient { - const bc = new bare.ByteCursor(bytes, DEFAULT_CONFIG) - const result = readToClient(bc) - if (bc.offset < bc.view.byteLength) { - throw new bare.BareError(bc.offset, "remaining bytes") - } - return result -} - - -function assert(condition: boolean, message?: string): asserts condition { - if (!condition) throw new Error(message ?? "Assertion failed") -} diff --git a/engine/sdks/typescript/kv-channel-protocol/tsconfig.json b/engine/sdks/typescript/kv-channel-protocol/tsconfig.json deleted file mode 100644 index d8dc820c55..0000000000 --- a/engine/sdks/typescript/kv-channel-protocol/tsconfig.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "extends": "../../../../tsconfig.base.json", - "compilerOptions": { - "declaration": true, - "outDir": "./dist" - }, - "exclude": ["dist", "node_modules"], - "include": ["**/*.ts"] -} diff --git a/engine/sdks/typescript/kv-channel-protocol/tsup.config.ts b/engine/sdks/typescript/kv-channel-protocol/tsup.config.ts deleted file mode 100644 index 2d399b5efe..0000000000 --- a/engine/sdks/typescript/kv-channel-protocol/tsup.config.ts +++ /dev/null @@ -1,4 +0,0 @@ -import { defineConfig } from "tsup"; -import defaultConfig from "../../../../tsup.base"; - -export default defineConfig(defaultConfig); diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 9a068ca917..69e72fef40 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -4,7 +4,6 @@ packages: - engine/sdks/typescript/api-full - engine/sdks/typescript/envoy-client - engine/sdks/typescript/envoy-protocol - - engine/sdks/typescript/kv-channel-protocol - engine/sdks/typescript/test-envoy - engine/sdks/typescript/test-envoy-native - engine/sdks/typescript/test-runner diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index 742efed918..045224a8d2 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -327,7 +327,6 @@ "@hono/zod-openapi": "^1.1.5", "@rivetkit/bare-ts": "^0.6.2", "@rivetkit/engine-envoy-client": "workspace:*", - "@rivetkit/engine-kv-channel-protocol": "workspace:*", "@rivetkit/engine-runner": "workspace:*", "@rivetkit/fast-json-patch": "^3.1.2", "@rivetkit/on-change": "^6.0.2-rc.1", diff --git a/rivetkit-typescript/packages/rivetkit/src/actor-gateway/gateway.ts b/rivetkit-typescript/packages/rivetkit/src/actor-gateway/gateway.ts index d98f403eea..d313198607 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor-gateway/gateway.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor-gateway/gateway.ts @@ -148,11 +148,6 @@ export async function actorGateway( return next(); } - // Skip KV channel routes - handled by the dedicated KV channel endpoint - if (c.req.path.endsWith("/kv/connect")) { - return next(); - } - // Strip basePath from the request path let strippedPath = c.req.path; if ( diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts index c6b5975c13..b6ebe34950 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts @@ -44,7 +44,10 @@ export interface EngineControlClient { modifyRuntimeRouter?: (config: RegistryConfig, router: Hono) => void; setGetUpgradeWebSocket(getUpgradeWebSocket: GetUpgradeWebSocket): void; shutdown?(): void; - setKvChannelShutdown?(fn: () => void): void; + + /** + * Test-only helper that simulates an abrupt actor crash. + */ hardCrashActor?(actorId: string): Promise; setNativeSqliteConfig?(config: NativeSqliteConfig): void; kvGet(actorId: string, key: Uint8Array): Promise; diff --git a/rivetkit-typescript/packages/rivetkit/src/runtime-router/kv-channel.ts b/rivetkit-typescript/packages/rivetkit/src/runtime-router/kv-channel.ts deleted file mode 100644 index 8b330ebbe9..0000000000 --- a/rivetkit-typescript/packages/rivetkit/src/runtime-router/kv-channel.ts +++ /dev/null @@ -1,709 +0,0 @@ -// KV Channel WebSocket handler for the local runtime server. -// -// Serves the /kv/connect endpoint that the native SQLite addon -// (rivetkit-typescript/packages/sqlite-native/) connects to for -// KV-backed database I/O. See docs-internal/engine/NATIVE_SQLITE_DATA_CHANNEL.md -// for the full specification. - -import type { WSContext } from "hono/ws"; -import { - PROTOCOL_VERSION, - type ToServer as ToRivet, - type ToClient, - type RequestData, - type ResponseData, - type ToServerRequest as ToRivetRequest, - decodeToServer as decodeToRivet, - encodeToClient, -} from "@rivetkit/engine-kv-channel-protocol"; -import { KvStorageQuotaExceededError } from "./kv-limits"; -import type { EngineControlClient } from "@/engine-client/driver"; -import { logger } from "./log"; - -// Ping every 3 seconds, close if no pong within 15 seconds. -// Matches runner protocol defaults (runner_update_ping_interval_ms=3000, -// runner_ping_timeout_ms=15000 in engine/packages/config/src/config/pegboard.rs). -const PING_INTERVAL_MS = 3_000; -const PONG_TIMEOUT_MS = 15_000; - -// Maximum actors a single connection can open. Prevents unbounded memory growth. -const MAX_ACTORS_PER_CONNECTION = 1_000; - -// Sweep interval for removing stale lock entries from dead connections. -const STALE_LOCK_SWEEP_INTERVAL_MS = 60_000; - -/** Per-connection state for the KV channel WebSocket. */ -interface KvChannelConnection { - /** Actor IDs locked by this connection. */ - openActors: Set; - - /** Timer for sending pings. */ - pingInterval: ReturnType | null; - - /** Timer for detecting pong timeout. */ - pongTimeout: ReturnType | null; - - /** Timestamp of the last pong received. */ - lastPongTs: number; - - /** Whether the connection has been closed. */ - closed: boolean; - - /** Reference to the WebSocket context for sending messages. */ - ws: WSContext | null; - - /** Per-actor request queues for sequential execution. */ - actorQueues: Map>; -} - -/** Instance-scoped state for a KV channel manager. */ -interface KvChannelManagerState { - actorLocks: Map; - activeConnections: Set; - staleLockSweepTimer: ReturnType | null; -} - -/** Return type of createKvChannelManager. */ -export interface KvChannelManager { - createHandler: (engineClient: EngineControlClient) => { - onOpen: (event: any, ws: WSContext) => void; - onMessage: (event: any, ws: WSContext) => void; - onClose: (event: any, ws: WSContext) => void; - onError: (error: any, ws: WSContext) => void; - }; - shutdown: () => void; - _testForceCloseAllKvChannels: () => number; -} - -/** - * Create an instance-scoped KV channel manager. - * - * All lock state and timers are scoped to the returned object, so multiple - * manager instances in the same process (e.g., tests) do not share state. - */ -export function createKvChannelManager(): KvChannelManager { - const state: KvChannelManagerState = { - actorLocks: new Map(), - activeConnections: new Set(), - staleLockSweepTimer: null, - }; - - return { - createHandler(engineClient: EngineControlClient) { - const conn: KvChannelConnection = { - openActors: new Set(), - pingInterval: null, - pongTimeout: null, - lastPongTs: Date.now(), - closed: false, - ws: null, - actorQueues: new Map(), - }; - - state.activeConnections.add(conn); - - return { - onOpen: (_event: any, ws: WSContext) => { - logger().debug({ msg: "kv channel websocket opened" }); - conn.ws = ws; - startPingPong(state, conn); - }, - - onMessage: (event: any, _ws: WSContext) => { - try { - let bytes: Uint8Array; - if (event.data instanceof ArrayBuffer) { - bytes = new Uint8Array(event.data); - } else if (event.data instanceof Uint8Array) { - bytes = event.data; - } else if (Buffer.isBuffer(event.data)) { - bytes = new Uint8Array(event.data); - } else { - logger().warn({ - msg: "kv channel received non-binary message, ignoring", - }); - return; - } - - const msg = decodeToRivet(bytes); - handleToRivetMessage(state, conn, engineClient, msg); - } catch (err: unknown) { - logger().error({ - msg: "kv channel failed to decode message", - error: - err instanceof Error - ? err.message - : String(err), - }); - } - }, - - onClose: (_event: any, _ws: WSContext) => { - logger().debug({ msg: "kv channel websocket closed" }); - cleanupConnection(state, conn); - }, - - onError: (error: any, _ws: WSContext) => { - logger().error({ - msg: "kv channel websocket error", - error: - error instanceof Error - ? error.message - : String(error), - }); - cleanupConnection(state, conn); - }, - }; - }, - - shutdown() { - if (state.staleLockSweepTimer) { - clearInterval(state.staleLockSweepTimer); - state.staleLockSweepTimer = null; - } - state.actorLocks.clear(); - state.activeConnections.clear(); - }, - - _testForceCloseAllKvChannels() { - let closed = 0; - for (const conn of state.activeConnections) { - if (!conn.closed && conn.ws) { - const ws = conn.ws; - cleanupConnection(state, conn); - ws.close(1001, "test force disconnect"); - closed++; - } - } - return closed; - }, - }; -} - -function makeErrorResponse( - requestId: number, - code: string, - message: string, -): ToClient { - return { - tag: "ToClientResponse", - val: { - requestId, - data: { - tag: "ErrorResponse", - val: { code, message }, - }, - }, - }; -} - -function makeResponse(requestId: number, data: ResponseData): ToClient { - return { - tag: "ToClientResponse", - val: { requestId, data }, - }; -} - -function sendMessage(conn: KvChannelConnection, msg: ToClient): void { - if (conn.closed || !conn.ws) return; - const bytes = encodeToClient(msg); - // Copy to a fresh ArrayBuffer to satisfy WSContext.send() parameter type. - const copy = new ArrayBuffer(bytes.byteLength); - new Uint8Array(copy).set(bytes); - conn.ws.send(copy); -} - -function startPingPong( - state: KvChannelManagerState, - conn: KvChannelConnection, -): void { - conn.lastPongTs = Date.now(); - - conn.pingInterval = setInterval(() => { - if (conn.closed || !conn.ws) return; - - const ts = BigInt(Date.now()); - sendMessage(conn, { - tag: "ToClientPing", - val: { ts }, - }); - - // Check if the last pong was too long ago. - if (Date.now() - conn.lastPongTs > PONG_TIMEOUT_MS) { - logger().warn({ - msg: "kv channel pong timeout, closing connection", - }); - // Capture ws before cleanup nulls it. - const ws = conn.ws; - cleanupConnection(state, conn); - if (ws) { - ws.close(1000, "pong timeout"); - } - } - }, PING_INTERVAL_MS); -} - -function cleanupConnection( - state: KvChannelManagerState, - conn: KvChannelConnection, -): void { - conn.closed = true; - conn.ws = null; - state.activeConnections.delete(conn); - - if (conn.pingInterval) { - clearInterval(conn.pingInterval); - conn.pingInterval = null; - } - if (conn.pongTimeout) { - clearTimeout(conn.pongTimeout); - conn.pongTimeout = null; - } - - // Release all actor locks held by this connection. - for (const actorId of conn.openActors) { - if (state.actorLocks.get(actorId) === conn) { - state.actorLocks.delete(actorId); - } - } - conn.openActors.clear(); -} - -async function handleRequest( - state: KvChannelManagerState, - conn: KvChannelConnection, - engineClient: EngineControlClient, - request: ToRivetRequest, -): Promise { - const { requestId, actorId, data } = request; - - try { - const responseData = await processRequestData( - state, - conn, - engineClient, - actorId, - data, - ); - sendMessage(conn, makeResponse(requestId, responseData)); - } catch (err: unknown) { - // Log the full error server-side but return a generic message to the - // client to avoid leaking internal details. Specific known error codes - // (actor_not_open, actor_locked, storage_quota_exceeded, etc.) are - // returned as structured responses before reaching this catch block. - logger().error({ - msg: "kv channel request error", - requestId, - actorId, - error: err instanceof Error ? err.message : String(err), - }); - sendMessage( - conn, - makeErrorResponse(requestId, "internal_error", "internal error"), - ); - } -} - -// Defense-in-depth: in the engine KV channel, resolve_actor verifies the actor -// belongs to the authenticated namespace. The local dev manager is -// single-namespace, so all actors implicitly belong to the same namespace and -// no cross-namespace access is possible. If a less-privileged auth mechanism is -// introduced for the dev manager, namespace verification should be added here. -async function processRequestData( - state: KvChannelManagerState, - conn: KvChannelConnection, - engineClient: EngineControlClient, - actorId: string, - data: RequestData, -): Promise { - switch (data.tag) { - case "ActorOpenRequest": - return handleActorOpen(state, conn, actorId); - - case "ActorCloseRequest": - return handleActorClose(state, conn, actorId); - - case "KvGetRequest": - case "KvPutRequest": - case "KvDeleteRequest": - case "KvDeleteRangeRequest": { - // All KV operations require the actor to be open on this connection. - const lockHolder = state.actorLocks.get(actorId); - if (!lockHolder || lockHolder !== conn) { - if (lockHolder && lockHolder !== conn) { - return { - tag: "ErrorResponse", - val: { - code: "actor_locked", - message: `actor ${actorId} is locked by another connection`, - }, - }; - } - return { - tag: "ErrorResponse", - val: { - code: "actor_not_open", - message: `actor ${actorId} is not open on this connection`, - }, - }; - } - return await handleKvOperation(engineClient, actorId, data); - } - } -} - -function handleActorOpen( - state: KvChannelManagerState, - conn: KvChannelConnection, - actorId: string, -): ResponseData { - // Reject if this connection already has too many actors open. - if (conn.openActors.size >= MAX_ACTORS_PER_CONNECTION) { - return { - tag: "ErrorResponse", - val: { - code: "too_many_actors", - message: `connection has too many open actors (max ${MAX_ACTORS_PER_CONNECTION})`, - }, - }; - } - - const existingLock = state.actorLocks.get(actorId); - if (existingLock && existingLock !== conn) { - // Unconditionally evict the old connection's lock. The old connection - // is either dead (network issue) or stale (same process reconnecting). - // Remove the actor from the old connection's openActors so its next KV - // request fails the fast-path check immediately with actor_not_open. - existingLock.openActors.delete(actorId); - logger().info({ - msg: "kv channel evicting actor lock from old connection", - actorId, - }); - } - - state.actorLocks.set(actorId, conn); - conn.openActors.add(actorId); - - // Start the stale lock sweep if not already running. - ensureStaleLockSweep(state); - - return { tag: "ActorOpenResponse", val: null }; -} - -function handleActorClose( - state: KvChannelManagerState, - conn: KvChannelConnection, - actorId: string, -): ResponseData { - if (state.actorLocks.get(actorId) === conn) { - state.actorLocks.delete(actorId); - } - conn.openActors.delete(actorId); - - return { tag: "ActorCloseResponse", val: null }; -} - -/** Start the stale lock sweep if not already running. */ -function ensureStaleLockSweep(state: KvChannelManagerState): void { - if (state.staleLockSweepTimer) return; - state.staleLockSweepTimer = setInterval(() => { - let removed = 0; - for (const [actorId, conn] of state.actorLocks) { - if (conn.closed) { - state.actorLocks.delete(actorId); - removed++; - } - } - if (removed > 0) { - logger().debug({ - msg: "kv channel stale lock sweep completed", - removedCount: removed, - remainingCount: state.actorLocks.size, - }); - } - // Stop the sweep if there are no more lock entries. - if (state.actorLocks.size === 0 && state.staleLockSweepTimer) { - clearInterval(state.staleLockSweepTimer); - state.staleLockSweepTimer = null; - } - }, STALE_LOCK_SWEEP_INTERVAL_MS); - // Allow the process to exit even if the sweep timer is still running. - state.staleLockSweepTimer.unref?.(); -} - -type KvRequestData = Extract< - RequestData, - | { readonly tag: "KvGetRequest" } - | { readonly tag: "KvPutRequest" } - | { readonly tag: "KvDeleteRequest" } - | { readonly tag: "KvDeleteRangeRequest" } ->; - -async function handleKvOperation( - engineClient: EngineControlClient, - actorId: string, - data: KvRequestData, -): Promise { - switch (data.tag) { - case "KvGetRequest": { - const keys = data.val.keys.map( - (k) => new Uint8Array(k), - ); - - // Validate key count. - if (keys.length > 128) { - return { - tag: "ErrorResponse", - val: { - code: "batch_too_large", - message: "a maximum of 128 keys is allowed", - }, - }; - } - - // Validate individual key sizes. - for (const key of keys) { - if (key.byteLength + 2 > 2048) { - return { - tag: "ErrorResponse", - val: { - code: "key_too_large", - message: "key is too long (max 2048 bytes)", - }, - }; - } - } - - const results = await engineClient.kvBatchGet(actorId, keys); - - // Return only found keys and values. - const foundKeys: ArrayBuffer[] = []; - const foundValues: ArrayBuffer[] = []; - for (let i = 0; i < keys.length; i++) { - const val = results[i]; - if (val !== null) { - foundKeys.push(new Uint8Array(keys[i]).buffer as ArrayBuffer); - foundValues.push(new Uint8Array(val).buffer as ArrayBuffer); - } - } - - return { - tag: "KvGetResponse", - val: { keys: foundKeys, values: foundValues }, - }; - } - - case "KvPutRequest": { - const keys = data.val.keys.map( - (k) => new Uint8Array(k), - ); - const values = data.val.values.map( - (v) => new Uint8Array(v), - ); - - if (keys.length !== values.length) { - return { - tag: "ErrorResponse", - val: { - code: "keys_values_length_mismatch", - message: - "keys and values arrays must have the same length", - }, - }; - } - - if (keys.length > 128) { - return { - tag: "ErrorResponse", - val: { - code: "batch_too_large", - message: - "a maximum of 128 key-value entries is allowed", - }, - }; - } - - // Validate sizes. - let payloadSize = 0; - for (let i = 0; i < keys.length; i++) { - if (keys[i].byteLength + 2 > 2048) { - return { - tag: "ErrorResponse", - val: { - code: "key_too_large", - message: "key is too long (max 2048 bytes)", - }, - }; - } - if (values[i].byteLength > 128 * 1024) { - return { - tag: "ErrorResponse", - val: { - code: "value_too_large", - message: "value is too large (max 128 KiB)", - }, - }; - } - payloadSize += - keys[i].byteLength + 2 + values[i].byteLength; - } - - if (payloadSize > 976 * 1024) { - return { - tag: "ErrorResponse", - val: { - code: "payload_too_large", - message: - "total payload is too large (max 976 KiB)", - }, - }; - } - - const entries: [Uint8Array, Uint8Array][] = keys.map( - (k, i) => [k, values[i]], - ); - - try { - await engineClient.kvBatchPut(actorId, entries); - } catch (err: unknown) { - if (err instanceof KvStorageQuotaExceededError) { - return { - tag: "ErrorResponse", - val: { - code: "storage_quota_exceeded", - message: err.message, - }, - }; - } - throw err; - } - - return { tag: "KvPutResponse", val: null }; - } - - case "KvDeleteRequest": { - const keys = data.val.keys.map( - (k) => new Uint8Array(k), - ); - - if (keys.length > 128) { - return { - tag: "ErrorResponse", - val: { - code: "batch_too_large", - message: "a maximum of 128 keys is allowed", - }, - }; - } - - for (const key of keys) { - if (key.byteLength + 2 > 2048) { - return { - tag: "ErrorResponse", - val: { - code: "key_too_large", - message: "key is too long (max 2048 bytes)", - }, - }; - } - } - - await engineClient.kvBatchDelete(actorId, keys); - - return { tag: "KvDeleteResponse", val: null }; - } - - case "KvDeleteRangeRequest": { - const start = new Uint8Array(data.val.start); - const end = new Uint8Array(data.val.end); - - if (start.byteLength + 2 > 2048) { - return { - tag: "ErrorResponse", - val: { - code: "key_too_large", - message: "start key is too long (max 2048 bytes)", - }, - }; - } - if (end.byteLength + 2 > 2048) { - return { - tag: "ErrorResponse", - val: { - code: "key_too_large", - message: "end key is too long (max 2048 bytes)", - }, - }; - } - - await engineClient.kvDeleteRange(actorId, start, end); - - return { tag: "KvDeleteResponse", val: null }; - } - - default: { - // Should never happen since processRequestData routes only KV tags here. - const _exhaustive: never = data; - throw new Error(`unexpected request tag`); - } - } -} - -function handleToRivetMessage( - state: KvChannelManagerState, - conn: KvChannelConnection, - engineClient: EngineControlClient, - msg: ToRivet, -): void { - switch (msg.tag) { - case "ToServerRequest": { - const { actorId } = msg.val; - - // Chain requests per actor so they execute sequentially, - // preventing journal write ordering violations. Cross-actor - // requests still execute concurrently since each actor has its - // own queue. See docs-internal/engine/NATIVE_SQLITE_REVIEW_FIXES.md H2. - const prev = conn.actorQueues.get(actorId) ?? Promise.resolve(); - const next = prev.then(() => - handleRequest(state, conn, engineClient, msg.val).catch( - (err) => { - logger().error({ - msg: "unhandled error in kv channel request handler", - error: - err instanceof Error - ? err.message - : String(err), - }); - }, - ), - ); - conn.actorQueues.set(actorId, next); - - // Clean up the queue entry once it settles to avoid unbounded map growth. - next.then(() => { - if (conn.actorQueues.get(actorId) === next) { - conn.actorQueues.delete(actorId); - } - }); - break; - } - - case "ToServerPong": - conn.lastPongTs = Date.now(); - break; - } -} - -/** Validate the protocol version query parameter. Returns an error string or null. */ -export function validateProtocolVersion( - protocolVersion: string | undefined, -): string | null { - if (!protocolVersion) { - return "missing protocol_version query parameter"; - } - const version = Number.parseInt(protocolVersion, 10); - if (Number.isNaN(version) || version !== PROTOCOL_VERSION) { - return `unsupported protocol_version: ${protocolVersion} (server supports ${PROTOCOL_VERSION})`; - } - return null; -} diff --git a/rivetkit-typescript/packages/rivetkit/src/runtime-router/router.ts b/rivetkit-typescript/packages/rivetkit/src/runtime-router/router.ts index 01e1d90fee..65f93cc9a5 100644 --- a/rivetkit-typescript/packages/rivetkit/src/runtime-router/router.ts +++ b/rivetkit-typescript/packages/rivetkit/src/runtime-router/router.ts @@ -47,10 +47,6 @@ import { actorGateway, createTestWebSocketProxy, } from "@/actor-gateway/gateway"; -import { - createKvChannelManager, - validateProtocolVersion, -} from "./kv-channel"; import { logger } from "./log"; export function buildRuntimeRouter( @@ -59,12 +55,6 @@ export function buildRuntimeRouter( getUpgradeWebSocket: GetUpgradeWebSocket | undefined, runtime: Runtime = "node", ) { - const kvChannelManager = createKvChannelManager(); - - // Inject the KV channel shutdown into the driver so it can be - // called during the driver's teardown, after all actors have stopped. - engineClient.setKvChannelShutdown?.(kvChannelManager.shutdown); - return createRouter(config.managerBasePath, (router) => { // Actor gateway router.use( @@ -364,53 +354,6 @@ export function buildRuntimeRouter( }); } - // GET /kv/connect - KV channel WebSocket endpoint for native SQLite - router.get("/kv/connect", async (c) => { - // Validate authentication. - if (isDev() && !config.token) { - logger().warn({ - msg: "RIVET_TOKEN is not set, skipping KV channel auth in development mode", - }); - } else { - const token = c.req.query("token"); - if (!config.token) { - return c.text("KV channel requires RIVET_TOKEN to be set", 403); - } - if ( - !token || - timingSafeEqual(config.token, token) === false - ) { - return c.json( - { - error: { - code: "unauthorized", - message: "invalid or missing authentication token", - }, - }, - 401, - ); - } - } - - // Validate protocol version. - const versionError = validateProtocolVersion( - c.req.query("protocol_version"), - ); - if (versionError) { - return c.text(versionError, 400); - } - - // Upgrade to WebSocket. - const upgradeWebSocket = getUpgradeWebSocket?.(); - if (!upgradeWebSocket) { - return c.text("WebSocket upgrades not supported on this platform", 500); - } - - return upgradeWebSocket(() => - kvChannelManager.createHandler(engineClient), - )(c, noopNext()); - }); - // TODO: // // DELETE /actors/{actor_id} // { @@ -483,12 +426,6 @@ export function buildRuntimeRouter( } }); - // Force-close all KV channel WebSocket connections. Used by - // stress tests to simulate network failures mid-operation. - router.post("/.test/kv-channel/force-disconnect", async (c) => { - const closed = kvChannelManager._testForceCloseAllKvChannels(); - return c.json({ closed }); - }); } if (config.inspector.enabled) {