diff --git a/src/broadcaster.rs b/src/broadcaster.rs index 9d80f6d..da4ef98 100644 --- a/src/broadcaster.rs +++ b/src/broadcaster.rs @@ -1,40 +1,94 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; -use bitcoin::{Transaction, Wtxid}; +use bitcoin::{Transaction, Txid, Wtxid}; use tokio::sync::oneshot; +use crate::Package; + #[derive(Debug)] pub(crate) struct BroadcastQueue { - pending: HashMap>, - data: HashMap, + // There are the transactions that a peer should receive first. In the case of 1p1c, these are + // the `Wtxid` of the child transaction in the package. + advertise: HashSet, + // Notify the user when: + // 1. a singleton transaction was broadcast + // 2. the final transaction in a package was broadcast + callbacks: HashMap, Wtxid)>, + // These transactions will be fetched by the usual `Wtxid`. + witness_data: HashMap, + // These transactions represent missing inputs to a previously broadcast transaction. Because + // the inputs use the legacy `Txid` in the outpoint, these transactions are indexed by `Txid`. + legacy_data: HashMap, } impl BroadcastQueue { pub(crate) fn new() -> Self { Self { - pending: HashMap::new(), - data: HashMap::new(), + advertise: HashSet::new(), + callbacks: HashMap::new(), + witness_data: HashMap::new(), + legacy_data: HashMap::new(), } } - pub(crate) fn add_to_queue(&mut self, tx: Transaction, oneshot: oneshot::Sender) { - let wtxid = tx.compute_wtxid(); - self.pending.insert(wtxid, oneshot); - self.data.insert(wtxid, tx); + pub(crate) fn add_to_queue(&mut self, package: Package, oneshot: oneshot::Sender) { + let advertise_wtxid = package.advertise_package(); + self.advertise.insert(advertise_wtxid); + let parent = package.parent(); + let parent_txid = parent.compute_txid(); + let parent_wtxid = parent.compute_wtxid(); + match package.child() { + Some(child) => { + let child_wtxid = child.compute_wtxid(); + // Only confirm once the parent is confirmed to have been requested. + self.callbacks.insert(parent_wtxid, (oneshot, child_wtxid)); + self.witness_data.insert(child_wtxid, child); + // The only way a peer can feasibly request this transaction is by `Txid`, as it is + // never advertised explicitly. + self.legacy_data.insert(parent_txid, parent); + } + None => { + self.callbacks.insert(parent_wtxid, (oneshot, parent_wtxid)); + self.witness_data.insert(parent_wtxid, parent); + } + } } - pub(crate) fn fetch_tx(&self, wtxid: Wtxid) -> Option { - self.data.get(&wtxid).cloned() + pub(crate) fn fetch_tx(&self, id: impl Into) -> Option { + let id = id.into(); + match id { + TxIdentifier::Legacy(txid) => self.legacy_data.get(&txid).cloned(), + TxIdentifier::Witness(wtxid) => self.witness_data.get(&wtxid).cloned(), + } } - pub(crate) fn successful(&mut self, wtxid: Wtxid) { - if let Some(pending) = self.pending.remove(&wtxid) { - let _ = pending.send(wtxid); + pub(crate) fn sent_transaction_payload(&mut self, wtxid: Wtxid) { + if let Some((callback, child)) = self.callbacks.remove(&wtxid) { + self.advertise.remove(&child); + let _ = callback.send(child); } } pub(crate) fn pending_wtxid(&self) -> Vec { - self.pending.keys().copied().collect() + self.advertise.iter().copied().collect() + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, std::hash::Hash)] +pub(crate) enum TxIdentifier { + Legacy(Txid), + Witness(Wtxid), +} + +impl From for TxIdentifier { + fn from(value: Txid) -> Self { + Self::Legacy(value) + } +} + +impl From for TxIdentifier { + fn from(value: Wtxid) -> Self { + Self::Witness(value) } } @@ -65,15 +119,15 @@ mod tests { let transaction_2: Transaction = tx_data.transactions[1].clone().0; let mut queue = BroadcastQueue::new(); let (tx, _) = tokio::sync::oneshot::channel(); - queue.add_to_queue(transaction_1.clone(), tx); + queue.add_to_queue(transaction_1.clone().into(), tx); let (tx, _) = tokio::sync::oneshot::channel(); - queue.add_to_queue(transaction_2.clone(), tx); + queue.add_to_queue(transaction_2.clone().into(), tx); assert_eq!(queue.pending_wtxid().len(), 2); - queue.successful(transaction_1.compute_wtxid()); + queue.sent_transaction_payload(transaction_1.compute_wtxid()); assert_eq!(queue.pending_wtxid().len(), 1); assert!(queue.fetch_tx(transaction_1.compute_wtxid()).is_some()); assert!(queue.fetch_tx(transaction_2.compute_wtxid()).is_some()); - queue.successful(transaction_2.compute_wtxid()); + queue.sent_transaction_payload(transaction_2.compute_wtxid()); assert_eq!(queue.pending_wtxid().len(), 0); } } diff --git a/src/client.rs b/src/client.rs index 3e2c965..4146beb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,6 @@ use bitcoin::p2p::address::AddrV2; use bitcoin::p2p::ServiceFlags; -use bitcoin::{Amount, Transaction, Wtxid}; +use bitcoin::{Amount, Wtxid}; use bitcoin::{BlockHash, FeeRate}; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; @@ -9,7 +9,7 @@ use tokio::sync::oneshot; use crate::chain::block_subsidy; use crate::chain::IndexedHeader; use crate::messages::ClientRequest; -use crate::{Event, HeaderCheckpoint, Info, TrustedPeer, Warning}; +use crate::{Event, HeaderCheckpoint, Info, Package, TrustedPeer, Warning}; use super::{error::ClientError, messages::ClientMessage}; use super::{error::FetchBlockError, IndexedBlock}; @@ -65,23 +65,22 @@ impl Requester { .map_err(|_| ClientError::SendError) } - /// Broadcast a new transaction to the network, waiting for at least one peer to request it. + /// Submit a package of transactions to the network, returning when transaction data was sent + /// to at least one peer. /// - /// # Note + /// Note that this is directly callable with a single [`Transaction`]. /// - /// When broadcasting a one-parent one-child (TRUC) package, - /// broadcast the child first, followed by the parent. + /// # Returns /// - /// Package relay is under-development at the time of writing. - /// - /// For more information, see BIP-431 and BIP-331. + /// The `Wtxid` of the child or singleton transaction. /// /// # Errors /// /// If the node has stopped running. - pub async fn broadcast_tx(&self, transaction: Transaction) -> Result { + pub async fn submit_package(&self, package: impl Into) -> Result { let (tx, rx) = tokio::sync::oneshot::channel::(); - let client_request = ClientRequest::new(transaction, tx); + let package = package.into(); + let client_request = ClientRequest::new(package, tx); self.ntx .send(ClientMessage::Broadcast(client_request)) .map_err(|_| ClientError::SendError)?; diff --git a/src/error.rs b/src/error.rs index 334c32e..a22afaf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -75,3 +75,30 @@ impl core::fmt::Display for FetchBlockError { } impl_sourceless_error!(FetchBlockError); + +/// Errors when constructing transaction packages. +#[derive(Debug)] +pub enum PackageError { + /// Packages may not include more than two transactions and must include at least one + /// transaction. + InvalidPackageLength(usize), + /// Child transactions must spend an output from the parent. + UnrelatedTransactions, +} + +impl core::fmt::Display for PackageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PackageError::InvalidPackageLength(s) => { + write!( + f, + "package must include at most two transactions, got {}", + s + ) + } + PackageError::UnrelatedTransactions => { + write!(f, "packages must have dependent inputs and outputs.") + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 0c35400..32122de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,6 +60,7 @@ pub mod messages; /// The structure that communicates with the Bitcoin P2P network and collects data. pub mod node; +use bitcoin::OutPoint; use chain::Filter; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -431,6 +432,82 @@ impl Dialog { } } +/// A package is a set of dependent transactions to submit to the mempool. +#[derive(Debug, Clone)] +pub struct Package { + parent: Transaction, + child: Option, +} + +impl Package { + /// Create a new package from a single transaction. + pub fn new_single(transaction: Transaction) -> Self { + Self { + parent: transaction, + child: None, + } + } + + /// Construct a new package using the one-parent-one-child topology, where the child spends an + /// output from the parent. The primary use of such a topology is for a child to bump the + /// fee-rate of the package. + /// + /// # Errors + /// + /// If the child does not spend at least one output created by the parent. + pub fn new_one_parent_one_child( + parent: Transaction, + child: Transaction, + ) -> Result { + let outpoints = { + let txid = parent.compute_txid(); + let mut outpoints = Vec::with_capacity(parent.output.len()); + for vout in 0..parent.output.len() { + outpoints.push(OutPoint { + txid, + vout: vout as u32, + }); + } + outpoints + }; + if !child + .input + .iter() + .any(|input| outpoints.contains(&input.previous_output)) + { + return Err(error::PackageError::UnrelatedTransactions); + } + Ok(Self { + parent, + child: Some(child), + }) + } + + fn advertise_package(&self) -> Wtxid { + match &self.child { + Some(child) => child.compute_wtxid(), + None => self.parent.compute_wtxid(), + } + } + + fn parent(&self) -> Transaction { + self.parent.clone() + } + + fn child(&self) -> Option { + self.child.clone() + } +} + +impl From for Package { + fn from(value: Transaction) -> Self { + Package { + parent: value, + child: None, + } + } +} + macro_rules! impl_sourceless_error { ($e:ident) => { impl std::error::Error for $e { diff --git a/src/messages.rs b/src/messages.rs index 15f5cb4..18e17b3 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -3,13 +3,11 @@ use std::ops::Div; use bitcoin::p2p::address::AddrV2; use bitcoin::p2p::ServiceFlags; -use bitcoin::{ - block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Transaction, Wtxid, -}; +use bitcoin::{block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Wtxid}; use crate::chain::{BlockHeaderChanges, IndexedHeader}; -use crate::IndexedFilter; use crate::{chain::checkpoints::HeaderCheckpoint, IndexedBlock, TrustedPeer}; +use crate::{IndexedFilter, Package}; use super::error::FetchBlockError; @@ -140,7 +138,7 @@ pub(crate) enum ClientMessage { /// Stop the node. Shutdown, /// Broadcast a [`crate::Transaction`] with a [`crate::TxBroadcastPolicy`]. - Broadcast(ClientRequest), + Broadcast(ClientRequest), /// Starting at the configured anchor checkpoint, re-emit all filters. Rescan, /// Explicitly request a block from the node. diff --git a/src/network/peer.rs b/src/network/peer.rs index 966e37d..9cfc76c 100644 --- a/src/network/peer.rs +++ b/src/network/peer.rs @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration}; use addrman::Record; use bip324::{AsyncProtocol, PacketReader, PacketWriter, Role}; use bitcoin::{ - p2p::{message::NetworkMessage, ServiceFlags}, + p2p::{message::NetworkMessage, message_blockdata::Inventory, ServiceFlags}, Network, }; use tokio::{ @@ -259,15 +259,40 @@ impl Peer { .await?; Ok(()) } - ReaderMessage::TxRequests(requests) => { + ReaderMessage::GetData(requests) => { let mut tx_queue = self.tx_queue.lock().await; - for wtxid in requests { - let transaction = tx_queue.fetch_tx(wtxid); - if let Some(transaction) = transaction { - let msg = message_generator.broadcast_transaction(transaction); - self.write_bytes(writer, msg).await?; - self.message_state.sent_tx(wtxid); - tx_queue.successful(wtxid); + for inv in requests { + match inv { + Inventory::WTx(wtxid) => { + let transaction = tx_queue.fetch_tx(wtxid); + if let Some(transaction) = transaction { + let msg = message_generator.broadcast_transaction(transaction); + self.write_bytes(writer, msg).await?; + self.message_state.sent_tx(wtxid); + tx_queue.sent_transaction_payload(wtxid); + } + } + Inventory::Transaction(txid) => { + let transaction = tx_queue.fetch_tx(txid); + if let Some(transaction) = transaction { + let wtxid = transaction.compute_wtxid(); + let msg = message_generator.broadcast_transaction(transaction); + self.write_bytes(writer, msg).await?; + self.message_state.sent_tx(wtxid); + tx_queue.sent_transaction_payload(wtxid); + } + } + Inventory::WitnessTransaction(txid) => { + let transaction = tx_queue.fetch_tx(txid); + if let Some(transaction) = transaction { + let wtxid = transaction.compute_wtxid(); + let msg = message_generator.broadcast_transaction(transaction); + self.write_bytes(writer, msg).await?; + self.message_state.sent_tx(wtxid); + tx_queue.sent_transaction_payload(wtxid); + } + } + _ => (), } } Ok(()) diff --git a/src/network/reader.rs b/src/network/reader.rs index fb168b1..a722570 100644 --- a/src/network/reader.rs +++ b/src/network/reader.rs @@ -61,16 +61,7 @@ impl Reader { } None } - NetworkMessage::GetData(inventory) => { - let mut requests = Vec::new(); - for inv in inventory { - match inv { - Inventory::WTx(wtxid) => requests.push(wtxid), - _ => continue, - } - } - Some(ReaderMessage::TxRequests(requests)) - } + NetworkMessage::GetData(inventory) => Some(ReaderMessage::GetData(inventory)), NetworkMessage::NotFound(_) => None, NetworkMessage::GetBlocks(_) => None, NetworkMessage::GetHeaders(_) => None, @@ -162,7 +153,7 @@ pub(in crate::network) enum ReaderMessage { #[allow(dead_code)] Pong(u64), FeeFilter(FeeRate), - TxRequests(Vec), + GetData(Vec), } impl ReaderMessage { diff --git a/src/node.rs b/src/node.rs index 6f53faa..8a654ec 100644 --- a/src/node.rs +++ b/src/node.rs @@ -9,7 +9,7 @@ use bitcoin::{ message_network::VersionMessage, ServiceFlags, }, - Block, BlockHash, Network, Transaction, Wtxid, + Block, BlockHash, Network, Wtxid, }; use tokio::{ select, @@ -33,7 +33,7 @@ use crate::{ peer_map::PeerMap, LastBlockMonitor, MainThreadMessage, PeerId, PeerMessage, PeerThreadMessage, }, - Config, IndexedBlock, NodeState, + Config, IndexedBlock, NodeState, Package, }; use super::{ @@ -313,7 +313,7 @@ impl Node { } // Broadcast transactions according to the configured policy - async fn broadcast_transaction(&self, broadcast: ClientRequest) { + async fn broadcast_transaction(&self, broadcast: ClientRequest) { let mut queue = self.peer_map.tx_queue.lock().await; let (transaction, oneshot) = broadcast.into_values(); queue.add_to_queue(transaction, oneshot); diff --git a/tests/core.rs b/tests/core.rs index f74b5c0..9f5ccae 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -649,7 +649,7 @@ async fn tx_can_broadcast() { warn_rx: _, event_rx: _, } = client; - tokio::time::timeout(Duration::from_secs(60), requester.broadcast_tx(tx)) + tokio::time::timeout(Duration::from_secs(60), requester.submit_package(tx)) .await .unwrap() .unwrap();