Skip to content

Commit b94a12a

Browse files
committed
refactor(rpc)!: update mempool interface and test code
1 parent 20f16cd commit b94a12a

3 files changed

Lines changed: 196 additions & 34 deletions

File tree

crates/bitcoind_rpc/src/lib.rs

Lines changed: 162 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -43,6 +44,9 @@ pub struct Emitter<'c, C> {
4344
/// The last emitted block during our last mempool emission. This is used to determine whether
4445
/// there has been a reorg since our last mempool emission.
4546
last_mempool_tip: Option<u32>,
47+
48+
/// Expected mempool txs. TODO: Docs.
49+
expected_mempool_txids: HashSet<Txid>,
4650
}
4751

4852
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -53,28 +57,36 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5357
///
5458
/// `start_height` starts emission from a given height (if there are no conflicts with the
5559
/// original chain).
56-
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
60+
pub fn new(
61+
client: &'c C,
62+
last_cp: CheckPoint,
63+
start_height: u32,
64+
expected_mempool_txids: HashSet<Txid>,
65+
) -> Self {
5766
Self {
5867
client,
5968
start_height,
6069
last_cp,
6170
last_block: None,
6271
last_mempool_time: 0,
6372
last_mempool_tip: None,
73+
expected_mempool_txids,
6474
}
6575
}
6676

67-
/// Emit mempool transactions, alongside their first-seen unix timestamps.
77+
/// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
6878
///
69-
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
70-
/// ancestors are already emitted.
79+
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
80+
/// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
81+
/// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
82+
/// transaction's ancestors are already emitted.
7183
///
7284
/// To understand why, consider a receiver which filters transactions based on whether it
7385
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
7486
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
7587
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
7688
/// at height `h`.
77-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
89+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
7890
let client = self.client;
7991

8092
// This is the emitted tip height during the last mempool emission.
@@ -84,24 +96,41 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
8496
// `start_height` has been emitted.
8597
.unwrap_or(self.start_height.saturating_sub(1));
8698

99+
// Get the raw mempool result from the RPC client.
100+
let raw_mempool = client.get_raw_mempool_verbose()?;
101+
let raw_mempool_txids: HashSet<Txid> = raw_mempool.keys().copied().collect();
102+
103+
// Determine if height matches last emitted block.
104+
let at_tip = client.get_block_count()? == self.last_cp.height() as u64;
105+
106+
// If at tip, any expected txid missing from raw mempool is considered evicted;
107+
// if not at tip, we don't evict anything.
108+
let mut evicted_txids: HashSet<Txid> = if at_tip {
109+
self.expected_mempool_txids
110+
.difference(&raw_mempool_txids)
111+
.copied()
112+
.collect()
113+
} else {
114+
HashSet::new()
115+
};
116+
87117
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88118
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
89119
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
90120
// be the new latest timestamp.
91121
let prev_mempool_time = self.last_mempool_time;
92122
let mut latest_time = prev_mempool_time;
93123

94-
let txs_to_emit = client
95-
.get_raw_mempool_verbose()?
124+
let new_txs = raw_mempool
96125
.into_iter()
97126
.filter_map({
98127
let latest_time = &mut latest_time;
128+
let evicted_txids = &mut evicted_txids;
99129
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
100130
let tx_time = tx_entry.time as usize;
101131
if tx_time > *latest_time {
102132
*latest_time = tx_time;
103133
}
104-
105134
// Avoid emitting transactions that are already emitted if we can guarantee
106135
// blocks containing ancestors are already emitted. The bitcoind rpc interface
107136
// provides us with the block height that the tx is introduced to the mempool.
@@ -112,23 +141,40 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
112141
if is_already_emitted && is_within_height {
113142
return None;
114143
}
115-
116144
let tx = match client.get_raw_transaction(&txid, None) {
117145
Ok(tx) => tx,
118-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
119-
Err(err) if err.is_not_found_error() => return None,
146+
Err(err) if err.is_not_found_error() => {
147+
// If at tip and the transaction isn't found, mark it as evicted.
148+
if at_tip {
149+
evicted_txids.insert(txid);
150+
}
151+
return None;
152+
}
120153
Err(err) => return Some(Err(err)),
121154
};
122-
123155
Some(Ok((tx, tx_time as u64)))
124156
}
125157
})
126158
.collect::<Result<Vec<_>, _>>()?;
127159

