Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 61a0a27

Browse files
authored
Fix: stale metrics in distributed deployment of orchestrator (#425)
* fix stale metrics serving when orchestrator is deployed in distributed env
1 parent e9c7863 commit 61a0a27

4 files changed

Lines changed: 177 additions & 28 deletions

File tree

crates/orchestrator/src/api/routes/heartbeat.rs

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,12 @@ async fn heartbeat(
9898
.map(|metric| (&metric.key.task_id, &metric.key.label))
9999
.collect();
100100

101-
// Clean up stale metrics
101+
// Clean up stale metrics from Redis only
102+
// The sync service will handle all Prometheus updates
102103
for (task_id, task_metrics) in previous_metrics {
103104
for (label, _value) in task_metrics {
104105
let prev_key = (&task_id, &label);
105106
if !new_metrics_set.contains(&prev_key) {
106-
// Remove from Prometheus metrics
107-
app_state.metrics.remove_compute_task_gauge(
108-
&node_address.to_string(),
109-
&task_id,
110-
&label,
111-
);
112107
// Remove from Redis metrics store
113108
if let Err(e) = app_state
114109
.store_context
@@ -122,7 +117,7 @@ async fn heartbeat(
122117
}
123118
}
124119

125-
// Store new metrics and update Prometheus
120+
// Store new metrics in Redis only
126121
if let Err(e) = app_state
127122
.store_context
128123
.metrics_store
@@ -131,15 +126,6 @@ async fn heartbeat(
131126
{
132127
error!("Error storing metrics: {}", e);
133128
}
134-
135-
for metric in metrics {
136-
app_state.metrics.record_compute_task_gauge(
137-
&node_address.to_string(),
138-
&metric.key.task_id,
139-
&metric.key.label,
140-
metric.value,
141-
);
142-
}
143129
}
144130

145131
let current_task = app_state.scheduler.get_task_for_node(node_address).await;
@@ -171,7 +157,9 @@ mod tests {
171157

172158
use super::*;
173159
use crate::api::tests::helper::create_test_app_state;
160+
use crate::metrics::sync_service::MetricsSyncService;
174161
use crate::models::node::OrchestratorNode;
162+
use crate::ServerMode;
175163

176164
use actix_web::http::StatusCode;
177165
use actix_web::test;
@@ -254,10 +242,38 @@ mod tests {
254242
})
255243
);
256244

257-
let metrics = app_state.metrics.export_metrics().unwrap();
258-
assert!(metrics.contains("performance/batch_avg_seq_length"));
259-
assert!(metrics.contains("performance/batch_min_seq_length"));
260-
assert!(metrics.contains("long-task-1234"));
245+
// Verify metrics are stored in Redis (heartbeat API responsibility)
246+
let redis_metrics = app_state
247+
.store_context
248+
.metrics_store
249+
.get_metrics_for_node(node_address)
250+
.await
251+
.unwrap();
252+
assert!(redis_metrics.contains_key("long-task-1234"));
253+
assert!(redis_metrics["long-task-1234"].contains_key("performance/batch_avg_seq_length"));
254+
assert!(redis_metrics["long-task-1234"].contains_key("performance/batch_min_seq_length"));
255+
256+
// Test metrics sync service: Redis -> Prometheus
257+
// Verify Prometheus registry is initially empty (no sync service has run)
258+
let prometheus_metrics_before = app_state.metrics.export_metrics().unwrap();
259+
assert!(!prometheus_metrics_before.contains("performance/batch_avg_seq_length"));
260+
261+
// Create and run sync service manually to test the sync functionality
262+
let sync_service = MetricsSyncService::new(
263+
app_state.store_context.clone(),
264+
app_state.metrics.clone(),
265+
ServerMode::Full, // Test app uses Full mode
266+
10,
267+
);
268+
269+
// Manually trigger a sync operation
270+
sync_service.sync_metrics_from_redis().await.unwrap();
271+
272+
// Verify Prometheus registry now contains the metrics from Redis
273+
let prometheus_metrics_after = app_state.metrics.export_metrics().unwrap();
274+
assert!(prometheus_metrics_after.contains("performance/batch_avg_seq_length"));
275+
assert!(prometheus_metrics_after.contains("performance/batch_min_seq_length"));
276+
assert!(prometheus_metrics_after.contains("long-task-1234"));
261277

262278
let heartbeat_two = json!({"address": address, "metrics": [
263279
{"key": {"task_id": "long-task-1235", "label": "performance/batch_len"}, "value": 10.0},
@@ -272,11 +288,17 @@ mod tests {
272288
let resp = test::call_service(&app, req).await;
273289
assert_eq!(resp.status(), StatusCode::OK);
274290

275-
let metrics = app_state.metrics.export_metrics().unwrap();
276-
assert!(metrics.contains("long-task-1235"));
277-
assert!(metrics.contains("performance/batch_len"));
278-
assert!(metrics.contains("performance/batch_min_len"));
279-
assert!(!metrics.contains("long-task-1234"));
291+
// Verify new metrics in Redis and old metrics cleaned up
292+
let redis_metrics = app_state
293+
.store_context
294+
.metrics_store
295+
.get_metrics_for_node(node_address)
296+
.await
297+
.unwrap();
298+
assert!(redis_metrics.contains_key("long-task-1235"));
299+
assert!(redis_metrics["long-task-1235"].contains_key("performance/batch_len"));
300+
assert!(redis_metrics["long-task-1235"].contains_key("performance/batch_min_len"));
301+
assert!(!redis_metrics.contains_key("long-task-1234")); // Stale metrics cleaned up
280302
let aggregated_metrics = app_state
281303
.store_context
282304
.metrics_store
@@ -293,6 +315,12 @@ mod tests {
293315
aggregated_metrics.get("performance/batch_avg_seq_length"),
294316
None
295317
);
318+
sync_service.sync_metrics_from_redis().await.unwrap();
319+
let prometheus_metrics_after_two = app_state.metrics.export_metrics().unwrap();
320+
assert!(prometheus_metrics_after_two.contains("performance/batch_len"));
321+
assert!(prometheus_metrics_after_two.contains("performance/batch_min_len"));
322+
assert!(prometheus_metrics_after_two.contains("long-task-1235"));
323+
assert!(!prometheus_metrics_after_two.contains("long-task-1234"));
296324

297325
let heartbeat_three = json!({"address": address, "metrics": [
298326
]});
@@ -305,15 +333,22 @@ mod tests {
305333
let resp = test::call_service(&app, req).await;
306334
assert_eq!(resp.status(), StatusCode::OK);
307335

308-
let metrics = app_state.metrics.export_metrics().unwrap();
336+
// Verify all metrics cleaned up from Redis
337+
let redis_metrics = app_state
338+
.store_context
339+
.metrics_store
340+
.get_metrics_for_node(node_address)
341+
.await
342+
.unwrap();
343+
assert!(redis_metrics.is_empty()); // All metrics for this node should be gone
344+
309345
let aggregated_metrics = app_state
310346
.store_context
311347
.metrics_store
312348
.get_aggregate_metrics_for_all_tasks()
313349
.await
314350
.unwrap();
315351
assert_eq!(aggregated_metrics, HashMap::new());
316-
assert_eq!(metrics, "");
317352
}
318353

319354
#[actix_web::test]

crates/orchestrator/src/main.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use log::debug;
2525
use log::error;
2626
use log::info;
2727
use log::LevelFilter;
28+
use metrics::sync_service::MetricsSyncService;
2829
use metrics::webhook_sender::MetricsWebhookSender;
2930
use metrics::MetricsContext;
3031
use plugins::node_groups::NodeGroupConfiguration;
@@ -268,6 +269,19 @@ async fn main() -> Result<()> {
268269

269270
// Only spawn processor tasks if in ProcessorOnly or Full mode
270271
if matches!(server_mode, ServerMode::ProcessorOnly | ServerMode::Full) {
272+
// Start metrics sync service to centralize metrics from Redis to Prometheus
273+
let metrics_sync_store_context = store_context.clone();
274+
let metrics_sync_context = metrics_context.clone();
275+
tasks.spawn(async move {
276+
let sync_service = MetricsSyncService::new(
277+
metrics_sync_store_context,
278+
metrics_sync_context,
279+
server_mode,
280+
10,
281+
);
282+
sync_service.run().await
283+
});
284+
271285
if let Some(group_plugin) = node_groups_plugin.clone() {
272286
tasks.spawn(async move {
273287
group_plugin

crates/orchestrator/src/metrics/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use prometheus::{GaugeVec, Opts, Registry, TextEncoder};
2+
pub mod sync_service;
23
pub mod webhook_sender;
34

45
pub struct MetricsContext {
@@ -53,4 +54,11 @@ impl MetricsContext {
5354
let metric_families = self.registry.gather();
5455
encoder.encode_to_string(&metric_families)
5556
}
57+
58+
/// Clear all metrics from the registry
59+
pub fn clear_compute_task_metrics(&self) {
60+
// Clear all time series from the compute_task_gauges metric family
61+
// This removes all existing metrics so we can rebuild from Redis
62+
self.compute_task_gauges.reset();
63+
}
5664
}
crates/orchestrator/src/metrics/sync_service.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use crate::metrics::MetricsContext;
2+
use crate::store::core::StoreContext;
3+
use crate::ServerMode;
4+
use log::{debug, error, info};
5+
use std::sync::Arc;
6+
use std::time::Duration;
7+
use tokio::time::interval;
8+
9+
pub struct MetricsSyncService {
10+
store_context: Arc<StoreContext>,
11+
metrics_context: Arc<MetricsContext>,
12+
server_mode: ServerMode,
13+
sync_interval: Duration,
14+
}
15+
16+
impl MetricsSyncService {
17+
pub fn new(
18+
store_context: Arc<StoreContext>,
19+
metrics_context: Arc<MetricsContext>,
20+
server_mode: ServerMode,
21+
sync_interval_seconds: u64,
22+
) -> Self {
23+
Self {
24+
store_context,
25+
metrics_context,
26+
server_mode,
27+
sync_interval: Duration::from_secs(sync_interval_seconds),
28+
}
29+
}
30+
31+
pub async fn run(&self) -> anyhow::Result<()> {
32+
// Only run the sync service on ProcessorOnly or Full mode instances
33+
if !matches!(
34+
self.server_mode,
35+
ServerMode::ProcessorOnly | ServerMode::Full
36+
) {
37+
debug!("Metrics sync service disabled for ApiOnly mode");
38+
return Ok(());
39+
}
40+
41+
info!(
42+
"Starting metrics sync service (interval: {:?})",
43+
self.sync_interval
44+
);
45+
let mut interval = interval(self.sync_interval);
46+
47+
loop {
48+
interval.tick().await;
49+
if let Err(e) = self.sync_metrics_from_redis().await {
50+
error!("Error syncing metrics from Redis: {}", e);
51+
}
52+
}
53+
}
54+
55+
pub async fn sync_metrics_from_redis(&self) -> anyhow::Result<()> {
56+
debug!("Syncing metrics from Redis to Prometheus");
57+
58+
// Get all metrics from Redis
59+
let redis_metrics = match self.store_context.metrics_store.get_all_metrics().await {
60+
Ok(metrics) => metrics,
61+
Err(e) => {
62+
error!("Failed to get metrics from Redis: {}", e);
63+
return Err(e);
64+
}
65+
};
66+
67+
// Clear existing Prometheus metrics
68+
self.metrics_context.clear_compute_task_metrics();
69+
70+
// Rebuild metrics from Redis data
71+
let mut total_metrics = 0;
72+
for (task_id, task_metrics) in redis_metrics {
73+
for (label, node_metrics) in task_metrics {
74+
for (node_address, value) in node_metrics {
75+
self.metrics_context.record_compute_task_gauge(
76+
&node_address,
77+
&task_id,
78+
&label,
79+
value,
80+
);
81+
total_metrics += 1;
82+
}
83+
}
84+
}
85+
86+
debug!(
87+
"Synced {} metric entries from Redis to Prometheus",
88+
total_metrics
89+
);
90+
Ok(())
91+
}
92+
}

0 commit comments

Comments
 (0)