Skip to content

Commit 0c726fb

Browse files
committed
refactor(rpc)!: update mempool interface and test code
1 parent e2055b3 commit 0c726fb

File tree

3 files changed

+124
-28
lines changed

3 files changed

+124
-28
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -73,6 +74,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
7374
last_block: None,
7475
last_mempool_time: 0,
7576
last_mempool_tip: None,
77+
expected_mempool_txids,
7678
}
7779
}
7880

@@ -93,7 +95,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
9395
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
9496
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
9597
/// at height `h`.
96-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
98+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
9799
let client = self.client;
98100

99101
// This is the emitted tip height during the last mempool emission.
@@ -136,17 +138,16 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
136138
let prev_mempool_time = self.last_mempool_time;
137139
let mut latest_time = prev_mempool_time;
138140

139-
let txs_to_emit = client
140-
.get_raw_mempool_verbose()?
141+
let new_txs = raw_mempool
141142
.into_iter()
142143
.filter_map({
143144
let latest_time = &mut latest_time;
145+
let evicted_txids = &mut evicted_txids;
144146
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
145147
let tx_time = tx_entry.time as usize;
146148
if tx_time > *latest_time {
147149
*latest_time = tx_time;
148150
}
149-
150151
// Avoid emitting transactions that are already emitted if we can guarantee
151152
// blocks containing ancestors are already emitted. The bitcoind rpc interface
152153
// provides us with the block height that the tx is introduced to the mempool.
@@ -157,14 +158,17 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
157158
if is_already_emitted && is_within_height {
158159
return None;
159160
}
160-
161161
let tx = match client.get_raw_transaction(&txid, None) {
162162
Ok(tx) => tx,
163-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
164-
Err(err) if err.is_not_found_error() => return None,
163+
Err(err) if err.is_not_found_error() => {
164+
// If at tip and the transaction isn't found, mark it as evicted.
165+
if at_tip {
166+
evicted_txids.insert(txid);
167+
}
168+
return None;
169+
}
165170
Err(err) => return Some(Err(err)),
166171
};
167-
168172
Some(Ok((tx, tx_time as u64)))
169173
}
170174
})
@@ -414,3 +418,77 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
414418
}
415419
}
416420
}
421+
422+
#[cfg(test)]
423+
mod test {
424+
use crate::{bitcoincore_rpc::RpcApi, Emitter};
425+
use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
426+
use bdk_chain::local_chain::LocalChain;
427+
use bdk_testenv::{anyhow, TestEnv};
428+
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
429+
use std::collections::HashSet;
430+
431+
#[test]
432+
fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
433+
let env = TestEnv::new()?;
434+
let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
435+
let chain_tip = chain.tip();
436+
let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
437+
438+
env.mine_blocks(100, None)?;
439+
while emitter.next_block()?.is_some() {}
440+
441+
let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
442+
let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
443+
let mut mempool_txids = HashSet::new();
444+
445+
// Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
446+
for _ in 0..10 {
447+
let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?;
448+
mempool_txids.insert(sent_txid);
449+
emitter.mempool()?;
450+
env.mine_blocks(1, None)?;
451+
452+
for txid in &mempool_txids {
453+
assert!(
454+
emitter.expected_mempool_txids.contains(txid),
455+
"Expected txid {:?} missing",
456+
txid
457+
);
458+
}
459+
}
460+
461+
// Process each block and check that confirmed txids are removed from from
462+
// expected_mempool_txids.
463+
while let Some(block_event) = emitter.next_block()? {
464+
let confirmed_txids: HashSet<Txid> = block_event
465+
.block
466+
.txdata
467+
.iter()
468+
.map(|tx| tx.compute_txid())
469+
.collect();
470+
mempool_txids = mempool_txids
471+
.difference(&confirmed_txids)
472+
.copied()
473+
.collect::<HashSet<_>>();
474+
for txid in confirmed_txids {
475+
assert!(
476+
!emitter.expected_mempool_txids.contains(&txid),
477+
"Expected txid {:?} should have been removed",
478+
txid
479+
);
480+
}
481+
for txid in &mempool_txids {
482+
assert!(
483+
emitter.expected_mempool_txids.contains(txid),
484+
"Expected txid {:?} missing",
485+
txid
486+
);
487+
}
488+
}
489+
490+
assert!(emitter.expected_mempool_txids.is_empty());
491+
492+
Ok(())
493+
}
494+
}

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{BTreeMap, BTreeSet};
1+
use std::collections::{BTreeMap, BTreeSet, HashSet};
22

