Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions engine/packages/guard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
Id::new_v1(config.dc_label()),
)?;

// Initialize with a default CryptoProvider for rustls
let provider = rustls::crypto::ring::default_provider();
if provider.install_default().is_err() {
tracing::debug!("crypto provider already installed in this process");
}

// Share shared context
let shared_state = shared_state::SharedState::new(&config, ctx.ups()?);
shared_state.start().await?;
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ reqwest.workspace = true
rivet-config.workspace = true
rivet-metrics.workspace = true
rivet-util.workspace = true
rustls.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
Expand Down
6 changes: 6 additions & 0 deletions engine/packages/pools/src/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ impl Pools {
let token = CancellationToken::new();
let node_id = NodeId::new();

// Initialize with a default CryptoProvider for rustls
let provider = rustls::crypto::ring::default_provider();
if provider.install_default().is_err() {
tracing::debug!("crypto provider already installed in this process");
}

let (ups, udb) = tokio::try_join!(
crate::db::ups::setup(&config, client_name),
crate::db::udb::setup(&config),
Expand Down
26 changes: 24 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/actor/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ struct ConnHandleInner {
state: RwLock<Vec<u8>>,
is_hibernatable: bool,
dirty: AtomicBool,
state_initialized: AtomicBool,
subscriptions: RwLock<BTreeSet<String>>,
hibernation: RwLock<Option<HibernatableConnectionMetadata>>,
state_change_handler: RwLock<Option<StateChangeCallback>>,
Expand All @@ -177,6 +178,7 @@ impl ConnHandle {
state: RwLock::new(state),
is_hibernatable,
dirty: AtomicBool::new(false),
state_initialized: AtomicBool::new(false),
subscriptions: RwLock::new(BTreeSet::new()),
hibernation: RwLock::new(None),
state_change_handler: RwLock::new(None),
Expand Down Expand Up @@ -209,11 +211,16 @@ impl ConnHandle {

fn set_state_inner(&self, state: Vec<u8>, mark_dirty: bool) {
*self.0.state.write() = state;
self.0.state_initialized.store(true, Ordering::SeqCst);
if mark_dirty {
self.mark_hibernation_dirty();
}
}

pub fn state_initialized(&self) -> bool {
self.0.state_initialized.load(Ordering::SeqCst)
}

fn mark_hibernation_dirty(&self) {
if !self.is_hibernatable() {
return;
Expand Down Expand Up @@ -343,6 +350,7 @@ impl ConnHandle {
persisted.state,
true,
);
conn.0.state_initialized.store(true, Ordering::SeqCst);
conn.configure_hibernation(Some(HibernatableConnectionMetadata {
gateway_id: persisted.gateway_id,
request_id: persisted.request_id,
Expand Down Expand Up @@ -851,8 +859,22 @@ impl ActorContext {
};
conn.clear_subscriptions();

self.try_send_actor_event(ActorEvent::ConnectionClosed { conn }, "connection_closed")
.with_context(|| disconnect_message(conn_id, reason.as_deref()))?;
// Skip the user `onDisconnect` event for connections that never had
// their state populated (i.e. the transport disconnected before
// `createConnState` ran). The user-visible `conn.state` would be
// empty bytes, surfacing as `undefined` in foreign runtimes and
// breaking user code that assumes a fully-initialized connection.
if !conn.state_initialized() {
tracing::debug!(
actor_id = %self.actor_id(),
conn_id,
reason = ?reason.as_deref(),
"connection_closed event skipped because state was never initialized"
);
} else {
self.try_send_actor_event(ActorEvent::ConnectionClosed { conn }, "connection_closed")
.with_context(|| disconnect_message(conn_id, reason.as_deref()))?;
}

self.record_connections_updated();
self.reset_sleep_timer();
Expand Down
27 changes: 22 additions & 5 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,11 +1205,20 @@ impl ActorTask {
let is_new = !persisted.actor.has_initialized;
self.ctx.load_persisted_actor(persisted.actor);
self.ctx.load_last_pushed_alarm(persisted.last_pushed_alarm);
self.ctx.set_has_initialized(true);
self.ctx
.persist_state(SaveStateOpts { immediate: true })
.await
.context("persist actor initialization")?;
// Do not set has_initialized=true here for new actors. Flipping the
// flag and persisting before the runtime preamble has populated state
// (via createState or equivalent) leaves a window where a crash
// produces a persisted record with has_initialized=true and an empty
// state, after which the next wake skips state initialization and
// users observe `c.state === undefined` in createVars/onWake. The
// flag is now flipped after spawn_run_handle completes (see below).
if !is_new {
self.ctx.set_has_initialized(true);
self.ctx
.persist_state(SaveStateOpts { immediate: true })
.await
.context("persist actor initialization")?;
}
let init_inspector_token_started_at = Instant::now();
crate::inspector::auth::init_inspector_token_with_preload(
&self.ctx,
Expand All @@ -1234,6 +1243,14 @@ impl ActorTask {
self.transition_to(LifecycleState::Started);
self.spawn_run_handle(is_new).await?;
if is_new {
// Flag the actor as initialized only after the runtime preamble
// has populated state. For NAPI this is no-op because the JS
// preamble already calls set_has_initialized via
// mark_has_initialized_and_flush, but for runtimes whose run
// handle does not signal manual startup_ready, this is the
// authoritative point at which a fresh actor becomes "started"
// in KV.
self.ctx.set_has_initialized(true);
self.ctx
.persist_state(SaveStateOpts { immediate: true })
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ async fn run_preamble(
snapshot: Option<Vec<u8>>,
hibernated: Vec<(rivetkit_core::ConnHandle, Vec<u8>)>,
) -> Result<RunHandlerSlot> {
// Recover from actors persisted with `has_initialized=true` but an empty
// state snapshot, which can happen if an earlier process crashed between
// the core's startup persist and the JS preamble populating state. Treat
// an empty snapshot as a new actor so createState reruns instead of
// leaving `c.state` as undefined for downstream user code.
let snapshot = snapshot.filter(|bytes| !bytes.is_empty());
let is_new = snapshot.is_none();

// Run database migrations before any user lifecycle hook so `c.db` is
Expand Down Expand Up @@ -645,7 +651,9 @@ pub(crate) async fn dispatch_event(
let conn = { ctx.inner().conns().find(|conn| conn.id() == conn_id) };
if let Some(conn) = conn {
if let Some(callback) = callback {
call_on_disconnect_final(&callback, &ctx, conn.clone()).await?;
if conn.state_initialized() {
call_on_disconnect_final(&callback, &ctx, conn.clone()).await?;
}
}
ctx.inner().disconnect_conn(conn_id).await?;
}
Expand Down
Loading