Skip to content

Commit 9d384e7

Browse files
committed
Add probing service
Introduce a background probing service that periodically dispatches probes to improve the scorer's liquidity estimates. Includes two built-in strategies.
1 parent fae2746 commit 9d384e7

File tree

7 files changed

+1259
-8
lines changed

7 files changed

+1259
-8
lines changed

src/builder.rs

Lines changed: 162 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use std::collections::HashMap;
99
use std::convert::TryInto;
1010
use std::default::Default;
1111
use std::path::PathBuf;
12+
use std::sync::atomic::AtomicU64;
1213
use std::sync::{Arc, Mutex, Once, RwLock};
13-
use std::time::SystemTime;
14+
use std::time::{Duration, SystemTime};
1415
use std::{fmt, fs};
1516

1617
use bdk_wallet::template::Bip84;
@@ -47,6 +48,8 @@ use crate::config::{
4748
default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole,
4849
BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig,
4950
DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL,
51+
DEFAULT_MAX_PROBE_AMOUNT_MSAT, DEFAULT_MAX_PROBE_LOCKED_MSAT, DEFAULT_PROBING_INTERVAL_SECS,
52+
MIN_PROBE_AMOUNT_MSAT,
5053
};
5154
use crate::connection::ConnectionManager;
5255
use crate::entropy::NodeEntropy;
@@ -73,6 +76,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
7376
use crate::message_handler::NodeCustomMessageHandler;
7477
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
7578
use crate::peer_store::PeerStore;
79+
use crate::probing;
7680
use crate::runtime::{Runtime, RuntimeSpawner};
7781
use crate::tx_broadcaster::TransactionBroadcaster;
7882
use crate::types::{
@@ -151,6 +155,37 @@ impl std::fmt::Debug for LogWriterConfig {
151155
}
152156
}
153157

158+
enum ProbingStrategyKind {
159+
HighDegree { top_n: usize },
160+
Random { max_hops: usize },
161+
Custom(Arc<dyn probing::ProbingStrategy>),
162+
}
163+
164+
struct ProbingStrategyConfig {
165+
kind: ProbingStrategyKind,
166+
interval: Duration,
167+
max_locked_msat: u64,
168+
}
169+
170+
impl fmt::Debug for ProbingStrategyConfig {
171+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172+
let kind_str = match &self.kind {
173+
ProbingStrategyKind::HighDegree { top_n } => {
174+
format!("HighDegree {{ top_n: {} }}", top_n)
175+
},
176+
ProbingStrategyKind::Random { max_hops } => {
177+
format!("Random {{ max_hops: {} }}", max_hops)
178+
},
179+
ProbingStrategyKind::Custom(_) => "Custom(<probing strategy>)".to_string(),
180+
};
181+
f.debug_struct("ProbingStrategyConfig")
182+
.field("kind", &kind_str)
183+
.field("interval", &self.interval)
184+
.field("max_locked_msat", &self.max_locked_msat)
185+
.finish()
186+
}
187+
}
188+
154189
/// An error encountered during building a [`Node`].
155190
///
156191
/// [`Node`]: crate::Node
@@ -247,6 +282,8 @@ pub struct NodeBuilder {
247282
runtime_handle: Option<tokio::runtime::Handle>,
248283
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
249284
recovery_mode: bool,
285+
probing_strategy: Option<ProbingStrategyConfig>,
286+
probing_diversity_penalty_msat: Option<u64>,
250287
}
251288

