Skip to content

Commit bf1f925

Browse files
committed
Migrate broadcaster to work with packages
1 parent d01b5c9 commit bf1f925

7 files changed

Lines changed: 127 additions & 60 deletions

File tree

src/broadcaster.rs

Lines changed: 74 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,94 @@
1-
use std::collections::HashMap;
1+
use std::collections::{HashMap, HashSet};
22

3-
use bitcoin::{Transaction, Wtxid};
3+
use bitcoin::{Transaction, Txid, Wtxid};
44
use tokio::sync::oneshot;
55

6+
use crate::Package;
7+
68
#[derive(Debug)]
79
pub(crate) struct BroadcastQueue {
8-
pending: HashMap<Wtxid, oneshot::Sender<Wtxid>>,
9-
data: HashMap<Wtxid, Transaction>,
10+
// There are the transactions that a peer should receive first. In the case of 1p1c, these are
11+
// the `Wtxid` of the child transaction in the package.
12+
advertise: HashSet<Wtxid>,
13+
// Notify the user when:
14+
// 1. a singleton transaction was broadcast
15+
// 2. the final transaction in a package was broadcast
16+
callbacks: HashMap<Wtxid, (oneshot::Sender<Wtxid>, Wtxid)>,
17+
// These transactions will be fetched by the usual `Wtxid`.
18+
witness_data: HashMap<Wtxid, Transaction>,
19+
// These transactions represent missing inputs to a previously broadcast transaction. Because
20+
// the inputs use the legacy `Txid` in the outpoint, these transactions are indexed by `Txid`.
21+
legacy_data: HashMap<Txid, Transaction>,
1022
}
1123

