Skip to content

Commit e1c53d7

Browse files
committed
feat(bitcoind_rpc)!: Reduce friction of Emitter API.
* Change signature of `Emitter::new` so that `expected_mempool_txids` can be more easily constructed from `TxGraph` methods. * Change generic bounds of `C` within `Emitter<C>` to be `C: DeRef, C::Target: RpcApi`. This allows the caller to have `Arc<Client>` as `C` and does not force to caller to hold a lifetimed reference.
1 parent 8b1febc commit e1c53d7

3 files changed

Lines changed: 87 additions & 43 deletions

File tree

crates/bitcoind_rpc/src/lib.rs

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
55
//!
66
//! To only get block updates (exclude mempool transactions), the caller can use
7-
//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
8-
//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
9-
//! mempool.
7+
//! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
8+
//! separate method, [`Emitter::mempool`] can be used to emit the whole mempool.
109
#![warn(missing_docs)]
1110

1211
use bdk_core::{BlockId, CheckPoint};
1312
use bitcoin::{Block, BlockHash, Transaction, Txid};
14-
use bitcoincore_rpc::bitcoincore_rpc_json;
15-
use std::collections::HashSet;
13+
use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
14+
use std::{collections::HashSet, ops::Deref};
1615

1716
pub mod bip158;
1817

@@ -23,8 +22,8 @@ pub use bitcoincore_rpc;
2322
/// Refer to [module-level documentation] for more.
2423
///
2524
/// [module-level documentation]: crate
26-
pub struct Emitter<'c, C> {
27-
client: &'c C,
25+
pub struct Emitter<C> {
26+
client: C,
2827
start_height: u32,
2928

3029
/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
@@ -56,7 +55,17 @@ pub struct Emitter<'c, C> {
5655
expected_mempool_txids: HashSet<Txid>,
5756
}
5857

59-
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
58+
/// Indicates that there are no initially expected mempool transactions.
59+
///
60+
/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
61+
/// to start empty (i.e. with no unconfirmed transactions).
62+
pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Txid> = core::iter::empty();
63+
64+
impl<C> Emitter<C>
65+
where
66+
C: Deref,
67+
C::Target: RpcApi,
68+
{
6069
/// Construct a new [`Emitter`].
6170
///
6271
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
@@ -66,12 +75,13 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
6675
/// original chain).
6776
///
6877
/// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
69-
/// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
78+
/// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
79+
/// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
7080
pub fn new(
71-
client: &'c C,
81+
client: C,
7282
last_cp: CheckPoint,
7383
start_height: u32,
74-
expected_mempool_txids: HashSet<Txid>,
84+
expected_mempool_txids: impl IntoIterator<Item = impl Into<Txid>>,
7585
) -> Self {
7686
Self {
7787
client,
@@ -80,7 +90,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
8090
last_block: None,
8191
last_mempool_time: 0,
8292
last_mempool_tip: None,
83-
expected_mempool_txids,
93+
expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(),
8494
}
8595
}
8696

@@ -102,7 +112,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
102112
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
103113
/// at height `h`.
104114
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
105-
let client = self.client;
115+
let client = &*self.client;
106116

107117
// This is the emitted tip height during the last mempool emission.
108118
let prev_mempool_tip = self
@@ -204,7 +214,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
204214

205215
/// Emit the next block height and block (if any).
206216
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
207-
if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
217+
if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
208218
// Stop tracking unconfirmed transactions that have been confirmed in this block.
209219
for tx in &block.txdata {
210220
self.expected_mempool_txids.remove(&tx.compute_txid());
@@ -247,7 +257,7 @@ impl MempoolEvent {
247257
/// A newly emitted block from [`Emitter`].
248258
#[derive(Debug)]
249259
pub struct BlockEvent<B> {
250-
/// Either a full [`Block`] or [`Header`] of the new block.
260+
/// The block.
251261
pub block: B,
252262

253263
/// The checkpoint of the new block.
@@ -299,9 +309,10 @@ enum PollResponse {
299309

300310
fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
301311
where
302-
C: bitcoincore_rpc::RpcApi,
312+
C: Deref,
313+
C::Target: RpcApi,
303314
{
304-
let client = emitter.client;
315+
let client = &*emitter.client;
305316

306317
if let Some(last_res) = &emitter.last_block {
307318
let next_hash = if last_res.height < emitter.start_height as _ {
@@ -355,15 +366,16 @@ fn poll<C, V, F>(
355366
get_item: F,
356367
) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
357368
where
358-
C: bitcoincore_rpc::RpcApi,
359-
F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
369+
C: Deref,
370+
C::Target: RpcApi,
371+
F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
360372
{
361373
loop {
362374
match poll_once(emitter)? {
363375
PollResponse::Block(res) => {
364376
let height = res.height as u32;
365377
let hash = res.hash;
366-
let item = get_item(&hash)?;
378+
let item = get_item(&hash, &emitter.client)?;
367379

368380
let new_cp = emitter
369381
.last_cp
@@ -432,19 +444,23 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
432444

433445
#[cfg(test)]
434446
mod test {
435-
use crate::{bitcoincore_rpc::RpcApi, Emitter};
436-
use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
447+
use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
437448
use bdk_chain::local_chain::LocalChain;
438449
use bdk_testenv::{anyhow, TestEnv};
439-
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
450+
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash};
440451
use std::collections::HashSet;
441452

442453
#[test]
443454
fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
444455
let env = TestEnv::new()?;
445456
let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
446457
let chain_tip = chain.tip();
447-
let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
458+
let mut emitter = Emitter::new(
459+
env.rpc_client(),
460+
chain_tip.clone(),
461+
1,
462+
NO_EXPECTED_MEMPOOL_TXIDS,
463+
);
448464

449465
env.mine_blocks(100, None)?;
450466
while emitter.next_block()?.is_some() {}

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
use std::collections::{BTreeMap, BTreeSet, HashSet};
1+
use std::{
2+
collections::{BTreeMap, BTreeSet, HashSet},
3+
ops::Deref,
4+
};
25

3-
use bdk_bitcoind_rpc::Emitter;
6+
use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
47
use bdk_chain::{
58
bitcoin::{Address, Amount, Txid},
69
local_chain::{CheckPoint, LocalChain},
@@ -22,7 +25,12 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
2225
let env = TestEnv::new()?;
2326
let network_tip = env.rpc_client().get_block_count()?;
2427
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
25-
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
28+
let mut emitter = Emitter::new(
29+
env.rpc_client(),
30+
local_chain.tip(),
31+
0,
32+
NO_EXPECTED_MEMPOOL_TXIDS,
33+
);
2634

2735
// Mine some blocks and return the actual block hashes.
2836
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +164,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
156164
index
157165
});
158166

159-
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
167+
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, NO_EXPECTED_MEMPOOL_TXIDS);
160168

161169
while let Some(emission) = emitter.next_block()? {
162170
let height = emission.block_height();
@@ -252,7 +260,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
252260
hash: env.rpc_client().get_block_hash(0)?,
253261
}),
254262
EMITTER_START_HEIGHT as _,
255-
HashSet::new(),
263+
NO_EXPECTED_MEMPOOL_TXIDS,
256264
);
257265

258266
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -292,7 +300,8 @@ fn sync_from_emitter<C>(
292300
emitter: &mut Emitter<C>,
293301
) -> anyhow::Result<()>
294302
where
295-
C: bitcoincore_rpc::RpcApi,
303+
C: Deref,
304+
C::Target: bitcoincore_rpc::RpcApi,
296305
{
297306
while let Some(emission) = emitter.next_block()? {
298307
let height = emission.block_height();
@@ -329,7 +338,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
329338
hash: env.rpc_client().get_block_hash(0)?,
330339
}),
331340
0,
332-
HashSet::new(),
341+
NO_EXPECTED_MEMPOOL_TXIDS,
333342
);
334343

335344
// setup addresses
@@ -421,7 +430,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
421430
hash: env.rpc_client().get_block_hash(0)?,
422431
}),
423432
0,
424-
HashSet::new(),
433+
NO_EXPECTED_MEMPOOL_TXIDS,
425434
);
426435

427436
// mine blocks and sync up emitter
@@ -488,7 +497,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
488497
hash: env.rpc_client().get_block_hash(0)?,
489498
}),
490499
0,
491-
HashSet::new(),
500+
NO_EXPECTED_MEMPOOL_TXIDS,
492501
);
493502

494503
// mine blocks to get initial balance, sync emitter up to tip
@@ -580,7 +589,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
580589
hash: env.rpc_client().get_block_hash(0)?,
581590
}),
582591
0,
583-
HashSet::new(),
592+
NO_EXPECTED_MEMPOOL_TXIDS,
584593
);
585594

