Skip to content

Commit 90eb045

Browse files
committed
Implement syncing via bip157 (kyoto)
Add a storage of watched scripts Add a mechanism to process the filters and watched scripts Add last synced checkpoint, channel to watch its updates Make possible the kyoto node to start/stop, maintain the original peer list provided. Add header cache for not to redownload them when applying. Add wait_until_synced Add tx broadcasting. Add tests.
1 parent 7ff3c7e commit 90eb045

8 files changed

Lines changed: 3283 additions & 2753 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ serde_json = { version = "1.0.128", default-features = false, features = ["std"]
7676
log = { version = "0.4.22", default-features = false, features = ["std"]}
7777

7878
async-trait = { version = "0.1", default-features = false }
79-
bip157 = "0.3.4"
79+
bip157 = "0.4.2"
8080
vss-client = { package = "vss-client-ng", version = "0.5" }
8181
prost = { version = "0.11.6", default-features = false}
8282
#bitcoin-payment-instructions = { version = "0.6" }

src/chain/kyoto.rs

Lines changed: 293 additions & 69 deletions
Large diffs are not rendered by default.

src/chain/mod.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::collections::HashMap;
1414
use std::sync::{Arc, Mutex, RwLock};
1515
use std::time::Duration;
1616

17-
use bitcoin::{Script, Txid};
17+
use bitcoin::{Script, ScriptBuf, Txid};
1818
use lightning::chain::{BestBlock, Filter};
1919

2020
use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient};
@@ -104,7 +104,7 @@ enum ChainSourceKind {
104104
Esplora(EsploraChainSource),
105105
Electrum(ElectrumChainSource),
106106
Bitcoind(BitcoindChainSource),
107-
Kyoto(KyotoChainSource),
107+
Kyoto(KyotoChainSource),
108108
}
109109

110110
impl ChainSource {
@@ -221,9 +221,7 @@ impl ChainSource {
221221
ChainSourceKind::Electrum(electrum_chain_source) => {
222222
electrum_chain_source.start(runtime)?
223223
},
224-
ChainSourceKind::Kyoto(kyoto_chain_source) => {
225-
kyoto_chain_source.start(runtime)?
226-
},
224+
ChainSourceKind::Kyoto(kyoto_chain_source) => kyoto_chain_source.start(runtime)?,
227225
_ => {
228226
// Nothing to do for other chain sources.
229227
},
@@ -254,6 +252,16 @@ impl ChainSource {
254252
self.registered_txids.lock().unwrap().clone()
255253
}
256254

255+
/// Register a script pubkey to watch in compact block filters.
256+
///
257+
/// This is a no-op for chain backends other than BIP157/kyoto, which rely on transaction-based
258+
/// sync and don't need an explicit script watchlist.
259+
pub(crate) fn register_script(&self, script: ScriptBuf) {
260+
if let ChainSourceKind::Kyoto(kyoto_chain_source) = &self.kind {
261+
kyoto_chain_source.register_output(script);
262+
}
263+
}
264+
257265
pub(crate) fn is_transaction_based(&self) -> bool {
258266
match &self.kind {
259267
ChainSourceKind::Esplora(_) => true,
@@ -399,8 +407,7 @@ impl ChainSource {
399407
}
400408
}
401409

402-
// Synchronize the onchain wallet via transaction-based protocols (i.e., Esplora, Electrum,
403-
// etc.)
410+
// Synchronize the onchain wallet via transaction-based protocols (i.e., Esplora, Electrum, etc.)
404411
pub(crate) async fn sync_onchain_wallet(
405412
&self, onchain_wallet: Arc<Wallet>,
406413
) -> Result<(), Error> {
@@ -450,7 +457,7 @@ impl ChainSource {
450457
}
451458
}
452459

453-
pub(crate) async fn poll_and_update_listeners(
460+
pub(crate) async fn sync_listeners_to_tip(
454461
&self, onchain_wallet: Arc<Wallet>, channel_manager: Arc<ChannelManager>,
455462
chain_monitor: Arc<ChainMonitor>, output_sweeper: Arc<Sweeper>,
456463
) -> Result<(), Error> {
@@ -475,8 +482,8 @@ impl ChainSource {
475482
)
476483
.await
477484
},
478-
ChainSourceKind::Kyoto { .. } => {
479-
unreachable!("Listeners will be synced via the kyoto event loop")
485+
ChainSourceKind::Kyoto(kyoto_chain_source) => {
486+
kyoto_chain_source.wait_until_synced().await
480487
},
481488
}
482489
}
@@ -544,7 +551,9 @@ impl Filter for ChainSource {
544551
electrum_chain_source.register_tx(txid, script_pubkey)
545552
},
546553
ChainSourceKind::Bitcoind { .. } => (),
547-
ChainSourceKind::Kyoto { .. } => (),
554+
ChainSourceKind::Kyoto(kyoto_chain_source) => {
555+
kyoto_chain_source.register_output(script_pubkey.to_owned())
556+
},
548557
}
549558
}
550559
fn register_output(&self, output: lightning::chain::WatchedOutput) {
@@ -556,7 +565,9 @@ impl Filter for ChainSource {
556565
electrum_chain_source.register_output(output)
557566
},
558567
ChainSourceKind::Bitcoind { .. } => (),
559-
ChainSourceKind::Kyoto { .. } => (),
568+
ChainSourceKind::Kyoto(kyoto_chain_source) => {
569+
kyoto_chain_source.register_output(output.script_pubkey)
570+
},
560571
}
561572
}
562573
}

