Skip to content

Commit c4123e9

Browse files
PoulavBhowmick03varex83agentvarex83
authored
feat(cli): added cli validator tests (#286)
* feat: added cli validator tests * suggestions * clippy * simplify timeout iteration * fix(cli): address validator test PR review comments - Replace `name()` method with `fmt::Display` impl on `ValidatorTestCase` - Initialize tracing with `pluto_tracing::init` in `run` - Move `start_time` immediately before `run_tests_with_timeout` - Fix `ERR_TIMEOUT_INTERRUPTED` message to `"timeout/interrupted"` - Replace `checked_add`/`expect` with `Duration::saturating_sub` for timeout tracking - Remove outer `tokio::spawn` in `ping_load_test`; use a scoped block so the `JoinSet` is dropped (aborting workers) on timeout cancellation - Use `workers.join_all().await` instead of manual `join_next` loop - Change `ping_continuously` signature to `impl AsRef<str>` - Remove unnecessary `drop(conn)` before `return` - Qualify `super::` helper functions instead of importing them directly Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> * fix(deny): add advisory ignores for transitive deps All entries are transitive dependencies that cannot be immediately upgraded. None are reachable from Pluto's production code paths. Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> * fix(cli): address emlautarom1 review comments on validator tests - Add CancellationToken to validator::run and run_tests_with_timeout; use tokio::select! so Ctrl+C (SIGINT) cancels in-progress tests - Replace ERR_TIMEOUT_INTERRUPTED string const with CliError::TimeoutInterrupted and CliError::Other("test case not supported") with CliError::TestCaseNotSupported; remove leading underscore from both variants in the error enum - Remove RUSTSEC advisory ignores added to deny.toml; run cargo update instead — all seven advisories are resolved by updated transitive deps Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> * fix linter * fix: stop ping_continuously loop on dial error to match charon behavior In Charon's `pingValidatorContinuously`, both dial errors and timeouts cause an immediate `return`, stopping the loop. Port this behavior to Rust so a failing connection doesn't inflate the RTT sample set with absent readings. Co-Authored-By: varex83 <varex83@users.noreply.github.com> --------- Co-authored-by: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Co-authored-by: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Co-authored-by: varex83 <varex83@users.noreply.github.com>
1 parent 8836fb4 commit c4123e9

3 files changed

Lines changed: 299 additions & 19 deletions

File tree

crates/cli/src/commands/test/mod.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,13 @@ pub struct TestConfigArgs {
6969
}
7070

7171
/// Lists available test case names for a given test category.
72-
/// TODO: Fill with enums TestCases of each category
7372
fn list_test_cases(category: TestCategory) -> Vec<String> {
7473
// Returns available test case names for each category.
7574
match category {
76-
TestCategory::Validator => {
77-
// From validator::supported_validator_test_cases()
78-
vec![
79-
"Ping".to_string(),
80-
"PingMeasure".to_string(),
81-
"PingLoad".to_string(),
82-
]
83-
}
75+
TestCategory::Validator => validator::ValidatorTestCase::all()
76+
.iter()
77+
.map(|tc| tc.to_string())
78+
.collect(),
8479
TestCategory::Beacon => beacon::test_case_names(),
8580
TestCategory::Mev => {
8681
vec![

crates/cli/src/commands/test/validator.rs

Lines changed: 294 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,61 @@
11
//! Validator client connectivity tests.
22
3-
use super::{TestConfigArgs, helpers::TestCategoryResult};
4-
use crate::error::Result;
3+
use std::{fmt, io::Write, time::Duration};
4+
55
use clap::Args;
6-
use std::{io::Write, time::Duration};
6+
use rand::Rng;
7+
use tokio::{
8+
net::TcpStream,
9+
sync::mpsc,
10+
time::{Instant, timeout},
11+
};
12+
use tokio_util::sync::CancellationToken;
13+
14+
use super::{
15+
AllCategoriesResult, TestCategory, TestCategoryResult, TestConfigArgs, TestResult, TestVerdict,
16+
};
17+
use crate::{
18+
duration::Duration as CliDuration,
19+
error::{CliError, Result},
20+
};
21+
22+
// Thresholds (from Go implementation)
23+
const THRESHOLD_MEASURE_AVG: Duration = Duration::from_millis(50);
24+
const THRESHOLD_MEASURE_POOR: Duration = Duration::from_millis(240);
25+
const THRESHOLD_LOAD_AVG: Duration = Duration::from_millis(50);
26+
const THRESHOLD_LOAD_POOR: Duration = Duration::from_millis(240);
27+
28+
/// Validator test cases.
29+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30+
pub enum ValidatorTestCase {
31+
/// TCP connectivity check.
32+
Ping,
33+
/// TCP round-trip time measurement.
34+
PingMeasure,
35+
/// Sustained TCP load test.
36+
PingLoad,
37+
}
38+
39+
impl ValidatorTestCase {
40+
/// Returns all validator test cases.
41+
pub fn all() -> &'static [ValidatorTestCase] {
42+
&[
43+
ValidatorTestCase::Ping,
44+
ValidatorTestCase::PingMeasure,
45+
ValidatorTestCase::PingLoad,
46+
]
47+
}
48+
}
49+
50+
impl fmt::Display for ValidatorTestCase {
51+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52+
f.write_str(match self {
53+
ValidatorTestCase::Ping => "Ping",
54+
ValidatorTestCase::PingMeasure => "PingMeasure",
55+
ValidatorTestCase::PingLoad => "PingLoad",
56+
})
57+
}
58+
}
759

860
/// Arguments for the validator test command.
961
#[derive(Args, Clone, Debug)]
@@ -30,10 +82,243 @@ pub struct TestValidatorArgs {
3082
}
3183

3284
/// Runs the validator client tests.
33-
pub async fn run(_args: TestValidatorArgs, _writer: &mut dyn Write) -> Result<TestCategoryResult> {
34-
// TODO: Implement validator tests
35-
// - Ping
36-
// - PingMeasure
37-
// - PingLoad
38-
unimplemented!("validator test not yet implemented")
85+
pub async fn run(
86+
args: TestValidatorArgs,
87+
writer: &mut dyn Write,
88+
ct: CancellationToken,
89+
) -> Result<TestCategoryResult> {
90+
pluto_tracing::init(
91+
&pluto_tracing::TracingConfig::builder()
92+
.with_default_console()
93+
.build(),
94+
)
95+
.expect("Failed to initialize tracing");
96+
97+
tracing::info!("Starting validator client test");
98+
99+
// Get and filter test cases
100+
let queued_tests: Vec<ValidatorTestCase> = if let Some(ref filter) = args.test_config.test_cases
101+
{
102+
ValidatorTestCase::all()
103+
.iter()
104+
.filter(|tc| filter.contains(&tc.to_string()))
105+
.copied()
106+
.collect()
107+
} else {
108+
ValidatorTestCase::all().to_vec()
109+
};
110+
111+
if queued_tests.is_empty() {
112+
return Err(CliError::TestCaseNotSupported);
113+
}
114+
115+
let start_time = Instant::now();
116+
let test_results = run_tests_with_timeout(&args, &queued_tests, ct).await;
117+
let elapsed = start_time.elapsed();
118+
119+
let score = super::calculate_score(&test_results);
120+
121+
let mut res = TestCategoryResult::new(TestCategory::Validator);
122+
res.targets.insert(args.api_address.clone(), test_results);
123+
res.execution_time = Some(CliDuration::new(elapsed));
124+
res.score = Some(score);
125+
126+
if !args.test_config.quiet {
127+
super::write_result_to_writer(&res, writer)?;
128+
}
129+
130+
if !args.test_config.output_json.is_empty() {
131+
super::write_result_to_file(&res, args.test_config.output_json.as_ref()).await?;
132+
}
133+
134+
if args.test_config.publish {
135+
let all = AllCategoriesResult {
136+
validator: Some(res.clone()),
137+
..Default::default()
138+
};
139+
super::publish_result_to_obol_api(
140+
all,
141+
&args.test_config.publish_addr,
142+
&args.test_config.publish_private_key_file,
143+
)
144+
.await?;
145+
}
146+
147+
Ok(res)
148+
}
149+
150+
/// Runs tests with timeout, keeping completed tests on timeout.
151+
async fn run_tests_with_timeout(
152+
args: &TestValidatorArgs,
153+
tests: &[ValidatorTestCase],
154+
ct: CancellationToken,
155+
) -> Vec<TestResult> {
156+
let mut results = Vec::new();
157+
let start = Instant::now();
158+
159+
for &test_case in tests {
160+
let remaining = args.test_config.timeout.saturating_sub(start.elapsed());
161+
162+
tokio::select! {
163+
result = run_single_test(args, test_case) => {
164+
results.push(result);
165+
}
166+
_ = tokio::time::sleep(remaining) => {
167+
results.push(
168+
TestResult::new(test_case.to_string())
169+
.fail(CliError::TimeoutInterrupted),
170+
);
171+
break;
172+
}
173+
_ = ct.cancelled() => {
174+
results.push(
175+
TestResult::new(test_case.to_string())
176+
.fail(CliError::TimeoutInterrupted),
177+
);
178+
break;
179+
}
180+
}
181+
}
182+
183+
results
184+
}
185+
186+
/// Runs a single test case.
187+
async fn run_single_test(args: &TestValidatorArgs, test_case: ValidatorTestCase) -> TestResult {
188+
match test_case {
189+
ValidatorTestCase::Ping => ping_test(args).await,
190+
ValidatorTestCase::PingMeasure => ping_measure_test(args).await,
191+
ValidatorTestCase::PingLoad => ping_load_test(args).await,
192+
}
193+
}
194+
195+
async fn ping_test(args: &TestValidatorArgs) -> TestResult {
196+
let mut result = TestResult::new(ValidatorTestCase::Ping.to_string());
197+
198+
match timeout(
199+
Duration::from_secs(1),
200+
TcpStream::connect(&args.api_address),
201+
)
202+
.await
203+
{
204+
Ok(Ok(_conn)) => {
205+
result.verdict = TestVerdict::Ok;
206+
}
207+
Ok(Err(e)) => {
208+
return result.fail(e);
209+
}
210+
Err(_) => {
211+
return result.fail(std::io::Error::new(
212+
std::io::ErrorKind::TimedOut,
213+
"connection timeout",
214+
));
215+
}
216+
}
217+
218+
result
219+
}
220+
221+
async fn ping_measure_test(args: &TestValidatorArgs) -> TestResult {
222+
let mut result = TestResult::new(ValidatorTestCase::PingMeasure.to_string());
223+
let before = Instant::now();
224+
225+
match timeout(
226+
Duration::from_secs(1),
227+
TcpStream::connect(&args.api_address),
228+
)
229+
.await
230+
{
231+
Ok(Ok(_conn)) => {
232+
let rtt = before.elapsed();
233+
result =
234+
super::evaluate_rtt(rtt, result, THRESHOLD_MEASURE_AVG, THRESHOLD_MEASURE_POOR);
235+
}
236+
Ok(Err(e)) => {
237+
return result.fail(e);
238+
}
239+
Err(_) => {
240+
return result.fail(std::io::Error::new(
241+
std::io::ErrorKind::TimedOut,
242+
"connection timeout",
243+
));
244+
}
245+
}
246+
247+
result
248+
}
249+
250+
async fn ping_load_test(args: &TestValidatorArgs) -> TestResult {
251+
tracing::info!(
252+
duration = ?args.load_test_duration,
253+
target = %args.api_address,
254+
"Running ping load tests..."
255+
);
256+
257+
let mut result = TestResult::new(ValidatorTestCase::PingLoad.to_string());
258+
let (tx, mut rx) = mpsc::channel::<Duration>(i16::MAX as usize);
259+
let address = args.api_address.clone();
260+
let duration = args.load_test_duration;
261+
262+
{
263+
let start = Instant::now();
264+
let mut interval = tokio::time::interval(Duration::from_secs(1));
265+
let mut workers = tokio::task::JoinSet::new();
266+
267+
interval.tick().await;
268+
while start.elapsed() < duration {
269+
interval.tick().await;
270+
271+
let tx = tx.clone();
272+
let addr = address.clone();
273+
let remaining = duration.saturating_sub(start.elapsed());
274+
275+
workers.spawn(ping_continuously(addr, tx, remaining));
276+
}
277+
278+
// Drop the scheduler's clone so only workers hold senders
279+
drop(tx);
280+
281+
// Wait for all spawned ping workers to finish
282+
workers.join_all().await;
283+
}
284+
285+
tracing::info!(target = %args.api_address, "Ping load tests finished");
286+
287+
// All senders dropped, collect all RTTs
288+
rx.close();
289+
let mut rtts = Vec::new();
290+
while let Some(rtt) = rx.recv().await {
291+
rtts.push(rtt);
292+
}
293+
294+
result = super::evaluate_highest_rtt(rtts, result, THRESHOLD_LOAD_AVG, THRESHOLD_LOAD_POOR);
295+
296+
result
297+
}
298+
299+
async fn ping_continuously(
300+
address: impl AsRef<str>,
301+
tx: mpsc::Sender<Duration>,
302+
max_duration: Duration,
303+
) {
304+
let address = address.as_ref();
305+
let start = Instant::now();
306+
307+
while start.elapsed() < max_duration {
308+
let before = Instant::now();
309+
310+
match timeout(Duration::from_secs(1), TcpStream::connect(address)).await {
311+
Ok(Ok(_conn)) => {
312+
let rtt = before.elapsed();
313+
if tx.send(rtt).await.is_err() {
314+
return;
315+
}
316+
}
317+
Ok(Err(_)) | Err(_) => {
318+
return;
319+
}
320+
}
321+
let sleep_ms = rand::thread_rng().gen_range(0..100);
322+
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
323+
}
39324
}

crates/cli/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ async fn run() -> std::result::Result<(), CliError> {
7272
.map(|_| ())
7373
}
7474
TestCommands::Validator(args) => {
75-
commands::test::validator::run(args, &mut stdout)
75+
commands::test::validator::run(args, &mut stdout, ct.clone())
7676
.await
7777
.map(|_| ())
7878
}

0 commit comments

Comments
 (0)