Skip to content

Commit 3c7167b

Browse files
committed
fix(rivetkit): use startup kv preload
1 parent eb18651 commit 3c7167b

8 files changed

Lines changed: 271 additions & 19 deletions

File tree

engine/packages/pegboard/src/actor_kv/preload.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,6 @@ pub(crate) async fn batch_preload(
9696
break;
9797
}
9898

99-
// Mark this key as scanned regardless of whether it exists in FDB.
100-
requested_get_keys.push(key.clone());
101-
10299
let key_subspace = subspace.subspace(&keys::actor_kv::KeyWrapper(key.clone()));
103100
let mut stream = tx.get_ranges_keyvalues(
104101
universaldb::RangeOption {
@@ -139,14 +136,26 @@ pub(crate) async fn batch_preload(
139136
let size = entry_size(&k, &v, &m);
140137
if total_bytes + size <= max_total_bytes {
141138
total_bytes += size;
139+
requested_get_keys.push(key.clone());
142140
entries.push(ep::PreloadedKvEntry {
143141
key: k,
144142
value: v,
145143
metadata: m,
146144
});
145+
} else {
146+
tracing::debug!(
147+
?key,
148+
size,
149+
remaining_budget = max_total_bytes.saturating_sub(total_bytes),
150+
"preload get-key skipped due to global budget exhaustion"
151+
);
147152
}
153+
} else {
154+
// Mark missing keys as scanned so the runtime can distinguish
155+
// "preloaded and absent" from "not preloaded".
156+
requested_get_keys.push(key.clone());
157+
}
148158
}
149-
}
150159

151160
// 2. Read prefix ranges in priority order. Each prefix is bounded by
152161
// its per-prefix max_bytes and the remaining global budget.

rivetkit-rust/packages/rivetkit-core/src/actor/state.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,13 @@ impl ActorContext {
550550
}
551551

