Skip to content

Commit 963aa9d

Browse files
committed
Add probing service
Introduce a background probing service that periodically dispatches probes to improve the scorer's liquidity estimates. Includes two built-in strategies.
1 parent a555133 commit 963aa9d

File tree

7 files changed

+1261
-6
lines changed

7 files changed

+1261
-6
lines changed

src/builder.rs

Lines changed: 161 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use std::collections::HashMap;
99
use std::convert::TryInto;
1010
use std::default::Default;
1111
use std::path::PathBuf;
12+
use std::sync::atomic::AtomicU64;
1213
use std::sync::{Arc, Mutex, Once, RwLock};
13-
use std::time::SystemTime;
14+
use std::time::{Duration, SystemTime};
1415
use std::{fmt, fs};
1516

1617
use bdk_wallet::template::Bip84;
@@ -47,6 +48,8 @@ use crate::config::{
4748
default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole,
4849
BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig,
4950
DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL,
51+
DEFAULT_MAX_PROBE_AMOUNT_MSAT, DEFAULT_MAX_PROBE_LOCKED_MSAT, DEFAULT_PROBING_INTERVAL_SECS,
52+
MIN_PROBE_AMOUNT_MSAT,
5053
};
5154
use crate::connection::ConnectionManager;
5255
use crate::entropy::NodeEntropy;
@@ -73,6 +76,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
7376
use crate::message_handler::NodeCustomMessageHandler;
7477
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
7578
use crate::peer_store::PeerStore;
79+
use crate::probing;
7680
use crate::runtime::{Runtime, RuntimeSpawner};
7781
use crate::tx_broadcaster::TransactionBroadcaster;
7882
use crate::types::{
@@ -151,6 +155,37 @@ impl std::fmt::Debug for LogWriterConfig {
151155
}
152156
}
153157

158+
enum ProbingStrategyKind {
159+
HighDegree { top_n: usize },
160+
Random { max_hops: usize },
161+
Custom(Arc<dyn probing::ProbingStrategy>),
162+
}
163+
164+
struct ProbingStrategyConfig {
165+
kind: ProbingStrategyKind,
166+
interval: Duration,
167+
max_locked_msat: u64,
168+
}
169+
170+
impl fmt::Debug for ProbingStrategyConfig {
171+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172+
let kind_str = match &self.kind {
173+
ProbingStrategyKind::HighDegree { top_n } => {
174+
format!("HighDegree {{ top_n: {} }}", top_n)
175+
},
176+
ProbingStrategyKind::Random { max_hops } => {
177+
format!("Random {{ max_hops: {} }}", max_hops)
178+
},
179+
ProbingStrategyKind::Custom(_) => "Custom(<probing strategy>)".to_string(),
180+
};
181+
f.debug_struct("ProbingStrategyConfig")
182+
.field("kind", &kind_str)
183+
.field("interval", &self.interval)
184+
.field("max_locked_msat", &self.max_locked_msat)
185+
.finish()
186+
}
187+
}
188+
154189
/// An error encountered during building a [`Node`].
155190
///
156191
/// [`Node`]: crate::Node
@@ -281,6 +316,8 @@ pub struct NodeBuilder {
281316
runtime_handle: Option<tokio::runtime::Handle>,
282317
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
283318
recovery_mode: bool,
319+
probing_strategy: Option<ProbingStrategyConfig>,
320+
probing_diversity_penalty_msat: Option<u64>,
284321
}
285322

