Skip to content

Commit 637add7

Browse files
committed
Implement on-chain wallet sync via compact block filters
1 parent 2a93cdd commit 637add7

File tree

3 files changed

+229
-13
lines changed

3 files changed

+229
-13
lines changed

src/chain/cbf.rs

Lines changed: 189 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,23 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use std::collections::HashMap;
8+
use std::collections::{BTreeMap, HashMap};
99
use std::net::SocketAddr;
1010
use std::sync::{Arc, Mutex, RwLock};
1111
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
1212

13-
use bip157::{BlockHash, Builder, Client, Event, Info, Requester, TrustedPeer, Warning};
14-
use bitcoin::{Script, Transaction, Txid};
13+
use bdk_chain::{BlockId, ConfirmationBlockTime, TxUpdate};
14+
use bdk_wallet::Update;
15+
use bip157::{
16+
BlockHash, Builder, Client, Event, Info, Requester, SyncUpdate, TrustedPeer, Warning,
17+
};
18+
use bitcoin::{Script, ScriptBuf, Transaction, Txid};
1519
use lightning::chain::WatchedOutput;
1620
use lightning::util::ser::Writeable;
17-
use tokio::sync::mpsc;
21+
use tokio::sync::{mpsc, oneshot};
1822

19-
use crate::config::{CbfSyncConfig, Config};
23+
use super::WalletSyncStatus;
24+
use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP};
2025
use crate::fee_estimator::{
2126
apply_post_estimation_adjustments, get_all_conf_targets, OnchainFeeEstimator,
2227
};
@@ -35,6 +40,14 @@ pub(super) struct CbfChainSource {
3540
cbf_runtime_status: Mutex<CbfRuntimeStatus>,
3641
/// Latest chain tip hash, updated by the background event processing task.
3742
latest_tip: Arc<Mutex<Option<BlockHash>>>,
43+
/// Scripts to match against compact block filters during a scan.
44+
watched_scripts: Arc<RwLock<Vec<ScriptBuf>>>,
45+
/// Block (height, hash) pairs where filters matched watched scripts.
46+
matched_block_hashes: Arc<Mutex<Vec<(u32, BlockHash)>>>,
47+
/// One-shot channel sender to signal filter scan completion.
48+
sync_completion_tx: Arc<Mutex<Option<oneshot::Sender<SyncUpdate>>>>,
49+
/// Deduplicates concurrent on-chain wallet sync requests.
50+
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
3851
/// Shared fee rate estimator, updated by this chain source.
3952
fee_estimator: Arc<OnchainFeeEstimator>,
4053
/// Persistent key-value store for node metrics.
@@ -52,6 +65,14 @@ enum CbfRuntimeStatus {
5265
Stopped,
5366
}
5467

68+
/// Shared state passed to the background event processing task.
69+
struct CbfEventState {
70+
latest_tip: Arc<Mutex<Option<BlockHash>>>,
71+
watched_scripts: Arc<RwLock<Vec<ScriptBuf>>>,
72+
matched_block_hashes: Arc<Mutex<Vec<(u32, BlockHash)>>>,
73+
sync_completion_tx: Arc<Mutex<Option<oneshot::Sender<SyncUpdate>>>>,
74+
}
75+
5576
impl CbfChainSource {
5677
pub(crate) fn new(
5778
peers: Vec<String>, sync_config: CbfSyncConfig, fee_estimator: Arc<OnchainFeeEstimator>,
@@ -60,11 +81,19 @@ impl CbfChainSource {
6081
) -> Self {
6182
let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped);
6283
let latest_tip = Arc::new(Mutex::new(None));
84+
let watched_scripts = Arc::new(RwLock::new(Vec::new()));
85+
let matched_block_hashes = Arc::new(Mutex::new(Vec::new()));
86+
let sync_completion_tx = Arc::new(Mutex::new(None));
87+
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
6388
Self {
6489
peers,
6590
sync_config,
6691
cbf_runtime_status,
6792
latest_tip,
93+
watched_scripts,
94+
matched_block_hashes,
95+
sync_completion_tx,
96+
onchain_wallet_sync_status,
6897
fee_estimator,
6998
kv_store,
7099
config,
@@ -129,11 +158,16 @@ impl CbfChainSource {
129158
.spawn_cancellable_background_task(Self::process_warn_messages(warn_rx, warn_logger));
130159

131160
// Spawn a task to process events.
161+
let event_state = CbfEventState {
162+
latest_tip: Arc::clone(&self.latest_tip),
163+
watched_scripts: Arc::clone(&self.watched_scripts),
164+
matched_block_hashes: Arc::clone(&self.matched_block_hashes),
165+
sync_completion_tx: Arc::clone(&self.sync_completion_tx),
166+
};
132167
let event_logger = Arc::clone(&self.logger);
133-
let event_tip = Arc::clone(&self.latest_tip);
134168
runtime.spawn_cancellable_background_task(Self::process_events(
135169
event_rx,
136-
event_tip,
170+
event_state,
137171
event_logger,
138172
));
139173

@@ -170,20 +204,22 @@ impl CbfChainSource {
170204
}
171205

172206
async fn process_events(
173-
mut event_rx: mpsc::UnboundedReceiver<Event>, latest_tip: Arc<Mutex<Option<BlockHash>>>,
174-
logger: Arc<Logger>,
207+
mut event_rx: mpsc::UnboundedReceiver<Event>, state: CbfEventState, logger: Arc<Logger>,
175208
) {
176209
while let Some(event) = event_rx.recv().await {
177210
match event {
178211
Event::FiltersSynced(sync_update) => {
179212
let tip = sync_update.tip();
180-
*latest_tip.lock().unwrap() = Some(tip.hash);
213+
*state.latest_tip.lock().unwrap() = Some(tip.hash);
181214
log_info!(
182215
logger,
183216
"CBF filters synced to tip: height={}, hash={}",
184217
tip.height,
185218
tip.hash,
186219
);
220+
if let Some(tx) = state.sync_completion_tx.lock().unwrap().take() {
221+
let _ = tx.send(sync_update);
222+
}
187223
},
188224
Event::Block(indexed_block) => {
189225
log_trace!(logger, "CBF received block at height {}", indexed_block.height,);
@@ -192,6 +228,14 @@ impl CbfChainSource {
192228
log_debug!(logger, "CBF chain update: {:?}", header_changes);
193229
},
194230
Event::IndexedFilter(indexed_filter) => {
231+
let scripts = state.watched_scripts.read().unwrap();
232+
if !scripts.is_empty() && indexed_filter.contains_any(scripts.iter()) {
233+
state
234+
.matched_block_hashes
235+
.lock()
236+
.unwrap()
237+
.push((indexed_filter.height(), indexed_filter.block_hash()));
238+
}
195239
log_trace!(logger, "CBF received filter at height {}", indexed_filter.height(),);
196240
},
197241
}
@@ -212,12 +256,144 @@ impl CbfChainSource {
212256
}
213257
}
214258

259+
/// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for
260+
/// completion, and return the sync update along with matched block hashes.
261+
async fn run_filter_scan(
262+
&self, scripts: Vec<ScriptBuf>,
263+
) -> Result<(SyncUpdate, Vec<(u32, BlockHash)>), Error> {
264+
let requester = self.requester()?;
265+
266+
self.matched_block_hashes.lock().unwrap().clear();
267+
*self.watched_scripts.write().unwrap() = scripts;
268+
269+
let (tx, rx) = oneshot::channel();
270+
*self.sync_completion_tx.lock().unwrap() = Some(tx);
271+
272+
requester.rescan().map_err(|e| {
273+
log_error!(self.logger, "Failed to trigger CBF rescan: {:?}", e);
274+
Error::WalletOperationFailed
275+
})?;
276+
277+
let sync_update = rx.await.map_err(|e| {
278+
log_error!(self.logger, "CBF sync completion channel dropped: {:?}", e);
279+
Error::WalletOperationFailed
280+
})?;
281+
282+
self.watched_scripts.write().unwrap().clear();
283+
let matched = std::mem::take(&mut *self.matched_block_hashes.lock().unwrap());
284+
285+
Ok((sync_update, matched))
286+
}
287+
215288
/// Sync the on-chain wallet by scanning compact block filters for relevant transactions.
216289
pub(crate) async fn sync_onchain_wallet(
217-
&self, _onchain_wallet: Arc<Wallet>,
290+
&self, onchain_wallet: Arc<Wallet>,
218291
) -> Result<(), Error> {
219-
log_error!(self.logger, "On-chain wallet sync via CBF is not yet implemented.");
220-
Err(Error::WalletOperationFailed)
292+
let receiver_res = {
293+
let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap();
294+
status_lock.register_or_subscribe_pending_sync()
295+
};
296+
if let Some(mut sync_receiver) = receiver_res {
297+
log_debug!(self.logger, "On-chain wallet sync already in progress, waiting.");
298+
return sync_receiver.recv().await.map_err(|e| {
299+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
300+
log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e);
301+
Error::WalletOperationFailed
302+
})?;
303+
}
304+
305+
let requester = self.requester()?;
306+
let now = Instant::now();
307+
308+
let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP);
309+
if scripts.is_empty() {
310+
log_debug!(self.logger, "No wallet scripts to sync via CBF.");
311+
return Ok(());
312+
}
313+
314+
let timeout_fut = tokio::time::timeout(
315+
Duration::from_secs(self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs),
316+
self.sync_onchain_wallet_op(requester, scripts),
317+
);
318+
319+
let (tx_update, sync_update) = match timeout_fut.await {
320+
Ok(res) => res?,
321+
Err(e) => {
322+
log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e);
323+
return Err(Error::WalletOperationTimeout);
324+
},
325+
};
326+
327+
// Build chain checkpoint extending from the wallet's current tip.
328+
let mut cp = onchain_wallet.latest_checkpoint();
329+
for (height, header) in sync_update.recent_history() {
330+
if *height > cp.height() {
331+
let block_id = BlockId { height: *height, hash: header.block_hash() };
332+
cp = cp.push(block_id).unwrap_or_else(|old| old);
333+
}
334+
}
335+
let tip = sync_update.tip();
336+
if tip.height > cp.height() {
337+
let tip_block_id = BlockId { height: tip.height, hash: tip.hash };
338+
cp = cp.push(tip_block_id).unwrap_or_else(|old| old);
339+
}
340+
341+
let update = Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) };
342+
343+
// Apply update to wallet.
344+
onchain_wallet.apply_update(update)?;
345+
346+
log_debug!(
347+
self.logger,
348+
"Sync of on-chain wallet via CBF finished in {}ms.",
349+
now.elapsed().as_millis()
350+
);
351+
352+
update_node_metrics_timestamp(
353+
&self.node_metrics,
354+
&*self.kv_store,
355+
&*self.logger,
356+
|m, t| {
357+
m.latest_onchain_wallet_sync_timestamp = t;
358+
},
359+
)?;
360+
361+
Ok(())
362+
}
363+
364+
async fn sync_onchain_wallet_op(
365+
&self, requester: Requester, scripts: Vec<ScriptBuf>,
366+
) -> Result<(TxUpdate<ConfirmationBlockTime>, SyncUpdate), Error> {
367+
let (sync_update, matched) = self.run_filter_scan(scripts).await?;
368+
369+
log_debug!(
370+
self.logger,
371+
"CBF on-chain filter scan complete: {} matching blocks found.",
372+
matched.len()
373+
);
374+
375+
// Fetch matching blocks and include all their transactions.
376+
// The compact block filter already matched our scripts (covering both
377+
// created outputs and spent inputs), so we include every transaction
378+
// from matched blocks and let BDK determine relevance.
379+
let mut tx_update = TxUpdate::default();
380+
for (height, block_hash) in &matched {
381+
let indexed_block = requester.get_block(*block_hash).await.map_err(|e| {
382+
log_error!(self.logger, "Failed to fetch block {}: {:?}", block_hash, e);
383+
Error::WalletOperationFailed
384+
})?;
385+
let block = indexed_block.block;
386+
let block_id = BlockId { height: *height, hash: block.header.block_hash() };
387+
let conf_time =
388+
ConfirmationBlockTime { block_id, confirmation_time: block.header.time as u64 };
389+
for tx in &block.txdata {
390+
let txid = tx.compute_txid();
391+
tx_update.txs.push(Arc::new(tx.clone()));
392+
tx_update.anchors.insert((conf_time, txid));
393+
}
394+
}
395+
396+
Ok((tx_update, sync_update))
221397
}
222398

223399
/// Sync the Lightning wallet by confirming channel transactions via compact block filters.

src/wallet/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,28 @@ impl Wallet {
122122
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
123123
}
124124

125+
pub(crate) fn get_spks_for_cbf_sync(&self, stop_gap: usize) -> Vec<ScriptBuf> {
126+
let wallet = self.inner.lock().unwrap();
127+
let mut scripts: Vec<ScriptBuf> =
128+
wallet.spk_index().revealed_spks(..).map(|((_, _), spk)| spk).collect();
129+
130+
// For first sync when no scripts have been revealed yet, generate
131+
// lookahead scripts up to the stop gap for both keychains.
132+
if scripts.is_empty() {
133+
for keychain in [KeychainKind::External, KeychainKind::Internal] {
134+
for idx in 0..stop_gap as u32 {
135+
scripts.push(wallet.peek_address(keychain, idx).address.script_pubkey());
136+
}
137+
}
138+
}
139+
140+
scripts
141+
}
142+
143+
pub(crate) fn latest_checkpoint(&self) -> bdk_chain::CheckPoint {
144+
self.inner.lock().unwrap().latest_checkpoint()
145+
}
146+
125147
pub(crate) fn get_cached_txs(&self) -> Vec<Arc<Transaction>> {
126148
self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect()
127149
}

tests/common/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,24 @@ pub(crate) async fn wait_for_outpoint_spend<E: ElectrumApi>(electrs: &E, outpoin
603603
.await;
604604
}
605605

606+
pub(crate) async fn wait_for_cbf_sync(node: &TestNode) {
607+
let before = node.status().latest_onchain_wallet_sync_timestamp;
608+
let mut delay = Duration::from_millis(200);
609+
for _ in 0..30 {
610+
if node.sync_wallets().is_ok() {
611+
let after = node.status().latest_onchain_wallet_sync_timestamp;
612+
if after > before {
613+
return;
614+
}
615+
}
616+
tokio::time::sleep(delay).await;
617+
if delay < Duration::from_secs(2) {
618+
delay = delay.mul_f32(1.5);
619+
}
620+
}
621+
panic!("wait_for_cbf_sync: timed out waiting for CBF sync to complete");
622+
}
623+
606624
pub(crate) async fn exponential_backoff_poll<T, F>(mut poll: F) -> T
607625
where
608626
F: FnMut() -> Option<T>,

0 commit comments

Comments
 (0)