Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions engine/packages/pegboard/src/actor_kv/preload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@
break;
}

// Mark this key as scanned regardless of whether it exists in FDB.
requested_get_keys.push(key.clone());

let key_subspace = subspace.subspace(&keys::actor_kv::KeyWrapper(key.clone()));
let mut stream = tx.get_ranges_keyvalues(
universaldb::RangeOption {
Expand Down Expand Up @@ -139,14 +136,26 @@
let size = entry_size(&k, &v, &m);
if total_bytes + size <= max_total_bytes {
total_bytes += size;
requested_get_keys.push(key.clone());
entries.push(ep::PreloadedKvEntry {
key: k,
value: v,
metadata: m,
});
} else {
tracing::debug!(
?key,
size,
remaining_budget = max_total_bytes.saturating_sub(total_bytes),
"preload get-key skipped due to global budget exhaustion"
);
}
} else {
// Mark missing keys as scanned so the runtime can distinguish
// "preloaded and absent" from "not preloaded".

Check warning on line 155 in engine/packages/pegboard/src/actor_kv/preload.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard/src/actor_kv/preload.rs
requested_get_keys.push(key.clone());
}
}
}

// 2. Read prefix ranges in priority order. Each prefix is bounded by
// its per-prefix max_bytes and the remaining global budget.
Expand Down
156 changes: 156 additions & 0 deletions engine/packages/pegboard/tests/kv_preload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use anyhow::Result;
use gas::prelude::*;
use pegboard::actor_kv as kv;
use rivet_config::config::pegboard::Pegboard;
use rivet_data::converted::ActorNameKeyData;

Check warning on line 5 in engine/packages/pegboard/tests/kv_preload.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard/tests/kv_preload.rs
use serde_json::json;

async fn setup_recipient(test_name: &str) -> Result<(
rivet_test_deps::TestDeps,
kv::Recipient,
Id,
)> {
let test_id = Uuid::new_v4();
let dc_label = 1;
let datacenters = [(
"test-dc".to_string(),
rivet_config::config::topology::Datacenter {
name: "test-dc".to_string(),
datacenter_label: dc_label,
is_leader: true,
peer_url: url::Url::parse("http://127.0.0.1:8080")?,
public_url: url::Url::parse("http://127.0.0.1:8081")?,
proxy_url: None,
valid_hosts: None,
},
)]
.into_iter()
.collect();

let api_peer_port = portpicker::pick_unused_port().expect("failed to pick api peer port");
let guard_port = portpicker::pick_unused_port().expect("failed to pick guard port");
let test_deps = rivet_test_deps::setup_single_datacenter(
test_id,
dc_label,
datacenters,
api_peer_port,
guard_port,
)
.await?;

let actor_id = Id::new_v1(dc_label);
let namespace_id = Id::new_v1(dc_label);
let recipient = kv::Recipient {
actor_id,
namespace_id,
name: test_name.to_string(),
};

Ok((test_deps, recipient, namespace_id))
}

#[tokio::test]
async fn preload_oversized_exact_key_is_not_marked_requested() -> Result<()> {
let (test_deps, recipient, namespace_id) =
setup_recipient("preload_oversized_exact_key").await?;
let db = &test_deps.pools.udb()?;
let actor_name = "preload-oversized-exact-key";
let key = b"large-exact-key".to_vec();
let value = vec![42; 256];

kv::put(db, &recipient, vec![key.clone()], vec![value]).await?;
db.run(|tx| {
let actor_name = actor_name.to_string();
let key = key.clone();
async move {
let tx = tx.with_subspace(pegboard::keys::subspace());
tx.write(
&pegboard::keys::ns::ActorNameKey::new(namespace_id, actor_name),
ActorNameKeyData {
metadata: serde_json::Map::from_iter([(
"preload".to_string(),
json!({
"keys": [key],
"prefixes": [],
}),
)]),
},
)?;
Ok(())
}
})
.await?;

let preloaded = kv::preload::fetch_preloaded_kv(
db,
&Pegboard {
preload_max_total_bytes: Some(1),
..Default::default()
},
recipient.actor_id,
namespace_id,
actor_name,
)
.await?
.expect("preload should be enabled by actor metadata");

assert!(
preloaded.entries.is_empty(),
"oversized exact key should not be included"
);
assert!(
preloaded.requested_get_keys.is_empty(),
"oversized present key must not be marked requested so runtimes live-fetch it"
);

Ok(())
}

Check warning on line 108 in engine/packages/pegboard/tests/kv_preload.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard/tests/kv_preload.rs
#[tokio::test]
async fn preload_missing_exact_key_is_marked_requested() -> Result<()> {
let (test_deps, recipient, namespace_id) =
setup_recipient("preload_missing_exact_key").await?;
let db = &test_deps.pools.udb()?;
let actor_name = "preload-missing-exact-key";
let key = b"missing-exact-key".to_vec();

db.run(|tx| {
let actor_name = actor_name.to_string();
let key = key.clone();
async move {
let tx = tx.with_subspace(pegboard::keys::subspace());
tx.write(
&pegboard::keys::ns::ActorNameKey::new(namespace_id, actor_name),
ActorNameKeyData {
metadata: serde_json::Map::from_iter([(
"preload".to_string(),
json!({
"keys": [key],
"prefixes": [],
}),
)]),
},
)?;
Ok(())
}
})
.await?;

let preloaded = kv::preload::fetch_preloaded_kv(
db,
&Pegboard {
preload_max_total_bytes: Some(1_024),
..Default::default()
},
recipient.actor_id,
namespace_id,
actor_name,
)
.await?
.expect("preload should be enabled by actor metadata");

assert!(preloaded.entries.is_empty());
assert_eq!(preloaded.requested_get_keys, vec![key]);

Ok(())
}
8 changes: 7 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/src/actor/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,13 @@ impl ActorContext {
}