1224
impl BroadcastQueue {
1325
pub(crate) fn new() -> Self {
1426
Self {
15-
pending: HashMap::new(),
16-
data: HashMap::new(),
27+
advertise: HashSet::new(),
28+
callbacks: HashMap::new(),
29+
witness_data: HashMap::new(),
30+
legacy_data: HashMap::new(),
1731
}
1832
}
1933

20-
pub(crate) fn add_to_queue(&mut self, tx: Transaction, oneshot: oneshot::Sender<Wtxid>) {
21-
let wtxid = tx.compute_wtxid();
22-
self.pending.insert(wtxid, oneshot);
23-
self.data.insert(wtxid, tx);
34+
pub(crate) fn add_to_queue(&mut self, package: Package, oneshot: oneshot::Sender<Wtxid>) {
35+
let advertise_wtxid = package.advertise_package();
36+
self.advertise.insert(advertise_wtxid);
37+
let parent = package.parent();
38+
let parent_txid = parent.compute_txid();
39+
let parent_wtxid = parent.compute_wtxid();
40+
match package.child() {
41+
Some(child) => {
42+
let child_wtxid = child.compute_wtxid();
43+
// Only confirm once the parent is confirmed to have been requested.
44+
self.callbacks.insert(parent_wtxid, (oneshot, child_wtxid));
45+
self.witness_data.insert(child_wtxid, child);
46+
// The only way a peer can feasibly request this transaction is by `Txid`, as it is
47+
// never advertised explicitly.
48+
self.legacy_data.insert(parent_txid, parent);
49+
}
50+
None => {
51+
self.callbacks.insert(parent_wtxid, (oneshot, parent_wtxid));
52+
self.witness_data.insert(parent_wtxid, parent);
53+
}
54+
}
2455
}
2556

26-
pub(crate) fn fetch_tx(&self, wtxid: Wtxid) -> Option<Transaction> {
27-
self.data.get(&wtxid).cloned()
57+
pub(crate) fn fetch_tx(&self, id: impl Into<TxIdentifier>) -> Option<Transaction> {
58+
let id = id.into();
59+
match id {
60+
TxIdentifier::Legacy(txid) => self.legacy_data.get(&txid).cloned(),
61+
TxIdentifier::Witness(wtxid) => self.witness_data.get(&wtxid).cloned(),
62+
}
2863
}
2964

30-
pub(crate) fn successful(&mut self, wtxid: Wtxid) {
31-
if let Some(pending) = self.pending.remove(&wtxid) {
32-
let _ = pending.send(wtxid);
65+
pub(crate) fn sent_transaction_payload(&mut self, wtxid: Wtxid) {
66+
if let Some((callback, child)) = self.callbacks.remove(&wtxid) {
67+
self.advertise.remove(&child);
68+
let _ = callback.send(child);
3369
}
3470
}
3571

3672
pub(crate) fn pending_wtxid(&self) -> Vec<Wtxid> {
37-
self.pending.keys().copied().collect()
73+
self.advertise.iter().copied().collect()
74+
}
75+
}
76+
77+
#[derive(Debug, Clone, Copy, PartialEq, Eq, std::hash::Hash)]
78+
pub(crate) enum TxIdentifier {
79+
Legacy(Txid),
80+
Witness(Wtxid),
81+
}
82+
83+
impl From<Txid> for TxIdentifier {
84+
fn from(value: Txid) -> Self {
85+
Self::Legacy(value)
86+
}
87+
}
88+
89+
impl From<Wtxid> for TxIdentifier {
90+
fn from(value: Wtxid) -> Self {
91+
Self::Witness(value)
3892
}
3993
}
4094

@@ -65,15 +119,15 @@ mod tests {
65119
let transaction_2: Transaction = tx_data.transactions[1].clone().0;
66120
let mut queue = BroadcastQueue::new();
67121
let (tx, _) = tokio::sync::oneshot::channel();
68-
queue.add_to_queue(transaction_1.clone(), tx);
122+
queue.add_to_queue(transaction_1.clone().into(), tx);
69123
let (tx, _) = tokio::sync::oneshot::channel();
70-
queue.add_to_queue(transaction_2.clone(), tx);
124+
queue.add_to_queue(transaction_2.clone().into(), tx);
71125
assert_eq!(queue.pending_wtxid().len(), 2);
72-
queue.successful(transaction_1.compute_wtxid());
126+
queue.sent_transaction_payload(transaction_1.compute_wtxid());
73127
assert_eq!(queue.pending_wtxid().len(), 1);
74128
assert!(queue.fetch_tx(transaction_1.compute_wtxid()).is_some());
75129
assert!(queue.fetch_tx(transaction_2.compute_wtxid()).is_some());
76-
queue.successful(transaction_2.compute_wtxid());
130+
queue.sent_transaction_payload(transaction_2.compute_wtxid());
77131
assert_eq!(queue.pending_wtxid().len(), 0);
78132
}
79133
}

src/client.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use bitcoin::p2p::address::AddrV2;
22
use bitcoin::p2p::ServiceFlags;
3-
use bitcoin::{Amount, Transaction, Wtxid};
3+
use bitcoin::{Amount, Wtxid};
44
use bitcoin::{BlockHash, FeeRate};
55
use tokio::sync::mpsc;
66
use tokio::sync::mpsc::UnboundedSender;
@@ -9,7 +9,7 @@ use tokio::sync::oneshot;
99
use crate::chain::block_subsidy;
1010
use crate::chain::IndexedHeader;
1111
use crate::messages::ClientRequest;
12-
use crate::{Event, HeaderCheckpoint, Info, TrustedPeer, Warning};
12+
use crate::{Event, HeaderCheckpoint, Info, Package, TrustedPeer, Warning};
1313

1414
use super::{error::ClientError, messages::ClientMessage};
1515
use super::{error::FetchBlockError, IndexedBlock};
@@ -65,23 +65,22 @@ impl Requester {
6565
.map_err(|_| ClientError::SendError)
6666
}
6767

68-
/// Broadcast a new transaction to the network, waiting for at least one peer to request it.
68+
/// Submit a package of transactions to the network, returning when transaction data was sent
69+
/// to at least one peer.
6970
///
70-
/// # Note
71+
/// Note that this is directly callable with a single [`Transaction`].
7172
///
72-
/// When broadcasting a one-parent one-child (TRUC) package,
73-
/// broadcast the child first, followed by the parent.
73+
/// # Returns
7474
///
75-
/// Package relay is under-development at the time of writing.
76-
///
77-
/// For more information, see BIP-431 and BIP-331.
75+
/// The `Wtxid` of the child or singleton transaction.
7876
///
7977
/// # Errors
8078
///
8179
/// If the node has stopped running.
82-
pub async fn broadcast_tx(&self, transaction: Transaction) -> Result<Wtxid, ClientError> {
80+
pub async fn submit_package(&self, package: impl Into<Package>) -> Result<Wtxid, ClientError> {
8381
let (tx, rx) = tokio::sync::oneshot::channel::<Wtxid>();
84-
let client_request = ClientRequest::new(transaction, tx);
82+
let package = package.into();
83+
let client_request = ClientRequest::new(package, tx);
8584
self.ntx
8685
.send(ClientMessage::Broadcast(client_request))
8786
.map_err(|_| ClientError::SendError)?;

src/messages.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@ use std::ops::Div;
33

44
use bitcoin::p2p::address::AddrV2;
55
use bitcoin::p2p::ServiceFlags;
6-
use bitcoin::{
7-
block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Transaction, Wtxid,
8-
};
6+
use bitcoin::{block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Wtxid};
97

108
use crate::chain::{BlockHeaderChanges, IndexedHeader};
11-
use crate::IndexedFilter;
129
use crate::{chain::checkpoints::HeaderCheckpoint, IndexedBlock, TrustedPeer};
10+
use crate::{IndexedFilter, Package};
1311

1412
use super::error::FetchBlockError;
1513

@@ -140,7 +138,7 @@ pub(crate) enum ClientMessage {
140138
/// Stop the node.
141139
Shutdown,
142140
/// Broadcast a [`crate::Transaction`] with a [`crate::TxBroadcastPolicy`].
143-
Broadcast(ClientRequest<Transaction, Wtxid>),
141+
Broadcast(ClientRequest<Package, Wtxid>),
144142
/// Starting at the configured anchor checkpoint, re-emit all filters.
145143
Rescan,
146144
/// Explicitly request a block from the node.

src/network/peer.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration};
44
use addrman::Record;
55
use bip324::{AsyncProtocol, PacketReader, PacketWriter, Role};
66
use bitcoin::{
7-
p2p::{message::NetworkMessage, ServiceFlags},
7+
p2p::{message::NetworkMessage, message_blockdata::Inventory, ServiceFlags},
88
Network,
99
};
1010
use tokio::{
@@ -259,15 +259,40 @@ impl Peer {
259259
.await?;
260260
Ok(())
261261
}
262-
ReaderMessage::TxRequests(requests) => {
262+
ReaderMessage::GetData(requests) => {
263263
let mut tx_queue = self.tx_queue.lock().await;
264-
for wtxid in requests {
265-
let transaction = tx_queue.fetch_tx(wtxid);
266-
if let Some(transaction) = transaction {
267-
let msg = message_generator.broadcast_transaction(transaction);
268-
self.write_bytes(writer, msg).await?;
269-
self.message_state.sent_tx(wtxid);
270-
tx_queue.successful(wtxid);
264+
for inv in requests {
265+
match inv {
266+
Inventory::WTx(wtxid) => {
267+
let transaction = tx_queue.fetch_tx(wtxid);
268+
if let Some(transaction) = transaction {
269+
let msg = message_generator.broadcast_transaction(transaction);
270+
self.write_bytes(writer, msg).await?;
271+
self.message_state.sent_tx(wtxid);
272+
tx_queue.sent_transaction_payload(wtxid);
273+
}
274+
}
275+
Inventory::Transaction(txid) => {
276+
let transaction = tx_queue.fetch_tx(txid);
277+
if let Some(transaction) = transaction {
278+
let wtxid = transaction.compute_wtxid();
279+
let msg = message_generator.broadcast_transaction(transaction);
280+
self.write_bytes(writer, msg).await?;
281+
self.message_state.sent_tx(wtxid);
282+
tx_queue.sent_transaction_payload(wtxid);
283+
}
284+
}
285+
Inventory::WitnessTransaction(txid) => {
286+
let transaction = tx_queue.fetch_tx(txid);
287+
if let Some(transaction) = transaction {
288+
let wtxid = transaction.compute_wtxid();
289+
let msg = message_generator.broadcast_transaction(transaction);
290+
self.write_bytes(writer, msg).await?;
291+
self.message_state.sent_tx(wtxid);
292+
tx_queue.sent_transaction_payload(wtxid);
293+
}
294+
}
295+
_ => (),
271296
}
272297
}
273298
Ok(())

src/network/reader.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,7 @@ impl<R: AsyncBufReadExt + Send + Sync + Unpin> Reader<R> {
6161
}
6262
None
6363
}
64-
NetworkMessage::GetData(inventory) => {
65-
let mut requests = Vec::new();
66-
for inv in inventory {
67-
match inv {
68-
Inventory::WTx(wtxid) => requests.push(wtxid),
69-
_ => continue,
70-
}
71-
}
72-
Some(ReaderMessage::TxRequests(requests))
73-
}
64+
NetworkMessage::GetData(inventory) => Some(ReaderMessage::GetData(inventory)),
7465
NetworkMessage::NotFound(_) => None,
7566
NetworkMessage::GetBlocks(_) => None,
7667
NetworkMessage::GetHeaders(_) => None,
@@ -162,7 +153,7 @@ pub(in crate::network) enum ReaderMessage {
162153
#[allow(dead_code)]
163154
Pong(u64),
164155
FeeFilter(FeeRate),
165-
TxRequests(Vec<Wtxid>),
156+
GetData(Vec<Inventory>),
166157
}
167158

168159
impl ReaderMessage {

src/node.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use bitcoin::{
99
message_network::VersionMessage,
1010
ServiceFlags,
1111
},
12-
Block, BlockHash, Network, Transaction, Wtxid,
12+
Block, BlockHash, Network, Wtxid,
1313
};
1414
use tokio::{
1515
select,
@@ -33,7 +33,7 @@ use crate::{
3333
peer_map::PeerMap, LastBlockMonitor, MainThreadMessage, PeerId, PeerMessage,
3434
PeerThreadMessage,
3535
},
36-
Config, IndexedBlock, NodeState,
36+
Config, IndexedBlock, NodeState, Package,
3737
};
3838

3939
use super::{
@@ -313,7 +313,7 @@ impl Node {
313313
}
314314

315315
// Broadcast transactions according to the configured policy
316-
async fn broadcast_transaction(&self, broadcast: ClientRequest<Transaction, Wtxid>) {
316+
async fn broadcast_transaction(&self, broadcast: ClientRequest<Package, Wtxid>) {
317317
let mut queue = self.peer_map.tx_queue.lock().await;
318318
let (transaction, oneshot) = broadcast.into_values();
319319
queue.add_to_queue(transaction, oneshot);

tests/core.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ async fn tx_can_broadcast() {
649649
warn_rx: _,
650650
event_rx: _,
651651
} = client;
652-
tokio::time::timeout(Duration::from_secs(60), requester.broadcast_tx(tx))
652+
tokio::time::timeout(Duration::from_secs(60), requester.submit_package(tx))
653653
.await
654654
.unwrap()
655655
.unwrap();

0 commit comments

Comments
 (0)