src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,9 @@ impl Node {
272272
let chain_source = Arc::clone(&self.chain_source);
273273
self.runtime.block_on(async move { chain_source.update_fee_rate_estimates().await })?;
274274

275+
// For BIP157 backends: seed kyoto's script watchlist with all previously-derived wallet addresses
276+
self.wallet.seed_watched_scripts();
277+
275278
// Spawn background task continuously syncing onchain, lightning, and fee rate cache.
276279
let stop_sync_receiver = self.stop_sender.subscribe();
277280
let chain_source = Arc::clone(&self.chain_source);
@@ -1642,7 +1645,7 @@ impl Node {
16421645
} else {
16431646
chain_source.update_fee_rate_estimates().await?;
16441647
chain_source
1645-
.poll_and_update_listeners(
1648+
.sync_listeners_to_tip(
16461649
sync_wallet,
16471650
sync_cman,
16481651
sync_cmon,

src/wallet/mod.rs

Lines changed: 110 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::future::Future;
99
use std::ops::Deref;
1010
use std::str::FromStr;
1111
use std::sync::{Arc, Mutex};
12+
use std::time::{SystemTime, UNIX_EPOCH};
1213

1314
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
1415
use bdk_wallet::descriptor::ExtendedDescriptor;
@@ -122,6 +123,35 @@ impl Wallet {
122123
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
123124
}
124125

126+
/// Register all previously-revealed wallet scripts into the chain source's script watchlist.
127+
///
128+
/// For BIP157 backends the kyoto compact-filter scanner only checks scripts that have been
129+
/// explicitly registered. When the node restarts, previously-derived addresses need to be
130+
/// re-seeded so that blocks containing payments to those addresses are not silently skipped.
131+
/// This is a no-op for non-BIP157 backends.
132+
pub(crate) fn seed_watched_scripts(&self) {
133+
let locked_wallet = self.inner.lock().unwrap();
134+
for ((_, _), spk) in locked_wallet.spk_index().revealed_spks(..) {
135+
self.chain_source.register_script(spk);
136+
}
137+
}
138+
139+
/// Advance the wallet's internal [`LocalChain`] to include the given block header.
140+
///
141+
/// Called by the BIP157 backend for every `ChainUpdate::Connected` event so the chain stays
142+
/// in sync with kyoto's header stream. Without this, subsequent `block_connected` calls would
143+
/// fail because BDK requires the parent block to already be in the chain before connecting the
144+
/// next block.
145+
pub(crate) fn apply_header(&self, header: &bitcoin::block::Header, height: u32) {
146+
let mut locked_wallet = self.inner.lock().unwrap();
147+
let block_id = bdk_chain::BlockId { height, hash: header.block_hash() };
148+
let latest_cp = locked_wallet.latest_checkpoint().insert(block_id);
149+
let update = bdk_wallet::Update { chain: Some(latest_cp), ..Default::default() };
150+
if let Err(e) = locked_wallet.apply_update(update) {
151+
log_error!(self.logger, "BIP157: Failed to apply header at height {}: {}", height, e);
152+
}
153+
}
154+
125155
pub(crate) fn get_cached_txs(&self) -> Vec<Arc<Transaction>> {
126156
self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect()
127157
}
@@ -329,13 +359,17 @@ impl Wallet {
329359
let tx_refs: Vec<(
330360
&Transaction,
331361
lightning::chain::chaininterface::TransactionType,
332-
)> =
333-
txs_to_broadcast
334-
.iter()
335-
.map(|tx| {
336-
(tx, lightning::chain::chaininterface::TransactionType::Sweep { channels: vec![] })
337-
})
338-
.collect();
362+
)> = txs_to_broadcast
363+
.iter()
364+
.map(|tx| {
365+
(
366+
tx,
367+
lightning::chain::chaininterface::TransactionType::Sweep {
368+
channels: vec![],
369+
},
370+
)
371+
})
372+
.collect();
339373
self.broadcaster.broadcast_transactions(&tx_refs);
340374
log_info!(
341375
self.logger,
@@ -426,44 +460,50 @@ impl Wallet {
426460
) -> Result<Transaction, Error> {
427461
let fee_rate = self.fee_estimator.estimate_fee_rate(confirmation_target);
428462

429-
let mut locked_wallet = self.inner.lock().unwrap();
430-
let mut tx_builder = locked_wallet.build_tx();
463+
let tx = {
464+
let mut locked_wallet = self.inner.lock().unwrap();
465+
let mut tx_builder = locked_wallet.build_tx();
431466

432-
tx_builder.add_recipient(output_script, amount).fee_rate(fee_rate).nlocktime(locktime);
467+
tx_builder.add_recipient(output_script, amount).fee_rate(fee_rate).nlocktime(locktime);
433468

434-
let mut psbt = match tx_builder.finish() {
435-
Ok(psbt) => {
436-
log_trace!(self.logger, "Created funding PSBT: {:?}", psbt);
437-
psbt
438-
},
439-
Err(err) => {
440-
log_error!(self.logger, "Failed to create funding transaction: {}", err);
441-
return Err(err.into());
442-
},
443-
};
469+
let mut psbt = match tx_builder.finish() {
470+
Ok(psbt) => {
471+
log_trace!(self.logger, "Created funding PSBT: {:?}", psbt);
472+
psbt
473+
},
474+
Err(err) => {
475+
log_error!(self.logger, "Failed to create funding transaction: {}", err);
476+
return Err(err.into());
477+
},
478+
};
444479

445-
match locked_wallet.sign(&mut psbt, SignOptions::default()) {
446-
Ok(finalized) => {
447-
if !finalized {
448-
return Err(Error::OnchainTxCreationFailed);
449-
}
450-
},
451-
Err(err) => {
452-
log_error!(self.logger, "Failed to create funding transaction: {}", err);
453-
return Err(err.into());
454-
},
455-
}
480+
match locked_wallet.sign(&mut psbt, SignOptions::default()) {
481+
Ok(finalized) => {
482+
if !finalized {
483+
return Err(Error::OnchainTxCreationFailed);
484+
}
485+
},
486+
Err(err) => {
487+
log_error!(self.logger, "Failed to create funding transaction: {}", err);
488+
return Err(err.into());
489+
},
490+
}
456491

457-
let mut locked_persister = self.persister.lock().unwrap();
458-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
459-
log_error!(self.logger, "Failed to persist wallet: {}", e);
460-
Error::PersistenceFailed
461-
})?;
492+
let mut locked_persister = self.persister.lock().unwrap();
493+
locked_wallet.persist(&mut locked_persister).map_err(|e| {
494+
log_error!(self.logger, "Failed to persist wallet: {}", e);
495+
Error::PersistenceFailed
496+
})?;
462497

463-
let tx = psbt.extract_tx().map_err(|e| {
464-
log_error!(self.logger, "Failed to extract transaction: {}", e);
465-
e
466-
})?;
498+
psbt.extract_tx().map_err(|e| {
499+
log_error!(self.logger, "Failed to extract transaction: {}", e);
500+
e
501+
})?
502+
}; // locked_wallet drops here
503+
504+
// Re-seed watched scripts so the BIP-157 chain source learns about any
505+
// change addresses that BDK derived internally when building the tx.
506+
self.seed_watched_scripts();
467507

468508
Ok(tx)
469509
}
@@ -473,6 +513,8 @@ impl Wallet {
473513
let mut locked_persister = self.persister.lock().unwrap();
474514

475515
let address_info = locked_wallet.reveal_next_address(KeychainKind::External);
516+
// For BIP157 backends, register the script so kyoto watches it in compact filters.
517+
self.chain_source.register_script(address_info.address.script_pubkey());
476518
locked_wallet.persist(&mut locked_persister).map_err(|e| {
477519
log_error!(self.logger, "Failed to persist wallet: {}", e);
478520
Error::PersistenceFailed
@@ -485,6 +527,8 @@ impl Wallet {
485527
let mut locked_persister = self.persister.lock().unwrap();
486528

487529
let address_info = locked_wallet.next_unused_address(KeychainKind::Internal);
530+
// For BIP157 backends, register the script so kyoto watches it in compact filters.
531+
self.chain_source.register_script(address_info.address.script_pubkey());
488532
locked_wallet.persist(&mut locked_persister).map_err(|e| {
489533
log_error!(self.logger, "Failed to persist wallet: {}", e);
490534
Error::PersistenceFailed
@@ -811,7 +855,8 @@ impl Wallet {
811855
let (sent, received) = locked_wallet.sent_and_received(&psbt.unsigned_tx);
812856
let drain_amount = sent - received;
813857
if spendable_amount_sats < drain_amount.to_sat() {
814-
log_error!(self.logger,
858+
log_error!(
859+
self.logger,
815860
"Unable to send payment due to insufficient funds. Available: {}sats, Required: {}",
816861
spendable_amount_sats,
817862
drain_amount,
@@ -846,13 +891,30 @@ impl Wallet {
846891
})?
847892
};
848893

894+
// Re-seed watched scripts so the BIP-157 chain source learns about any
895+
// change addresses that BDK derived internally when building the tx.
896+
self.seed_watched_scripts();
897+
849898
self.broadcaster.broadcast_transactions(&[(
850899
&tx,
851900
lightning::chain::chaininterface::TransactionType::Sweep { channels: vec![] },
852901
)]);
853902

854903
let txid = tx.compute_txid();
855904

905+
// Eagerly register the just-sent tx as unconfirmed in BDK so the payment
906+
// store entry is immediately visible via Node::payment(). Without this,
907+
// backends without mempool support (e.g. BIP157) would only see the payment
908+
// after the tx is confirmed in a block.
909+
let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
910+
if let Err(e) = self.apply_mempool_txs(vec![(tx, now_secs)], vec![]) {
911+
log_error!(
912+
self.logger,
913+
"Failed to eagerly apply outgoing transaction to payment store: {:?}",
914+
e
915+
);
916+
}
917+
856918
match send_amount {
857919
OnchainSendAmount::ExactRetainingReserve { amount_sats, .. } => {
858920
log_info!(
@@ -1336,7 +1398,13 @@ impl Wallet {
13361398
final_fee_rate_sat_per_kwu.saturating_mul(3).saturating_div(2),
13371399
);
13381400
if required_fee_rate > max_allowed_fee_rate {
1339-
log_error!( self.logger, "BDK required fee rate {} exceeds sanity cap {} (1.5x our estimate) for tx {}", required_fee_rate, max_allowed_fee_rate, txid );
1401+
log_error!(
1402+
self.logger,
1403+
"BDK required fee rate {} exceeds sanity cap {} (1.5x our estimate) for tx {}",
1404+
required_fee_rate,
1405+
max_allowed_fee_rate,
1406+
txid
1407+
);
13401408
return Err(Error::InvalidFeeRate);
13411409
}
13421410

0 commit comments

Comments
 (0)