286323
impl NodeBuilder {
@@ -299,16 +336,21 @@ impl NodeBuilder {
299336
let runtime_handle = None;
300337
let pathfinding_scores_sync_config = None;
301338
let recovery_mode = false;
339+
let async_payments_role = None;
340+
let probing_strategy = None;
341+
let probing_diversity_penalty_msat = None;
302342
Self {
303343
config,
304344
chain_data_source_config,
305345
gossip_source_config,
306346
liquidity_source_config,
307347
log_writer_config,
308348
runtime_handle,
309-
async_payments_role: None,
349+
async_payments_role,
310350
pathfinding_scores_sync_config,
311351
recovery_mode,
352+
probing_strategy,
353+
probing_diversity_penalty_msat,
312354
}
313355
}
314356

@@ -614,6 +656,80 @@ impl NodeBuilder {
614656
self
615657
}
616658

659+
/// Configures background probing toward the highest-degree nodes in the network graph.
660+
///
661+
/// `top_n` controls how many of the most-connected nodes are cycled through.
662+
pub fn set_high_degree_probing_strategy(&mut self, top_n: usize) -> &mut Self {
663+
let kind = ProbingStrategyKind::HighDegree { top_n };
664+
self.probing_strategy = Some(self.make_probing_config(kind));
665+
self
666+
}
667+
668+
/// Configures background probing via random graph walks of up to `max_hops` hops.
669+
pub fn set_random_probing_strategy(&mut self, max_hops: usize) -> &mut Self {
670+
let kind = ProbingStrategyKind::Random { max_hops };
671+
self.probing_strategy = Some(self.make_probing_config(kind));
672+
self
673+
}
674+
675+
/// Configures a custom probing strategy for background channel probing.
676+
///
677+
/// When set, the node will periodically call [`ProbingStrategy::next_probe`] and dispatch the
678+
/// returned probe via the channel manager.
679+
pub fn set_custom_probing_strategy(
680+
&mut self, strategy: Arc<dyn probing::ProbingStrategy>,
681+
) -> &mut Self {
682+
let kind = ProbingStrategyKind::Custom(strategy);
683+
self.probing_strategy = Some(self.make_probing_config(kind));
684+
self
685+
}
686+
687+
/// Overrides the interval between probe attempts. Only has effect if a probing strategy is set.
688+
pub fn set_probing_interval(&mut self, interval: Duration) -> &mut Self {
689+
if let Some(cfg) = &mut self.probing_strategy {
690+
cfg.interval = interval;
691+
}
692+
self
693+
}
694+
695+
/// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time.
696+
/// Only has effect if a probing strategy is set.
697+
pub fn set_max_probe_locked_msat(&mut self, max_msat: u64) -> &mut Self {
698+
if let Some(cfg) = &mut self.probing_strategy {
699+
cfg.max_locked_msat = max_msat;
700+
}
701+
self
702+
}
703+
704+
/// Sets the probing diversity penalty applied by the probabilistic scorer.
705+
///
706+
/// When set, the scorer will penalize channels that have been recently probed,
707+
/// encouraging path diversity during background probing. The penalty decays
708+
/// quadratically over 24 hours.
709+
///
710+
/// This is only useful for probing strategies that route through the scorer
711+
/// (e.g., [`HighDegreeStrategy`]). Strategies that build paths manually
712+
/// (e.g., [`RandomStrategy`]) bypass the scorer entirely.
713+
///
714+
/// If unset, LDK's default of `0` (no penalty) is used.
715+
pub fn set_probing_diversity_penalty_msat(&mut self, penalty_msat: u64) -> &mut Self {
716+
self.probing_diversity_penalty_msat = Some(penalty_msat);
717+
self
718+
}
719+
720+
fn make_probing_config(&self, kind: ProbingStrategyKind) -> ProbingStrategyConfig {
721+
let existing = self.probing_strategy.as_ref();
722+
ProbingStrategyConfig {
723+
kind,
724+
interval: existing
725+
.map(|c| c.interval)
726+
.unwrap_or(Duration::from_secs(DEFAULT_PROBING_INTERVAL_SECS)),
727+
max_locked_msat: existing
728+
.map(|c| c.max_locked_msat)
729+
.unwrap_or(DEFAULT_MAX_PROBE_LOCKED_MSAT),
730+
}
731+
}
732+
617733
/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
618734
/// previously configured.
619735
pub fn build(&self, node_entropy: NodeEntropy) -> Result<Node, BuildError> {
@@ -791,6 +907,8 @@ impl NodeBuilder {
791907
runtime,
792908
logger,
793909
Arc::new(DynStoreWrapper(kv_store)),
910+
self.probing_strategy.as_ref(),
911+
self.probing_diversity_penalty_msat,
794912
)
795913
}
796914
}
@@ -1081,6 +1199,11 @@ impl ArcedNodeBuilder {
10811199
self.inner.write().unwrap().set_wallet_recovery_mode();
10821200
}
10831201

