Skip to content

Commit 5f6c6c8

Browse files
committed
Fix probing tests
Added an explicit start of probing, which eliminated previously occured race condition
1 parent f9cf136 commit 5f6c6c8

1 file changed

Lines changed: 81 additions & 50 deletions

File tree

tests/probing_tests.rs

Lines changed: 81 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
// runs payment rounds and prints probing perfomance tables.
2020

2121
mod common;
22+
use std::sync::atomic::{AtomicBool, Ordering};
2223

2324
use lightning::routing::gossip::NodeAlias;
2425
use lightning_invoice::{Bolt11InvoiceDescription, Description};
@@ -49,31 +50,27 @@ const PROBING_DIVERSITY_PENALTY: u64 = 50_000;
4950
struct FixedDestStrategy {
5051
destination: PublicKey,
5152
amount_msat: u64,
53+
ready_to_probe: AtomicBool,
5254
}
5355

5456
impl FixedDestStrategy {
5557
fn new(destination: PublicKey, amount_msat: u64) -> Arc<Self> {
56-
Arc::new(Self { destination, amount_msat })
58+
Arc::new(Self { destination, amount_msat, ready_to_probe: AtomicBool::new(false) })
5759
}
58-
}
5960

60-
impl ProbingStrategy for FixedDestStrategy {
61-
fn next_probe(&self) -> Option<Probe> {
62-
Some(Probe::Destination { final_node: self.destination, amount_msat: self.amount_msat })
61+
fn start_probing(&self) {
62+
self.ready_to_probe.store(true, Ordering::Relaxed);
6363
}
6464
}
6565

66-
async fn wait_until(timeout: Duration, predicate: impl Fn() -> bool) -> bool {
67-
tokio::time::timeout(timeout, async {
68-
loop {
69-
if predicate() {
70-
return;
71-
}
72-
tokio::time::sleep(Duration::from_millis(1)).await;
66+
impl ProbingStrategy for FixedDestStrategy {
67+
fn next_probe(&self) -> Option<Probe> {
68+
if self.ready_to_probe.load(Ordering::Relaxed) {
69+
Some(Probe::Destination { final_node: self.destination, amount_msat: self.amount_msat })
70+
} else {
71+
None
7372
}
74-
})
75-
.await
76-
.is_ok()
73+
}
7774
}
7875

7976
fn config_with_label(label: &str) -> common::TestConfig {
@@ -85,20 +82,6 @@ fn config_with_label(label: &str) -> common::TestConfig {
8582
config
8683
}
8784

88-
fn build_node_fixed_dest_probing(
89-
chain_source: &TestChainSource<'_>, destination_node_id: PublicKey,
90-
) -> TestNode {
91-
let mut config = random_config(false);
92-
let strategy = FixedDestStrategy::new(destination_node_id, PROBE_AMOUNT_MSAT);
93-
config.probing = Some(
94-
ProbingConfig::custom(strategy)
95-
.interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS))
96-
.max_locked_msat(PROBE_AMOUNT_MSAT)
97-
.build(),
98-
);
99-
setup_node(chain_source, config)
100-
}
101-
10285
fn build_node_random_probing(chain_source: &TestChainSource<'_>, max_hops: usize) -> TestNode {
10386
let mut config = config_with_label("Random");
10487
config.probing = Some(
@@ -259,14 +242,23 @@ fn print_probing_perfomance(observers: &[&TestNode], all_nodes: &[&TestNode]) {
259242

260243
/// Verifies that `locked_msat` increases when a probe is dispatched and returns
261244
/// to zero once the probe resolves (succeeds or fails).
262-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
245+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
263246
async fn probe_budget_increments_and_decrements() {
264247
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
265248
let chain_source = TestChainSource::Electrum(&electrsd);
266249

267250
let node_b = setup_node(&chain_source, random_config(false));
268251
let node_c = setup_node(&chain_source, random_config(false));
269-
let node_a = build_node_fixed_dest_probing(&chain_source, node_c.node_id());
252+
253+
let mut config_a = random_config(false);
254+
let strategy = FixedDestStrategy::new(node_c.node_id(), PROBE_AMOUNT_MSAT);
255+
config_a.probing = Some(
256+
ProbingConfig::custom(strategy.clone())
257+
.interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS))
258+
.max_locked_msat(PROBE_AMOUNT_MSAT)
259+
.build(),
260+
);
261+
let node_a = setup_node(&chain_source, config_a);
270262

271263
let addr_a = node_a.onchain_payment().new_address().unwrap();
272264
let addr_b = node_b.onchain_payment().new_address().unwrap();
@@ -295,18 +287,33 @@ async fn probe_budget_increments_and_decrements() {
295287
expect_event!(node_b, ChannelReady);
296288
expect_event!(node_c, ChannelReady);
297289

298-
// Give gossip time to propagate to A.
290+
// Give gossip time to propagate to A, then enable probing.
299291
tokio::time::sleep(Duration::from_secs(3)).await;
292+
strategy.start_probing();
300293

301-
let went_up =
302-
wait_until(Duration::from_secs(10), || node_a.prober().map_or(0, |p| p.locked_msat()) > 0)
303-
.await;
294+
let went_up = tokio::time::timeout(Duration::from_secs(10), async {
295+
loop {
296+
if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 {
297+
break;
298+
}
299+
tokio::time::sleep(Duration::from_millis(1)).await;
300+
}
301+
})
302+
.await
303+
.is_ok();
304304
assert!(went_up, "locked_msat never increased — no probe was dispatched");
305305
println!("First probe dispatched; locked_msat = {}", node_a.prober().unwrap().locked_msat());
306306

307-
let cleared =
308-
wait_until(Duration::from_secs(20), || node_a.prober().map_or(1, |p| p.locked_msat()) == 0)
309-
.await;
307+
let cleared = tokio::time::timeout(Duration::from_secs(20), async {
308+
loop {
309+
if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 {
310+
break;
311+
}
312+
tokio::time::sleep(Duration::from_millis(1)).await;
313+
}
314+
})
315+
.await
316+
.is_ok();
310317
assert!(cleared, "locked_msat never returned to zero after probe resolved");
311318
println!("Probe resolved; locked_msat = 0");
312319

@@ -320,7 +327,7 @@ async fn probe_budget_increments_and_decrements() {
320327
/// Exhaustion is triggered by stopping the intermediate node (B) while a probe HTLC
321328
/// is in-flight, preventing resolution and keeping the budget locked. After B restarts
322329
/// the HTLC fails, the budget clears, and probing resumes.
323-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
330+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
324331
async fn exhausted_probe_budget_blocks_new_probes() {
325332
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
326333
let chain_source = TestChainSource::Electrum(&electrsd);
@@ -332,8 +339,8 @@ async fn exhausted_probe_budget_blocks_new_probes() {
332339
let mut config_a = random_config(false);
333340
let strategy = FixedDestStrategy::new(node_c.node_id(), PROBE_AMOUNT_MSAT);
334341
config_a.probing = Some(
335-
ProbingConfig::custom(strategy)
336-
.interval(Duration::from_secs(3))
342+
ProbingConfig::custom(strategy.clone())
343+
.interval(Duration::from_secs(10))
337344
.max_locked_msat(PROBE_AMOUNT_MSAT)
338345
.build(),
339346
);
@@ -374,10 +381,20 @@ async fn exhausted_probe_budget_blocks_new_probes() {
374381
.map(|ch| ch.outbound_capacity_msat)
375382
.expect("A→B channel not found");
376383

384+
assert_eq!(node_a.prober().map_or(1, |p| p.locked_msat()), 0, "initial locked_msat is nonzero");
385+
strategy.start_probing();
386+
377387
// Give gossip time to propagate to A, then wait for the first probe.
378-
let locked =
379-
wait_until(Duration::from_secs(15), || node_a.prober().map_or(0, |p| p.locked_msat()) > 0)
380-
.await;
388+
let locked = tokio::time::timeout(Duration::from_secs(15), async {
389+
loop {
390+
if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 {
391+
break;
392+
}
393+
tokio::time::sleep(Duration::from_millis(1)).await;
394+
}
395+
})
396+
.await
397+
.is_ok();
381398
assert!(locked, "no probe dispatched within 15 s");
382399

383400
// Capacity should have decreased due to the in-flight probe HTLC.
@@ -421,15 +438,29 @@ async fn exhausted_probe_budget_blocks_new_probes() {
421438
node_b.connect(node_a.node_id(), node_a_addr, false).unwrap();
422439
node_b.connect(node_c.node_id(), node_c_addr, false).unwrap();
423440

424-
let cleared =
425-
wait_until(Duration::from_secs(15), || node_a.prober().map_or(1, |p| p.locked_msat()) == 0)
426-
.await;
441+
let cleared = tokio::time::timeout(Duration::from_secs(15), async {
442+
loop {
443+
if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 {
444+
break;
445+
}
446+
tokio::time::sleep(Duration::from_millis(1)).await;
447+
}
448+
})
449+
.await
450+
.is_ok();
427451
assert!(cleared, "locked_msat never cleared after B came back online");
428452

429453
// Once the budget is freed, a new probe should be dispatched within a few ticks.
430-
let new_probe =
431-
wait_until(Duration::from_secs(10), || node_a.prober().map_or(0, |p| p.locked_msat()) > 0)
432-
.await;
454+
let new_probe = tokio::time::timeout(Duration::from_secs(10), async {
455+
loop {
456+
if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 {
457+
break;
458+
}
459+
tokio::time::sleep(Duration::from_millis(1)).await;
460+
}
461+
})
462+
.await
463+
.is_ok();
433464
assert!(new_probe, "no new probe dispatched after budget was freed");
434465

435466
node_a.stop().unwrap();

0 commit comments

Comments
 (0)