1010#![ warn( missing_docs) ]
1111
1212use bdk_core:: { BlockId , CheckPoint } ;
13- use bitcoin:: { block:: Header , Block , BlockHash , Transaction } ;
13+ use bitcoin:: { block:: Header , Block , BlockHash , Transaction , Txid } ;
1414use bitcoincore_rpc:: bitcoincore_rpc_json;
15+ use std:: collections:: HashSet ;
1516
1617pub mod bip158;
1718
@@ -44,8 +45,7 @@ pub struct Emitter<'c, C> {
4445 /// there has been a reorg since our last mempool emission.
4546 last_mempool_tip : Option < u32 > ,
4647
47- /// Unconfirmed txids that are expected to appear in mempool. This is used to determine if any
48- /// known txids have been evicted.
48+ /// Expected mempool txs. TODO: Docs.
4949 expected_mempool_txids : HashSet < Txid > ,
5050}
5151
@@ -57,9 +57,6 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5757 ///
5858 /// `start_height` starts emission from a given height (if there are no conflicts with the
5959 /// original chain).
60- ///
61- /// `expected_mempool_txids` is the initial set of unconfirmed txids. Once at tip, any that are
62- /// no longer in mempool are marked evicted.
6360 pub fn new (
6461 client : & ' c C ,
6562 last_cp : CheckPoint ,
@@ -73,27 +70,23 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
7370 last_block : None ,
7471 last_mempool_time : 0 ,
7572 last_mempool_tip : None ,
73+ expected_mempool_txids,
7674 }
7775 }
7876
79- /// Emit mempool transactions and any evicted [`Txid`]s. Returns a `latest_update_time` which is
80- /// used for setting the timestamp for evicted transactions.
77+ /// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
8178 ///
8279 /// This method returns a [`MempoolEvent`] containing the full transactions (with their
83- /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
84- /// any [`Txid`]s which were previously expected and are now missing from the mempool. Note that
85- /// [`Txid`]s are only evicted if the emitter is at the chain tip with the same height and hash
86- /// as the best block from the RPC.
87- ///
88- /// This method emits each transaction only once, unless we cannot guarantee the transaction's
89- /// ancestors are already emitted.
80+ /// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
81+ /// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
82+ /// transaction's ancestors are already emitted.
9083 ///
9184 /// To understand why, consider a receiver which filters transactions based on whether it
9285 /// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
9386 /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
9487 /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
9588 /// at height `h`.
96- pub fn mempool ( & mut self ) -> Result < Vec < ( Transaction , u64 ) > , bitcoincore_rpc:: Error > {
89+ pub fn mempool ( & mut self ) -> Result < MempoolEvent , bitcoincore_rpc:: Error > {
9790 let client = self . client ;
9891
9992 // This is the emitted tip height during the last mempool emission.
@@ -103,20 +96,12 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
10396 // `start_height` has been emitted.
10497 . unwrap_or ( self . start_height . saturating_sub ( 1 ) ) ;
10598
106- // Get the raw mempool result from the RPC client which will be used to determine if any
107- // transactions have been evicted.
99+ // Get the raw mempool result from the RPC client.
108100 let raw_mempool = client. get_raw_mempool_verbose ( ) ?;
109101 let raw_mempool_txids: HashSet < Txid > = raw_mempool. keys ( ) . copied ( ) . collect ( ) ;
110102
111- // Determine if height and hash matches the best block from the RPC. Evictions are deferred
112- // if we are not at the best block.
113- let height = client. get_block_count ( ) ?;
114- let at_tip = if height != self . last_cp . height ( ) as u64 {
115- false
116- } else {
117- // Verify if block hash matches in case of re-org.
118- client. get_block_hash ( height) ? == self . last_cp . hash ( )
119- } ;
103+ // Determine if height matches last emitted block.
104+ let at_tip = client. get_block_count ( ) ? == self . last_cp . height ( ) as u64 ;
120105
121106 // If at tip, any expected txid missing from raw mempool is considered evicted;
122107 // if not at tip, we don't evict anything.
@@ -136,17 +121,16 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
136121 let prev_mempool_time = self . last_mempool_time ;
137122 let mut latest_time = prev_mempool_time;
138123
139- let txs_to_emit = client
140- . get_raw_mempool_verbose ( ) ?
124+ let new_txs = raw_mempool
141125 . into_iter ( )
142126 . filter_map ( {
143127 let latest_time = & mut latest_time;
128+ let evicted_txids = & mut evicted_txids;
144129 move |( txid, tx_entry) | -> Option < Result < _ , bitcoincore_rpc:: Error > > {
145130 let tx_time = tx_entry. time as usize ;
146131 if tx_time > * latest_time {
147132 * latest_time = tx_time;
148133 }
149-
150134 // Avoid emitting transactions that are already emitted if we can guarantee
151135 // blocks containing ancestors are already emitted. The bitcoind rpc interface
152136 // provides us with the block height that the tx is introduced to the mempool.
@@ -157,29 +141,33 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
157141 if is_already_emitted && is_within_height {
158142 return None ;
159143 }
160-
161144 let tx = match client. get_raw_transaction ( & txid, None ) {
162145 Ok ( tx) => tx,
163- // the tx is confirmed or evicted since `get_raw_mempool_verbose`
164- Err ( err) if err. is_not_found_error ( ) => return None ,
146+ Err ( err) if err. is_not_found_error ( ) => {
147+ // If at tip and the transaction isn't found, mark it as evicted.
148+ if at_tip {
149+ evicted_txids. insert ( txid) ;
150+ }
151+ return None ;
152+ }
165153 Err ( err) => return Some ( Err ( err) ) ,
166154 } ;
167-
168155 Some ( Ok ( ( tx, tx_time as u64 ) ) )
169156 }
170157 } )
171158 . collect :: < Result < Vec < _ > , _ > > ( ) ?;
172159
173160 self . last_mempool_time = latest_time;
174161 self . last_mempool_tip = Some ( self . last_cp . height ( ) ) ;
175-
176- // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
177- // still catching up to the tip and keep accumulating.
178162 if at_tip {
179163 self . expected_mempool_txids = new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) . collect ( ) ;
180164 } else {
181- self . expected_mempool_txids
182- . extend ( new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) ) ;
165+ self . expected_mempool_txids . extend (
166+ new_txs
167+ . iter ( )
168+ . map ( |( tx, _) | tx. compute_txid ( ) )
169+ . collect :: < Vec < _ > > ( ) ,
170+ ) ;
183171 }
184172
185173 Ok ( MempoolEvent {
@@ -198,7 +186,6 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
198186 /// Emit the next block height and block (if any).
199187 pub fn next_block ( & mut self ) -> Result < Option < BlockEvent < Block > > , bitcoincore_rpc:: Error > {
200188 if let Some ( ( checkpoint, block) ) = poll ( self , |hash| self . client . get_block ( hash) ) ? {
201- // Stop tracking unconfirmed transactions that have been confirmed in this block.
202189 for tx in & block. txdata {
203190 self . expected_mempool_txids . remove ( & tx. compute_txid ( ) ) ;
204191 }
@@ -414,3 +401,77 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
414401 }
415402 }
416403}
404+
405+ #[ cfg( test) ]
406+ mod test {
407+ use crate :: { bitcoincore_rpc:: RpcApi , Emitter } ;
408+ use bdk_bitcoind_rpc:: bitcoincore_rpc:: bitcoin:: Txid ;
409+ use bdk_chain:: local_chain:: LocalChain ;
410+ use bdk_testenv:: { anyhow, TestEnv } ;
411+ use bitcoin:: { hashes:: Hash , Address , Amount , ScriptBuf , WScriptHash } ;
412+ use std:: collections:: HashSet ;
413+
414+ #[ test]
415+ fn test_expected_mempool_txids_accumulate_and_remove ( ) -> anyhow:: Result < ( ) > {
416+ let env = TestEnv :: new ( ) ?;
417+ let chain = LocalChain :: from_genesis_hash ( env. rpc_client ( ) . get_block_hash ( 0 ) ?) . 0 ;
418+ let chain_tip = chain. tip ( ) ;
419+ let mut emitter = Emitter :: new ( env. rpc_client ( ) , chain_tip. clone ( ) , 1 , HashSet :: new ( ) ) ;
420+
421+ env. mine_blocks ( 100 , None ) ?;
422+ while emitter. next_block ( ) ?. is_some ( ) { }
423+
424+ let spk_to_track = ScriptBuf :: new_p2wsh ( & WScriptHash :: all_zeros ( ) ) ;
425+ let addr_to_track = Address :: from_script ( & spk_to_track, bitcoin:: Network :: Regtest ) ?;
426+ let mut mempool_txids = HashSet :: new ( ) ;
427+
428+ // Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
429+ for _ in 0 ..10 {
430+ let sent_txid = env. send ( & addr_to_track, Amount :: from_sat ( 1_000 ) ) ?;
431+ mempool_txids. insert ( sent_txid) ;
432+ emitter. mempool ( ) ?;
433+ env. mine_blocks ( 1 , None ) ?;
434+
435+ for txid in & mempool_txids {
436+ assert ! (
437+ emitter. expected_mempool_txids. contains( txid) ,
438+ "Expected txid {:?} missing" ,
439+ txid
440+ ) ;
441+ }
442+ }
443+
444+ // Process each block and check that confirmed txids are removed from from
445+ // expected_mempool_txids.
446+ while let Some ( block_event) = emitter. next_block ( ) ? {
447+ let confirmed_txids: HashSet < Txid > = block_event
448+ . block
449+ . txdata
450+ . iter ( )
451+ . map ( |tx| tx. compute_txid ( ) )
452+ . collect ( ) ;
453+ mempool_txids = mempool_txids
454+ . difference ( & confirmed_txids)
455+ . copied ( )
456+ . collect :: < HashSet < _ > > ( ) ;
457+ for txid in confirmed_txids {
458+ assert ! (
459+ !emitter. expected_mempool_txids. contains( & txid) ,
460+ "Expected txid {:?} should have been removed" ,
461+ txid
462+ ) ;
463+ }
464+ for txid in & mempool_txids {
465+ assert ! (
466+ emitter. expected_mempool_txids. contains( txid) ,
467+ "Expected txid {:?} missing" ,
468+ txid
469+ ) ;
470+ }
471+ }
472+
473+ assert ! ( emitter. expected_mempool_txids. is_empty( ) ) ;
474+
475+ Ok ( ( ) )
476+ }
477+ }
0 commit comments