Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 74 additions & 20 deletions src/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,94 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use bitcoin::{Transaction, Wtxid};
use bitcoin::{Transaction, Txid, Wtxid};
use tokio::sync::oneshot;

use crate::Package;

#[derive(Debug)]
pub(crate) struct BroadcastQueue {
pending: HashMap<Wtxid, oneshot::Sender<Wtxid>>,
data: HashMap<Wtxid, Transaction>,
// There are the transactions that a peer should receive first. In the case of 1p1c, these are
// the `Wtxid` of the child transaction in the package.
advertise: HashSet<Wtxid>,
// Notify the user when:
// 1. a singleton transaction was broadcast
// 2. the final transaction in a package was broadcast
callbacks: HashMap<Wtxid, (oneshot::Sender<Wtxid>, Wtxid)>,
// These transactions will be fetched by the usual `Wtxid`.
witness_data: HashMap<Wtxid, Transaction>,
// These transactions represent missing inputs to a previously broadcast transaction. Because
// the inputs use the legacy `Txid` in the outpoint, these transactions are indexed by `Txid`.
legacy_data: HashMap<Txid, Transaction>,
}

impl BroadcastQueue {
pub(crate) fn new() -> Self {
Self {
pending: HashMap::new(),
data: HashMap::new(),
advertise: HashSet::new(),
callbacks: HashMap::new(),
witness_data: HashMap::new(),
legacy_data: HashMap::new(),
}
}

pub(crate) fn add_to_queue(&mut self, tx: Transaction, oneshot: oneshot::Sender<Wtxid>) {
let wtxid = tx.compute_wtxid();
self.pending.insert(wtxid, oneshot);
self.data.insert(wtxid, tx);
pub(crate) fn add_to_queue(&mut self, package: Package, oneshot: oneshot::Sender<Wtxid>) {
let advertise_wtxid = package.advertise_package();
self.advertise.insert(advertise_wtxid);
let parent = package.parent();
let parent_txid = parent.compute_txid();
let parent_wtxid = parent.compute_wtxid();
match package.child() {
Some(child) => {
let child_wtxid = child.compute_wtxid();
// Only confirm once the parent is confirmed to have been requested.
self.callbacks.insert(parent_wtxid, (oneshot, child_wtxid));
self.witness_data.insert(child_wtxid, child);
// The only way a peer can feasibly request this transaction is by `Txid`, as it is
// never advertised explicitly.
self.legacy_data.insert(parent_txid, parent);
}
None => {
self.callbacks.insert(parent_wtxid, (oneshot, parent_wtxid));
self.witness_data.insert(parent_wtxid, parent);
}
}
}

pub(crate) fn fetch_tx(&self, wtxid: Wtxid) -> Option<Transaction> {
self.data.get(&wtxid).cloned()
pub(crate) fn fetch_tx(&self, id: impl Into<TxIdentifier>) -> Option<Transaction> {
let id = id.into();
match id {
TxIdentifier::Legacy(txid) => self.legacy_data.get(&txid).cloned(),
TxIdentifier::Witness(wtxid) => self.witness_data.get(&wtxid).cloned(),
}
}

pub(crate) fn successful(&mut self, wtxid: Wtxid) {
if let Some(pending) = self.pending.remove(&wtxid) {
let _ = pending.send(wtxid);
pub(crate) fn sent_transaction_payload(&mut self, wtxid: Wtxid) {
if let Some((callback, child)) = self.callbacks.remove(&wtxid) {
self.advertise.remove(&child);
let _ = callback.send(child);
}
}

pub(crate) fn pending_wtxid(&self) -> Vec<Wtxid> {
self.pending.keys().copied().collect()
self.advertise.iter().copied().collect()
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, std::hash::Hash)]
pub(crate) enum TxIdentifier {
Legacy(Txid),
Witness(Wtxid),
}

impl From<Txid> for TxIdentifier {
fn from(value: Txid) -> Self {
Self::Legacy(value)
}
}

impl From<Wtxid> for TxIdentifier {
fn from(value: Wtxid) -> Self {
Self::Witness(value)
}
}

Expand Down Expand Up @@ -65,15 +119,15 @@ mod tests {
let transaction_2: Transaction = tx_data.transactions[1].clone().0;
let mut queue = BroadcastQueue::new();
let (tx, _) = tokio::sync::oneshot::channel();
queue.add_to_queue(transaction_1.clone(), tx);
queue.add_to_queue(transaction_1.clone().into(), tx);
let (tx, _) = tokio::sync::oneshot::channel();
queue.add_to_queue(transaction_2.clone(), tx);
queue.add_to_queue(transaction_2.clone().into(), tx);
assert_eq!(queue.pending_wtxid().len(), 2);
queue.successful(transaction_1.compute_wtxid());
queue.sent_transaction_payload(transaction_1.compute_wtxid());
assert_eq!(queue.pending_wtxid().len(), 1);
assert!(queue.fetch_tx(transaction_1.compute_wtxid()).is_some());
assert!(queue.fetch_tx(transaction_2.compute_wtxid()).is_some());
queue.successful(transaction_2.compute_wtxid());
queue.sent_transaction_payload(transaction_2.compute_wtxid());
assert_eq!(queue.pending_wtxid().len(), 0);
}
}
21 changes: 10 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bitcoin::p2p::address::AddrV2;
use bitcoin::p2p::ServiceFlags;
use bitcoin::{Amount, Transaction, Wtxid};
use bitcoin::{Amount, Wtxid};
use bitcoin::{BlockHash, FeeRate};
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
Expand All @@ -9,7 +9,7 @@ use tokio::sync::oneshot;
use crate::chain::block_subsidy;
use crate::chain::IndexedHeader;
use crate::messages::ClientRequest;
use crate::{Event, HeaderCheckpoint, Info, TrustedPeer, Warning};
use crate::{Event, HeaderCheckpoint, Info, Package, TrustedPeer, Warning};

use super::{error::ClientError, messages::ClientMessage};
use super::{error::FetchBlockError, IndexedBlock};
Expand Down Expand Up @@ -65,23 +65,22 @@ impl Requester {
.map_err(|_| ClientError::SendError)
}

/// Broadcast a new transaction to the network, waiting for at least one peer to request it.
/// Submit a package of transactions to the network, returning when transaction data was sent
/// to at least one peer.
///
/// # Note
/// Note that this is directly callable with a single [`Transaction`].
///
/// When broadcasting a one-parent one-child (TRUC) package,
/// broadcast the child first, followed by the parent.
/// # Returns
///
/// Package relay is under-development at the time of writing.
///
/// For more information, see BIP-431 and BIP-331.
/// The `Wtxid` of the child or singleton transaction.
///
/// # Errors
///
/// If the node has stopped running.
pub async fn broadcast_tx(&self, transaction: Transaction) -> Result<Wtxid, ClientError> {
pub async fn submit_package(&self, package: impl Into<Package>) -> Result<Wtxid, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Wtxid>();
let client_request = ClientRequest::new(transaction, tx);
let package = package.into();
let client_request = ClientRequest::new(package, tx);
self.ntx
.send(ClientMessage::Broadcast(client_request))
.map_err(|_| ClientError::SendError)?;
Expand Down
27 changes: 27 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,30 @@ impl core::fmt::Display for FetchBlockError {
}

impl_sourceless_error!(FetchBlockError);

/// Errors when constructing transaction packages.
#[derive(Debug)]
pub enum PackageError {
/// Packages may not include more than two transactions and must include at least one
/// transaction.
InvalidPackageLength(usize),
/// Child transactions must spend an output from the parent.
UnrelatedTransactions,
}

impl core::fmt::Display for PackageError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PackageError::InvalidPackageLength(s) => {
write!(
f,
"package must include at most two transactions, got {}",
s
)
}
PackageError::UnrelatedTransactions => {
write!(f, "packages must have dependent inputs and outputs.")
}
}
}
}
77 changes: 77 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub mod messages;
/// The structure that communicates with the Bitcoin P2P network and collects data.
pub mod node;

use bitcoin::OutPoint;
use chain::Filter;

use std::net::{IpAddr, Ipv4Addr, SocketAddr};
Expand Down Expand Up @@ -431,6 +432,82 @@ impl Dialog {
}
}