586595
// mine blocks to get initial balance
@@ -708,7 +717,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
708717
hash: env.rpc_client().get_block_hash(0)?,
709718
}),
710719
(PREMINE_COUNT - 2) as u32,
711-
HashSet::new(),
720+
NO_EXPECTED_MEMPOOL_TXIDS,
712721
);
713722

714723
// mine 101 blocks

examples/example_bitcoind_rpc_polling/src/main.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::{
2-
collections::HashSet,
32
path::PathBuf,
43
sync::{
54
atomic::{AtomicBool, Ordering},
@@ -137,9 +136,20 @@ fn main() -> anyhow::Result<()> {
137136
fallback_height, ..
138137
} = rpc_args;
139138

140-
let chain_tip = chain.lock().unwrap().tip();
141139
let rpc_client = rpc_args.new_client()?;
142-
let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new());
140+
let mut emitter = {
141+
let chain = chain.lock().unwrap();
142+
let graph = graph.lock().unwrap();
143+
Emitter::new(
144+
&rpc_client,
145+
chain.tip(),
146+
fallback_height,
147+
graph
148+
.graph()
149+
.list_canonical_txs(&*chain, chain.tip().block_id())
150+
.filter(|tx| tx.chain_position.is_unconfirmed()),
151+
)
152+
};
143153
let mut db_stage = ChangeSet::default();
144154

145155
let mut last_db_commit = Instant::now();
@@ -221,18 +231,27 @@ fn main() -> anyhow::Result<()> {
221231
} = rpc_args;
222232
let sigterm_flag = start_ctrlc_handler();
223233

224-
let last_cp = chain.lock().unwrap().tip();
234+
let rpc_client = Arc::new(rpc_args.new_client()?);
235+
let mut emitter = {
236+
let chain = chain.lock().unwrap();
237+
let graph = graph.lock().unwrap();
238+
Emitter::new(
239+
rpc_client.clone(),
240+
chain.tip(),
241+
fallback_height,
242+
graph
243+
.graph()
244+
.list_canonical_txs(&*chain, chain.tip().block_id())
245+
.filter(|tx| tx.chain_position.is_unconfirmed()),
246+
)
247+
};
225248

226249
println!(
227250
"[{:>10}s] starting emitter thread...",
228251
start.elapsed().as_secs_f32()
229252
);
230253
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
231254
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
232-
let rpc_client = rpc_args.new_client()?;
233-
let mut emitter =
234-
Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new());
235-
236255
let mut block_count = rpc_client.get_block_count()? as u32;
237256
tx.send(Emission::Tip(block_count))?;
238257

0 commit comments

Comments
 (0)