Skip to content

Commit 8b055d2

Browse files
committed
refactor(rpc)!: update mempool interface and test code
1 parent c727cd2 commit 8b055d2

3 files changed

Lines changed: 136 additions & 33 deletions

File tree

crates/bitcoind_rpc/src/lib.rs

Lines changed: 102 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -43,6 +44,9 @@ pub struct Emitter<'c, C> {
4344
/// The last emitted block during our last mempool emission. This is used to determine whether
4445
/// there has been a reorg since our last mempool emission.
4546
last_mempool_tip: Option<u32>,
47+
48+
/// Expected mempool txs. TODO: Docs.
49+
expected_mempool_txids: HashSet<Txid>,
4650
}
4751

4852
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -53,28 +57,36 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5357
///
5458
/// `start_height` starts emission from a given height (if there are no conflicts with the
5559
/// original chain).
56-
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
60+
pub fn new(
61+
client: &'c C,
62+
last_cp: CheckPoint,
63+
start_height: u32,
64+
expected_mempool_txids: HashSet<Txid>,
65+
) -> Self {
5766
Self {
5867
client,
5968
start_height,
6069
last_cp,
6170
last_block: None,
6271
last_mempool_time: 0,
6372
last_mempool_tip: None,
73+
expected_mempool_txids,
6474
}
6575
}
6676

67-
/// Emit mempool transactions, alongside their first-seen unix timestamps.
77+
/// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
6878
///
69-
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
70-
/// ancestors are already emitted.
79+
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
80+
/// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
81+
/// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
82+
/// transaction's ancestors are already emitted.
7183
///
7284
/// To understand why, consider a receiver which filters transactions based on whether it
7385
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
7486
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
7587
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
7688
/// at height `h`.
77-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
89+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
7890
let client = self.client;
7991

8092
// This is the emitted tip height during the last mempool emission.
@@ -84,24 +96,65 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
8496
// `start_height` has been emitted.
8597
.unwrap_or(self.start_height.saturating_sub(1));
8698

99+
// Get the raw mempool result from the RPC client.
100+
let raw_mempool = client.get_raw_mempool_verbose()?;
101+
let raw_mempool_txids: HashSet<Txid> = raw_mempool.keys().copied().collect();
102+
103+
// Determine the latest block height from the raw mempool data.
104+
let latest_height = raw_mempool
105+
.values()
106+
.map(|entry| entry.height)
107+
.max()
108+
.unwrap_or(0);
109+
110+
// Determine if we are at tip: if the highest mempool height equals the current chain tip.
111+
let at_tip = latest_height == self.last_cp.height() as u64;
112+
113+
// Clear out `expected_mempool_txids` if we are at the latest block height or if we are on a
114+
// different block.
115+
if prev_mempool_tip != self.last_cp.height() {
116+
self.expected_mempool_txids.clear();
117+
} else if at_tip {
118+
let blockhash = client.get_block_hash(latest_height)?;
119+
let block = client.get_block(&blockhash)?;
120+
let confirmed_txids = block
121+
.txdata
122+
.into_iter()
123+
.map(|tx| tx.compute_txid())
124+
.collect::<Vec<_>>();
125+
println!("CONFIRMED TXIDS: {:?}", confirmed_txids);
126+
self.expected_mempool_txids
127+
.retain(|txid| !confirmed_txids.contains(txid));
128+
}
129+
130+
// If at tip, any expected txid missing from raw mempool is considered evicted;
131+
// if not at tip, we don't evict anything.
132+
let mut evicted_txids: HashSet<Txid> = if at_tip {
133+
self.expected_mempool_txids
134+
.difference(&raw_mempool_txids)
135+
.copied()
136+
.collect()
137+
} else {
138+
HashSet::new()
139+
};
140+
87141
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88142
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
89143
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
90144
// be the new latest timestamp.
91145
let prev_mempool_time = self.last_mempool_time;
92146
let mut latest_time = prev_mempool_time;
93147

