Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions crates/cli/src/commands/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,13 @@ pub struct TestConfigArgs {
}

/// Lists available test case names for a given test category.
/// TODO: Populate each category from its test-case enum.
fn list_test_cases(category: TestCategory) -> Vec<String> {
// Returns available test case names for each category.
match category {
TestCategory::Validator => {
// From validator::supported_validator_test_cases()
vec![
"Ping".to_string(),
"PingMeasure".to_string(),
"PingLoad".to_string(),
]
}
TestCategory::Validator => validator::ValidatorTestCase::all()
.iter()
.map(|tc| tc.to_string())
.collect(),
TestCategory::Beacon => {
// TODO: Extract from beacon::supported_beacon_test_cases()
vec![]
Expand Down
293 changes: 284 additions & 9 deletions crates/cli/src/commands/test/validator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,57 @@
//! Validator client connectivity tests.

use super::{TestCategoryResult, TestConfigArgs};
use crate::error::Result;
use std::{fmt, io::Write, time::Duration};

use clap::Args;
use std::{io::Write, time::Duration};
use rand::Rng;
use tokio::{
net::TcpStream,
sync::mpsc,
time::{Instant, timeout},
};

use super::{
AllCategoriesResult, TestCategory, TestCategoryResult, TestConfigArgs, TestResult, TestVerdict,
};
use crate::{duration::Duration as CliDuration, error::Result};

// Thresholds (from Go implementation)
/// First RTT threshold handed to `super::evaluate_rtt` by `ping_measure_test`.
/// NOTE(review): presumably the boundary between a good and an average
/// verdict — confirm against `evaluate_rtt`.
const THRESHOLD_MEASURE_AVG: Duration = Duration::from_millis(50);
/// Second RTT threshold handed to `super::evaluate_rtt` by `ping_measure_test`.
/// NOTE(review): presumably the boundary above which the verdict is poor —
/// confirm against `evaluate_rtt`.
const THRESHOLD_MEASURE_POOR: Duration = Duration::from_millis(240);
/// First RTT threshold handed to `super::evaluate_highest_rtt` by
/// `ping_load_test`.
const THRESHOLD_LOAD_AVG: Duration = Duration::from_millis(50);
/// Second RTT threshold handed to `super::evaluate_highest_rtt` by
/// `ping_load_test`.
const THRESHOLD_LOAD_POOR: Duration = Duration::from_millis(240);

/// The connectivity checks that can be run against a validator client.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ValidatorTestCase {
    /// Checks that a TCP connection to the validator can be opened.
    Ping,
    /// Times how long opening a TCP connection to the validator takes.
    PingMeasure,
    /// Keeps opening TCP connections for the configured load-test duration.
    PingLoad,
}

impl ValidatorTestCase {
    /// Every variant, in the order the tests are listed and queued.
    const ALL: [Self; 3] = [Self::Ping, Self::PingMeasure, Self::PingLoad];

    /// Returns all validator test cases.
    pub fn all() -> &'static [ValidatorTestCase] {
        &Self::ALL
    }

    /// The user-facing name of this test case, as printed by `Display`.
    const fn name(self) -> &'static str {
        match self {
            Self::Ping => "Ping",
            Self::PingMeasure => "PingMeasure",
            Self::PingLoad => "PingLoad",
        }
    }
}

