Skip to content

Commit 3a5f121

Browse files
authored
fix: fix pool metadata protocol version refresh, rivetkit native ws (#4585)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent 4c94230 commit 3a5f121

File tree

47 files changed

+863
-744
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+863
-744
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/config-schema.json

Lines changed: 12 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-public/src/runner_configs/serverless_health_check.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
1010
use utoipa::IntoParams;
1111
use utoipa::ToSchema;
1212

13-
use super::utils::{ServerlessMetadataError, fetch_serverless_runner_metadata};
13+
use super::utils::{ServerlessMetadataError, fetch_serverless_metadata};
1414
use crate::ctx::ApiCtx;
1515

1616
#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)]
@@ -72,7 +72,7 @@ async fn serverless_health_check_inner(
7272

7373
let ServerlessHealthCheckRequest { url, headers } = body;
7474

75-
match fetch_serverless_runner_metadata(&ctx, url, headers).await {
75+
match fetch_serverless_metadata(&ctx, url, headers).await {
7676
Ok(metadata) => Ok(ServerlessHealthCheckResponse::Success {
7777
version: metadata.version,
7878
}),
Lines changed: 14 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::Result;
12
use std::collections::HashMap;
23

34
use gas::prelude::*;
@@ -35,11 +36,11 @@ impl From<pegboard::ops::serverless_metadata::fetch::Output> for ServerlessMetad
3536
///
3637
/// Returns metadata including runtime, version, and actor names if available.
3738
#[tracing::instrument(skip_all)]
38-
pub async fn fetch_serverless_runner_metadata(
39+
pub async fn fetch_serverless_metadata(
3940
ctx: &ApiCtx,
4041
url: String,
4142
headers: HashMap<String, String>,
42-
) -> Result<ServerlessMetadata, ServerlessMetadataError> {
43+
) -> std::result::Result<ServerlessMetadata, ServerlessMetadataError> {
4344
ctx.op(pegboard::ops::serverless_metadata::fetch::Input { url, headers })
4445
.await
4546
.map_err(|_| ServerlessMetadataError::RequestFailed {})?
@@ -54,49 +55,23 @@ pub async fn refresh_runner_config_metadata(
5455
runner_name: String,
5556
url: String,
5657
headers: HashMap<String, String>,
57-
) -> anyhow::Result<()> {
58+
) -> Result<()> {
5859
tracing::debug!(
5960
?namespace_id,
6061
?runner_name,
6162
"refreshing runner config metadata"
6263
);
6364

64-
// Fetch metadata using the op
65-
let metadata = ctx
66-
.op(pegboard::ops::serverless_metadata::fetch::Input { url, headers })
67-
.await?
68-
.map_err(|e| {
69-
pegboard::errors::ServerlessRunnerPool::FailedToFetchMetadata { reason: e }.build()
70-
})?;
71-
72-
if !metadata.actor_names.is_empty() {
73-
tracing::debug!(
74-
actor_names_count = metadata.actor_names.len(),
75-
"storing actor names metadata"
76-
);
77-
78-
// Convert and store actor names
79-
let actor_names: Vec<pegboard::ops::actor_name::upsert_batch::ActorNameEntry> = metadata
80-
.actor_names
81-
.into_iter()
82-
.map(
83-
|a| pegboard::ops::actor_name::upsert_batch::ActorNameEntry {
84-
name: a.name,
85-
metadata: a.metadata,
86-
},
87-
)
88-
.collect();
89-
90-
ctx.op(pegboard::ops::actor_name::upsert_batch::Input {
91-
namespace_id,
92-
actor_names,
93-
})
94-
.await?;
95-
96-
tracing::debug!("successfully stored actor names metadata");
97-
} else {
98-
tracing::debug!("no actor names to store");
99-
}
65+
ctx.op(pegboard::ops::runner_config::refresh_metadata::Input {
66+
namespace_id,
67+
runner_name,
68+
url,
69+
headers,
70+
})
71+
.await?
72+
.map_err(|e| {
73+
pegboard::errors::ServerlessRunnerPool::FailedToFetchMetadata { reason: e }.build()
74+
})?;
10075

10176
Ok(())
10277
}

engine/packages/cache/src/inner.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub type Cache = Arc<CacheInner>;
1414

1515
/// Utility type used to hold information relating to caching.
1616
pub struct CacheInner {
17-
pub(crate) driver: Driver,
17+
pub(crate) driver: Option<Driver>,
1818
pub(crate) ups: Option<universalpubsub::PubSub>,
1919
}
2020

@@ -33,15 +33,28 @@ impl CacheInner {
3333
let ups = pools.ups().ok();
3434

3535
match &config.cache().driver {
36-
rivet_config::config::CacheDriver::InMemory => Ok(Self::new_in_memory(10000, ups)),
36+
Some(rivet_config::config::CacheDriver::InMemory) => {
37+
Ok(Self::new_in_memory(10000, ups))
38+
}
39+
None => Ok(Self::new_disabled()),
3740
}
3841
}
3942

4043
#[tracing::instrument(skip(ups))]
4144
pub fn new_in_memory(max_capacity: u64, ups: Option<universalpubsub::PubSub>) -> Cache {
4245
let driver = Driver::InMemory(InMemoryDriver::new(max_capacity));
4346

44-
Arc::new(CacheInner { driver, ups })
47+
Arc::new(CacheInner {
48+
driver: Some(driver),
49+
ups,
50+
})
51+
}
52+
53+
pub fn new_disabled() -> Cache {
54+
Arc::new(CacheInner {
55+
driver: None,
56+
ups: None,
57+
})
4558
}
4659

4760
pub(crate) fn in_flight(&self) -> &scc::HashMap<RawCacheKey, broadcast::Sender<()>> {

engine/packages/cache/src/req_config.rs

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,27 @@ impl RequestConfig {
8181

8282
let mut ctx = GetterCtx::new(keys);
8383

84+
// No driver (cache disabled)
85+
let Some(driver) = &self.cache.driver else {
86+
let keys = ctx.unresolved_keys();
87+
let ctx = getter(ctx, keys).await.map_err(Error::Getter)?;
88+
89+
metrics::CACHE_VALUE_EMPTY_TOTAL
90+
.with_label_values(&[&base_key])
91+
.inc_by(ctx.unresolved_keys().len() as u64);
92+
93+
return Ok(ctx.into_values());
94+
};
95+
8496
// Build driver-specific cache keys
8597
let (keys, cache_keys): (Vec<_>, Vec<_>) = ctx
8698
.entries()
87-
.map(|(key, _)| (key.clone(), self.cache.driver.process_key(&base_key, key)))
99+
.map(|(key, _)| (key.clone(), driver.process_key(&base_key, key)))
88100
.unzip();
89101
let cache_keys_len = cache_keys.len();
90102

91103
// Attempt to fetch value from cache, fall back to getter
92-
match self.cache.driver.get(&base_key, &cache_keys).await {
104+
match driver.get(&base_key, &cache_keys).await {
93105
Ok(cached_values) => {
94106
debug_assert_eq!(
95107
cache_keys_len,
@@ -128,7 +140,7 @@ impl RequestConfig {
128140

129141
// Determine which keys are currently being fetched and not
130142
for key in remaining_keys {
131-
let cache_key = self.cache.driver.process_key(&base_key, &key);
143+
let cache_key = driver.process_key(&base_key, &key);
132144
match self.cache.in_flight().entry_async(cache_key).await {
133145
scc::hash_map::Entry::Occupied(broadcast) => {
134146
waiting_keys.push((key, broadcast.subscribe()));
@@ -141,7 +153,6 @@ impl RequestConfig {
141153
}
142154

143155
let getter2 = getter.clone();
144-
let cache = self.cache.clone();
145156
let ctx2 = GetterCtx::new(leased_keys.clone());
146157
let base_key2 = base_key.clone();
147158
let leased_keys2 = leased_keys.clone();
@@ -178,8 +189,7 @@ impl RequestConfig {
178189
.into_iter()
179190
.partition_map(|(key, succeeded)| {
180191
if succeeded {
181-
let cache_key =
182-
cache.driver.process_key(&base_key2, &key);
192+
let cache_key = driver.process_key(&base_key2, &key);
183193
Either::Left((key, cache_key))
184194
} else {
185195
Either::Right(key)
@@ -193,7 +203,7 @@ impl RequestConfig {
193203
if succeeded_cache_keys.is_empty() {
194204
Ok(Vec::new())
195205
} else {
196-
cache.driver.get(&base_key2, &succeeded_cache_keys).await
206+
driver.get(&base_key2, &succeeded_cache_keys).await
197207
}
198208
},
199209
async {
@@ -255,7 +265,7 @@ impl RequestConfig {
255265
.into_iter()
256266
.filter_map(|(key, value)| {
257267
// Process the key with the appropriate driver
258-
let cache_key = self.cache.driver.process_key(&base_key, key);
268+
let cache_key = driver.process_key(&base_key, key);
259269
// Try to decode the value using the driver
260270
match encoder(value) {
261271
Ok(value_bytes) => Some((cache_key, value_bytes, expire_at)),
@@ -269,10 +279,9 @@ impl RequestConfig {
269279
.collect::<Vec<_>>();
270280

271281
if !entries_values.is_empty() {
272-
let cache = self.cache.clone();
273282
let base_key_clone = base_key.clone();
274283

275-
if let Err(err) = cache.driver.set(&base_key_clone, entries_values).await {
284+
if let Err(err) = driver.set(&base_key_clone, entries_values).await {
276285
tracing::error!(?err, "failed to write to cache");
277286
}
278287

@@ -281,7 +290,7 @@ impl RequestConfig {
281290

282291
// Release leases
283292
for key in leased_keys {
284-
let cache_key = self.cache.driver.process_key(&base_key, &key);
293+
let cache_key = driver.process_key(&base_key, &key);
285294
self.cache.in_flight().remove_async(&cache_key).await;
286295
}
287296
}
@@ -298,13 +307,17 @@ impl RequestConfig {
298307
"failed to read batch keys from cache, falling back to getter"
299308
);
300309

310+
let keys = ctx.unresolved_keys();
311+
301312
metrics::CACHE_REQUEST_ERRORS
302313
.with_label_values(&[&base_key])
303314
.inc();
315+
metrics::CACHE_VALUE_MISS_TOTAL
316+
.with_label_values(&[base_key.as_str()])
317+
.inc_by(keys.len() as u64);
304318

305319
// Fall back to the getter since we can't fetch the value from
306320
// the cache
307-
let keys = ctx.unresolved_keys();
308321
let ctx = getter(ctx, keys).await.map_err(Error::Getter)?;
309322

310323
metrics::CACHE_VALUE_EMPTY_TOTAL
@@ -325,11 +338,16 @@ impl RequestConfig {
325338
where
326339
Key: CacheKey + Send + Sync,
327340
{
341+
// Cache disabled
342+
let Some(driver) = &self.cache.driver else {
343+
return Ok(());
344+
};
345+
328346
// Build keys
329347
let base_key = base_key.as_ref();
330348
let cache_keys = keys
331349
.into_iter()
332-
.map(|key| self.cache.driver.process_key(base_key, &key))
350+
.map(|key| driver.process_key(base_key, &key))
333351
.collect::<Vec<_>>();
334352

335353
if cache_keys.is_empty() {
@@ -375,6 +393,11 @@ impl RequestConfig {
375393
base_key: impl AsRef<str> + Debug,
376394
keys: Vec<RawCacheKey>,
377395
) -> Result<()> {
396+
// Cache disabled
397+
let Some(driver) = &self.cache.driver else {
398+
return Ok(());
399+
};
400+
378401
let base_key = base_key.as_ref();
379402

380403
if keys.is_empty() {
@@ -389,7 +412,7 @@ impl RequestConfig {
389412
.inc_by(keys.len() as u64);
390413

391414
// Delete keys locally
392-
match self.cache.driver.delete(base_key, keys).await {
415+
match driver.delete(base_key, keys).await {
393416
Ok(_) => {
394417
tracing::trace!("successfully deleted keys");
395418
}

engine/packages/config/src/config/cache.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,25 @@ use serde::{Deserialize, Serialize};
55
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
66
#[serde(deny_unknown_fields)]
77
pub struct Cache {
8-
pub driver: CacheDriver,
8+
pub enabled: bool,
9+
pub driver: Option<CacheDriver>,
910
}
1011

1112
impl Default for Cache {
1213
fn default() -> Cache {
1314
Self {
14-
driver: CacheDriver::InMemory,
15+
enabled: true,
16+
driver: None,
1517
}
1618
}
1719
}
1820

21+
impl Cache {
22+
pub fn driver(&self) -> CacheDriver {
23+
self.driver.clone().unwrap_or(CacheDriver::InMemory)
24+
}
25+
}
26+
1927
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
2028
#[serde(rename_all = "snake_case", deny_unknown_fields)]
2129
pub enum CacheDriver {

engine/packages/engine/tests/common/test_envoy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use std::collections::HashMap;
88
use std::sync::Arc;
99

1010
// Re-export everything from the standalone package
11+
pub use rivet_envoy_protocol::PROTOCOL_VERSION;
1112
pub use rivet_test_envoy::{
1213
ActorConfig, ActorEvent, ActorLifecycleEvent, ActorStartResult, ActorStopResult,
1314
CountingCrashActor, CrashNTimesThenSucceedActor, CrashOnStartActor, CustomActor,
1415
CustomActorBuilder, DelayedStartActor, EchoActor, Envoy, EnvoyBuilder, EnvoyConfig, KvRequest,
1516
NotifyOnStartActor, SleepImmediatelyActor, StopImmediatelyActor, TestActor, TimeoutActor,
1617
VerifyInputActor,
1718
};
18-
pub use rivet_envoy_protocol::PROTOCOL_VERSION;
1919

2020
// Type alias for backwards compatibility
2121
pub type TestEnvoy = Envoy;

engine/packages/pegboard/src/ops/actor_name/mod.rs

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)