Skip to content

Commit 1e73e6e

Browse files
committed
Fix probing tests
1 parent ebb6227 commit 1e73e6e

File tree

2 files changed

+125
-63
lines changed

2 files changed

+125
-63
lines changed

src/probing.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,14 @@ pub struct Prober {
513513
pub(crate) locked_msat: Arc<AtomicU64>,
514514
}
515515

516+
fn fmt_path(path: &lightning::routing::router::Path) -> String {
517+
path.hops
518+
.iter()
519+
.map(|h| format!("{}(scid={})", h.pubkey, h.short_channel_id))
520+
.collect::<Vec<_>>()
521+
.join(" -> ")
522+
}
523+
516524
impl Prober {
517525
/// Returns the total millisatoshis currently locked in in-flight probes.
518526
pub fn locked_msat(&self) -> u64 {
@@ -521,16 +529,34 @@ impl Prober {
521529

522530
pub(crate) fn handle_probe_successful(&self, path: &lightning::routing::router::Path) {
523531
let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum();
524-
let _ = self
532+
let prev = self
525533
.locked_msat
526-
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount)));
534+
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount)))
535+
.unwrap_or(0);
536+
log_debug!(
537+
self.logger,
538+
"Probe successful: released {} msat (locked_msat {} -> {}), path: {}",
539+
amount,
540+
prev,
541+
prev.saturating_sub(amount),
542+
fmt_path(path)
543+
);
527544
}
528545

529546
pub(crate) fn handle_probe_failed(&self, path: &lightning::routing::router::Path) {
530547
let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum();
531-
let _ = self
548+
let prev = self
532549
.locked_msat
533-
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount)));
550+
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount)))
551+
.unwrap_or(0);
552+
log_debug!(
553+
self.logger,
554+
"Probe failed: released {} msat (locked_msat {} -> {}), path: {}",
555+
amount,
556+
prev,
557+
prev.saturating_sub(amount),
558+
fmt_path(path)
559+
);
534560
}
535561
}
536562

tests/probing_tests.rs

Lines changed: 95 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
// runs payment rounds and prints probing perfomance tables.
2020

2121
mod common;
22+
use std::sync::atomic::{AtomicBool, Ordering};
2223

2324
use lightning::routing::gossip::NodeAlias;
2425
use lightning_invoice::{Bolt11InvoiceDescription, Description};
@@ -42,38 +43,38 @@ use std::time::Duration;
4243

4344
const PROBE_AMOUNT_MSAT: u64 = 1_000_000;
4445
const MAX_LOCKED_MSAT: u64 = 100_000_000;
45-
const PROBING_INTERVAL_MILLISECONDS: u64 = 500;
46+
const PROBING_INTERVAL_MILLISECONDS: u64 = 100;
4647
const PROBING_DIVERSITY_PENALTY: u64 = 50_000;
4748

4849
/// FixedDestStrategy — always targets one node; used by budget tests.
4950
struct FixedDestStrategy {
5051
destination: PublicKey,
5152
amount_msat: u64,
53+
ready_to_probe: AtomicBool,
5254
}
5355

5456
impl FixedDestStrategy {
5557
fn new(destination: PublicKey, amount_msat: u64) -> Arc<Self> {
56-
Arc::new(Self { destination, amount_msat })
58+
Arc::new(Self { destination, amount_msat, ready_to_probe: AtomicBool::new(false) })
5759
}
58-
}
5960