impl fmt::Display for ValidatorTestCase {
    /// Writes the test case name (e.g. `PingMeasure`) without any decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}

/// Arguments for the validator test command.
#[derive(Args, Clone, Debug)]
Expand All @@ -30,10 +78,237 @@ pub struct TestValidatorArgs {
}

/// Runs the validator client tests.
pub async fn run(_args: TestValidatorArgs, _writer: &mut dyn Write) -> Result<TestCategoryResult> {
// TODO: Implement validator tests
// - Ping
// - PingMeasure
// - PingLoad
unimplemented!("validator test not yet implemented")
/// Runs the validator client tests.
///
/// Selects the test cases (all of them, or only those named in
/// `args.test_config.test_cases`), executes them against `args.api_address`
/// and returns the aggregated category result. Depending on the test
/// configuration the result is also written to `writer`, saved as JSON and
/// published to the Obol API.
///
/// # Errors
///
/// Returns `CliError::Other("test case not supported")` when the filter
/// matches no known test case, and propagates any error from writing,
/// saving or publishing the result.
///
/// # Panics
///
/// Panics if tracing cannot be initialized.
pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result<TestCategoryResult> {
    let tracing_config = pluto_tracing::TracingConfig::builder()
        .with_default_console()
        .build();
    pluto_tracing::init(&tracing_config).expect("Failed to initialize tracing");

    tracing::info!("Starting validator client test");

    // No filter means "run everything"; otherwise keep only the named cases.
    let selected: Vec<ValidatorTestCase> = match &args.test_config.test_cases {
        Some(wanted) => ValidatorTestCase::all()
            .iter()
            .copied()
            .filter(|case| wanted.contains(&case.to_string()))
            .collect(),
        None => ValidatorTestCase::all().to_vec(),
    };

    if selected.is_empty() {
        return Err(crate::error::CliError::Other(
            "test case not supported".into(),
        ));
    }

    let started = Instant::now();
    let outcomes = run_tests_with_timeout(&args, &selected).await;
    let took = started.elapsed();

    let score = super::calculate_score(&outcomes);

    let mut category = TestCategoryResult::new(TestCategory::Validator);
    category.targets.insert(args.api_address.clone(), outcomes);
    category.execution_time = Some(CliDuration::new(took));
    category.score = Some(score);

    let config = &args.test_config;

    if !config.quiet {
        super::write_result_to_writer(&category, writer)?;
    }

    if !config.output_json.is_empty() {
        super::write_result_to_file(&category, config.output_json.as_ref()).await?;
    }

    if config.publish {
        // Only the validator slot is filled; every other category stays at its default.
        let report = AllCategoriesResult {
            validator: Some(category.clone()),
            ..Default::default()
        };
        super::publish_result_to_obol_api(
            report,
            &config.publish_addr,
            &config.publish_private_key_file,
        )
        .await?;
    }

    Ok(category)
}

/// Timeout error message.
const ERR_TIMEOUT_INTERRUPTED: &str = "timeout/interrupted";
Comment thread
emlautarom1 marked this conversation as resolved.
Outdated

/// Runs tests with timeout, keeping completed tests on timeout.
async fn run_tests_with_timeout(
args: &TestValidatorArgs,
tests: &[ValidatorTestCase],
) -> Vec<TestResult> {
let mut results = Vec::new();
let start = Instant::now();

for &test_case in tests {
let remaining = args.test_config.timeout.saturating_sub(start.elapsed());

Comment thread
emlautarom1 marked this conversation as resolved.
match tokio::time::timeout(remaining, run_single_test(args, test_case)).await {
Comment thread
varex83agent marked this conversation as resolved.
Outdated
Ok(result) => results.push(result),
Err(_) => {
results.push(
TestResult::new(test_case.to_string())
.fail(std::io::Error::other(ERR_TIMEOUT_INTERRUPTED)),
);
break;
}
}
}

results
}
Comment thread
PoulavBhowmick03 marked this conversation as resolved.

/// Runs a single test case.
async fn run_single_test(args: &TestValidatorArgs, test_case: ValidatorTestCase) -> TestResult {
match test_case {
ValidatorTestCase::Ping => ping_test(args).await,
ValidatorTestCase::PingMeasure => ping_measure_test(args).await,
ValidatorTestCase::PingLoad => ping_load_test(args).await,
}
}

async fn ping_test(args: &TestValidatorArgs) -> TestResult {
let mut result = TestResult::new(ValidatorTestCase::Ping.to_string());

match timeout(
Comment thread
varex83agent marked this conversation as resolved.
Duration::from_secs(1),
TcpStream::connect(&args.api_address),
)
.await
{
Ok(Ok(_conn)) => {
result.verdict = TestVerdict::Ok;
}
Ok(Err(e)) => {
return result.fail(e);
}
Err(_) => {
return result.fail(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"connection timeout",
));
}
}

result
}

/// Runs the `PingMeasure` test: times how long it takes to open a TCP
/// connection to `args.api_address` (one-second limit) and lets
/// `super::evaluate_rtt` rate that time against the measure thresholds.
///
/// A connection error or the one-second timeout marks the test as failed.
async fn ping_measure_test(args: &TestValidatorArgs) -> TestResult {
    let result = TestResult::new(ValidatorTestCase::PingMeasure.to_string());
    let started = Instant::now();
    let attempt = timeout(
        Duration::from_secs(1),
        TcpStream::connect(&args.api_address),
    )
    .await;

    match attempt {
        Ok(Ok(_conn)) => super::evaluate_rtt(
            started.elapsed(),
            result,
            THRESHOLD_MEASURE_AVG,
            THRESHOLD_MEASURE_POOR,
        ),
        Ok(Err(e)) => result.fail(e),
        Err(_) => result.fail(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            "connection timeout",
        )),
    }
}

