55// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66// accordance with one or both of these licenses.
77
8- use std:: collections:: { BTreeMap , HashMap } ;
8+ use std:: collections:: { HashMap , HashSet } ;
99use std:: net:: SocketAddr ;
1010use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
1111use std:: sync:: { Arc , Mutex , RwLock } ;
1212use std:: time:: { Duration , Instant , SystemTime , UNIX_EPOCH } ;
1313
14- use bdk_chain:: { BlockId , ConfirmationBlockTime , TxUpdate } ;
15- use bdk_wallet:: Update ;
14+ use bdk_chain:: indexer:: keychain_txout:: KeychainTxOutIndex ;
15+ use bdk_chain:: { BlockId , ConfirmationBlockTime , IndexedTxGraph , TxUpdate } ;
16+ use bdk_wallet:: { KeychainKind , Update } ;
1617use bip157:: chain:: { BlockHeaderChanges , ChainState } ;
1718use bip157:: error:: FetchBlockError ;
1819use bip157:: {
@@ -27,7 +28,7 @@ use lightning::util::ser::Writeable;
2728use tokio:: sync:: { mpsc, oneshot} ;
2829
2930use super :: { FeeSourceConfig , WalletSyncStatus } ;
30- use crate :: config:: { CbfSyncConfig , Config , BDK_CLIENT_STOP_GAP } ;
31+ use crate :: config:: { CbfSyncConfig , Config } ;
3132use crate :: error:: Error ;
3233use crate :: fee_estimator:: {
3334 apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
@@ -92,7 +93,7 @@ pub(super) struct CbfChainSource {
9293 /// Serializes concurrent filter scans (on-chain and lightning).
9394 scan_lock : tokio:: sync:: Mutex < ( ) > ,
9495 /// Scripts registered by LDK's Filter trait for lightning channel monitoring.
95- registered_scripts : Mutex < Vec < ScriptBuf > > ,
96+ registered_scripts : Mutex < HashSet < ScriptBuf > > ,
9697 /// Deduplicates concurrent on-chain wallet sync requests.
9798 onchain_wallet_sync_status : Mutex < WalletSyncStatus > ,
9899 /// Deduplicates concurrent lightning wallet sync requests.
@@ -150,7 +151,7 @@ impl CbfChainSource {
150151 let matched_block_hashes = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
151152 let sync_completion_tx = Arc :: new ( Mutex :: new ( None ) ) ;
152153 let filter_skip_height = Arc :: new ( AtomicU32 :: new ( 0 ) ) ;
153- let registered_scripts = Mutex :: new ( Vec :: new ( ) ) ;
154+ let registered_scripts = Mutex :: new ( HashSet :: new ( ) ) ;
154155 let scan_lock = tokio:: sync:: Mutex :: new ( ( ) ) ;
155156 let onchain_wallet_sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
156157 let lightning_wallet_sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
@@ -520,12 +521,12 @@ impl CbfChainSource {
520521
521522 /// Register a transaction script for Lightning channel monitoring.
522523 pub ( crate ) fn register_tx ( & self , _txid : & Txid , script_pubkey : & Script ) {
523- self . registered_scripts . lock ( ) . expect ( "lock" ) . push ( script_pubkey. to_owned ( ) ) ;
524+ self . registered_scripts . lock ( ) . expect ( "lock" ) . insert ( script_pubkey. to_owned ( ) ) ;
524525 }
525526
526527 /// Register a watched output script for Lightning channel monitoring.
527528 pub ( crate ) fn register_output ( & self , output : WatchedOutput ) {
528- self . registered_scripts . lock ( ) . expect ( "lock" ) . push ( output. script_pubkey . clone ( ) ) ;
529+ self . registered_scripts . lock ( ) . expect ( "lock" ) . insert ( output. script_pubkey . clone ( ) ) ;
529530 }
530531
531532 /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for
@@ -591,17 +592,23 @@ impl CbfChainSource {
591592 let requester = self . requester ( ) ?;
592593 let now = Instant :: now ( ) ;
593594
594- let scripts = onchain_wallet. get_spks_for_cbf_sync ( BDK_CLIENT_STOP_GAP ) ;
595- if scripts. is_empty ( ) {
596- log_debug ! ( self . logger, "No wallet scripts to sync via CBF." ) ;
597- return Ok ( ( ) ) ;
598- }
595+ // Seed a sync-local IndexedTxGraph from a clone of the wallet's spk_index.
596+ // This carries descriptors, the configured lookahead, and the currently-revealed
597+ // range so we can both derive the SPK set to scan and observe new "used"
598+ // indices via `apply_block_relevant`. Mirrors bdk-kyoto's `UpdateBuilder`.
599+ let mut graph: IndexedTxGraph < ConfirmationBlockTime , KeychainTxOutIndex < KeychainKind > > =
600+ IndexedTxGraph :: new ( onchain_wallet. spk_index_clone ( ) ) ;
601+
602+ // Skip height: walk back from the wallet's persisted checkpoint by
603+ // REORG_SAFETY_BLOCKS. Survives restarts since BDK persists the chain.
604+ let skip_height =
605+ onchain_wallet. latest_checkpoint ( ) . height ( ) . checked_sub ( REORG_SAFETY_BLOCKS ) ;
599606
600607 let timeout_fut = tokio:: time:: timeout (
601608 Duration :: from_secs (
602609 self . sync_config . timeouts_config . onchain_wallet_sync_timeout_secs ,
603610 ) ,
604- self . sync_onchain_wallet_op ( requester, & onchain_wallet , scripts ) ,
611+ self . sync_onchain_wallet_op ( requester, & mut graph , skip_height ) ,
605612 ) ;
606613
607614 let ( tx_update, sync_update) = match timeout_fut. await {
@@ -612,6 +619,10 @@ impl CbfChainSource {
612619 } ,
613620 } ;
614621
622+ // Pull the high-water marks of observed derivation indices so BDK can
623+ // advance its reveal cursor past addresses that received funds.
624+ let last_active_indices = graph. index . last_used_indices ( ) ;
625+
615626 // Build chain checkpoint extending from the wallet's current tip,
616627 // using `insert` (not `push`) so that reorgs are handled correctly.
617628 // `insert` detects conflicting hashes and purges stale blocks,
@@ -625,8 +636,7 @@ impl CbfChainSource {
625636 let tip_block_id = BlockId { height : tip. height , hash : tip. hash } ;
626637 cp = cp. insert ( tip_block_id) ;
627638
628- let update =
629- Update { last_active_indices : BTreeMap :: new ( ) , tx_update, chain : Some ( cp) } ;
639+ let update = Update { last_active_indices, tx_update, chain : Some ( cp) } ;
630640
631641 onchain_wallet. apply_update ( update) ?;
632642
@@ -655,25 +665,22 @@ impl CbfChainSource {
655665 }
656666
657667 async fn sync_onchain_wallet_op (
658- & self , requester : Requester , onchain_wallet : & Wallet , scripts : Vec < ScriptBuf > ,
668+ & self , requester : Requester ,
669+ graph : & mut IndexedTxGraph < ConfirmationBlockTime , KeychainTxOutIndex < KeychainKind > > ,
670+ skip_height : Option < u32 > ,
659671 ) -> Result < ( TxUpdate < ConfirmationBlockTime > , SyncUpdate ) , Error > {
660- // Derive skip height from BDK's persisted checkpoint, walked back by
661- // REORG_SAFETY_BLOCKS for reorg safety (same approach as bdk-kyoto).
662- // This survives restarts since BDK persists its checkpoint chain.
663- //
664- // We include LDK-registered scripts (e.g., channel funding output
665- // scripts) alongside the wallet scripts. This ensures the on-chain
666- // wallet scan also fetches blocks containing channel funding
667- // transactions, whose outputs are needed by BDK's TxGraph to
668- // calculate fees for subsequent spends such as splice transactions.
669- // Without these, BDK's `calculate_fee` would fail with
670- // `MissingTxOut` because the parent transaction's outputs are
671- // unknown. This mirrors what the Bitcoind chain source does in
672- // `Wallet::block_connected` by inserting registered tx outputs.
673- let mut all_scripts = scripts;
672+ // Derive the SPK set from the indexer: every revealed key plus the configured
673+ // lookahead window per keychain. Mirrors bdk-kyoto's `peek_scripts` and ensures
674+ // we don't miss deposits one stop-gap past the current reveal frontier.
675+ let mut all_scripts: Vec < ScriptBuf > = peek_keychain_scripts ( & graph. index ) ;
676+
677+ // Include LDK-registered scripts (e.g. channel funding output scripts) so the
678+ // scan also fetches blocks containing those transactions; BDK needs their
679+ // outputs in its TxGraph to compute fees for subsequent spends (splices).
680+ // Without this, `calculate_fee` would fail with `MissingTxOut`. Mirrors what
681+ // the Bitcoind chain source does in `Wallet::block_connected`.
674682 all_scripts. extend ( self . registered_scripts . lock ( ) . expect ( "lock" ) . iter ( ) . cloned ( ) ) ;
675- let skip_height =
676- onchain_wallet. latest_checkpoint ( ) . height ( ) . checked_sub ( REORG_SAFETY_BLOCKS ) ;
683+
677684 let ( sync_update, matched) = self . run_filter_scan ( all_scripts, skip_height) . await ?;
678685
679686 log_debug ! (
@@ -682,10 +689,12 @@ impl CbfChainSource {
682689 matched. len( )
683690 ) ;
684691
685- // Fetch matching blocks and include all their transactions.
686- // The compact block filter already matched our scripts (covering both
687- // created outputs and spent inputs), so we include every transaction
688- // from matched blocks and let BDK determine relevance.
692+ // Fetch matched blocks. Feed each one to the IndexedTxGraph: this records
693+ // observed derivation indices in the keychain index (so the wallet can
694+ // advance its reveal cursor on apply_update) and also captures wallet-relevant
695+ // txs. We additionally collect *every* tx from matched blocks into `tx_update`
696+ // so LDK-registered txs (channel funding etc.) are known to BDK's graph for
697+ // later fee computation, even though they don't match the wallet keychain.
689698 let mut tx_update = TxUpdate :: default ( ) ;
690699 let per_request_timeout =
691700 Duration :: from_secs ( self . sync_config . timeouts_config . per_request_timeout_secs . into ( ) ) ;
@@ -705,6 +714,9 @@ impl CbfChainSource {
705714 let block_id = BlockId { height : * height, hash : block. header . block_hash ( ) } ;
706715 let conf_time =
707716 ConfirmationBlockTime { block_id, confirmation_time : block. header . time as u64 } ;
717+
718+ let _ = graph. apply_block_relevant ( & block, * height) ;
719+
708720 for tx in & block. txdata {
709721 let txid = tx. compute_txid ( ) ;
710722 tx_update. txs . push ( Arc :: new ( tx. clone ( ) ) ) ;
@@ -737,7 +749,8 @@ impl CbfChainSource {
737749 let requester = self . requester ( ) ?;
738750 let now = Instant :: now ( ) ;
739751
740- let scripts: Vec < ScriptBuf > = self . registered_scripts . lock ( ) . expect ( "lock" ) . clone ( ) ;
752+ let scripts: Vec < ScriptBuf > =
753+ self . registered_scripts . lock ( ) . expect ( "lock" ) . iter ( ) . cloned ( ) . collect ( ) ;
741754 if scripts. is_empty ( ) {
742755 log_debug ! ( self . logger, "No registered scripts for CBF lightning sync." ) ;
743756 } else {
@@ -1193,6 +1206,21 @@ impl CbfChainSource {
11931206 }
11941207}
11951208
1209+ /// SPKs to scan for on-chain wallet sync: every revealed key plus the configured
1210+ /// lookahead window per keychain. Mirrors bdk-kyoto's `UpdateBuilder::peek_scripts`.
1211+ fn peek_keychain_scripts ( index : & KeychainTxOutIndex < KeychainKind > ) -> Vec < ScriptBuf > {
1212+ let mut scripts = Vec :: new ( ) ;
1213+ let last_revealed = index. last_revealed_indices ( ) ;
1214+ let lookahead = index. lookahead ( ) ;
1215+ for keychain in [ KeychainKind :: External , KeychainKind :: Internal ] {
1216+ let Some ( spk_iter) = index. unbounded_spk_iter ( keychain) else { continue } ;
1217+ let frontier = last_revealed. get ( & keychain) . copied ( ) . unwrap_or ( 0 ) ;
1218+ let bound = ( frontier + lookahead) as usize ;
1219+ scripts. extend ( spk_iter. take ( bound) . map ( |( _, spk) | spk) ) ;
1220+ }
1221+ scripts
1222+ }
1223+
11961224/// Record the current timestamp in a `NodeMetrics` field and persist the metrics.
11971225fn update_node_metrics_timestamp (
11981226 node_metrics : & RwLock < NodeMetrics > , kv_store : & DynStore , logger : & Logger ,
0 commit comments