Skip to content

Commit 9e00579

Browse files
committed
refactor(rpc)!: update mempool interface and test code
1 parent e4894f0 commit 9e00579

File tree

3 files changed

+127
-34
lines changed

3 files changed

+127
-34
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 93 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -43,6 +44,9 @@ pub struct Emitter<'c, C> {
4344
/// The last emitted block during our last mempool emission. This is used to determine whether
4445
/// there has been a reorg since our last mempool emission.
4546
last_mempool_tip: Option<u32>,
47+
48+
/// Expected mempool txs. TODO: Docs.
49+
expected_mempool_txids: HashSet<Txid>,
4650
}
4751

4852
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
@@ -53,28 +57,36 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5357
///
5458
/// `start_height` starts emission from a given height (if there are no conflicts with the
5559
/// original chain).
56-
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
60+
pub fn new(
61+
client: &'c C,
62+
last_cp: CheckPoint,
63+
start_height: u32,
64+
expected_mempool_txids: HashSet<Txid>,
65+
) -> Self {
5766
Self {
5867
client,
5968
start_height,
6069
last_cp,
6170
last_block: None,
6271
last_mempool_time: 0,
6372
last_mempool_tip: None,
73+
expected_mempool_txids,
6474
}
6575
}
6676

67-
/// Emit mempool transactions, alongside their first-seen unix timestamps.
77+
/// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
6878
///
69-
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
70-
/// ancestors are already emitted.
79+
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
80+
/// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
81+
/// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
82+
/// transaction's ancestors are already emitted.
7183
///
7284
/// To understand why, consider a receiver which filters transactions based on whether it
7385
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
7486
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
7587
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
7688
/// at height `h`.
77-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
89+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
7890
let client = self.client;
7991

8092
// This is the emitted tip height during the last mempool emission.
@@ -84,24 +96,46 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
8496
// `start_height` has been emitted.
8597
.unwrap_or(self.start_height.saturating_sub(1));
8698

99+
// Get the raw mempool result from the RPC client.
100+
let raw_mempool = client.get_raw_mempool_verbose()?;
101+
let raw_mempool_txids: HashSet<Txid> = raw_mempool.keys().copied().collect();
102+
103+
// Determine if height matches last emitted block.
104+
let at_tip = client.get_block_count()? == self.last_cp.height() as u64;
105+
106+
// Clear out `expected_mempool_txids` if we are on a different block.
107+
if prev_mempool_tip != self.last_cp.height() {
108+
self.expected_mempool_txids.clear();
109+
}
110+
111+
// If at tip, any expected txid missing from raw mempool is considered evicted;
112+
// if not at tip, we don't evict anything.
113+
let mut evicted_txids: HashSet<Txid> = if at_tip {
114+
self.expected_mempool_txids
115+
.difference(&raw_mempool_txids)
116+
.copied()
117+
.collect()
118+
} else {
119+
HashSet::new()
120+
};
121+
87122
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88123
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
89124
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
90125
// be the new latest timestamp.
91126
let prev_mempool_time = self.last_mempool_time;
92127
let mut latest_time = prev_mempool_time;
93128

94-
let txs_to_emit = client
95-
.get_raw_mempool_verbose()?
129+
let new_txs = raw_mempool
96130
.into_iter()
97131
.filter_map({
98132
let latest_time = &mut latest_time;
133+
let evicted_txids = &mut evicted_txids;
99134
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
100135
let tx_time = tx_entry.time as usize;
101136
if tx_time > *latest_time {
102137
*latest_time = tx_time;
103138
}
104-
105139
// Avoid emitting transactions that are already emitted if we can guarantee
106140
// blocks containing ancestors are already emitted. The bitcoind rpc interface
107141
// provides us with the block height that the tx is introduced to the mempool.
@@ -112,23 +146,40 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
112146
if is_already_emitted && is_within_height {
113147
return None;
114148
}
115-
116149
let tx = match client.get_raw_transaction(&txid, None) {
117150
Ok(tx) => tx,
118-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
119-
Err(err) if err.is_not_found_error() => return None,
151+
Err(err) if err.is_not_found_error() => {
152+
// If at tip and the transaction isn't found, mark it as evicted.
153+
if at_tip {
154+
evicted_txids.insert(txid);
155+
}
156+
return None;
157+
}
120158
Err(err) => return Some(Err(err)),
121159
};
122-
123160
Some(Ok((tx, tx_time as u64)))
124161
}
125162
})
126163
.collect::<Result<Vec<_>, _>>()?;
127164