/// Runs the `PingLoad` test.
///
/// For `args.load_test_duration`, one additional `ping_continuously` worker
/// is started per second against `args.api_address`; each worker runs for
/// whatever is left of the duration at the moment it is spawned. All RTTs
/// reported by the workers are collected and handed to
/// `super::evaluate_highest_rtt` together with the load thresholds.
///
/// The RTT channel is bounded, so it is drained while workers are still
/// running. Collecting only after all workers have been joined would block
/// forever once the workers had produced more samples than the channel can
/// hold: they would wait in `send` while this function waits for them.
///
/// # Panics
///
/// Panics if a worker task panicked (propagated by `JoinSet::join_all`).
async fn ping_load_test(args: &TestValidatorArgs) -> TestResult {
    tracing::info!(
        duration = ?args.load_test_duration,
        target = %args.api_address,
        "Running ping load tests..."
    );

    let result = TestResult::new(ValidatorTestCase::PingLoad.to_string());
    let (tx, mut rx) = mpsc::channel::<Duration>(i16::MAX as usize);
    let address = args.api_address.clone();
    let duration = args.load_test_duration;
    let mut rtts = Vec::new();

    {
        let start = Instant::now();
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        let mut workers = tokio::task::JoinSet::new();

        // The first tick of a tokio interval completes immediately; consume it
        // so that the first worker starts one full second in.
        interval.tick().await;
        while start.elapsed() < duration {
            interval.tick().await;

            // Take what has been reported so far so the bounded channel does
            // not fill up and stall the workers during long runs.
            while let Ok(rtt) = rx.try_recv() {
                rtts.push(rtt);
            }

            let tx = tx.clone();
            let addr = address.clone();
            let remaining = duration.saturating_sub(start.elapsed());

            workers.spawn(ping_continuously(addr, tx, remaining));
        }

        // Drop the scheduler's sender so only workers hold senders; `recv`
        // then yields `None` as soon as the last worker has finished.
        drop(tx);

        // Keep receiving until every worker is done. This must happen before
        // joining, otherwise workers blocked on a full channel never finish.
        while let Some(rtt) = rx.recv().await {
            rtts.push(rtt);
        }

        // All senders are gone, so the workers are finished or about to be.
        workers.join_all().await;
    }

    tracing::info!(target = %args.api_address, "Ping load tests finished");

    super::evaluate_highest_rtt(rtts, result, THRESHOLD_LOAD_AVG, THRESHOLD_LOAD_POOR)
}

