Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 227 additions & 37 deletions crates/bitcoind_rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
//!
//! To only get block updates (exclude mempool transactions), the caller can use
//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
//! mempool.
//! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
//! separate method, [`Emitter::mempool`] can be used to emit the whole mempool.
#![warn(missing_docs)]

use bdk_core::{BlockId, CheckPoint};
use bitcoin::{block::Header, Block, BlockHash, Transaction};
use bitcoincore_rpc::bitcoincore_rpc_json;
use bitcoin::{Block, BlockHash, Transaction, Txid};
use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
use std::{collections::HashSet, ops::Deref};

pub mod bip158;

Expand All @@ -22,8 +22,8 @@ pub use bitcoincore_rpc;
/// Refer to [module-level documentation] for more.
///
/// [module-level documentation]: crate
pub struct Emitter<'c, C> {
client: &'c C,
pub struct Emitter<C> {
client: C,
start_height: u32,

/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
Expand All @@ -43,28 +43,65 @@ pub struct Emitter<'c, C> {
/// The last emitted block during our last mempool emission. This is used to determine whether
/// there has been a reorg since our last mempool emission.
last_mempool_tip: Option<u32>,

/// A set of txids currently assumed to still be in the mempool.
///
/// This is used to detect mempool evictions by comparing the set against the latest mempool
/// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered
/// evicted.
///
/// When the emitter emits a block, confirmed txids are removed from this set. This prevents
/// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
expected_mempool_txids: HashSet<Txid>,
}

impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
/// Indicates that there are no initially expected mempool transactions.
///
/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
/// to start empty (i.e. with no unconfirmed transactions).
pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Txid> = core::iter::empty();

impl<C> Emitter<C>
where
C: Deref,
C::Target: RpcApi,
{
/// Construct a new [`Emitter`].
///
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
/// can start emission from a block that connects to the original chain.
///
/// `start_height` starts emission from a given height (if there are no conflicts with the
/// original chain).
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
///
/// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
/// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
/// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
pub fn new(
client: C,
last_cp: CheckPoint,
start_height: u32,
expected_mempool_txids: impl IntoIterator<Item = impl Into<Txid>>,
) -> Self {
Self {
client,
start_height,
last_cp,
last_block: None,
last_mempool_time: 0,
last_mempool_tip: None,
expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(),
}
}

/// Emit mempool transactions, alongside their first-seen unix timestamps.
/// Emit mempool transactions and any evicted [`Txid`]s.
///
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
/// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
/// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids
/// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height
/// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always
/// return an empty `evicted_txids` set.
///
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
/// ancestors are already emitted.
Expand All @@ -74,8 +111,8 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
/// at height `h`.
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
let client = self.client;
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
let client = &*self.client;

// This is the emitted tip height during the last mempool emission.
let prev_mempool_tip = self
Expand All @@ -84,15 +121,46 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
// `start_height` has been emitted.
.unwrap_or(self.start_height.saturating_sub(1));

// Loop to make sure that the fetched mempool content and the fetched tip are consistent
// with one another.
let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
// Determine if height and hash matches the best block from the RPC. Evictions are deferred
// if we are not at the best block.
let height = client.get_block_count()?;
let hash = client.get_block_hash(height)?;

// Get the raw mempool result from the RPC client which will be used to determine if any
// transactions have been evicted.
let mp = client.get_raw_mempool_verbose()?;
let mp_txids: HashSet<Txid> = mp.keys().copied().collect();

if height == client.get_block_count()? && hash == client.get_block_hash(height)? {
break (mp, mp_txids, height, hash);
}
};

let at_tip =
rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash();

// If at tip, any expected txid missing from raw mempool is considered evicted;
// if not at tip, we don't evict anything.
let evicted_txids: HashSet<Txid> = if at_tip {
self.expected_mempool_txids
.difference(&raw_mempool_txids)
.copied()
.collect()
} else {
HashSet::new()
};

// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
// be the new latest timestamp.
let prev_mempool_time = self.last_mempool_time;
let mut latest_time = prev_mempool_time;

let txs_to_emit = client
.get_raw_mempool_verbose()?
let new_txs = raw_mempool
.into_iter()
.filter_map({
let latest_time = &mut latest_time;
Expand All @@ -101,25 +169,25 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
if tx_time > *latest_time {
*latest_time = tx_time;
}

// Avoid emitting transactions that are already emitted if we can guarantee
// blocks containing ancestors are already emitted. The bitcoind rpc interface
// provides us with the block height that the tx is introduced to the mempool.
// If we have already emitted the block of height, we can assume that all
// ancestor txs have been processed by the receiver.
// Best-effort check to avoid re-emitting transactions we've already emitted.
//
// Complete suppression isn't possible, since a transaction may spend outputs
// owned by the wallet. To determine if such a transaction is relevant, we must
// have already seen its ancestor(s) that contain the spent prevouts.
//
// Fortunately, bitcoind provides the block height at which the transaction
// entered the mempool. If we've already emitted that block height, we can
// reasonably assume the receiver has seen all ancestor transactions.
let is_already_emitted = tx_time <= prev_mempool_time;
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
if is_already_emitted && is_within_height {
return None;
}

let tx = match client.get_raw_transaction(&txid, None) {
Ok(tx) => tx,
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
Err(err) if err.is_not_found_error() => return None,
Err(err) => return Some(Err(err)),
};

Some(Ok((tx, tx_time as u64)))
}
})
Expand All @@ -128,26 +196,68 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
self.last_mempool_time = latest_time;
self.last_mempool_tip = Some(self.last_cp.height());

