diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 3fa17ef19..1fc35718f 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -10,8 +10,9 @@ #![warn(missing_docs)] use bdk_core::{BlockId, CheckPoint}; -use bitcoin::{block::Header, Block, BlockHash, Transaction}; +use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid}; use bitcoincore_rpc::bitcoincore_rpc_json; +use std::collections::HashSet; pub mod bip158; @@ -64,17 +65,19 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { } } - /// Emit mempool transactions, alongside their first-seen unix timestamps. + /// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s. /// - /// This method emits each transaction only once, unless we cannot guarantee the transaction's - /// ancestors are already emitted. + /// This method returns a [`MempoolEvent`] containing the full transactions (with their + /// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the + /// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the + /// transaction's ancestors are already emitted. /// /// To understand why, consider a receiver which filters transactions based on whether it /// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block /// at height `h`. - pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { + pub fn mempool(&mut self) -> Result { let client = self.client; // This is the emitted tip height during the last mempool emission. @@ -91,8 +94,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { let prev_mempool_time = self.last_mempool_time; let mut latest_time = prev_mempool_time; - let txs_to_emit = client - .get_raw_mempool_verbose()? + // Get the raw mempool result from the RPC client. + let raw_mempool = client.get_raw_mempool_verbose()?; + let raw_mempool_txids = raw_mempool.keys().copied().collect::>(); + + let emitted_txs = raw_mempool .into_iter() .filter_map({ let latest_time = &mut latest_time; @@ -128,7 +134,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { self.last_mempool_time = latest_time; self.last_mempool_tip = Some(self.last_cp.height()); - Ok(txs_to_emit) + Ok(MempoolEvent { + emitted_txs, + raw_mempool_txids, + last_seen: latest_time as u64, + }) } /// Emit the next block height and header (if any). @@ -144,6 +154,37 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { } } +/// A new emission from mempool. +#[derive(Debug)] +pub struct MempoolEvent { + /// Emitted mempool transactions with their first‐seen unix timestamps. + pub emitted_txs: Vec<(Transaction, u64)>, + + /// Set of all [`Txid`]s from the raw mempool result, including transactions that may have been + /// confirmed or evicted during processing. This is used to determine which expected + /// transactions are missing. + pub raw_mempool_txids: HashSet, + + /// The latest first-seen epoch of emitted mempool transactions. + pub last_seen: u64, +} + +impl MempoolEvent { + /// Given an iterator of expected [`Txid`]s, return those that are missing from the mempool. + pub fn evicted_txids( + &self, + expected_unconfirmed_txids: impl IntoIterator, + ) -> HashSet { + let expected_set = expected_unconfirmed_txids + .into_iter() + .collect::>(); + expected_set + .difference(&self.raw_mempool_txids) + .copied() + .collect() + } +} + /// A newly emitted block from [`Emitter`]. #[derive(Debug)] pub struct BlockEvent { diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 14b0c9212..2448b6483 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { assert!(emitter.next_block()?.is_none()); let mempool_txs = emitter.mempool()?; - let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs); + let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.emitted_txs); assert_eq!( indexed_additions .tx_graph @@ -437,6 +437,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { // the first emission should include all transactions let emitted_txids = emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -447,7 +448,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { // second emission should be empty assert!( - emitter.mempool()?.is_empty(), + emitter.mempool()?.emitted_txs.is_empty(), "second emission should be empty" ); @@ -457,7 +458,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { } while emitter.next_header()?.is_some() {} assert!( - emitter.mempool()?.is_empty(), + emitter.mempool()?.emitted_txs.is_empty(), "third emission, after chain tip is extended, should also be empty" ); @@ -506,6 +507,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() assert_eq!( emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -515,6 +517,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() assert_eq!( emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -535,6 +538,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() .collect::>(); let emitted_txids = emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -593,6 +597,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { assert_eq!( emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), @@ -628,6 +633,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { // include mempool txs introduced at reorg height or greater let mempool = emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -643,6 +649,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { let mempool = emitter .mempool()? + .emitted_txs .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); @@ -731,3 +738,107 @@ fn no_agreement_point() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn test_expect_tx_evicted() -> anyhow::Result<()> { + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin; + use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoincore_rpc_json::CreateRawTransactionInput; + use bdk_chain::miniscript; + use bdk_chain::spk_txout::SpkTxOutIndex; + use bdk_chain::ConfirmationBlockTime; + use bitcoin::constants::genesis_block; + use bitcoin::secp256k1::Secp256k1; + use bitcoin::Network; + use std::collections::HashMap; + let env = TestEnv::new()?; + + let s = bdk_testenv::utils::DESCRIPTORS[0]; + let desc = miniscript::Descriptor::parse_descriptor(&Secp256k1::new(), s) + .unwrap() + .0; + let spk = desc.at_derivation_index(0)?.script_pubkey(); + + let chain = LocalChain::from_genesis_hash(genesis_block(Network::Regtest).block_hash()).0; + let chain_tip = chain.tip().block_id(); + + let mut index = SpkTxOutIndex::default(); + index.insert_spk(("external", 0u32), spk.clone()); + let mut graph = IndexedTxGraph::::new(index); + + // Receive tx1. + let _ = env.mine_blocks(100, None)?; + let txid_1 = env.send( + &Address::from_script(&spk, Network::Regtest)?, + Amount::ONE_BTC, + )?; + + let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1); + let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.emitted_txs); + assert!(changeset + .tx_graph + .txs + .iter() + .any(|tx| tx.compute_txid() == txid_1)); + let seen_at = graph + .graph() + .get_tx_node(txid_1) + .unwrap() + .last_seen_unconfirmed + .unwrap(); + + // Double spend tx1. + + // Get `prevout` from core. + let core = env.rpc_client(); + let tx1 = &core.get_raw_transaction(&txid_1, None)?; + let txin = &tx1.input[0]; + let op = txin.previous_output; + + // Create `tx1b` using the previous output from tx1. + let utxo = CreateRawTransactionInput { + txid: op.txid, + vout: op.vout, + sequence: None, + }; + let addr = core.get_new_address(None, None)?.assume_checked(); + let tx = core.create_raw_transaction( + &[utxo], + &HashMap::from([(addr.to_string(), Amount::from_btc(49.99)?)]), + None, + None, + )?; + let res = core.sign_raw_transaction_with_wallet(&tx, None, None)?; + let tx1b = res.transaction()?; + + // Send the tx. + let txid_2 = core.send_raw_transaction(&tx1b)?; + + // Retrieve the expected unconfirmed txids and spks from the graph. + let exp_spk_txids = graph.expected_unconfirmed_spk_txids(&chain, chain_tip, ..)?; + assert_eq!(exp_spk_txids, vec![(txid_1, spk)]); + + // Check that mempool emission contains evicted txid. + let mempool_event = emitter.mempool()?; + let unseen_txids: Vec = mempool_event + .emitted_txs + .iter() + .map(|(tx, _)| tx.compute_txid()) + .collect(); + assert!(unseen_txids.contains(&txid_2)); + + // Update graph with evicted tx. + let exp_txids = exp_spk_txids.into_iter().map(|(txid, _)| txid); + let evicted_txids = mempool_event.evicted_txids(exp_txids); + for txid in evicted_txids { + let _ = graph.insert_evicted_at(txid, seen_at); + } + + // tx1 should no longer be canonical. + assert!(graph + .graph() + .list_canonical_txs(&chain, chain_tip) + .next() + .is_none()); + + Ok(()) +} diff --git a/crates/chain/benches/canonicalization.rs b/crates/chain/benches/canonicalization.rs index 3002a7ca3..52cbf51d6 100644 --- a/crates/chain/benches/canonicalization.rs +++ b/crates/chain/benches/canonicalization.rs @@ -132,10 +132,8 @@ pub fn many_conflicting_unconfirmed(c: &mut Criterion) { }], ..new_tx(i) }; - let update = TxUpdate { - txs: vec![Arc::new(tx)], - ..Default::default() - }; + let mut update = TxUpdate::default(); + update.txs = vec![Arc::new(tx)]; let _ = tx_graph.apply_update_at(update, Some(i as u64)); } })); @@ -169,10 +167,8 @@ pub fn many_chained_unconfirmed(c: &mut Criterion) { ..new_tx(i) }; let txid = tx.compute_txid(); - let update = TxUpdate { - txs: vec![Arc::new(tx)], - ..Default::default() - }; + let mut update = TxUpdate::default(); + update.txs = vec![Arc::new(tx)]; let _ = tx_graph.apply_update_at(update, Some(i as u64)); // Store the next prevout. previous_output = OutPoint::new(txid, 0); diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 039924c92..acbddb3c6 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -1,13 +1,15 @@ //! Contains the [`IndexedTxGraph`] and associated types. Refer to the //! [`IndexedTxGraph`] documentation for more. -use core::fmt::Debug; + +use core::ops::RangeBounds; use alloc::{sync::Arc, vec::Vec}; -use bitcoin::{Block, OutPoint, Transaction, TxOut, Txid}; +use bitcoin::{Block, OutPoint, ScriptBuf, Transaction, TxOut, Txid}; use crate::{ + spk_txout::SpkTxOutIndex, tx_graph::{self, TxGraph}, - Anchor, BlockId, Indexer, Merge, TxPosInBlock, + Anchor, BlockId, ChainOracle, Indexer, Merge, TxPosInBlock, }; /// The [`IndexedTxGraph`] combines a [`TxGraph`] and an [`Indexer`] implementation. @@ -324,6 +326,69 @@ where indexer, } } + + /// Inserts the given `evicted_at` for `txid`. + /// + /// The `evicted_at` timestamp represents the last known time when the transaction was observed + /// to be missing from the mempool. If `txid` was previously recorded with an earlier + /// `evicted_at` value, it is updated only if the new value is greater. + pub fn insert_evicted_at(&mut self, txid: Txid, evicted_at: u64) -> ChangeSet { + let tx_graph = self.graph.insert_evicted_at(txid, evicted_at); + ChangeSet { + tx_graph, + ..Default::default() + } + } +} + +impl IndexedTxGraph> +where + A: Anchor, + I: core::fmt::Debug + Clone + Ord, +{ + /// Iterate over unconfirmed txids that we expect to exist in a chain source's spk history + /// response. + /// + /// This is used to fill [`SyncRequestBuilder::expected_unconfirmed_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_unconfirmed_spk_txids). + /// + /// The spk range can be contrained with `range`. + pub fn expected_unconfirmed_spk_txids<'a, O>( + &'a self, + chain: &'a O, + chain_tip: BlockId, + range: impl RangeBounds + 'a, + ) -> Result, O::Error> + where + O: ChainOracle, + { + self.graph + .expected_unconfirmed_spk_txids(chain, chain_tip, &self.index, range) + } +} + +impl IndexedTxGraph> +where + A: Anchor, + K: core::fmt::Debug + Clone + Ord, +{ + /// Iterate over unconfirmed txids that we expect to exist in a chain source's spk history + /// response. + /// + /// This is used to fill [`SyncRequestBuilder::expected_unconfirmed_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_unconfirmed_spk_txids). + /// + /// The spk range can be contrained with `range`. + pub fn expected_unconfirmed_spk_txids<'a, O>( + &'a self, + chain: &'a O, + chain_tip: BlockId, + range: impl RangeBounds<(K, u32)> + 'a, + ) -> Result, O::Error> + where + O: ChainOracle, + { + self.graph + .expected_unconfirmed_spk_txids(chain, chain_tip, &self.index, range) + } } impl AsRef> for IndexedTxGraph { diff --git a/crates/chain/src/indexer/keychain_txout.rs b/crates/chain/src/indexer/keychain_txout.rs index 4543027cc..8f159fb38 100644 --- a/crates/chain/src/indexer/keychain_txout.rs +++ b/crates/chain/src/indexer/keychain_txout.rs @@ -136,6 +136,12 @@ impl Default for KeychainTxOutIndex { } } +impl AsRef> for KeychainTxOutIndex { + fn as_ref(&self) -> &SpkTxOutIndex<(K, u32)> { + self.inner() + } +} + impl Indexer for KeychainTxOutIndex { type ChangeSet = ChangeSet; @@ -200,6 +206,11 @@ impl KeychainTxOutIndex { lookahead, } } + + /// Get a reference to the internal [`SpkTxOutIndex`]. + pub fn inner(&self) -> &SpkTxOutIndex<(K, u32)> { + &self.inner + } } /// Methods that are *re-exposed* from the internal [`SpkTxOutIndex`]. diff --git a/crates/chain/src/indexer/spk_txout.rs b/crates/chain/src/indexer/spk_txout.rs index 286e5d2dc..c3b33dd3e 100644 --- a/crates/chain/src/indexer/spk_txout.rs +++ b/crates/chain/src/indexer/spk_txout.rs @@ -3,7 +3,7 @@ use core::ops::RangeBounds; use crate::{ - collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap}, + collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}, Indexer, }; use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txid}; @@ -42,6 +42,12 @@ pub struct SpkTxOutIndex { spk_txouts: BTreeSet<(I, OutPoint)>, } +impl AsRef> for SpkTxOutIndex { + fn as_ref(&self) -> &SpkTxOutIndex { + self + } +} + impl Default for SpkTxOutIndex { fn default() -> Self { Self { @@ -334,4 +340,23 @@ impl SpkTxOutIndex { .any(|output| self.spk_indices.contains_key(&output.script_pubkey)); input_matches || output_matches } + + /// Find relevant script pubkeys associated with a transaction for tracking and validation. + /// + /// Returns a set of script pubkeys from [`SpkTxOutIndex`] that are relevant to the outputs and + /// previous outputs of a given transaction. Inputs are only considered relevant if the parent + /// transactions have been scanned. + pub fn relevant_spks_of_tx(&self, tx: &Transaction) -> HashSet { + let spks_from_inputs = tx.input.iter().filter_map(|txin| { + self.txouts + .get(&txin.previous_output) + .map(|(_, prev_txo)| prev_txo.script_pubkey.clone()) + }); + let spks_from_outputs = tx + .output + .iter() + .filter(|txout| self.spk_indices.contains_key(&txout.script_pubkey)) + .map(|txo| txo.script_pubkey.clone()); + spks_from_inputs.chain(spks_from_outputs).collect() + } } diff --git a/crates/chain/src/rusqlite_impl.rs b/crates/chain/src/rusqlite_impl.rs index 7b39f53c0..3bc105d0b 100644 --- a/crates/chain/src/rusqlite_impl.rs +++ b/crates/chain/src/rusqlite_impl.rs @@ -264,12 +264,20 @@ impl tx_graph::ChangeSet { format!("{add_confirmation_time_column}; {extract_confirmation_time_from_anchor_column}; {drop_anchor_column}") } + /// Get v2 of sqlite [tx_graph::ChangeSet] schema + pub fn schema_v2() -> String { + format!( + "ALTER TABLE {} ADD COLUMN last_evicted INTEGER", + Self::TXS_TABLE_NAME, + ) + } + /// Initialize sqlite tables. pub fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { migrate_schema( db_tx, Self::SCHEMA_NAME, - &[&Self::schema_v0(), &Self::schema_v1()], + &[&Self::schema_v0(), &Self::schema_v1(), &Self::schema_v2()], ) } @@ -280,7 +288,7 @@ impl tx_graph::ChangeSet { let mut changeset = Self::default(); let mut statement = db_tx.prepare(&format!( - "SELECT txid, raw_tx, last_seen FROM {}", + "SELECT txid, raw_tx, last_seen, last_evicted FROM {}", Self::TXS_TABLE_NAME, ))?; let row_iter = statement.query_map([], |row| { @@ -288,16 +296,20 @@ impl tx_graph::ChangeSet { row.get::<_, Impl>("txid")?, row.get::<_, Option>>("raw_tx")?, row.get::<_, Option>("last_seen")?, + row.get::<_, Option>("last_evicted")?, )) })?; for row in row_iter { - let (Impl(txid), tx, last_seen) = row?; + let (Impl(txid), tx, last_seen, last_evicted) = row?; if let Some(Impl(tx)) = tx { changeset.txs.insert(Arc::new(tx)); } if let Some(last_seen) = last_seen { changeset.last_seen.insert(txid, last_seen); } + if let Some(last_evicted) = last_evicted { + changeset.last_evicted.insert(txid, last_evicted); + } } let mut statement = db_tx.prepare(&format!( @@ -377,6 +389,19 @@ impl tx_graph::ChangeSet { })?; } + let mut statement = db_tx + .prepare_cached(&format!( + "INSERT INTO {}(txid, last_evicted) VALUES(:txid, :last_evicted) ON CONFLICT(txid) DO UPDATE SET last_evicted=:last_evicted", + Self::TXS_TABLE_NAME, + ))?; + for (&txid, &last_evicted) in &self.last_evicted { + let checked_time = last_evicted.to_sql()?; + statement.execute(named_params! { + ":txid": Impl(txid), + ":last_evicted": Some(checked_time), + })?; + } + let mut statement = db_tx.prepare_cached(&format!( "REPLACE INTO {}(txid, vout, value, script) VALUES(:txid, :vout, :value, :script)", Self::TXOUTS_TABLE_NAME, @@ -628,7 +653,7 @@ mod test { } #[test] - fn v0_to_v1_schema_migration_is_backward_compatible() -> anyhow::Result<()> { + fn v0_to_v2_schema_migration_is_backward_compatible() -> anyhow::Result<()> { type ChangeSet = tx_graph::ChangeSet; let mut conn = rusqlite::Connection::open_in_memory()?; @@ -697,13 +722,17 @@ mod test { } } - // Apply v1 sqlite schema to tables with data + // Apply v1 & v2 sqlite schema to tables with data { let db_tx = conn.transaction()?; migrate_schema( &db_tx, ChangeSet::SCHEMA_NAME, - &[&ChangeSet::schema_v0(), &ChangeSet::schema_v1()], + &[ + &ChangeSet::schema_v0(), + &ChangeSet::schema_v1(), + &ChangeSet::schema_v2(), + ], )?; db_tx.commit()?; } @@ -718,4 +747,43 @@ mod test { Ok(()) } + + #[test] + fn can_persist_last_evicted() -> anyhow::Result<()> { + use bitcoin::hashes::Hash; + + type ChangeSet = tx_graph::ChangeSet; + let mut conn = rusqlite::Connection::open_in_memory()?; + + // Init tables + { + let db_tx = conn.transaction()?; + ChangeSet::init_sqlite_tables(&db_tx)?; + db_tx.commit()?; + } + + let txid = bitcoin::Txid::all_zeros(); + let last_evicted = 100; + + // Persist `last_evicted` + { + let changeset = ChangeSet { + last_evicted: [(txid, last_evicted)].into(), + ..Default::default() + }; + let db_tx = conn.transaction()?; + changeset.persist_to_sqlite(&db_tx)?; + db_tx.commit()?; + } + + // Load from sqlite should succeed + { + let db_tx = conn.transaction()?; + let changeset = ChangeSet::from_sqlite(&db_tx)?; + db_tx.commit()?; + assert_eq!(changeset.last_evicted.get(&txid), Some(&last_evicted)); + } + + Ok(()) + } } diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index 2d512cfea..7241a4454 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -16,23 +16,52 @@ //! documentation for more details), and the timestamp of the last time we saw the transaction as //! unconfirmed. //! -//! Conflicting transactions are allowed to coexist within a [`TxGraph`]. This is useful for -//! identifying and traversing conflicts and descendants of a given transaction. Some [`TxGraph`] -//! methods only consider transactions that are "canonical" (i.e., in the best chain or in mempool). -//! We decide which transactions are canonical based on the transaction's anchors and the -//! `last_seen` (as unconfirmed) timestamp. +//! # Canonicalization //! -//! The [`ChangeSet`] reports changes made to a [`TxGraph`]; it can be used to either save to -//! persistent storage, or to be applied to another [`TxGraph`]. +//! Conflicting transactions are allowed to coexist within a [`TxGraph`]. A process called +//! canonicalization is required to get a conflict-free history of transactions. +//! +//! * [`list_canonical_txs`](TxGraph::list_canonical_txs) lists canonical transactions. +//! * [`filter_chain_txouts`](TxGraph::filter_chain_txouts) filters out canonical outputs from a +//! list of outpoints. +//! * [`filter_chain_unspents`](TxGraph::filter_chain_unspents) filters out canonical unspent +//! outputs from a list of outpoints. +//! * [`balance`](TxGraph::balance) gets the total sum of unspent outputs filtered from a list of +//! outpoints. +//! * [`canonical_iter`](TxGraph::canonical_iter) returns the [`CanonicalIter`] which contains all +//! of the canonicalization logic. +//! +//! All these methods require a `chain` and `chain_tip` argument. The `chain` must be a +//! [`ChainOracle`] implementation (such as [`LocalChain`](crate::local_chain::LocalChain)) which +//! identifies which blocks exist under a given `chain_tip`. //! -//! Lastly, you can use [`TxAncestors`]/[`TxDescendants`] to traverse ancestors and descendants of -//! a given transaction, respectively. +//! The canonicalization algorithm uses the following associated data to determine which +//! transactions have precedence over others: +//! +//! * [`Anchor`] - This bit of data represents that a transaction is anchored in a given block. If +//! the transaction is anchored in chain of `chain_tip`, or is an ancestor of a transaction +//! anchored in chain of `chain_tip`, then the transaction must be canonical. +//! * `last_seen` - This is the timestamp of when a transaction is last-seen in the mempool. This +//! value is updated by [`insert_seen_at`](TxGraph::insert_seen_at) and +//! [`apply_update`](TxGraph::apply_update). Transactions that are seen later have higher +//! priority than those that are seen earlier. `last_seen` values are transitive. Meaning that +//! the actual `last_seen` value of a transaction is the max of all the `last_seen` values of +//! it's descendants. +//! * `last_evicted` - This is the timestamp of when a transaction is last-seen as evicted in the +//! mempool. If this value is equal to or higher than the transaction's `last_seen` value, then +//! it will not be considered canonical. +//! +//! # Graph traversal +//! +//! You can use [`TxAncestors`]/[`TxDescendants`] to traverse ancestors and descendants of a given +//! transaction, respectively. //! //! # Applying changes //! +//! The [`ChangeSet`] reports changes made to a [`TxGraph`]; it can be used to either save to +//! persistent storage, or to be applied to another [`TxGraph`]. +//! //! Methods that change the state of [`TxGraph`] will return [`ChangeSet`]s. -//! [`ChangeSet`]s can be applied back to a [`TxGraph`] or be used to inform persistent storage -//! of the changes to [`TxGraph`]. //! //! # Generics //! @@ -91,6 +120,7 @@ //! [`insert_txout`]: TxGraph::insert_txout use crate::collections::*; +use crate::spk_txout::SpkTxOutIndex; use crate::BlockId; use crate::CanonicalIter; use crate::CanonicalReason; @@ -105,24 +135,24 @@ use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txi use core::fmt::{self, Formatter}; use core::{ convert::Infallible, - ops::{Deref, RangeInclusive}, + ops::{Deref, RangeBounds, RangeInclusive}, }; impl From> for TxUpdate { fn from(graph: TxGraph) -> Self { - Self { - txs: graph.full_txs().map(|tx_node| tx_node.tx).collect(), - txouts: graph - .floating_txouts() - .map(|(op, txo)| (op, txo.clone())) - .collect(), - anchors: graph - .anchors - .into_iter() - .flat_map(|(txid, anchors)| anchors.into_iter().map(move |a| (a, txid))) - .collect(), - seen_ats: graph.last_seen.into_iter().collect(), - } + let mut tx_update = TxUpdate::default(); + tx_update.txs = graph.full_txs().map(|tx_node| tx_node.tx).collect(); + tx_update.txouts = graph + .floating_txouts() + .map(|(op, txo)| (op, txo.clone())) + .collect(); + tx_update.anchors = graph + .anchors + .into_iter() + .flat_map(|(txid, anchors)| anchors.into_iter().map(move |a| (a, txid))) + .collect(); + tx_update.seen_ats = graph.last_seen.into_iter().collect(); + tx_update } } @@ -145,6 +175,7 @@ pub struct TxGraph { spends: BTreeMap>, anchors: HashMap>, last_seen: HashMap, + last_evicted: HashMap, txs_by_highest_conf_heights: BTreeSet<(u32, Txid)>, txs_by_last_seen: BTreeSet<(u64, Txid)>, @@ -162,6 +193,7 @@ impl Default for TxGraph { spends: Default::default(), anchors: Default::default(), last_seen: Default::default(), + last_evicted: Default::default(), txs_by_highest_conf_heights: Default::default(), txs_by_last_seen: Default::default(), empty_outspends: Default::default(), @@ -715,6 +747,34 @@ impl TxGraph { changeset } + /// Inserts the given `evicted_at` for `txid` into [`TxGraph`]. + /// + /// The `evicted_at` timestamp represents the last known time when the transaction was observed + /// to be missing from the mempool. If `txid` was previously recorded with an earlier + /// `evicted_at` value, it is updated only if the new value is greater. + pub fn insert_evicted_at(&mut self, txid: Txid, evicted_at: u64) -> ChangeSet { + let is_changed = match self.last_evicted.entry(txid) { + hash_map::Entry::Occupied(mut e) => { + let last_evicted = e.get_mut(); + let change = *last_evicted < evicted_at; + if change { + *last_evicted = evicted_at; + } + change + } + hash_map::Entry::Vacant(e) => { + e.insert(evicted_at); + true + } + }; + + let mut changeset = ChangeSet::::default(); + if is_changed { + changeset.last_evicted.insert(txid, evicted_at); + } + changeset + } + /// Extends this graph with the given `update`. /// /// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that @@ -737,6 +797,10 @@ impl TxGraph { /// transactions (where the transaction with the lower `last_seen` value is omitted from the /// canonical history). /// + /// `evicted_at` is used to track when a transaction was last observed in the mempool before + /// disappearing. It helps determine whether a transaction was potentially replaced, allowing + /// the graph to filter out missing transactions that should no longer be considered valid. + /// /// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will /// not be part of the canonical history of transactions. /// @@ -765,6 +829,14 @@ impl TxGraph { changeset.merge(self.insert_seen_at(txid, seen_at)); } } + for txid in update.evicted { + // We want the `evicted_at` value to override the `last_seen` value of the transaction. + // If there is no `last_seen`, there is no need for the `evicted_at` value since the + // transaction will not be canonical anyway. + if let Some(&evicted_at) = self.last_seen.get(&txid) { + changeset.merge(self.insert_evicted_at(txid, evicted_at)); + } + } changeset } @@ -782,6 +854,7 @@ impl TxGraph { .flat_map(|(txid, anchors)| anchors.iter().map(|a| (a.clone(), *txid))) .collect(), last_seen: self.last_seen.iter().map(|(&k, &v)| (k, v)).collect(), + last_evicted: self.last_evicted.iter().map(|(&k, &v)| (k, v)).collect(), } } @@ -799,6 +872,9 @@ impl TxGraph { for (txid, seen_at) in changeset.last_seen { let _ = self.insert_seen_at(txid, seen_at); } + for (txid, evicted_at) in changeset.last_evicted { + let _ = self.insert_evicted_at(txid, evicted_at); + } } } @@ -969,9 +1045,14 @@ impl TxGraph { /// List txids by descending last-seen order. /// - /// Transactions without last-seens are excluded. - pub fn txids_by_descending_last_seen(&self) -> impl ExactSizeIterator + '_ { - self.txs_by_last_seen.iter().copied().rev() + /// Transactions without last-seens are excluded. Transactions with a last-evicted timestamp + /// equal or higher than it's last-seen timestamp are excluded. + pub fn txids_by_descending_last_seen(&self) -> impl Iterator + '_ { + self.txs_by_last_seen + .iter() + .copied() + .rev() + .filter(|(last_seen, txid)| !matches!(self.last_evicted.get(txid), Some(last_evicted) if last_evicted >= last_seen)) } /// Returns a [`CanonicalIter`]. @@ -1110,6 +1191,48 @@ impl TxGraph { self.try_balance(chain, chain_tip, outpoints, trust_predicate) .expect("oracle is infallible") } + + /// Iterate over unconfirmed txids that we expect to exist in a chain source's spk history + /// response. + /// + /// This is used to fill [`SyncRequestBuilder::expected_unconfirmed_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_unconfirmed_spk_txids). + /// + /// The spk range can be contrained with `range`. + pub fn expected_unconfirmed_spk_txids<'a, C, I>( + &'a self, + chain: &'a C, + chain_tip: BlockId, + indexer: &'a impl AsRef>, + range: impl RangeBounds + 'a, + ) -> Result, C::Error> + where + C: ChainOracle, + I: fmt::Debug + Clone + Ord + 'a, + { + let mut spk_txs = vec![]; + for res in self.try_list_canonical_txs(chain, chain_tip) { + let canonical_tx = res?; + if canonical_tx.chain_position.is_confirmed() { + continue; + } + let txid = canonical_tx.tx_node.txid; + let tx = canonical_tx.tx_node.tx; + let outpoints = tx.input.iter().map(|txin| txin.previous_output).chain( + tx.output + .iter() + .enumerate() + .map(|(vout, _)| OutPoint::new(txid, vout as u32)), + ); + for op in outpoints { + if let Some((index, txo)) = indexer.as_ref().txout(op) { + if range.contains(index) { + spk_txs.push((txid, txo.script_pubkey.clone())); + } + } + } + } + Ok(spk_txs) + } } /// The [`ChangeSet`] represents changes to a [`TxGraph`]. @@ -1139,6 +1262,8 @@ pub struct ChangeSet { pub anchors: BTreeSet<(A, Txid)>, /// Added last-seen unix timestamps of transactions. pub last_seen: BTreeMap, + /// Added timestamps of when a transaction is last evicted from the mempool. + pub last_evicted: BTreeMap, } impl Default for ChangeSet { @@ -1148,6 +1273,7 @@ impl Default for ChangeSet { txouts: Default::default(), anchors: Default::default(), last_seen: Default::default(), + last_evicted: Default::default(), } } } @@ -1202,6 +1328,14 @@ impl Merge for ChangeSet { .filter(|(txid, update_ls)| self.last_seen.get(txid) < Some(update_ls)) .collect::>(), ); + // last_evicted timestamps should only increase + self.last_evicted.extend( + other + .last_evicted + .into_iter() + .filter(|(txid, update_lm)| self.last_evicted.get(txid) < Some(update_lm)) + .collect::>(), + ); } fn is_empty(&self) -> bool { @@ -1209,6 +1343,7 @@ impl Merge for ChangeSet { && self.txouts.is_empty() && self.anchors.is_empty() && self.last_seen.is_empty() + && self.last_evicted.is_empty() } } @@ -1228,6 +1363,7 @@ impl ChangeSet { self.anchors.into_iter().map(|(a, txid)| (f(a), txid)), ), last_seen: self.last_seen, + last_evicted: self.last_evicted, } } } diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index ef57ac15b..c44e1d2af 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -115,7 +115,8 @@ fn insert_txouts() { txs: [Arc::new(update_tx.clone())].into(), txouts: update_ops.clone().into(), anchors: [(conf_anchor, update_tx.compute_txid()),].into(), - last_seen: [(hash!("tx2"), 1000000)].into() + last_seen: [(hash!("tx2"), 1000000)].into(), + last_evicted: [].into(), } ); @@ -168,7 +169,8 @@ fn insert_txouts() { txs: [Arc::new(update_tx.clone())].into(), txouts: update_ops.into_iter().chain(original_ops).collect(), anchors: [(conf_anchor, update_tx.compute_txid()),].into(), - last_seen: [(hash!("tx2"), 1000000)].into() + last_seen: [(hash!("tx2"), 1000000)].into(), + last_evicted: [].into(), } ); } @@ -1231,69 +1233,60 @@ fn tx_graph_update_conversion() { let test_cases: &[TestCase] = &[ ("empty_update", TxUpdate::default()), - ( - "single_tx", - TxUpdate { - txs: vec![make_tx(0).into()], - ..Default::default() - }, - ), - ( - "two_txs", - TxUpdate { - txs: vec![make_tx(0).into(), make_tx(1).into()], - ..Default::default() - }, - ), - ( - "with_floating_txouts", - TxUpdate { - txs: vec![make_tx(0).into(), make_tx(1).into()], - txouts: [ - (OutPoint::new(hash!("a"), 0), make_txout(0)), - (OutPoint::new(hash!("a"), 1), make_txout(1)), - (OutPoint::new(hash!("b"), 0), make_txout(2)), - ] - .into(), - ..Default::default() - }, - ), - ( - "with_anchors", - TxUpdate { - txs: vec![make_tx(0).into(), make_tx(1).into()], - txouts: [ - (OutPoint::new(hash!("a"), 0), make_txout(0)), - (OutPoint::new(hash!("a"), 1), make_txout(1)), - (OutPoint::new(hash!("b"), 0), make_txout(2)), - ] - .into(), - anchors: [ - (ConfirmationBlockTime::default(), hash!("a")), - (ConfirmationBlockTime::default(), hash!("b")), - ] - .into(), - ..Default::default() - }, - ), - ( - "with_seen_ats", - TxUpdate { - txs: vec![make_tx(0).into(), make_tx(1).into()], - txouts: [ - (OutPoint::new(hash!("a"), 0), make_txout(0)), - (OutPoint::new(hash!("a"), 1), make_txout(1)), - (OutPoint::new(hash!("d"), 0), make_txout(2)), - ] - .into(), - anchors: [ - (ConfirmationBlockTime::default(), hash!("a")), - (ConfirmationBlockTime::default(), hash!("b")), - ] - .into(), - seen_ats: [(hash!("c"), 12346)].into_iter().collect(), - }, - ), + ("single_tx", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into()]; + tx_update + }), + ("two_txs", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into(), make_tx(1).into()]; + tx_update + }), + ("with_floating_txouts", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into(), make_tx(1).into()]; + tx_update.txouts = [ + (OutPoint::new(hash!("a"), 0), make_txout(0)), + (OutPoint::new(hash!("a"), 1), make_txout(1)), + (OutPoint::new(hash!("b"), 0), make_txout(2)), + ] + .into(); + tx_update + }), + ("with_anchors", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into(), make_tx(1).into()]; + tx_update.txouts = [ + (OutPoint::new(hash!("a"), 0), make_txout(0)), + (OutPoint::new(hash!("a"), 1), make_txout(1)), + (OutPoint::new(hash!("b"), 0), make_txout(2)), + ] + .into(); + tx_update.anchors = [ + (ConfirmationBlockTime::default(), hash!("a")), + (ConfirmationBlockTime::default(), hash!("b")), + ] + .into(); + tx_update + }), + ("with_seen_ats", { + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![make_tx(0).into(), make_tx(1).into()]; + tx_update.txouts = [ + (OutPoint::new(hash!("a"), 0), make_txout(0)), + (OutPoint::new(hash!("a"), 1), make_txout(1)), + (OutPoint::new(hash!("d"), 0), make_txout(2)), + ] + .into(); + tx_update.anchors = [ + (ConfirmationBlockTime::default(), hash!("a")), + (ConfirmationBlockTime::default(), hash!("b")), + ] + .into(); + tx_update.seen_ats = [(hash!("c"), 12346)].into_iter().collect(); + tx_update + }), ]; for (test_name, update) in test_cases { diff --git a/crates/core/src/spk_client.rs b/crates/core/src/spk_client.rs index a5ec813c9..728d9a6fd 100644 --- a/crates/core/src/spk_client.rs +++ b/crates/core/src/spk_client.rs @@ -1,7 +1,7 @@ //! Helper types for spk-based blockchain clients. use crate::{ alloc::{boxed::Box, collections::VecDeque, vec::Vec}, - collections::BTreeMap, + collections::{BTreeMap, HashMap, HashSet}, CheckPoint, ConfirmationBlockTime, Indexed, }; use bitcoin::{OutPoint, Script, ScriptBuf, Txid}; @@ -86,6 +86,14 @@ impl SyncProgress { } } +/// [`Script`] with corresponding [`Txid`] histories. +pub struct SpkWithExpectedTxids { + /// Script pubkey. + pub spk: ScriptBuf, + /// Txid history. + pub txids: HashSet, +} + /// Builds a [`SyncRequest`]. #[must_use] pub struct SyncRequestBuilder { @@ -159,6 +167,23 @@ impl SyncRequestBuilder { self } + /// Add transactions that are expected to exist under a given spk. + /// + /// This is useful for detecting a malicious replacement of an incoming transaction. + pub fn expected_unconfirmed_spk_txids( + mut self, + txs: impl IntoIterator, + ) -> Self { + for (txid, spk) in txs { + self.inner + .spk_histories + .entry(spk) + .or_default() + .insert(txid); + } + self + } + /// Add [`Txid`]s that will be synced against. pub fn txids(mut self, txids: impl IntoIterator) -> Self { self.inner.txids.extend(txids); @@ -213,6 +238,7 @@ pub struct SyncRequest { chain_tip: Option, spks: VecDeque<(I, ScriptBuf)>, spks_consumed: usize, + spk_histories: HashMap>, txids: VecDeque, txids_consumed: usize, outpoints: VecDeque, @@ -226,6 +252,7 @@ impl Default for SyncRequest { chain_tip: None, spks: VecDeque::new(), spks_consumed: 0, + spk_histories: HashMap::new(), txids: VecDeque::new(), txids_consumed: 0, outpoints: VecDeque::new(), @@ -276,6 +303,23 @@ impl SyncRequest { Some(spk) } + /// Advances the sync request and returns the next [`ScriptBuf`] with corresponding [`Txid`] + /// history. + /// + /// Returns [`None`] when there are no more scripts remaining in the request. + pub fn next_spk_with_history(&mut self) -> Option { + let next_spk = self.next_spk()?; + let spk_history = self + .spk_histories + .get(&next_spk) + .cloned() + .unwrap_or_default(); + Some(SpkWithExpectedTxids { + spk: next_spk, + txids: spk_history, + }) + } + /// Advances the sync request and returns the next [`Txid`]. /// /// Returns [`None`] when there are no more txids remaining in the request. @@ -301,6 +345,13 @@ impl SyncRequest { SyncIter::::new(self) } + /// Iterate over [`ScriptBuf`]s with corresponding [`Txid`] histories contained in this request. + pub fn iter_spks_with_expected_txids( + &mut self, + ) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) + } + /// Iterate over [`Txid`]s contained in this request. pub fn iter_txids(&mut self) -> impl ExactSizeIterator + '_ { SyncIter::::new(self) @@ -524,6 +575,19 @@ impl Iterator for SyncIter<'_, I, ScriptBuf> { } } +impl Iterator for SyncIter<'_, I, SpkWithExpectedTxids> { + type Item = SpkWithExpectedTxids; + + fn next(&mut self) -> Option { + self.request.next_spk_with_history() + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.request.spks.len(); + (remaining, Some(remaining)) + } +} + impl Iterator for SyncIter<'_, I, Txid> { type Item = Txid; diff --git a/crates/core/src/tx_update.rs b/crates/core/src/tx_update.rs index 7707578ee..089008fd7 100644 --- a/crates/core/src/tx_update.rs +++ b/crates/core/src/tx_update.rs @@ -1,10 +1,25 @@ -use crate::collections::{BTreeMap, BTreeSet, HashMap}; +use crate::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use alloc::{sync::Arc, vec::Vec}; use bitcoin::{OutPoint, Transaction, TxOut, Txid}; /// Data object used to communicate updates about relevant transactions from some chain data source /// to the core model (usually a `bdk_chain::TxGraph`). +/// +/// ```rust +/// use bdk_core::TxUpdate; +/// # use std::sync::Arc; +/// # use bitcoin::{Transaction, transaction::Version, absolute::LockTime}; +/// # let version = Version::ONE; +/// # let lock_time = LockTime::ZERO; +/// # let tx = Arc::new(Transaction { input: vec![], output: vec![], version, lock_time }); +/// # let txid = tx.compute_txid(); +/// # let anchor = (); +/// let mut tx_update = TxUpdate::default(); +/// tx_update.txs.push(tx); +/// tx_update.anchors.insert((anchor, txid)); +/// ``` #[derive(Debug, Clone)] +#[non_exhaustive] pub struct TxUpdate { /// Full transactions. These are transactions that were determined to be relevant to the wallet /// given the request. @@ -19,6 +34,8 @@ pub struct TxUpdate { /// Seen at times for transactions. This records when a transaction was most recently seen in /// the user's mempool for the sake of tie-breaking other conflicting transactions. pub seen_ats: HashMap, + /// A set of txids missing from the mempool. + pub evicted: HashSet, } impl Default for TxUpdate { @@ -28,6 +45,7 @@ impl Default for TxUpdate { txouts: Default::default(), anchors: Default::default(), seen_ats: Default::default(), + evicted: Default::default(), } } } @@ -47,6 +65,7 @@ impl TxUpdate { .map(|(a, txid)| (map(a), txid)) .collect(), seen_ats: self.seen_ats, + evicted: self.evicted, } } @@ -56,5 +75,6 @@ impl TxUpdate { self.txouts.extend(other.txouts); self.anchors.extend(other.anchors); self.seen_ats.extend(other.seen_ats); + self.evicted.extend(other.evicted); } } diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index 621a69e11..9f3a666f2 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -1,14 +1,13 @@ use bdk_core::{ - bitcoin::{block::Header, BlockHash, OutPoint, ScriptBuf, Transaction, Txid}, - collections::{BTreeMap, HashMap}, - spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}, + bitcoin::{block::Header, BlockHash, OutPoint, Transaction, Txid}, + collections::{BTreeMap, HashMap, HashSet}, + spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, + }, BlockId, CheckPoint, ConfirmationBlockTime, TxUpdate, }; use electrum_client::{ElectrumApi, Error, HeaderNotification}; -use std::{ - collections::HashSet, - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; /// We include a chain suffix of a certain length for the purpose of robustness. const CHAIN_SUFFIX_LENGTH: u32 = 8; @@ -138,8 +137,17 @@ impl BdkElectrumClient { let mut last_active_indices = BTreeMap::::default(); for keychain in request.keychains() { let spks = request.iter_spks(keychain.clone()); + let spks_with_history = spks.into_iter().map(|(i, spk)| { + ( + i, + SpkWithExpectedTxids { + spk, + txids: HashSet::::new(), + }, + ) + }); if let Some(last_active_index) = - self.populate_with_spks(&mut tx_update, spks, stop_gap, batch_size)? + self.populate_with_spks(&mut tx_update, spks_with_history, stop_gap, batch_size)? { last_active_indices.insert(keychain, last_active_index); } @@ -206,7 +214,7 @@ impl BdkElectrumClient { self.populate_with_spks( &mut tx_update, request - .iter_spks() + .iter_spks_with_expected_txids() .enumerate() .map(|(i, spk)| (i as u32, spk)), usize::MAX, @@ -243,7 +251,7 @@ impl BdkElectrumClient { fn populate_with_spks( &self, tx_update: &mut TxUpdate, - mut spks: impl Iterator, + mut spks_with_history: impl Iterator, stop_gap: usize, batch_size: usize, ) -> Result, Error> { @@ -251,35 +259,48 @@ impl BdkElectrumClient { let mut last_active_index = Option::::None; loop { - let spks = (0..batch_size) - .map_while(|_| spks.next()) + let spks_with_history = (0..batch_size) + .map_while(|_| spks_with_history.next()) .collect::>(); - if spks.is_empty() { + if spks_with_history.is_empty() { return Ok(last_active_index); } - let spk_histories = self - .inner - .batch_script_get_history(spks.iter().map(|(_, s)| s.as_script()))?; + let spk_histories = self.inner.batch_script_get_history( + spks_with_history.iter().map(|(_, s)| s.spk.as_script()), + )?; - for ((spk_index, _spk), spk_history) in spks.into_iter().zip(spk_histories) { - if spk_history.is_empty() { + for ((spk_index, spk_with_history), history_res) in + spks_with_history.into_iter().zip(spk_histories) + { + if history_res.is_empty() { unused_spk_count = unused_spk_count.saturating_add(1); if unused_spk_count >= stop_gap { return Ok(last_active_index); } + tx_update.evicted.extend(spk_with_history.txids); continue; } else { last_active_index = Some(spk_index); unused_spk_count = 0; } - for tx_res in spk_history { + for tx_res in history_res { tx_update.txs.push(self.fetch_tx(tx_res.tx_hash)?); if let Ok(height) = tx_res.height.try_into() { self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?; } } + + let fetched_txids = tx_update + .txs + .iter() + .map(|tx| tx.compute_txid()) + .collect::>(); + + tx_update + .evicted + .extend(spk_with_history.txids.difference(&fetched_txids).cloned()); } } } @@ -571,10 +592,8 @@ mod test { // `fetch_prev_txout` on a coinbase transaction will trigger a `fetch_tx` on a transaction // with a txid of all zeros. If `fetch_prev_txout` attempts to fetch this transaction, this // assertion will fail. - let mut tx_update = TxUpdate { - txs: vec![Arc::new(coinbase_tx)], - ..Default::default() - }; + let mut tx_update = TxUpdate::default(); + tx_update.txs = vec![Arc::new(coinbase_tx)]; assert!(client.fetch_prev_txout(&mut tx_update).is_ok()); // Ensure that the txouts are empty. diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index 8c89605e4..d0db849b0 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -1,6 +1,8 @@ use bdk_chain::{ - bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash}, + bitcoin::{hashes::Hash, secp256k1::Secp256k1, Address, Amount, ScriptBuf, WScriptHash}, + indexer::keychain_txout::KeychainTxOutIndex, local_chain::LocalChain, + miniscript::Descriptor, spk_client::{FullScanRequest, SyncRequest, SyncResponse}, spk_txout::SpkTxOutIndex, Balance, ConfirmationBlockTime, IndexedTxGraph, Indexer, Merge, TxGraph, @@ -14,7 +16,7 @@ use bdk_testenv::{ }; use core::time::Duration; use electrum_client::ElectrumApi; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; // Batch size for `sync_with_electrum`. @@ -60,6 +62,129 @@ where Ok(update) } +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Electrum chain source and the receiving structures properly track the +// replaced transaction as missing. +#[test] +pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + use bdk_chain::keychain_txout::SyncRequestBuilderExt; + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let electrum_client = electrum_client::Client::new(env.electrsd.electrum_url.as_str())?; + let client = BdkElectrumClient::new(electrum_client); + + let (receiver_desc, _) = Descriptor::parse_descriptor(&Secp256k1::signing_only(), "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)") + .expect("must be valid"); + let mut graph = IndexedTxGraph::::new(KeychainTxOutIndex::new(0)); + let _ = graph.index.insert_descriptor((), receiver_desc.clone())?; + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Derive the receiving address from the descriptor. + let ((_, receiver_spk), _) = graph.index.reveal_next_spk(()).unwrap(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); + let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); + let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; + assert!( + sync_response.tx_update.evicted.contains(&send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} + /// If an spk history contains a tx that spends another unconfirmed tx (chained mempool history), /// the Electrum API will return the tx with a negative height. This should succeed and not panic. #[test] diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 4c1bd0ad7..bb7e641df 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,8 +1,10 @@ use async_trait::async_trait; use bdk_core::collections::{BTreeMap, BTreeSet, HashSet}; -use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}; +use bdk_core::spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, +}; use bdk_core::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, + bitcoin::{BlockHash, OutPoint, Txid}, BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::Sleeper; @@ -77,10 +79,19 @@ where let mut last_active_indices = BTreeMap::::new(); for keychain in keychains { let keychain_spks = request.iter_spks(keychain.clone()); + let spks_with_history = keychain_spks.into_iter().map(|(i, spk)| { + ( + i, + SpkWithExpectedTxids { + spk, + txids: HashSet::::new(), + }, + ) + }); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, &mut inserted_txs, - keychain_spks, + spks_with_history, stop_gap, parallel_requests, ) @@ -125,7 +136,7 @@ where fetch_txs_with_spks( self, &mut inserted_txs, - request.iter_spks(), + request.iter_spks_with_expected_txids(), parallel_requests, ) .await?, @@ -279,12 +290,12 @@ async fn chain_update( async fn fetch_txs_with_keychain_spks( client: &esplora_client::AsyncClient, inserted_txs: &mut HashSet, - mut keychain_spks: I, + mut spks_with_history: I, stop_gap: usize, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> where - I: Iterator> + Send, + I: Iterator> + Send, S: Sleeper + Clone + Send + Sync, { type TxsOfSpkIndex = (u32, Vec); @@ -292,18 +303,22 @@ where let mut update = TxUpdate::::default(); let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut spk_txids = HashSet::new(); loop { - let handles = keychain_spks + let handles = spks_with_history .by_ref() .take(parallel_requests) - .map(|(spk_index, spk)| { + .map(|(spk_index, spk_with_history)| { + spk_txids.extend(&spk_with_history.txids); let client = client.clone(); async move { let mut last_seen = None; let mut spk_txs = Vec::new(); loop { - let txs = client.scripthash_txs(&spk, last_seen).await?; + let txs = client + .scripthash_txs(&spk_with_history.spk, last_seen) + .await?; let tx_count = txs.len(); last_seen = txs.last().map(|tx| tx.txid); spk_txs.extend(txs); @@ -344,6 +359,10 @@ where } } + update + .evicted + .extend(spk_txids.difference(inserted_txs).cloned()); + Ok((update, last_active_index)) } @@ -358,18 +377,21 @@ where async fn fetch_txs_with_spks( client: &esplora_client::AsyncClient, inserted_txs: &mut HashSet, - spks: I, + spks_with_history: I, parallel_requests: usize, ) -> Result, Error> where - I: IntoIterator + Send, + I: IntoIterator + Send, I::IntoIter: Send, S: Sleeper + Clone + Send + Sync, { fetch_txs_with_keychain_spks( client, inserted_txs, - spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), + spks_with_history + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), usize::MAX, parallel_requests, ) diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 655055b33..b7d170abc 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,7 +1,9 @@ use bdk_core::collections::{BTreeMap, BTreeSet, HashSet}; -use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}; +use bdk_core::spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, +}; use bdk_core::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, + bitcoin::{BlockHash, OutPoint, Txid}, BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::{OutputStatus, Tx}; @@ -67,10 +69,19 @@ impl EsploraExt for esplora_client::BlockingClient { let mut last_active_indices = BTreeMap::::new(); for keychain in request.keychains() { let keychain_spks = request.iter_spks(keychain.clone()); + let spks_with_history = keychain_spks.into_iter().map(|(i, spk)| { + ( + i, + SpkWithExpectedTxids { + spk, + txids: HashSet::::new(), + }, + ) + }); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, &mut inserted_txs, - keychain_spks, + spks_with_history, stop_gap, parallel_requests, )?; @@ -116,7 +127,7 @@ impl EsploraExt for esplora_client::BlockingClient { tx_update.extend(fetch_txs_with_spks( self, &mut inserted_txs, - request.iter_spks(), + request.iter_spks_with_expected_txids(), parallel_requests, )?); tx_update.extend(fetch_txs_with_txids( @@ -248,10 +259,10 @@ fn chain_update( Ok(tip) } -fn fetch_txs_with_keychain_spks>>( +fn fetch_txs_with_keychain_spks>>( client: &esplora_client::BlockingClient, inserted_txs: &mut HashSet, - mut keychain_spks: I, + mut spks_with_history: I, stop_gap: usize, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> { @@ -260,19 +271,21 @@ fn fetch_txs_with_keychain_spks>>( let mut update = TxUpdate::::default(); let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut spk_txids = HashSet::new(); loop { - let handles = keychain_spks + let handles = spks_with_history .by_ref() .take(parallel_requests) - .map(|(spk_index, spk)| { + .map(|(spk_index, spk_with_history)| { + spk_txids.extend(&spk_with_history.txids); std::thread::spawn({ let client = client.clone(); move || -> Result { let mut last_seen = None; let mut spk_txs = Vec::new(); loop { - let txs = client.scripthash_txs(&spk, last_seen)?; + let txs = client.scripthash_txs(&spk_with_history.spk, last_seen)?; let tx_count = txs.len(); last_seen = txs.last().map(|tx| tx.txid); spk_txs.extend(txs); @@ -315,6 +328,10 @@ fn fetch_txs_with_keychain_spks>>( } } + update + .evicted + .extend(spk_txids.difference(inserted_txs).cloned()); + Ok((update, last_active_index)) } @@ -326,16 +343,19 @@ fn fetch_txs_with_keychain_spks>>( /// requests to make in parallel. /// /// Refer to [crate-level docs](crate) for more. -fn fetch_txs_with_spks>( +fn fetch_txs_with_spks>( client: &esplora_client::BlockingClient, inserted_txs: &mut HashSet, - spks: I, + spks_with_history: I, parallel_requests: usize, ) -> Result, Error> { fetch_txs_with_keychain_spks( client, inserted_txs, - spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), + spks_with_history + .into_iter() + .enumerate() + .map(|(i, spk_with_history)| (i as u32, spk_with_history)), usize::MAX, parallel_requests, ) diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index b535d2bfa..f6b083fab 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -1,14 +1,146 @@ -use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; -use bdk_chain::{ConfirmationBlockTime, TxGraph}; +use bdk_chain::{ + bitcoin::{secp256k1::Secp256k1, Address, Amount}, + indexer::keychain_txout::KeychainTxOutIndex, + local_chain::LocalChain, + miniscript::Descriptor, + spk_client::{FullScanRequest, SyncRequest}, + ConfirmationBlockTime, IndexedTxGraph, TxGraph, +}; use bdk_esplora::EsploraAsyncExt; use esplora_client::{self, Builder}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; -use bdk_chain::bitcoin::{Address, Amount}; -use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; +use bdk_testenv::{ + anyhow, + bitcoincore_rpc::{json::CreateRawTransactionInput, RawTx, RpcApi}, + TestEnv, +}; + +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Electrum chain source and the receiving structures properly track the +// replaced transaction as missing. +#[tokio::test] +pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + use bdk_chain::keychain_txout::SyncRequestBuilderExt; + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_async()?; + + let (receiver_desc, _) = Descriptor::parse_descriptor(&Secp256k1::signing_only(), "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)") + .expect("must be valid"); + let mut graph = IndexedTxGraph::::new(KeychainTxOutIndex::new(0)); + let _ = graph.index.insert_descriptor((), receiver_desc.clone())?; + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Derive the receiving address from the descriptor. + let ((_, receiver_spk), _) = graph.index.reveal_next_spk(()).unwrap(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); + let sync_response = client.sync(sync_request, 1).await?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); + let sync_response = client.sync(sync_request, 1).await?; + assert!( + sync_response.tx_update.evicted.contains(&send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} #[tokio::test] pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index d4191ceb0..ac14968c0 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -1,14 +1,146 @@ -use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; -use bdk_chain::{ConfirmationBlockTime, TxGraph}; +use bdk_chain::{ + bitcoin::{secp256k1::Secp256k1, Address, Amount}, + indexer::keychain_txout::KeychainTxOutIndex, + local_chain::LocalChain, + miniscript::Descriptor, + spk_client::{FullScanRequest, SyncRequest}, + ConfirmationBlockTime, IndexedTxGraph, TxGraph, +}; use bdk_esplora::EsploraExt; use esplora_client::{self, Builder}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; -use bdk_chain::bitcoin::{Address, Amount}; -use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; +use bdk_testenv::{ + anyhow, + bitcoincore_rpc::{json::CreateRawTransactionInput, RawTx, RpcApi}, + TestEnv, +}; + +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Electrum chain source and the receiving structures properly track the +// replaced transaction as missing. +#[test] +pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + use bdk_chain::keychain_txout::SyncRequestBuilderExt; + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_blocking(); + + let (receiver_desc, _) = Descriptor::parse_descriptor(&Secp256k1::signing_only(), "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)") + .expect("must be valid"); + let mut graph = IndexedTxGraph::::new(KeychainTxOutIndex::new(0)); + let _ = graph.index.insert_descriptor((), receiver_desc.clone())?; + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Derive the receiving address from the descriptor. + let ((_, receiver_spk), _) = graph.index.reveal_next_spk(()).unwrap(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); + let sync_response = client.sync(sync_request, 1)?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .revealed_spks_from_indexer(&graph.index, ..) + .expected_unconfirmed_spk_txids(graph.expected_unconfirmed_spk_txids( + &chain, + chain.tip().block_id(), + .., + )?); + let sync_response = client.sync(sync_request, 1)?; + assert!( + sync_response.tx_update.evicted.contains(&send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} #[test] pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { diff --git a/crates/wallet/src/test_utils.rs b/crates/wallet/src/test_utils.rs index 7e1778fab..14ff40ed4 100644 --- a/crates/wallet/src/test_utils.rs +++ b/crates/wallet/src/test_utils.rs @@ -4,7 +4,7 @@ use alloc::string::ToString; use alloc::sync::Arc; use core::str::FromStr; -use bdk_chain::{tx_graph, BlockId, ConfirmationBlockTime}; +use bdk_chain::{BlockId, ConfirmationBlockTime}; use bitcoin::{ absolute, hashes::Hash, transaction, Address, Amount, BlockHash, FeeRate, Network, OutPoint, Transaction, TxIn, TxOut, Txid, @@ -314,12 +314,11 @@ pub fn insert_checkpoint(wallet: &mut Wallet, block: BlockId) { /// Insert transaction pub fn insert_tx(wallet: &mut Wallet, tx: Transaction) { + let mut tx_update = bdk_chain::TxUpdate::default(); + tx_update.txs.push(Arc::new(tx)); wallet .apply_update(Update { - tx_update: bdk_chain::TxUpdate { - txs: vec![Arc::new(tx)], - ..Default::default() - }, + tx_update, ..Default::default() }) .unwrap(); @@ -329,12 +328,11 @@ pub fn insert_tx(wallet: &mut Wallet, tx: Transaction) { /// the given `anchor`. Note: to be considered confirmed the anchor block must exist in /// the current active chain. pub fn insert_anchor(wallet: &mut Wallet, txid: Txid, anchor: ConfirmationBlockTime) { + let mut tx_update = bdk_chain::TxUpdate::default(); + tx_update.anchors.insert((anchor, txid)); wallet .apply_update(Update { - tx_update: tx_graph::TxUpdate { - anchors: [(anchor, txid)].into(), - ..Default::default() - }, + tx_update, ..Default::default() }) .unwrap(); @@ -342,12 +340,11 @@ pub fn insert_anchor(wallet: &mut Wallet, txid: Txid, anchor: ConfirmationBlockT /// Marks the given `txid` seen as unconfirmed at `seen_at` pub fn insert_seen_at(wallet: &mut Wallet, txid: Txid, seen_at: u64) { + let mut tx_update = bdk_chain::TxUpdate::default(); + tx_update.seen_ats.insert(txid, seen_at); wallet .apply_update(crate::Update { - tx_update: tx_graph::TxUpdate { - seen_ats: [(txid, seen_at)].into_iter().collect(), - ..Default::default() - }, + tx_update, ..Default::default() }) .unwrap(); diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 95c547967..33e5281c6 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -11,10 +11,7 @@ use bdk_bitcoind_rpc::{ bitcoincore_rpc::{Auth, Client, RpcApi}, Emitter, }; -use bdk_chain::{ - bitcoin::{Block, Transaction}, - local_chain, Merge, -}; +use bdk_chain::{bitcoin::Block, local_chain, Merge}; use example_cli::{ anyhow, clap::{self, Args, Subcommand}, @@ -36,7 +33,7 @@ const DB_COMMIT_DELAY: Duration = Duration::from_secs(60); #[derive(Debug)] enum Emission { Block(bdk_bitcoind_rpc::BlockEvent), - Mempool(Vec<(Transaction, u64)>), + Mempool(bdk_bitcoind_rpc::MempoolEvent), Tip(u32), } @@ -204,7 +201,7 @@ fn main() -> anyhow::Result<()> { let graph_changeset = graph .lock() .unwrap() - .batch_insert_relevant_unconfirmed(mempool_txs); + .batch_insert_relevant_unconfirmed(mempool_txs.emitted_txs); { let db = &mut *db.lock().unwrap(); db_stage.merge(ChangeSet { @@ -287,7 +284,21 @@ fn main() -> anyhow::Result<()> { (chain_changeset, graph_changeset) } Emission::Mempool(mempool_txs) => { - let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs); + let mut graph_changeset = graph + .batch_insert_relevant_unconfirmed(mempool_txs.emitted_txs.clone()); + let expected_txids = graph + .expected_unconfirmed_spk_txids( + &chain.clone(), + chain.tip().block_id(), + .., + )? + .into_iter() + .map(|(txid, _)| txid); + let evicted_txids = mempool_txs.evicted_txids(expected_txids); + for txid in evicted_txids { + graph_changeset + .merge(graph.insert_evicted_at(txid, mempool_txs.last_seen)); + } (local_chain::ChangeSet::default(), graph_changeset) } Emission::Tip(h) => { diff --git a/example-crates/example_wallet_rpc/src/main.rs b/example-crates/example_wallet_rpc/src/main.rs index 204224bcb..c3ac70f49 100644 --- a/example-crates/example_wallet_rpc/src/main.rs +++ b/example-crates/example_wallet_rpc/src/main.rs @@ -3,7 +3,7 @@ use bdk_bitcoind_rpc::{ Emitter, }; use bdk_wallet::{ - bitcoin::{Block, Network, Transaction}, + bitcoin::{Block, Network}, file_store::Store, KeychainKind, Wallet, }; @@ -73,7 +73,7 @@ impl Args { enum Emission { SigTerm, Block(bdk_bitcoind_rpc::BlockEvent), - Mempool(Vec<(Transaction, u64)>), + Mempool(bdk_bitcoind_rpc::MempoolEvent), } fn main() -> anyhow::Result<()> { @@ -157,7 +157,7 @@ fn main() -> anyhow::Result<()> { } Emission::Mempool(mempool_emission) => { let start_apply_mempool = Instant::now(); - wallet.apply_unconfirmed_txs(mempool_emission); + wallet.apply_unconfirmed_txs(mempool_emission.emitted_txs); wallet.persist(&mut db)?; println!( "Applied unconfirmed transactions in {}s",