Skip to content

Commit d98fb47

Browse files
committed
Add incremental on-chain wallet sync
1 parent ae80301 commit d98fb47

File tree

2 files changed

+109
-49
lines changed

2 files changed

+109
-49
lines changed

src/chain/cbf.rs

Lines changed: 82 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
use std::collections::{BTreeMap, HashMap};
99
use std::net::SocketAddr;
10+
use std::sync::atomic::{AtomicU32, Ordering};
1011
use std::sync::{Arc, Mutex, RwLock};
1112
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
1213

@@ -54,6 +55,10 @@ pub(super) struct CbfChainSource {
5455
matched_block_hashes: Arc<Mutex<Vec<(u32, BlockHash)>>>,
5556
/// One-shot channel sender to signal filter scan completion.
5657
sync_completion_tx: Arc<Mutex<Option<oneshot::Sender<SyncUpdate>>>>,
58+
/// Filters at or below this height are skipped during incremental scans.
59+
filter_skip_height: Arc<AtomicU32>,
60+
/// Last block height reached by on-chain wallet sync, used for incremental scans.
61+
last_onchain_synced_height: Mutex<Option<u32>>,
5762
/// Deduplicates concurrent on-chain wallet sync requests.
5863
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
5964
/// Shared fee rate estimator, updated by this chain source.
@@ -79,6 +84,7 @@ struct CbfEventState {
7984
watched_scripts: Arc<RwLock<Vec<ScriptBuf>>>,
8085
matched_block_hashes: Arc<Mutex<Vec<(u32, BlockHash)>>>,
8186
sync_completion_tx: Arc<Mutex<Option<oneshot::Sender<SyncUpdate>>>>,
87+
filter_skip_height: Arc<AtomicU32>,
8288
}
8389

8490
impl CbfChainSource {
@@ -92,6 +98,8 @@ impl CbfChainSource {
9298
let watched_scripts = Arc::new(RwLock::new(Vec::new()));
9399
let matched_block_hashes = Arc::new(Mutex::new(Vec::new()));
94100
let sync_completion_tx = Arc::new(Mutex::new(None));
101+
let filter_skip_height = Arc::new(AtomicU32::new(0));
102+
let last_onchain_synced_height = Mutex::new(None);
95103
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
96104
Self {
97105
peers,
@@ -101,6 +109,8 @@ impl CbfChainSource {
101109
watched_scripts,
102110
matched_block_hashes,
103111
sync_completion_tx,
112+
filter_skip_height,
113+
last_onchain_synced_height,
104114
onchain_wallet_sync_status,
105115
fee_estimator,
106116
kv_store,
@@ -171,6 +181,7 @@ impl CbfChainSource {
171181
watched_scripts: Arc::clone(&self.watched_scripts),
172182
matched_block_hashes: Arc::clone(&self.matched_block_hashes),
173183
sync_completion_tx: Arc::clone(&self.sync_completion_tx),
184+
filter_skip_height: Arc::clone(&self.filter_skip_height),
174185
};
175186
let event_logger = Arc::clone(&self.logger);
176187
runtime.spawn_cancellable_background_task(Self::process_events(
@@ -236,6 +247,10 @@ impl CbfChainSource {
236247
log_debug!(logger, "CBF chain update: {:?}", header_changes);
237248
},
238249
Event::IndexedFilter(indexed_filter) => {
250+
let skip_height = state.filter_skip_height.load(Ordering::Acquire);
251+
if skip_height > 0 && indexed_filter.height() <= skip_height {
252+
continue;
253+
}
239254
let scripts = state.watched_scripts.read().unwrap();
240255
if !scripts.is_empty() && indexed_filter.contains_any(scripts.iter()) {
241256
state
@@ -266,11 +281,15 @@ impl CbfChainSource {
266281

267282
/// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for
268283
/// completion, and return the sync update along with matched block hashes.
284+
///
285+
/// When `skip_before_height` is `Some(h)`, filters at or below height `h` are
286+
/// skipped, making the scan incremental.
269287
async fn run_filter_scan(
270-
&self, scripts: Vec<ScriptBuf>,
288+
&self, scripts: Vec<ScriptBuf>, skip_before_height: Option<u32>,
271289
) -> Result<(SyncUpdate, Vec<(u32, BlockHash)>), Error> {
272290
let requester = self.requester()?;
273291

292+
self.filter_skip_height.store(skip_before_height.unwrap_or(0), Ordering::Release);
274293
self.matched_block_hashes.lock().unwrap().clear();
275294
*self.watched_scripts.write().unwrap() = scripts;
276295

@@ -287,6 +306,7 @@ impl CbfChainSource {
287306
Error::WalletOperationFailed
288307
})?;
289308

309+
self.filter_skip_height.store(0, Ordering::Release);
290310
self.watched_scripts.write().unwrap().clear();
291311
let matched = std::mem::take(&mut *self.matched_block_hashes.lock().unwrap());
292312

@@ -310,69 +330,79 @@ impl CbfChainSource {
310330
})?;
311331
}
312332

313-
let requester = self.requester()?;
314-
let now = Instant::now();
333+
let res = async {
334+
let requester = self.requester()?;
335+
let now = Instant::now();
315336

316-
let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP);
317-
if scripts.is_empty() {
318-
log_debug!(self.logger, "No wallet scripts to sync via CBF.");
319-
return Ok(());
320-
}
337+
let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP);
338+
if scripts.is_empty() {
339+
log_debug!(self.logger, "No wallet scripts to sync via CBF.");
340+
return Ok(());
341+
}
321342

322-
let timeout_fut = tokio::time::timeout(
323-
Duration::from_secs(self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs),
324-
self.sync_onchain_wallet_op(requester, scripts),
325-
);
343+
let timeout_fut = tokio::time::timeout(
344+
Duration::from_secs(
345+
self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs,
346+
),
347+
self.sync_onchain_wallet_op(requester, scripts),
348+
);
326349

327-
let (tx_update, sync_update) = match timeout_fut.await {
328-
Ok(res) => res?,
329-
Err(e) => {
330-
log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e);
331-
return Err(Error::WalletOperationTimeout);
332-
},
333-
};
350+
let (tx_update, sync_update) = match timeout_fut.await {
351+
Ok(res) => res?,
352+
Err(e) => {
353+
log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e);
354+
return Err(Error::WalletOperationTimeout);
355+
},
356+
};
334357

335-
// Build chain checkpoint extending from the wallet's current tip.
336-
let mut cp = onchain_wallet.latest_checkpoint();
337-
for (height, header) in sync_update.recent_history() {
338-
if *height > cp.height() {
339-
let block_id = BlockId { height: *height, hash: header.block_hash() };
340-
cp = cp.push(block_id).unwrap_or_else(|old| old);
358+
// Build chain checkpoint extending from the wallet's current tip.
359+
let mut cp = onchain_wallet.latest_checkpoint();
360+
for (height, header) in sync_update.recent_history() {
361+
if *height > cp.height() {
362+
let block_id = BlockId { height: *height, hash: header.block_hash() };
363+
cp = cp.push(block_id).unwrap_or_else(|old| old);
364+
}
365+
}
366+
let tip = sync_update.tip();
367+
if tip.height > cp.height() {
368+
let tip_block_id = BlockId { height: tip.height, hash: tip.hash };
369+
cp = cp.push(tip_block_id).unwrap_or_else(|old| old);
341370
}
342-
}
343-
let tip = sync_update.tip();
344-
if tip.height > cp.height() {
345-
let tip_block_id = BlockId { height: tip.height, hash: tip.hash };
346-
cp = cp.push(tip_block_id).unwrap_or_else(|old| old);
347-
}
348371

349-
let update = Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) };
372+
let update =
373+
Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) };
350374

351-
// Apply update to wallet.
352-
onchain_wallet.apply_update(update)?;
375+
onchain_wallet.apply_update(update)?;
353376

354-
log_debug!(
355-
self.logger,
356-
"Sync of on-chain wallet via CBF finished in {}ms.",
357-
now.elapsed().as_millis()
358-
);
377+
log_debug!(
378+
self.logger,
379+
"Sync of on-chain wallet via CBF finished in {}ms.",
380+
now.elapsed().as_millis()
381+
);
359382

360-
update_node_metrics_timestamp(
361-
&self.node_metrics,
362-
&*self.kv_store,
363-
&*self.logger,
364-
|m, t| {
365-
m.latest_onchain_wallet_sync_timestamp = t;
366-
},
367-
)?;
383+
update_node_metrics_timestamp(
384+
&self.node_metrics,
385+
&*self.kv_store,
386+
&*self.logger,
387+
|m, t| {
388+
m.latest_onchain_wallet_sync_timestamp = t;
389+
},
390+
)?;
368391

369-
Ok(())
392+
Ok(())
393+
}
394+
.await;
395+
396+
self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
397+
398+
res
370399
}
371400