Ok(txs_to_emit)
}
// If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
// still catching up to the tip and keep accumulating.
if at_tip {
self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
} else {
self.expected_mempool_txids
.extend(new_txs.iter().map(|(tx, _)| tx.compute_txid()));
}

/// Emit the next block height and header (if any).
pub fn next_header(&mut self) -> Result<Option<BlockEvent<Header>>, bitcoincore_rpc::Error> {
Ok(poll(self, |hash| self.client.get_block_header(hash))?
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
Ok(MempoolEvent {
new_txs,
evicted_txids,
latest_update_time: latest_time as u64,
})
}

/// Emit the next block height and block (if any).
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
Ok(poll(self, |hash| self.client.get_block(hash))?
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
// Stop tracking unconfirmed transactions that have been confirmed in this block.
for tx in &block.txdata {
self.expected_mempool_txids.remove(&tx.compute_txid());
}
return Ok(Some(BlockEvent { block, checkpoint }));
}
Ok(None)
}
}

/// A new emission from mempool.
#[derive(Debug)]
pub struct MempoolEvent {
/// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
///
/// To understand the second condition, consider a receiver which filters transactions based on
/// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
/// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
/// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
/// block at height `h`.
pub new_txs: Vec<(Transaction, u64)>,

/// [`Txid`]s of all transactions that have been evicted from mempool.
pub evicted_txids: HashSet<Txid>,

/// The latest timestamp of when a transaction entered the mempool.
///
/// This is useful for setting the timestamp for evicted transactions.
pub latest_update_time: u64,
}

impl MempoolEvent {
/// Returns an iterator of `(txid, evicted_at)` pairs for all evicted transactions.
pub fn evicted_ats(&self) -> impl ExactSizeIterator<Item = (Txid, u64)> + '_ {
let time = self.latest_update_time;
self.evicted_txids.iter().map(move |&txid| (txid, time))
}
}

/// A newly emitted block from [`Emitter`].
#[derive(Debug)]
pub struct BlockEvent<B> {
/// Either a full [`Block`] or [`Header`] of the new block.
/// The block.
pub block: B,

/// The checkpoint of the new block.
Expand Down Expand Up @@ -199,9 +309,10 @@ enum PollResponse {

fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
where
C: bitcoincore_rpc::RpcApi,
C: Deref,
C::Target: RpcApi,
{
let client = emitter.client;
let client = &*emitter.client;

if let Some(last_res) = &emitter.last_block {
let next_hash = if last_res.height < emitter.start_height as _ {
Expand Down Expand Up @@ -255,15 +366,16 @@ fn poll<C, V, F>(
get_item: F,
) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
where
C: bitcoincore_rpc::RpcApi,
F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
C: Deref,
C::Target: RpcApi,
F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
{
loop {
match poll_once(emitter)? {
PollResponse::Block(res) => {
let height = res.height as u32;
let hash = res.hash;
let item = get_item(&hash)?;
let item = get_item(&hash, &emitter.client)?;

let new_cp = emitter
.last_cp
Expand Down Expand Up @@ -329,3 +441,81 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
}
}
}

#[cfg(test)]
mod test {
use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
use bdk_chain::local_chain::LocalChain;
use bdk_testenv::{anyhow, TestEnv};
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash};
use std::collections::HashSet;

#[test]
fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
let env = TestEnv::new()?;
let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
let chain_tip = chain.tip();
let mut emitter = Emitter::new(
env.rpc_client(),
chain_tip.clone(),
1,
NO_EXPECTED_MEMPOOL_TXIDS,
);

env.mine_blocks(100, None)?;
while emitter.next_block()?.is_some() {}

let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros());
let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
let mut mempool_txids = HashSet::new();

// Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
for _ in 0..10 {
let sent_txid = env.send(&addr_to_track, Amount::from_sat(1_000))?;
mempool_txids.insert(sent_txid);
emitter.mempool()?;
env.mine_blocks(1, None)?;

for txid in &mempool_txids {
assert!(
emitter.expected_mempool_txids.contains(txid),
"Expected txid {:?} missing",
txid
);
}
}

// Process each block and check that confirmed txids are removed from from
// expected_mempool_txids.
while let Some(block_event) = emitter.next_block()? {
let confirmed_txids: HashSet<Txid> = block_event
.block
.txdata
.iter()
.map(|tx| tx.compute_txid())
.collect();
mempool_txids = mempool_txids
.difference(&confirmed_txids)
.copied()
.collect::<HashSet<_>>();
for txid in confirmed_txids {
assert!(
!emitter.expected_mempool_txids.contains(&txid),
"Expected txid {:?} should have been removed",
txid
);
}
for txid in &mempool_txids {
assert!(
emitter.expected_mempool_txids.contains(txid),
"Expected txid {:?} missing",
txid
);
}
}

assert!(emitter.expected_mempool_txids.is_empty());

Ok(())
}
}
Loading
Loading