diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 3fa17ef19..dc35d5c12 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -4,14 +4,14 @@ //! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`]. //! //! To only get block updates (exclude mempool transactions), the caller can use -//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means -//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole -//! mempool. +//! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A +//! separate method, [`Emitter::mempool`] can be used to emit the whole mempool. #![warn(missing_docs)] use bdk_core::{BlockId, CheckPoint}; -use bitcoin::{block::Header, Block, BlockHash, Transaction}; -use bitcoincore_rpc::bitcoincore_rpc_json; +use bitcoin::{Block, BlockHash, Transaction, Txid}; +use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi}; +use std::{collections::HashSet, ops::Deref}; pub mod bip158; @@ -22,8 +22,8 @@ pub use bitcoincore_rpc; /// Refer to [module-level documentation] for more. /// /// [module-level documentation]: crate -pub struct Emitter<'c, C> { - client: &'c C, +pub struct Emitter { + client: C, start_height: u32, /// The checkpoint of the last-emitted block that is in the best chain. If it is later found @@ -43,9 +43,29 @@ pub struct Emitter<'c, C> { /// The last emitted block during our last mempool emission. This is used to determine whether /// there has been a reorg since our last mempool emission. last_mempool_tip: Option, + + /// A set of txids currently assumed to still be in the mempool. + /// + /// This is used to detect mempool evictions by comparing the set against the latest mempool + /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered + /// evicted. + /// + /// When the emitter emits a block, confirmed txids are removed from this set. This prevents + /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp. + expected_mempool_txids: HashSet, } -impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { +/// Indicates that there are no initially expected mempool transactions. +/// +/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known +/// to start empty (i.e. with no unconfirmed transactions). +pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty = core::iter::empty(); + +impl Emitter +where + C: Deref, + C::Target: RpcApi, +{ /// Construct a new [`Emitter`]. /// /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter @@ -53,7 +73,16 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { /// /// `start_height` starts emission from a given height (if there are no conflicts with the /// original chain). - pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self { + /// + /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet. + /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is + /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used. + pub fn new( + client: C, + last_cp: CheckPoint, + start_height: u32, + expected_mempool_txids: impl IntoIterator>, + ) -> Self { Self { client, start_height, @@ -61,10 +90,18 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { last_block: None, last_mempool_time: 0, last_mempool_tip: None, + expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(), } } - /// Emit mempool transactions, alongside their first-seen unix timestamps. + /// Emit mempool transactions and any evicted [`Txid`]s. + /// + /// This method returns a [`MempoolEvent`] containing the full transactions (with their + /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are + /// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids + /// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height + /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always + /// return an empty `evicted_txids` set. /// /// This method emits each transaction only once, unless we cannot guarantee the transaction's /// ancestors are already emitted. @@ -74,8 +111,8 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block /// at height `h`. - pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { - let client = self.client; + pub fn mempool(&mut self) -> Result { + let client = &*self.client; // This is the emitted tip height during the last mempool emission. let prev_mempool_tip = self @@ -84,6 +121,38 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { // `start_height` has been emitted. .unwrap_or(self.start_height.saturating_sub(1)); + // Loop to make sure that the fetched mempool content and the fetched tip are consistent + // with one another. + let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop { + // Determine if height and hash matches the best block from the RPC. Evictions are deferred + // if we are not at the best block. + let height = client.get_block_count()?; + let hash = client.get_block_hash(height)?; + + // Get the raw mempool result from the RPC client which will be used to determine if any + // transactions have been evicted. + let mp = client.get_raw_mempool_verbose()?; + let mp_txids: HashSet = mp.keys().copied().collect(); + + if height == client.get_block_count()? && hash == client.get_block_hash(height)? { + break (mp, mp_txids, height, hash); + } + }; + + let at_tip = + rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash(); + + // If at tip, any expected txid missing from raw mempool is considered evicted; + // if not at tip, we don't evict anything. + let evicted_txids: HashSet = if at_tip { + self.expected_mempool_txids + .difference(&raw_mempool_txids) + .copied() + .collect() + } else { + HashSet::new() + }; + // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep // track of the latest mempool tx's timestamp to determine whether we have seen a tx // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will @@ -91,8 +160,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { let prev_mempool_time = self.last_mempool_time; let mut latest_time = prev_mempool_time; - let txs_to_emit = client - .get_raw_mempool_verbose()? + let new_txs = raw_mempool .into_iter() .filter_map({ let latest_time = &mut latest_time; @@ -101,25 +169,25 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { if tx_time > *latest_time { *latest_time = tx_time; } - - // Avoid emitting transactions that are already emitted if we can guarantee - // blocks containing ancestors are already emitted. The bitcoind rpc interface - // provides us with the block height that the tx is introduced to the mempool. - // If we have already emitted the block of height, we can assume that all - // ancestor txs have been processed by the receiver. + // Best-effort check to avoid re-emitting transactions we've already emitted. + // + // Complete suppression isn't possible, since a transaction may spend outputs + // owned by the wallet. To determine if such a transaction is relevant, we must + // have already seen its ancestor(s) that contain the spent prevouts. + // + // Fortunately, bitcoind provides the block height at which the transaction + // entered the mempool. If we've already emitted that block height, we can + // reasonably assume the receiver has seen all ancestor transactions. let is_already_emitted = tx_time <= prev_mempool_time; let is_within_height = tx_entry.height <= prev_mempool_tip as _; if is_already_emitted && is_within_height { return None; } - let tx = match client.get_raw_transaction(&txid, None) { Ok(tx) => tx, - // the tx is confirmed or evicted since `get_raw_mempool_verbose` Err(err) if err.is_not_found_error() => return None, Err(err) => return Some(Err(err)), }; - Some(Ok((tx, tx_time as u64))) } }) @@ -128,26 +196,68 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { self.last_mempool_time = latest_time; self.last_mempool_tip = Some(self.last_cp.height()); - Ok(txs_to_emit) - } + // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re + // still catching up to the tip and keep accumulating. + if at_tip { + self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect(); + } else { + self.expected_mempool_txids + .extend(new_txs.iter().map(|(tx, _)| tx.compute_txid())); + } - /// Emit the next block height and header (if any). - pub fn next_header(&mut self) -> Result>, bitcoincore_rpc::Error> { - Ok(poll(self, |hash| self.client.get_block_header(hash))? - .map(|(checkpoint, block)| BlockEvent { block, checkpoint })) + Ok(MempoolEvent { + new_txs, + evicted_txids, + latest_update_time: latest_time as u64, + }) } /// Emit the next block height and block (if any). pub fn next_block(&mut self) -> Result>, bitcoincore_rpc::Error> { - Ok(poll(self, |hash| self.client.get_block(hash))? - .map(|(checkpoint, block)| BlockEvent { block, checkpoint })) + if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? { + // Stop tracking unconfirmed transactions that have been confirmed in this block. + for tx in &block.txdata { + self.expected_mempool_txids.remove(&tx.compute_txid()); + } + return Ok(Some(BlockEvent { block, checkpoint })); + } + Ok(None) + } +} + +/// A new emission from mempool. +#[derive(Debug)] +pub struct MempoolEvent { + /// Unemitted transactions or transactions with ancestors that are unseen by the receiver. + /// + /// To understand the second condition, consider a receiver which filters transactions based on + /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction + /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to + /// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the + /// block at height `h`. + pub new_txs: Vec<(Transaction, u64)>, + + /// [`Txid`]s of all transactions that have been evicted from mempool. + pub evicted_txids: HashSet, + + /// The latest timestamp of when a transaction entered the mempool. + /// + /// This is useful for setting the timestamp for evicted transactions. + pub latest_update_time: u64, +} + +impl MempoolEvent { + /// Returns an iterator of `(txid, evicted_at)` pairs for all evicted transactions. + pub fn evicted_ats(&self) -> impl ExactSizeIterator + '_ { + let time = self.latest_update_time; + self.evicted_txids.iter().map(move |&txid| (txid, time)) } } /// A newly emitted block from [`Emitter`]. #[derive(Debug)] pub struct BlockEvent { - /// Either a full [`Block`] or [`Header`] of the new block. + /// The block. pub block: B, /// The checkpoint of the new block. @@ -199,9 +309,10 @@ enum PollResponse { fn poll_once(emitter: &Emitter) -> Result where - C: bitcoincore_rpc::RpcApi, + C: Deref, + C::Target: RpcApi, { - let client = emitter.client; + let client = &*emitter.client; if let Some(last_res) = &emitter.last_block { let next_hash = if last_res.height < emitter.start_height as _ { @@ -255,15 +366,16 @@ fn poll( get_item: F, ) -> Result, bitcoincore_rpc::Error> where - C: bitcoincore_rpc::RpcApi, - F: Fn(&BlockHash) -> Result, + C: Deref, + C::Target: RpcApi, + F: Fn(&BlockHash, &C::Target) -> Result, { loop { match poll_once(emitter)? { PollResponse::Block(res) => { let height = res.height as u32; let hash = res.hash; - let item = get_item(&hash)?; + let item = get_item(&hash, &emitter.client)?; let new_cp = emitter .last_cp @@ -329,3 +441,81 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { } } } + +#[cfg(test)] +mod test { + use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXIDS}; + use bdk_chain::local_chain::LocalChain; + use bdk_testenv::{anyhow, TestEnv}; + use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash}; + use std::collections::HashSet; + + #[test] + fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> { + let env = TestEnv::new()?; + let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0; + let chain_tip = chain.tip(); + let mut emitter = Emitter::new( + env.rpc_client(), + chain_tip.clone(), + 1, + NO_EXPECTED_MEMPOOL_TXIDS, + ); + + env.mine_blocks(100, None)?; + while emitter.next_block()?.is_some() {} + + let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros()); + let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?; + let mut mempool_txids = HashSet::new(); + + // Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids. + for _ in 0..10 { + let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?; + mempool_txids.insert(sent_txid); + emitter.mempool()?; + env.mine_blocks(1, None)?; + + for txid in &mempool_txids { + assert!( + emitter.expected_mempool_txids.contains(txid), + "Expected txid {:?} missing", + txid + ); + } + } + + // Process each block and check that confirmed txids are removed from from + // expected_mempool_txids. + while let Some(block_event) = emitter.next_block()? { + let confirmed_txids: HashSet = block_event + .block + .txdata + .iter() + .map(|tx| tx.compute_txid()) + .collect(); + mempool_txids = mempool_txids + .difference(&confirmed_txids) + .copied() + .collect::>(); + for txid in confirmed_txids { + assert!( + !emitter.expected_mempool_txids.contains(&txid), + "Expected txid {:?} should have been removed", + txid + ); + } + for txid in &mempool_txids { + assert!( + emitter.expected_mempool_txids.contains(txid), + "Expected txid {:?} missing", + txid + ); + } + } + + assert!(emitter.expected_mempool_txids.is_empty()); + + Ok(()) + } +} diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 5753b82f8..c6b0c86ac 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -1,6 +1,9 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::{ + collections::{BTreeMap, BTreeSet, HashSet}, + ops::Deref, +}; -use bdk_bitcoind_rpc::Emitter; +use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS}; use bdk_chain::{ bitcoin::{Address, Amount, Txid}, local_chain::{CheckPoint, LocalChain}, @@ -22,7 +25,12 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { let env = TestEnv::new()?; let network_tip = env.rpc_client().get_block_count()?; let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?); - let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0); + let mut emitter = Emitter::new( + env.rpc_client(), + local_chain.tip(), + 0, + NO_EXPECTED_MEMPOOL_TXIDS, + ); // Mine some blocks and return the actual block hashes. // Because initializing `ElectrsD` already mines some blocks, we must include those too when @@ -156,7 +164,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { index }); - let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0); + let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, NO_EXPECTED_MEMPOOL_TXIDS); while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); @@ -189,7 +197,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { assert!(emitter.next_block()?.is_none()); let mempool_txs = emitter.mempool()?; - let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs); + let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs); assert_eq!( indexed_additions .tx_graph @@ -252,14 +260,15 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), EMITTER_START_HEIGHT as _, + NO_EXPECTED_MEMPOOL_TXIDS, ); env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} for reorg_count in 1..=10 { let replaced_blocks = env.reorg_empty_blocks(reorg_count)?; - let next_emission = emitter.next_header()?.expect("must emit block after reorg"); + let next_emission = emitter.next_block()?.expect("must emit block after reorg"); assert_eq!( ( next_emission.block_height() as usize, @@ -268,7 +277,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { replaced_blocks[0], "block emitted after reorg should be at the reorg height" ); - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} } Ok(()) @@ -291,7 +300,8 @@ fn sync_from_emitter( emitter: &mut Emitter, ) -> anyhow::Result<()> where - C: bitcoincore_rpc::RpcApi, + C: Deref, + C::Target: bitcoincore_rpc::RpcApi, { while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); @@ -332,6 +342,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, + NO_EXPECTED_MEMPOOL_TXIDS, ); // setup addresses @@ -423,6 +434,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, + NO_EXPECTED_MEMPOOL_TXIDS, ); // mine blocks and sync up emitter @@ -431,7 +443,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { .get_new_address(None, None)? .assume_checked(); env.mine_blocks(BLOCKS_TO_MINE, Some(addr.clone()))?; - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} // have some random txs in mempool let exp_txids = (0..MEMPOOL_TX_COUNT) @@ -441,6 +453,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { // the first emission should include all transactions let emitted_txids = emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -451,7 +464,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { // second emission should be empty assert!( - emitter.mempool()?.is_empty(), + emitter.mempool()?.new_txs.is_empty(), "second emission should be empty" ); @@ -459,9 +472,9 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { for _ in 0..BLOCKS_TO_MINE { env.mine_empty_block()?; } - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} assert!( - emitter.mempool()?.is_empty(), + emitter.mempool()?.new_txs.is_empty(), "third emission, after chain tip is extended, should also be empty" ); @@ -488,6 +501,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() hash: env.rpc_client().get_block_hash(0)?, }), 0, + NO_EXPECTED_MEMPOOL_TXIDS, ); // mine blocks to get initial balance, sync emitter up to tip @@ -496,7 +510,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() .get_new_address(None, None)? .assume_checked(); env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} // mine blocks to introduce txs to mempool at different heights let tx_introductions = (0..MEMPOOL_TX_COUNT) @@ -510,6 +524,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() assert_eq!( emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -519,6 +534,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() assert_eq!( emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -528,7 +544,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() // At this point, the emitter has seen all mempool transactions. It should only re-emit those // that have introduction heights less than the emitter's last-emitted block tip. - while let Some(emission) = emitter.next_header()? { + while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); // We call `mempool()` twice. // The second call (at height `h`) should skip the tx introduced at height `h`. @@ -539,6 +555,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() .collect::>(); let emitted_txids = emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -576,6 +593,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, + NO_EXPECTED_MEMPOOL_TXIDS, ); // mine blocks to get initial balance @@ -593,10 +611,11 @@ fn mempool_during_reorg() -> anyhow::Result<()> { // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted // from the mempool yet) - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} assert_eq!( emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -625,13 +644,14 @@ fn mempool_during_reorg() -> anyhow::Result<()> { .collect::>()); // `next_header` emits the replacement block of the reorg - if let Some(emission) = emitter.next_header()? { + if let Some(emission) = emitter.next_block()? { let height = emission.block_height(); // the mempool emission (that follows the first block emission after reorg) should only // include mempool txs introduced at reorg height or greater let mempool = emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -647,6 +667,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { let mempool = emitter .mempool()? + .new_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -670,7 +691,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { } // sync emitter to tip - while emitter.next_header()?.is_some() {} + while emitter.next_block()?.is_some() {} } Ok(()) @@ -700,18 +721,23 @@ fn no_agreement_point() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), (PREMINE_COUNT - 2) as u32, + NO_EXPECTED_MEMPOOL_TXIDS, ); // mine 101 blocks env.mine_blocks(PREMINE_COUNT, None)?; // emit block 99a - let block_header_99a = emitter.next_header()?.expect("block 99a header").block; + let block_header_99a = emitter + .next_block()? + .expect("block 99a header") + .block + .header; let block_hash_99a = block_header_99a.block_hash(); let block_hash_98a = block_header_99a.prev_blockhash; // emit block 100a - let block_header_100a = emitter.next_header()?.expect("block 100a header").block; + let block_header_100a = emitter.next_block()?.expect("block 100a header").block; let block_hash_100a = block_header_100a.block_hash(); // get hash for block 101a @@ -726,7 +752,11 @@ fn no_agreement_point() -> anyhow::Result<()> { env.mine_blocks(3, None)?; // emit block header 99b - let block_header_99b = emitter.next_header()?.expect("block 99b header").block; + let block_header_99b = emitter + .next_block()? + .expect("block 99b header") + .block + .header; let block_hash_99b = block_header_99b.block_hash(); let block_hash_98b = block_header_99b.prev_blockhash; @@ -735,3 +765,106 @@ fn no_agreement_point() -> anyhow::Result<()> { Ok(()) } + +/// Validates that when an unconfirmed transaction is double-spent (and thus evicted from the +/// mempool), the emitter reports it in `evicted_txids`, and after inserting that eviction into the +/// graph it no longer appears in the set of canonical transactions. +/// +/// 1. Broadcast a first tx (tx1) and confirm it arrives in unconfirmed set. +/// 2. Double-spend tx1 with tx1b and verify `mempool()` reports tx1 as evicted. +/// 3. Insert the eviction into the graph and assert tx1 is no longer canonical. +#[test] +fn test_expect_tx_evicted() -> anyhow::Result<()> { + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin; + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoincore_rpc_json::CreateRawTransactionInput; + use bdk_chain::miniscript; + use bdk_chain::spk_txout::SpkTxOutIndex; + use bitcoin::constants::genesis_block; + use bitcoin::secp256k1::Secp256k1; + use bitcoin::Network; + use std::collections::HashMap; + let env = TestEnv::new()?; + + let s = bdk_testenv::utils::DESCRIPTORS[0]; + let desc = miniscript::Descriptor::parse_descriptor(&Secp256k1::new(), s) + .unwrap() + .0; + let spk = desc.at_derivation_index(0)?.script_pubkey(); + + let mut chain = LocalChain::from_genesis_hash(genesis_block(Network::Regtest).block_hash()).0; + let chain_tip = chain.tip().block_id(); + + let mut index = SpkTxOutIndex::default(); + index.insert_spk((), spk.clone()); + let mut graph = IndexedTxGraph::::new(index); + + // Receive tx1. + let _ = env.mine_blocks(100, None)?; + let txid_1 = env.send( + &Address::from_script(&spk, Network::Regtest)?, + Amount::ONE_BTC, + )?; + + let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, HashSet::from([txid_1])); + while let Some(emission) = emitter.next_block()? { + let height = emission.block_height(); + chain.apply_update(CheckPoint::from_header(&emission.block.header, height))?; + } + + let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.new_txs); + assert!(changeset + .tx_graph + .txs + .iter() + .any(|tx| tx.compute_txid() == txid_1)); + + // Double spend tx1. + + // Get `prevout` from core. + let core = env.rpc_client(); + let tx1 = &core.get_raw_transaction(&txid_1, None)?; + let txin = &tx1.input[0]; + let op = txin.previous_output; + + // Create `tx1b` using the previous output from tx1. + let utxo = CreateRawTransactionInput { + txid: op.txid, + vout: op.vout, + sequence: None, + }; + let addr = core.get_new_address(None, None)?.assume_checked(); + let tx = core.create_raw_transaction( + &[utxo], + &HashMap::from([(addr.to_string(), Amount::from_btc(49.99)?)]), + None, + None, + )?; + let res = core.sign_raw_transaction_with_wallet(&tx, None, None)?; + let tx1b = res.transaction()?; + + // Send the tx. + let _txid_2 = core.send_raw_transaction(&tx1b)?; + + // Retrieve the expected unconfirmed txids and spks from the graph. + let exp_spk_txids = graph + .list_expected_spk_txids(&chain, chain_tip, ..) + .collect::>(); + assert_eq!(exp_spk_txids, vec![(spk, txid_1)]); + + // Check that mempool emission contains evicted txid. + let mempool_event = emitter.mempool()?; + assert!(mempool_event.evicted_txids.contains(&txid_1)); + + // Update graph with evicted tx. + let _ = graph.batch_insert_relevant_evicted_at(mempool_event.evicted_ats()); + + let canonical_txids = graph + .graph() + .list_canonical_txs(&chain, chain_tip, CanonicalizationParams::default()) + .map(|tx| tx.tx_node.compute_txid()) + .collect::>(); + // tx1 should no longer be canonical. + assert!(!canonical_txids.contains(&txid_1)); + + Ok(()) +} diff --git a/crates/chain/src/chain_data.rs b/crates/chain/src/chain_data.rs index a4d764c02..4890b08a7 100644 --- a/crates/chain/src/chain_data.rs +++ b/crates/chain/src/chain_data.rs @@ -40,6 +40,11 @@ impl ChainPosition { pub fn is_confirmed(&self) -> bool { matches!(self, Self::Confirmed { .. }) } + + /// Returns whether [`ChainPosition`] is unconfirmed or not. + pub fn is_unconfirmed(&self) -> bool { + matches!(self, Self::Unconfirmed { .. }) + } } impl ChainPosition<&A> { diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index bcd6ac3fc..89e794326 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -145,6 +145,22 @@ where } } + /// Batch inserts `(txid, evicted_at)` pairs for `txid`s that the graph is tracking. + /// + /// The `evicted_at` timestamp represents the last known time when the transaction was observed + /// to be missing from the mempool. If `txid` was previously recorded with an earlier + /// `evicted_at` value, it is updated only if the new value is greater. + pub fn batch_insert_relevant_evicted_at( + &mut self, + evicted_ats: impl IntoIterator, + ) -> ChangeSet { + let tx_graph = self.graph.batch_insert_relevant_evicted_at(evicted_ats); + ChangeSet { + tx_graph, + ..Default::default() + } + } + /// Batch insert transactions, filtering out those that are irrelevant. /// /// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. Irrelevant diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index 2358b1885..7916bf954 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -251,6 +251,18 @@ pub struct CanonicalTx<'a, T, A> { pub tx_node: TxNode<'a, T, A>, } +impl<'a, T, A> From> for Txid { + fn from(tx: CanonicalTx<'a, T, A>) -> Self { + tx.tx_node.txid + } +} + +impl<'a, A> From, A>> for Arc { + fn from(tx: CanonicalTx<'a, Arc, A>) -> Self { + tx.tx_node.tx + } +} + /// Errors returned by `TxGraph::calculate_fee`. #[derive(Debug, PartialEq, Eq)] pub enum CalculateFeeError { @@ -835,6 +847,26 @@ impl TxGraph { changeset } + /// Batch inserts `(txid, evicted_at)` pairs into [`TxGraph`] for `txid`s that the graph is + /// tracking. + /// + /// The `evicted_at` timestamp represents the last known time when the transaction was observed + /// to be missing from the mempool. If `txid` was previously recorded with an earlier + /// `evicted_at` value, it is updated only if the new value is greater. + pub fn batch_insert_relevant_evicted_at( + &mut self, + evicted_ats: impl IntoIterator, + ) -> ChangeSet { + let mut changeset = ChangeSet::default(); + for (txid, evicted_at) in evicted_ats { + // Only record evictions for transactions the graph is tracking. + if self.txs.contains_key(&txid) { + changeset.merge(self.insert_evicted_at(txid, evicted_at)); + } + } + changeset + } + /// Extends this graph with the given `update`. /// /// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that diff --git a/examples/example_bitcoind_rpc_polling/src/main.rs b/examples/example_bitcoind_rpc_polling/src/main.rs index 5eb3d3eb1..51e5bc23b 100644 --- a/examples/example_bitcoind_rpc_polling/src/main.rs +++ b/examples/example_bitcoind_rpc_polling/src/main.rs @@ -11,10 +11,7 @@ use bdk_bitcoind_rpc::{ bitcoincore_rpc::{Auth, Client, RpcApi}, Emitter, }; -use bdk_chain::{ - bitcoin::{Block, Transaction}, - local_chain, CanonicalizationParams, Merge, -}; +use bdk_chain::{bitcoin::Block, local_chain, CanonicalizationParams, Merge}; use example_cli::{ anyhow, clap::{self, Args, Subcommand}, @@ -36,7 +33,7 @@ const DB_COMMIT_DELAY: Duration = Duration::from_secs(60); #[derive(Debug)] enum Emission { Block(bdk_bitcoind_rpc::BlockEvent), - Mempool(Vec<(Transaction, u64)>), + Mempool(bdk_bitcoind_rpc::MempoolEvent), Tip(u32), } @@ -139,9 +136,24 @@ fn main() -> anyhow::Result<()> { fallback_height, .. } = rpc_args; - let chain_tip = chain.lock().unwrap().tip(); let rpc_client = rpc_args.new_client()?; - let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height); + let mut emitter = { + let chain = chain.lock().unwrap(); + let graph = graph.lock().unwrap(); + Emitter::new( + &rpc_client, + chain.tip(), + fallback_height, + graph + .graph() + .list_canonical_txs( + &*chain, + chain.tip().block_id(), + CanonicalizationParams::default(), + ) + .filter(|tx| tx.chain_position.is_unconfirmed()), + ) + }; let mut db_stage = ChangeSet::default(); let mut last_db_commit = Instant::now(); @@ -205,7 +217,7 @@ fn main() -> anyhow::Result<()> { let graph_changeset = graph .lock() .unwrap() - .batch_insert_relevant_unconfirmed(mempool_txs); + .batch_insert_relevant_unconfirmed(mempool_txs.new_txs); { let db = &mut *db.lock().unwrap(); db_stage.merge(ChangeSet { @@ -224,7 +236,24 @@ fn main() -> anyhow::Result<()> { } = rpc_args; let sigterm_flag = start_ctrlc_handler(); - let last_cp = chain.lock().unwrap().tip(); + let rpc_client = Arc::new(rpc_args.new_client()?); + let mut emitter = { + let chain = chain.lock().unwrap(); + let graph = graph.lock().unwrap(); + Emitter::new( + rpc_client.clone(), + chain.tip(), + fallback_height, + graph + .graph() + .list_canonical_txs( + &*chain, + chain.tip().block_id(), + CanonicalizationParams::default(), + ) + .filter(|tx| tx.chain_position.is_unconfirmed()), + ) + }; println!( "[{:>10}s] starting emitter thread...", @@ -232,9 +261,6 @@ fn main() -> anyhow::Result<()> { ); let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND); let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { - let rpc_client = rpc_args.new_client()?; - let mut emitter = Emitter::new(&rpc_client, last_cp, fallback_height); - let mut block_count = rpc_client.get_block_count()? as u32; tx.send(Emission::Tip(block_count))?; @@ -288,7 +314,11 @@ fn main() -> anyhow::Result<()> { (chain_changeset, graph_changeset) } Emission::Mempool(mempool_txs) => { - let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs); + let mut graph_changeset = + graph.batch_insert_relevant_unconfirmed(mempool_txs.new_txs.clone()); + graph_changeset.merge( + graph.batch_insert_relevant_evicted_at(mempool_txs.evicted_ats()), + ); (local_chain::ChangeSet::default(), graph_changeset) } Emission::Tip(h) => {