@@ -16,6 +16,7 @@ use std::sync::{Arc, Mutex};
1616use std:: time:: { Duration , Instant } ;
1717
1818use bitcoin:: secp256k1:: PublicKey ;
19+ use lightning:: ln:: channelmanager:: PaymentId ;
1920use lightning:: routing:: gossip:: NodeId ;
2021use lightning:: routing:: router:: {
2122 Path , PaymentParameters , RouteHop , RouteParameters , MAX_PATH_LENGTH_ESTIMATE ,
@@ -654,6 +655,7 @@ pub struct Prober {
654655 /// Maximum total millisatoshis that may be locked in in-flight probes at any time.
655656 pub max_locked_msat : u64 ,
656657 pub ( crate ) locked_msat : Arc < AtomicU64 > ,
658+ pub ( crate ) inflight_probes : Mutex < HashMap < PaymentId , u64 > > ,
657659}
658660
659661fn fmt_path ( path : & lightning:: routing:: router:: Path ) -> String {
@@ -670,8 +672,7 @@ impl Prober {
670672 self . locked_msat . load ( Ordering :: Relaxed )
671673 }
672674
673- pub ( crate ) fn handle_probe_successful ( & self , path : & lightning:: routing:: router:: Path ) {
674- let amount: u64 = path. hops . iter ( ) . map ( |h| h. fee_msat ) . sum ( ) ;
675+ pub ( crate ) fn handle_background_probe_successful ( & self , path : & Path , amount : u64 ) {
675676 let prev = self
676677 . locked_msat
677678 . fetch_update ( Ordering :: AcqRel , Ordering :: Acquire , |v| Some ( v. saturating_sub ( amount) ) )
@@ -687,8 +688,7 @@ impl Prober {
687688 ) ;
688689 }
689690
690- pub ( crate ) fn handle_probe_failed ( & self , path : & lightning:: routing:: router:: Path ) {
691- let amount: u64 = path. hops . iter ( ) . map ( |h| h. fee_msat ) . sum ( ) ;
691+ pub ( crate ) fn handle_background_probe_failed ( & self , path : & Path , amount : u64 ) {
692692 let prev = self
693693 . locked_msat
694694 . fetch_update ( Ordering :: AcqRel , Ordering :: Acquire , |v| Some ( v. saturating_sub ( amount) ) )
@@ -727,9 +727,16 @@ pub(crate) async fn run_prober(prober: Arc<Prober>, mut stop_rx: tokio::sync::wa
727727 log_debug!( prober. logger, "Skipping probe: locked-msat budget exceeded." ) ;
728728 continue ;
729729 }
730+ // Hold `inflight_probes` across `send_probe` so the event handler in
731+ // `event.rs` (which acquires the same lock to remove the entry) cannot
732+ // observe a `ProbeSuccessful`/`ProbeFailed` for a payment_id we have not
733+ // yet inserted, which would leave `locked_msat` permanently incremented.
734+ let mut inflight = prober. inflight_probes. lock( ) . expect( "lock" ) ;
730735 match prober. channel_manager. send_probe( path. clone( ) ) {
731- Ok ( _) => {
736+ Ok ( ( _, payment_id) ) => {
737+ inflight. insert( payment_id, amount) ;
732738 prober. locked_msat. fetch_add( amount, Ordering :: Release ) ;
739+ drop( inflight) ;
733740 log_debug!(
734741 prober. logger,
735742 "Probe sent: locked {} msat, path: {}" ,
@@ -738,6 +745,7 @@ pub(crate) async fn run_prober(prober: Arc<Prober>, mut stop_rx: tokio::sync::wa
738745 ) ;
739746 }
740747 Err ( e) => {
748+ drop( inflight) ;
741749 log_debug!(
742750 prober. logger,
743751 "Probe send failed: {:?}, path: {}" ,
0 commit comments