128165
self.last_mempool_time = latest_time;
129166
self.last_mempool_tip = Some(self.last_cp.height());
167+
if at_tip {
168+
self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
169+
} else {
170+
self.expected_mempool_txids.extend(
171+
new_txs
172+
.iter()
173+
.map(|(tx, _)| tx.compute_txid())
174+
.collect::<Vec<_>>(),
175+
);
176+
}
130177

131-
Ok(txs_to_emit)
178+
Ok(MempoolEvent {
179+
new_txs,
180+
evicted_txids,
181+
latest_update_time: latest_time as u64,
182+
})
132183
}
133184

134185
/// Emit the next block height and header (if any).
@@ -139,11 +190,37 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
139190

140191
/// Emit the next block height and block (if any).
141192
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
142-
Ok(poll(self, |hash| self.client.get_block(hash))?
143-
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
193+
if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
194+
for tx in &block.txdata {
195+
self.expected_mempool_txids.remove(&tx.compute_txid());
196+
}
197+
return Ok(Some(BlockEvent { block, checkpoint }));
198+
}
199+
Ok(None)
144200
}
145201
}
146202

203+
/// A new emission from mempool.
204+
#[derive(Debug)]
205+
pub struct MempoolEvent {
206+
/// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
207+
///
208+
/// To understand the second condition, consider a receiver which filters transactions based on
209+
/// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
210+
/// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
211+
/// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
212+
/// block at height `h`.
213+
pub new_txs: Vec<(Transaction, u64)>,
214+
215+
/// [`Txid`]s of all transactions that have been evicted from mempool.
216+
pub evicted_txids: HashSet<Txid>,
217+
218+
/// The latest timestamp of when a transaction entered the mempool.
219+
///
220+
/// This is useful for setting the timestamp for evicted transactions.
221+
pub latest_update_time: u64,
222+
}
223+
147224
/// A newly emitted block from [`Emitter`].
148225
#[derive(Debug)]
149226
pub struct BlockEvent<B> {

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{BTreeMap, BTreeSet};
1+
use std::collections::{BTreeMap, BTreeSet, HashSet};
22

33
use bdk_bitcoind_rpc::Emitter;
44
use bdk_chain::{
@@ -22,7 +22,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
2222
let env = TestEnv::new()?;
2323
let network_tip = env.rpc_client().get_block_count()?;
2424
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
25-
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0);
25+
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
2626

2727
// Mine some blocks and return the actual block hashes.
2828
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +156,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
156156
index
157157
});
158158

159-
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0);
159+
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
160160

161161
while let Some(emission) = emitter.next_block()? {
162162
let height = emission.block_height();
@@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
189189
assert!(emitter.next_block()?.is_none());
190190

191191
let mempool_txs = emitter.mempool()?;
192-
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
192+
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
193193
assert_eq!(
194194
indexed_additions
195195
.tx_graph
@@ -252,6 +252,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
252252
hash: env.rpc_client().get_block_hash(0)?,
253253
}),
254254
EMITTER_START_HEIGHT as _,
255+
HashSet::new(),
255256
);
256257

257258
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -328,6 +329,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
328329
hash: env.rpc_client().get_block_hash(0)?,
329330
}),
330331
0,
332+
HashSet::new(),
331333
);
332334

333335
// setup addresses
@@ -419,6 +421,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
419421
hash: env.rpc_client().get_block_hash(0)?,
420422
}),
421423
0,
424+
HashSet::new(),
422425
);
423426