33
use bdk_bitcoind_rpc::Emitter;
44
use bdk_chain::{
@@ -22,7 +22,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
2222
let env = TestEnv::new()?;
2323
let network_tip = env.rpc_client().get_block_count()?;
2424
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
25-
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0);
25+
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
2626

2727
// Mine some blocks and return the actual block hashes.
2828
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +156,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
156156
index
157157
});
158158

159-
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0);
159+
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
160160

161161
while let Some(emission) = emitter.next_block()? {
162162
let height = emission.block_height();
@@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
189189
assert!(emitter.next_block()?.is_none());
190190

191191
let mempool_txs = emitter.mempool()?;
192-
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
192+
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs);
193193
assert_eq!(
194194
indexed_additions
195195
.tx_graph
@@ -252,6 +252,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
252252
hash: env.rpc_client().get_block_hash(0)?,
253253
}),
254254
EMITTER_START_HEIGHT as _,
255+
HashSet::new(),
255256
);
256257

257258
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -328,6 +329,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
328329
hash: env.rpc_client().get_block_hash(0)?,
329330
}),
330331
0,
332+
HashSet::new(),
331333
);
332334

333335
// setup addresses
@@ -419,6 +421,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
419421
hash: env.rpc_client().get_block_hash(0)?,
420422
}),
421423
0,
424+
HashSet::new(),
422425
);
423426

424427
// mine blocks and sync up emitter
@@ -437,6 +440,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
437440
// the first emission should include all transactions
438441
let emitted_txids = emitter
439442
.mempool()?
443+
.new_txs
440444
.into_iter()
441445
.map(|(tx, _)| tx.compute_txid())
442446
.collect::<BTreeSet<Txid>>();
@@ -447,7 +451,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
447451

448452
// second emission should be empty
449453
assert!(
450-
emitter.mempool()?.is_empty(),
454+
emitter.mempool()?.new_txs.is_empty(),
451455
"second emission should be empty"
452456
);
453457

@@ -457,7 +461,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
457461
}
458462
while emitter.next_header()?.is_some() {}
459463
assert!(
460-
emitter.mempool()?.is_empty(),
464+
emitter.mempool()?.new_txs.is_empty(),
461465
"third emission, after chain tip is extended, should also be empty"
462466
);
463467

@@ -484,6 +488,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
484488
hash: env.rpc_client().get_block_hash(0)?,
485489
}),
486490
0,
491+
HashSet::new(),
487492
);
488493

489494
// mine blocks to get initial balance, sync emitter up to tip
@@ -506,6 +511,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
506511
assert_eq!(
507512
emitter
508513
.mempool()?
514+
.new_txs
509515
.into_iter()
510516
.map(|(tx, _)| tx.compute_txid())
511517
.collect::<BTreeSet<_>>(),
@@ -515,6 +521,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
515521
assert_eq!(
516522
emitter
517523
.mempool()?
524+
.new_txs
518525
.into_iter()
519526
.map(|(tx, _)| tx.compute_txid())
520527
.collect::<BTreeSet<_>>(),
@@ -535,6 +542,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
535542
.collect::<BTreeSet<_>>();
536543
let emitted_txids = emitter
537544
.mempool()?
545+
.new_txs
538546
.into_iter()
539547
.map(|(tx, _)| tx.compute_txid())
540548
.collect::<BTreeSet<_>>();
@@ -572,6 +580,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
572580
hash: env.rpc_client().get_block_hash(0)?,
573581
}),
574582
0,
583+
HashSet::new(),
575584
);
576585

