From 148d187ff1fc33848c7b77fbd3067db1bdc92fbb Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 4 May 2026 17:44:27 -0700 Subject: [PATCH] fix(rivetkit): fix empty state [slopfix] --- Cargo.lock | 1 + engine/packages/guard/src/lib.rs | 6 ----- engine/packages/pools/Cargo.toml | 1 + engine/packages/pools/src/pools.rs | 6 +++++ .../rivetkit-core/src/actor/connection.rs | 26 ++++++++++++++++-- .../packages/rivetkit-core/src/actor/task.rs | 27 +++++++++++++++---- .../rivetkit-napi/src/napi_actor_events.rs | 10 ++++++- 7 files changed, 63 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f99fef242..aeef286bff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5690,6 +5690,7 @@ dependencies = [ "rivet-config", "rivet-metrics", "rivet-util", + "rustls 0.23.29", "serde", "tempfile", "thiserror 1.0.69", diff --git a/engine/packages/guard/src/lib.rs b/engine/packages/guard/src/lib.rs index c67d3712a1..0289999ac0 100644 --- a/engine/packages/guard/src/lib.rs +++ b/engine/packages/guard/src/lib.rs @@ -21,12 +21,6 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R Id::new_v1(config.dc_label()), )?; - // Initialize with a default CryptoProvider for rustls - let provider = rustls::crypto::ring::default_provider(); - if provider.install_default().is_err() { - tracing::debug!("crypto provider already installed in this process"); - } - // Share shared context let shared_state = shared_state::SharedState::new(&config, ctx.ups()?); shared_state.start().await?; diff --git a/engine/packages/pools/Cargo.toml b/engine/packages/pools/Cargo.toml index 93ca668011..3a03a06fdc 100644 --- a/engine/packages/pools/Cargo.toml +++ b/engine/packages/pools/Cargo.toml @@ -18,6 +18,7 @@ reqwest.workspace = true rivet-config.workspace = true rivet-metrics.workspace = true rivet-util.workspace = true +rustls.workspace = true serde.workspace = true tempfile.workspace = true thiserror.workspace = true diff --git a/engine/packages/pools/src/pools.rs 
b/engine/packages/pools/src/pools.rs index f677784143..baafd9caec 100644 --- a/engine/packages/pools/src/pools.rs +++ b/engine/packages/pools/src/pools.rs @@ -26,6 +26,12 @@ impl Pools { let token = CancellationToken::new(); let node_id = NodeId::new(); + // Initialize with a default CryptoProvider for rustls + let provider = rustls::crypto::ring::default_provider(); + if provider.install_default().is_err() { + tracing::debug!("crypto provider already installed in this process"); + } + let (ups, udb) = tokio::try_join!( crate::db::ups::setup(&config, client_name), crate::db::udb::setup(&config), diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs index 617f25ed86..392b275587 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs @@ -156,6 +156,7 @@ struct ConnHandleInner { state: RwLock>, is_hibernatable: bool, dirty: AtomicBool, + state_initialized: AtomicBool, subscriptions: RwLock>, hibernation: RwLock>, state_change_handler: RwLock>, @@ -177,6 +178,7 @@ impl ConnHandle { state: RwLock::new(state), is_hibernatable, dirty: AtomicBool::new(false), + state_initialized: AtomicBool::new(false), subscriptions: RwLock::new(BTreeSet::new()), hibernation: RwLock::new(None), state_change_handler: RwLock::new(None), @@ -209,11 +211,16 @@ impl ConnHandle { fn set_state_inner(&self, state: Vec, mark_dirty: bool) { *self.0.state.write() = state; + self.0.state_initialized.store(true, Ordering::SeqCst); if mark_dirty { self.mark_hibernation_dirty(); } } + pub fn state_initialized(&self) -> bool { + self.0.state_initialized.load(Ordering::SeqCst) + } + fn mark_hibernation_dirty(&self) { if !self.is_hibernatable() { return; @@ -343,6 +350,7 @@ impl ConnHandle { persisted.state, true, ); + conn.0.state_initialized.store(true, Ordering::SeqCst); conn.configure_hibernation(Some(HibernatableConnectionMetadata { 
gateway_id: persisted.gateway_id, request_id: persisted.request_id, @@ -851,8 +859,22 @@ impl ActorContext { }; conn.clear_subscriptions(); - self.try_send_actor_event(ActorEvent::ConnectionClosed { conn }, "connection_closed") - .with_context(|| disconnect_message(conn_id, reason.as_deref()))?; + // Skip the user `onDisconnect` event for connections that never had + // their state populated (i.e. the transport disconnected before + // `createConnState` ran). The user-visible `conn.state` would be + // empty bytes, surfacing as `undefined` in foreign runtimes and + // breaking user code that assumes a fully-initialized connection. + if !conn.state_initialized() { + tracing::debug!( + actor_id = %self.actor_id(), + conn_id, + reason = ?reason.as_deref(), + "connection_closed event skipped because state was never initialized" + ); + } else { + self.try_send_actor_event(ActorEvent::ConnectionClosed { conn }, "connection_closed") + .with_context(|| disconnect_message(conn_id, reason.as_deref()))?; + } self.record_connections_updated(); self.reset_sleep_timer(); diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index f76abf22b2..4f58ab9363 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -1205,11 +1205,20 @@ impl ActorTask { let is_new = !persisted.actor.has_initialized; self.ctx.load_persisted_actor(persisted.actor); self.ctx.load_last_pushed_alarm(persisted.last_pushed_alarm); - self.ctx.set_has_initialized(true); - self.ctx - .persist_state(SaveStateOpts { immediate: true }) - .await - .context("persist actor initialization")?; + // Do not set has_initialized=true here for new actors. 
Flipping the + // flag and persisting before the runtime preamble has populated state + // (via createState or equivalent) leaves a window where a crash + // produces a persisted record with has_initialized=true and an empty + // state, after which the next wake skips state initialization and + // users observe `c.state === undefined` in createVars/onWake. The + // flag is now flipped after spawn_run_handle completes (see below). + if !is_new { + self.ctx.set_has_initialized(true); + self.ctx + .persist_state(SaveStateOpts { immediate: true }) + .await + .context("persist actor initialization")?; + } let init_inspector_token_started_at = Instant::now(); crate::inspector::auth::init_inspector_token_with_preload( &self.ctx, @@ -1234,6 +1243,14 @@ impl ActorTask { self.transition_to(LifecycleState::Started); self.spawn_run_handle(is_new).await?; if is_new { + // Flag the actor as initialized only after the runtime preamble + // has populated state. For NAPI this is a no-op because the JS + // preamble already calls set_has_initialized via + // mark_has_initialized_and_flush, but for runtimes whose run + // handle does not signal manual startup_ready, this is the + // authoritative point at which a fresh actor becomes "started" + // in KV.
+ self.ctx.set_has_initialized(true); self.ctx .persist_state(SaveStateOpts { immediate: true }) .await diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs b/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs index 74fd941ccf..822131bd7d 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs @@ -209,6 +209,12 @@ async fn run_preamble( snapshot: Option>, hibernated: Vec<(rivetkit_core::ConnHandle, Vec)>, ) -> Result { + // Recover from actors persisted with `has_initialized=true` but an empty + // state snapshot, which can happen if an earlier process crashed between + // the core's startup persist and the JS preamble populating state. Treat + // an empty snapshot as a new actor so createState reruns instead of + // leaving `c.state` as undefined for downstream user code. + let snapshot = snapshot.filter(|bytes| !bytes.is_empty()); let is_new = snapshot.is_none(); // Run database migrations before any user lifecycle hook so `c.db` is @@ -645,7 +651,9 @@ pub(crate) async fn dispatch_event( let conn = { ctx.inner().conns().find(|conn| conn.id() == conn_id) }; if let Some(conn) = conn { if let Some(callback) = callback { - call_on_disconnect_final(&callback, &ctx, conn.clone()).await?; + if conn.state_initialized() { + call_on_disconnect_final(&callback, &ctx, conn.clone()).await?; + } } ctx.inner().disconnect_conn(conn_id).await?; }