Skip to content

Commit edde913

Browse files
committed
Implement sync_onchain_wallet for ChainSource::Electrum
1 parent 3bcf278 commit edde913

File tree

3 files changed

+166
-3
lines changed

3 files changed

+166
-3
lines changed

src/chain/electrum.rs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
// accordance with one or both of these licenses.
77

88
use crate::config::{
9-
Config, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS,
10-
TX_BROADCAST_TIMEOUT_SECS,
9+
Config, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS,
10+
LDK_WALLET_SYNC_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS,
1111
};
1212
use crate::error::Error;
1313
use crate::fee_estimator::{
@@ -20,6 +20,12 @@ use lightning::chain::{Confirm, Filter, WatchedOutput};
2020
use lightning::util::ser::Writeable;
2121
use lightning_transaction_sync::ElectrumSyncClient;
2222

23+
use bdk_chain::bdk_core::spk_client::FullScanRequest as BdkFullScanRequest;
24+
use bdk_chain::bdk_core::spk_client::FullScanResponse as BdkFullScanResponse;
25+
use bdk_chain::bdk_core::spk_client::SyncRequest as BdkSyncRequest;
26+
use bdk_chain::bdk_core::spk_client::SyncResponse as BdkSyncResponse;
27+
use bdk_wallet::KeychainKind as BdkKeyChainKind;
28+
2329
use bdk_electrum::BdkElectrumClient;
2430

2531
use electrum_client::{Batch, Client as ElectrumClient, ElectrumApi};
@@ -30,6 +36,8 @@ use std::collections::HashMap;
3036
use std::sync::Arc;
3137
use std::time::{Duration, Instant};
3238

39+
const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5;
40+
3341
pub(crate) struct ElectrumRuntimeClient {
3442
electrum_client: Arc<ElectrumClient>,
3543
bdk_electrum_client: Arc<BdkElectrumClient<ElectrumClient>>,
@@ -96,6 +104,69 @@ impl ElectrumRuntimeClient {
96104
Ok(res)
97105
}
98106

107+
pub(crate) async fn get_full_scan_wallet_update(
108+
&self, request: BdkFullScanRequest<BdkKeyChainKind>,
109+
cached_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
110+
) -> Result<BdkFullScanResponse<BdkKeyChainKind>, Error> {
111+
let bdk_electrum_client = Arc::clone(&self.bdk_electrum_client);
112+
bdk_electrum_client.populate_tx_cache(cached_txs);
113+
114+
let spawn_fut = self.runtime.spawn_blocking(move || {
115+
bdk_electrum_client.full_scan(
116+
request,
117+
BDK_CLIENT_STOP_GAP,
118+
BDK_ELECTRUM_CLIENT_BATCH_SIZE,
119+
true,
120+
)
121+
});
122+
let wallet_sync_timeout_fut =
123+
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
124+
125+
wallet_sync_timeout_fut
126+
.await
127+
.map_err(|e| {
128+
log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e);
129+
Error::WalletOperationTimeout
130+
})?
131+
.map_err(|e| {
132+
log_error!(self.logger, "Sync of on-chain wallet failed: {}", e);
133+
Error::WalletOperationFailed
134+
})?
135+
.map_err(|e| {
136+
log_error!(self.logger, "Sync of on-chain wallet failed: {}", e);
137+
Error::WalletOperationFailed
138+
})
139+
}
140+
141+
pub(crate) async fn get_incremental_sync_wallet_update(
142+
&self, request: BdkSyncRequest<(BdkKeyChainKind, u32)>,
143+
cached_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
144+
) -> Result<BdkSyncResponse, Error> {
145+
let bdk_electrum_client = Arc::clone(&self.bdk_electrum_client);
146+
bdk_electrum_client.populate_tx_cache(cached_txs);
147+
148+
let spawn_fut = self.runtime.spawn_blocking(move || {
149+
bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true)
150+
});
151+
let wallet_sync_timeout_fut =
152+
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
153+
154+
wallet_sync_timeout_fut
155+
.await
156+
.map_err(|e| {
157+
log_error!(self.logger, "Incremental sync of on-chain wallet timed out: {}", e);
158+
Error::WalletOperationTimeout
159+
})?
160+
.map_err(|e| {
161+
log_error!(self.logger, "Incremental sync of on-chain wallet failed: {}", e);
162+
Error::WalletOperationFailed
163+
})?
164+
.map_err(|e| {
165+
log_error!(self.logger, "Incremental sync of on-chain wallet failed: {}", e);
166+
Error::WalletOperationFailed
167+
})
168+
}
169+
99170
pub(crate) async fn broadcast(&self, tx: Transaction) {
100171
let electrum_client = Arc::clone(&self.electrum_client);
101172

src/chain/mod.rs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,95 @@ impl ChainSource {
678678

679679
res
680680
},
681-
Self::Electrum { .. } => todo!(),
681+
Self::Electrum {
682+
electrum_runtime_client,
683+
onchain_wallet,
684+
onchain_wallet_sync_status,
685+
kv_store,
686+
logger,
687+
node_metrics,
688+
..
689+
} => {
690+
let electrum_client: Arc<ElectrumRuntimeClient> =
691+
if let Some(client) = electrum_runtime_client.read().unwrap().as_ref() {
692+
Arc::clone(client)
693+
} else {
694+
debug_assert!(
695+
false,
696+
"We should have started the chain source before syncing the onchain wallet"
697+
);
698+
return Err(Error::FeerateEstimationUpdateFailed);
699+
};
700+
let receiver_res = {
701+
let mut status_lock = onchain_wallet_sync_status.lock().unwrap();
702+
status_lock.register_or_subscribe_pending_sync()
703+
};
704+
if let Some(mut sync_receiver) = receiver_res {
705+
log_info!(logger, "Sync in progress, skipping.");
706+
return sync_receiver.recv().await.map_err(|e| {
707+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
708+
log_error!(logger, "Failed to receive wallet sync result: {:?}", e);
709+
Error::WalletOperationFailed
710+
})?;
711+
}
712+
713+
// If this is our first sync, do a full scan with the configured gap limit.
714+
// Otherwise just do an incremental sync.
715+
let incremental_sync =
716+
node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();
717+
718+
macro_rules! get_and_apply_wallet_update {
719+
($sync_future: expr) => {{
720+
let now = Instant::now();
721+
let update = $sync_future.await?;
722+
723+
match onchain_wallet.apply_update(update) {
724+
Ok(()) => {
725+
log_info!(
726+
logger,
727+
"{} of on-chain wallet finished in {}ms.",
728+
if incremental_sync { "Incremental sync" } else { "Sync" },
729+
now.elapsed().as_millis()
730+
);
731+
let unix_time_secs_opt = SystemTime::now()
732+
.duration_since(UNIX_EPOCH)
733+
.ok()
734+
.map(|d| d.as_secs());
735+
{
736+
let mut locked_node_metrics = node_metrics.write().unwrap();
737+
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
738+
unix_time_secs_opt;
739+
write_node_metrics(
740+
&*locked_node_metrics,
741+
Arc::clone(&kv_store),
742+
Arc::clone(&logger),
743+
)?;
744+
}
745+
Ok(())
746+
},
747+
Err(e) => Err(e),
748+
}
749+
}};
750+
}
751+
752+
let cached_txs = onchain_wallet.get_cached_txs();
753+
754+
let res = if incremental_sync {
755+
let incremental_sync_request = onchain_wallet.get_incremental_sync_request();
756+
let incremental_sync_fut = electrum_client
757+
.get_incremental_sync_wallet_update(incremental_sync_request, cached_txs);
758+
get_and_apply_wallet_update!(incremental_sync_fut)
759+
} else {
760+
let full_scan_request = onchain_wallet.get_full_scan_request();
761+
let full_scan_fut =
762+
electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs);
763+
get_and_apply_wallet_update!(full_scan_fut)
764+
};
765+
766+
onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
767+
768+
res
769+
},
682770
Self::BitcoindRpc { .. } => {
683771
// In BitcoindRpc mode we sync lightning and onchain wallet in one go by via
684772
// `ChainPoller`. So nothing to do here.

src/wallet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ where
9898
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
9999
}
100100

101+
pub(crate) fn get_cached_txs(&self) -> Vec<Arc<Transaction>> {
102+
self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect()
103+
}
104+
101105
pub(crate) fn current_best_block(&self) -> BestBlock {
102106
let checkpoint = self.inner.lock().unwrap().latest_checkpoint();
103107
BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() }

0 commit comments

Comments
 (0)