Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 384805a

Browse files
authored
Release/0.3.7 (#579)
* add additional status update loop metrics (#575) * increase heartbeat timeout (#574) * switch to sequential validation as quickfix for nonce issues (#576) * imp(worker): host nw mode with ability to switch networking config (#577) * host nw mode with ability to switch networking config * imp(general): Orchestrator Discovery Sync, Worker Timeouts (#578) * fix race condition in discovery sync, decrease heartbeat ttl, increase general auth mw request timeout * increase heartbeat timeout * release 0.3.7
2 parents fb912cf + 7726382 commit 384805a

13 files changed

Lines changed: 147 additions & 57 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ iroh = "0.34.1"
4040
rand_v8 = { package = "rand", version = "0.8.5", features = ["std"] }
4141
rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
4242
[workspace.package]
43-
version = "0.3.6"
43+
version = "0.3.7"
4444
edition = "2021"
4545

4646
[workspace.features]

crates/orchestrator/src/discovery/monitor.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -309,23 +309,40 @@ impl DiscoveryMonitor {
309309
if !discovery_node.is_active && existing_node.status == NodeStatus::Healthy {
310310
// Node is active False but we have it in store and it is healthy
311311
// This means that the node likely got kicked by e.g. the validator
312-
// We simply remove it from the store now and will rediscover it later?
313-
info!(
314-
"Node {} is no longer active on chain, marking as ejected",
315-
node_address
316-
);
317-
if !discovery_node.is_provider_whitelisted {
318-
if let Err(e) = self
319-
.update_node_status(&node_address, NodeStatus::Ejected)
312+
// Add a grace period check to avoid immediately marking nodes that just became healthy
313+
let should_mark_inactive =
314+
if let Some(last_status_change) = existing_node.last_status_change {
315+
let grace_period = chrono::Duration::minutes(5); // 5 minute grace period
316+
let now = chrono::Utc::now();
317+
now.signed_duration_since(last_status_change) > grace_period
318+
} else {
319+
// If no last_status_change, assume it's been healthy for a while
320+
true
321+
};
322+
323+
if should_mark_inactive {
324+
info!(
325+
"Node {} is no longer active on chain, marking as ejected",
326+
node_address
327+
);
328+
if !discovery_node.is_provider_whitelisted {
329+
if let Err(e) = self
330+
.update_node_status(&node_address, NodeStatus::Ejected)
331+
.await
332+
{
333+
error!("Error updating node status: {}", e);
334+
}
335+
} else if let Err(e) = self
336+
.update_node_status(&node_address, NodeStatus::Dead)
320337
.await
321338
{
322339
error!("Error updating node status: {}", e);
323340
}
324-
} else if let Err(e) = self
325-
.update_node_status(&node_address, NodeStatus::Dead)
326-
.await
327-
{
328-
error!("Error updating node status: {}", e);
341+
} else {
342+
info!(
343+
"Node {} is no longer active on chain but recently became healthy, waiting before marking inactive",
344+
node_address
345+
);
329346
}
330347
}
331348

@@ -359,6 +376,16 @@ impl DiscoveryMonitor {
359376
) {
360377
if last_change < last_updated {
361378
info!("Node {} is dead but has been updated on discovery, marking as discovered", node_address);
379+
380+
if existing_node.compute_specs != discovery_node.compute_specs {
381+
info!(
382+
"Node {} compute specs changed, marking as discovered",
383+
node_address
384+
);
385+
let mut node = existing_node.clone();
386+
node.compute_specs = discovery_node.compute_specs.clone();
387+
let _ = self.store_context.node_store.add_node(node.clone()).await;
388+
}
362389
if let Err(e) = self
363390
.update_node_status(&node_address, NodeStatus::Discovered)
364391
.await

crates/orchestrator/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ async fn main() -> Result<()> {
381381

382382
let status_update_store_context = store_context.clone();
383383
let status_update_heartbeats = heartbeats.clone();
384+
let status_update_metrics = metrics_context.clone();
384385
tasks.spawn({
385386
let contracts = contracts.clone();
386387
async move {
@@ -393,6 +394,7 @@ async fn main() -> Result<()> {
393394
args.disable_ejection,
394395
status_update_heartbeats.clone(),
395396
status_updater_plugins,
397+
status_update_metrics,
396398
);
397399
status_updater.run().await
398400
}

crates/orchestrator/src/metrics/mod.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use prometheus::{CounterVec, GaugeVec, Opts, Registry, TextEncoder};
1+
use prometheus::{CounterVec, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry, TextEncoder};
22
pub mod sync_service;
33
pub mod webhook_sender;
44

@@ -14,6 +14,7 @@ pub struct MetricsContext {
1414
pub heartbeat_requests_total: CounterVec,
1515
pub nodes_per_task: GaugeVec,
1616
pub task_state: GaugeVec,
17+
pub status_update_execution_time: HistogramVec,
1718
}
1819

1920
impl MetricsContext {
@@ -90,7 +91,6 @@ impl MetricsContext {
9091
&["task_id", "task_name", "pool_id"],
9192
)
9293
.unwrap();
93-
9494
let task_state = GaugeVec::new(
9595
Opts::new(
9696
"orchestrator_task_state",
@@ -100,6 +100,19 @@ impl MetricsContext {
100100
)
101101
.unwrap();
102102

103+
let status_update_execution_time = HistogramVec::new(
104+
HistogramOpts::new(
105+
"orchestrator_status_update_execution_time_seconds",
106+
"Duration of status update execution",
107+
)
108+
.buckets(vec![
109+
0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 15.0, 30.0, 45.0, 60.0, 90.0,
110+
120.0,
111+
]),
112+
&["node_address", "pool_id"],
113+
)
114+
.unwrap();
115+
103116
let registry = Registry::new();
104117
let _ = registry.register(Box::new(compute_task_gauges.clone()));
105118
let _ = registry.register(Box::new(task_info.clone()));
@@ -110,6 +123,7 @@ impl MetricsContext {
110123
let _ = registry.register(Box::new(heartbeat_requests_total.clone()));
111124
let _ = registry.register(Box::new(nodes_per_task.clone()));
112125
let _ = registry.register(Box::new(task_state.clone()));
126+
let _ = registry.register(Box::new(status_update_execution_time.clone()));
113127

114128
Self {
115129
compute_task_gauges,
@@ -123,6 +137,7 @@ impl MetricsContext {
123137
heartbeat_requests_total,
124138
nodes_per_task,
125139
task_state,
140+
status_update_execution_time,
126141
}
127142
}
128143

@@ -218,6 +233,12 @@ impl MetricsContext {
218233
self.compute_task_gauges.reset();
219234
}
220235

236+
pub fn record_status_update_execution_time(&self, node_address: &str, duration: f64) {
237+
self.status_update_execution_time
238+
.with_label_values(&[node_address, &self.pool_id])
239+
.observe(duration);
240+
}
241+
221242
/// Clear all orchestrator statistics metrics
222243
pub fn clear_orchestrator_statistics(&self) {
223244
self.nodes_total.reset();

crates/orchestrator/src/status_update/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::metrics::MetricsContext;
12
use crate::models::node::{NodeStatus, OrchestratorNode};
23
use crate::plugins::StatusUpdatePlugin;
34
use crate::store::core::StoreContext;
@@ -7,7 +8,7 @@ use shared::web3::contracts::core::builder::Contracts;
78
use shared::web3::wallet::WalletProvider;
89
use std::result::Result;
910
use std::sync::Arc;
10-
use std::time::Duration;
11+
use std::time::{Duration, Instant};
1112
use tokio::time::interval;
1213

1314
pub struct NodeStatusUpdater {
@@ -19,6 +20,7 @@ pub struct NodeStatusUpdater {
1920
disable_ejection: bool,
2021
heartbeats: Arc<LoopHeartbeats>,
2122
plugins: Vec<Box<dyn StatusUpdatePlugin>>,
23+
metrics: Arc<MetricsContext>,
2224
}
2325

2426
impl NodeStatusUpdater {
@@ -32,6 +34,7 @@ impl NodeStatusUpdater {
3234
disable_ejection: bool,
3335
heartbeats: Arc<LoopHeartbeats>,
3436
plugins: Vec<Box<dyn StatusUpdatePlugin>>,
37+
metrics: Arc<MetricsContext>,
3538
) -> Self {
3639
Self {
3740
store_context,
@@ -42,6 +45,7 @@ impl NodeStatusUpdater {
4245
disable_ejection,
4346
heartbeats,
4447
plugins,
48+
metrics,
4549
}
4650
}
4751

@@ -133,6 +137,7 @@ impl NodeStatusUpdater {
133137
pub async fn process_nodes(&self) -> Result<(), anyhow::Error> {
134138
let nodes = self.store_context.node_store.get_nodes().await?;
135139
for node in nodes {
140+
let start_time = Instant::now();
136141
let node = node.clone();
137142
let old_status = node.status.clone();
138143
let heartbeat = self
@@ -313,6 +318,12 @@ impl NodeStatusUpdater {
313318
}
314319
}
315320
}
321+
// Record status update execution time
322+
let duration = start_time.elapsed();
323+
self.metrics.record_status_update_execution_time(
324+
&node.address.to_string(),
325+
duration.as_secs_f64(),
326+
);
316327
}
317328
Ok(())
318329
}
@@ -346,6 +357,7 @@ mod tests {
346357
false,
347358
Arc::new(LoopHeartbeats::new(&mode)),
348359
vec![],
360+
app_state.metrics.clone(),
349361
);
350362
let node = OrchestratorNode {
351363
address: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
@@ -450,6 +462,7 @@ mod tests {
450462
false,
451463
Arc::new(LoopHeartbeats::new(&mode)),
452464
vec![],
465+
app_state.metrics.clone(),
453466
);
454467
tokio::spawn(async move {
455468
updater
@@ -502,6 +515,7 @@ mod tests {
502515
false,
503516
Arc::new(LoopHeartbeats::new(&mode)),
504517
vec![],
518+
app_state.metrics.clone(),
505519
);
506520
tokio::spawn(async move {
507521
updater
@@ -570,6 +584,7 @@ mod tests {
570584
false,
571585
Arc::new(LoopHeartbeats::new(&mode)),
572586
vec![],
587+
app_state.metrics.clone(),
573588
);
574589
tokio::spawn(async move {
575590
updater
@@ -648,6 +663,7 @@ mod tests {
648663
false,
649664
Arc::new(LoopHeartbeats::new(&mode)),
650665
vec![],
666+
app_state.metrics.clone(),
651667
);
652668
tokio::spawn(async move {
653669
updater
@@ -734,6 +750,7 @@ mod tests {
734750
false,
735751
Arc::new(LoopHeartbeats::new(&mode)),
736752
vec![],
753+
app_state.metrics.clone(),
737754
);
738755
tokio::spawn(async move {
739756
updater
@@ -817,6 +834,7 @@ mod tests {
817834
false,
818835
Arc::new(LoopHeartbeats::new(&mode)),
819836
vec![],
837+
app_state.metrics.clone(),
820838
);
821839
tokio::spawn(async move {
822840
updater
@@ -894,6 +912,7 @@ mod tests {
894912
false,
895913
Arc::new(LoopHeartbeats::new(&mode)),
896914
vec![],
915+
app_state.metrics.clone(),
897916
);
898917
tokio::spawn(async move {
899918
updater
@@ -962,6 +981,7 @@ mod tests {
962981
false,
963982
Arc::new(LoopHeartbeats::new(&mode)),
964983
vec![],
984+
app_state.metrics.clone(),
965985
);
966986
tokio::spawn(async move {
967987
updater

crates/orchestrator/src/store/domains/heartbeat_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl HeartbeatStore {
2929
con.set_options::<_, _, ()>(
3030
&key,
3131
payload_string,
32-
redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(60)),
32+
redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(90)),
3333
)
3434
.await
3535
.map_err(|_| anyhow!("Failed to set options"))?;

crates/shared/src/security/auth_signature_middleware.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const MAX_NONCE_LENGTH: usize = 64;
3232
const MIN_NONCE_LENGTH: usize = 16;
3333
const RATE_LIMIT_WINDOW_SECS: u64 = 60;
3434
const MAX_REQUESTS_PER_WINDOW: usize = 100;
35+
const REQUEST_EXPIRY_SECS: u64 = 300;
3536

3637
type SyncAddressValidator = Arc<dyn Fn(&Address) -> bool + Send + Sync>;
3738
type AsyncAddressValidator = Arc<dyn Fn(&Address) -> LocalBoxFuture<'static, bool> + Send + Sync>;
@@ -445,7 +446,7 @@ where
445446
.duration_since(UNIX_EPOCH)
446447
.unwrap()
447448
.as_secs();
448-
if current_time - timestamp > 10 {
449+
if current_time - timestamp > REQUEST_EXPIRY_SECS {
449450
return Err(ErrorBadRequest(json!({
450451
"error": "Request expired",
451452
"code": "REQUEST_EXPIRED",

0 commit comments

Comments
 (0)