Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 4b96a09

Browse files
authored
export node task metrics to prometheus (#367)
* orchestrator now exports a /metrics/prometheus endpoint that includes the decentralized node's reported metrics
1 parent 67ee8cb commit 4b96a09

8 files changed

Lines changed: 123 additions & 7 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/orchestrator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ google-cloud-auth = "0.18.0"
1717
google-cloud-storage = "0.24.0"
1818
hex = { workspace = true }
1919
log = { workspace = true }
20+
prometheus = "0.14.0"
2021
rand = "0.9.0"
2122
redis = { workspace = true }
2223
redis-test = { workspace = true }

crates/orchestrator/src/api/routes/heartbeat.rs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,21 @@ async fn heartbeat(
4141
}
4242

4343
app_state.store_context.heartbeat_store.beat(&heartbeat);
44-
app_state
45-
.store_context
46-
.metrics_store
47-
.store_metrics(heartbeat.metrics.clone(), node_address);
44+
if let Some(metrics) = heartbeat.metrics.clone() {
45+
app_state
46+
.store_context
47+
.metrics_store
48+
.store_metrics(Some(metrics.clone()), node_address);
49+
50+
for metric in metrics {
51+
app_state.metrics.record_compute_task_gauge(
52+
&node_address.to_string(),
53+
&metric.key.task_id,
54+
&metric.key.label,
55+
metric.value,
56+
);
57+
}
58+
}
4859

4960
let current_task = app_state.scheduler.get_task_for_node(node_address);
5061
match current_task {
@@ -73,10 +84,13 @@ pub fn heartbeat_routes() -> Scope {
7384
mod tests {
7485
use super::*;
7586
use crate::api::tests::helper::create_test_app_state;
87+
7688
use actix_web::http::StatusCode;
7789
use actix_web::test;
7890
use actix_web::App;
7991
use serde_json::json;
92+
use shared::models::metric::MetricEntry;
93+
use shared::models::metric::MetricKey;
8094
use shared::models::task::TaskRequest;
8195

8296
#[actix_web::test]
@@ -90,7 +104,10 @@ mod tests {
90104
.await;
91105

92106
let address = "0x0000000000000000000000000000000000000000".to_string();
93-
let req_payload = json!({"address": address});
107+
let req_payload = json!({"address": address, "metrics": [
108+
{"key": {"task_id": "long-task-1234", "label": "performance/batch_avg_seq_length"}, "value": 1.0},
109+
{"key": {"task_id": "long-task-1234", "label": "performance/batch_min_seq_length"}, "value": 5.0}
110+
]});
94111

95112
let req = test::TestRequest::post()
96113
.uri("/heartbeat")
@@ -116,12 +133,31 @@ mod tests {
116133
address: "0x0000000000000000000000000000000000000000".to_string(),
117134
task_id: None,
118135
task_state: None,
119-
metrics: None,
136+
metrics: Some(vec![
137+
MetricEntry {
138+
key: MetricKey {
139+
task_id: "long-task-1234".to_string(),
140+
label: "performance/batch_avg_seq_length".to_string(),
141+
},
142+
value: 1.0,
143+
},
144+
MetricEntry {
145+
key: MetricKey {
146+
task_id: "long-task-1234".to_string(),
147+
label: "performance/batch_min_seq_length".to_string(),
148+
},
149+
value: 5.0,
150+
}
151+
]),
120152
version: None,
121153
timestamp: None,
122154
p2p_id: None,
123155
})
124156
);
157+
158+
let metrics = app_state.metrics.export_metrics().unwrap();
159+
assert!(metrics.contains("performance/batch_avg_seq_length"));
160+
assert!(metrics.contains("performance/batch_min_seq_length"));
125161
}
126162

127163
#[actix_web::test]

crates/orchestrator/src/api/routes/metrics.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ struct DeleteMetricRequest {
1717
label: String,
1818
address: String,
1919
}
20+
2021
async fn get_metrics(app_state: Data<AppState>) -> HttpResponse {
2122
let metrics = app_state
2223
.store_context
@@ -30,6 +31,18 @@ async fn get_all_metrics(app_state: Data<AppState>) -> HttpResponse {
3031
HttpResponse::Ok().json(json!({"success": true, "metrics": metrics}))
3132
}
3233

34+
async fn get_prometheus_metrics(app_state: Data<AppState>) -> HttpResponse {
35+
match app_state.metrics.export_metrics() {
36+
Ok(metrics) => HttpResponse::Ok()
37+
.content_type("text/plain; version=0.0.4")
38+
.body(metrics),
39+
Err(e) => HttpResponse::InternalServerError().json(json!({
40+
"success": false,
41+
"error": format!("Failed to export metrics: {}", e)
42+
})),
43+
}
44+
}
45+
3346
// for potential backup restore purposes
3447
async fn create_metric(
3548
app_state: Data<AppState>,
@@ -62,6 +75,7 @@ pub fn metrics_routes() -> Scope {
6275
web::scope("/metrics")
6376
.route("", get().to(get_metrics))
6477
.route("/all", get().to(get_all_metrics))
78+
.route("/prometheus", get().to(get_prometheus_metrics))
6579
.route("", post().to(create_metric))
6680
.route("/{task_id}", delete().to(delete_metric))
6781
}

crates/orchestrator/src/api/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::api::routes::nodes::nodes_routes;
22
use crate::api::routes::storage::storage_routes;
33
use crate::api::routes::task::tasks_routes;
44
use crate::api::routes::{heartbeat::heartbeat_routes, metrics::metrics_routes};
5+
use crate::metrics::MetricsContext;
56
use crate::models::node::NodeStatus;
67
use crate::plugins::node_groups::NodeGroupsPlugin;
78
use crate::scheduler::Scheduler;
@@ -32,6 +33,7 @@ pub struct AppState {
3233
pub pool_id: u32,
3334
pub scheduler: Scheduler,
3435
pub node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
36+
pub metrics: Arc<MetricsContext>,
3537
}
3638

3739
#[allow(clippy::too_many_arguments)]
@@ -50,6 +52,7 @@ pub async fn start_server(
5052
server_mode: ServerMode,
5153
scheduler: Scheduler,
5254
node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
55+
metrics: Arc<MetricsContext>,
5356
) -> Result<(), Error> {
5457
info!("Starting server at http://{}:{}", host, port);
5558
let app_state = Data::new(AppState {
@@ -63,6 +66,7 @@ pub async fn start_server(
6366
pool_id,
6467
scheduler,
6568
node_groups_plugin,
69+
metrics,
6670
});
6771
let node_store = app_state.store_context.node_store.clone();
6872
let node_store_clone = node_store.clone();

crates/orchestrator/src/api/tests/helper.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use url::Url;
1919
pub async fn create_test_app_state() -> Data<AppState> {
2020
use shared::utils::MockStorageProvider;
2121

22-
use crate::{scheduler::Scheduler, utils::loop_heartbeats::LoopHeartbeats, ServerMode};
22+
use crate::{
23+
metrics::MetricsContext, scheduler::Scheduler, utils::loop_heartbeats::LoopHeartbeats,
24+
ServerMode,
25+
};
2326

2427
let store = Arc::new(RedisStore::new_test());
2528
let mut con = store
@@ -40,6 +43,7 @@ pub async fn create_test_app_state() -> Data<AppState> {
4043

4144
let mock_storage = MockStorageProvider::new();
4245
let storage_provider = Arc::new(mock_storage);
46+
let metrics = Arc::new(MetricsContext::new(1.to_string()));
4347

4448
Data::new(AppState {
4549
store_context: store_context.clone(),
@@ -58,6 +62,7 @@ pub async fn create_test_app_state() -> Data<AppState> {
5862
redis_store: store.clone(),
5963
scheduler,
6064
node_groups_plugin: None,
65+
metrics,
6166
})
6267
}
6368

@@ -66,6 +71,7 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
6671
use shared::utils::MockStorageProvider;
6772

6873
use crate::{
74+
metrics::MetricsContext,
6975
plugins::node_groups::{NodeGroupConfiguration, NodeGroupsPlugin},
7076
scheduler::Scheduler,
7177
utils::loop_heartbeats::LoopHeartbeats,
@@ -105,6 +111,7 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
105111

106112
let mock_storage = MockStorageProvider::new();
107113
let storage_provider = Arc::new(mock_storage);
114+
let metrics = Arc::new(MetricsContext::new(1.to_string()));
108115

109116
Data::new(AppState {
110117
store_context: store_context.clone(),
@@ -123,6 +130,7 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
123130
redis_store: store.clone(),
124131
scheduler,
125132
node_groups_plugin,
133+
metrics,
126134
})
127135
}
128136

crates/orchestrator/src/main.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod api;
22
mod discovery;
33
mod events;
4+
mod metrics;
45
mod models;
56
mod node;
67
mod plugins;
@@ -24,6 +25,7 @@ use log::debug;
2425
use log::error;
2526
use log::info;
2627
use log::LevelFilter;
28+
use metrics::MetricsContext;
2729
use plugins::node_groups::NodeGroupConfiguration;
2830
use plugins::node_groups::NodeGroupsPlugin;
2931
use plugins::webhook::WebhookPlugin;
@@ -145,6 +147,8 @@ async fn main() -> Result<()> {
145147
debug!("Log level: {}", log_level);
146148
debug!("Server mode: {:?}", server_mode);
147149

150+
let metrics_context = Arc::new(MetricsContext::new(args.compute_pool_id.to_string()));
151+
148152
let heartbeats = Arc::new(LoopHeartbeats::new(&server_mode));
149153

150154
let compute_pool_id = args.compute_pool_id;
@@ -327,6 +331,7 @@ async fn main() -> Result<()> {
327331
server_mode,
328332
scheduler,
329333
node_groups_plugin,
334+
metrics_context,
330335
) => {
331336
if let Err(e) = res {
332337
error!("Server error: {}", e);
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use prometheus::{GaugeVec, Opts, Registry, TextEncoder};
2+
3+
pub struct MetricsContext {
4+
pub compute_task_gauges: GaugeVec,
5+
pub pool_id: String,
6+
pub registry: Registry,
7+
}
8+
9+
impl MetricsContext {
10+
pub fn new(pool_id: String) -> Self {
11+
// For current state/rate metrics
12+
let compute_task_gauges = GaugeVec::new(
13+
Opts::new("compute_gauges", "Compute task gauge metrics"),
14+
&["node_address", "task_id", "label", "pool_id"],
15+
)
16+
.unwrap();
17+
let registry = Registry::new();
18+
let _ = registry.register(Box::new(compute_task_gauges.clone()));
19+
20+
Self {
21+
compute_task_gauges,
22+
pool_id,
23+
registry,
24+
}
25+
}
26+
27+
pub fn record_compute_task_gauge(
28+
&self,
29+
node_address: &str,
30+
task_id: &str,
31+
label: &str,
32+
value: f64,
33+
) {
34+
println!("record_compute_task_gauge: {:?}", node_address);
35+
let vals = vec![node_address, task_id, label, &self.pool_id];
36+
println!("vals: {:?}", vals);
37+
self.compute_task_gauges
38+
.with_label_values(&[node_address, task_id, label, &self.pool_id])
39+
.set(value);
40+
}
41+
42+
pub fn export_metrics(&self) -> Result<String, prometheus::Error> {
43+
let encoder = TextEncoder::new();
44+
let metric_families = self.registry.gather();
45+
encoder.encode_to_string(&metric_families)
46+
}
47+
}

0 commit comments

Comments
 (0)