424427
// mine blocks and sync up emitter
@@ -437,6 +440,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
437440
// the first emission should include all transactions
438441
let emitted_txids = emitter
439442
.mempool()?
443+
.new_txs
440444
.into_iter()
441445
.map(|(tx, _)| tx.compute_txid())
442446
.collect::<BTreeSet<Txid>>();
@@ -447,7 +451,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
447451

448452
// second emission should be empty
449453
assert!(
450-
emitter.mempool()?.is_empty(),
454+
emitter.mempool()?.new_txs.is_empty(),
451455
"second emission should be empty"
452456
);
453457

@@ -457,7 +461,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
457461
}
458462
while emitter.next_header()?.is_some() {}
459463
assert!(
460-
emitter.mempool()?.is_empty(),
464+
emitter.mempool()?.new_txs.is_empty(),
461465
"third emission, after chain tip is extended, should also be empty"
462466
);
463467

@@ -484,6 +488,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
484488
hash: env.rpc_client().get_block_hash(0)?,
485489
}),
486490
0,
491+
HashSet::new(),
487492
);
488493

489494
// mine blocks to get initial balance, sync emitter up to tip
@@ -506,6 +511,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
506511
assert_eq!(
507512
emitter
508513
.mempool()?
514+
.new_txs
509515
.into_iter()
510516
.map(|(tx, _)| tx.compute_txid())
511517
.collect::<BTreeSet<_>>(),
@@ -515,6 +521,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
515521
assert_eq!(
516522
emitter
517523
.mempool()?
524+
.new_txs
518525
.into_iter()
519526
.map(|(tx, _)| tx.compute_txid())
520527
.collect::<BTreeSet<_>>(),
@@ -535,6 +542,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
535542
.collect::<BTreeSet<_>>();
536543
let emitted_txids = emitter
537544
.mempool()?
545+
.new_txs
538546
.into_iter()
539547
.map(|(tx, _)| tx.compute_txid())
540548
.collect::<BTreeSet<_>>();
@@ -572,6 +580,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
572580
hash: env.rpc_client().get_block_hash(0)?,
573581
}),
574582
0,
583+
HashSet::new(),
575584
);
576585

577586
// mine blocks to get initial balance
@@ -593,6 +602,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
593602
assert_eq!(
594603
emitter
595604
.mempool()?
605+
.new_txs
596606
.into_iter()
597607
.map(|(tx, _)| tx.compute_txid())
598608
.collect::<BTreeSet<_>>(),
@@ -628,6 +638,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
628638
// include mempool txs introduced at reorg height or greater
629639
let mempool = emitter
630640
.mempool()?
641+
.new_txs
631642
.into_iter()
632643
.map(|(tx, _)| tx.compute_txid())
633644
.collect::<BTreeSet<_>>();
@@ -643,6 +654,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
643654

644655
let mempool = emitter
645656
.mempool()?
657+
.new_txs
646658
.into_iter()
647659
.map(|(tx, _)| tx.compute_txid())
648660
.collect::<BTreeSet<_>>();
@@ -696,6 +708,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
696708
hash: env.rpc_client().get_block_hash(0)?,
697709
}),
698710
(PREMINE_COUNT - 2) as u32,
711+
HashSet::new(),
699712
);
700713

701714
// mine 101 blocks
@@ -821,9 +834,7 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
821834
assert!(mempool_event.evicted_txids.contains(&txid_1));
822835

823836
// Update graph with evicted tx.
824-
let exp_txids = exp_spk_txids.into_iter().map(|(_, txid)| txid);
825-
let evicted_txids = mempool_event.evicted_txids(exp_txids);
826-
for txid in evicted_txids {
837+
for txid in mempool_event.evicted_txids {
827838
let _ = graph.insert_evicted_at(txid, seen_at);
828839
}
829840

0 commit comments

Comments
 (0)