Skip to content

Commit 9a2f692

Browse files
committed
fix(rivetkit): Check for runner config before upserting
1 parent 9bfd2bc commit 9a2f692

2 files changed

Lines changed: 93 additions & 190 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs

Lines changed: 93 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use std::collections::HashMap;
12
use std::net::IpAddr;
23

34
use anyhow::{Context, Result};
45
use reqwest::{Client, Url};
56
use serde::Deserialize;
6-
use serde_json::{Map as JsonMap, json};
7+
use serde_json::{Value as JsonValue, json};
78

89
use super::ServeConfig;
910

@@ -17,6 +18,24 @@ struct Datacenter {
1718
name: String,
1819
}
1920

21+
#[derive(Debug, Deserialize)]
22+
struct RunnerConfigsResponse {
23+
#[serde(default)]
24+
runner_configs: HashMap<String, RunnerConfigDatacenters>,
25+
}
26+
27+
#[derive(Debug, Deserialize)]
28+
struct RunnerConfigDatacenters {
29+
#[serde(default)]
30+
datacenters: HashMap<String, RunnerConfigEntry>,
31+
}
32+
33+
#[derive(Debug, Deserialize)]
34+
struct RunnerConfigEntry {
35+
#[serde(default)]
36+
normal: Option<JsonValue>,
37+
}
38+
2039
pub(super) async fn ensure_local_normal_runner_config(config: &ServeConfig) -> Result<()> {
2140
if !is_local_engine_endpoint(&config.endpoint) {
2241
return Ok(());
@@ -25,26 +44,56 @@ pub(super) async fn ensure_local_normal_runner_config(config: &ServeConfig) -> R
2544
let client = Client::builder()
2645
.build()
2746
.context("build reqwest client for runner config")?;
28-
let datacenters = get_datacenters(&client, config).await?;
29-
let mut runner_datacenters = JsonMap::new();
30-
31-
for datacenter in datacenters.datacenters {
32-
runner_datacenters.insert(
33-
datacenter.name,
34-
json!({
35-
"normal": {},
36-
"drain_on_version_upgrade": true,
37-
}),
47+
48+
let (datacenters, runner_configs) = tokio::try_join!(
49+
get_datacenters(&client, config),
50+
get_runner_configs(&client, config),
51+
)?;
52+
53+
let existing = runner_configs.runner_configs.get(config.pool_name.as_str());
54+
55+
let missing: Vec<String> = datacenters
56+
.datacenters
57+
.into_iter()
58+
.filter(|dc| {
59+
let has_normal = existing
60+
.and_then(|pool| pool.datacenters.get(&dc.name))
61+
.map(|entry| entry.normal.is_some())
62+
.unwrap_or(false);
63+
!has_normal
64+
})
65+
.map(|dc| dc.name)
66+
.collect();
67+
68+
if missing.is_empty() {
69+
tracing::debug!(
70+
namespace = %config.namespace,
71+
pool_name = %config.pool_name,
72+
"local normal runner config already up to date"
3873
);
74+
return Ok(());
3975
}
4076

77+
let missing_count = missing.len();
4178
let url = engine_api_url(
4279
&config.endpoint,
4380
&["runner-configs", config.pool_name.as_str()],
4481
&config.namespace,
4582
)?;
83+
let entry = json!({
84+
"normal": {
85+
"drain_on_version_upgrade": true,
86+
"actor_eviction_delay": 0,
87+
"actor_eviction_period": 0,
88+
"actor_eviction_rate": 1.0,
89+
},
90+
});
91+
let datacenters_body: serde_json::Map<String, JsonValue> = missing
92+
.iter()
93+
.map(|name| (name.clone(), entry.clone()))
94+
.collect();
4695
let body = json!({
47-
"datacenters": runner_datacenters,
96+
"datacenters": datacenters_body,
4897
});
4998

5099
let response = apply_auth(client.put(url), config)
@@ -69,6 +118,7 @@ pub(super) async fn ensure_local_normal_runner_config(config: &ServeConfig) -> R
69118
tracing::debug!(
70119
namespace = %config.namespace,
71120
pool_name = %config.pool_name,
121+
missing_count,
72122
"ensured local normal runner config"
73123
);
74124

@@ -100,6 +150,37 @@ async fn get_datacenters(client: &Client, config: &ServeConfig) -> Result<Datace
100150
.context("decode datacenters response")
101151
}
102152

153+
async fn get_runner_configs(
154+
client: &Client,
155+
config: &ServeConfig,
156+
) -> Result<RunnerConfigsResponse> {
157+
let mut url = engine_api_url(&config.endpoint, &["runner-configs"], &config.namespace)?;
158+
url.query_pairs_mut()
159+
.append_pair("runner_name", config.pool_name.as_str());
160+
let response = apply_auth(client.get(url), config)
161+
.send()
162+
.await
163+
.context("get local runner configs")?;
164+
let status = response.status();
165+
if !status.is_success() {
166+
let response_body = response
167+
.text()
168+
.await
169+
.context("read failed runner configs response body")?;
170+
anyhow::bail!(
171+
"failed to get local runner configs for `{}`: {} {}",
172+
config.pool_name,
173+
status,
174+
response_body
175+
);
176+
}
177+
178+
response
179+
.json::<RunnerConfigsResponse>()
180+
.await
181+
.context("decode runner configs response")
182+
}
183+
103184
fn apply_auth(request: reqwest::RequestBuilder, config: &ServeConfig) -> reqwest::RequestBuilder {
104185
match config.token.as_deref() {
105186
Some(token) => request.bearer_auth(token),

rivetkit-typescript/packages/rivetkit/runtime/index.ts

Lines changed: 0 additions & 178 deletions
This file was deleted.

0 commit comments

Comments
 (0)