44//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
55//!
66//! To only get block updates (exclude mempool transactions), the caller can use
7- //! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
8- //! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
9- //! mempool.
7+ //! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
8+ //! separate method, [`Emitter::mempool`] can be used to emit the whole mempool.
109#![ warn( missing_docs) ]
1110
1211use bdk_core:: { BlockId , CheckPoint } ;
13- use bitcoin:: { block:: Header , Block , BlockHash , Transaction } ;
14- use bitcoincore_rpc:: bitcoincore_rpc_json;
12+ use bitcoin:: { Block , BlockHash , Transaction , Txid } ;
13+ use bitcoincore_rpc:: { bitcoincore_rpc_json, RpcApi } ;
14+ use std:: { collections:: HashSet , ops:: Deref } ;
1515
1616pub mod bip158;
1717
@@ -22,8 +22,8 @@ pub use bitcoincore_rpc;
2222/// Refer to [module-level documentation] for more.
2323///
2424/// [module-level documentation]: crate
25- pub struct Emitter < ' c , C > {
26- client : & ' c C ,
25+ pub struct Emitter < C > {
26+ client : C ,
2727 start_height : u32 ,
2828
2929 /// The checkpoint of the last-emitted block that is in the best chain. If it is later found
@@ -43,28 +43,65 @@ pub struct Emitter<'c, C> {
4343 /// The last emitted block during our last mempool emission. This is used to determine whether
4444 /// there has been a reorg since our last mempool emission.
4545 last_mempool_tip : Option < u32 > ,
46+
47+ /// A set of txids currently assumed to still be in the mempool.
48+ ///
49+ /// This is used to detect mempool evictions by comparing the set against the latest mempool
50+ /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered
51+ /// evicted.
52+ ///
53+ /// When the emitter emits a block, confirmed txids are removed from this set. This prevents
54+ /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
55+ expected_mempool_txids : HashSet < Txid > ,
4656}
4757
48- impl < ' c , C : bitcoincore_rpc:: RpcApi > Emitter < ' c , C > {
58+ /// Indicates that there are no initially expected mempool transactions.
59+ ///
60+ /// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
61+ /// to start empty (i.e. with no unconfirmed transactions).
62+ pub const NO_EXPECTED_MEMPOOL_TXIDS : core:: iter:: Empty < Txid > = core:: iter:: empty ( ) ;
63+
64+ impl < C > Emitter < C >
65+ where
66+ C : Deref ,
67+ C :: Target : RpcApi ,
68+ {
4969 /// Construct a new [`Emitter`].
5070 ///
5171 /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
5272 /// can start emission from a block that connects to the original chain.
5373 ///
5474 /// `start_height` starts emission from a given height (if there are no conflicts with the
5575 /// original chain).
56- pub fn new ( client : & ' c C , last_cp : CheckPoint , start_height : u32 ) -> Self {
76+ ///
77+ /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
78+ /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
79+ /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
80+ pub fn new (
81+ client : C ,
82+ last_cp : CheckPoint ,
83+ start_height : u32 ,
84+ expected_mempool_txids : impl IntoIterator < Item = impl Into < Txid > > ,
85+ ) -> Self {
5786 Self {
5887 client,
5988 start_height,
6089 last_cp,
6190 last_block : None ,
6291 last_mempool_time : 0 ,
6392 last_mempool_tip : None ,
93+ expected_mempool_txids : expected_mempool_txids. into_iter ( ) . map ( Into :: into) . collect ( ) ,
6494 }
6595 }
6696
67- /// Emit mempool transactions, alongside their first-seen unix timestamps.
97+ /// Emit mempool transactions and any evicted [`Txid`]s.
98+ ///
99+ /// This method returns a [`MempoolEvent`] containing the full transactions (with their
100+ /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
101+ /// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids
102+ /// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height
103+ /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always
104+ /// return an empty `evicted_txids` set.
68105 ///
69106 /// This method emits each transaction only once, unless we cannot guarantee the transaction's
70107 /// ancestors are already emitted.
@@ -74,8 +111,8 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
74111 /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
75112 /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
76113 /// at height `h`.
77- pub fn mempool ( & mut self ) -> Result < Vec < ( Transaction , u64 ) > , bitcoincore_rpc:: Error > {
78- let client = self . client ;
114+ pub fn mempool ( & mut self ) -> Result < MempoolEvent , bitcoincore_rpc:: Error > {
115+ let client = & * self . client ;
79116
80117 // This is the emitted tip height during the last mempool emission.
81118 let prev_mempool_tip = self
@@ -84,15 +121,46 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
84121 // `start_height` has been emitted.
85122 . unwrap_or ( self . start_height . saturating_sub ( 1 ) ) ;
86123
124+ // Loop to make sure that the fetched mempool content and the fetched tip are consistent
125+ // with one another.
126+ let ( raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
127+ // Determine if height and hash matches the best block from the RPC. Evictions are deferred
128+ // if we are not at the best block.
129+ let height = client. get_block_count ( ) ?;
130+ let hash = client. get_block_hash ( height) ?;
131+
132+ // Get the raw mempool result from the RPC client which will be used to determine if any
133+ // transactions have been evicted.
134+ let mp = client. get_raw_mempool_verbose ( ) ?;
135+ let mp_txids: HashSet < Txid > = mp. keys ( ) . copied ( ) . collect ( ) ;
136+
137+ if height == client. get_block_count ( ) ? && hash == client. get_block_hash ( height) ? {
138+ break ( mp, mp_txids, height, hash) ;
139+ }
140+ } ;
141+
142+ let at_tip =
143+ rpc_height == self . last_cp . height ( ) as u64 && rpc_block_hash == self . last_cp . hash ( ) ;
144+
145+ // If at tip, any expected txid missing from raw mempool is considered evicted;
146+ // if not at tip, we don't evict anything.
147+ let evicted_txids: HashSet < Txid > = if at_tip {
148+ self . expected_mempool_txids
149+ . difference ( & raw_mempool_txids)
150+ . copied ( )
151+ . collect ( )
152+ } else {
153+ HashSet :: new ( )
154+ } ;
155+
87156 // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88157 // track of the latest mempool tx's timestamp to determine whether we have seen a tx
89158 // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
90159 // be the new latest timestamp.
91160 let prev_mempool_time = self . last_mempool_time ;
92161 let mut latest_time = prev_mempool_time;
93162
94- let txs_to_emit = client
95- . get_raw_mempool_verbose ( ) ?
163+ let new_txs = raw_mempool
96164 . into_iter ( )
97165 . filter_map ( {
98166 let latest_time = & mut latest_time;
@@ -101,25 +169,25 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
101169 if tx_time > * latest_time {
102170 * latest_time = tx_time;
103171 }
104-
105- // Avoid emitting transactions that are already emitted if we can guarantee
106- // blocks containing ancestors are already emitted. The bitcoind rpc interface
107- // provides us with the block height that the tx is introduced to the mempool.
108- // If we have already emitted the block of height, we can assume that all
109- // ancestor txs have been processed by the receiver.
172+ // Best-effort check to avoid re-emitting transactions we've already emitted.
173+ //
174+ // Complete suppression isn't possible, since a transaction may spend outputs
175+ // owned by the wallet. To determine if such a transaction is relevant, we must
176+ // have already seen its ancestor(s) that contain the spent prevouts.
177+ //
178+ // Fortunately, bitcoind provides the block height at which the transaction
179+ // entered the mempool. If we've already emitted that block height, we can
180+ // reasonably assume the receiver has seen all ancestor transactions.
110181 let is_already_emitted = tx_time <= prev_mempool_time;
111182 let is_within_height = tx_entry. height <= prev_mempool_tip as _ ;
112183 if is_already_emitted && is_within_height {
113184 return None ;
114185 }
115-
116186 let tx = match client. get_raw_transaction ( & txid, None ) {
117187 Ok ( tx) => tx,
118- // the tx is confirmed or evicted since `get_raw_mempool_verbose`
119188 Err ( err) if err. is_not_found_error ( ) => return None ,
120189 Err ( err) => return Some ( Err ( err) ) ,
121190 } ;
122-
123191 Some ( Ok ( ( tx, tx_time as u64 ) ) )
124192 }
125193 } )
@@ -128,26 +196,68 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
128196 self . last_mempool_time = latest_time;
129197 self . last_mempool_tip = Some ( self . last_cp . height ( ) ) ;
130198
131- Ok ( txs_to_emit)
132- }
199+ // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
200+ // still catching up to the tip and keep accumulating.
201+ if at_tip {
202+ self . expected_mempool_txids = new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) . collect ( ) ;
203+ } else {
204+ self . expected_mempool_txids
205+ . extend ( new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) ) ;
206+ }
133207
134- /// Emit the next block height and header (if any).
135- pub fn next_header ( & mut self ) -> Result < Option < BlockEvent < Header > > , bitcoincore_rpc:: Error > {
136- Ok ( poll ( self , |hash| self . client . get_block_header ( hash) ) ?
137- . map ( |( checkpoint, block) | BlockEvent { block, checkpoint } ) )
208+ Ok ( MempoolEvent {
209+ new_txs,
210+ evicted_txids,
211+ latest_update_time : latest_time as u64 ,
212+ } )
138213 }
139214
140215 /// Emit the next block height and block (if any).
141216 pub fn next_block ( & mut self ) -> Result < Option < BlockEvent < Block > > , bitcoincore_rpc:: Error > {
142- Ok ( poll ( self , |hash| self . client . get_block ( hash) ) ?
143- . map ( |( checkpoint, block) | BlockEvent { block, checkpoint } ) )
217+ if let Some ( ( checkpoint, block) ) = poll ( self , move |hash, client| client. get_block ( hash) ) ? {
218+ // Stop tracking unconfirmed transactions that have been confirmed in this block.
219+ for tx in & block. txdata {
220+ self . expected_mempool_txids . remove ( & tx. compute_txid ( ) ) ;
221+ }
222+ return Ok ( Some ( BlockEvent { block, checkpoint } ) ) ;
223+ }
224+ Ok ( None )
225+ }
226+ }
227+
228+ /// A new emission from mempool.
229+ #[ derive( Debug ) ]
230+ pub struct MempoolEvent {
231+ /// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
232+ ///
233+ /// To understand the second condition, consider a receiver which filters transactions based on
234+ /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
235+ /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
236+ /// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
237+ /// block at height `h`.
238+ pub new_txs : Vec < ( Transaction , u64 ) > ,
239+
240+ /// [`Txid`]s of all transactions that have been evicted from mempool.
241+ pub evicted_txids : HashSet < Txid > ,
242+
243+ /// The latest timestamp of when a transaction entered the mempool.
244+ ///
245+ /// This is useful for setting the timestamp for evicted transactions.
246+ pub latest_update_time : u64 ,
247+ }
248+
249+ impl MempoolEvent {
250+ /// Returns an iterator of `(txid, evicted_at)` pairs for all evicted transactions.
251+ pub fn evicted_ats ( & self ) -> impl ExactSizeIterator < Item = ( Txid , u64 ) > + ' _ {
252+ let time = self . latest_update_time ;
253+ self . evicted_txids . iter ( ) . map ( move |& txid| ( txid, time) )
144254 }
145255}
146256
147257/// A newly emitted block from [`Emitter`].
148258#[ derive( Debug ) ]
149259pub struct BlockEvent < B > {
150- /// Either a full [`Block`] or [`Header`] of the new block.
260+ /// The block.
151261 pub block : B ,
152262
153263 /// The checkpoint of the new block.
@@ -199,9 +309,10 @@ enum PollResponse {
199309
200310fn poll_once < C > ( emitter : & Emitter < C > ) -> Result < PollResponse , bitcoincore_rpc:: Error >
201311where
202- C : bitcoincore_rpc:: RpcApi ,
312+ C : Deref ,
313+ C :: Target : RpcApi ,
203314{
204- let client = emitter. client ;
315+ let client = & * emitter. client ;
205316
206317 if let Some ( last_res) = & emitter. last_block {
207318 let next_hash = if last_res. height < emitter. start_height as _ {
@@ -255,15 +366,16 @@ fn poll<C, V, F>(
255366 get_item : F ,
256367) -> Result < Option < ( CheckPoint , V ) > , bitcoincore_rpc:: Error >
257368where
258- C : bitcoincore_rpc:: RpcApi ,
259- F : Fn ( & BlockHash ) -> Result < V , bitcoincore_rpc:: Error > ,
369+ C : Deref ,
370+ C :: Target : RpcApi ,
371+ F : Fn ( & BlockHash , & C :: Target ) -> Result < V , bitcoincore_rpc:: Error > ,
260372{
261373 loop {
262374 match poll_once ( emitter) ? {
263375 PollResponse :: Block ( res) => {
264376 let height = res. height as u32 ;
265377 let hash = res. hash ;
266- let item = get_item ( & hash) ?;
378+ let item = get_item ( & hash, & emitter . client ) ?;
267379
268380 let new_cp = emitter
269381 . last_cp
@@ -329,3 +441,81 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
329441 }
330442 }
331443}
444+
445+ #[ cfg( test) ]
446+ mod test {
447+ use crate :: { bitcoincore_rpc:: RpcApi , Emitter , NO_EXPECTED_MEMPOOL_TXIDS } ;
448+ use bdk_chain:: local_chain:: LocalChain ;
449+ use bdk_testenv:: { anyhow, TestEnv } ;
450+ use bitcoin:: { hashes:: Hash , Address , Amount , ScriptBuf , Txid , WScriptHash } ;
451+ use std:: collections:: HashSet ;
452+
453+ #[ test]
454+ fn test_expected_mempool_txids_accumulate_and_remove ( ) -> anyhow:: Result < ( ) > {
455+ let env = TestEnv :: new ( ) ?;
456+ let chain = LocalChain :: from_genesis_hash ( env. rpc_client ( ) . get_block_hash ( 0 ) ?) . 0 ;
457+ let chain_tip = chain. tip ( ) ;
458+ let mut emitter = Emitter :: new (
459+ env. rpc_client ( ) ,
460+ chain_tip. clone ( ) ,
461+ 1 ,
462+ NO_EXPECTED_MEMPOOL_TXIDS ,
463+ ) ;
464+
465+ env. mine_blocks ( 100 , None ) ?;
466+ while emitter. next_block ( ) ?. is_some ( ) { }
467+
468+ let spk_to_track = ScriptBuf :: new_p2wsh ( & WScriptHash :: all_zeros ( ) ) ;
469+ let addr_to_track = Address :: from_script ( & spk_to_track, bitcoin:: Network :: Regtest ) ?;
470+ let mut mempool_txids = HashSet :: new ( ) ;
471+
472+ // Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
473+ for _ in 0 ..10 {
474+ let sent_txid = env. send ( & addr_to_track, Amount :: from_sat ( 1_000 ) ) ?;
475+ mempool_txids. insert ( sent_txid) ;
476+ emitter. mempool ( ) ?;
477+ env. mine_blocks ( 1 , None ) ?;
478+
479+ for txid in & mempool_txids {
480+ assert ! (
481+ emitter. expected_mempool_txids. contains( txid) ,
482+ "Expected txid {:?} missing" ,
483+ txid
484+ ) ;
485+ }
486+ }
487+
488+ // Process each block and check that confirmed txids are removed from from
489+ // expected_mempool_txids.
490+ while let Some ( block_event) = emitter. next_block ( ) ? {
491+ let confirmed_txids: HashSet < Txid > = block_event
492+ . block
493+ . txdata
494+ . iter ( )
495+ . map ( |tx| tx. compute_txid ( ) )
496+ . collect ( ) ;
497+ mempool_txids = mempool_txids
498+ . difference ( & confirmed_txids)
499+ . copied ( )
500+ . collect :: < HashSet < _ > > ( ) ;
501+ for txid in confirmed_txids {
502+ assert ! (
503+ !emitter. expected_mempool_txids. contains( & txid) ,
504+ "Expected txid {:?} should have been removed" ,
505+ txid
506+ ) ;
507+ }
508+ for txid in & mempool_txids {
509+ assert ! (
510+ emitter. expected_mempool_txids. contains( txid) ,
511+ "Expected txid {:?} missing" ,
512+ txid
513+ ) ;
514+ }
515+ }
516+
517+ assert ! ( emitter. expected_mempool_txids. is_empty( ) ) ;
518+
519+ Ok ( ( ) )
520+ }
521+ }
0 commit comments