Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 18d6883

Browse files
authored
imp(orchestrator): optimize heartbeat & metrics storage (#545)
* optimize heartbeat & metrics store
1 parent 5a25777 commit 18d6883

3 files changed

Lines changed: 258 additions & 129 deletions

File tree

crates/orchestrator/src/api/routes/heartbeat.rs

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -95,46 +95,54 @@ async fn heartbeat(
9595
error!("Heartbeat Error: {}", e);
9696
}
9797
if let Some(metrics) = heartbeat.metrics.clone() {
98-
// Get all previously reported metrics for this node
99-
let previous_metrics = match app_state
98+
// Get current metric keys for this node efficiently using HKEYS
99+
let previous_metric_keys = match app_state
100100
.store_context
101101
.metrics_store
102-
.get_metrics_for_node(node_address)
102+
.get_metric_keys_for_node(node_address)
103103
.await
104104
{
105-
Ok(metrics) => metrics,
105+
Ok(keys) => keys,
106106
Err(e) => {
107-
error!("Error getting metrics for node: {}", e);
108-
Default::default()
107+
error!("Error getting metric keys for node: {}", e);
108+
Vec::new()
109109
}
110110
};
111111

112-
// Create a HashSet of new metrics for efficient lookup
113-
let new_metrics_set: HashSet<_> = metrics
112+
// Create a HashSet of new metric keys for efficient lookup
113+
let new_metrics_set: HashSet<String> = metrics
114114
.iter()
115-
.map(|metric| (&metric.key.task_id, &metric.key.label))
115+
.map(|metric| {
116+
let task_id = if metric.key.task_id.is_empty() {
117+
"manual".to_string()
118+
} else {
119+
metric.key.task_id.clone()
120+
};
121+
format!("{}:{}", task_id, metric.key.label.replace(':', ""))
122+
})
116123
.collect();
117124

118-
// Clean up stale metrics from Redis only
119-
// The sync service will handle all Prometheus updates
120-
for (task_id, task_metrics) in previous_metrics {
121-
for (label, _value) in task_metrics {
122-
let prev_key = (&task_id, &label);
123-
if !new_metrics_set.contains(&prev_key) {
124-
// Remove from Redis metrics store
125-
if let Err(e) = app_state
126-
.store_context
127-
.metrics_store
128-
.delete_metric(&task_id, &label, &node_address.to_string())
129-
.await
130-
{
131-
error!("Error deleting metric: {}", e);
132-
}
125+
// Find stale metrics to delete
126+
let stale_metrics: Vec<String> = previous_metric_keys
127+
.into_iter()
128+
.filter(|key| !new_metrics_set.contains(key))
129+
.collect();
130+
131+
// Delete stale metrics efficiently
132+
for metric_key in stale_metrics {
133+
if let Some((task_id, label)) = metric_key.split_once(':') {
134+
if let Err(e) = app_state
135+
.store_context
136+
.metrics_store
137+
.delete_metric(task_id, label, &node_address.to_string())
138+
.await
139+
{
140+
error!("Error deleting metric: {}", e);
133141
}
134142
}
135143
}
136144

137-
// Store new metrics in Redis only
145+
// Store new metrics in Redis
138146
if let Err(e) = app_state
139147
.store_context
140148
.metrics_store

crates/orchestrator/src/main.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,51 @@ struct Args {
126126
max_healthy_nodes_with_same_endpoint: u32,
127127
}
128128

129+
async fn run_inactive_node_metric_migration(store_context: Arc<StoreContext>) -> Result<()> {
130+
info!("Starting migration of inactive node metrics to new data model");
131+
132+
let node_addresses = match store_context.node_store.get_nodes().await {
133+
Ok(nodes) => {
134+
let addresses: Vec<_> = nodes.into_iter().map(|node| node.address).collect();
135+
info!("Found {} nodes to migrate", addresses.len());
136+
addresses
137+
}
138+
Err(e) => {
139+
error!("Error getting all nodes for migration: {}", e);
140+
return Ok(()); // Don't fail startup if migration can't get nodes
141+
}
142+
};
143+
144+
let mut migrated_count = 0;
145+
let mut error_count = 0;
146+
147+
for node_address in node_addresses {
148+
match store_context
149+
.metrics_store
150+
.migrate_node_metrics_if_needed(node_address)
151+
.await
152+
{
153+
Ok(()) => {
154+
migrated_count += 1;
155+
if migrated_count % 100 == 0 {
156+
info!("Migrated {} nodes so far...", migrated_count);
157+
}
158+
}
159+
Err(e) => {
160+
error!("Error migrating metrics for node {}: {}", node_address, e);
161+
error_count += 1;
162+
// Continue with other nodes even if one fails
163+
}
164+
}
165+
}
166+
167+
info!(
168+
"Migration completed. Successfully migrated {} nodes, {} errors",
169+
migrated_count, error_count
170+
);
171+
Ok(())
172+
}
173+
129174
#[tokio::main]
130175
async fn main() -> Result<()> {
131176
let args = Args::parse();
@@ -176,6 +221,10 @@ async fn main() -> Result<()> {
176221
let store = Arc::new(RedisStore::new(&args.redis_store_url));
177222
let store_context = Arc::new(StoreContext::new(store.clone()));
178223

224+
// Run one-time migration for inactive nodes
225+
let migration_store_context = store_context.clone();
226+
run_inactive_node_metric_migration(migration_store_context).await?;
227+
179228
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
180229

181230
let contracts = ContractBuilder::new(wallet.provider())

0 commit comments

Comments
 (0)