372401
async fn sync_onchain_wallet_op(
373402
&self, requester: Requester, scripts: Vec<ScriptBuf>,
374403
) -> Result<(TxUpdate<ConfirmationBlockTime>, SyncUpdate), Error> {
375-
let (sync_update, matched) = self.run_filter_scan(scripts).await?;
404+
let skip_height = *self.last_onchain_synced_height.lock().unwrap();
405+
let (sync_update, matched) = self.run_filter_scan(scripts, skip_height).await?;
376406

377407
log_debug!(
378408
self.logger,
@@ -401,6 +431,9 @@ impl CbfChainSource {
401431
}
402432
}
403433

434+
let tip = sync_update.tip();
435+
*self.last_onchain_synced_height.lock().unwrap() = Some(tip.height);
436+
404437
Ok((tx_update, sync_update))
405438
}
406439

tests/integration_tests_rust.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2850,3 +2850,30 @@ async fn fee_rate_estimation_after_manual_sync_cbf() {
28502850

28512851
node.stop().unwrap();
28522852
}
2853+
2854+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
2855+
async fn repeated_manual_sync_cbf() {
2856+
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
2857+
let chain_source = TestChainSource::Cbf(&bitcoind);
2858+
let node = setup_node(&chain_source, random_config(true));
2859+
2860+
let addr = node.onchain_payment().new_address().unwrap();
2861+
let premine_amount_sat = 100_000;
2862+
2863+
premine_and_distribute_funds(
2864+
&bitcoind.client,
2865+
&electrsd.client,
2866+
vec![addr],
2867+
Amount::from_sat(premine_amount_sat),
2868+
)
2869+
.await;
2870+
2871+
wait_for_cbf_sync(&node).await;
2872+
assert_eq!(node.list_balances().spendable_onchain_balance_sats, premine_amount_sat);
2873+
2874+
// Regression: the second manual sync must not block forever.
2875+
node.sync_wallets().unwrap();
2876+
assert_eq!(node.list_balances().spendable_onchain_balance_sats, premine_amount_sat);
2877+
2878+
node.stop().unwrap();
2879+
}

0 commit comments

Comments
 (0)