From c10194184972e83ec97825f2a2a535be58f782fa Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Tue, 28 Apr 2026 18:12:07 +0200 Subject: [PATCH 1/4] Add probing service Introduce a background probing service that periodically sends payment probes to discover liquidity along Lightning routes. Probes update the local scorer with channel liquidity information, improving pathfinding for subsequent real payments. The service supports three strategies: - HighDegreeStrategy: probes nodes with the most channels in the network graph - RandomWalkStrategy: walks random paths from the local node - Custom: user-supplied strategy via the `ProbingStrategy` trait A dedicated `ProbingConfigBuilder` exposes amount bounds, locked-msat caps, probing intervals, and per-node cooldowns, with sensible defaults. The service runs as a cancellable background task driven by the existing `Runtime`, and budget accounting tracks both in-flight and locked amounts to bound outbound liquidity exposure. UniFFI bindings expose the probing service to the Swift, Kotlin, and Python language bindings. Co-Authored-By: Claude Sonnet 4.6 --- bindings/ldk_node.udl | 5 + src/builder.rs | 88 ++++- src/config.rs | 6 + src/event.rs | 29 +- src/lib.rs | 21 ++ src/probing.rs | 833 ++++++++++++++++++++++++++++++++++++++++++ src/util.rs | 37 ++ 7 files changed, 1006 insertions(+), 13 deletions(-) create mode 100644 src/probing.rs create mode 100644 src/util.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7e9e61f5d5..5455a5af50 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,10 @@ typedef dictionary TorConfig; typedef interface NodeEntropy; +typedef interface ProbingConfig; + +typedef interface ProbingConfigBuilder; + typedef enum WordCount; [Remote] @@ -61,6 +65,7 @@ interface Builder { [Throws=BuildError] void set_async_payments_role(AsyncPaymentsRole? role); void set_wallet_recovery_mode(); + void set_probing_config(ProbingConfig config); [Throws=BuildError] Node build(NodeEntropy node_entropy); [Throws=BuildError] diff --git a/src/builder.rs b/src/builder.rs index c88c867cc1..1a281cc93d 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -50,6 +50,7 @@ use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, HRNResolverConfig, TorConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, DEFAULT_MIN_PROBE_AMOUNT_MSAT, }; use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; @@ -76,6 +77,9 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; +use crate::probing::{ + HighDegreeStrategy, Prober, ProbingConfig, ProbingStrategy, ProbingStrategyKind, RandomWalkStrategy, +}; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -292,6 +296,7 @@ pub struct NodeBuilder { runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, + probing_config: Option, } impl NodeBuilder { @@ -310,6 +315,8 @@ impl NodeBuilder { let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; + let async_payments_role = None; + let probing_config = None; Self { config, chain_data_source_config, @@ -317,9 +324,10 @@ impl NodeBuilder { liquidity_source_config, log_writer_config, runtime_handle, - async_payments_role: None, + async_payments_role, pathfinding_scores_sync_config, recovery_mode, + probing_config, } } @@ -625,6 +633,31 @@ impl NodeBuilder { self } + /// Configures background probing. + /// + /// Use [`ProbingConfigBuilder`] to build the configuration: + /// ```no_run + /// # #[cfg(not(feature = "uniffi"))] + /// # { + /// use std::time::Duration; + /// use ldk_node::Builder; + /// use ldk_node::probing::ProbingConfigBuilder; + /// + /// let mut builder = Builder::new(); + /// builder.set_probing_config( + /// ProbingConfigBuilder::high_degree(100) + /// .interval(Duration::from_secs(30)) + /// .build() + /// ); + /// # } + /// ``` + /// + /// [`ProbingConfigBuilder`]: crate::probing::ProbingConfigBuilder + pub fn set_probing_config(&mut self, config: ProbingConfig) -> &mut Self { + self.probing_config = Some(config); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -864,6 +897,7 @@ impl NodeBuilder { self.gossip_source_config.as_ref(), self.liquidity_source_config.as_ref(), self.pathfinding_scores_sync_config.as_ref(), + self.probing_config.as_ref(), self.async_payments_role, self.recovery_mode, seed_bytes, @@ -1164,6 +1198,15 @@ impl ArcedNodeBuilder { self.inner.write().expect("lock").set_wallet_recovery_mode(); } + /// Configures background probing. + /// + /// Use [`ProbingConfigBuilder`] to build the configuration. + /// + /// [`ProbingConfigBuilder`]: crate::probing::ProbingConfigBuilder + pub fn set_probing_config(&self, config: Arc) { + self.inner.write().unwrap().set_probing_config((*config).clone()); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { @@ -1359,8 +1402,9 @@ fn build_with_store_internal( gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, - async_payments_role: Option, recovery_mode: bool, seed_bytes: [u8; 64], - runtime: Arc, logger: Arc, kv_store: Arc, + probing_config: Option<&ProbingConfig>, async_payments_role: Option, + recovery_mode: bool, seed_bytes: [u8; 64], runtime: Arc, logger: Arc, + kv_store: Arc, ) -> Result { optionally_install_rustls_cryptoprovider(); @@ -1777,7 +1821,10 @@ fn build_with_store_internal( }, } - let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + if let Some(penalty) = probing_config.and_then(|c| c.diversity_penalty_msat) { + scoring_fee_params.probing_diversity_penalty_msat = penalty; + } let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), Arc::clone(&logger), @@ -2152,6 +2199,38 @@ fn build_with_store_internal( _leak_checker.0.push(Arc::downgrade(&wallet) as Weak); } + let prober = probing_config.map(|probing_cfg| { + let strategy: Arc = match &probing_cfg.kind { + ProbingStrategyKind::HighDegree { top_node_count } => { + Arc::new(HighDegreeStrategy::new( + Arc::clone(&network_graph), + Arc::clone(&channel_manager), + Arc::clone(&router), + *top_node_count, + DEFAULT_MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + probing_cfg.cooldown, + config.probing_liquidity_limit_multiplier, + )) + }, + ProbingStrategyKind::RandomWalk { max_hops } => Arc::new(RandomWalkStrategy::new( + Arc::clone(&network_graph), + Arc::clone(&channel_manager), + *max_hops, + DEFAULT_MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + )), + ProbingStrategyKind::Custom(s) => Arc::clone(s), + }; + Arc::new(Prober { + channel_manager: Arc::clone(&channel_manager), + logger: Arc::clone(&logger), + strategy, + interval: probing_cfg.interval, + max_locked_msat: probing_cfg.max_locked_msat, + }) + }); + Ok(Node { runtime, stop_sender, @@ -2185,6 +2264,7 @@ fn build_with_store_internal( om_mailbox, async_payments_role, hrn_resolver, + prober, #[cfg(cycle_tests)] _leak_checker, }) diff --git a/src/config.rs b/src/config.rs index 558a4d0618..c908743825 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,6 +28,12 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30; const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; +pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10; +pub(crate) const MIN_PROBING_INTERVAL: Duration = Duration::from_millis(100); +pub(crate) const DEFAULT_PROBED_NODE_COOLDOWN_SECS: u64 = 60 * 60; // 1 hour +pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats +pub(crate) const DEFAULT_MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats +pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; // The default timeout after which we abort a wallet syncing operation. diff --git a/src/event.rs b/src/event.rs index 86ee7bb05a..28d87ca6fb 100644 --- a/src/event.rs +++ b/src/event.rs @@ -51,6 +51,7 @@ use crate::payment::store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; use crate::payment::PaymentMetadata; +use crate::probing::Prober; use crate::runtime::Runtime; use crate::types::{ CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, @@ -537,12 +538,13 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, - runtime: Arc, - logger: L, - config: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, + prober: Option>, + runtime: Arc, + logger: L, + config: Arc, } impl EventHandler @@ -558,7 +560,7 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, - runtime: Arc, logger: L, config: Arc, + prober: Option>, runtime: Arc, logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -572,12 +574,13 @@ where payment_store, peer_store, keys_manager, - logger, - runtime, - config, static_invoice_store, onion_messenger, om_mailbox, + prober, + runtime, + logger, + config, } } @@ -1210,8 +1213,16 @@ where LdkEvent::PaymentPathSuccessful { .. } => {}, LdkEvent::PaymentPathFailed { .. } => {}, - LdkEvent::ProbeSuccessful { .. } => {}, - LdkEvent::ProbeFailed { .. } => {}, + LdkEvent::ProbeSuccessful { path, payment_id, .. } => { + if let Some(prober) = &self.prober { + prober.handle_background_probe_successful(&path, payment_id); + } + }, + LdkEvent::ProbeFailed { path, payment_id, .. } => { + if let Some(prober) = &self.prober { + prober.handle_background_probe_failed(&path, payment_id); + } + }, LdkEvent::HTLCHandlingFailed { failure_type, .. } => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source.handle_htlc_handling_failed(failure_type).await; diff --git a/src/lib.rs b/src/lib.rs index 7ed69031c3..4b59376eca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,10 +101,12 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +pub mod probing; mod runtime; mod scoring; mod tx_broadcaster; mod types; +mod util; mod wallet; use std::default::Default; @@ -113,6 +115,8 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(cycle_tests)] use std::{any::Any, sync::Weak}; +#[cfg(feature = "uniffi")] +use crate::probing::ProbingConfig; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; pub use bip39; pub use bitcoin; @@ -172,6 +176,9 @@ use payment::{ UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; +#[cfg(feature = "uniffi")] +pub use probing::ArcedProbingConfigBuilder as ProbingConfigBuilder; +use probing::{run_prober, Prober}; use runtime::Runtime; pub use tokio; use types::{ @@ -247,6 +254,7 @@ pub struct Node { om_mailbox: Option>, async_payments_role: Option, hrn_resolver: HRNResolver, + prober: Option>, #[cfg(cycle_tests)] _leak_checker: LeakChecker, } @@ -605,11 +613,19 @@ impl Node { static_invoice_store, Arc::clone(&self.onion_messenger), self.om_mailbox.clone(), + self.prober.clone(), Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), )); + if let Some(prober) = self.prober.clone() { + let stop_rx = self.stop_sender.subscribe(); + self.runtime.spawn_cancellable_background_task(async move { + run_prober(prober, stop_rx).await; + }); + } + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1096,6 +1112,11 @@ impl Node { )) } + /// Returns a reference to the [`Prober`], or `None` if no probing strategy is configured. + pub fn prober(&self) -> Option<&Prober> { + self.prober.as_deref() + } + /// Retrieve a list of known channels. pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/probing.rs b/src/probing.rs new file mode 100644 index 0000000000..d37f4ba272 --- /dev/null +++ b/src/probing.rs @@ -0,0 +1,833 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Background probing for training the payment scorer. +//! +//! Lightning Network nodes only know channels' capacities via their initially announced limits; +//! the real values change unpredictably after payments have been sent, which makes some of +//! the channels inoperable (capacity has been depleted). The only way to know about channel +//! depletion is to attempt sending a payment through it. Thus, sending a live payment +//! might involve a significant time delay for finding an appropriate channel with enough capacity, +//! up to complete failure when a route with enough capacity cannot be found. +//! +//! The background probing service fires probes to learn about the live state of channels and +//! their capacities, providing accurate data to the scorer and router. +//! +//! This module provides the configuration for such a service. There are two pre-built strategies, +//! [`RandomWalkStrategy`] and [`HighDegreeStrategy`], as well as a [`ProbingStrategy`] trait which +//! allows defining a custom probing strategy (for example if there is an established payment +//! pattern). +//! +//! # Configuration +//! +//! Probing is opt-in: a node only runs the service if a [`ProbingConfig`] has been registered +//! on the [`Builder`] via [`Builder::set_probing_config`] before [`Builder::build`]. Without a +//! config, no probes are sent. +//! +//! # Example +//! +//! ```no_run +//! # #[cfg(not(feature = "uniffi"))] +//! # { +//! use std::time::Duration; +//! use ldk_node::Builder; +//! use ldk_node::probing::ProbingConfigBuilder; +//! +//! let probing_config = ProbingConfigBuilder::high_degree(100) +//! .interval(Duration::from_secs(30)) +//! .max_locked_msat(500_000) +//! .diversity_penalty_msat(250) +//! .build(); +//! +//! let mut builder = Builder::new(); +//! builder.set_probing_config(probing_config); +//! # } +//! ``` +//! +//! # Caution +//! +//! Probes send real HTLCs along real paths. If an intermediate hop is offline or +//! misbehaving, the probe HTLC can remain in-flight — locking outbound liquidity +//! on the first-hop channel until the HTLC timeout elapses (potentially hours). +//! `max_locked_msat` caps the total outbound capacity that in-flight probes may +//! hold at any one time; tune it conservatively for nodes with tight liquidity. +//! +//! [`Builder`]: crate::Builder +//! [`Builder::set_probing_config`]: crate::Builder::set_probing_config +//! [`Builder::build`]: crate::Builder::build + +use std::collections::HashMap; +use std::fmt; +#[cfg(feature = "uniffi")] +use std::sync::RwLock; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use bitcoin::secp256k1::PublicKey; +use lightning::ln::channelmanager::{PaymentId, RecentPaymentDetails}; +use lightning::routing::gossip::NodeId; +use lightning::routing::router::Router as LdkRouter; +use lightning::routing::router::{ + Path, PaymentParameters, RouteHop, RouteParameters, MAX_PATH_LENGTH_ESTIMATE, +}; +use lightning_invoice::DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA; +use lightning_types::features::{ChannelFeatures, NodeFeatures}; + +use crate::config::{ + DEFAULT_MAX_PROBE_LOCKED_MSAT, DEFAULT_PROBED_NODE_COOLDOWN_SECS, + DEFAULT_PROBING_INTERVAL_SECS, MIN_PROBING_INTERVAL, +}; +use crate::logger::{log_debug, LdkLogger, Logger}; +use crate::types::{ChannelManager, Graph, Router}; +use crate::util::random_range; + +/// Which built-in probing strategy to use, or a custom one. +#[derive(Clone)] +pub(crate) enum ProbingStrategyKind { + HighDegree { top_node_count: usize }, + RandomWalk { max_hops: usize }, + Custom(Arc), +} + +impl fmt::Debug for ProbingStrategyKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::HighDegree { top_node_count } => { + f.debug_struct("HighDegree").field("top_node_count", top_node_count).finish() + }, + Self::RandomWalk { max_hops } => { + f.debug_struct("RandomWalk").field("max_hops", max_hops).finish() + }, + Self::Custom(_) => f.write_str("Custom()"), + } + } +} + +/// Configuration for the background probing subsystem. +/// +/// Instances are produced by [`ProbingConfigBuilder`], which exposes three strategy +/// constructors: [`ProbingConfigBuilder::high_degree`], [`ProbingConfigBuilder::random_walk`], +/// and [`ProbingConfigBuilder::custom`]. +/// +/// Optional setters on the builder tune timing and liquidity limits, and +/// [`ProbingConfigBuilder::build`] finalizes the value. +/// +/// # Examples +/// +/// Using pre-built strategy: +/// ```no_run +/// # #[cfg(not(feature = "uniffi"))] +/// # { +/// use std::time::Duration; +/// use ldk_node::Builder; +/// use ldk_node::probing::ProbingConfigBuilder; +/// +/// let config = ProbingConfigBuilder::high_degree(100) +/// .interval(Duration::from_secs(30)) +/// .max_locked_msat(500_000) +/// .diversity_penalty_msat(250) +/// .build(); +/// +/// let mut builder = Builder::new(); +/// builder.set_probing_config(config); +/// # } +/// ``` +/// +/// Creating a custom strategy that always probes the same path: +/// ``` +/// use ldk_node::lightning::routing::router::Path; +/// use ldk_node::probing::ProbingStrategy; +/// +/// struct FixedPathStrategy { +/// path: Path, +/// } +/// impl ProbingStrategy for FixedPathStrategy { +/// fn next_probe(&self) -> Option { +/// if self.path.hops.len() > 1 { +/// Some(self.path.clone()) +/// } else { +/// None +/// } +/// } +/// } +/// ``` +#[derive(Clone, Debug)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Object))] +pub struct ProbingConfig { + pub(crate) kind: ProbingStrategyKind, + pub(crate) interval: Duration, + pub(crate) max_locked_msat: u64, + pub(crate) diversity_penalty_msat: Option, + pub(crate) cooldown: Duration, +} + +/// Builder for [`ProbingConfig`]. +/// +/// A new instance starts from one of three strategy constructors — [`high_degree`], +/// [`random_walk`], or [`custom`] — and is finalized through [`build`]. Optional setters +/// in between override the timing and liquidity defaults. +/// +/// [`high_degree`]: Self::high_degree +/// [`random_walk`]: Self::random_walk +/// [`custom`]: Self::custom +/// [`build`]: Self::build +pub struct ProbingConfigBuilder { + kind: ProbingStrategyKind, + interval: Duration, + max_locked_msat: u64, + diversity_penalty_msat: Option, + cooldown: Duration, +} + +impl ProbingConfigBuilder { + fn with_kind(kind: ProbingStrategyKind) -> Self { + Self { + kind, + interval: Duration::from_secs(DEFAULT_PROBING_INTERVAL_SECS), + max_locked_msat: DEFAULT_MAX_PROBE_LOCKED_MSAT, + diversity_penalty_msat: None, + cooldown: Duration::from_secs(DEFAULT_PROBED_NODE_COOLDOWN_SECS), + } + } + + /// Start building a config that probes toward the highest-degree nodes in the graph. + /// + /// `top_node_count` controls how many of the most-connected nodes are cycled through. + pub fn high_degree(top_node_count: usize) -> Self { + Self::with_kind(ProbingStrategyKind::HighDegree { top_node_count }) + } + + /// Start building a config that probes via random graph walks. + /// + /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + /// Values below `2` are clamped to `2`. + pub fn random_walk(max_hops: usize) -> Self { + Self::with_kind(ProbingStrategyKind::RandomWalk { max_hops }) + } + + /// Start building a config with a custom [`ProbingStrategy`] implementation. + pub fn custom(strategy: Arc) -> Self { + Self::with_kind(ProbingStrategyKind::Custom(strategy)) + } + + /// Overrides the interval between probe attempts. + /// + /// Defaults to 10 seconds. + pub fn interval(&mut self, interval: Duration) -> &mut Self { + self.interval = interval; + self + } + + /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. + /// + /// Defaults to 100 000 000 msat (100k sats). + pub fn max_locked_msat(&mut self, max_msat: u64) -> &mut Self { + self.max_locked_msat = max_msat; + self + } + + /// Sets the probing diversity penalty applied by the probabilistic scorer. + /// + /// When set, the scorer will penalize channels that have been recently probed, + /// encouraging path diversity during background probing. The penalty decays + /// quadratically over 24 hours. + /// + /// This is only useful for probing strategies that route through the scorer + /// (e.g., [`HighDegreeStrategy`]). Strategies that build paths manually + /// (e.g., [`RandomWalkStrategy`]) bypass the scorer entirely. + /// + /// If unset, LDK's default of `0` (no penalty) is used. + pub fn diversity_penalty_msat(&mut self, penalty_msat: u64) -> &mut Self { + self.diversity_penalty_msat = Some(penalty_msat); + self + } + + /// Sets how long a probed node stays ineligible before being probed again. + /// + /// Only applies to [`HighDegreeStrategy`]. Defaults to 1 hour. + pub fn cooldown(&mut self, cooldown: Duration) -> &mut Self { + self.cooldown = cooldown; + self + } + + /// Builds the [`ProbingConfig`]. + pub fn build(&self) -> ProbingConfig { + ProbingConfig { + kind: self.kind.clone(), + interval: self.interval.max(MIN_PROBING_INTERVAL), + max_locked_msat: self.max_locked_msat, + diversity_penalty_msat: self.diversity_penalty_msat, + cooldown: self.cooldown, + } + } +} + +/// A UniFFI-compatible wrapper around [`ProbingConfigBuilder`] that uses interior mutability +/// so it can be shared behind an `Arc` as required by the FFI object model. +/// +/// Instances are produced by the constructors [`new_high_degree`] and [`new_random_walk`]. +/// The `set_*` methods override the defaults, and [`build`] yields the resulting +/// [`ProbingConfig`]. +/// +/// [`new_high_degree`]: Self::new_high_degree +/// [`new_random_walk`]: Self::new_random_walk +/// [`build`]: Self::build +#[cfg(feature = "uniffi")] +#[derive(uniffi::Object)] +pub struct ArcedProbingConfigBuilder { + inner: RwLock, +} + +#[cfg(feature = "uniffi")] +#[uniffi::export] +impl ArcedProbingConfigBuilder { + /// Start building a config that probes toward the highest-degree nodes in the graph. + /// + /// `top_node_count` controls how many of the most-connected nodes are cycled through. + #[uniffi::constructor] + pub fn new_high_degree(top_node_count: u64) -> Arc { + Arc::new(Self { + inner: RwLock::new(ProbingConfigBuilder::high_degree(top_node_count as usize)), + }) + } + + /// Start building a config that probes via random graph walks. + /// + /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + /// Values below `2` are clamped to `2`. + #[uniffi::constructor] + pub fn new_random_walk(max_hops: u64) -> Arc { + Arc::new(Self { inner: RwLock::new(ProbingConfigBuilder::random_walk(max_hops as usize)) }) + } + + /// Overrides the interval between probe attempts. + /// + /// Defaults to 10 seconds. + pub fn set_interval(&self, secs: u64) { + self.inner.write().unwrap().interval(Duration::from_secs(secs)); + } + + /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. + /// + /// Defaults to 100 000 000 msat (100k sats). + pub fn set_max_locked_msat(&self, max_msat: u64) { + self.inner.write().unwrap().max_locked_msat(max_msat); + } + + /// Sets the probing diversity penalty applied by the probabilistic scorer. + /// + /// When set, the scorer will penalize channels that have been recently probed, + /// encouraging path diversity during background probing. The penalty decays + /// quadratically over 24 hours. + /// + /// This is only useful for probing strategies that route through the scorer + /// (e.g., [`HighDegreeStrategy`]). Strategies that build paths manually + /// (e.g., [`RandomWalkStrategy`]) bypass the scorer entirely. + /// + /// If unset, LDK's default of `0` (no penalty) is used. + pub fn set_diversity_penalty_msat(&self, penalty_msat: u64) { + self.inner.write().unwrap().diversity_penalty_msat(penalty_msat); + } + + /// Sets how long a probed node stays ineligible before being probed again. + /// + /// Only applies to [`HighDegreeStrategy`]. Defaults to 1 hour. + pub fn set_cooldown(&self, secs: u64) { + self.inner.write().unwrap().cooldown(Duration::from_secs(secs)); + } + + /// Builds the [`ProbingConfig`]. + pub fn build(&self) -> Arc { + Arc::new(self.inner.read().unwrap().build()) + } +} + +/// A strategy that decides which path the probing service should probe next. +pub trait ProbingStrategy: Send + Sync + 'static { + /// Returns the next probe path to run, or `None` to skip this tick. + fn next_probe(&self) -> Option; +} + +/// Probes toward the most-connected nodes in the graph. +/// +/// On each tick the strategy reads the current gossip graph, sorts nodes by +/// channel count, and picks the highest-degree node from the top +/// `top_node_count` that has not been probed within `cooldown`. +/// Nodes probed more recently are skipped so that the strategy +/// naturally spreads across the top nodes and picks up graph changes. +/// If all top nodes are on cooldown, the cooldown map is cleared and a new cycle begins +/// immediately. +/// +/// The probe amount is chosen uniformly at random from +/// `[min_amount_msat, max_amount_msat]`. +/// +/// `HighDegreeStrategy` can only use publicly announced channels for probing. +pub struct HighDegreeStrategy { + network_graph: Arc, + channel_manager: Arc, + router: Arc, + /// How many of the highest-degree nodes to cycle through. + pub top_node_count: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, + /// How long a node stays ineligible after being probed. + pub cooldown: Duration, + /// Skip a path when the first-hop outbound liquidity is less than + /// `path_value * liquidity_limit_multiplier`. + pub liquidity_limit_multiplier: u64, + /// Nodes probed recently, with the time they were last probed. + recently_probed: Mutex>, +} + +impl HighDegreeStrategy { + /// Creates a new high-degree probing strategy. + pub(crate) fn new( + network_graph: Arc, channel_manager: Arc, router: Arc, + top_node_count: usize, min_amount_msat: u64, max_amount_msat: u64, cooldown: Duration, + liquidity_limit_multiplier: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + channel_manager, + router, + top_node_count, + min_amount_msat, + max_amount_msat, + cooldown, + liquidity_limit_multiplier, + recently_probed: Mutex::new(HashMap::new()), + } + } +} + +impl ProbingStrategy for HighDegreeStrategy { + fn next_probe(&self) -> Option { + let graph = self.network_graph.read_only(); + + let mut nodes_by_degree: Vec<(PublicKey, usize)> = graph + .nodes() + .unordered_iter() + .filter_map(|(id, info)| { + PublicKey::try_from(*id).ok().map(|pubkey| (pubkey, info.channels.len())) + }) + .collect(); + + if nodes_by_degree.is_empty() { + return None; + } + + nodes_by_degree.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let top_node_count = self.top_node_count.min(nodes_by_degree.len()); + let now = Instant::now(); + + let mut probed = self.recently_probed.lock().unwrap_or_else(|e| e.into_inner()); + + // We could check staleness when we use the entry, but that way we'd not clear cache at + // all. For hundreds of top nodes it's okay to call retain each tick. + probed.retain(|_, probed_at| now.duration_since(*probed_at) < self.cooldown); + + // If all top nodes are on cooldown, reset and start a new cycle. + let final_node = match nodes_by_degree[..top_node_count] + .iter() + .find(|(pubkey, _)| !probed.contains_key(pubkey)) + { + Some((pubkey, _)) => *pubkey, + None => { + probed.clear(); + nodes_by_degree[0].0 + }, + }; + + probed.insert(final_node, now); + drop(probed); + drop(graph); + + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + let payment_params = + PaymentParameters::from_node_id(final_node, DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA as u32); + let route_params = + RouteParameters::from_payment_params_and_value(payment_params, amount_msat); + + let payer = self.channel_manager.get_our_node_id(); + let usable_channels = self.channel_manager.list_usable_channels(); + let first_hops: Vec<&_> = usable_channels.iter().collect(); + let inflight_htlcs = self.channel_manager.compute_inflight_htlcs(); + + let route = self + .router + .find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs) + .ok()?; + + let path = route.paths.into_iter().next()?; + + // Liquidity-limit check (mirrors send_preflight_probes): skip the path when the + // first-hop outbound liquidity is less than path_value * liquidity_limit_multiplier. + if let Some(first_hop_hop) = path.hops.first() { + if let Some(ch) = usable_channels + .iter() + .find(|h| h.get_outbound_payment_scid() == Some(first_hop_hop.short_channel_id)) + { + let path_value = path.final_value_msat() + path.fee_msat(); + if ch.next_outbound_htlc_limit_msat + < path_value.saturating_mul(self.liquidity_limit_multiplier) + { + return None; + } + } + } + + Some(path) + } +} + +/// Explores the graph by walking a random number (≥2) of hops outward from one of our own +/// channels, constructing the [`Path`] explicitly. +/// +/// On each tick: +/// 1. Picks one of our confirmed, usable channels to start from. +/// 2. Performs a random walk of a chosen depth (up to [`MAX_PATH_LENGTH_ESTIMATE`]) through the +/// gossip graph, skipping disabled channels and dead-ends. +/// +/// The probe amount is chosen uniformly at random from `[min_amount_msat, max_amount_msat]`. +/// +/// Because path selection ignores the scorer, this probes channels the router +/// would never try on its own, teaching the scorer about previously unknown paths. +/// +/// `RandomWalkStrategy` can only use publicly announced channels for probing. +pub struct RandomWalkStrategy { + network_graph: Arc, + channel_manager: Arc, + /// Upper bound on the number of hops in a randomly constructed path. + pub max_hops: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, +} + +impl RandomWalkStrategy { + /// Creates a new random-walk probing strategy. + pub(crate) fn new( + network_graph: Arc, channel_manager: Arc, max_hops: usize, + min_amount_msat: u64, max_amount_msat: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + channel_manager, + max_hops: max_hops.clamp(2, MAX_PATH_LENGTH_ESTIMATE as usize), + min_amount_msat, + max_amount_msat, + } + } + + /// Tries to build a path of `target_hops` hops. Returns `None` if the local node has no + /// usable channels, or the walk terminates before reaching `target_hops`. + fn try_build_path(&self, target_hops: usize, amount_msat: u64) -> Option { + let initial_channels = self + .channel_manager + .list_channels() + .into_iter() + .filter(|c| c.is_usable && c.short_channel_id.is_some()) + .collect::>(); + + if initial_channels.is_empty() { + return None; + } + + let graph = self.network_graph.read_only(); + let first_hop = + &initial_channels[random_range(0, initial_channels.len() as u64 - 1) as usize]; + let first_hop_scid = first_hop.short_channel_id?; + let next_peer_pubkey = first_hop.counterparty.node_id; + let next_peer_node_id = NodeId::from_pubkey(&next_peer_pubkey); + + // Track the tightest HTLC limit across all hops to cap the probe amount. + // The first hop limit comes from our live channel state; subsequent hops use htlc_maximum_msat from the gossip channel update. + let mut route_least_htlc_upper_bound = first_hop.next_outbound_htlc_limit_msat; + let mut route_greatest_htlc_lower_bound = first_hop.next_outbound_htlc_minimum_msat; + + // Walk the graph: each entry is (node_id, arrived_via_scid, pubkey); first entry is set: + let mut route: Vec<(NodeId, u64, PublicKey)> = + vec![(next_peer_node_id, first_hop_scid, next_peer_pubkey)]; + + let mut prev_scid = first_hop_scid; + let mut current_node_id = next_peer_node_id; + + for _ in 1..target_hops { + let node_info = match graph.node(¤t_node_id) { + Some(n) => n, + None => break, + }; + + // Skip the edge we arrived on. Longer cycles aren't filtered — probes fail at + // the destination anyway, so revisiting nodes is harmless. + let candidates: Vec = + node_info.channels.iter().copied().filter(|&scid| scid != prev_scid).collect(); + + if candidates.is_empty() { + break; + } + + let next_scid = candidates[random_range(0, candidates.len() as u64 - 1) as usize]; + let next_channel = match graph.channel(next_scid) { + Some(c) => c, + None => break, + }; + + // as_directed_from validates that current_node_id is a channel endpoint and that + // both direction updates are present; effective_capacity covers both htlc_maximum_msat + // and funding capacity. + let Some((directed, next_node_id)) = next_channel.as_directed_from(¤t_node_id) + else { + break; + }; + // Retrieve the direction-specific update via the public ChannelInfo fields. + // as_directed_from already checked both directions are Some, but we break + // defensively rather than unwrap. + let update = match if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref() + } else { + next_channel.two_to_one.as_ref() + } { + Some(u) => u, + None => break, + }; + + if !update.enabled { + break; + } + + route_least_htlc_upper_bound = + route_least_htlc_upper_bound.min(update.htlc_maximum_msat); + + route_greatest_htlc_lower_bound = + route_greatest_htlc_lower_bound.max(update.htlc_minimum_msat); + + let next_pubkey = match PublicKey::try_from(*next_node_id) { + Ok(pk) => pk, + Err(_) => break, + }; + + route.push((*next_node_id, next_scid, next_pubkey)); + prev_scid = next_scid; + current_node_id = *next_node_id; + } + + if route_greatest_htlc_lower_bound > route_least_htlc_upper_bound { + return None; + } + let amount_msat = + amount_msat.max(route_greatest_htlc_lower_bound).min(route_least_htlc_upper_bound); + if amount_msat < self.min_amount_msat || amount_msat > self.max_amount_msat { + return None; + } + + // Assemble hops backwards so each hop's proportional fee is computed on the amount it actually forwards + let mut hops = Vec::with_capacity(route.len()); + let mut forwarded = amount_msat; + let last = route.len() - 1; + + // Resolve (node_features, channel_features, maybe_announced_channel) for a hop. + // The first hop is our local channel and may be unannounced, so its ChannelFeatures + // are not in the gossip graph — match on SCID to detect it and fall back to local-state + // defaults. All other (walked) hops were picked from the graph and must resolve there. + let hop_features = + |node_id: &NodeId, via_scid: u64| -> Option<(NodeFeatures, ChannelFeatures, bool)> { + let node_features = graph + .node(node_id) + .and_then(|n| n.announcement_info.as_ref().map(|a| a.features().clone())) + .unwrap_or_else(NodeFeatures::empty); + let (channel_features, maybe_announced_channel) = if via_scid == first_hop_scid { + (ChannelFeatures::empty(), false) + } else { + (graph.channel(via_scid)?.features.clone(), true) + }; + Some((node_features, channel_features, maybe_announced_channel)) + }; + + // Final hop: fee_msat carries the delivery amount; cltv_expiry_delta carries the + // destination's final CLTV (matching LDK's shifted-by-one RouteHop convention). + { + let (node_id, via_scid, pubkey) = route[last]; + let (node_features, channel_features, maybe_announced_channel) = + hop_features(&node_id, via_scid)?; + hops.push(RouteHop { + pubkey, + node_features, + short_channel_id: via_scid, + channel_features, + fee_msat: amount_msat, + cltv_expiry_delta: DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA as u32, + maybe_announced_channel, + }); + } + + // Non-final hops, from second-to-last back to first. + for i in (0..last).rev() { + let (node_id, via_scid, pubkey) = route[i]; + let (node_features, channel_features, maybe_announced_channel) = + hop_features(&node_id, via_scid)?; + + let (_, next_scid, _) = route[i + 1]; + let next_channel = graph.channel(next_scid)?; + let (directed, _) = next_channel.as_directed_from(&node_id)?; + let update = match if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref() + } else { + next_channel.two_to_one.as_ref() + } { + Some(u) => u, + None => return None, + }; + let fee = update.fees.base_msat as u64 + + (forwarded * update.fees.proportional_millionths as u64 / 1_000_000); + forwarded += fee; + + hops.push(RouteHop { + pubkey, + node_features, + short_channel_id: via_scid, + channel_features, + fee_msat: fee, + cltv_expiry_delta: update.cltv_expiry_delta as u32, + maybe_announced_channel, + }); + } + + hops.reverse(); + + // The first-hop HTLC carries amount_msat + all intermediate fees. + // Verify the total fits within our live outbound limit before returning. + let total_outgoing: u64 = hops.iter().map(|h| h.fee_msat).sum(); + if total_outgoing > first_hop.next_outbound_htlc_limit_msat { + return None; + } + + Some(Path { hops, blinded_tail: None }) + } +} + +impl ProbingStrategy for RandomWalkStrategy { + fn next_probe(&self) -> Option { + let target_hops = random_range(2, self.max_hops as u64) as usize; + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + + self.try_build_path(target_hops, amount_msat) + } +} + +/// Periodically dispatches probes according to a [`ProbingStrategy`]. +pub struct Prober { + pub(crate) channel_manager: Arc, + pub(crate) logger: Arc, + /// The strategy that decides what to probe. + pub strategy: Arc, + /// How often to fire a probe attempt. + pub interval: Duration, + /// Maximum total millisatoshis that may be locked in in-flight probes at any time. + pub max_locked_msat: u64, +} + +fn fmt_path(path: &lightning::routing::router::Path) -> String { + path.hops + .iter() + .map(|h| format!("{}(scid={})", h.pubkey, h.short_channel_id)) + .collect::>() + .join(" -> ") +} + +impl Prober { + /// Returns the total millisatoshis currently locked in in-flight probes. + pub fn locked_msat(&self) -> u64 { + return self + .channel_manager + .list_recent_payments() + .into_iter() + .filter_map(|p| match p { + RecentPaymentDetails::Pending { is_probe: true, total_msat, .. } => { + Some(total_msat) + }, + _ => None, + }) + .sum(); + } + + pub(crate) fn handle_background_probe_successful(&self, path: &Path, payment_id: PaymentId) { + log_debug!( + self.logger, + "Background probe with payment_id: {} succeeded along the path: {}", + payment_id, + fmt_path(path) + ); + } + + pub(crate) fn handle_background_probe_failed(&self, path: &Path, payment_id: PaymentId) { + log_debug!( + self.logger, + "Background probe with payment_id: {} failed along the path: {}", + payment_id, + fmt_path(path) + ); + } +} + +/// Runs the probing loop for the given [`Prober`] until `stop_rx` fires. +pub(crate) async fn run_prober(prober: Arc, mut stop_rx: tokio::sync::watch::Receiver<()>) { + let mut ticker = tokio::time::interval(prober.interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + biased; + _ = stop_rx.changed() => { + log_debug!(prober.logger, "Stopping background probing."); + return; + } + _ = ticker.tick() => { + let path = match prober.strategy.next_probe() { + Some(p) => p, + None => continue, + }; + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + if prober.locked_msat() + amount > prober.max_locked_msat { + log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); + continue; + } + match prober.channel_manager.send_probe(path.clone()) { + Ok((_, payment_id)) => { + log_debug!( + prober.logger, + "Background probe with payment_id {} sent: locked {} msat, path: {}", + payment_id, + amount, + fmt_path(&path) + ); + } + Err(e) => { + log_debug!( + prober.logger, + "Background probe send failed: {:?}, path: {}", + e, + fmt_path(&path) + ); + } + } + } + } + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000000..3350ad2c70 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,37 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +/// Returns a random `u64` uniformly distributed in `[min, max]` (inclusive). +pub(crate) fn random_range(min: u64, max: u64) -> u64 { + debug_assert!(min <= max); + if min == max { + return min; + } + let range = match (max - min).checked_add(1) { + Some(r) => r, + None => { + // overflowed — full u64::MAX range + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("getrandom failed"); + return u64::from_ne_bytes(buf); + }, + }; + // We remove bias due to the fact that the range does not evenly divide 2⁶⁴. + // Imagine we had a range from 0 to 2⁶⁴-2 (of length 2⁶⁴-1), then + // the outcomes of 0 would be twice as frequent as any other, as 0 can be produced + // as randomly drawn 0 % 2⁶⁴-1 and as well as 2⁶⁴-1 % 2⁶⁴-1 + let limit = u64::MAX - (u64::MAX % range); + loop { + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("getrandom failed"); + let val = u64::from_ne_bytes(buf); + if val < limit { + return min + (val % range); + } + // loop runs ~1 iteration on average, in worst case it's ~2 iterations on average + } +} From a9814d18accae6e748d003e03d846650de4cfc8c Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 6 May 2026 03:53:18 +0200 Subject: [PATCH 2/4] Move ffi re-exports to `ffi/types.rs` --- src/ffi/types.rs | 1 + src/lib.rs | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 7380d75cac..778844145d 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -149,6 +149,7 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; +pub use crate::probing::ProbingConfig; use crate::{hex_utils, SocketAddress, UserChannelId}; uniffi::custom_type!(PublicKey, String, { diff --git a/src/lib.rs b/src/lib.rs index 4b59376eca..be534de692 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,8 +115,6 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(cycle_tests)] use std::{any::Any, sync::Weak}; -#[cfg(feature = "uniffi")] -use crate::probing::ProbingConfig; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; pub use bip39; pub use bitcoin; @@ -176,8 +174,6 @@ use payment::{ UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; -#[cfg(feature = "uniffi")] -pub use probing::ArcedProbingConfigBuilder as ProbingConfigBuilder; use probing::{run_prober, Prober}; use runtime::Runtime; pub use tokio; From 51808c096547dec5b96ec2d51e7d57c5f307485c Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Tue, 28 Apr 2026 18:13:21 +0200 Subject: [PATCH 3/4] Add probing service tests Add integration tests that verify the probing service fires probes on the configured interval and respects the locked-msat budget cap. Shared helpers in tests/common are extended with probing-aware setup. Co-Authored-By: Claude Sonnet 4.6 --- src/builder.rs | 2 +- src/probing.rs | 10 +- tests/common/mod.rs | 67 ++++++- tests/probing_tests.rs | 389 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 455 insertions(+), 13 deletions(-) create mode 100644 tests/probing_tests.rs diff --git a/src/builder.rs b/src/builder.rs index 1a281cc93d..31846896d3 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1204,7 +1204,7 @@ impl ArcedNodeBuilder { /// /// [`ProbingConfigBuilder`]: crate::probing::ProbingConfigBuilder pub fn set_probing_config(&self, config: Arc) { - self.inner.write().unwrap().set_probing_config((*config).clone()); + self.inner.write().expect("lock").set_probing_config((*config).clone()); } /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options diff --git a/src/probing.rs b/src/probing.rs index d37f4ba272..ec13ab5d24 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -308,14 +308,14 @@ impl ArcedProbingConfigBuilder { /// /// Defaults to 10 seconds. pub fn set_interval(&self, secs: u64) { - self.inner.write().unwrap().interval(Duration::from_secs(secs)); + self.inner.write().expect("lock").interval(Duration::from_secs(secs)); } /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. /// /// Defaults to 100 000 000 msat (100k sats). pub fn set_max_locked_msat(&self, max_msat: u64) { - self.inner.write().unwrap().max_locked_msat(max_msat); + self.inner.write().expect("lock").max_locked_msat(max_msat); } /// Sets the probing diversity penalty applied by the probabilistic scorer. @@ -330,19 +330,19 @@ impl ArcedProbingConfigBuilder { /// /// If unset, LDK's default of `0` (no penalty) is used. pub fn set_diversity_penalty_msat(&self, penalty_msat: u64) { - self.inner.write().unwrap().diversity_penalty_msat(penalty_msat); + self.inner.write().expect("lock").diversity_penalty_msat(penalty_msat); } /// Sets how long a probed node stays ineligible before being probed again. /// /// Only applies to [`HighDegreeStrategy`]. Defaults to 1 hour. pub fn set_cooldown(&self, secs: u64) { - self.inner.write().unwrap().cooldown(Duration::from_secs(secs)); + self.inner.write().expect("lock").cooldown(Duration::from_secs(secs)); } /// Builds the [`ProbingConfig`]. pub fn build(&self) -> Arc { - Arc::new(self.inner.read().unwrap().build()) + Arc::new(self.inner.read().expect("lock").build()) } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index d7775e67b3..9e84330bca 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -43,6 +43,7 @@ use ldk_node::config::{ use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; +use ldk_node::probing::ProbingConfig; use ldk_node::{ Builder, ChannelShutdownState, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, UserChannelId, @@ -403,9 +404,9 @@ pub(crate) fn random_config(anchor_channels: bool) -> TestConfig { } #[cfg(feature = "uniffi")] -type TestNode = Arc; +pub(crate) type TestNode = Arc; #[cfg(not(feature = "uniffi"))] -type TestNode = Node; +pub(crate) type TestNode = Node; #[derive(Clone)] pub(crate) enum TestChainSource<'a> { @@ -436,6 +437,7 @@ pub(crate) struct TestConfig { pub node_entropy: NodeEntropy, pub async_payments_role: Option, pub recovery_mode: bool, + pub probing: Option, } impl Default for TestConfig { @@ -455,6 +457,7 @@ impl Default for TestConfig { node_entropy, async_payments_role, recovery_mode, + probing: None, } } } @@ -590,6 +593,10 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> builder.set_wallet_recovery_mode(); } + if let Some(probing) = config.probing { + builder.set_probing_config(probing.into()); + } + let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); @@ -695,6 +702,37 @@ pub(crate) async fn wait_for_outpoint_spend(electrs: &E, outpoin .await; } +/// Polls the channel from `source_node` to `counterparty_node` until it reports `is_usable` +/// and can carry an HTLC of `min_amount_msat` from `source_node`'s side. +/// +/// After `ChannelReady`, channel-monitor persistence can lag for tens of seconds on slow +/// CI runners; during that window `send_probe`/`send_payment` reject with +/// `ParameterError("...monitor update is in progress...")`. This helper gives tests a +/// deterministic readiness gate instead of racing the monitor-update pipeline. +pub(crate) async fn wait_for_channel_ready_to_send( + source_node: &TestNode, counterparty_node: &TestNode, min_amount_msat: u64, +) { + let counterparty = counterparty_node.node_id(); + let deadline = tokio::time::Instant::now() + Duration::from_secs(180); + while tokio::time::Instant::now() < deadline { + let ready = source_node.list_channels().iter().any(|c| { + c.counterparty_node_id == counterparty + && c.is_usable + && c.next_outbound_htlc_limit_msat >= min_amount_msat + }); + if ready { + return; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + panic!( + "channel from {} to {} not ready to send {} msat within 180s", + source_node.node_id(), + counterparty, + min_amount_msat, + ); +} + pub(crate) async fn exponential_backoff_poll(mut poll: F) -> T where F: FnMut() -> Option, @@ -820,12 +858,18 @@ pub async fn open_channel( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { - open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd).await + let funding_txo = + open_channel_no_wait(node_a, node_b, funding_amount_sat, None, should_announce).await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + funding_txo } -pub async fn open_channel_push_amt( +/// Like [`open_channel`] but skips the `wait_for_tx` electrum check so that +/// multiple channels can be opened back-to-back before any blocks are mined. +/// The caller is responsible for mining blocks and confirming the funding txs. +pub async fn open_channel_no_wait( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, - should_announce: bool, electrsd: &ElectrsD, + should_announce: bool, ) -> OutPoint { if should_announce { node_a @@ -853,11 +897,20 @@ pub async fn open_channel_push_amt( let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(&electrsd.client, funding_txo_a.txid).await; - funding_txo_a } +pub async fn open_channel_push_amt( + node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, + should_announce: bool, electrsd: &ElectrsD, +) -> OutPoint { + let funding_txo = + open_channel_no_wait(node_a, node_b, funding_amount_sat, push_amount_msat, should_announce) + .await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + funding_txo +} + pub async fn open_channel_with_all( node_a: &TestNode, node_b: &TestNode, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs new file mode 100644 index 0000000000..c293d955c0 --- /dev/null +++ b/tests/probing_tests.rs @@ -0,0 +1,389 @@ +// Integration tests for the probing service. +// +// Budget tests – linear A ──[1M sats]──▶ B ──[1M sats]──▶ C topology: +// +// probe_budget_increments_and_decrements +// Verifies locked_msat rises when a probe is dispatched and returns +// to zero once the probe resolves. +// +// exhausted_probe_budget_blocks_new_probes +// Samples locked_msat across multiple probe cycles and asserts it never +// exceeds the configured max_locked_msat budget cap. +// +// probing_budget_restored_after_node_restart +// Dispatches a probe, then stops node_b before the failure can propagate +// back so the pending probe HTLC is preserved. Restarts node_a and asserts +// the prober's locked_msat is rebuilt non-zero from list_recent_payments(). + +mod common; +use std::sync::atomic::{AtomicBool, Ordering}; + +use common::{ + expect_channel_ready_event, expect_event, generate_blocks_and_wait, open_channel, + premine_and_distribute_funds, random_chain_source, random_config, setup_bitcoind_and_electrsd, + setup_node, wait_for_channel_ready_to_send, TestNode, TestStoreType, +}; + +use ldk_node::bitcoin::Amount; +use ldk_node::probing::{ProbingConfigBuilder, ProbingStrategy}; +use ldk_node::Event; + +use lightning::routing::router::Path; + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +const PROBE_AMOUNT_MSAT: u64 = 1_000_000; +const PROBING_INTERVAL_MILLISECONDS: u64 = 100; + +/// FixedPathStrategy — returns a fixed pre-built path; used by budget tests. +/// +/// The path is set after node and channel setup via [`set_path`]. +struct FixedPathStrategy { + path: Mutex>, + ready_to_probe: AtomicBool, +} + +impl FixedPathStrategy { + fn new() -> Arc { + Arc::new(Self { path: Mutex::new(None), ready_to_probe: AtomicBool::new(false) }) + } + + fn set_path(&self, path: Path) { + *self.path.lock().unwrap() = Some(path); + } + + fn start_probing(&self) { + self.ready_to_probe.store(true, Ordering::Relaxed); + } + + fn stop_probing(&self) { + self.ready_to_probe.store(false, Ordering::Relaxed); + } +} + +impl ProbingStrategy for FixedPathStrategy { + fn next_probe(&self) -> Option { + if self.ready_to_probe.load(Ordering::Relaxed) { + self.path.lock().unwrap().clone() + } else { + None + } + } +} + +/// Builds a 2-hop probe path: node_a → node_b → node_c using live channel info. +fn build_probe_path( + node_a: &TestNode, node_b: &TestNode, node_c: &TestNode, amount_msat: u64, +) -> Path { + use lightning::routing::router::RouteHop; + use lightning_types::features::{ChannelFeatures, NodeFeatures}; + + let ch_ab = node_a + .list_channels() + .into_iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id() && ch.short_channel_id.is_some()) + .expect("A→B channel not found"); + let ch_bc = node_b + .list_channels() + .into_iter() + .find(|ch| ch.counterparty_node_id == node_c.node_id() && ch.short_channel_id.is_some()) + .expect("B→C channel not found"); + + Path { + hops: vec![ + RouteHop { + pubkey: node_b.node_id(), + node_features: NodeFeatures::empty(), + short_channel_id: ch_ab.short_channel_id.unwrap(), + channel_features: ChannelFeatures::empty(), + fee_msat: 1000, + cltv_expiry_delta: 144, + maybe_announced_channel: true, + }, + RouteHop { + pubkey: node_c.node_id(), + node_features: NodeFeatures::empty(), + short_channel_id: ch_bc.short_channel_id.unwrap(), + channel_features: ChannelFeatures::empty(), + fee_msat: amount_msat, + cltv_expiry_delta: 18, + maybe_announced_channel: true, + }, + ], + blinded_tail: None, + } +} + +/// Verifies that `locked_msat` increases when a probe is dispatched and returns +/// to zero once the probe resolves (succeeds or fails). +#[tokio::test(flavor = "multi_thread")] +async fn probe_budget_increments_and_decrements() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + let strategy = FixedPathStrategy::new(); + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(10 * PROBE_AMOUNT_MSAT) + .build(), + ); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + // Build the probe path now that channels are ready, then enable probing. + strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); + // First hop carries amount + per-hop fee; second hop carries just amount. + wait_for_channel_ready_to_send(&node_a, &node_b, PROBE_AMOUNT_MSAT + 1000).await; + wait_for_channel_ready_to_send(&node_b, &node_c, PROBE_AMOUNT_MSAT).await; + strategy.start_probing(); + + let went_up = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().unwrap().locked_msat() > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(went_up, "locked_msat never increased — no probe was dispatched"); + println!("First probe dispatched; locked_msat = {}", node_a.prober().unwrap().locked_msat()); + + strategy.stop_probing(); + let cleared = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().unwrap().locked_msat() == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(cleared, "locked_msat never returned to zero after probe resolved"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + +/// Verifies that `locked_msat` is restored after the node is stopped and restarted +/// while a probe is still in flight. +/// +/// Race-sensitive: once a probe is dispatched, the failure round-trip +/// (`A→B→C → C fails back → B → A`) resolves it within milliseconds. To keep the +/// HTLC pending across the restart we observe `locked_msat > 0` and then *immediately* +/// call `node_a.disconnect(node_b)`, which closes A's socket to B in-process — much +/// faster than `node_b.stop()` — so any failure message from B is dropped before A +/// processes it. If the race is lost on a given probe (locked_msat drops back to 0 +/// after the disconnect), we reconnect and let the next probe tick try again. +/// The pending Probe entry persists in `node_a`'s channel manager and must be +/// rebuilt by the prober's `locked_msat` on restart via `list_recent_payments()`. +#[tokio::test(flavor = "multi_thread")] +async fn probing_budget_restored_after_node_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + // Use a pure on-disk store so state survives the restart. + config_a.store_type = TestStoreType::Sqlite; + let strategy = FixedPathStrategy::new(); + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(10 * PROBE_AMOUNT_MSAT) + .build(), + ); + let restart_config = config_a.clone(); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); + wait_for_channel_ready_to_send(&node_a, &node_b, PROBE_AMOUNT_MSAT + 1000).await; + wait_for_channel_ready_to_send(&node_b, &node_c, PROBE_AMOUNT_MSAT).await; + + let node_b_id = node_b.node_id(); + let node_b_addr = node_b.listening_addresses().unwrap().into_iter().next().unwrap(); + + strategy.start_probing(); + + // Dispatch a probe and isolate node_a from node_b before the failure can + // propagate back. Tight polling + in-process disconnect minimises the race + // window; on a lost race we reconnect and let the prober's next tick try. + let isolated = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().unwrap().locked_msat() > 0 { + node_a.disconnect(node_b_id).ok(); + if node_a.prober().unwrap().locked_msat() > 0 { + return true; + } + node_a.connect(node_b_id, node_b_addr.clone(), false).ok(); + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + }) + .await + .unwrap_or(false); + assert!(isolated, "could not preserve in-flight probe long enough to restart"); + strategy.stop_probing(); + + let locked_before = node_a.prober().unwrap().locked_msat(); + println!("Before restart: locked_msat = {}", locked_before); + assert!(locked_before > 0, "probe resolved before we could isolate node_a — flaky timing"); + + node_a.stop().unwrap(); + + // Restart node_a from the same persisted state. + let node_a = setup_node(&chain_source, restart_config); + + let locked_after = node_a.prober().unwrap().locked_msat(); + println!("After restart: locked_msat = {}", locked_after); + assert!( + locked_after > 0, + "locked_msat was not restored after restart (before={} after={})", + locked_before, + locked_after + ); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + +/// Verifies that `locked_msat` never exceeds `max_locked_msat` across multiple probe cycles. +#[tokio::test(flavor = "multi_thread")] +async fn exhausted_probe_budget_blocks_new_probes() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + let strategy = FixedPathStrategy::new(); + let max_locked_msat = 2 * PROBE_AMOUNT_MSAT; + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(max_locked_msat) + .build(), + ); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + assert_eq!(node_a.prober().map_or(1, |p| p.locked_msat()), 0, "initial locked_msat is nonzero"); + + strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); + wait_for_channel_ready_to_send(&node_a, &node_b, PROBE_AMOUNT_MSAT + 1000).await; + wait_for_channel_ready_to_send(&node_b, &node_c, PROBE_AMOUNT_MSAT).await; + strategy.start_probing(); + + // Sample locked_msat across multiple probe cycles and assert the budget cap is never exceeded + let mut observed_locked = false; + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + while tokio::time::Instant::now() < deadline { + let msat = node_a.prober().map_or(0, |p| p.locked_msat()); + if msat > 0 { + observed_locked = true; + } + assert!( + msat <= max_locked_msat, + "locked_msat {msat} exceeded budget cap {max_locked_msat}" + ); + tokio::time::sleep(Duration::from_millis(25)).await; + } + + assert!(observed_locked, "no probe was dispatched during the observation window"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} From b57aa5647af1f0540765964cadbaf401c0053b0c Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 10 Jun 2026 16:23:08 +0200 Subject: [PATCH 4/4] probing: change uniffi docs to be as ordinary Uniffi documentation is equal to the documentation for "ordinary" code. Also we add a `scorer` reference to `HighDegreeStrategy`, so it doesn't apply global fee penalties and probing node can make a payments which would take "best" routes. --- bindings/ldk_node.udl | 14 ++++++- src/builder.rs | 23 ++++++++--- src/lib.rs | 2 + src/probing.rs | 36 ++++++++--------- tests/probing_tests.rs | 91 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 138 insertions(+), 28 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 5455a5af50..d2d6cbfce8 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -15,8 +15,6 @@ typedef interface NodeEntropy; typedef interface ProbingConfig; -typedef interface ProbingConfigBuilder; - typedef enum WordCount; [Remote] @@ -36,6 +34,18 @@ interface LogWriter { void log(LogRecord record); }; +interface ProbingConfigBuilder { + [Name=high_degree] + constructor(u64 top_node_count); + [Name=random_walk] + constructor(u64 max_hops); + void set_interval(u64 secs); + void set_max_locked_msat(u64 max_msat); + void set_diversity_penalty_msat(u64 penalty_msat); + void set_cooldown(u64 secs); + ProbingConfig build(); +}; + interface Builder { constructor(); [Name=from_config] diff --git a/src/builder.rs b/src/builder.rs index 31846896d3..c3ec541d46 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -78,7 +78,8 @@ use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; use crate::probing::{ - HighDegreeStrategy, Prober, ProbingConfig, ProbingStrategy, ProbingStrategyKind, RandomWalkStrategy, + HighDegreeStrategy, Prober, ProbingConfig, ProbingStrategy, ProbingStrategyKind, + RandomWalkStrategy, }; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; @@ -1821,10 +1822,7 @@ fn build_with_store_internal( }, } - let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default(); - if let Some(penalty) = probing_config.and_then(|c| c.diversity_penalty_msat) { - scoring_fee_params.probing_diversity_penalty_msat = penalty; - } + let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), Arc::clone(&logger), @@ -2202,10 +2200,23 @@ fn build_with_store_internal( let prober = probing_config.map(|probing_cfg| { let strategy: Arc = match &probing_cfg.kind { ProbingStrategyKind::HighDegree { top_node_count } => { + // Dedicated router for probing so the diversity penalty doesn't interfere + // with real payments; shares the scorer so probe results still train it. + let mut probing_fee_params = ProbabilisticScoringFeeParameters::default(); + if let Some(penalty) = probing_cfg.diversity_penalty_msat { + probing_fee_params.probing_diversity_penalty_msat = penalty; + } + let probing_router = Arc::new(DefaultRouter::new( + Arc::clone(&network_graph), + Arc::clone(&logger), + Arc::clone(&keys_manager), + Arc::clone(&scorer), + probing_fee_params, + )); Arc::new(HighDegreeStrategy::new( Arc::clone(&network_graph), Arc::clone(&channel_manager), - Arc::clone(&router), + probing_router, *top_node_count, DEFAULT_MIN_PROBE_AMOUNT_MSAT, DEFAULT_MAX_PROBE_AMOUNT_MSAT, diff --git a/src/lib.rs b/src/lib.rs index be534de692..374fc6749d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -174,6 +174,8 @@ use payment::{ UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; +#[cfg(feature = "uniffi")] +pub use probing::ArcedProbingConfigBuilder as ProbingConfigBuilder; use probing::{run_prober, Prober}; use runtime::Runtime; pub use tokio; diff --git a/src/probing.rs b/src/probing.rs index ec13ab5d24..2996863cce 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -266,42 +266,35 @@ impl ProbingConfigBuilder { } } -/// A UniFFI-compatible wrapper around [`ProbingConfigBuilder`] that uses interior mutability -/// so it can be shared behind an `Arc` as required by the FFI object model. +/// Builder for [`ProbingConfig`]. /// -/// Instances are produced by the constructors [`new_high_degree`] and [`new_random_walk`]. -/// The `set_*` methods override the defaults, and [`build`] yields the resulting -/// [`ProbingConfig`]. +/// A new instance starts from one of two strategy constructors — [`high_degree`] or +/// [`random_walk`] — and is finalized through [`build`]. Optional setters in between +/// override the timing and liquidity defaults. /// -/// [`new_high_degree`]: Self::new_high_degree -/// [`new_random_walk`]: Self::new_random_walk +/// [`high_degree`]: Self::high_degree +/// [`random_walk`]: Self::random_walk /// [`build`]: Self::build #[cfg(feature = "uniffi")] -#[derive(uniffi::Object)] pub struct ArcedProbingConfigBuilder { inner: RwLock, } #[cfg(feature = "uniffi")] -#[uniffi::export] impl ArcedProbingConfigBuilder { /// Start building a config that probes toward the highest-degree nodes in the graph. /// /// `top_node_count` controls how many of the most-connected nodes are cycled through. - #[uniffi::constructor] - pub fn new_high_degree(top_node_count: u64) -> Arc { - Arc::new(Self { - inner: RwLock::new(ProbingConfigBuilder::high_degree(top_node_count as usize)), - }) + pub fn high_degree(top_node_count: u64) -> Self { + Self { inner: RwLock::new(ProbingConfigBuilder::high_degree(top_node_count as usize)) } } /// Start building a config that probes via random graph walks. /// /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. /// Values below `2` are clamped to `2`. - #[uniffi::constructor] - pub fn new_random_walk(max_hops: u64) -> Arc { - Arc::new(Self { inner: RwLock::new(ProbingConfigBuilder::random_walk(max_hops as usize)) }) + pub fn random_walk(max_hops: u64) -> Self { + Self { inner: RwLock::new(ProbingConfigBuilder::random_walk(max_hops as usize)) } } /// Overrides the interval between probe attempts. @@ -759,9 +752,12 @@ impl Prober { .list_recent_payments() .into_iter() .filter_map(|p| match p { - RecentPaymentDetails::Pending { is_probe: true, total_msat, .. } => { - Some(total_msat) - }, + RecentPaymentDetails::Pending { + is_probe: true, + total_msat, + pending_fee_msat, + .. + } => Some(total_msat + pending_fee_msat.unwrap_or(0)), _ => None, }) .sum(); diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs index c293d955c0..b024ac6cd1 100644 --- a/tests/probing_tests.rs +++ b/tests/probing_tests.rs @@ -6,6 +6,10 @@ // Verifies locked_msat rises when a probe is dispatched and returns // to zero once the probe resolves. // +// locked_msat_accounts_for_routing_fees +// Asserts the exact locked_msat (delivered amount + per-hop fee) for a single +// in-flight probe, proving fees are tracked and not just the delivered amount. +// // exhausted_probe_budget_blocks_new_probes // Samples locked_msat across multiple probe cycles and asserts it never // exceeds the configured max_locked_msat budget cap. @@ -200,6 +204,93 @@ async fn probe_budget_increments_and_decrements() { node_c.stop().unwrap(); } +/// Verifies that `locked_msat` accounts for routing fees, not just the delivered amount: +/// a probe along A→B→C locks `delivered amount + per-hop fee` on the first-hop channel. +/// +/// The budget is sized to exactly one probe's worth, so at most one probe is in flight and +/// the observed `locked_msat` is deterministic. The existing budget test only checks that it +/// is non-zero; this asserts the precise value, which a fees-excluded accounting would miss. +#[tokio::test(flavor = "multi_thread")] +async fn locked_msat_accounts_for_routing_fees() { + // First hop carries the delivered amount plus this per-hop fee (see `build_probe_path`). + const FIRST_HOP_FEE_MSAT: u64 = 1000; + const LOCKED_PER_PROBE_MSAT: u64 = PROBE_AMOUNT_MSAT + FIRST_HOP_FEE_MSAT; + + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + let strategy = FixedPathStrategy::new(); + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + // Budget for exactly one in-flight probe so locked_msat is deterministic. + .max_locked_msat(LOCKED_PER_PROBE_MSAT) + .build(), + ); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); + wait_for_channel_ready_to_send(&node_a, &node_b, LOCKED_PER_PROBE_MSAT).await; + wait_for_channel_ready_to_send(&node_b, &node_c, PROBE_AMOUNT_MSAT).await; + strategy.start_probing(); + + // Capture locked_msat the moment the first probe goes in flight. With a single-probe + // budget the value is only ever 0 or exactly one probe's worth, so the first non-zero + // reading is the full first-hop HTLC. + let locked = tokio::time::timeout(Duration::from_secs(30), async { + loop { + let locked = node_a.prober().unwrap().locked_msat(); + if locked > 0 { + break locked; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .expect("locked_msat never increased — no probe was dispatched"); + + assert_eq!( + locked, LOCKED_PER_PROBE_MSAT, + "locked_msat must equal the delivered amount plus routing fees, not just the delivered amount" + ); + + strategy.stop_probing(); + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + /// Verifies that `locked_msat` is restored after the node is stopped and restarted /// while a probe is still in flight. ///