pub fn set_has_initialized(&self, has_initialized: bool) {
self.0.persisted.write().has_initialized = has_initialized;
{
let mut persisted = self.0.persisted.write();
if persisted.has_initialized == has_initialized {
return;
}
persisted.has_initialized = has_initialized;
}
self.0
.metrics
.inc_state_mutation(StateMutationReason::HasInitialized);
Expand Down
55 changes: 48 additions & 7 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1211,9 +1211,12 @@ impl ActorTask {
.await
.context("persist actor initialization")?;
let init_inspector_token_started_at = Instant::now();
crate::inspector::init_inspector_token(&self.ctx)
.await
.context("initialize inspector token")?;
crate::inspector::auth::init_inspector_token_with_preload(
&self.ctx,
self.preloaded_kv.as_ref(),
)
.await
.context("initialize inspector token")?;
tracing::debug!(
actor_id = %actor_id,
duration_ms = duration_ms_f64(init_inspector_token_started_at.elapsed()),
Expand Down Expand Up @@ -1250,9 +1253,10 @@ impl ActorTask {
async fn load_persisted_startup(&mut self) -> Result<PersistedStartup> {
match std::mem::take(&mut self.preload_persisted_actor) {
PreloadedPersistedActor::Some(preloaded) => {
let last_pushed_alarm = self.load_startup_last_pushed_alarm().await?;
return Ok(PersistedStartup {
actor: preloaded,
last_pushed_alarm: Self::load_last_pushed_alarm(self.ctx.kv().clone()).await?,
last_pushed_alarm,
});
}
PreloadedPersistedActor::BundleExistsButEmpty => {
Expand All @@ -1267,6 +1271,16 @@ impl ActorTask {
PreloadedPersistedActor::NoBundle => {}
}

if self.preloaded_kv.is_some() {
let actor =
self.decode_persisted_actor_startup(self.load_startup_key(PERSIST_DATA_KEY).await?)?;
let last_pushed_alarm = self.load_startup_last_pushed_alarm().await?;
return Ok(PersistedStartup {
actor,
last_pushed_alarm,
});
}

let mut values = self
.ctx
.kv()
Expand Down Expand Up @@ -1297,16 +1311,43 @@ impl ActorTask {
})
}

async fn load_last_pushed_alarm(kv: crate::kv::Kv) -> Result<Option<i64>> {
kv.get(LAST_PUSHED_ALARM_KEY)
async fn load_startup_key(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
if let Some(entry) = self
.preloaded_kv
.as_ref()
.and_then(|preloaded| preloaded.key_entry(key))
{
return Ok(entry);
}

self.ctx
.kv()
.get(key)
.await
.context("load persisted last pushed alarm")?
.context("load persisted actor startup key")
}

async fn load_startup_last_pushed_alarm(&self) -> Result<Option<i64>> {
self.load_startup_key(LAST_PUSHED_ALARM_KEY)
.await?
.map(|bytes| decode_last_pushed_alarm(&bytes))
.transpose()
.context("decode persisted last pushed alarm")
.map(Option::flatten)
}

fn decode_persisted_actor_startup(&self, encoded: Option<Vec<u8>>) -> Result<PersistedActor> {
match encoded {
Some(bytes) => {
decode_persisted_actor(&bytes).context("decode persisted actor startup data")
}
None => Ok(PersistedActor {
input: self.start_input.clone(),
..PersistedActor::default()
}),
}
}

fn ensure_actor_event_channel(&mut self) {
if self.actor_event_tx.is_some() && self.actor_event_rx.is_some() {
return;
Expand Down
22 changes: 17 additions & 5 deletions rivetkit-rust/packages/rivetkit-core/src/inspector/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use subtle::ConstantTimeEq;

use crate::ActorContext;
use crate::actor::keys::INSPECTOR_TOKEN_KEY;
use crate::actor::preload::PreloadedKv;

/// Test-only override. Not a public/production auth mechanism; production
/// inspector auth goes through the per-actor KV token.
Expand Down Expand Up @@ -75,15 +76,26 @@ impl InspectorAuth {
/// precedence over any KV-stored token and we do not want to pin a per-actor
/// token that will never be consulted.
pub async fn init_inspector_token(ctx: &ActorContext) -> Result<()> {
init_inspector_token_with_preload(ctx, None).await
}

pub(crate) async fn init_inspector_token_with_preload(
ctx: &ActorContext,
preloaded_kv: Option<&PreloadedKv>,
) -> Result<()> {
if configured_test_token().is_some() {
return Ok(());
}

let existing = ctx
.kv()
.get(&INSPECTOR_TOKEN_KEY)
.await
.context("load inspector token")?;
let existing =
match preloaded_kv.and_then(|preloaded| preloaded.key_entry(&INSPECTOR_TOKEN_KEY)) {
Some(existing) => existing,
None => ctx
.kv()
.get(&INSPECTOR_TOKEN_KEY)
.await
.context("load inspector token")?,
};
if existing.is_some() {
return Ok(());
}
Expand Down
1 change: 1 addition & 0 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ impl RegistryDispatcher {
[1],
[3],
[5, 1, 1],
[6],
],
"prefixes": [
{
Expand Down
Loading
Loading