552552
pub fn set_has_initialized(&self, has_initialized: bool) {
553-
self.0.persisted.write().has_initialized = has_initialized;
553+
{
554+
let mut persisted = self.0.persisted.write();
555+
if persisted.has_initialized == has_initialized {
556+
return;
557+
}
558+
persisted.has_initialized = has_initialized;
559+
}
554560
self.0
555561
.metrics
556562
.inc_state_mutation(StateMutationReason::HasInitialized);

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,9 +1211,12 @@ impl ActorTask {
12111211
.await
12121212
.context("persist actor initialization")?;
12131213
let init_inspector_token_started_at = Instant::now();
1214-
crate::inspector::init_inspector_token(&self.ctx)
1215-
.await
1216-
.context("initialize inspector token")?;
1214+
crate::inspector::auth::init_inspector_token_with_preload(
1215+
&self.ctx,
1216+
self.preloaded_kv.as_ref(),
1217+
)
1218+
.await
1219+
.context("initialize inspector token")?;
12171220
tracing::debug!(
12181221
actor_id = %actor_id,
12191222
duration_ms = duration_ms_f64(init_inspector_token_started_at.elapsed()),
@@ -1250,9 +1253,10 @@ impl ActorTask {
12501253
async fn load_persisted_startup(&mut self) -> Result<PersistedStartup> {
12511254
match std::mem::take(&mut self.preload_persisted_actor) {
12521255
PreloadedPersistedActor::Some(preloaded) => {
1256+
let last_pushed_alarm = self.load_startup_last_pushed_alarm().await?;
12531257
return Ok(PersistedStartup {
12541258
actor: preloaded,
1255-
last_pushed_alarm: Self::load_last_pushed_alarm(self.ctx.kv().clone()).await?,
1259+
last_pushed_alarm,
12561260
});
12571261
}
12581262
PreloadedPersistedActor::BundleExistsButEmpty => {
@@ -1267,6 +1271,16 @@ impl ActorTask {
12671271
PreloadedPersistedActor::NoBundle => {}
12681272
}
12691273

1274+
if self.preloaded_kv.is_some() {
1275+
let actor =
1276+
self.decode_persisted_actor_startup(self.load_startup_key(PERSIST_DATA_KEY).await?)?;
1277+
let last_pushed_alarm = self.load_startup_last_pushed_alarm().await?;
1278+
return Ok(PersistedStartup {
1279+
actor,
1280+
last_pushed_alarm,
1281+
});
1282+
}
1283+
12701284
let mut values = self
12711285
.ctx
12721286
.kv()
@@ -1297,16 +1311,43 @@ impl ActorTask {
12971311
})
12981312
}
12991313

1300-
async fn load_last_pushed_alarm(kv: crate::kv::Kv) -> Result<Option<i64>> {
1301-
kv.get(LAST_PUSHED_ALARM_KEY)
1314+
async fn load_startup_key(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1315+
if let Some(entry) = self
1316+
.preloaded_kv
1317+
.as_ref()
1318+
.and_then(|preloaded| preloaded.key_entry(key))
1319+
{
1320+
return Ok(entry);
1321+
}
1322+
1323+
self.ctx
1324+
.kv()
1325+
.get(key)
13021326
.await
1303-
.context("load persisted last pushed alarm")?
1327+
.context("load persisted actor startup key")
1328+
}
1329+
1330+
async fn load_startup_last_pushed_alarm(&self) -> Result<Option<i64>> {
1331+
self.load_startup_key(LAST_PUSHED_ALARM_KEY)
1332+
.await?
13041333
.map(|bytes| decode_last_pushed_alarm(&bytes))
13051334
.transpose()
13061335
.context("decode persisted last pushed alarm")
13071336
.map(Option::flatten)
13081337
}
13091338

1339+
fn decode_persisted_actor_startup(&self, encoded: Option<Vec<u8>>) -> Result<PersistedActor> {
1340+
match encoded {
1341+
Some(bytes) => {
1342+
decode_persisted_actor(&bytes).context("decode persisted actor startup data")
1343+
}
1344+
None => Ok(PersistedActor {
1345+
input: self.start_input.clone(),
1346+
..PersistedActor::default()
1347+
}),
1348+
}
1349+
}
1350+
13101351
fn ensure_actor_event_channel(&mut self) {
13111352
if self.actor_event_tx.is_some() && self.actor_event_rx.is_some() {
13121353
return;

rivetkit-rust/packages/rivetkit-core/src/inspector/auth.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use subtle::ConstantTimeEq;
1111

1212
use crate::ActorContext;
1313
use crate::actor::keys::INSPECTOR_TOKEN_KEY;
14+
use crate::actor::preload::PreloadedKv;
1415

1516
/// Test-only override. Not a public/production auth mechanism; production
1617
/// inspector auth goes through the per-actor KV token.
@@ -75,15 +76,26 @@ impl InspectorAuth {
7576
/// precedence over any KV-stored token and we do not want to pin a per-actor
7677
/// token that will never be consulted.
7778
pub async fn init_inspector_token(ctx: &ActorContext) -> Result<()> {
79+
init_inspector_token_with_preload(ctx, None).await
80+
}
81+
82+
pub(crate) async fn init_inspector_token_with_preload(
83+
ctx: &ActorContext,
84+
preloaded_kv: Option<&PreloadedKv>,
85+
) -> Result<()> {
7886
if configured_test_token().is_some() {
7987
return Ok(());
8088
}
8189

82-
let existing = ctx
83-
.kv()
84-
.get(&INSPECTOR_TOKEN_KEY)
85-
.await
86-
.context("load inspector token")?;
90+
let existing =
91+
match preloaded_kv.and_then(|preloaded| preloaded.key_entry(&INSPECTOR_TOKEN_KEY)) {
92+
Some(existing) => existing,
93+
None => ctx
94+
.kv()
95+
.get(&INSPECTOR_TOKEN_KEY)
96+
.await
97+
.context("load inspector token")?,
98+
};
8799
if existing.is_some() {
88100
return Ok(());
89101
}

rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,7 @@ impl RegistryDispatcher {
550550
[1],
551551
[3],
552552
[5, 1, 1],
553+
[6],
553554
],
554555
"prefixes": [
555556
{

rivetkit-rust/packages/rivetkit-core/tests/task.rs

Lines changed: 183 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@ mod moved_tests {
3131
ConnHandle, HibernatableConnectionMetadata, decode_persisted_connection,
3232
};
3333
use crate::actor::context::tests::new_with_kv;
34-
use crate::actor::keys::{LAST_PUSHED_ALARM_KEY, PERSIST_DATA_KEY, make_connection_key};
34+
use crate::actor::keys::{
35+
CONN_PREFIX, INSPECTOR_TOKEN_KEY, LAST_PUSHED_ALARM_KEY, PERSIST_DATA_KEY,
36+
QUEUE_MESSAGES_PREFIX, WORKFLOW_STORAGE_PREFIX, make_connection_key,
37+
};
3538
use crate::actor::messages::{ActorEvent, SerializeStateReason, StateDelta};
36-
use crate::actor::preload::PreloadedPersistedActor;
39+
use crate::actor::preload::{PreloadedKv, PreloadedPersistedActor};
3740
use crate::actor::sleep::CanSleep;
3841
use crate::actor::state::{
3942
PersistedActor, PersistedScheduleEvent, RequestSaveOpts, decode_last_pushed_alarm,
@@ -47,6 +50,7 @@ mod moved_tests {
4750
use crate::inspector::auth::test_inspector_env_lock;
4851
use crate::kv::tests::new_in_memory;
4952
use crate::{ActorConfig, ActorContext, ActorFactory};
53+
use rivet_envoy_client::utils::EnvoyShutdownError;
5054

5155
fn test_hook_lock() -> &'static AsyncMutex<()> {
5256
static LOCK: OnceLock<AsyncMutex<()>> = OnceLock::new();
@@ -2108,6 +2112,183 @@ mod moved_tests {
21082112
assert!(ctx.persisted_actor().has_initialized);
21092113
}
21102114

2115+
#[tokio::test]
2116+
async fn startup_uses_preloaded_last_pushed_alarm_without_live_kv() {
2117+
let _env_guard = test_inspector_env_lock().lock().expect("env lock poisoned");
2118+
unsafe {
2119+
std::env::remove_var("_RIVET_TEST_INSPECTOR_TOKEN");
2120+
}
2121+
2122+
let future_ts = 4_102_445_000_000;
2123+
let persisted = PersistedActor {
2124+
has_initialized: true,
2125+
scheduled_events: vec![PersistedScheduleEvent {
2126+
event_id: "evt-preloaded-alarm".to_owned(),
2127+
timestamp: future_ts,
2128+
action: "tick".to_owned(),
2129+
args: None,
2130+
}],
2131+
..PersistedActor::default()
2132+
};
2133+
let encoded_alarm =
2134+
encode_last_pushed_alarm(Some(future_ts)).expect("last pushed alarm should encode");
2135+
let preloaded_kv = PreloadedKv::new_with_requested_get_keys(
2136+
vec![
2137+
(LAST_PUSHED_ALARM_KEY.to_vec(), encoded_alarm),
2138+
(
2139+
INSPECTOR_TOKEN_KEY.to_vec(),
2140+
b"startup-preload-token".to_vec(),
2141+
),
2142+
],
2143+
vec![
2144+
PERSIST_DATA_KEY.to_vec(),
2145+
LAST_PUSHED_ALARM_KEY.to_vec(),
2146+
INSPECTOR_TOKEN_KEY.to_vec(),
2147+
vec![5, 1, 1],
2148+
],
2149+
vec![
2150+
WORKFLOW_STORAGE_PREFIX.to_vec(),
2151+
CONN_PREFIX.to_vec(),
2152+
QUEUE_MESSAGES_PREFIX.to_vec(),
2153+
],
2154+
);
2155+
2156+
let (handle, mut rx) = test_envoy_handle();
2157+
tokio::spawn(async move {
2158+
while let Some(message) = rx.recv().await {
2159+
if let ToEnvoyMessage::KvRequest { response_tx, .. } = message {
2160+
let _ = response_tx.send(Err(anyhow::anyhow!(EnvoyShutdownError)));
2161+
}
2162+
}
2163+
});
2164+
2165+
let ctx = new_with_kv(
2166+
"actor-preloaded-alarm",
2167+
"task-preloaded-alarm",
2168+
Vec::new(),
2169+
"local",
2170+
crate::kv::Kv::new(handle.clone(), "actor-preloaded-alarm"),
2171+
);
2172+
ctx.configure_envoy(handle, Some(31));
2173+
let mut task = new_task(ctx.clone())
2174+
.with_preloaded_persisted_actor(PreloadedPersistedActor::Some(persisted))
2175+
.with_preloaded_kv(Some(preloaded_kv));
2176+
let (start_tx, start_rx) = oneshot::channel();
2177+
2178+
task.handle_lifecycle(LifecycleCommand::Start { reply: start_tx })
2179+
.await;
2180+
start_rx
2181+
.await
2182+
.expect("start reply should send")
2183+
.expect("start should use preloaded alarm without live kv");
2184+
2185+
assert_eq!(ctx.last_pushed_alarm(), Some(future_ts));
2186+
}
2187+
2188+
#[tokio::test]
2189+
async fn startup_uses_preloaded_alarm_with_partial_startup_preload() {
2190+
let _env_guard = test_inspector_env_lock().lock().expect("env lock poisoned");
2191+
unsafe {
2192+
std::env::remove_var("_RIVET_TEST_INSPECTOR_TOKEN");
2193+
}
2194+
2195+
let future_ts = 4_102_445_123_000;
2196+
let persisted = PersistedActor {
2197+
has_initialized: true,
2198+
scheduled_events: vec![PersistedScheduleEvent {
2199+
event_id: "evt-partial-preload-alarm".to_owned(),
2200+
timestamp: future_ts,
2201+
action: "tick".to_owned(),
2202+
args: None,
2203+
}],
2204+
..PersistedActor::default()
2205+
};
2206+
let encoded_persisted =
2207+
encode_persisted_actor(&persisted).expect("persisted actor should encode");
2208+
let encoded_alarm =
2209+
encode_last_pushed_alarm(Some(future_ts)).expect("last pushed alarm should encode");
2210+
let preloaded_kv = PreloadedKv::new_with_requested_get_keys(
2211+
vec![
2212+
(LAST_PUSHED_ALARM_KEY.to_vec(), encoded_alarm),
2213+
(
2214+
INSPECTOR_TOKEN_KEY.to_vec(),
2215+
b"startup-partial-preload-token".to_vec(),
2216+
),
2217+
],
2218+
vec![
2219+
LAST_PUSHED_ALARM_KEY.to_vec(),
2220+
INSPECTOR_TOKEN_KEY.to_vec(),
2221+
vec![5, 1, 1],
2222+
],
2223+
vec![
2224+
WORKFLOW_STORAGE_PREFIX.to_vec(),
2225+
CONN_PREFIX.to_vec(),
2226+
QUEUE_MESSAGES_PREFIX.to_vec(),
2227+
],
2228+
);
2229+
2230+
let saw_persist_live_get = Arc::new(AtomicBool::new(false));
2231+
let saw_persist_live_get_for_task = Arc::clone(&saw_persist_live_get);
2232+
let (handle, mut rx) = test_envoy_handle();
2233+
tokio::spawn(async move {
2234+
while let Some(message) = rx.recv().await {
2235+
if let ToEnvoyMessage::KvRequest {
2236+
data, response_tx, ..
2237+
} = message
2238+
{
2239+
match data {
2240+
protocol::KvRequestData::KvGetRequest(request) => {
2241+
assert_eq!(
2242+
request.keys,
2243+
vec![PERSIST_DATA_KEY.to_vec()],
2244+
"partial preload should only live-fetch missing persisted actor data"
2245+
);
2246+
saw_persist_live_get_for_task.store(true, Ordering::SeqCst);
2247+
let _ = response_tx.send(Ok(
2248+
protocol::KvResponseData::KvGetResponse(protocol::KvGetResponse {
2249+
keys: vec![PERSIST_DATA_KEY.to_vec()],
2250+
values: vec![encoded_persisted.clone()],
2251+
metadata: vec![protocol::KvMetadata {
2252+
version: Vec::new(),
2253+
update_ts: 0,
2254+
}],
2255+
}),
2256+
));
2257+
}
2258+
protocol::KvRequestData::KvPutRequest(_) => {
2259+
let _ = response_tx.send(Ok(protocol::KvResponseData::KvPutResponse));
2260+
}
2261+
other => {
2262+
let _ = response_tx
2263+
.send(Err(anyhow::anyhow!("unexpected KV request: {other:?}")));
2264+
}
2265+
}
2266+
}
2267+
}
2268+
});
2269+
2270+
let ctx = new_with_kv(
2271+
"actor-partial-preload-alarm",
2272+
"task-partial-preload-alarm",
2273+
Vec::new(),
2274+
"local",
2275+
crate::kv::Kv::new(handle.clone(), "actor-partial-preload-alarm"),
2276+
);
2277+
ctx.configure_envoy(handle, Some(32));
2278+
let mut task = new_task(ctx.clone()).with_preloaded_kv(Some(preloaded_kv));
2279+
let (start_tx, start_rx) = oneshot::channel();
2280+
2281+
task.handle_lifecycle(LifecycleCommand::Start { reply: start_tx })
2282+
.await;
2283+
start_rx
2284+
.await
2285+
.expect("start reply should send")
2286+
.expect("start should use preloaded alarm in partial preload bundle");
2287+
2288+
assert!(saw_persist_live_get.load(Ordering::SeqCst));
2289+
assert_eq!(ctx.last_pushed_alarm(), Some(future_ts));
2290+
}
2291+
21112292
#[tokio::test]
21122293
async fn startup_skips_future_alarm_push_when_last_pushed_matches() {
21132294
let kv = new_in_memory();

0 commit comments

Comments
 (0)