252289
impl NodeBuilder {
@@ -265,16 +302,21 @@ impl NodeBuilder {
265302
let runtime_handle = None;
266303
let pathfinding_scores_sync_config = None;
267304
let recovery_mode = false;
305+
let async_payments_role = None;
306+
let probing_strategy = None;
307+
let probing_diversity_penalty_msat = None;
268308
Self {
269309
config,
270310
chain_data_source_config,
271311
gossip_source_config,
272312
liquidity_source_config,
273313
log_writer_config,
274314
runtime_handle,
275-
async_payments_role: None,
315+
async_payments_role,
276316
pathfinding_scores_sync_config,
277317
recovery_mode,
318+
probing_strategy,
319+
probing_diversity_penalty_msat,
278320
}
279321
}
280322

@@ -559,6 +601,80 @@ impl NodeBuilder {
559601
self
560602
}
561603

604+
/// Configures background probing toward the highest-degree nodes in the network graph.
605+
///
606+
/// `top_n` controls how many of the most-connected nodes are cycled through.
607+
pub fn set_high_degree_probing_strategy(&mut self, top_n: usize) -> &mut Self {
608+
let kind = ProbingStrategyKind::HighDegree { top_n };
609+
self.probing_strategy = Some(self.make_probing_config(kind));
610+
self
611+
}
612+
613+
/// Configures background probing via random graph walks of up to `max_hops` hops.
614+
pub fn set_random_probing_strategy(&mut self, max_hops: usize) -> &mut Self {
615+
let kind = ProbingStrategyKind::Random { max_hops };
616+
self.probing_strategy = Some(self.make_probing_config(kind));
617+
self
618+
}
619+
620+
/// Configures a custom probing strategy for background channel probing.
621+
///
622+
/// When set, the node will periodically call [`ProbingStrategy::next_probe`] and dispatch the
623+
/// returned probe via the channel manager.
624+
pub fn set_custom_probing_strategy(
625+
&mut self, strategy: Arc<dyn probing::ProbingStrategy>,
626+
) -> &mut Self {
627+
let kind = ProbingStrategyKind::Custom(strategy);
628+
self.probing_strategy = Some(self.make_probing_config(kind));
629+
self
630+
}
631+
632+
/// Overrides the interval between probe attempts. Only has effect if a probing strategy is set.
633+
pub fn set_probing_interval(&mut self, interval: Duration) -> &mut Self {
634+
if let Some(cfg) = &mut self.probing_strategy {
635+
cfg.interval = interval;
636+
}
637+
self
638+
}
639+
640+
/// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time.
641+
/// Only has effect if a probing strategy is set.
642+
pub fn set_max_probe_locked_msat(&mut self, max_msat: u64) -> &mut Self {
643+
if let Some(cfg) = &mut self.probing_strategy {
644+
cfg.max_locked_msat = max_msat;
645+
}
646+
self
647+
}
648+
649+
/// Sets the probing diversity penalty applied by the probabilistic scorer.
650+
///
651+
/// When set, the scorer will penalize channels that have been recently probed,
652+
/// encouraging path diversity during background probing. The penalty decays
653+
/// quadratically over 24 hours.
654+
///
655+
/// This is only useful for probing strategies that route through the scorer
656+
/// (e.g., [`HighDegreeStrategy`]). Strategies that build paths manually
657+
/// (e.g., [`RandomStrategy`]) bypass the scorer entirely.
658+
///
659+
/// If unset, LDK's default of `0` (no penalty) is used.
660+
pub fn set_probing_diversity_penalty_msat(&mut self, penalty_msat: u64) -> &mut Self {
661+
self.probing_diversity_penalty_msat = Some(penalty_msat);
662+
self
663+
}
664+
665+
fn make_probing_config(&self, kind: ProbingStrategyKind) -> ProbingStrategyConfig {
666+
let existing = self.probing_strategy.as_ref();
667+
ProbingStrategyConfig {
668+
kind,
669+
interval: existing
670+
.map(|c| c.interval)
671+
.unwrap_or(Duration::from_secs(DEFAULT_PROBING_INTERVAL_SECS)),
672+
max_locked_msat: existing
673+
.map(|c| c.max_locked_msat)
674+
.unwrap_or(DEFAULT_MAX_PROBE_LOCKED_MSAT),
675+
}
676+
}
677+
562678
/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
563679
/// previously configured.
564680
pub fn build(&self, node_entropy: NodeEntropy) -> Result<Node, BuildError> {
@@ -736,6 +852,8 @@ impl NodeBuilder {
736852
runtime,
737853
logger,
738854
Arc::new(DynStoreWrapper(kv_store)),
855+
self.probing_strategy.as_ref(),
856+
self.probing_diversity_penalty_msat,
739857
)
740858
}
741859
}
@@ -981,6 +1099,11 @@ impl ArcedNodeBuilder {
9811099
self.inner.write().unwrap().set_wallet_recovery_mode();
9821100
}
9831101

