Skip to content

Commit 19adfab

Browse files
committed
fix(rivetkit): use startup kv preload
1 parent eb18651 commit 19adfab

9 files changed

Lines changed: 503 additions & 19 deletions

File tree

engine/packages/pegboard/src/actor_kv/preload.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,6 @@ pub(crate) async fn batch_preload(
9696
break;
9797
}
9898

99-
// Mark this key as scanned regardless of whether it exists in FDB.
100-
requested_get_keys.push(key.clone());
101-
10299
let key_subspace = subspace.subspace(&keys::actor_kv::KeyWrapper(key.clone()));
103100
let mut stream = tx.get_ranges_keyvalues(
104101
universaldb::RangeOption {
@@ -139,14 +136,26 @@ pub(crate) async fn batch_preload(
139136
let size = entry_size(&k, &v, &m);
140137
if total_bytes + size <= max_total_bytes {
141138
total_bytes += size;
139+
requested_get_keys.push(key.clone());
142140
entries.push(ep::PreloadedKvEntry {
143141
key: k,
144142
value: v,
145143
metadata: m,
146144
});
145+
} else {
146+
tracing::debug!(
147+
?key,
148+
size,
149+
remaining_budget = max_total_bytes.saturating_sub(total_bytes),
150+
"preload get-key skipped due to global budget exhaustion"
151+
);
147152
}
153+
} else {
154+
// Mark missing keys as scanned so the runtime can distinguish
155+
// "preloaded and absent" from "not preloaded".
156+
requested_get_keys.push(key.clone());
157+
}
148158
}
149-
}
150159

151160
// 2. Read prefix ranges in priority order. Each prefix is bounded by
152161
// its per-prefix max_bytes and the remaining global budget.
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
use anyhow::Result;
2+
use gas::prelude::*;
3+
use pegboard::actor_kv as kv;
4+
use rivet_config::config::pegboard::Pegboard;
5+
use rivet_data::converted::ActorNameKeyData;
6+
use serde_json::json;
7+
8+
async fn setup_recipient(test_name: &str) -> Result<(
9+
rivet_test_deps::TestDeps,
10+
kv::Recipient,
11+
Id,
12+
)> {
13+
let test_id = Uuid::new_v4();
14+
let dc_label = 1;
15+
let datacenters = [(
16+
"test-dc".to_string(),
17+
rivet_config::config::topology::Datacenter {
18+
name: "test-dc".to_string(),
19+
datacenter_label: dc_label,
20+
is_leader: true,
21+
peer_url: url::Url::parse("http://127.0.0.1:8080")?,
22+
public_url: url::Url::parse("http://127.0.0.1:8081")?,
23+
proxy_url: None,
24+
valid_hosts: None,
25+
},
26+
)]
27+
.into_iter()
28+
.collect();
29+
30+
let api_peer_port = portpicker::pick_unused_port().expect("failed to pick api peer port");
31+
let guard_port = portpicker::pick_unused_port().expect("failed to pick guard port");
32+
let test_deps = rivet_test_deps::setup_single_datacenter(
33+
test_id,
34+
dc_label,
35+
datacenters,
36+
api_peer_port,
37+
guard_port,
38+
)
39+
.await?;
40+
41+
let actor_id = Id::new_v1(dc_label);
42+
let namespace_id = Id::new_v1(dc_label);
43+
let recipient = kv::Recipient {
44+
actor_id,
45+
namespace_id,
46+
name: test_name.to_string(),
47+
};
48+
49+
Ok((test_deps, recipient, namespace_id))
50+
}
51+
52+
#[tokio::test]
53+
async fn preload_oversized_exact_key_is_not_marked_requested() -> Result<()> {
54+
let (test_deps, recipient, namespace_id) =
55+
setup_recipient("preload_oversized_exact_key").await?;
56+
let db = &test_deps.pools.udb()?;
57+
let actor_name = "preload-oversized-exact-key";
58+
let key = b"large-exact-key".to_vec();
59+
let value = vec![42; 256];
60+
61+
kv::put(db, &recipient, vec![key.clone()], vec![value]).await?;
62+
db.run(|tx| {
63+
let actor_name = actor_name.to_string();
64+
let key = key.clone();
65+
async move {
66+
let tx = tx.with_subspace(pegboard::keys::subspace());
67+
tx.write(
68+
&pegboard::keys::ns::ActorNameKey::new(namespace_id, actor_name),
69+
ActorNameKeyData {
70+
metadata: serde_json::Map::from_iter([(
71+
"preload".to_string(),
72+
json!({
73+
"keys": [key],
74+
"prefixes": [],
75+
}),
76+
)]),
77+
},
78+
)?;
79+
Ok(())
80+
}
81+
})
82+
.await?;
83+
84+
let preloaded = kv::preload::fetch_preloaded_kv(
85+
db,
86+
&Pegboard {
87+
preload_max_total_bytes: Some(1),
88+
..Default::default()
89+
},
90+
recipient.actor_id,
91+
namespace_id,
92+
actor_name,
93+
)
94+
.await?
95+
.expect("preload should be enabled by actor metadata");
96+
97+
assert!(
98+
preloaded.entries.is_empty(),
99+
"oversized exact key should not be included"
100+
);
101+
assert!(
102+
preloaded.requested_get_keys.is_empty(),
103+
"oversized present key must not be marked requested so runtimes live-fetch it"
104+
);
105+
106+
Ok(())
107+
}
108+
109+
#[tokio::test]
110+
async fn preload_missing_exact_key_is_marked_requested() -> Result<()> {
111+
let (test_deps, recipient, namespace_id) =
112+
setup_recipient("preload_missing_exact_key").await?;
113+
let db = &test_deps.pools.udb()?;
114+
let actor_name = "preload-missing-exact-key";
115+
let key = b"missing-exact-key".to_vec();
116+
117+
db.run(|tx| {
118+
let actor_name = actor_name.to_string();
119+
let key = key.clone();
120+
async move {
121+
let tx = tx.with_subspace(pegboard::keys::subspace());
122+
tx.write(
123+
&pegboard::keys::ns::ActorNameKey::new(namespace_id, actor_name),
124+
ActorNameKeyData {
125+
metadata: serde_json::Map::from_iter([(
126+
"preload".to_string(),
127+
json!({
128+
"keys": [key],
129+
"prefixes": [],
130+
}),
131+
)]),
132+
},
133+
)?;
134+
Ok(())
135+
}
136+
})
137+
.await?;
138+
139+
let preloaded = kv::preload::fetch_preloaded_kv(
140+
db,
141+
&Pegboard {
142+
preload_max_total_bytes: Some(1_024),
143+
..Default::default()
144+
},
145+
recipient.actor_id,
146+
namespace_id,
147+
actor_name,
148+
)
149+
.await?
150+
.expect("preload should be enabled by actor metadata");
151+
152+
assert!(preloaded.entries.is_empty());
153+
assert_eq!(preloaded.requested_get_keys, vec![key]);
154+
155+
Ok(())
156+
}

