Skip to content

Commit de6ffdd

Browse files
committed
fix(rivetkit): defer initialized persist until state exists
1 parent e987ebe commit de6ffdd

4 files changed

Lines changed: 104 additions & 7 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,11 +1205,13 @@ impl ActorTask {
12051205
let is_new = !persisted.actor.has_initialized;
12061206
self.ctx.load_persisted_actor(persisted.actor);
12071207
self.ctx.load_last_pushed_alarm(persisted.last_pushed_alarm);
1208-
self.ctx.set_has_initialized(true);
1209-
self.ctx
1210-
.persist_state(SaveStateOpts { immediate: true })
1211-
.await
1212-
.context("persist actor initialization")?;
1208+
if !is_new || !self.factory.requires_manual_startup_ready() {
1209+
self.ctx.set_has_initialized(true);
1210+
self.ctx
1211+
.persist_state(SaveStateOpts { immediate: true })
1212+
.await
1213+
.context("persist actor initialization")?;
1214+
}
12131215
let init_inspector_token_started_at = Instant::now();
12141216
crate::inspector::auth::init_inspector_token_with_preload(
12151217
&self.ctx,
@@ -1234,6 +1236,9 @@ impl ActorTask {
12341236
self.transition_to(LifecycleState::Started);
12351237
self.spawn_run_handle(is_new).await?;
12361238
if is_new {
1239+
if !self.ctx.persisted_actor().has_initialized {
1240+
self.ctx.set_has_initialized(true);
1241+
}
12371242
self.ctx
12381243
.persist_state(SaveStateOpts { immediate: true })
12391244
.await

rivetkit-rust/packages/rivetkit-core/tests/task.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2113,6 +2113,73 @@ mod moved_tests {
21132113
assert!(ctx.persisted_actor().has_initialized);
21142114
}
21152115

2116+
#[tokio::test]
2117+
async fn manual_startup_does_not_mark_initialized_before_runtime_preamble() {
2118+
let kv = new_in_memory();
2119+
let ctx = new_with_kv(
2120+
"actor-manual-startup-init",
2121+
"task-manual-startup-init",
2122+
Vec::new(),
2123+
"local",
2124+
kv,
2125+
);
2126+
let (observed_tx, observed_rx) = oneshot::channel();
2127+
let observed_tx = Arc::new(Mutex::new(Some(observed_tx)));
2128+
let factory = Arc::new(ActorFactory::new_with_manual_startup_ready(
2129+
Default::default(),
2130+
move |mut start| {
2131+
let observed_tx = observed_tx.clone();
2132+
Box::pin(async move {
2133+
observed_tx
2134+
.lock()
2135+
.expect("observed lock poisoned")
2136+
.take()
2137+
.expect("observed sender should exist")
2138+
.send(start.ctx.persisted_actor().has_initialized)
2139+
.expect("observed sender should send");
2140+
start.ctx.set_state_initial(vec![4, 5, 6]);
2141+
start.ctx.set_has_initialized(true);
2142+
start
2143+
.startup_ready
2144+
.take()
2145+
.expect("manual runtime should receive startup ready sender")
2146+
.send(Ok(()))
2147+
.expect("startup ready receiver should exist");
2148+
2149+
while let Some(event) = start.events.recv().await {
2150+
match event {
2151+
ActorEvent::SerializeState { reply, .. } => {
2152+
reply.send(Ok(vec![StateDelta::ActorState(start.ctx.state())]));
2153+
}
2154+
ActorEvent::RunGracefulCleanup { reply, .. } => {
2155+
reply.send(Ok(()));
2156+
}
2157+
_ => {}
2158+
}
2159+
}
2160+
Ok(())
2161+
})
2162+
},
2163+
));
2164+
let mut task = new_task_with_factory(ctx.clone(), factory);
2165+
let (start_tx, start_rx) = oneshot::channel();
2166+
2167+
task.handle_lifecycle(LifecycleCommand::Start { reply: start_tx })
2168+
.await;
2169+
start_rx
2170+
.await
2171+
.expect("start reply should send")
2172+
.expect("start should succeed");
2173+
2174+
assert!(!observed_rx.await.expect("runtime should observe startup"));
2175+
assert!(ctx.persisted_actor().has_initialized);
2176+
assert_eq!(ctx.state(), vec![4, 5, 6]);
2177+
2178+
let run_handle = task.run_handle.take().expect("run handle should exist");
2179+
run_handle.abort();
2180+
let _ = run_handle.await;
2181+
}
2182+
21162183
#[tokio::test]
21172184
async fn startup_uses_preloaded_last_pushed_alarm_without_live_kv() {
21182185
let _env_guard = test_inspector_env_lock().lock().expect("env lock poisoned");

rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ async fn run_preamble(
209209
snapshot: Option<Vec<u8>>,
210210
hibernated: Vec<(rivetkit_core::ConnHandle, Vec<u8>)>,
211211
) -> Result<RunHandlerSlot> {
212+
let snapshot = normalize_startup_snapshot(bindings.create_state.is_some(), snapshot);
212213
let is_new = snapshot.is_none();
213214

214215
// Run database migrations before any user lifecycle hook so `c.db` is
@@ -290,6 +291,16 @@ async fn run_preamble(
290291
Ok(run_handler)
291292
}
292293

294+
fn normalize_startup_snapshot(
295+
has_create_state: bool,
296+
snapshot: Option<Vec<u8>>,
297+
) -> Option<Vec<u8>> {
298+
match snapshot {
299+
Some(bytes) if bytes.is_empty() && has_create_state => None,
300+
other => other,
301+
}
302+
}
303+
293304
fn configure_run_handler(bindings: &CallbackBindings, ctx: &ActorContext) -> RunHandlerSlot {
294305
let run_handler = Arc::new(Mutex::new(None));
295306
let Some(callback) = bindings.run.as_ref().cloned() else {

rivetkit-typescript/packages/rivetkit-napi/tests/napi_actor_events.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ mod moved_tests {
55
use std::sync::Arc as StdArc;
66
use std::time::Duration;
77

8-
use rivet_error::RivetError as RivetTransportError;
98
use rivet_error::{RivetError as RivetTransportError, RivetErrorSchema};
109
use rivetkit_actor_persist::versioned as persist_versioned;
1110
use rivetkit_core::Kv;
12-
use rivetkit_core::actor::state::PERSIST_DATA_KEY;
1311
use tokio::sync::oneshot;
1412
use vbare::OwnedVersionedData;
1513

1614
use super::*;
1715

16+
const PERSIST_DATA_KEY: &[u8] = &[1];
17+
1818
fn test_adapter_config() -> AdapterConfig {
1919
let timeout = Duration::from_secs(1);
2020
AdapterConfig {
@@ -64,6 +64,20 @@ mod moved_tests {
6464
assert_eq!(error.code(), code);
6565
}
6666

67+
#[test]
68+
fn startup_snapshot_recovery_only_treats_empty_stateful_snapshot_as_new() {
69+
assert_eq!(normalize_startup_snapshot(true, Some(Vec::new())), None);
70+
assert_eq!(
71+
normalize_startup_snapshot(false, Some(Vec::new())),
72+
Some(Vec::new())
73+
);
74+
assert_eq!(
75+
normalize_startup_snapshot(true, Some(vec![1, 2, 3])),
76+
Some(vec![1, 2, 3])
77+
);
78+
assert_eq!(normalize_startup_snapshot(true, None), None);
79+
}
80+
6781
fn schema_ptr(error: &anyhow::Error) -> *const RivetErrorSchema {
6882
error
6983
.chain()

0 commit comments

Comments
 (0)