Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions engine/artifacts/config-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use utoipa::IntoParams;
use utoipa::ToSchema;

use super::utils::{ServerlessMetadataError, fetch_serverless_runner_metadata};
use super::utils::{ServerlessMetadataError, fetch_serverless_metadata};
use crate::ctx::ApiCtx;

#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)]
Expand Down Expand Up @@ -72,7 +72,7 @@ async fn serverless_health_check_inner(

let ServerlessHealthCheckRequest { url, headers } = body;

match fetch_serverless_runner_metadata(&ctx, url, headers).await {
match fetch_serverless_metadata(&ctx, url, headers).await {
Ok(metadata) => Ok(ServerlessHealthCheckResponse::Success {
version: metadata.version,
}),
Expand Down
53 changes: 14 additions & 39 deletions engine/packages/api-public/src/runner_configs/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Result;
use std::collections::HashMap;

use gas::prelude::*;
Expand Down Expand Up @@ -35,11 +36,11 @@ impl From<pegboard::ops::serverless_metadata::fetch::Output> for ServerlessMetad
///
/// Returns metadata including runtime, version, and actor names if available.
#[tracing::instrument(skip_all)]
pub async fn fetch_serverless_runner_metadata(
pub async fn fetch_serverless_metadata(
ctx: &ApiCtx,
url: String,
headers: HashMap<String, String>,
) -> Result<ServerlessMetadata, ServerlessMetadataError> {
) -> std::result::Result<ServerlessMetadata, ServerlessMetadataError> {
ctx.op(pegboard::ops::serverless_metadata::fetch::Input { url, headers })
.await
.map_err(|_| ServerlessMetadataError::RequestFailed {})?
Expand All @@ -54,49 +55,23 @@ pub async fn refresh_runner_config_metadata(
runner_name: String,
url: String,
headers: HashMap<String, String>,
) -> anyhow::Result<()> {
) -> Result<()> {
tracing::debug!(
?namespace_id,
?runner_name,
"refreshing runner config metadata"
);

// Fetch metadata using the op
let metadata = ctx
.op(pegboard::ops::serverless_metadata::fetch::Input { url, headers })
.await?
.map_err(|e| {
pegboard::errors::ServerlessRunnerPool::FailedToFetchMetadata { reason: e }.build()
})?;

if !metadata.actor_names.is_empty() {
tracing::debug!(
actor_names_count = metadata.actor_names.len(),
"storing actor names metadata"
);

// Convert and store actor names
let actor_names: Vec<pegboard::ops::actor_name::upsert_batch::ActorNameEntry> = metadata
.actor_names
.into_iter()
.map(
|a| pegboard::ops::actor_name::upsert_batch::ActorNameEntry {
name: a.name,
metadata: a.metadata,
},
)
.collect();

ctx.op(pegboard::ops::actor_name::upsert_batch::Input {
namespace_id,
actor_names,
})
.await?;

tracing::debug!("successfully stored actor names metadata");
} else {
tracing::debug!("no actor names to store");
}
ctx.op(pegboard::ops::runner_config::refresh_metadata::Input {
namespace_id,
runner_name,
url,
headers,
})
.await?
.map_err(|e| {
pegboard::errors::ServerlessRunnerPool::FailedToFetchMetadata { reason: e }.build()
})?;

Ok(())
}
19 changes: 16 additions & 3 deletions engine/packages/cache/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub type Cache = Arc<CacheInner>;

/// Utility type used to hold information relating to caching.
pub struct CacheInner {
pub(crate) driver: Driver,
pub(crate) driver: Option<Driver>,
pub(crate) ups: Option<universalpubsub::PubSub>,
}

Expand All @@ -33,15 +33,28 @@ impl CacheInner {
let ups = pools.ups().ok();

match &config.cache().driver {
rivet_config::config::CacheDriver::InMemory => Ok(Self::new_in_memory(10000, ups)),
Some(rivet_config::config::CacheDriver::InMemory) => {
Ok(Self::new_in_memory(10000, ups))
}
None => Ok(Self::new_disabled()),
}
}

#[tracing::instrument(skip(ups))]
pub fn new_in_memory(max_capacity: u64, ups: Option<universalpubsub::PubSub>) -> Cache {
let driver = Driver::InMemory(InMemoryDriver::new(max_capacity));

Arc::new(CacheInner { driver, ups })
Arc::new(CacheInner {
driver: Some(driver),
ups,
})
}

pub fn new_disabled() -> Cache {
Arc::new(CacheInner {
driver: None,
ups: None,
})
}

pub(crate) fn in_flight(&self) -> &scc::HashMap<RawCacheKey, broadcast::Sender<()>> {
Expand Down
51 changes: 37 additions & 14 deletions engine/packages/cache/src/req_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,27 @@ impl RequestConfig {

let mut ctx = GetterCtx::new(keys);

// No driver (cache disabled)
let Some(driver) = &self.cache.driver else {
let keys = ctx.unresolved_keys();
let ctx = getter(ctx, keys).await.map_err(Error::Getter)?;

metrics::CACHE_VALUE_EMPTY_TOTAL
.with_label_values(&[&base_key])
.inc_by(ctx.unresolved_keys().len() as u64);

return Ok(ctx.into_values());
};

// Build driver-specific cache keys
let (keys, cache_keys): (Vec<_>, Vec<_>) = ctx
.entries()
.map(|(key, _)| (key.clone(), self.cache.driver.process_key(&base_key, key)))
.map(|(key, _)| (key.clone(), driver.process_key(&base_key, key)))
.unzip();
let cache_keys_len = cache_keys.len();

// Attempt to fetch value from cache, fall back to getter
match self.cache.driver.get(&base_key, &cache_keys).await {
match driver.get(&base_key, &cache_keys).await {
Ok(cached_values) => {
debug_assert_eq!(
cache_keys_len,
Expand Down Expand Up @@ -128,7 +140,7 @@ impl RequestConfig {

// Determine which keys are currently being fetched and not
for key in remaining_keys {
let cache_key = self.cache.driver.process_key(&base_key, &key);
let cache_key = driver.process_key(&base_key, &key);
match self.cache.in_flight().entry_async(cache_key).await {
scc::hash_map::Entry::Occupied(broadcast) => {
waiting_keys.push((key, broadcast.subscribe()));
Expand All @@ -141,7 +153,6 @@ impl RequestConfig {
}

let getter2 = getter.clone();
let cache = self.cache.clone();
let ctx2 = GetterCtx::new(leased_keys.clone());
let base_key2 = base_key.clone();
let leased_keys2 = leased_keys.clone();
Expand Down Expand Up @@ -178,8 +189,7 @@ impl RequestConfig {
.into_iter()
.partition_map(|(key, succeeded)| {
if succeeded {
let cache_key =
cache.driver.process_key(&base_key2, &key);
let cache_key = driver.process_key(&base_key2, &key);
Either::Left((key, cache_key))
} else {
Either::Right(key)
Expand All @@ -193,7 +203,7 @@ impl RequestConfig {
if succeeded_cache_keys.is_empty() {
Ok(Vec::new())
} else {
cache.driver.get(&base_key2, &succeeded_cache_keys).await
driver.get(&base_key2, &succeeded_cache_keys).await
}
},
async {
Expand Down Expand Up @@ -255,7 +265,7 @@ impl RequestConfig {
.into_iter()
.filter_map(|(key, value)| {
// Process the key with the appropriate driver
let cache_key = self.cache.driver.process_key(&base_key, key);
let cache_key = driver.process_key(&base_key, key);
// Try to decode the value using the driver
match encoder(value) {
Ok(value_bytes) => Some((cache_key, value_bytes, expire_at)),
Expand All @@ -269,10 +279,9 @@ impl RequestConfig {
.collect::<Vec<_>>();

if !entries_values.is_empty() {
let cache = self.cache.clone();
let base_key_clone = base_key.clone();

if let Err(err) = cache.driver.set(&base_key_clone, entries_values).await {
if let Err(err) = driver.set(&base_key_clone, entries_values).await {
tracing::error!(?err, "failed to write to cache");
}

Expand All @@ -281,7 +290,7 @@ impl RequestConfig {

// Release leases
for key in leased_keys {
let cache_key = self.cache.driver.process_key(&base_key, &key);
let cache_key = driver.process_key(&base_key, &key);
self.cache.in_flight().remove_async(&cache_key).await;
}
}
Expand All @@ -298,13 +307,17 @@ impl RequestConfig {
"failed to read batch keys from cache, falling back to getter"
);

let keys = ctx.unresolved_keys();

metrics::CACHE_REQUEST_ERRORS
.with_label_values(&[&base_key])
.inc();
metrics::CACHE_VALUE_MISS_TOTAL
.with_label_values(&[base_key.as_str()])
.inc_by(keys.len() as u64);

// Fall back to the getter since we can't fetch the value from
// the cache
let keys = ctx.unresolved_keys();
let ctx = getter(ctx, keys).await.map_err(Error::Getter)?;

metrics::CACHE_VALUE_EMPTY_TOTAL
Expand All @@ -325,11 +338,16 @@ impl RequestConfig {
where
Key: CacheKey + Send + Sync,
{
// Cache disabled
let Some(driver) = &self.cache.driver else {
return Ok(());
};

// Build keys
let base_key = base_key.as_ref();
let cache_keys = keys
.into_iter()
.map(|key| self.cache.driver.process_key(base_key, &key))
.map(|key| driver.process_key(base_key, &key))
.collect::<Vec<_>>();

if cache_keys.is_empty() {
Expand Down Expand Up @@ -375,6 +393,11 @@ impl RequestConfig {
base_key: impl AsRef<str> + Debug,
keys: Vec<RawCacheKey>,
) -> Result<()> {
// Cache disabled
let Some(driver) = &self.cache.driver else {
return Ok(());
};

let base_key = base_key.as_ref();

if keys.is_empty() {
Expand All @@ -389,7 +412,7 @@ impl RequestConfig {
.inc_by(keys.len() as u64);

// Delete keys locally
match self.cache.driver.delete(base_key, keys).await {
match driver.delete(base_key, keys).await {
Ok(_) => {
tracing::trace!("successfully deleted keys");
}
Expand Down
12 changes: 10 additions & 2 deletions engine/packages/config/src/config/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,25 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Cache {
pub driver: CacheDriver,
pub enabled: bool,
pub driver: Option<CacheDriver>,
}

impl Default for Cache {
fn default() -> Cache {
Self {
driver: CacheDriver::InMemory,
enabled: true,
driver: None,
}
}
}

impl Cache {
pub fn driver(&self) -> CacheDriver {
self.driver.clone().unwrap_or(CacheDriver::InMemory)
}
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum CacheDriver {
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/engine/tests/common/test_envoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use std::collections::HashMap;
use std::sync::Arc;

// Re-export everything from the standalone package
pub use rivet_envoy_protocol::PROTOCOL_VERSION;
pub use rivet_test_envoy::{
ActorConfig, ActorEvent, ActorLifecycleEvent, ActorStartResult, ActorStopResult,
CountingCrashActor, CrashNTimesThenSucceedActor, CrashOnStartActor, CustomActor,
CustomActorBuilder, DelayedStartActor, EchoActor, Envoy, EnvoyBuilder, EnvoyConfig, KvRequest,
NotifyOnStartActor, SleepImmediatelyActor, StopImmediatelyActor, TestActor, TimeoutActor,
VerifyInputActor,
};
pub use rivet_envoy_protocol::PROTOCOL_VERSION;

// Type alias for backwards compatibility
pub type TestEnvoy = Envoy;
Expand Down
1 change: 0 additions & 1 deletion engine/packages/pegboard/src/ops/actor_name/mod.rs

This file was deleted.

Loading
Loading