diff --git a/Cargo.toml b/Cargo.toml index 297de2cfdc..349082bfef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ rev = "b2c6f366ee68a7956fb69dd4f39357b3c184bd15" [workspace.dependencies.rivet-term] git = "https://github.com/rivet-gg/rivet-term" -rev = "b21d7a2" +rev = "55e328470b68c557fb9bc8298369f90182d35b6d" [workspace.dependencies.redis] git = "https://github.com/rivet-gg/redis-rs" diff --git a/packages/core/services/cluster/src/ops/server/get.rs b/packages/core/services/cluster/src/ops/server/get.rs index c129efc9ed..1264f2ef62 100644 --- a/packages/core/services/cluster/src/ops/server/get.rs +++ b/packages/core/services/cluster/src/ops/server/get.rs @@ -5,7 +5,7 @@ use std::{ use chirp_workflow::prelude::*; -use crate::types::{PoolType, Server}; +use crate::types::{PoolType, Server, ServerState}; #[derive(Debug)] pub struct Input { @@ -26,6 +26,7 @@ pub(crate) struct ServerRow { vlan_ip: Option, public_ip: Option, cloud_destroy_ts: Option, + state: i64, } impl TryFrom for Server { @@ -40,6 +41,7 @@ impl TryFrom for Server { lan_ip: value.vlan_ip, wan_ip: value.public_ip, cloud_destroy_ts: value.cloud_destroy_ts, + state: unwrap!(ServerState::from_repr(value.state.try_into()?)), }) } } @@ -56,7 +58,16 @@ pub async fn cluster_server_get(ctx: &OperationCtx, input: &Input) -> GlobalResu provider_server_id, vlan_ip, public_ip, - cloud_destroy_ts + cloud_destroy_ts, + CASE + WHEN cloud_destroy_ts IS NOT NULL THEN 6 -- Destroyed + WHEN taint_ts IS NOT NULL AND drain_ts IS NOT NULL THEN 5 -- TaintedDraining + WHEN drain_ts IS NOT NULL THEN 4 -- Draining + WHEN taint_ts IS NOT NULL THEN 3 -- Tainted + WHEN install_complete_ts IS NOT NULL THEN 2 -- Running + WHEN provision_complete_ts IS NOT NULL THEN 1 -- Installing + ELSE 0 -- Provisioning + END AS state FROM db_cluster.servers WHERE server_id = ANY($1) ", diff --git a/packages/core/services/cluster/src/ops/server/list.rs b/packages/core/services/cluster/src/ops/server/list.rs index c51a6e7555..6483e9f240 100644 --- a/packages/core/services/cluster/src/ops/server/list.rs +++ b/packages/core/services/cluster/src/ops/server/list.rs @@ -30,7 +30,16 @@ pub async fn cluster_server_list(ctx: &OperationCtx, input: &Input) -> GlobalRes s.provider_server_id, s.vlan_ip, s.public_ip, - s.cloud_destroy_ts + s.cloud_destroy_ts, + CASE + WHEN s.cloud_destroy_ts IS NOT NULL THEN 6 -- Destroyed + WHEN s.taint_ts IS NOT NULL AND s.drain_ts IS NOT NULL THEN 5 -- TaintedDraining + WHEN s.drain_ts IS NOT NULL THEN 4 -- Draining + WHEN s.taint_ts IS NOT NULL THEN 3 -- Tainted + WHEN s.install_complete_ts IS NOT NULL THEN 2 -- Running + WHEN s.provision_complete_ts IS NOT NULL THEN 1 -- Installing + ELSE 0 -- Provisioning + END AS state FROM db_cluster.servers AS s JOIN db_cluster.datacenters AS d ON s.datacenter_id = d.datacenter_id diff --git a/packages/core/services/cluster/src/types.rs b/packages/core/services/cluster/src/types.rs index dbc8400acb..85694a83de 100644 --- a/packages/core/services/cluster/src/types.rs +++ b/packages/core/services/cluster/src/types.rs @@ -149,6 +149,18 @@ pub struct Server { pub lan_ip: Option, pub wan_ip: Option, pub cloud_destroy_ts: Option, + pub state: ServerState, +} + +#[derive(Serialize, Deserialize, Hash, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromRepr)] +pub enum ServerState { + Provisioning = 0, + Installing = 1, + Running = 2, + Tainted = 3, + Draining = 4, + TaintedDraining = 5, + Destroyed = 6, } #[derive(Debug, Default, Clone)] diff --git a/packages/edge/infra/edge-server/src/run_config.rs b/packages/edge/infra/edge-server/src/run_config.rs index 73b914e7ce..5e2105f1d2 100644 --- a/packages/edge/infra/edge-server/src/run_config.rs +++ b/packages/edge/infra/edge-server/src/run_config.rs @@ -22,6 +22,11 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { ServiceKind::Standalone, |config, pools| Box::pin(edge_monolith_workflow_worker::start(config, pools)), ), + Service::new( + "pegboard_usage_metrics_publish", + ServiceKind::Singleton, + |config, pools| Box::pin(pegboard_usage_metrics_publish::start(config, pools)), + ), ]; Ok(RunConfigData { diff --git a/packages/edge/services/pegboard/src/keys/client.rs b/packages/edge/services/pegboard/src/keys/client.rs index 733fb9f8a2..9900db56a8 100644 --- a/packages/edge/services/pegboard/src/keys/client.rs +++ b/packages/edge/services/pegboard/src/keys/client.rs @@ -246,6 +246,10 @@ impl ActorKey { pub fn subspace(client_id: Uuid) -> ActorSubspaceKey { ActorSubspaceKey::new(client_id) } + + pub fn entire_subspace() -> ActorSubspaceKey { + ActorSubspaceKey::entire() + } } impl FormalKey for ActorKey { @@ -290,12 +294,16 @@ impl<'de> TupleUnpack<'de> for ActorKey { } pub struct ActorSubspaceKey { - client_id: Uuid, + client_id: Option, } impl ActorSubspaceKey { fn new(client_id: Uuid) -> Self { - ActorSubspaceKey { client_id } + ActorSubspaceKey { client_id: Some(client_id) } + } + + fn entire() -> Self { + ActorSubspaceKey { client_id: None } } } @@ -305,8 +313,16 @@ impl TuplePack for ActorSubspaceKey { w: &mut W, tuple_depth: TupleDepth, ) -> std::io::Result { - let t = (CLIENT, ACTOR, self.client_id); - t.pack(w, tuple_depth) + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (CLIENT, ACTOR); + offset += t.pack(w, tuple_depth)?; + + if let Some(client_id) = &self.client_id { + offset += client_id.pack(w, tuple_depth)?; + } + + Ok(offset) } } diff --git a/packages/edge/services/pegboard/src/metrics.rs b/packages/edge/services/pegboard/src/metrics.rs index 2d9ec38c0e..db5515a90e 100644 --- a/packages/edge/services/pegboard/src/metrics.rs +++ b/packages/edge/services/pegboard/src/metrics.rs @@ -37,4 +37,18 @@ lazy_static::lazy_static! { BUCKETS.to_vec(), *REGISTRY, ).unwrap(); + + pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!( + "pegboard_env_cpu_usage", + "Total percent of CPU (per core) used by an environment.", + &["env_id", "flavor"], + *REGISTRY, + ).unwrap(); + + pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!( + "pegboard_env_memory_usage", + "Total MiB of memory used by an environment.", + &["env_id", "flavor"], + *REGISTRY, + ).unwrap(); } diff --git a/packages/edge/services/pegboard/src/ops/client/mod.rs b/packages/edge/services/pegboard/src/ops/client/mod.rs index d1f9520288..6c485e99ea 100644 --- a/packages/edge/services/pegboard/src/ops/client/mod.rs +++ b/packages/edge/services/pegboard/src/ops/client/mod.rs @@ -1,2 +1 @@ pub mod update_allocation_idx; -pub mod usage_get; diff --git a/packages/edge/services/pegboard/src/ops/client/usage_get.rs b/packages/edge/services/pegboard/src/ops/client/usage_get.rs deleted file mode 100644 index aa130d329b..0000000000 --- a/packages/edge/services/pegboard/src/ops/client/usage_get.rs +++ /dev/null @@ -1,162 +0,0 @@ -use std::collections::HashMap; - -use chirp_workflow::prelude::*; - -#[derive(Debug)] -pub struct Input { - pub client_ids: Vec, -} - -#[derive(Debug)] -pub struct Output { - pub clients: HashMap, -} - -#[derive(Debug)] -pub struct Stats { - /// Mhz - pub cpu: u32, - /// MiB - pub memory: u32, - /// MiB - pub disk: u32, -} - -#[operation] -pub async fn pegboard_client_usage_get(ctx: &OperationCtx, input: &Input) -> GlobalResult { - if ctx.config().server()?.prometheus.is_none() { - tracing::debug!("prometheus disabled"); - return Ok(Output { - clients: HashMap::new(), - }); - }; - - let prom_res = handle_request( - &util::url::to_string_without_slash(&ctx.config().server()?.prometheus()?.url), - formatdoc!( - r#" - label_replace( - sum by (client_id) ( - last_over_time( - rivet_pegboard_cpu_allocated{{ - client_id=~"({client_ids})", - }} - [15m:15s] - ) - ), - "metric", "cpu", "", "" - ) - OR - label_replace( - sum by (client_id) ( - last_over_time( - rivet_pegboard_memory_allocated{{ - client_id=~"({client_ids})", - }} - [15m:15s] - ) - ), - "metric", "mem", "", "" - ) - "#, - client_ids = input - .client_ids - .iter() - .map(|client_id| client_id.to_string()) - .collect::>() - .join("|"), - ) - .to_string(), - ) - .await?; - - let mut stats_by_client_id = HashMap::new(); - - // Aggregate rows into hashmap - for row in prom_res { - let server_entry = stats_by_client_id - .entry(row.labels.client_id) - .or_insert_with(|| Stats { - cpu: 0, - memory: 0, - disk: 0, - }); - - // Aggregate data - if let Some((_, value)) = row.value { - match row.labels.metric { - Metric::Cpu => { - // MiB - server_entry.cpu += value.parse::()? as u32; - } - Metric::Memory => { - // MHz - server_entry.memory += - value.parse::()? as u32 * server_spec::LINODE_CPU_PER_CORE / 1000; - } - } - } else { - tracing::warn!(?row, "no value from metric"); - } - } - - Ok(Output { - clients: stats_by_client_id, - }) -} - -#[derive(Debug, Deserialize)] -struct PrometheusResponse { - data: PrometheusData, -} - -#[derive(Debug, Deserialize)] -struct PrometheusData { - #[serde(rename = "resultType")] - _result_type: String, - result: Vec, -} - -#[derive(Debug, Deserialize)] -struct PrometheusResult { - #[serde(rename = "metric")] - labels: PrometheusLabels, - value: Option<(f64, String)>, -} - -#[derive(Debug, Deserialize)] -struct PrometheusLabels { - client_id: Uuid, - metric: Metric, -} - -#[derive(Debug, Deserialize)] -enum Metric { - #[serde(rename = "cpu")] - Cpu, - #[serde(rename = "mem")] - Memory, -} - -// TODO: Copied from topology_get -async fn handle_request(url: &String, query: String) -> GlobalResult> { - let query_pairs = vec![("query", query), ("timeout", "2500ms".to_owned())]; - - let query_string = serde_urlencoded::to_string(query_pairs)?; - let req_url = format!("{}/api/v1/query?{}", url, query_string); - - // Query prometheus - tracing::info!("querying prometheus"); - let res = reqwest::Client::new().get(req_url).send().await?; - - if !res.status().is_success() { - let status = res.status(); - let text = res.text().await?; - - bail!("failed prometheus request: ({}) {}", status, text); - } - - let body = res.json::().await?; - - Ok(body.data.result) -} diff --git a/packages/edge/services/pegboard/src/workflows/client/mod.rs b/packages/edge/services/pegboard/src/workflows/client/mod.rs index d5aecfa1d1..7dcb876d88 100644 --- a/packages/edge/services/pegboard/src/workflows/client/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/client/mod.rs @@ -106,7 +106,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu config, system, }), - activity(UpdateMetricsInput { client_id, flavor }), + activity(UpdateMetricsInput { client_id, flavor, clear: false }), )) .await?; } @@ -118,7 +118,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu client_id, events: events.clone(), }), - activity(UpdateMetricsInput { client_id, flavor }), + activity(UpdateMetricsInput { client_id, flavor, clear: false }), )) .await?; @@ -243,6 +243,8 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu }) .await?; + ctx.activity(UpdateMetricsInput { client_id: input.client_id, flavor: input.flavor, clear: true }).await?; + let actors = ctx .activity(FetchRemainingActorsInput { client_id: input.client_id, @@ -673,7 +675,7 @@ pub async fn handle_commands( activity(InsertCommandsInput { commands: raw_commands.clone(), }), - activity(UpdateMetricsInput { client_id, flavor }), + activity(UpdateMetricsInput { client_id, flavor, clear: false }), )) .await?; @@ -914,11 +916,14 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> GlobalRe struct UpdateMetricsInput { client_id: Uuid, flavor: ClientFlavor, + clear: bool, } #[activity(UpdateMetrics)] async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> GlobalResult<()> { - let (memory, cpu) = + let (memory, cpu) = if input.clear { + (0, 0) + } else { ctx.fdb() .await? .run(|tx, _mc| async move { @@ -966,7 +971,8 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global )) }) .custom_instrument(tracing::info_span!("client_update_metrics_tx")) - .await?; + .await? + }; metrics::CLIENT_CPU_ALLOCATED .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) diff --git a/packages/edge/services/pegboard/standalone/usage-metrics-publish/Cargo.toml b/packages/edge/services/pegboard/standalone/usage-metrics-publish/Cargo.toml new file mode 100644 index 0000000000..dc122c6c57 --- /dev/null +++ b/packages/edge/services/pegboard/standalone/usage-metrics-publish/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "pegboard-usage-metrics-publish" +version.workspace = true +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +chirp-client.workspace = true +chirp-workflow.workspace = true +foundationdb.workspace = true +fdb-util.workspace = true +rivet-connection.workspace = true +rivet-health-checks.workspace = true +rivet-metrics.workspace = true +rivet-runtime.workspace = true +tokio.workspace = true +tracing = "0.1" +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } + +pegboard.workspace = true +build.workspace = true +rivet-config.workspace = true + +[dependencies.sqlx] +workspace = true + +[dev-dependencies] diff --git a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs new file mode 100644 index 0000000000..da1be365e1 --- /dev/null +++ b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs @@ -0,0 +1,132 @@ +use std::collections::HashMap; + +use build::types::BuildKind; +use chirp_workflow::prelude::*; +use fdb_util::SNAPSHOT; +use foundationdb::{self as fdb, options::StreamingMode}; +use futures_util::{StreamExt, TryStreamExt}; +use pegboard::{keys, protocol}; + +struct Usage { + // Percent of core. + pub cpu: u64, + /// MiB. + pub memory: u64, +} + +pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> GlobalResult<()> { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(7)); + loop { + interval.tick().await; + + run_from_env(config.clone(), pools.clone()).await?; + } +} + +#[tracing::instrument(skip_all)] +pub async fn run_from_env( + config: rivet_config::Config, + pools: rivet_pools::Pools, +) -> GlobalResult<()> { + let client = chirp_client::SharedClient::from_env(pools.clone())? + .wrap_new("pegboard-usage-metrics-publish"); + let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?; + let ctx = StandaloneCtx::new( + db::DatabaseCrdbNats::from_pools(pools.clone())?, + config, + rivet_connection::Connection::new(client, pools, cache), + "pegboard-usage-metrics-publish", + ) + .await?; + + // List all actor ids that are currently running + let actor_ids = ctx + .fdb() + .await? + .run(|tx, _mc| async move { + let actor_subspace = + keys::subspace().subspace(&keys::client::ActorKey::entire_subspace()); + + tx.get_ranges_keyvalues( + fdb::RangeOption { + mode: StreamingMode::WantAll, + ..(&actor_subspace).into() + }, + // Not serializable because we don't want to interfere with normal operations + SNAPSHOT, + ) + .map(|res| match res { + Ok(entry) => { + let key = keys::subspace() + .unpack::(entry.key()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + + Ok(key.actor_id) + } + Err(err) => Err(Into::::into(err)), + }) + .try_collect::>() + .await + }) + .custom_instrument(tracing::info_span!("client_fetch_remaining_actors_tx")) + .await?; + + let actors_res = ctx + .op(pegboard::ops::actor::get::Input { + actor_ids, + endpoint_type: None, + }) + .await?; + + let builds_res = ctx + .op(build::ops::get::Input { + build_ids: actors_res + .actors + .iter() + .map(|actor| actor.image_id) + .collect(), + }) + .await?; + + let mut usage_by_env_and_flavor = HashMap::new(); + + // Aggregate data per env and flavor + for actor in &actors_res.actors { + if actor.start_ts.is_none() || actor.destroy_ts.is_some() { + continue; + } + + let Some(build) = builds_res + .builds + .iter() + .find(|build| build.build_id == actor.image_id) + else { + tracing::error!("build info not found for actor"); + continue; + }; + + let client_flavor = match build.kind { + BuildKind::DockerImage | BuildKind::OciBundle => protocol::ClientFlavor::Container, + BuildKind::JavaScript => protocol::ClientFlavor::Isolate, + }; + + let env_usage = usage_by_env_and_flavor + .entry((actor.env_id, client_flavor)) + .or_insert(Usage { cpu: 0, memory: 0 }); + + env_usage.cpu += (actor.resources.cpu_millicores / 10) as u64; + env_usage.memory += actor.resources.memory_mib as u64; + } + + // Insert metrics + for ((env_id, client_flavor), usage) in usage_by_env_and_flavor { + pegboard::metrics::ENV_CPU_USAGE + .with_label_values(&[&env_id.to_string(), &client_flavor.to_string()]) + .set(usage.cpu.try_into()?); + pegboard::metrics::ENV_MEMORY_USAGE + .with_label_values(&[&env_id.to_string(), &client_flavor.to_string()]) + .set(usage.memory.try_into()?); + } + + Ok(()) +} diff --git a/packages/edge/services/pegboard/standalone/usage-metrics-publish/tests/integration.rs b/packages/edge/services/pegboard/standalone/usage-metrics-publish/tests/integration.rs new file mode 100644 index 0000000000..6c8ea4d0f2 --- /dev/null +++ b/packages/edge/services/pegboard/standalone/usage-metrics-publish/tests/integration.rs @@ -0,0 +1 @@ +// TODO: