Skip to content

Commit c4d3282

Browse files
jkczyzclaude
andcommitted
Persist payment transaction data without blocking LDK
Previously the BroadcasterInterface implementation wrote the payment record synchronously when LDK invoked it. With a remote KV store this could block LDK's message handling for hundreds of milliseconds per call, noticeably during force-close bursts or splice broadcasts. Persistence now happens asynchronously and must complete before the transaction is sent to the chain client. If persistence fails, the broadcast is dropped: a payment record must exist for every on-chain tx we emit, otherwise a crash could leave the tx confirmed with no matching record. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0b9b403 commit c4d3282

5 files changed

Lines changed: 66 additions & 32 deletions

File tree

src/chain/bitcoind.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -568,16 +568,18 @@ impl BitcoindChainSource {
568568
Ok(())
569569
}
570570

571-
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
571+
pub(crate) async fn process_broadcast_package(
572+
&self, txs: impl IntoIterator<Item = Transaction>,
573+
) {
572574
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
573575
// features, we should eventually switch to use `submitpackage` via the
574576
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
575577
// transactions.
576-
for tx in &package {
578+
for tx in txs {
577579
let txid = tx.compute_txid();
578580
let timeout_fut = tokio::time::timeout(
579581
Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS),
580-
self.api_client.broadcast_transaction(tx),
582+
self.api_client.broadcast_transaction(&tx),
581583
);
582584
match timeout_fut.await {
583585
Ok(res) => match res {

src/chain/electrum.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,9 @@ impl ElectrumChainSource {
275275
Ok(())
276276
}
277277

278-
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
278+
pub(crate) async fn process_broadcast_package(
279+
&self, txs: impl IntoIterator<Item = Transaction>,
280+
) {
279281
let electrum_client: Arc<ElectrumRuntimeClient> = if let Some(client) =
280282
self.electrum_runtime_status.read().expect("lock").client().as_ref()
281283
{
@@ -285,7 +287,7 @@ impl ElectrumChainSource {
285287
return;
286288
};
287289

288-
for tx in package {
290+
for tx in txs {
289291
electrum_client.broadcast(tx).await;
290292
}
291293
}

src/chain/esplora.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -352,12 +352,14 @@ impl EsploraChainSource {
352352
Ok(())
353353
}
354354

355-
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
356-
for tx in &package {
355+
pub(crate) async fn process_broadcast_package(
356+
&self, txs: impl IntoIterator<Item = Transaction>,
357+
) {
358+
for tx in txs {
357359
let txid = tx.compute_txid();
358360
let timeout_fut = tokio::time::timeout(
359361
Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs),
360-
self.esplora_client.broadcast(tx),
362+
self.esplora_client.broadcast(&tx),
361363
);
362364
match timeout_fut.await {
363365
Ok(res) => match res {

src/chain/mod.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::config::{
2424
WALLET_SYNC_INTERVAL_MINIMUM_SECS,
2525
};
2626
use crate::fee_estimator::OnchainFeeEstimator;
27-
use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger};
27+
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
2828
use crate::runtime::Runtime;
2929
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
3030
use crate::{Error, NodeMetrics};
@@ -453,15 +453,27 @@ impl ChainSource {
453453
return;
454454
}
455455
Some(next_package) = receiver.recv() => {
456+
let package = match self.tx_broadcaster.classify_package(next_package).await {
457+
Ok(p) => p,
458+
Err(e) => {
459+
log_error!(
460+
tx_bcast_logger,
461+
"Skipping broadcast: failed to persist payment records: {:?}",
462+
e,
463+
);
464+
continue;
465+
},
466+
};
467+
let txs = package.into_iter().map(|(tx, _)| tx);
456468
match &self.kind {
457469
ChainSourceKind::Esplora(esplora_chain_source) => {
458-
esplora_chain_source.process_broadcast_package(next_package).await
470+
esplora_chain_source.process_broadcast_package(txs).await
459471
},
460472
ChainSourceKind::Electrum(electrum_chain_source) => {
461-
electrum_chain_source.process_broadcast_package(next_package).await
473+
electrum_chain_source.process_broadcast_package(txs).await
462474
},
463475
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
464-
bitcoind_chain_source.process_broadcast_package(next_package).await
476+
bitcoind_chain_source.process_broadcast_package(txs).await
465477
},
466478
}
467479
}

src/tx_broadcaster.rs

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,26 @@ use tokio::sync::{mpsc, Mutex, MutexGuard};
1414

1515
use crate::logger::{log_error, LdkLogger};
1616
use crate::types::Wallet;
17+
use crate::Error;
1718

1819
const BCAST_PACKAGE_QUEUE_SIZE: usize = 50;
1920

21+
/// A package of transactions that LDK handed to the broadcaster in one
22+
/// `broadcast_transactions` call, along with each transaction's type. Queued until the
23+
/// background task classifies and broadcasts it.
24+
pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>;
25+
2026
pub(crate) struct TransactionBroadcaster<L: Deref>
2127
where
2228
L::Target: LdkLogger,
2329
{
24-
queue_sender: mpsc::Sender<Vec<Transaction>>,
25-
queue_receiver: Mutex<mpsc::Receiver<Vec<Transaction>>>,
30+
queue_sender: mpsc::Sender<BroadcastPackage>,
31+
queue_receiver: Mutex<mpsc::Receiver<BroadcastPackage>>,
2632
/// Weak handle to the [`Wallet`] that performs classification of funding broadcasts
2733
/// (channel opens and splices) into payment records. Remains `None` while the
28-
/// builder is wiring the node up, during which broadcasts are still forwarded to
29-
/// the queue but no payment record is written. [`Self::set_wallet`] installs the
30-
/// handle once the [`Wallet`] exists.
34+
/// builder is wiring the node up, during which broadcasts are forwarded to the
35+
/// queue but no payment record is written. [`Self::set_wallet`] installs the handle
36+
/// once the [`Wallet`] exists.
3137
wallet: StdMutex<Option<Weak<Wallet>>>,
3238
logger: L,
3339
}
@@ -55,30 +61,40 @@ where
5561

5662
pub(crate) async fn get_broadcast_queue(
5763
&self,
58-
) -> MutexGuard<'_, mpsc::Receiver<Vec<Transaction>>> {
64+
) -> MutexGuard<'_, mpsc::Receiver<BroadcastPackage>> {
5965
self.queue_receiver.lock().await
6066
}
67+
68+
/// Classifies a queued package into payment records and returns the package ready
69+
/// for the chain client. Returns `Err` if any classification fails; callers must
70+
/// not broadcast the package in that case, since a crash would leave the tx
71+
/// on-chain without a record.
72+
pub(crate) async fn classify_package(
73+
&self, package: BroadcastPackage,
74+
) -> Result<BroadcastPackage, Error> {
75+
let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade);
76+
if let Some(wallet) = wallet_opt {
77+
tokio::task::spawn_blocking(move || {
78+
for (tx, tx_type) in &package {
79+
wallet.classify_broadcast(tx, tx_type)?;
80+
}
81+
Ok::<_, Error>(package)
82+
})
83+
.await
84+
.map_err(|_| Error::PersistenceFailed)?
85+
} else {
86+
Ok(package)
87+
}
88+
}
6189
}
6290

6391
impl<L: Deref> BroadcasterInterface for TransactionBroadcaster<L>
6492
where
6593
L::Target: LdkLogger,
6694
{
6795
fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) {
68-
let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade);
69-
if let Some(wallet) = wallet {
70-
for (tx, tx_type) in txs {
71-
if let Err(e) = wallet.classify_broadcast(tx, tx_type) {
72-
log_error!(
73-
self.logger,
74-
"Failed to classify broadcast tx {}: {:?}",
75-
tx.compute_txid(),
76-
e,
77-
);
78-
}
79-
}
80-
}
81-
let package = txs.iter().map(|(t, _)| (*t).clone()).collect::<Vec<Transaction>>();
96+
let package: BroadcastPackage =
97+
txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect();
8298
self.queue_sender.try_send(package).unwrap_or_else(|e| {
8399
log_error!(self.logger, "Failed to broadcast transactions: {}", e);
84100
});

0 commit comments

Comments
 (0)