Skip to content

Commit 419aeb7

Browse files
committed
ethereum: Replace fake health check with real RPC call and use atomics
- Add `health_check()` method to EthereumAdapter using `eth_blockNumber` with a fixed 5s timeout independent of json_rpc_timeout - Replace RwLock with atomics (AtomicU64/AtomicU32) in Health struct, following the EndpointMetrics pattern to avoid lock poisoning - Add CancellationToken support to health_check_task for graceful shutdown - Add tokio-util dependency for CancellationToken
1 parent 9673023 commit 419aeb7

4 files changed

Lines changed: 53 additions & 35 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ semver = { workspace = true }
1818
thiserror = { workspace = true }
1919
tokio = { workspace = true }
2020
tokio-stream = { workspace = true }
21+
tokio-util = { workspace = true }
2122
tower = { workspace = true }
2223

2324
itertools = "0.14.0"

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use std::convert::TryFrom;
5757
use std::iter::FromIterator;
5858
use std::pin::Pin;
5959
use std::sync::Arc;
60-
use std::time::Instant;
60+
use std::time::{Duration, Instant};
6161
use tokio::sync::RwLock;
6262
use tokio::time::timeout;
6363

@@ -1131,6 +1131,16 @@ impl EthereumAdapter {
11311131
Box::new(self.load_block_ptrs_rpc(logger, blocks).collect())
11321132
}
11331133

1134+
/// Lightweight health check that calls `eth_blockNumber` with a fixed 5s timeout.
1135+
pub async fn health_check(&self) -> Result<u64, Error> {
1136+
let alloy = self.alloy.clone();
1137+
tokio::time::timeout(Duration::from_secs(5), async move {
1138+
alloy.get_block_number().await.map_err(Error::from)
1139+
})
1140+
.await
1141+
.map_err(|_| anyhow!("health check timed out"))?
1142+
}
1143+
11341144
pub async fn chain_id(&self) -> Result<u64, Error> {
11351145
let logger = self.logger.clone();
11361146
let alloy = self.alloy.clone();

chain/ethereum/src/health.rs

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
1-
use crate::adapter::EthereumAdapter as EthereumAdapterTrait;
1+
use crate::adapter::EthereumAdapter as _;
22
use crate::EthereumAdapter;
3-
use std::sync::{Arc, RwLock};
3+
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
4+
use std::sync::Arc;
45
use std::time::{Duration, Instant};
5-
use tokio::time::sleep;
6+
use tokio_util::sync::CancellationToken;
7+
68
#[derive(Debug)]
79
pub struct Health {
8-
pub provider: Arc<EthereumAdapter>,
9-
latency: Arc<RwLock<Duration>>,
10-
error_rate: Arc<RwLock<f64>>,
11-
consecutive_failures: Arc<RwLock<u32>>,
10+
provider: Arc<EthereumAdapter>,
11+
latency_nanos: AtomicU64,
12+
error_rate_bits: AtomicU64,
13+
consecutive_failures: AtomicU32,
1214
}
1315

1416
impl Health {
1517
pub fn new(provider: Arc<EthereumAdapter>) -> Self {
1618
Self {
1719
provider,
18-
latency: Arc::new(RwLock::new(Duration::from_secs(0))),
19-
error_rate: Arc::new(RwLock::new(0.0)),
20-
consecutive_failures: Arc::new(RwLock::new(0)),
20+
latency_nanos: AtomicU64::new(0),
21+
error_rate_bits: AtomicU64::new(0f64.to_bits()),
22+
consecutive_failures: AtomicU32::new(0),
2123
}
2224
}
2325

@@ -26,46 +28,50 @@ impl Health {
2628
}
2729

2830
pub async fn check(&self) {
29-
let start_time = Instant::now();
30-
// For now, we'll just simulate a health check.
31-
// In a real implementation, we would send a request to the provider.
32-
let success = self.provider.provider().contains("rpc1"); // Simulate a failure for rpc2
33-
let latency = start_time.elapsed();
34-
35-
self.update_metrics(success, latency);
31+
let start = Instant::now();
32+
let success = self.provider.health_check().await.is_ok();
33+
self.update_metrics(success, start.elapsed());
3634
}
3735

3836
fn update_metrics(&self, success: bool, latency: Duration) {
39-
let mut latency_w = self.latency.write().unwrap();
40-
*latency_w = latency;
37+
self.latency_nanos
38+
.store(latency.as_nanos() as u64, Ordering::Relaxed);
4139

42-
let mut error_rate_w = self.error_rate.write().unwrap();
43-
let mut consecutive_failures_w = self.consecutive_failures.write().unwrap();
40+
let prev_error_rate = f64::from_bits(self.error_rate_bits.load(Ordering::Relaxed));
4441

4542
if success {
46-
*error_rate_w = *error_rate_w * 0.9; // Decay the error rate
47-
*consecutive_failures_w = 0;
43+
let new_error_rate = prev_error_rate * 0.9;
44+
self.error_rate_bits
45+
.store(new_error_rate.to_bits(), Ordering::Relaxed);
46+
self.consecutive_failures.store(0, Ordering::Relaxed);
4847
} else {
49-
*error_rate_w = *error_rate_w * 0.9 + 0.1; // Increase the error rate
50-
*consecutive_failures_w += 1;
48+
let new_error_rate = prev_error_rate * 0.9 + 0.1;
49+
self.error_rate_bits
50+
.store(new_error_rate.to_bits(), Ordering::Relaxed);
51+
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
5152
}
5253
}
5354

5455
pub fn score(&self) -> f64 {
55-
let latency = *self.latency.read().unwrap();
56-
let error_rate = *self.error_rate.read().unwrap();
57-
let consecutive_failures = *self.consecutive_failures.read().unwrap();
56+
let latency_secs =
57+
Duration::from_nanos(self.latency_nanos.load(Ordering::Relaxed)).as_secs_f64();
58+
let error_rate = f64::from_bits(self.error_rate_bits.load(Ordering::Relaxed));
59+
let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed);
5860

59-
// This is a simple scoring algorithm. A more sophisticated algorithm could be used here.
60-
1.0 / (1.0 + latency.as_secs_f64() + error_rate + (consecutive_failures as f64))
61+
1.0 / (1.0 + latency_secs + error_rate + (consecutive_failures as f64))
6162
}
6263
}
6364

64-
pub async fn health_check_task(health_checkers: Vec<Arc<Health>>) {
65+
pub async fn health_check_task(health_checkers: Vec<Arc<Health>>, cancel_token: CancellationToken) {
6566
loop {
66-
for health_checker in &health_checkers {
67-
health_checker.check().await;
67+
tokio::select! {
68+
_ = cancel_token.cancelled() => break,
69+
_ = async {
70+
for hc in &health_checkers {
71+
hc.check().await;
72+
}
73+
tokio::time::sleep(Duration::from_secs(10)).await;
74+
} => {}
6875
}
69-
sleep(Duration::from_secs(10)).await;
7076
}
7177
}

0 commit comments

Comments
 (0)