Skip to content

Commit 148d187

Browse files
committed
fix(rivetkit): fix empty state [slopfix]
1 parent 1d1ad71 commit 148d187

7 files changed

Lines changed: 63 additions & 14 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/guard/src/lib.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
2121
Id::new_v1(config.dc_label()),
2222
)?;
2323

24-
// Initialize with a default CryptoProvider for rustls
25-
let provider = rustls::crypto::ring::default_provider();
26-
if provider.install_default().is_err() {
27-
tracing::debug!("crypto provider already installed in this process");
28-
}
29-
3024
// Share shared context
3125
let shared_state = shared_state::SharedState::new(&config, ctx.ups()?);
3226
shared_state.start().await?;

engine/packages/pools/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ reqwest.workspace = true
1818
rivet-config.workspace = true
1919
rivet-metrics.workspace = true
2020
rivet-util.workspace = true
21+
rustls.workspace = true
2122
serde.workspace = true
2223
tempfile.workspace = true
2324
thiserror.workspace = true

engine/packages/pools/src/pools.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ impl Pools {
2626
let token = CancellationToken::new();
2727
let node_id = NodeId::new();
2828

29+
// Initialize with a default CryptoProvider for rustls
30+
let provider = rustls::crypto::ring::default_provider();
31+
if provider.install_default().is_err() {
32+
tracing::debug!("crypto provider already installed in this process");
33+
}
34+
2935
let (ups, udb) = tokio::try_join!(
3036
crate::db::ups::setup(&config, client_name),
3137
crate::db::udb::setup(&config),

rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ struct ConnHandleInner {
156156
state: RwLock<Vec<u8>>,
157157
is_hibernatable: bool,
158158
dirty: AtomicBool,
159+
state_initialized: AtomicBool,
159160
subscriptions: RwLock<BTreeSet<String>>,
160161
hibernation: RwLock<Option<HibernatableConnectionMetadata>>,
161162
state_change_handler: RwLock<Option<StateChangeCallback>>,
@@ -177,6 +178,7 @@ impl ConnHandle {
177178
state: RwLock::new(state),
178179
is_hibernatable,
179180
dirty: AtomicBool::new(false),
181+
state_initialized: AtomicBool::new(false),
180182
subscriptions: RwLock::new(BTreeSet::new()),
181183
hibernation: RwLock::new(None),
182184
state_change_handler: RwLock::new(None),
@@ -209,11 +211,16 @@ impl ConnHandle {
209211

210212
fn set_state_inner(&self, state: Vec<u8>, mark_dirty: bool) {
211213
*self.0.state.write() = state;
214+
self.0.state_initialized.store(true, Ordering::SeqCst);
212215
if mark_dirty {
213216
self.mark_hibernation_dirty();
214217
}
215218
}
216219

220+
pub fn state_initialized(&self) -> bool {
221+
self.0.state_initialized.load(Ordering::SeqCst)
222+
}
223+
217224
fn mark_hibernation_dirty(&self) {
218225
if !self.is_hibernatable() {
219226
return;
@@ -343,6 +350,7 @@ impl ConnHandle {
343350
persisted.state,
344351
true,
345352
);
353+
conn.0.state_initialized.store(true, Ordering::SeqCst);
346354
conn.configure_hibernation(Some(HibernatableConnectionMetadata {
347355
gateway_id: persisted.gateway_id,
348356
request_id: persisted.request_id,
@@ -851,8 +859,22 @@ impl ActorContext {
851859
};
852860
conn.clear_subscriptions();
853861

854-
self.try_send_actor_event(ActorEvent::ConnectionClosed { conn }, "connection_closed")
855-
.with_context(|| disconnect_message(conn_id, reason.as_deref()))?;
862+
// Skip the user `onDisconnect` event for connections that never had
863+
// their state populated (i.e. the transport disconnected before
864+
// `createConnState` ran). The user-visible `conn.state` would be
865+
// empty bytes, surfacing as `undefined` in foreign runtimes and
866+
// breaking user code that assumes a fully-initialized connection.
867+
if !conn.state_initialized() {
868+
tracing::debug!(
869+
actor_id = %self.actor_id(),
870+
conn_id,
871+
reason = ?reason.as_deref(),
872+
"connection_closed event skipped because state was never initialized"
873+
);
874+
} else {
875+
self.try_send_actor_event(ActorEvent::ConnectionClosed { conn }, "connection_closed")
876+
.with_context(|| disconnect_message(conn_id, reason.as_deref()))?;
877+
}
856878

857879
self.record_connections_updated();
858880
self.reset_sleep_timer();

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,11 +1205,20 @@ impl ActorTask {
12051205
let is_new = !persisted.actor.has_initialized;
12061206
self.ctx.load_persisted_actor(persisted.actor);
12071207
self.ctx.load_last_pushed_alarm(persisted.last_pushed_alarm);
1208-
self.ctx.set_has_initialized(true);
1209-
self.ctx
1210-
.persist_state(SaveStateOpts { immediate: true })
1211-
.await
1212-
.context("persist actor initialization")?;
1208+
// Do not set has_initialized=true here for new actors. Flipping the
1209+
// flag and persisting before the runtime preamble has populated state
1210+
// (via createState or equivalent) leaves a window where a crash
1211+
// produces a persisted record with has_initialized=true and an empty
1212+
// state, after which the next wake skips state initialization and
1213+
// users observe `c.state === undefined` in createVars/onWake. The
1214+
// flag is now flipped after spawn_run_handle completes (see below).
1215+
if !is_new {
1216+
self.ctx.set_has_initialized(true);
1217+
self.ctx
1218+
.persist_state(SaveStateOpts { immediate: true })
1219+
.await
1220+
.context("persist actor initialization")?;
1221+
}
12131222
let init_inspector_token_started_at = Instant::now();
12141223
crate::inspector::auth::init_inspector_token_with_preload(
12151224
&self.ctx,
@@ -1234,6 +1243,14 @@ impl ActorTask {
12341243
self.transition_to(LifecycleState::Started);
12351244
self.spawn_run_handle(is_new).await?;
12361245
if is_new {
1246+
// Flag the actor as initialized only after the runtime preamble
1247+
// has populated state. For NAPI this is no-op because the JS
1248+
// preamble already calls set_has_initialized via
1249+
// mark_has_initialized_and_flush, but for runtimes whose run
1250+
// handle does not signal manual startup_ready, this is the
1251+
// authoritative point at which a fresh actor becomes "started"
1252+
// in KV.
1253+
self.ctx.set_has_initialized(true);
12371254
self.ctx
12381255
.persist_state(SaveStateOpts { immediate: true })
12391256
.await

rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,12 @@ async fn run_preamble(
209209
snapshot: Option<Vec<u8>>,
210210
hibernated: Vec<(rivetkit_core::ConnHandle, Vec<u8>)>,
211211
) -> Result<RunHandlerSlot> {
212+
// Recover from actors persisted with `has_initialized=true` but an empty
213+
// state snapshot, which can happen if an earlier process crashed between
214+
// the core's startup persist and the JS preamble populating state. Treat
215+
// an empty snapshot as a new actor so createState reruns instead of
216+
// leaving `c.state` as undefined for downstream user code.
217+
let snapshot = snapshot.filter(|bytes| !bytes.is_empty());
212218
let is_new = snapshot.is_none();
213219

214220
// Run database migrations before any user lifecycle hook so `c.db` is
@@ -645,7 +651,9 @@ pub(crate) async fn dispatch_event(
645651
let conn = { ctx.inner().conns().find(|conn| conn.id() == conn_id) };
646652
if let Some(conn) = conn {
647653
if let Some(callback) = callback {
648-
call_on_disconnect_final(&callback, &ctx, conn.clone()).await?;
654+
if conn.state_initialized() {
655+
call_on_disconnect_final(&callback, &ctx, conn.clone()).await?;
656+
}
649657
}
650658
ctx.inner().disconnect_conn(conn_id).await?;
651659
}

0 commit comments

Comments
 (0)