1202+
/// Configures a probing strategy for background channel probing.
1203+
pub fn set_custom_probing_strategy(&self, strategy: Arc<dyn probing::ProbingStrategy>) {
1204+
self.inner.write().unwrap().set_custom_probing_strategy(strategy);
1205+
}
1206+
10841207
/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
10851208
/// previously configured.
10861209
pub fn build(&self, node_entropy: Arc<NodeEntropy>) -> Result<Arc<Node>, BuildError> {
@@ -1226,6 +1349,7 @@ fn build_with_store_internal(
12261349
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
12271350
async_payments_role: Option<AsyncPaymentsRole>, recovery_mode: bool, seed_bytes: [u8; 64],
12281351
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
1352+
probing_config: Option<&ProbingStrategyConfig>, probing_diversity_penalty_msat: Option<u64>,
12291353
) -> Result<Node, BuildError> {
12301354
optionally_install_rustls_cryptoprovider();
12311355

@@ -1626,7 +1750,10 @@ fn build_with_store_internal(
16261750
},
16271751
}
16281752

1629-
let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
1753+
let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default();
1754+
if let Some(penalty) = probing_diversity_penalty_msat {
1755+
scoring_fee_params.probing_diversity_penalty_msat = penalty;
1756+
}
16301757
let router = Arc::new(DefaultRouter::new(
16311758
Arc::clone(&network_graph),
16321759
Arc::clone(&logger),
@@ -1965,6 +2092,36 @@ fn build_with_store_internal(
19652092
_leak_checker.0.push(Arc::downgrade(&wallet) as Weak<dyn Any + Send + Sync>);
19662093
}
19672094

2095+
let prober = probing_config.map(|probing_cfg| {
2096+
let strategy: Arc<dyn probing::ProbingStrategy> = match &probing_cfg.kind {
2097+
ProbingStrategyKind::HighDegree { top_n } => {
2098+
Arc::new(probing::HighDegreeStrategy::new(
2099+
network_graph.clone(),
2100+
*top_n,
2101+
MIN_PROBE_AMOUNT_MSAT,
2102+
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
2103+
))
2104+
},
2105+
ProbingStrategyKind::Random { max_hops } => Arc::new(probing::RandomStrategy::new(
2106+
network_graph.clone(),
2107+
channel_manager.clone(),
2108+
*max_hops,
2109+
MIN_PROBE_AMOUNT_MSAT,
2110+
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
2111+
)),
2112+
ProbingStrategyKind::Custom(s) => s.clone(),
2113+
};
2114+
Arc::new(probing::Prober {
2115+
channel_manager: channel_manager.clone(),
2116+
logger: logger.clone(),
2117+
strategy,
2118+
interval: probing_cfg.interval,
2119+
liquidity_limit_multiplier: Some(config.probing_liquidity_limit_multiplier),
2120+
max_locked_msat: probing_cfg.max_locked_msat,
2121+
locked_msat: Arc::new(AtomicU64::new(0)),
2122+
})
2123+
});
2124+
19682125
Ok(Node {
19692126
runtime,
19702127
stop_sender,
@@ -1998,6 +2155,7 @@ fn build_with_store_internal(
19982155
om_mailbox,
19992156
async_payments_role,
20002157
hrn_resolver,
2158+
prober,
20012159
#[cfg(cycle_tests)]
20022160
_leak_checker,
20032161
})

src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80;
2727
const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30;
2828
const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10;
2929
const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3;
30+
pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10;
31+
pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats
32+
pub(crate) const MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats
33+
pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats
3034
const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000;
3135

3236
// The default timeout after which we abort a wallet syncing operation.

src/event.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use core::future::Future;
99
use core::task::{Poll, Waker};
1010
use std::collections::VecDeque;
1111
use std::ops::Deref;
12+
use std::sync::atomic::{AtomicU64, Ordering};
1213
use std::sync::{Arc, Mutex};
1314

1415
use bitcoin::blockdata::locktime::absolute::LockTime;
@@ -515,6 +516,7 @@ where
515516
static_invoice_store: Option<StaticInvoiceStore>,
516517
onion_messenger: Arc<OnionMessenger>,
517518
om_mailbox: Option<Arc<OnionMessageMailbox>>,
519+
probe_locked_msat: Option<Arc<AtomicU64>>,
518520
}
519521

520522
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -531,6 +533,7 @@ where
531533
keys_manager: Arc<KeysManager>, static_invoice_store: Option<StaticInvoiceStore>,
532534
onion_messenger: Arc<OnionMessenger>, om_mailbox: Option<Arc<OnionMessageMailbox>>,
533535
runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
536+
probe_locked_msat: Option<Arc<AtomicU64>>,
534537
) -> Self {
535538
Self {
536539
event_queue,
@@ -550,6 +553,7 @@ where
550553
static_invoice_store,
551554
onion_messenger,
552555
om_mailbox,
556+
probe_locked_msat,
553557
}
554558
}
555559

@@ -1135,8 +1139,22 @@ where
11351139

11361140
LdkEvent::PaymentPathSuccessful { .. } => {},
11371141
LdkEvent::PaymentPathFailed { .. } => {},
1138-
LdkEvent::ProbeSuccessful { .. } => {},
1139-
LdkEvent::ProbeFailed { .. } => {},
1142+
LdkEvent::ProbeSuccessful { path, .. } => {
1143+
if let Some(counter) = &self.probe_locked_msat {
1144+
let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum();
1145+
let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
1146+
Some(v.saturating_sub(amount))
1147+
});
1148+
}
1149+
},
1150+
LdkEvent::ProbeFailed { path, .. } => {
1151+
if let Some(counter) = &self.probe_locked_msat {
1152+
let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum();
1153+
let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
1154+
Some(v.saturating_sub(amount))
1155+
});
1156+
}
1157+
},
11401158
LdkEvent::HTLCHandlingFailed { failure_type, .. } => {
11411159
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
11421160
liquidity_source.handle_htlc_handling_failed(failure_type).await;

0 commit comments

Comments
 (0)