77
88use std:: collections:: { BTreeMap , HashMap } ;
99use std:: net:: SocketAddr ;
10- use std:: sync:: atomic:: { AtomicBool , AtomicU32 , Ordering } ;
10+ use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
1111use std:: sync:: { Arc , Mutex , RwLock } ;
1212use std:: time:: { Duration , Instant , SystemTime , UNIX_EPOCH } ;
1313
@@ -43,6 +43,11 @@ const MIN_FEERATE_SAT_PER_KWU: u64 = 250;
4343/// Number of recent blocks to look back for per-target fee rate estimation.
4444const FEE_RATE_LOOKBACK_BLOCKS : usize = 6 ;
4545
46+ /// Number of blocks to walk back from a component's persisted best block height
47+ /// for reorg safety when computing the incremental scan skip height.
48+ /// Matches bdk-kyoto's `IMPOSSIBLE_REORG_DEPTH`.
49+ const REORG_SAFETY_BLOCKS : u32 = 7 ;
50+
4651/// The fee estimation back-end used by the CBF chain source.
4752enum FeeSource {
4853 /// Derive fee rates from the coinbase reward of recent blocks.
@@ -82,12 +87,6 @@ pub(super) struct CbfChainSource {
8287 scan_lock : tokio:: sync:: Mutex < ( ) > ,
8388 /// Scripts registered by LDK's Filter trait for lightning channel monitoring.
8489 registered_scripts : Mutex < Vec < ScriptBuf > > ,
85- /// Set when new scripts are registered; forces a full rescan on next lightning sync.
86- lightning_scripts_dirty : Arc < AtomicBool > ,
87- /// Last block height reached by on-chain wallet sync, used for incremental scans.
88- last_onchain_synced_height : Arc < Mutex < Option < u32 > > > ,
89- /// Last block height reached by lightning wallet sync, used for incremental scans.
90- last_lightning_synced_height : Arc < Mutex < Option < u32 > > > ,
9190 /// Deduplicates concurrent on-chain wallet sync requests.
9291 onchain_wallet_sync_status : Mutex < WalletSyncStatus > ,
9392 /// Deduplicates concurrent lightning wallet sync requests.
@@ -116,9 +115,6 @@ struct CbfEventState {
116115 matched_block_hashes : Arc < Mutex < Vec < ( u32 , BlockHash ) > > > ,
117116 sync_completion_tx : Arc < Mutex < Option < oneshot:: Sender < SyncUpdate > > > > ,
118117 filter_skip_height : Arc < AtomicU32 > ,
119- last_onchain_synced_height : Arc < Mutex < Option < u32 > > > ,
120- last_lightning_synced_height : Arc < Mutex < Option < u32 > > > ,
121- lightning_scripts_dirty : Arc < AtomicBool > ,
122118}
123119
124120impl CbfChainSource {
@@ -149,10 +145,7 @@ impl CbfChainSource {
149145 let sync_completion_tx = Arc :: new ( Mutex :: new ( None ) ) ;
150146 let filter_skip_height = Arc :: new ( AtomicU32 :: new ( 0 ) ) ;
151147 let registered_scripts = Mutex :: new ( Vec :: new ( ) ) ;
152- let lightning_scripts_dirty = Arc :: new ( AtomicBool :: new ( true ) ) ;
153148 let scan_lock = tokio:: sync:: Mutex :: new ( ( ) ) ;
154- let last_onchain_synced_height = Arc :: new ( Mutex :: new ( None ) ) ;
155- let last_lightning_synced_height = Arc :: new ( Mutex :: new ( None ) ) ;
156149 let onchain_wallet_sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
157150 let lightning_wallet_sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
158151 Ok ( Self {
@@ -166,10 +159,7 @@ impl CbfChainSource {
166159 sync_completion_tx,
167160 filter_skip_height,
168161 registered_scripts,
169- lightning_scripts_dirty,
170162 scan_lock,
171- last_onchain_synced_height,
172- last_lightning_synced_height,
173163 onchain_wallet_sync_status,
174164 lightning_wallet_sync_status,
175165 fee_estimator,
@@ -249,9 +239,6 @@ impl CbfChainSource {
249239 matched_block_hashes : Arc :: clone ( & self . matched_block_hashes ) ,
250240 sync_completion_tx : Arc :: clone ( & self . sync_completion_tx ) ,
251241 filter_skip_height : Arc :: clone ( & self . filter_skip_height ) ,
252- last_onchain_synced_height : Arc :: clone ( & self . last_onchain_synced_height ) ,
253- last_lightning_synced_height : Arc :: clone ( & self . last_lightning_synced_height ) ,
254- lightning_scripts_dirty : Arc :: clone ( & self . lightning_scripts_dirty ) ,
255242 } ;
256243 let event_logger = Arc :: clone ( & self . logger ) ;
257244 runtime. spawn_cancellable_background_task ( Self :: process_events (
@@ -320,24 +307,9 @@ impl CbfChainSource {
320307 accepted. len( ) ,
321308 ) ;
322309
323- // Reset synced heights to just before the earliest reorganized
324- // block so the next incremental scan covers the affected range.
325- if let Some ( min_reorg_height) = reorganized. iter ( ) . map ( |h| h. height ) . min ( ) {
326- let reset_height = if min_reorg_height > 0 {
327- Some ( min_reorg_height - 1 )
328- } else {
329- None
330- } ;
331- * state. last_onchain_synced_height . lock ( ) . unwrap ( ) = reset_height;
332- * state. last_lightning_synced_height . lock ( ) . unwrap ( ) = reset_height;
333- state. lightning_scripts_dirty . store ( true , Ordering :: Release ) ;
334- log_debug ! (
335- logger,
336- "Reset synced heights to {:?} due to reorg at height {}." ,
337- reset_height,
338- min_reorg_height,
339- ) ;
340- }
310+ // No height reset needed: skip heights are derived from
311+ // BDK's checkpoint (on-chain) and LDK's best block
312+ // (lightning), both walked back by REORG_SAFETY_BLOCKS.
341313 } ,
342314 BlockHeaderChanges :: Connected ( header) => {
343315 log_trace ! ( logger, "CBF block connected at height {}" , header. height, ) ;
@@ -382,13 +354,11 @@ impl CbfChainSource {
382354 /// Register a transaction script for Lightning channel monitoring.
383355 pub ( crate ) fn register_tx ( & self , _txid : & Txid , script_pubkey : & Script ) {
384356 self . registered_scripts . lock ( ) . unwrap ( ) . push ( script_pubkey. to_owned ( ) ) ;
385- self . lightning_scripts_dirty . store ( true , Ordering :: Release ) ;
386357 }
387358
388359 /// Register a watched output script for Lightning channel monitoring.
389360 pub ( crate ) fn register_output ( & self , output : WatchedOutput ) {
390361 self . registered_scripts . lock ( ) . unwrap ( ) . push ( output. script_pubkey . clone ( ) ) ;
391- self . lightning_scripts_dirty . store ( true , Ordering :: Release ) ;
392362 }
393363
394364 /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for
@@ -458,7 +428,7 @@ impl CbfChainSource {
458428 Duration :: from_secs (
459429 self . sync_config . timeouts_config . onchain_wallet_sync_timeout_secs ,
460430 ) ,
461- self . sync_onchain_wallet_op ( requester, scripts) ,
431+ self . sync_onchain_wallet_op ( requester, & onchain_wallet , scripts) ,
462432 ) ;
463433
464434 let ( tx_update, sync_update) = match timeout_fut. await {
@@ -513,12 +483,11 @@ impl CbfChainSource {
513483 }
514484
515485 async fn sync_onchain_wallet_op (
516- & self , requester : Requester , scripts : Vec < ScriptBuf > ,
486+ & self , requester : Requester , onchain_wallet : & Wallet , scripts : Vec < ScriptBuf > ,
517487 ) -> Result < ( TxUpdate < ConfirmationBlockTime > , SyncUpdate ) , Error > {
518- // Always do a full scan (skip_height=None) for the on-chain wallet.
519- // Unlike the Lightning wallet which can rely on reorg_queue events,
520- // the on-chain wallet needs to see all blocks to correctly detect
521- // reorgs via checkpoint comparison in the caller.
488+ // Derive skip height from BDK's persisted checkpoint, walked back by
489+ // REORG_SAFETY_BLOCKS for reorg safety (same approach as bdk-kyoto).
490+ // This survives restarts since BDK persists its checkpoint chain.
522491 //
523492 // We include LDK-registered scripts (e.g., channel funding output
524493 // scripts) alongside the wallet scripts. This ensures the on-chain
@@ -530,9 +499,10 @@ impl CbfChainSource {
530499 // unknown. This mirrors what the Bitcoind chain source does in
531500 // `Wallet::block_connected` by inserting registered tx outputs.
532501 let mut all_scripts = scripts;
533- // we query all registered scripts, not only BDK-related
534502 all_scripts. extend ( self . registered_scripts . lock ( ) . unwrap ( ) . iter ( ) . cloned ( ) ) ;
535- let ( sync_update, matched) = self . run_filter_scan ( all_scripts, None ) . await ?;
503+ let skip_height =
504+ onchain_wallet. latest_checkpoint ( ) . height ( ) . checked_sub ( REORG_SAFETY_BLOCKS ) ;
505+ let ( sync_update, matched) = self . run_filter_scan ( all_scripts, skip_height) . await ?;
536506
537507 log_debug ! (
538508 self . logger,
@@ -561,9 +531,6 @@ impl CbfChainSource {
561531 }
562532 }
563533
564- let tip = sync_update. tip ( ) ;
565- * self . last_onchain_synced_height . lock ( ) . unwrap ( ) = Some ( tip. height ) ;
566-
567534 Ok ( ( tx_update, sync_update) )
568535 }
569536
@@ -644,9 +611,8 @@ impl CbfChainSource {
644611 & self , requester : Requester , channel_manager : Arc < ChannelManager > ,
645612 chain_monitor : Arc < ChainMonitor > , output_sweeper : Arc < Sweeper > , scripts : Vec < ScriptBuf > ,
646613 ) -> Result < ( ) , Error > {
647- let scripts_dirty = self . lightning_scripts_dirty . load ( Ordering :: Acquire ) ;
648614 let skip_height =
649- if scripts_dirty { None } else { * self . last_lightning_synced_height . lock ( ) . unwrap ( ) } ;
615+ channel_manager . current_best_block ( ) . height . checked_sub ( REORG_SAFETY_BLOCKS ) ;
650616 let ( sync_update, matched) = self . run_filter_scan ( scripts, skip_height) . await ?;
651617
652618 log_debug ! (
@@ -681,9 +647,6 @@ impl CbfChainSource {
681647 }
682648 }
683649
684- * self . last_lightning_synced_height . lock ( ) . unwrap ( ) = Some ( tip. height ) ;
685- self . lightning_scripts_dirty . store ( false , Ordering :: Release ) ;
686-
687650 Ok ( ( ) )
688651 }
689652
0 commit comments