Skip to content

Commit bd6ef15

Browse files
committed
fix: move runner configs to epoxy
1 parent 0b43aa9 commit bd6ef15

File tree

26 files changed

+654
-457
lines changed

26 files changed

+654
-457
lines changed

engine/artifacts/openapi.json

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-peer/src/internal.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,6 @@ pub async fn get_epoxy_key_debug(
352352
.await?;
353353

354354
let instances = local_value
355-
.value
356355
.map(|_| {
357356
vec![EpoxyKeyInstance {
358357
replica_id,
@@ -499,10 +498,8 @@ pub async fn get_epoxy_kv_local(
499498
.await?;
500499

501500
Ok(GetEpoxyKvResponse {
502-
exists: output.value.is_some(),
503-
value: output
504-
.value
505-
.map(|v| base64::engine::general_purpose::STANDARD.encode(&v)),
501+
exists: output.is_some(),
502+
value: output.map(|v| base64::engine::general_purpose::STANDARD.encode(&v.value)),
506503
})
507504
}
508505

@@ -526,6 +523,7 @@ pub async fn get_epoxy_kv_optimistic(
526523
replica_id,
527524
key: key_bytes,
528525
caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
526+
save_empty: false,
529527
})
530528
.await?;
531529

engine/packages/api-peer/src/runner_configs.rs

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
2828
]
2929
.concat();
3030

31-
let runner_configs = if !runner_names.is_empty() {
31+
let (runner_configs, cursor) = if !runner_names.is_empty() {
3232
let runner_configs = ctx
3333
.op(pegboard::ops::runner_config::get::Input {
3434
runners: runner_names
@@ -42,7 +42,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
4242
(
4343
runner_configs
4444
.into_iter()
45-
.map(|c| (c.name, c.config))
45+
.map(|c| (c.name, c.config, c.protocol_version))
4646
.collect::<Vec<_>>(),
4747
None,
4848
)
@@ -76,48 +76,49 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
7676

7777
let cursor = runner_configs
7878
.last()
79-
.map(|(name, config)| format!("{}:{}", runner_config_variant(&config), name));
79+
.map(|c| format!("{}:{}", runner_config_variant(&c.config), c.name));
8080

81-
(runner_configs, cursor)
81+
(
82+
runner_configs
83+
.into_iter()
84+
.map(|c| (c.name, c.config, c.protocol_version))
85+
.collect::<Vec<_>>(),
86+
cursor,
87+
)
8288
};
8389

8490
// Fetch pool errors
85-
let runner_pool_errors: HashMap<String, _> = if !runner_configs.0.is_empty() {
91+
let runner_pool_errors = if !runner_configs.is_empty() {
8692
let runners = runner_configs
87-
.0
8893
.iter()
89-
.map(|(name, _)| (namespace.namespace_id, name.clone()))
94+
.map(|(name, _, _)| (namespace.namespace_id, name.clone()))
9095
.collect();
9196
ctx.op(pegboard::ops::runner_config::get_error::Input { runners })
9297
.await?
9398
.into_iter()
9499
.map(|e| (e.runner_name, e.error))
95-
.collect()
100+
.collect::<HashMap<_, _>>()
96101
} else {
97102
HashMap::new()
98103
};
99104

100-
// Build response with pool errors
101-
let runner_configs_with_errors: HashMap<String, RunnerConfigResponse> = runner_configs
102-
.0
103-
.into_iter()
104-
.map(|(name, config)| {
105-
let runner_pool_error = runner_pool_errors.get(&name).cloned();
106-
(
107-
name,
108-
RunnerConfigResponse {
109-
config,
110-
runner_pool_error,
111-
},
112-
)
113-
})
114-
.collect();
115-
116105
Ok(ListResponse {
117-
runner_configs: runner_configs_with_errors,
118-
pagination: Pagination {
119-
cursor: runner_configs.1,
120-
},
106+
// Build response with pool errors
107+
runner_configs: runner_configs
108+
.into_iter()
109+
.map(|(name, config, protocol_version)| {
110+
let runner_pool_error = runner_pool_errors.get(&name).cloned();
111+
(
112+
name,
113+
RunnerConfigResponse {
114+
config,
115+
runner_pool_error,
116+
protocol_version,
117+
},
118+
)
119+
})
120+
.collect(),
121+
pagination: Pagination { cursor },
121122
})
122123
}
123124

engine/packages/api-public/src/runner_configs/upsert.rs

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use rivet_api_util::request_remote_datacenter;
1212
use serde::{Deserialize, Serialize};
1313
use utoipa::ToSchema;
1414

15-
use super::utils;
1615
use crate::ctx::ApiCtx;
1716

1817
#[derive(Deserialize, Serialize, ToSchema)]
@@ -58,34 +57,6 @@ async fn upsert_inner(
5857
) -> Result<UpsertResponse> {
5958
ctx.auth().await?;
6059

61-
tracing::debug!(runner_name = ?path.runner_name, datacenters_count = body.datacenters.len(), "starting upsert");
62-
63-
// Resolve namespace
64-
let namespace = ctx
65-
.op(namespace::ops::resolve_for_name_global::Input {
66-
name: query.namespace.clone(),
67-
})
68-
.await?
69-
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
70-
71-
// Store serverless config before processing (since we'll remove from body.datacenters)
72-
let serverless_config = body
73-
.datacenters
74-
.iter()
75-
.filter_map(|(_dc_name, runner_config)| {
76-
if let rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless {
77-
url,
78-
headers,
79-
..
80-
} = &runner_config.kind
81-
{
82-
Some((url.clone(), headers.clone().unwrap_or_default()))
83-
} else {
84-
None
85-
}
86-
})
87-
.next();
88-
8960
let dcs = ctx
9061
.config()
9162
.topology()
@@ -162,35 +133,6 @@ async fn upsert_inner(
162133
.into_iter()
163134
.any(|endpoint_config_changed| endpoint_config_changed);
164135

165-
// Update runner metadata
166-
//
167-
// This allows us to populate the actor names immediately upon configuring a serverless runner
168-
if let Some((url, metadata_headers)) = serverless_config {
169-
if any_endpoint_config_changed {
170-
tracing::debug!("endpoint config changed, refreshing metadata");
171-
if let Err(err) = utils::refresh_runner_config_metadata(
172-
ctx.clone(),
173-
namespace.namespace_id,
174-
path.runner_name.clone(),
175-
url,
176-
metadata_headers,
177-
)
178-
.await
179-
{
180-
tracing::warn!(?err, runner_name=?path.runner_name, "failed to refresh runner config metadata");
181-
}
182-
} else {
183-
tracing::debug!("endpoint config unchanged, skipping metadata refresh");
184-
}
185-
}
186-
187-
pegboard::utils::purge_runner_config_caches(
188-
ctx.cache(),
189-
namespace.namespace_id,
190-
&path.runner_name,
191-
)
192-
.await?;
193-
194136
Ok(UpsertResponse {
195137
endpoint_config_changed: any_endpoint_config_changed,
196138
})

engine/packages/api-types/src/namespaces/runner_configs.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ pub enum RunnerConfigKind {
2525
max_concurrent_actors: Option<u64>,
2626
/// Seconds.
2727
drain_grace_period: Option<u32>,
28+
/// Deprecated.
2829
slots_per_runner: u32,
30+
/// Deprecated.
2931
min_runners: Option<u32>,
32+
/// Deprecated.
3033
max_runners: u32,
34+
/// Deprecated.
3135
runners_margin: Option<u32>,
3236
/// Milliseconds between metadata polling. If not set, uses the global default.
3337
metadata_poll_interval: Option<u64>,

engine/packages/api-types/src/runner_configs/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod list;
77
pub struct RunnerConfigResponse {
88
#[serde(flatten)]
99
pub config: rivet_types::runner_configs::RunnerConfig,
10+
pub protocol_version: Option<u16>,
1011
#[serde(default, skip_serializing_if = "Option::is_none")]
1112
#[schema(value_type = Option<Object>, additional_properties = true)]
1213
pub runner_pool_error: Option<rivet_types::actor::RunnerPoolError>,

engine/packages/bootstrap/src/backfill.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,19 @@ pub async fn run(ctx: &StandaloneCtx) -> Result<()> {
2828
.await?;
2929
}
3030

31+
// Runner config backfill
32+
if !is_complete(
33+
ctx,
34+
pegboard::workflows::runner_pool_backfill::BACKFILL_NAME,
35+
)
36+
.await?
37+
{
38+
ctx.workflow(pegboard::workflows::runner_pool_backfill::Input {})
39+
.unique()
40+
.dispatch()
41+
.await?;
42+
}
43+
3144
Ok(())
3245
}
3346

engine/packages/config/src/config/topology.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,34 @@ impl<'a> IntoIterator for &'a DatacentersRepr {
124124
}
125125
}
126126

127+
pub enum DatacentersIntoIter {
128+
Map(std::collections::hash_map::IntoValues<String, Datacenter>),
129+
List(std::vec::IntoIter<Datacenter>),
130+
}
131+
132+
impl Iterator for DatacentersIntoIter {
133+
type Item = Datacenter;
134+
135+
fn next(&mut self) -> Option<Self::Item> {
136+
match self {
137+
DatacentersIntoIter::Map(iter) => iter.next(),
138+
DatacentersIntoIter::List(iter) => iter.next(),
139+
}
140+
}
141+
}
142+
143+
impl IntoIterator for DatacentersRepr {
144+
type Item = Datacenter;
145+
type IntoIter = DatacentersIntoIter;
146+
147+
fn into_iter(self) -> Self::IntoIter {
148+
match self {
149+
DatacentersRepr::Map(map) => DatacentersIntoIter::Map(map.into_values()),
150+
DatacentersRepr::List(vec) => DatacentersIntoIter::List(vec.into_iter()),
151+
}
152+
}
153+
}
154+
127155
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
128156
#[serde(deny_unknown_fields)]
129157
pub struct Datacenter {

engine/packages/epoxy/src/keys/keys.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use universaldb::tuple::Versionstamp;
66

77
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
88
pub struct CommittedValue {
9+
// NOTE: An empty value may exist for cached entries to denote the value was not found on any datacenter
10+
// and cached as such.
911
pub value: Vec<u8>,
1012
pub version: u64,
1113
pub mutable: bool,

engine/packages/epoxy/src/ops/kv/get_local.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ pub(crate) async fn read_local_value(
9393
}
9494

9595
if let Some(value) = cache_value {
96+
let cache_value = cache_key.deserialize(&value)?;
97+
98+
// Special case with empty values. These are inserted in kv_get_optimistic with `save_empty`
99+
if cache_value.value.is_empty() {
100+
return Ok(LocalValueRead {
101+
value: None,
102+
cache_value: None,
103+
});
104+
}
105+
96106
return Ok(LocalValueRead {
97107
value: None,
98108
cache_value: Some(cache_key.deserialize(&value)?),

0 commit comments

Comments
 (0)