128160
self.last_mempool_time = latest_time;
129161
self.last_mempool_tip = Some(self.last_cp.height());
162+
if at_tip {
163+
self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
164+
} else {
165+
self.expected_mempool_txids.extend(
166+
new_txs
167+
.iter()
168+
.map(|(tx, _)| tx.compute_txid())
169+
.collect::<Vec<_>>(),
170+
);
171+
}
130172

131-
Ok(txs_to_emit)
173+
Ok(MempoolEvent {
174+
new_txs,
175+
evicted_txids,
176+
latest_update_time: latest_time as u64,
177+
})
132178
}
133179

134180
/// Emit the next block height and header (if any).
@@ -139,11 +185,37 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
139185

140186
/// Emit the next block height and block (if any).
141187
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
142-
Ok(poll(self, |hash| self.client.get_block(hash))?
143-
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
188+
if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
189+
for tx in &block.txdata {
190+
self.expected_mempool_txids.remove(&tx.compute_txid());
191+
}
192+
return Ok(Some(BlockEvent { block, checkpoint }));
193+
}
194+
Ok(None)
144195
}
145196
}
146197

198+
/// A new emission from mempool.
199+
#[derive(Debug)]
200+
pub struct MempoolEvent {
201+
/// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
202+
///
203+
/// To understand the second condition, consider a receiver which filters transactions based on
204+
/// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
205+
/// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
206+
/// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
207+
/// block at height `h`.
208+
pub new_txs: Vec<(Transaction, u64)>,
209+
210+
/// [`Txid`]s of all transactions that have been evicted from mempool.
211+
pub evicted_txids: HashSet<Txid>,
212+
213+
/// The latest timestamp of when a transaction entered the mempool.
214+
///
215+
/// This is useful for setting the timestamp for evicted transactions.
216+
pub latest_update_time: u64,
217+
}
218+
147219
/// A newly emitted block from [`Emitter`].
148220
#[derive(Debug)]
149221
pub struct BlockEvent<B> {
@@ -329,3 +401,77 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
329401
}
330402
}
331403
}
404+
405+
#[cfg(test)]
406+
mod test {
407+
use crate::{bitcoincore_rpc::RpcApi, Emitter};
408+
use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
409+
use bdk_chain::local_chain::LocalChain;
410+
use bdk_testenv::{anyhow, TestEnv};
411+
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
412+
use std::collections::HashSet;
413+
414+
#[test]
415+
fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
416+
let env = TestEnv::new()?;
417+
let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
418+
let chain_tip = chain.tip();
419+
let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
420+
421+
env.mine_blocks(100, None)?;
422+
while emitter.next_block()?.is_some() {}
423+
424+
let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
425+
let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
426+
let mut mempool_txids = HashSet::new();
427+
428+
// Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
429+
for _ in 0..10 {
430+
let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?;
431+
mempool_txids.insert(sent_txid);
432+
emitter.mempool()?;
433+
env.mine_blocks(1, None)?;
434+
435+
for txid in &mempool_txids {
436+
assert!(
437+
emitter.expected_mempool_txids.contains(txid),
438+
"Expected txid {:?} missing",
439+
txid
440+
);
441+
}
442+
}
443+
444+
// Process each block and check that confirmed txids are removed from from
445+
// expected_mempool_txids.
446+
while let Some(block_event) = emitter.next_block()? {
447+
let confirmed_txids: HashSet<Txid> = block_event
448+
.block
449+
.txdata
450+
.iter()
451+
.map(|tx| tx.compute_txid())
452+
.collect();
453+
mempool_txids = mempool_txids
454+
.difference(&confirmed_txids)
455+
.copied()
456+
.collect::<HashSet<_>>();
457+
for txid in confirmed_txids {
458+
assert!(
459+
!emitter.expected_mempool_txids.contains(&txid),
460+
"Expected txid {:?} should have been removed",
461+
txid
462+
);
463+
}
464+
for txid in &mempool_txids {
465+
assert!(
466+
emitter.expected_mempool_txids.contains(txid),
467+
"Expected txid {:?} missing",
468+
txid
469+
);
470+
}
471+
}
472+
473+
assert!(emitter.expected_mempool_txids.is_empty());
474+
475+
Ok(())
476+
}
477+
}

0 commit comments

Comments
 (0)