Skip to content

Commit 9a41713

Browse files
committed
Merge branch 'pr-4' into add-cbf-chain-source
# Conflicts: # src/builder.rs # src/chain/cbf.rs # src/chain/mod.rs # src/config.rs # tests/common/mod.rs # tests/integration_tests_rust.rs
2 parents 07a1347 + f4a8e46 commit 9a41713

File tree

7 files changed

+303
-46
lines changed

7 files changed

+303
-46
lines changed

src/builder.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use lightning::util::sweep::OutputSweeper;
4242
use lightning_persister::fs_store::v1::FilesystemStore;
4343
use vss_client::headers::VssHeaderProvider;
4444

45-
use crate::chain::ChainSource;
45+
use crate::chain::{ChainSource, FeeSourceConfig};
4646
use crate::config::{
4747
default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole,
4848
BitcoindRestClientConfig, CbfSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig,
@@ -108,6 +108,7 @@ enum ChainDataSourceConfig {
108108
Cbf {
109109
peers: Vec<String>,
110110
sync_config: Option<CbfSyncConfig>,
111+
fee_source_config: Option<FeeSourceConfig>,
111112
},
112113
}
113114

@@ -384,8 +385,10 @@ impl NodeBuilder {
384385
/// target selection partially mitigates this.
385386
pub fn set_chain_source_cbf(
386387
&mut self, peers: Vec<String>, sync_config: Option<CbfSyncConfig>,
388+
fee_source_config: Option<FeeSourceConfig>,
387389
) -> &mut Self {
388-
self.chain_data_source_config = Some(ChainDataSourceConfig::Cbf { peers, sync_config });
390+
self.chain_data_source_config =
391+
Some(ChainDataSourceConfig::Cbf { peers, sync_config, fee_source_config });
389392
self
390393
}
391394

@@ -929,8 +932,11 @@ impl ArcedNodeBuilder {
929932
/// divided by block weight) rather than per-transaction fee rates. This can underestimate
930933
/// next-block inclusion rates during periods of high mempool congestion. Percentile-based
931934
/// target selection partially mitigates this.
932-
pub fn set_chain_source_cbf(&self, peers: Vec<String>, sync_config: Option<CbfSyncConfig>) {
933-
self.inner.write().unwrap().set_chain_source_cbf(peers, sync_config);
935+
pub fn set_chain_source_cbf(
936+
&self, peers: Vec<String>, sync_config: Option<CbfSyncConfig>,
937+
fee_source_config: Option<FeeSourceConfig>,
938+
) {
939+
self.inner.write().unwrap().set_chain_source_cbf(peers, sync_config, fee_source_config);
934940
}
935941

936942
/// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC.
@@ -1405,11 +1411,12 @@ fn build_with_store_internal(
14051411
}),
14061412
},
14071413

1408-
Some(ChainDataSourceConfig::Cbf { peers, sync_config }) => {
1414+
Some(ChainDataSourceConfig::Cbf { peers, sync_config, fee_source_config }) => {
14091415
let sync_config = sync_config.clone().unwrap_or(CbfSyncConfig::default());
14101416
ChainSource::new_cbf(
14111417
peers.clone(),
14121418
sync_config,
1419+
fee_source_config.clone(),
14131420
Arc::clone(&fee_estimator),
14141421
Arc::clone(&tx_broadcaster),
14151422
Arc::clone(&kv_store),
@@ -2182,14 +2189,15 @@ mod tests {
21822189
let sync_config = CbfSyncConfig::default();
21832190

21842191
let peers = vec!["127.0.0.1:8333".to_string()];
2185-
builder.set_chain_source_cbf(peers.clone(), Some(sync_config.clone()));
2192+
builder.set_chain_source_cbf(peers.clone(), Some(sync_config.clone()), None);
21862193

21872194
let guard = builder.inner.read().unwrap();
21882195
assert!(matches!(
21892196
guard.chain_data_source_config.as_ref(),
21902197
Some(super::ChainDataSourceConfig::Cbf {
21912198
peers: p,
21922199
sync_config: Some(config),
2200+
fee_source_config: None,
21932201
}) if config == &sync_config && p == &peers
21942202
));
21952203
}

src/chain/cbf.rs

Lines changed: 206 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ use bip157::{
1919
};
2020
use bitcoin::constants::SUBSIDY_HALVING_INTERVAL;
2121
use bitcoin::{Amount, FeeRate, Network, Script, ScriptBuf, Transaction, Txid};
22+
use electrum_client::ElectrumApi;
2223
use lightning::chain::{Confirm, WatchedOutput};
2324
use lightning::util::ser::Writeable;
2425
use tokio::sync::{mpsc, oneshot};
2526

26-
use super::WalletSyncStatus;
27+
use super::{FeeSourceConfig, WalletSyncStatus};
2728
use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP};
2829
use crate::error::Error;
2930
use crate::fee_estimator::{
@@ -42,11 +43,29 @@ const MIN_FEERATE_SAT_PER_KWU: u64 = 250;
4243
/// Number of recent blocks to look back for per-target fee rate estimation.
4344
const FEE_RATE_LOOKBACK_BLOCKS: usize = 6;
4445

46+
/// The fee estimation back-end used by the CBF chain source.
47+
enum FeeSource {
48+
/// Derive fee rates from the coinbase reward of recent blocks.
49+
///
50+
/// Provides a per-target rate using percentile selection across multiple blocks.
51+
/// Less accurate than a mempool-aware source but requires no extra connectivity.
52+
Cbf,
53+
/// Delegate fee estimation to an Esplora HTTP server.
54+
Esplora { client: esplora_client::AsyncClient },
55+
/// Delegate fee estimation to an Electrum server.
56+
///
57+
/// A fresh connection is opened for each estimation cycle because `ElectrumClient`
58+
/// is not `Sync`.
59+
Electrum { server_url: String },
60+
}
61+
4562
pub(super) struct CbfChainSource {
4663
/// Peer addresses for sourcing compact block filters via P2P.
4764
peers: Vec<String>,
4865
/// User-provided sync configuration (timeouts, background sync intervals).
4966
pub(super) sync_config: CbfSyncConfig,
67+
/// Fee estimation back-end.
68+
fee_source: FeeSource,
5069
/// Tracks whether the bip157 node is running and holds the command handle.
5170
cbf_runtime_status: Mutex<CbfRuntimeStatus>,
5271
/// Latest chain tip hash, updated by the background event processing task.
@@ -104,10 +123,22 @@ struct CbfEventState {
104123

105124
impl CbfChainSource {
106125
pub(crate) fn new(
107-
peers: Vec<String>, sync_config: CbfSyncConfig, fee_estimator: Arc<OnchainFeeEstimator>,
108-
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
109-
node_metrics: Arc<RwLock<NodeMetrics>>,
126+
peers: Vec<String>, sync_config: CbfSyncConfig, fee_source_config: Option<FeeSourceConfig>,
127+
fee_estimator: Arc<OnchainFeeEstimator>, kv_store: Arc<DynStore>, config: Arc<Config>,
128+
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
110129
) -> Self {
130+
let fee_source = match fee_source_config {
131+
Some(FeeSourceConfig::Esplora(server_url)) => {
132+
let timeout = sync_config.timeouts_config.per_request_timeout_secs;
133+
let mut builder = esplora_client::Builder::new(&server_url);
134+
builder = builder.timeout(timeout as u64);
135+
let client = builder.build_async().unwrap();
136+
FeeSource::Esplora { client }
137+
},
138+
Some(FeeSourceConfig::Electrum(server_url)) => FeeSource::Electrum { server_url },
139+
None => FeeSource::Cbf,
140+
};
141+
111142
let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped);
112143
let latest_tip = Arc::new(Mutex::new(None));
113144
let watched_scripts = Arc::new(RwLock::new(Vec::new()));
@@ -124,6 +155,7 @@ impl CbfChainSource {
124155
Self {
125156
peers,
126157
sync_config,
158+
fee_source,
127159
cbf_runtime_status,
128160
latest_tip,
129161
watched_scripts,
@@ -480,8 +512,24 @@ impl CbfChainSource {
480512
async fn sync_onchain_wallet_op(
481513
&self, requester: Requester, scripts: Vec<ScriptBuf>,
482514
) -> Result<(TxUpdate<ConfirmationBlockTime>, SyncUpdate), Error> {
483-
let skip_height = *self.last_onchain_synced_height.lock().unwrap();
484-
let (sync_update, matched) = self.run_filter_scan(scripts, skip_height).await?;
515+
// Always do a full scan (skip_height=None) for the on-chain wallet.
516+
// Unlike the Lightning wallet which can rely on reorg_queue events,
517+
// the on-chain wallet needs to see all blocks to correctly detect
518+
// reorgs via checkpoint comparison in the caller.
519+
//
520+
// We include LDK-registered scripts (e.g., channel funding output
521+
// scripts) alongside the wallet scripts. This ensures the on-chain
522+
// wallet scan also fetches blocks containing channel funding
523+
// transactions, whose outputs are needed by BDK's TxGraph to
524+
// calculate fees for subsequent spends such as splice transactions.
525+
// Without these, BDK's `calculate_fee` would fail with
526+
// `MissingTxOut` because the parent transaction's outputs are
527+
// unknown. This mirrors what the Bitcoind chain source does in
528+
// `Wallet::block_connected` by inserting registered tx outputs.
529+
let mut all_scripts = scripts;
530+
// we query all registered scripts, not only BDK-related
531+
all_scripts.extend(self.registered_scripts.lock().unwrap().iter().cloned());
532+
let (sync_update, matched) = self.run_filter_scan(all_scripts, None).await?;
485533

486534
log_debug!(
487535
self.logger,
@@ -637,13 +685,45 @@ impl CbfChainSource {
637685
}
638686

639687
pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> {
688+
let new_fee_rate_cache = match &self.fee_source {
689+
FeeSource::Cbf => self.fee_rate_cache_from_cbf().await?,
690+
FeeSource::Esplora { client } => Some(self.fee_rate_cache_from_esplora(client).await?),
691+
FeeSource::Electrum { server_url } => {
692+
Some(self.fee_rate_cache_from_electrum(server_url).await?)
693+
},
694+
};
695+
696+
let Some(new_fee_rate_cache) = new_fee_rate_cache else {
697+
return Ok(());
698+
};
699+
700+
self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache);
701+
702+
update_node_metrics_timestamp(
703+
&self.node_metrics,
704+
&*self.kv_store,
705+
&*self.logger,
706+
|m, t| {
707+
m.latest_fee_rate_cache_update_timestamp = t;
708+
},
709+
)?;
710+
711+
Ok(())
712+
}
713+
714+
/// Derive per-target fee rates from recent blocks' coinbase outputs.
715+
///
716+
/// Returns `Ok(None)` when no chain tip is available yet (first startup before sync).
717+
async fn fee_rate_cache_from_cbf(
718+
&self,
719+
) -> Result<Option<HashMap<crate::fee_estimator::ConfirmationTarget, FeeRate>>, Error> {
640720
let requester = self.requester()?;
641721

642722
let tip_hash = match *self.latest_tip.lock().unwrap() {
643723
Some(hash) => hash,
644724
None => {
645725
log_debug!(self.logger, "No tip available yet for fee rate estimation, skipping.");
646-
return Ok(());
726+
return Ok(None);
647727
},
648728
};
649729

@@ -685,7 +765,7 @@ impl CbfChainSource {
685765
e
686766
);
687767
*self.latest_tip.lock().unwrap() = None;
688-
return Ok(());
768+
return Ok(None);
689769
},
690770
Ok(Err(e)) => {
691771
log_error!(
@@ -704,7 +784,7 @@ impl CbfChainSource {
704784
e
705785
);
706786
*self.latest_tip.lock().unwrap() = None;
707-
return Ok(());
787+
return Ok(None);
708788
},
709789
Err(e) => {
710790
log_error!(self.logger, "Updating fee rate estimates timed out: {}", e);
@@ -772,25 +852,130 @@ impl CbfChainSource {
772852
);
773853
}
774854

775-
self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache);
776-
777855
log_debug!(
778856
self.logger,
779-
"Fee rate cache update finished in {}ms ({} blocks sampled).",
857+
"CBF fee rate estimation finished in {}ms ({} blocks sampled).",
780858
now.elapsed().as_millis(),
781859
block_fee_rates.len(),
782860
);
783861

784-
update_node_metrics_timestamp(
785-
&self.node_metrics,
786-
&*self.kv_store,
787-
&*self.logger,
788-
|m, t| {
789-
m.latest_fee_rate_cache_update_timestamp = t;
790-
},
791-
)?;
862+
Ok(Some(new_fee_rate_cache))
863+
}
792864

793-
Ok(())
865+
/// Fetch per-target fee rates from an Esplora server.
866+
async fn fee_rate_cache_from_esplora(
867+
&self, client: &esplora_client::AsyncClient,
868+
) -> Result<HashMap<crate::fee_estimator::ConfirmationTarget, FeeRate>, Error> {
869+
let timeout = Duration::from_secs(
870+
self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs,
871+
);
872+
let estimates = tokio::time::timeout(timeout, client.get_fee_estimates())
873+
.await
874+
.map_err(|e| {
875+
log_error!(self.logger, "Updating fee rate estimates timed out: {}", e);
876+
Error::FeerateEstimationUpdateTimeout
877+
})?
878+
.map_err(|e| {
879+
log_error!(self.logger, "Failed to retrieve fee rate estimates: {}", e);
880+
Error::FeerateEstimationUpdateFailed
881+
})?;
882+
883+
if estimates.is_empty() && self.config.network == Network::Bitcoin {
884+
log_error!(
885+
self.logger,
886+
"Failed to retrieve fee rate estimates: empty estimates are disallowed on Mainnet.",
887+
);
888+
return Err(Error::FeerateEstimationUpdateFailed);
889+
}
890+
891+
let confirmation_targets = get_all_conf_targets();
892+
let mut new_fee_rate_cache = HashMap::with_capacity(confirmation_targets.len());
893+
for target in confirmation_targets {
894+
let num_blocks = get_num_block_defaults_for_target(target);
895+
let converted_estimate_sat_vb =
896+
esplora_client::convert_fee_rate(num_blocks, estimates.clone())
897+
.map_or(1.0, |converted| converted.max(1.0));
898+
let fee_rate = FeeRate::from_sat_per_kwu((converted_estimate_sat_vb * 250.0) as u64);
899+
let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate);
900+
new_fee_rate_cache.insert(target, adjusted_fee_rate);
901+
902+
log_trace!(
903+
self.logger,
904+
"Fee rate estimation updated for {:?}: {} sats/kwu",
905+
target,
906+
adjusted_fee_rate.to_sat_per_kwu(),
907+
);
908+
}
909+
Ok(new_fee_rate_cache)
910+
}
911+
912+
/// Fetch per-target fee rates from an Electrum server.
913+
///
914+
/// Opens a fresh connection for each call because `ElectrumClient` is not `Sync`.
915+
async fn fee_rate_cache_from_electrum(
916+
&self, server_url: &str,
917+
) -> Result<HashMap<crate::fee_estimator::ConfirmationTarget, FeeRate>, Error> {
918+
let server_url = server_url.to_owned();
919+
let confirmation_targets = get_all_conf_targets();
920+
let per_request_timeout = self.sync_config.timeouts_config.per_request_timeout_secs;
921+
922+
let raw_estimates: Vec<serde_json::Value> = tokio::time::timeout(
923+
Duration::from_secs(
924+
self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs,
925+
),
926+
tokio::task::spawn_blocking(move || {
927+
let electrum_config = electrum_client::ConfigBuilder::new()
928+
.retry(3)
929+
.timeout(Some(per_request_timeout))
930+
.build();
931+
let client = electrum_client::Client::from_config(&server_url, electrum_config)
932+
.map_err(|_| Error::FeerateEstimationUpdateFailed)?;
933+
let mut batch = electrum_client::Batch::default();
934+
for target in confirmation_targets {
935+
batch.estimate_fee(get_num_block_defaults_for_target(target));
936+
}
937+
client.batch_call(&batch).map_err(|_| Error::FeerateEstimationUpdateFailed)
938+
}),
939+
)
940+
.await
941+
.map_err(|e| {
942+
log_error!(self.logger, "Updating fee rate estimates timed out: {}", e);
943+
Error::FeerateEstimationUpdateTimeout
944+
})?
945+
.map_err(|_| Error::FeerateEstimationUpdateFailed)? // JoinError
946+
?; // inner Result
947+
948+
let confirmation_targets = get_all_conf_targets();
949+
950+
if raw_estimates.len() != confirmation_targets.len()
951+
&& self.config.network == Network::Bitcoin
952+
{
953+
log_error!(
954+
self.logger,
955+
"Failed to retrieve fee rate estimates: Electrum server didn't return all expected results.",
956+
);
957+
return Err(Error::FeerateEstimationUpdateFailed);
958+
}
959+
960+
let mut new_fee_rate_cache = HashMap::with_capacity(confirmation_targets.len());
961+
for (target, raw_rate) in confirmation_targets.into_iter().zip(raw_estimates.into_iter()) {
962+
// Electrum returns BTC/KvB; fall back to 1 sat/vb (= 0.00001 BTC/KvB) on failure.
963+
let fee_rate_btc_per_kvb =
964+
raw_rate.as_f64().map_or(0.00001_f64, |v: f64| v.max(0.00001));
965+
// Convert BTC/KvB → sat/kwu: multiply by 25_000_000 (= 10^8 / 4).
966+
let fee_rate =
967+
FeeRate::from_sat_per_kwu((fee_rate_btc_per_kvb * 25_000_000.0).round() as u64);
968+
let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate);
969+
new_fee_rate_cache.insert(target, adjusted_fee_rate);
970+
971+
log_trace!(
972+
self.logger,
973+
"Fee rate estimation updated for {:?}: {} sats/kwu",
974+
target,
975+
adjusted_fee_rate.to_sat_per_kwu(),
976+
);
977+
}
978+
Ok(new_fee_rate_cache)
794979
}
795980

796981
/// Broadcast a package of transactions via the P2P network.

0 commit comments

Comments
 (0)