Skip to content

Commit d5b6bda

Browse files
jkczyzclaude
andcommitted
Persist payment transaction data without blocking LDK
Previously the BroadcasterInterface implementation wrote the payment record synchronously when LDK invoked it. With a remote KV store this could block LDK's message handling for hundreds of milliseconds per call, noticeably during force-close bursts or splice broadcasts. Persistence now happens asynchronously and must complete before the transaction is sent to the chain client. If persistence fails, the broadcast is dropped: a payment record must exist for every on-chain tx we emit, otherwise a crash could leave the tx confirmed with no matching record. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b55159a commit d5b6bda

2 files changed

Lines changed: 52 additions & 24 deletions

File tree

src/chain/mod.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::config::{
2424
WALLET_SYNC_INTERVAL_MINIMUM_SECS,
2525
};
2626
use crate::fee_estimator::OnchainFeeEstimator;
27-
use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger};
27+
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
2828
use crate::runtime::Runtime;
2929
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
3030
use crate::{Error, NodeMetrics};
@@ -453,15 +453,26 @@ impl ChainSource {
453453
return;
454454
}
455455
Some(next_package) = receiver.recv() => {
456+
let txs = match self.tx_broadcaster.classify_package(next_package).await {
457+
Ok(txs) => txs,
458+
Err(e) => {
459+
log_error!(
460+
tx_bcast_logger,
461+
"Skipping broadcast: failed to persist payment records: {:?}",
462+
e,
463+
);
464+
continue;
465+
},
466+
};
456467
match &self.kind {
457468
ChainSourceKind::Esplora(esplora_chain_source) => {
458-
esplora_chain_source.process_broadcast_package(next_package).await
469+
esplora_chain_source.process_broadcast_package(txs).await
459470
},
460471
ChainSourceKind::Electrum(electrum_chain_source) => {
461-
electrum_chain_source.process_broadcast_package(next_package).await
472+
electrum_chain_source.process_broadcast_package(txs).await
462473
},
463474
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
464-
bitcoind_chain_source.process_broadcast_package(next_package).await
475+
bitcoind_chain_source.process_broadcast_package(txs).await
465476
},
466477
}
467478
}

src/tx_broadcaster.rs

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,26 @@ use tokio::sync::{mpsc, Mutex, MutexGuard};
1414

1515
use crate::logger::{log_error, LdkLogger};
1616
use crate::types::Wallet;
17+
use crate::Error;
1718

1819
const BCAST_PACKAGE_QUEUE_SIZE: usize = 50;
1920

21+
/// A package of transactions that LDK handed to the broadcaster in one
22+
/// `broadcast_transactions` call, along with each transaction's type. Queued until the
23+
/// background task classifies and broadcasts it.
24+
pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>;
25+
2026
pub(crate) struct TransactionBroadcaster<L: Deref>
2127
where
2228
L::Target: LdkLogger,
2329
{
24-
queue_sender: mpsc::Sender<Vec<Transaction>>,
25-
queue_receiver: Mutex<mpsc::Receiver<Vec<Transaction>>>,
30+
queue_sender: mpsc::Sender<BroadcastPackage>,
31+
queue_receiver: Mutex<mpsc::Receiver<BroadcastPackage>>,
2632
/// Weak handle to the [`Wallet`] that performs classification of funding broadcasts
2733
/// (channel opens and splices) into payment records. Remains `None` while the
28-
/// builder is wiring the node up, during which broadcasts are still forwarded to
29-
/// the queue but no payment record is written. [`Self::set_wallet`] installs the
30-
/// handle once the [`Wallet`] exists.
34+
/// builder is wiring the node up, during which broadcasts are forwarded to the
35+
/// queue but no payment record is written. [`Self::set_wallet`] installs the handle
36+
/// once the [`Wallet`] exists.
3137
wallet: StdMutex<Option<Weak<Wallet>>>,
3238
logger: L,
3339
}
@@ -55,30 +61,41 @@ where
5561

5662
pub(crate) async fn get_broadcast_queue(
5763
&self,
58-
) -> MutexGuard<'_, mpsc::Receiver<Vec<Transaction>>> {
64+
) -> MutexGuard<'_, mpsc::Receiver<BroadcastPackage>> {
5965
self.queue_receiver.lock().await
6066
}
67+
68+
/// Classifies a queued package into payment records and returns the raw
69+
/// transactions ready for the chain client. Returns `Err` if any classification
70+
/// fails; callers must not broadcast the package in that case, since a crash would
71+
/// leave the tx on-chain without a record.
72+
pub(crate) async fn classify_package(
73+
&self, package: BroadcastPackage,
74+
) -> Result<Vec<Transaction>, Error> {
75+
let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade);
76+
if let Some(wallet) = wallet_opt {
77+
let package = tokio::task::spawn_blocking(move || {
78+
for (tx, tx_type) in &package {
79+
wallet.classify_broadcast(tx, tx_type)?;
80+
}
81+
Ok::<_, Error>(package)
82+
})
83+
.await
84+
.map_err(|_| Error::PersistenceFailed)??;
85+
Ok(package.into_iter().map(|(tx, _)| tx).collect())
86+
} else {
87+
Ok(package.into_iter().map(|(tx, _)| tx).collect())
88+
}
89+
}
6190
}
6291

6392
impl<L: Deref> BroadcasterInterface for TransactionBroadcaster<L>
6493
where
6594
L::Target: LdkLogger,
6695
{
6796
fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) {
68-
let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade);
69-
if let Some(wallet) = wallet {
70-
for (tx, tx_type) in txs {
71-
if let Err(e) = wallet.classify_broadcast(tx, tx_type) {
72-
log_error!(
73-
self.logger,
74-
"Failed to classify broadcast tx {}: {:?}",
75-
tx.compute_txid(),
76-
e,
77-
);
78-
}
79-
}
80-
}
81-
let package = txs.iter().map(|(t, _)| (*t).clone()).collect::<Vec<Transaction>>();
97+
let package: BroadcastPackage =
98+
txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect();
8299
self.queue_sender.try_send(package).unwrap_or_else(|e| {
83100
log_error!(self.logger, "Failed to broadcast transactions: {}", e);
84101
});

0 commit comments

Comments
 (0)