577586
// mine blocks to get initial balance
@@ -593,6 +602,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
593602
assert_eq!(
594603
emitter
595604
.mempool()?
605+
.new_txs
596606
.into_iter()
597607
.map(|(tx, _)| tx.compute_txid())
598608
.collect::<BTreeSet<_>>(),
@@ -628,6 +638,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
628638
// include mempool txs introduced at reorg height or greater
629639
let mempool = emitter
630640
.mempool()?
641+
.new_txs
631642
.into_iter()
632643
.map(|(tx, _)| tx.compute_txid())
633644
.collect::<BTreeSet<_>>();
@@ -643,6 +654,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
643654

644655
let mempool = emitter
645656
.mempool()?
657+
.new_txs
646658
.into_iter()
647659
.map(|(tx, _)| tx.compute_txid())
648660
.collect::<BTreeSet<_>>();
@@ -696,6 +708,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
696708
hash: env.rpc_client().get_block_hash(0)?,
697709
}),
698710
(PREMINE_COUNT - 2) as u32,
711+
HashSet::new(),
699712
);
700713

701714
// mine 101 blocks
@@ -821,10 +834,10 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
821834
assert!(mempool_event.evicted_txids.contains(&txid_1));
822835

823836
// Update graph with evicted tx.
824-
let exp_txids = exp_spk_txids.into_iter().map(|(_, txid)| txid);
825-
let evicted_txids = mempool_event.evicted_txids(exp_txids);
826-
for txid in evicted_txids {
827-
let _ = graph.insert_evicted_at(txid, seen_at);
837+
for txid in mempool_event.evicted_txids {
838+
if graph.graph().get_tx_node(txid).is_some() {
839+
let _ = graph.insert_evicted_at(txid, seen_at);
840+
}
828841
}
829842

830843
let canonical_txids = graph

examples/example_bitcoind_rpc_polling/src/main.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
collections::HashSet,
23
path::PathBuf,
34
sync::{
45
atomic::{AtomicBool, Ordering},
@@ -11,10 +12,7 @@ use bdk_bitcoind_rpc::{
1112
bitcoincore_rpc::{Auth, Client, RpcApi},
1213
Emitter,
1314
};
14-
use bdk_chain::{
15-
bitcoin::{Block, Transaction},
16-
local_chain, Merge,
17-
};
15+
use bdk_chain::{bitcoin::Block, local_chain, Merge};
1816
use example_cli::{
1917
anyhow,
2018
clap::{self, Args, Subcommand},
@@ -36,7 +34,7 @@ const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
3634
#[derive(Debug)]
3735
enum Emission {
3836
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
39-
Mempool(Vec<(Transaction, u64)>),
37+
Mempool(bdk_bitcoind_rpc::MempoolEvent),
4038
Tip(u32),
4139
}
4240

@@ -141,7 +139,7 @@ fn main() -> anyhow::Result<()> {
141139

142140
let chain_tip = chain.lock().unwrap().tip();
143141
let rpc_client = rpc_args.new_client()?;
144-
let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height);
142+
let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new());
145143
let mut db_stage = ChangeSet::default();
146144

147145
let mut last_db_commit = Instant::now();
@@ -204,7 +202,7 @@ fn main() -> anyhow::Result<()> {
204202
let graph_changeset = graph
205203
.lock()
206204
.unwrap()
207-
.batch_insert_relevant_unconfirmed(mempool_txs);
205+
.batch_insert_relevant_unconfirmed(mempool_txs.new_txs);
208206
{
209207
let db = &mut *db.lock().unwrap();
210208
db_stage.merge(ChangeSet {
@@ -232,7 +230,8 @@ fn main() -> anyhow::Result<()> {
232230
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
233231
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
234232
let rpc_client = rpc_args.new_client()?;
235-
let mut emitter = Emitter::new(&rpc_client, last_cp, fallback_height);
233+
let mut emitter =
234+
Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new());
236235

237236
let mut block_count = rpc_client.get_block_count()? as u32;
238237
tx.send(Emission::Tip(block_count))?;
@@ -287,7 +286,13 @@ fn main() -> anyhow::Result<()> {
287286
(chain_changeset, graph_changeset)
288287
}
289288
Emission::Mempool(mempool_txs) => {
290-
let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs);
289+
let mut graph_changeset =
290+
graph.batch_insert_relevant_unconfirmed(mempool_txs.new_txs.clone());
291+
for txid in mempool_txs.evicted_txids {
292+
graph_changeset.merge(
293+
graph.insert_evicted_at(txid, mempool_txs.latest_update_time),
294+
);
295+
}
291296
(local_chain::ChangeSet::default(), graph_changeset)
292297
}
293298
Emission::Tip(h) => {

0 commit comments

Comments
 (0)