94-
let txs_to_emit = client
95-
.get_raw_mempool_verbose()?
148+
let new_txs = raw_mempool
96149
.into_iter()
97150
.filter_map({
98151
let latest_time = &mut latest_time;
152+
let evicted_txids = &mut evicted_txids;
99153
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
100154
let tx_time = tx_entry.time as usize;
101155
if tx_time > *latest_time {
102156
*latest_time = tx_time;
103157
}
104-
105158
// Avoid emitting transactions that are already emitted if we can guarantee
106159
// blocks containing ancestors are already emitted. The bitcoind rpc interface
107160
// provides us with the block height that the tx is introduced to the mempool.
@@ -112,23 +165,36 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
112165
if is_already_emitted && is_within_height {
113166
return None;
114167
}
115-
116168
let tx = match client.get_raw_transaction(&txid, None) {
117169
Ok(tx) => tx,
118-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
119-
Err(err) if err.is_not_found_error() => return None,
170+
Err(err) if err.is_not_found_error() => {
171+
// If at tip and the transaction isn't found, mark it as evicted.
172+
if at_tip {
173+
evicted_txids.insert(txid);
174+
}
175+
return None;
176+
}
120177
Err(err) => return Some(Err(err)),
121178
};
122-
123179
Some(Ok((tx, tx_time as u64)))
124180
}
125181
})
126182
.collect::<Result<Vec<_>, _>>()?;
127183

128184
self.last_mempool_time = latest_time;
129185
self.last_mempool_tip = Some(self.last_cp.height());
130-
131-
Ok(txs_to_emit)
186+
self.expected_mempool_txids.extend(
187+
new_txs
188+
.iter()
189+
.map(|(tx, _)| tx.compute_txid())
190+
.collect::<Vec<_>>(),
191+
);
192+
193+
Ok(MempoolEvent {
194+
new_txs,
195+
evicted_txids,
196+
latest_update_time: latest_time as u64,
197+
})
132198
}
133199

134200
/// Emit the next block height and header (if any).
@@ -144,6 +210,27 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
144210
}
145211
}
146212

213+
/// A new emission from mempool.
214+
#[derive(Debug)]
215+
pub struct MempoolEvent {
216+
/// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
217+
///
218+
/// To understand the second condition, consider a receiver which filters transactions based on
219+
/// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
220+
/// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
221+
/// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
222+
/// block at height `h`.
223+
pub new_txs: Vec<(Transaction, u64)>,
224+
225+
/// [`Txid`]s of all transactions that have been evicted from mempool.
226+
pub evicted_txids: HashSet<Txid>,
227+
228+
/// The latest timestamp of when a transaction entered the mempool.
229+
///
230+
/// This is useful for setting the timestamp for evicted transactions.
231+
pub latest_update_time: u64,
232+
}
233+
147234
/// A newly emitted block from [`Emitter`].
148235
#[derive(Debug)]
149236
pub struct BlockEvent<B> {

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{BTreeMap, BTreeSet};
1+
use std::collections::{BTreeMap, BTreeSet, HashSet};
22

33
use bdk_bitcoind_rpc::Emitter;
44
use bdk_chain::{
@@ -22,7 +22,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
2222
let env = TestEnv::new()?;
2323
let network_tip = env.rpc_client().get_block_count()?;
2424
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
25-
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0);
25+
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
2626

2727
// Mine some blocks and return the actual block hashes.
2828
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +156,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
156156
index
157157
});
158158

159-
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0);
159+
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
160160

161161
while let Some(emission) = emitter.next_block()? {
162162
let height = emission.block_height();
@@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
189189
assert!(emitter.next_block()?.is_none());
190190

191191
let mempool_txs = emitter.mempool()?;
192-
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
192+
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
193193
assert_eq!(
194194
indexed_additions
195195
.tx_graph
@@ -252,6 +252,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
252252
hash: env.rpc_client().get_block_hash(0)?,
253253
}),
254254
EMITTER_START_HEIGHT as _,
255+
HashSet::new(),
255256
);
256257

257258
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -328,6 +329,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
328329
hash: env.rpc_client().get_block_hash(0)?,
329330
}),
330331
0,
332+
HashSet::new(),
331333
);
332334