60-
impl ProbingStrategy for FixedDestStrategy {
61-
fn next_probe(&self) -> Option<Probe> {
62-
Some(Probe::Destination { final_node: self.destination, amount_msat: self.amount_msat })
61+
fn start_probing(&self) {
62+
self.ready_to_probe.store(true, Ordering::Relaxed);
63+
}
64+
65+
fn stop_probing(&self) {
66+
self.ready_to_probe.store(false, Ordering::Relaxed);
6367
}
6468
}
6569

66-
async fn wait_until(timeout: Duration, predicate: impl Fn() -> bool) -> bool {
67-
tokio::time::timeout(timeout, async {
68-
loop {
69-
if predicate() {
70-
return;
71-
}
72-
tokio::time::sleep(Duration::from_millis(100)).await;
70+
impl ProbingStrategy for FixedDestStrategy {
71+
fn next_probe(&self) -> Option<Probe> {
72+
if self.ready_to_probe.load(Ordering::Relaxed) {
73+
Some(Probe::Destination { final_node: self.destination, amount_msat: self.amount_msat })
74+
} else {
75+
None
7376
}
74-
})
75-
.await
76-
.is_ok()
77+
}
7778
}
7879

7980
fn config_with_label(label: &str) -> common::TestConfig {
@@ -85,20 +86,6 @@ fn config_with_label(label: &str) -> common::TestConfig {
8586
config
8687
}
8788

88-
fn build_node_fixed_dest_probing(
89-
chain_source: &TestChainSource<'_>, destination_node_id: PublicKey,
90-
) -> TestNode {
91-
let mut config = random_config(false);
92-
let strategy = FixedDestStrategy::new(destination_node_id, PROBE_AMOUNT_MSAT);
93-
config.probing = Some(
94-
ProbingConfig::custom(strategy)
95-
.interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS))
96-
.max_locked_msat(PROBE_AMOUNT_MSAT)
97-
.build(),
98-
);
99-
setup_node(chain_source, config)
100-
}
101-
10289
fn build_node_random_probing(chain_source: &TestChainSource<'_>, max_hops: usize) -> TestNode {
10390
let mut config = config_with_label("Random");
10491
config.probing = Some(
@@ -259,14 +246,23 @@ fn print_probing_perfomance(observers: &[&TestNode], all_nodes: &[&TestNode]) {
259246

260247
/// Verifies that `locked_msat` increases when a probe is dispatched and returns
261248
/// to zero once the probe resolves (succeeds or fails).
262-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
249+
#[tokio::test(flavor = "multi_thread")]
263250
async fn probe_budget_increments_and_decrements() {
264251
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
265252
let chain_source = TestChainSource::Electrum(&electrsd);
266253

267254
let node_b = setup_node(&chain_source, random_config(false));
268255
let node_c = setup_node(&chain_source, random_config(false));
269-
let node_a = build_node_fixed_dest_probing(&chain_source, node_c.node_id());
256+
257+
let mut config_a = random_config(false);
258+
let strategy = FixedDestStrategy::new(node_c.node_id(), PROBE_AMOUNT_MSAT);
259+
config_a.probing = Some(
260+
ProbingConfig::custom(strategy.clone())
261+
.interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS))
262+
.max_locked_msat(PROBE_AMOUNT_MSAT)
263+
.build(),
264+
);
265+
let node_a = setup_node(&chain_source, config_a);
270266

271267
let addr_a = node_a.onchain_payment().new_address().unwrap();
272268
let addr_b = node_b.onchain_payment().new_address().unwrap();
@@ -295,20 +291,34 @@ async fn probe_budget_increments_and_decrements() {
295291
expect_event!(node_b, ChannelReady);
296292
expect_event!(node_c, ChannelReady);
297293

298-
// Give gossip time to propagate to A.
294+
// Give gossip time to propagate to A, then enable probing.
299295
tokio::time::sleep(Duration::from_secs(3)).await;
296+
strategy.start_probing();
300297

301-
let went_up =
302-
wait_until(Duration::from_secs(10), || node_a.prober().map_or(0, |p| p.locked_msat()) > 0)
303-
.await;
298+
let went_up = tokio::time::timeout(Duration::from_secs(30), async {
299+
loop {
300+
if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 {
301+
break;
302+
}
303+
tokio::time::sleep(Duration::from_millis(1)).await;
304+
}
305+
})
306+
.await
307+
.is_ok();
304308
assert!(went_up, "locked_msat never increased — no probe was dispatched");
305309
println!("First probe dispatched; locked_msat = {}", node_a.prober().unwrap().locked_msat());
306310

307-
let cleared =
308-
wait_until(Duration::from_secs(20), || node_a.prober().map_or(1, |p| p.locked_msat()) == 0)
309-
.await;
311+
let cleared = tokio::time::timeout(Duration::from_secs(30), async {
312+
loop {
313+
if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 {
314+
break;
315+
}
316+
tokio::time::sleep(Duration::from_millis(1)).await;
317+
}
318+
})
319+
.await
320+
.is_ok();
310321
assert!(cleared, "locked_msat never returned to zero after probe resolved");
311-
println!("Probe resolved; locked_msat = 0");
312322

313323
node_a.stop().unwrap();
314324
node_b.stop().unwrap();
@@ -320,20 +330,19 @@ async fn probe_budget_increments_and_decrements() {
320330
/// Exhaustion is triggered by stopping the intermediate node (B) while a probe HTLC
321331
/// is in-flight, preventing resolution and keeping the budget locked. After B restarts
322332
/// the HTLC fails, the budget clears, and probing resumes.
323-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
333+
#[tokio::test(flavor = "multi_thread")]
324334
async fn exhausted_probe_budget_blocks_new_probes() {
325335
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
326336
let chain_source = TestChainSource::Electrum(&electrsd);
327337

328338
let node_b = setup_node(&chain_source, random_config(false));
329339
let node_c = setup_node(&chain_source, random_config(false));
330340

331-
// Use a slow probing interval so we can read capacity before the first probe fires.
332341
let mut config_a = random_config(false);
333342
let strategy = FixedDestStrategy::new(node_c.node_id(), PROBE_AMOUNT_MSAT);
334343
config_a.probing = Some(
335-
ProbingConfig::custom(strategy)
336-
.interval(Duration::from_secs(3))
344+
ProbingConfig::custom(strategy.clone())
345+
.interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS))
337346
.max_locked_msat(PROBE_AMOUNT_MSAT)
338347
.build(),
339348
);
@@ -366,19 +375,30 @@ async fn exhausted_probe_budget_blocks_new_probes() {
366375
expect_event!(node_b, ChannelReady);
367376
expect_event!(node_c, ChannelReady);
368377

369-
// Record capacity before the first probe fires (interval is 3s, so we have time).
370378
let capacity_at_open = node_a
371379
.list_channels()
372380
.iter()
373381
.find(|ch| ch.counterparty_node_id == node_b.node_id())
374382
.map(|ch| ch.outbound_capacity_msat)
375383
.expect("A→B channel not found");
376384

377-
// Give gossip time to propagate to A, then wait for the first probe.
378-
let locked =
379-
wait_until(Duration::from_secs(15), || node_a.prober().map_or(0, |p| p.locked_msat()) > 0)
380-
.await;
381-
assert!(locked, "no probe dispatched within 15 s");
385+
assert_eq!(node_a.prober().map_or(1, |p| p.locked_msat()), 0, "initial locked_msat is nonzero");
386+
387+
tokio::time::sleep(Duration::from_secs(3)).await;
388+
strategy.start_probing();
389+
390+
// Wait for the first probe to be in-flight.
391+
let locked = tokio::time::timeout(Duration::from_secs(30), async {
392+
loop {
393+
if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 {
394+
break;
395+
}
396+
tokio::time::sleep(Duration::from_millis(1)).await;
397+
}
398+
})
399+
.await
400+
.is_ok();
401+
assert!(locked, "no probe dispatched within 30 s");
382402

383403
// Capacity should have decreased due to the in-flight probe HTLC.
384404
let capacity_with_probe = node_a
@@ -395,8 +415,6 @@ async fn exhausted_probe_budget_blocks_new_probes() {
395415
// Stop B while the probe HTLC is in-flight.
396416
node_b.stop().unwrap();
397417

398-
// Let several Prober ticks fire (interval is 3s); the budget is exhausted so
399-
// they must be skipped. Wait, then check both conditions at once.
400418
tokio::time::sleep(Duration::from_secs(5)).await;
401419
assert!(
402420
node_a.prober().map_or(0, |p| p.locked_msat()) > 0,
@@ -413,6 +431,9 @@ async fn exhausted_probe_budget_blocks_new_probes() {
413431
"a new probe HTLC was sent despite budget being exhausted"
414432
);
415433

434+
// Pause probing so the budget can clear without a new probe re-locking it.
435+
strategy.stop_probing();
436+
416437
// Bring B back and explicitly reconnect to A and C so the stuck HTLC resolves
417438
// without waiting for the background reconnection backoff.
418439
node_b.start().unwrap();
@@ -421,15 +442,30 @@ async fn exhausted_probe_budget_blocks_new_probes() {
421442
node_b.connect(node_a.node_id(), node_a_addr, false).unwrap();
422443
node_b.connect(node_c.node_id(), node_c_addr, false).unwrap();
423444

424-
let cleared =
425-
wait_until(Duration::from_secs(15), || node_a.prober().map_or(1, |p| p.locked_msat()) == 0)
426-
.await;
445+
let cleared = tokio::time::timeout(Duration::from_secs(60), async {
446+
loop {
447+
if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 {
448+
break;
449+
}
450+
tokio::time::sleep(Duration::from_millis(1)).await;
451+
}
452+
})
453+
.await
454+
.is_ok();
427455
assert!(cleared, "locked_msat never cleared after B came back online");
428456

429-
// Once the budget is freed, a new probe should be dispatched within a few ticks.
430-
let new_probe =
431-
wait_until(Duration::from_secs(10), || node_a.prober().map_or(0, |p| p.locked_msat()) > 0)
432-
.await;
457+
// Re-enable probing; a new probe should be dispatched within a few ticks.
458+
strategy.start_probing();
459+
let new_probe = tokio::time::timeout(Duration::from_secs(60), async {
460+
loop {
461+
if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 {
462+
break;
463+
}
464+
tokio::time::sleep(Duration::from_millis(1)).await;
465+
}
466+
})
467+
.await
468+
.is_ok();
433469
assert!(new_probe, "no new probe dispatched after budget was freed");
434470

435471
node_a.stop().unwrap();

0 commit comments

Comments
 (0)