1102+
/// Configures a probing strategy for background channel probing.
1103+
pub fn set_custom_probing_strategy(&self, strategy: Arc<dyn probing::ProbingStrategy>) {
1104+
self.inner.write().unwrap().set_custom_probing_strategy(strategy);
1105+
}
1106+
9841107
/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
9851108
/// previously configured.
9861109
pub fn build(&self, node_entropy: Arc<NodeEntropy>) -> Result<Arc<Node>, BuildError> {
@@ -1126,6 +1249,8 @@ fn build_with_store_internal(
11261249
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
11271250
async_payments_role: Option<AsyncPaymentsRole>, recovery_mode: bool, seed_bytes: [u8; 64],
11281251
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
1252+
probing_config: Option<&ProbingStrategyConfig>,
1253+
probing_diversity_penalty_msat: Option<u64>,
11291254
) -> Result<Node, BuildError> {
11301255
optionally_install_rustls_cryptoprovider();
11311256

@@ -1517,7 +1642,10 @@ fn build_with_store_internal(
15171642
},
15181643
}
15191644

1520-
let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
1645+
let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default();
1646+
if let Some(penalty) = probing_diversity_penalty_msat {
1647+
scoring_fee_params.probing_diversity_penalty_msat = penalty;
1648+
}
15211649
let router = Arc::new(DefaultRouter::new(
15221650
Arc::clone(&network_graph),
15231651
Arc::clone(&logger),
@@ -1853,6 +1981,36 @@ fn build_with_store_internal(
18531981
_leak_checker.0.push(Arc::downgrade(&wallet) as Weak<dyn Any + Send + Sync>);
18541982
}
18551983

1984+
let prober = probing_config.map(|probing_cfg| {
1985+
let strategy: Arc<dyn probing::ProbingStrategy> = match &probing_cfg.kind {
1986+
ProbingStrategyKind::HighDegree { top_n } => {
1987+
Arc::new(probing::HighDegreeStrategy::new(
1988+
network_graph.clone(),
1989+
*top_n,
1990+
MIN_PROBE_AMOUNT_MSAT,
1991+
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
1992+
))
1993+
},
1994+
ProbingStrategyKind::Random { max_hops } => Arc::new(probing::RandomStrategy::new(
1995+
network_graph.clone(),
1996+
channel_manager.clone(),
1997+
*max_hops,
1998+
MIN_PROBE_AMOUNT_MSAT,
1999+
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
2000+
)),
2001+
ProbingStrategyKind::Custom(s) => s.clone(),
2002+
};
2003+
Arc::new(probing::Prober {
2004+
channel_manager: channel_manager.clone(),
2005+
logger: logger.clone(),
2006+
strategy,
2007+
interval: probing_cfg.interval,
2008+
liquidity_limit_multiplier: Some(config.probing_liquidity_limit_multiplier),
2009+
max_locked_msat: probing_cfg.max_locked_msat,
2010+
locked_msat: Arc::new(AtomicU64::new(0)),
2011+
})
2012+
});
2013+
18562014
Ok(Node {
18572015
runtime,
18582016
stop_sender,
@@ -1886,6 +2044,7 @@ fn build_with_store_internal(
18862044
om_mailbox,
18872045
async_payments_role,
18882046
hrn_resolver,
2047+
prober,
18892048
#[cfg(cycle_tests)]
18902049
_leak_checker,
18912050
})

src/config.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80;
2727
const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30;
2828
const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10;
2929
const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3;
30+
pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10;
31+
pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats
32+
pub(crate) const MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats
33+
pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats
3034
const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000;
3135

3236
// The default timeout after which we abort a wallet syncing operation.
@@ -210,7 +214,7 @@ impl Default for Config {
210214
anchor_channels_config: Some(AnchorChannelsConfig::default()),
211215
route_parameters: None,
212216
node_alias: None,
213-
}
217+
}
214218
}
215219
}
216220

src/event.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use core::future::Future;
99
use core::task::{Poll, Waker};
1010
use std::collections::VecDeque;
1111
use std::ops::Deref;
12+
use std::sync::atomic::{AtomicU64, Ordering};
1213
use std::sync::{Arc, Mutex};
1314

1415
use bitcoin::blockdata::locktime::absolute::LockTime;
@@ -515,6 +516,7 @@ where
515516
static_invoice_store: Option<StaticInvoiceStore>,
516517
onion_messenger: Arc<OnionMessenger>,
517518
om_mailbox: Option<Arc<OnionMessageMailbox>>,
519+
probe_locked_msat: Option<Arc<AtomicU64>>,
518520
}
519521

520522
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -531,6 +533,7 @@ where
531533
keys_manager: Arc<KeysManager>, static_invoice_store: Option<StaticInvoiceStore>,
532534
onion_messenger: Arc<OnionMessenger>, om_mailbox: Option<Arc<OnionMessageMailbox>>,
533535
runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
536+
probe_locked_msat: Option<Arc<AtomicU64>>
534537
) -> Self {
535538
Self {
536539
event_queue,
@@ -550,6 +553,7 @@ where
550553
static_invoice_store,
551554
onion_messenger,
552555
om_mailbox,
556+
probe_locked_msat,
553557
}
554558
}
555559

@@ -1135,8 +1139,22 @@ where
11351139

11361140
LdkEvent::PaymentPathSuccessful { .. } => {},
11371141
LdkEvent::PaymentPathFailed { .. } => {},
1138-
LdkEvent::ProbeSuccessful { .. } => {},
1139-
LdkEvent::ProbeFailed { .. } => {},
1142+
LdkEvent::ProbeSuccessful { path, .. } => {
1143+
if let Some(counter) = &self.probe_locked_msat {
1144+
let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum();
1145+
let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
1146+
Some(v.saturating_sub(amount))
1147+
});
1148+
}
1149+
},
1150+
LdkEvent::ProbeFailed { path, .. } => {
1151+
if let Some(counter) = &self.probe_locked_msat {
1152+
let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum();
1153+
let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
1154+
Some(v.saturating_sub(amount))
1155+
});
1156+
}
1157+
},
11401158
LdkEvent::HTLCHandlingFailed { failure_type, .. } => {
11411159
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
11421160
liquidity_source.handle_htlc_handling_failed(failure_type).await;
@@ -1375,7 +1393,6 @@ where
13751393
);
13761394
}
13771395
}
1378-
13791396
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
13801397
let skimmed_fee_msat = skimmed_fee_msat.unwrap_or(0);
13811398
liquidity_source

0 commit comments

Comments
 (0)