/// Load-test worker: repeatedly opens a TCP connection to `address` and
/// reports the time each successful connect took on `tx`.
///
/// The worker stops when `max_duration` has elapsed, when the receiving side
/// of `tx` is gone, or on the first failed or timed-out connection attempt.
/// Between attempts it sleeps for a random 0–99 ms.
///
/// Stopping on the first dial error mirrors the charon implementation, where
/// the worker returns as soon as `DialContext` fails. Retrying instead would
/// change which samples are collected and can therefore affect the score.
async fn ping_continuously(
    address: impl AsRef<str>,
    tx: mpsc::Sender<Duration>,
    max_duration: Duration,
) {
    let address = address.as_ref();
    let start = Instant::now();

    while start.elapsed() < max_duration {
        let before = Instant::now();

        match timeout(Duration::from_secs(1), TcpStream::connect(address)).await {
            Ok(Ok(conn)) => {
                let rtt = before.elapsed();
                // Close the socket before a possibly blocking channel send.
                drop(conn);
                if tx.send(rtt).await.is_err() {
                    return;
                }
            }
            Ok(Err(e)) => {
                tracing::warn!(target = %address, error = ?e, "Ping connection attempt failed during load test");
                return;
            }
            Err(e) => {
                tracing::warn!(target = %address, error = ?e, "Ping connection attempt timed out during load test");
                return;
            }
        }

        let sleep_ms = rand::thread_rng().gen_range(0..100);
        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
    }
}
27 changes: 26 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,32 @@ feature-depth = 1
db-path = "~/.cargo/advisory-db"
db-urls = ["https://github.com/rustsec/advisory-db"]
yanked = "deny"
ignore = []
ignore = [
# Unsoundness in `rand` 0.7.3 and 0.8.5 reachable only via a custom `log`
# logger that calls `rand::rng()` and hits a reseed during the log event.
# The only affected pulls are transitive: `cuckoofilter` -> `libp2p-floodsub`
# (stuck on 0.7.3) and `alloy-signer-local` (stuck on 0.8.5). Neither is
# reachable from Pluto's loggers. Remove once upstream bumps to >=0.9.3.
{ id = "RUSTSEC-2026-0097", reason = "transitive rand <0.9.3 via cuckoofilter and alloy-signer-local; not triggerable from our code" },
# CN validation logic error in aws-lc-sys <=0.38.0. Transitive dep via
# libp2p-quic, libp2p-websocket, reqwest, tracing-loki; can't upgrade yet.
{ id = "RUSTSEC-2026-0044", reason = "transitive aws-lc-sys 0.38.0; upgrade blocked by transitive constraints" },
# CRL distribution point matching logic error in aws-lc-sys <=0.38.0.
# Same transitive path as RUSTSEC-2026-0044 above.
{ id = "RUSTSEC-2026-0048", reason = "transitive aws-lc-sys 0.38.0; upgrade blocked by transitive constraints" },
# CRL distribution point incomplete check in rustls-webpki 0.103.9.
# Transitive via rustls-platform-verifier. Remove once upstream upgrades.
{ id = "RUSTSEC-2026-0049", reason = "transitive rustls-webpki 0.103.9 via rustls-platform-verifier; not directly used" },
# URI name constraints ignored in rustls-webpki 0.103.9. Same path.
{ id = "RUSTSEC-2026-0098", reason = "transitive rustls-webpki 0.103.9 via rustls-platform-verifier; not directly used" },
# tar-rs <=0.4.44 archive unpacking PAX handling issues (both advisories).
# Transitive via pluto-app. Remove once upstream bumps to >=0.4.45.
{ id = "RUSTSEC-2026-0067", reason = "transitive tar 0.4.44 via pluto-app; upgrade blocked by upstream" },
{ id = "RUSTSEC-2026-0068", reason = "transitive tar 0.4.44 via pluto-app; upgrade blocked by upstream" },
# astral-tokio-tar <=0.5.6 malformed PAX extension issue. Dev-only dep
# via testcontainers -> pluto-eth2api (test only). Not in production builds.
{ id = "RUSTSEC-2026-0066", reason = "dev-only transitive astral-tokio-tar 0.5.6 via testcontainers; not in production" },
Comment thread
emlautarom1 marked this conversation as resolved.
Outdated
]
unmaintained = "workspace"

[licenses]
Expand Down
Loading