/// A package is a set of dependent transactions to submit to the mempool.
#[derive(Debug, Clone)]
pub struct Package {
parent: Transaction,
child: Option<Transaction>,
}

impl Package {
/// Create a new package from a single transaction.
pub fn new_single(transaction: Transaction) -> Self {
Self {
parent: transaction,
child: None,
}
}

/// Construct a new package using the one-parent-one-child topology, where the child spends an
/// output from the parent. The primary use of such a topology is for a child to bump the
/// fee-rate of the package.
///
/// # Errors
///
/// If the child does not spend at least one output created by the parent.
pub fn new_one_parent_one_child(
parent: Transaction,
child: Transaction,
) -> Result<Self, error::PackageError> {
let outpoints = {
let txid = parent.compute_txid();
let mut outpoints = Vec::with_capacity(parent.output.len());
for vout in 0..parent.output.len() {
outpoints.push(OutPoint {
txid,
vout: vout as u32,
});
}
outpoints
};
if !child
.input
.iter()
.any(|input| outpoints.contains(&input.previous_output))
{
return Err(error::PackageError::UnrelatedTransactions);
}
Ok(Self {
parent,
child: Some(child),
})
}

fn advertise_package(&self) -> Wtxid {
match &self.child {
Some(child) => child.compute_wtxid(),
None => self.parent.compute_wtxid(),
}
}

fn parent(&self) -> Transaction {
self.parent.clone()
}

fn child(&self) -> Option<Transaction> {
self.child.clone()
}
}

