@@ -10,7 +10,8 @@ use bdk_sp::{
1010 self ,
1111 address:: NetworkUnchecked ,
1212 bip32,
13- consensus:: Decodable ,
13+ consensus:: { deserialize, Decodable } ,
14+ hashes:: Hash ,
1415 hex:: { DisplayHex , FromHex } ,
1516 key:: Secp256k1 ,
1617 script:: PushBytesBuf ,
@@ -34,6 +35,7 @@ use bdk_sp_oracles::{
3435 TrustedPeer , UnboundedReceiver , Warning ,
3536 } ,
3637 filters:: kyoto:: { FilterEvent , FilterSubscriber } ,
38+ frigate:: { FrigateClient , History , SubscribeRequest , UnsubscribeRequest , DUMMY_COINBASE } ,
3739 tweaks:: blindbit:: { BlindbitSubscriber , TweakEvent } ,
3840} ;
3941use bdk_sp_wallet:: {
@@ -161,6 +163,16 @@ pub enum Commands {
161163 #[ clap( long) ]
162164 hash : Option < BlockHash > ,
163165 } ,
166+
167+ ScanFrigate {
168+ #[ clap( flatten) ]
169+ rpc_args : RpcArgs ,
170+ #[ clap( long) ]
171+ height : Option < u32 > ,
172+ #[ clap( long) ]
173+ hash : Option < BlockHash > ,
174+ } ,
175+
164176 Create {
165177 /// Network
166178 #[ clap( long, short, default_value = "signet" ) ]
@@ -561,6 +573,167 @@ async fn main() -> anyhow::Result<()> {
561573 ) ;
562574 }
563575 }
576+ Commands :: ScanFrigate {
577+ rpc_args,
578+ height,
579+ hash,
580+ } => {
581+ // The implementation done here differs from what is mentioned in the section
582+ // https://github.com/sparrowwallet/frigate/tree/master?tab=readme-ov-file#blockchainsilentpaymentssubscribe
583+ // This implementation is doing a one time scanning only. So instead of calling
584+ // `blockchain.scripthash.subscribe` on each script from the wallet, we just subscribe
585+ // and read the scanning result from the stream. On each result received we update the
586+ // wallet state and once scanning progress reaches 1.0 (100%) we stop.
587+ let sync_point = if let ( Some ( height) , Some ( hash) ) = ( height, hash) {
588+ HeaderCheckpoint :: new ( height, hash)
589+ } else if wallet. birthday . height <= wallet. chain ( ) . tip ( ) . height ( ) {
590+ let height = wallet. chain ( ) . tip ( ) . height ( ) ;
591+ let hash = wallet. chain ( ) . tip ( ) . hash ( ) ;
592+ HeaderCheckpoint :: new ( height, hash)
593+ } else {
594+ let checkpoint = wallet
595+ . chain ( )
596+ . get ( wallet. birthday . height )
597+ . expect ( "should be something" ) ;
598+ let height = checkpoint. height ( ) ;
599+ let hash = checkpoint. hash ( ) ;
600+ HeaderCheckpoint :: new ( height, hash)
601+ } ;
602+
603+ let mut client = FrigateClient :: connect ( & rpc_args. url )
604+ . await
605+ . unwrap ( )
606+ . with_timeout ( tokio:: time:: Duration :: from_secs ( 60 ) ) ;
607+
608+ let labels = wallet
609+ . indexer ( )
610+ . index ( )
611+ . num_to_label
612+ . clone ( )
613+ . into_keys ( )
614+ . collect :: < Vec < u32 > > ( ) ;
615+ let labels = if !labels. is_empty ( ) {
616+ Some ( labels)
617+ } else {
618+ None
619+ } ;
620+
621+ let subscribe_params = SubscribeRequest {
622+ scan_priv_key : * wallet. indexer ( ) . scan_sk ( ) ,
623+ spend_pub_key : * wallet. indexer ( ) . spend_pk ( ) ,
624+ start_height : Some ( sync_point. height ) ,
625+ labels,
626+ } ;
627+
628+ // Attempt to subscribe; any timeout will trigger unsubscribe automatically.
629+ match client. subscribe_with_timeout ( & subscribe_params) . await {
630+ Ok ( Some ( ( histories, progress) ) ) => {
631+ tracing:: info!(
632+ "Initial subscription result: {} histories, progress {}" ,
633+ histories. len( ) ,
634+ progress
635+ ) ;
636+ }
637+ Ok ( None ) => {
638+ tracing:: info!( "Subscription acknowledged, awaiting notifications" ) ;
639+ }
640+ Err ( e) => {
641+ tracing:: error!( "Subscribe failed: {}" , e) ;
642+ return Err ( e. into ( ) ) ;
643+ }
644+ }
645+
646+ tracing:: info!( "Starting frigate scanning loop..." ) ;
647+ loop {
648+ match client. read_from_stream ( 4096 ) . await {
649+ Ok ( subscribe_result) => {
650+ if subscribe_result[ "params" ] . is_object ( ) {
651+ let histories: Vec < History > = serde_json:: from_value (
652+ subscribe_result[ "params" ] [ "history" ] . clone ( ) ,
653+ ) ?;
654+ let progress = subscribe_result[ "params" ] [ "progress" ]
655+ . as_f64 ( )
656+ . unwrap_or ( 0.0 ) as f32 ;
657+
658+ let mut secrets_by_height: HashMap < u32 , HashMap < Txid , PublicKey > > =
659+ HashMap :: new ( ) ;
660+
661+ tracing:: debug!( "Received history {:#?}" , histories) ;
662+
663+ histories. iter ( ) . for_each ( |h| {
664+ secrets_by_height
665+ . entry ( h. height )
666+ . and_modify ( |v| {
667+ v. insert ( h. tx_hash , h. tweak_key ) ;
668+ } )
669+ . or_insert ( HashMap :: from ( [ ( h. tx_hash , h. tweak_key ) ] ) ) ;
670+ } ) ;
671+
672+ // Filter when the height is 0, because that would mean mempool transaction
673+ for secret in secrets_by_height. into_iter ( ) . filter ( |v| v. 0 > 0 ) {
674+ // Since frigate doesn't provide a blockchain.getblock we will mimick that here
675+ // By constructing a block from the block header and the list of transactions
676+ // received from the scan request
677+ let mut raw_blk = client. get_block_header ( secret. 0 ) . await . unwrap ( ) ;
678+ raw_blk. push_str ( "00" ) ;
679+
680+ // Push dummy coinbase
681+ let coinbase: Transaction =
682+ deserialize ( & Vec :: < u8 > :: from_hex ( DUMMY_COINBASE ) . unwrap ( ) )
683+ . unwrap ( ) ;
684+ let mut block: Block =
685+ deserialize ( & Vec :: < u8 > :: from_hex ( & raw_blk) . unwrap ( ) ) . unwrap ( ) ;
686+
687+ let mut blockhash = BlockHash :: all_zeros ( ) ;
688+
689+ let mut txs: Vec < Transaction > = vec ! [ coinbase] ;
690+ for key in secret. 1 . keys ( ) {
691+ let tx_result =
692+ client. get_transaction ( key. to_string ( ) ) . await . unwrap ( ) ;
693+ let tx: Transaction =
694+ deserialize ( & Vec :: < u8 > :: from_hex ( & tx_result. 1 ) . unwrap ( ) )
695+ . unwrap ( ) ;
696+ txs. push ( tx) ;
697+
698+ blockhash = BlockHash :: from_str ( & tx_result. 0 ) . unwrap ( ) ;
699+ }
700+
701+ block. txdata = txs;
702+ tracing:: debug!( "Final block {:?}" , block) ;
703+ wallet. apply_block_relevant ( & block, secret. 1 , secret. 0 ) ;
704+
705+ tracing:: debug!( "Checkpoint hash {blockhash:?}" ) ;
706+ let checkpoint = wallet. chain ( ) . tip ( ) . insert ( BlockId {
707+ height : secret. 0 ,
708+ hash : blockhash,
709+ } ) ;
710+ wallet. update_chain ( checkpoint) ;
711+ }
712+
713+ tracing:: info!( "Progress {progress}" ) ;
714+ // Check the progress
715+ if progress >= 1.0 {
716+ tracing:: info!( "Scanning completed" ) ;
717+ break ;
718+ }
719+ }
720+ }
721+ Err ( e) if e. to_string ( ) . contains ( "timed out" ) => {
722+ tracing:: warn!( "read_from_stream timeout, exiting scan" ) ;
723+ let unsubscribe_request = UnsubscribeRequest {
724+ scan_privkey : * wallet. indexer ( ) . scan_sk ( ) ,
725+ spend_pubkey : * wallet. indexer ( ) . spend_pk ( ) ,
726+ } ;
727+ let _ = client. unsubscribe ( & unsubscribe_request) . await ;
728+ break ;
729+ }
730+ Err ( e) => {
731+ tracing:: error!( "read_from_stream error: {}" , e) ;
732+ return Err ( e. into ( ) ) ;
733+ }
734+ }
735+ }
736+ }
564737 Commands :: Balance => {
565738 fn print_balances < ' a > (
566739 title_str : & ' a str ,
0 commit comments