333335
// setup addresses
@@ -419,6 +421,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
419421
hash: env.rpc_client().get_block_hash(0)?,
420422
}),
421423
0,
424+
HashSet::new(),
422425
);
423426

424427
// mine blocks and sync up emitter
@@ -437,6 +440,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
437440
// the first emission should include all transactions
438441
let emitted_txids = emitter
439442
.mempool()?
443+
.new_txs
440444
.into_iter()
441445
.map(|(tx, _)| tx.compute_txid())
442446
.collect::<BTreeSet<Txid>>();
@@ -447,7 +451,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
447451

448452
// second emission should be empty
449453
assert!(
450-
emitter.mempool()?.is_empty(),
454+
emitter.mempool()?.new_txs.is_empty(),
451455
"second emission should be empty"
452456
);
453457

@@ -457,7 +461,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
457461
}
458462
while emitter.next_header()?.is_some() {}
459463
assert!(
460-
emitter.mempool()?.is_empty(),
464+
emitter.mempool()?.new_txs.is_empty(),
461465
"third emission, after chain tip is extended, should also be empty"
462466
);
463467

@@ -484,6 +488,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
484488
hash: env.rpc_client().get_block_hash(0)?,
485489
}),
486490
0,
491+
HashSet::new(),
487492
);
488493

489494
// mine blocks to get initial balance, sync emitter up to tip
@@ -506,6 +511,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
506511
assert_eq!(
507512
emitter
508513
.mempool()?
514+
.new_txs
509515
.into_iter()
510516
.map(|(tx, _)| tx.compute_txid())
511517
.collect::<BTreeSet<_>>(),
@@ -515,6 +521,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
515521
assert_eq!(
516522
emitter
517523
.mempool()?
524+
.new_txs
518525
.into_iter()
519526
.map(|(tx, _)| tx.compute_txid())
520527
.collect::<BTreeSet<_>>(),
@@ -535,6 +542,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
535542
.collect::<BTreeSet<_>>();
536543
let emitted_txids = emitter
537544
.mempool()?
545+
.new_txs
538546
.into_iter()
539547
.map(|(tx, _)| tx.compute_txid())
540548
.collect::<BTreeSet<_>>();
@@ -572,6 +580,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
572580
hash: env.rpc_client().get_block_hash(0)?,
573581
}),
574582
0,
583+
HashSet::new(),
575584
);
576585

577586
// mine blocks to get initial balance
@@ -593,6 +602,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
593602
assert_eq!(
594603
emitter
595604
.mempool()?
605+
.new_txs
596606
.into_iter()
597607
.map(|(tx, _)| tx.compute_txid())
598608
.collect::<BTreeSet<_>>(),
@@ -628,6 +638,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
628638
// include mempool txs introduced at reorg height or greater
629639
let mempool = emitter
630640
.mempool()?
641+
.new_txs
631642
.into_iter()
632643
.map(|(tx, _)| tx.compute_txid())
633644
.collect::<BTreeSet<_>>();
@@ -643,6 +654,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
643654

644655
let mempool = emitter
645656
.mempool()?
657+
.new_txs
646658
.into_iter()
647659
.map(|(tx, _)| tx.compute_txid())
648660
.collect::<BTreeSet<_>>();
@@ -696,6 +708,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
696708
hash: env.rpc_client().get_block_hash(0)?,
697709
}),
698710
(PREMINE_COUNT - 2) as u32,
711+
HashSet::new(),
699712
);
700713

701714
// mine 101 blocks
@@ -821,9 +834,7 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
821834
assert!(mempool_event.evicted_txids.contains(&txid_1));
822835

823836
// Update graph with evicted tx.
824-
let exp_txids = exp_spk_txids.into_iter().map(|(_, txid)| txid);
825-
let evicted_txids = mempool_event.evicted_txids(exp_txids);
826-
for txid in evicted_txids {
837+
for txid in mempool_event.evicted_txids {
827838
let _ = graph.insert_evicted_at(txid, seen_at);
828839
}
829840

0 commit comments

Comments
 (0)