impl From<Transaction> for Package {
fn from(value: Transaction) -> Self {
Package {
parent: value,
child: None,
}
}
}

macro_rules! impl_sourceless_error {
($e:ident) => {
impl std::error::Error for $e {
Expand Down
8 changes: 3 additions & 5 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ use std::ops::Div;

use bitcoin::p2p::address::AddrV2;
use bitcoin::p2p::ServiceFlags;
use bitcoin::{
block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Transaction, Wtxid,
};
use bitcoin::{block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Wtxid};

use crate::chain::{BlockHeaderChanges, IndexedHeader};
use crate::IndexedFilter;
use crate::{chain::checkpoints::HeaderCheckpoint, IndexedBlock, TrustedPeer};
use crate::{IndexedFilter, Package};

use super::error::FetchBlockError;

Expand Down Expand Up @@ -140,7 +138,7 @@ pub(crate) enum ClientMessage {
/// Stop the node.
Shutdown,
/// Broadcast a [`crate::Transaction`] with a [`crate::TxBroadcastPolicy`].
Broadcast(ClientRequest<Transaction, Wtxid>),
Broadcast(ClientRequest<Package, Wtxid>),
/// Starting at the configured anchor checkpoint, re-emit all filters.
Rescan,
/// Explicitly request a block from the node.
Expand Down
43 changes: 34 additions & 9 deletions src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration};
use addrman::Record;
use bip324::{AsyncProtocol, PacketReader, PacketWriter, Role};
use bitcoin::{
p2p::{message::NetworkMessage, ServiceFlags},
p2p::{message::NetworkMessage, message_blockdata::Inventory, ServiceFlags},
Network,
};
use tokio::{
Expand Down Expand Up @@ -259,15 +259,40 @@ impl Peer {
.await?;
Ok(())
}
ReaderMessage::TxRequests(requests) => {
ReaderMessage::GetData(requests) => {
let mut tx_queue = self.tx_queue.lock().await;
for wtxid in requests {
let transaction = tx_queue.fetch_tx(wtxid);
if let Some(transaction) = transaction {
let msg = message_generator.broadcast_transaction(transaction);
self.write_bytes(writer, msg).await?;
self.message_state.sent_tx(wtxid);
tx_queue.successful(wtxid);
for inv in requests {
match inv {
Inventory::WTx(wtxid) => {
let transaction = tx_queue.fetch_tx(wtxid);
if let Some(transaction) = transaction {
let msg = message_generator.broadcast_transaction(transaction);
self.write_bytes(writer, msg).await?;
self.message_state.sent_tx(wtxid);
tx_queue.sent_transaction_payload(wtxid);
}
}
Inventory::Transaction(txid) => {
let transaction = tx_queue.fetch_tx(txid);
if let Some(transaction) = transaction {
let wtxid = transaction.compute_wtxid();
let msg = message_generator.broadcast_transaction(transaction);
self.write_bytes(writer, msg).await?;
self.message_state.sent_tx(wtxid);
tx_queue.sent_transaction_payload(wtxid);
}
}
Inventory::WitnessTransaction(txid) => {
let transaction = tx_queue.fetch_tx(txid);
if let Some(transaction) = transaction {
let wtxid = transaction.compute_wtxid();
let msg = message_generator.broadcast_transaction(transaction);
self.write_bytes(writer, msg).await?;
self.message_state.sent_tx(wtxid);
tx_queue.sent_transaction_payload(wtxid);
}
}
_ => (),
}
}
Ok(())
Expand Down
Loading
Loading