Skip to content

Commit c15aa52

Browse files
committed
fix: move runner configs to epoxy
1 parent e68c174 commit c15aa52

11 files changed

Lines changed: 262 additions & 429 deletions

File tree

engine/artifacts/openapi.json

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-peer/src/internal.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,6 @@ pub async fn get_epoxy_key_debug(
352352
.await?;
353353

354354
let instances = local_value
355-
.value
356355
.map(|_| {
357356
vec![EpoxyKeyInstance {
358357
replica_id,
@@ -499,10 +498,8 @@ pub async fn get_epoxy_kv_local(
499498
.await?;
500499

501500
Ok(GetEpoxyKvResponse {
502-
exists: output.value.is_some(),
503-
value: output
504-
.value
505-
.map(|v| base64::engine::general_purpose::STANDARD.encode(&v)),
501+
exists: output.is_some(),
502+
value: output.map(|v| base64::engine::general_purpose::STANDARD.encode(&v.value)),
506503
})
507504
}
508505

engine/packages/api-peer/src/runner_configs.rs

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
2828
]
2929
.concat();
3030

31-
let runner_configs = if !runner_names.is_empty() {
31+
let (runner_configs, cursor) = if !runner_names.is_empty() {
3232
let runner_configs = ctx
3333
.op(pegboard::ops::runner_config::get::Input {
3434
runners: runner_names
@@ -42,7 +42,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
4242
(
4343
runner_configs
4444
.into_iter()
45-
.map(|c| (c.name, c.config))
45+
.map(|c| (c.name, c.config, c.protocol_version))
4646
.collect::<Vec<_>>(),
4747
None,
4848
)
@@ -76,48 +76,49 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
7676

7777
let cursor = runner_configs
7878
.last()
79-
.map(|(name, config)| format!("{}:{}", runner_config_variant(&config), name));
79+
.map(|c| format!("{}:{}", runner_config_variant(&c.config), c.name));
8080

81-
(runner_configs, cursor)
81+
(
82+
runner_configs
83+
.into_iter()
84+
.map(|c| (c.name, c.config, c.protocol_version))
85+
.collect::<Vec<_>>(),
86+
cursor,
87+
)
8288
};
8389

8490
// Fetch pool errors
85-
let runner_pool_errors: HashMap<String, _> = if !runner_configs.0.is_empty() {
91+
let runner_pool_errors = if !runner_configs.is_empty() {
8692
let runners = runner_configs
87-
.0
8893
.iter()
89-
.map(|(name, _)| (namespace.namespace_id, name.clone()))
94+
.map(|(name, _, _)| (namespace.namespace_id, name.clone()))
9095
.collect();
9196
ctx.op(pegboard::ops::runner_config::get_error::Input { runners })
9297
.await?
9398
.into_iter()
9499
.map(|e| (e.runner_name, e.error))
95-
.collect()
100+
.collect::<HashMap<_, _>>()
96101
} else {
97102
HashMap::new()
98103
};
99104

100-
// Build response with pool errors
101-
let runner_configs_with_errors: HashMap<String, RunnerConfigResponse> = runner_configs
102-
.0
103-
.into_iter()
104-
.map(|(name, config)| {
105-
let runner_pool_error = runner_pool_errors.get(&name).cloned();
106-
(
107-
name,
108-
RunnerConfigResponse {
109-
config,
110-
runner_pool_error,
111-
},
112-
)
113-
})
114-
.collect();
115-
116105
Ok(ListResponse {
117-
runner_configs: runner_configs_with_errors,
118-
pagination: Pagination {
119-
cursor: runner_configs.1,
120-
},
106+
// Build response with pool errors
107+
runner_configs: runner_configs
108+
.into_iter()
109+
.map(|(name, config, protocol_version)| {
110+
let runner_pool_error = runner_pool_errors.get(&name).cloned();
111+
(
112+
name,
113+
RunnerConfigResponse {
114+
config,
115+
runner_pool_error,
116+
protocol_version,
117+
},
118+
)
119+
})
120+
.collect(),
121+
pagination: Pagination { cursor },
121122
})
122123
}
123124

engine/packages/api-public/src/runner_configs/upsert.rs

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ async fn upsert_inner(
5858
) -> Result<UpsertResponse> {
5959
ctx.auth().await?;
6060

61-
tracing::debug!(runner_name = ?path.runner_name, datacenters_count = body.datacenters.len(), "starting upsert");
62-
6361
// Resolve namespace
6462
let namespace = ctx
6563
.op(namespace::ops::resolve_for_name_global::Input {
@@ -68,24 +66,6 @@ async fn upsert_inner(
6866
.await?
6967
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
7068

71-
// Store serverless config before processing (since we'll remove from body.datacenters)
72-
let serverless_config = body
73-
.datacenters
74-
.iter()
75-
.filter_map(|(_dc_name, runner_config)| {
76-
if let rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless {
77-
url,
78-
headers,
79-
..
80-
} = &runner_config.kind
81-
{
82-
Some((url.clone(), headers.clone().unwrap_or_default()))
83-
} else {
84-
None
85-
}
86-
})
87-
.next();
88-
8969
let dcs = ctx
9070
.config()
9171
.topology()
@@ -162,35 +142,6 @@ async fn upsert_inner(
162142
.into_iter()
163143
.any(|endpoint_config_changed| endpoint_config_changed);
164144

165-
// Update runner metadata
166-
//
167-
// This allows us to populate the actor names immediately upon configuring a serverless runner
168-
if let Some((url, metadata_headers)) = serverless_config {
169-
if any_endpoint_config_changed {
170-
tracing::debug!("endpoint config changed, refreshing metadata");
171-
if let Err(err) = utils::refresh_runner_config_metadata(
172-
ctx.clone(),
173-
namespace.namespace_id,
174-
path.runner_name.clone(),
175-
url,
176-
metadata_headers,
177-
)
178-
.await
179-
{
180-
tracing::warn!(?err, runner_name=?path.runner_name, "failed to refresh runner config metadata");
181-
}
182-
} else {
183-
tracing::debug!("endpoint config unchanged, skipping metadata refresh");
184-
}
185-
}
186-
187-
pegboard::utils::purge_runner_config_caches(
188-
ctx.cache(),
189-
namespace.namespace_id,
190-
&path.runner_name,
191-
)
192-
.await?;
193-
194145
Ok(UpsertResponse {
195146
endpoint_config_changed: any_endpoint_config_changed,
196147
})

engine/packages/api-types/src/namespaces/runner_configs.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ pub enum RunnerConfigKind {
2525
max_concurrent_actors: Option<u64>,
2626
/// Seconds.
2727
drain_grace_period: Option<u32>,
28+
/// Deprecated.
2829
slots_per_runner: u32,
30+
/// Deprecated.
2931
min_runners: Option<u32>,
32+
/// Deprecated.
3033
max_runners: u32,
34+
/// Deprecated.
3135
runners_margin: Option<u32>,
3236
/// Milliseconds between metadata polling. If not set, uses the global default.
3337
metadata_poll_interval: Option<u64>,

engine/packages/api-types/src/runner_configs/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod list;
77
pub struct RunnerConfigResponse {
88
#[serde(flatten)]
99
pub config: rivet_types::runner_configs::RunnerConfig,
10+
pub protocol_version: Option<u16>,
1011
#[serde(default, skip_serializing_if = "Option::is_none")]
1112
#[schema(value_type = Option<Object>, additional_properties = true)]
1213
pub runner_pool_error: Option<rivet_types::actor::RunnerPoolError>,

engine/packages/epoxy/src/ops/kv/get_optimistic.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
8080
&quorum_members,
8181
utils::QuorumType::Any,
8282
|replica_id| {
83+
let ctx = ctx.clone();
8384
let config = config.clone();
8485
let key = input.key.clone();
8586
let from_replica_id = input.replica_id;
@@ -109,7 +110,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
109110
.await?;
110111

111112
// Should only have 1 response
112-
if let Some(value) = responses.first().and_then(|r| r.response) {
113+
if let Some(value) = responses.into_iter().flatten().next() {
113114
let value = CommittedValue {
114115
value: value.value,
115116
version: value.version,

0 commit comments

Comments
 (0)