rivetkit-rust/packages/rivetkit-core/src/actor/state.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,13 @@ impl ActorContext {
550550
}
551551

552552
pub fn set_has_initialized(&self, has_initialized: bool) {
553-
self.0.persisted.write().has_initialized = has_initialized;
553+
{
554+
let mut persisted = self.0.persisted.write();
555+
if persisted.has_initialized == has_initialized {
556+
return;
557+
}
558+
persisted.has_initialized = has_initialized;
559+
}
554560
self.0
555561
.metrics
556562
.inc_state_mutation(StateMutationReason::HasInitialized);

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,9 +1211,12 @@ impl ActorTask {
12111211
.await
12121212
.context("persist actor initialization")?;
12131213
let init_inspector_token_started_at = Instant::now();
1214-
crate::inspector::init_inspector_token(&self.ctx)
1215-
.await
1216-
.context("initialize inspector token")?;
1214+
crate::inspector::auth::init_inspector_token_with_preload(
1215+
&self.ctx,
1216+
self.preloaded_kv.as_ref(),
1217+
)
1218+
.await
1219+
.context("initialize inspector token")?;
12171220
tracing::debug!(
12181221
actor_id = %actor_id,
12191222
duration_ms = duration_ms_f64(init_inspector_token_started_at.elapsed()),
@@ -1250,9 +1253,10 @@ impl ActorTask {
12501253
async fn load_persisted_startup(&mut self) -> Result<PersistedStartup> {
12511254
match std::mem::take(&mut self.preload_persisted_actor) {
12521255
PreloadedPersistedActor::Some(preloaded) => {
1256+
let last_pushed_alarm = self.load_startup_last_pushed_alarm().await?;
12531257
return Ok(PersistedStartup {
12541258
actor: preloaded,
1255-
last_pushed_alarm: Self::load_last_pushed_alarm(self.ctx.kv().clone()).await?,
1259+
last_pushed_alarm,
12561260
});
12571261
}
12581262
PreloadedPersistedActor::BundleExistsButEmpty => {
@@ -1267,6 +1271,16 @@ impl ActorTask {
12671271
PreloadedPersistedActor::NoBundle => {}
12681272
}
12691273

1274+
if self.preloaded_kv.is_some() {
1275+
let actor =
1276+
self.decode_persisted_actor_startup(self.load_startup_key(PERSIST_DATA_KEY).await?)?;
1277+
let last_pushed_alarm = self.load_startup_last_pushed_alarm().await?;
1278+
return Ok(PersistedStartup {
1279+
actor,
1280+
last_pushed_alarm,
1281+
});
1282+
}
1283+
12701284
let mut values = self
12711285
.ctx
12721286
.kv()
@@ -1297,16 +1311,43 @@ impl ActorTask {
12971311
})
12981312
}
12991313

1300-
async fn load_last_pushed_alarm(kv: crate::kv::Kv) -> Result<Option<i64>> {
1301-
kv.get(LAST_PUSHED_ALARM_KEY)
1314+
async fn load_startup_key(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1315+
if let Some(entry) = self
1316+
.preloaded_kv
1317+
.as_ref()
1318+
.and_then(|preloaded| preloaded.key_entry(key))
1319+
{
1320+
return Ok(entry);
1321+
}
1322+
1323+
self.ctx
1324+
.kv()
1325+
.get(key)
13021326
.await
1303-
.context("load persisted last pushed alarm")?
1327+
.context("load persisted actor startup key")
1328+
}
1329+
1330+
async fn load_startup_last_pushed_alarm(&self) -> Result<Option<i64>> {
1331+
self.load_startup_key(LAST_PUSHED_ALARM_KEY)
1332+
.await?
13041333
.map(|bytes| decode_last_pushed_alarm(&bytes))
13051334
.transpose()
13061335
.context("decode persisted last pushed alarm")
13071336
.map(Option::flatten)
13081337
}
13091338

1339+
fn decode_persisted_actor_startup(&self, encoded: Option<Vec<u8>>) -> Result<PersistedActor> {
1340+
match encoded {
1341+
Some(bytes) => {
1342+
decode_persisted_actor(&bytes).context("decode persisted actor startup data")
1343+
}
1344+
None => Ok(PersistedActor {
1345+
input: self.start_input.clone(),
1346+
..PersistedActor::default()
1347+
}),
1348+
}
1349+
}
1350+
13101351
fn ensure_actor_event_channel(&mut self) {
13111352
if self.actor_event_tx.is_some() && self.actor_event_rx.is_some() {
13121353
return;

rivetkit-rust/packages/rivetkit-core/src/inspector/auth.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use subtle::ConstantTimeEq;
1111

1212
use crate::ActorContext;
1313
use crate::actor::keys::INSPECTOR_TOKEN_KEY;
14+
use crate::actor::preload::PreloadedKv;
1415

1516
/// Test-only override. Not a public/production auth mechanism; production
1617
/// inspector auth goes through the per-actor KV token.
@@ -75,15 +76,26 @@ impl InspectorAuth {
7576
/// precedence over any KV-stored token and we do not want to pin a per-actor
7677
/// token that will never be consulted.
7778
pub async fn init_inspector_token(ctx: &ActorContext) -> Result<()> {
79+
init_inspector_token_with_preload(ctx, None).await
80+
}
81+
82+
pub(crate) async fn init_inspector_token_with_preload(
83+
ctx: &ActorContext,
84+
preloaded_kv: Option<&PreloadedKv>,
85+
) -> Result<()> {
7886
if configured_test_token().is_some() {
7987
return Ok(());
8088
}
8189

82-
let existing = ctx
83-
.kv()
84-
.get(&INSPECTOR_TOKEN_KEY)
85-
.await
86-
.context("load inspector token")?;
90+
let existing =
91+
match preloaded_kv.and_then(|preloaded| preloaded.key_entry(&INSPECTOR_TOKEN_KEY)) {
92+
Some(existing) => existing,
93+
None => ctx
94+
.kv()
95+
.get(&INSPECTOR_TOKEN_KEY)
96+
.await
97+
.context("load inspector token")?,
98+
};
8799
if existing.is_some() {
88100
return Ok(());
89101
}

rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,7 @@ impl RegistryDispatcher {
550550
[1],
551551
[3],
552552
[5, 1, 1],
553+
[6],
553554
],
554555
"prefixes": [
555556
{

0 commit comments

Comments
 (0)