1010#![ warn( missing_docs) ]
1111
1212use bdk_core:: { BlockId , CheckPoint } ;
13- use bitcoin:: { block:: Header , Block , BlockHash , Transaction } ;
13+ use bitcoin:: { block:: Header , Block , BlockHash , Transaction , Txid } ;
1414use bitcoincore_rpc:: bitcoincore_rpc_json;
15+ use std:: collections:: HashSet ;
1516
1617pub mod bip158;
1718
@@ -64,18 +65,21 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
6465 }
6566 }
6667
67- /// Emit mempool transactions, alongside their first-seen unix timestamps .
68+ /// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s .
6869 ///
69- /// This method emits each transaction only once, unless we cannot guarantee the transaction's
70- /// ancestors are already emitted.
70+ /// This method returns a [`MempoolEvent`] containing the full transactions (with their
71+ /// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
72+ /// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
73+ /// transaction's ancestors are already emitted.
7174 ///
7275 /// To understand why, consider a receiver which filters transactions based on whether it
7376 /// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
7477 /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
7578 /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
7679 /// at height `h`.
77- pub fn mempool ( & mut self ) -> Result < Vec < ( Transaction , u64 ) > , bitcoincore_rpc:: Error > {
80+ pub fn mempool ( & mut self ) -> Result < MempoolEvent , bitcoincore_rpc:: Error > {
7881 let client = self . client ;
82+ let mut emitted_txs = Vec :: new ( ) ;
7983
8084 // This is the emitted tip height during the last mempool emission.
8185 let prev_mempool_tip = self
@@ -91,44 +95,49 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
9195 let prev_mempool_time = self . last_mempool_time ;
9296 let mut latest_time = prev_mempool_time;
9397
94- let txs_to_emit = client
95- . get_raw_mempool_verbose ( ) ?
96- . into_iter ( )
97- . filter_map ( {
98- let latest_time = & mut latest_time;
99- move |( txid, tx_entry) | -> Option < Result < _ , bitcoincore_rpc:: Error > > {
100- let tx_time = tx_entry. time as usize ;
101- if tx_time > * latest_time {
102- * latest_time = tx_time;
103- }
98+ // Get the raw mempool result from the RPC client.
99+ let raw_mempool = client. get_raw_mempool_verbose ( ) ?;
100+ let raw_mempool_txids = raw_mempool. keys ( ) . copied ( ) . collect :: < HashSet < _ > > ( ) ;
104101
105- // Avoid emitting transactions that are already emitted if we can guarantee
106- // blocks containing ancestors are already emitted. The bitcoind rpc interface
107- // provides us with the block height that the tx is introduced to the mempool.
108- // If we have already emitted the block of height, we can assume that all
109- // ancestor txs have been processed by the receiver.
110- let is_already_emitted = tx_time <= prev_mempool_time;
111- let is_within_height = tx_entry. height <= prev_mempool_tip as _ ;
112- if is_already_emitted && is_within_height {
113- return None ;
114- }
102+ for ( txid, tx_entry) in raw_mempool. into_iter ( ) {
103+ let tx_time = tx_entry. time as usize ;
104+ if tx_time > latest_time {
105+ latest_time = tx_time;
106+ }
115107
116- let tx = match client. get_raw_transaction ( & txid, None ) {
117- Ok ( tx) => tx,
118- // the tx is confirmed or evicted since `get_raw_mempool_verbose`
119- Err ( err) if err. is_not_found_error ( ) => return None ,
120- Err ( err) => return Some ( Err ( err) ) ,
121- } ;
108+ // Avoid emitting transactions that are already emitted if we can guarantee blocks
109+ // containing ancestors are already emitted. The bitcoind rpc interface provides us with
110+ // the block height that the tx is introduced to the mempool. If we have already emitted
111+ // the block of height, we can assume that all ancestor txs have been processed by the
112+ // receiver.
113+ let is_already_emitted = tx_time <= prev_mempool_time;
114+ let is_within_height = tx_entry. height <= prev_mempool_tip as _ ;
115+ if is_already_emitted && is_within_height {
116+ continue ;
117+ }
122118
123- Some ( Ok ( ( tx, tx_time as u64 ) ) )
119+ match client. get_raw_transaction ( & txid, None ) {
120+ Ok ( tx) => {
121+ emitted_txs. push ( ( tx, tx_time as u64 ) ) ;
122+ }
123+ Err ( err) => {
124+ if err. is_not_found_error ( ) {
125+ // the tx is confirmed or evicted since `get_raw_mempool_verbose`
126+ continue ;
127+ } else {
128+ return Err ( err) ;
129+ }
124130 }
125- } )
126- . collect :: < Result < Vec < _ > , _ > > ( ) ? ;
131+ } ;
132+ }
127133
128134 self . last_mempool_time = latest_time;
129135 self . last_mempool_tip = Some ( self . last_cp . height ( ) ) ;
130136
131- Ok ( txs_to_emit)
137+ Ok ( MempoolEvent {
138+ emitted_txs,
139+ raw_mempool_txids,
140+ } )
132141 }
133142
134143 /// Emit the next block height and header (if any).
@@ -144,6 +153,34 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
144153 }
145154}
146155
156+ /// A new emission from mempool.
157+ #[ derive( Debug ) ]
158+ pub struct MempoolEvent {
159+ /// Emitted mempool transactions with their first‐seen unix timestamps.
160+ pub emitted_txs : Vec < ( Transaction , u64 ) > ,
161+
162+ /// Set of all [`Txid`]s from the raw mempool result, including transactions that may have been
163+ /// confirmed or evicted during processing. This is used to determine which expected
164+ /// transactions are missing.
165+ pub raw_mempool_txids : HashSet < Txid > ,
166+ }
167+
168+ impl MempoolEvent {
169+ /// Given an iterator of expected [`Txid`]s, return those that are missing from the mempool.
170+ pub fn evicted_txids (
171+ & self ,
172+ expected_unconfirmed_txids : impl IntoIterator < Item = Txid > ,
173+ ) -> HashSet < Txid > {
174+ let expected_set = expected_unconfirmed_txids
175+ . into_iter ( )
176+ . collect :: < HashSet < _ > > ( ) ;
177+ expected_set
178+ . difference ( & self . raw_mempool_txids )
179+ . copied ( )
180+ . collect ( )
181+ }
182+ }
183+
147184/// A newly emitted block from [`Emitter`].
148185#[ derive( Debug ) ]
149186pub struct BlockEvent < B > {
0 commit comments