Skip to content

Commit 6755483

Browse files
committed
feat(cbf): update bdk-kyoto to 0.9.0
- refactor syncing into a fn - made `skip-blocks` optional and removed default value to use bdk-kyoto Sync scan type
1 parent 7706b04 commit 6755483

6 files changed

Lines changed: 113 additions & 294 deletions

File tree

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ tokio = { version = "1", features = ["full"] }
2525
bdk_bitcoind_rpc = { version = "0.18.0", optional = true }
2626
bdk_electrum = { version = "0.21.0", optional = true }
2727
bdk_esplora = { version = "0.20.1", features = ["async-https", "tokio"], optional = true }
28-
bdk_kyoto = { version = "0.7.1", optional = true }
28+
bdk_kyoto = { version = "0.9.0", optional = true }
2929
shlex = { version = "1.3.0", optional = true }
3030
tracing = "0.1.41"
3131
tracing-subscriber = "0.3.19"

src/commands.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,7 @@ use bdk_wallet::bitcoin::{
2020
};
2121
use clap::{value_parser, Args, Parser, Subcommand, ValueEnum};
2222

23-
#[cfg(any(
24-
feature = "cbf",
25-
feature = "electrum",
26-
feature = "esplora",
27-
feature = "rpc"
28-
))]
23+
#[cfg(any(feature = "electrum", feature = "esplora", feature = "rpc"))]
2924
use crate::utils::parse_proxy_auth;
3025
use crate::utils::{parse_address, parse_outpoint, parse_recipient};
3126

@@ -214,7 +209,7 @@ pub struct WalletOpts {
214209
}
215210

216211
/// Options to configure a SOCKS5 proxy for a blockchain client connection.
217-
#[cfg(any(feature = "cbf", feature = "electrum", feature = "esplora"))]
212+
#[cfg(any(feature = "electrum", feature = "esplora"))]
218213
#[derive(Debug, Args, Clone, PartialEq, Eq)]
219214
pub struct ProxyOpts {
220215
/// Sets the SOCKS5 proxy for a blockchain client.
@@ -244,17 +239,12 @@ pub struct ProxyOpts {
244239
#[derive(Debug, Args, Clone, PartialEq, Eq)]
245240
pub struct CompactFilterOpts {
246241
/// Sets the number of parallel node connections.
247-
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4")]
242+
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))]
248243
pub conn_count: u8,
249244

250245
/// Optionally skip initial `skip_blocks` blocks.
251-
#[clap(
252-
env = "SKIP_BLOCKS",
253-
short = 'k',
254-
long = "cbf-skip-blocks",
255-
default_value = "0"
256-
)]
257-
pub skip_blocks: u32,
246+
#[clap(env = "SKIP_BLOCKS", short = 'k', long = "cbf-skip-blocks")]
247+
pub skip_blocks: Option<u32>,
258248
}
259249

260250
/// Wallet subcommands that can be issued without a blockchain backend.

src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,6 @@ pub enum BDKCliError {
8787
BitcoinCoreRpcError(#[from] bdk_bitcoind_rpc::bitcoincore_rpc::Error),
8888

8989
#[cfg(feature = "cbf")]
90-
#[error("Kyoto-cbf error: {0}")]
91-
SqlInitializationError(#[from] bdk_kyoto::kyoto::db::error::SqlInitializationError),
90+
#[error("BDK-Kyoto error: {0}")]
91+
BuilderError(#[from] bdk_kyoto::builder::BuilderError),
9292
}

src/handlers.rs

Lines changed: 3 additions & 254 deletions
Original file line numberDiff line numberDiff line change
@@ -420,102 +420,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
420420
}
421421
#[cfg(feature = "cbf")]
422422
KyotoClient { client } => {
423-
let LightClient {
424-
requester,
425-
log_subscriber,
426-
warning_subscriber,
427-
mut update_subscriber,
428-
node,
429-
} = client;
430-
431-
let subscriber = tracing_subscriber::FmtSubscriber::new();
432-
tracing::subscriber::set_global_default(subscriber)
433-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
434-
435-
tokio::task::spawn(async move { node.run().await });
436-
tokio::task::spawn(async move {
437-
trace_logger(log_subscriber, warning_subscriber).await
438-
});
439-
440-
if !requester.is_running().await {
441-
tracing::error!("Kyoto node is not running");
442-
return Err(Error::Generic("Kyoto node failed to start".to_string()));
443-
}
444-
tracing::info!("Kyoto node is running");
445-
446-
tracing::info!(
447-
"Initial derivation indices: External={}, Internal={}",
448-
wallet.derivation_index(KeychainKind::External).unwrap_or(0),
449-
wallet.derivation_index(KeychainKind::Internal).unwrap_or(0)
450-
);
451-
452-
tracing::info!("Adding wallet scripts to node");
453-
requester.add_revealed_scripts(&wallet).await.map_err(|e| {
454-
Error::Generic(format!("Failed to add wallet scripts: {}", e))
455-
})?;
456-
457-
let mut updates_applied = false;
458-
let mut initial_tx_count = wallet.transactions().count();
459-
let mut iteration_count = 0;
460-
const MAX_ITERATIONS: u32 = 600;
461-
462-
loop {
463-
iteration_count += 1;
464-
tracing::debug!(
465-
"Loop iteration {}, updates_applied: {}",
466-
iteration_count,
467-
updates_applied
468-
);
469-
470-
if iteration_count > MAX_ITERATIONS && !updates_applied {
471-
tracing::error!(
472-
"Timeout: No updates received after {} iterations",
473-
MAX_ITERATIONS
474-
);
475-
return Err(Error::Generic(
476-
"Sync timed out: no updates received".to_string(),
477-
));
478-
}
479-
480-
match update_subscriber.update().await {
481-
Some(update) => {
482-
tracing::info!("Received update: applying to wallet");
483-
wallet.apply_update(update).map_err(|e| {
484-
Error::Generic(format!("Failed to apply update: {}", e))
485-
})?;
486-
updates_applied = true;
487-
488-
tracing::info!(
489-
"Tx count: {}, Balance: {}, Chain tip: {}",
490-
wallet.transactions().count(),
491-
wallet.balance().total().to_sat(),
492-
wallet.local_chain().tip().height()
493-
);
494-
495-
let current_tx_count = wallet.transactions().count();
496-
if current_tx_count == initial_tx_count && updates_applied {
497-
tracing::info!("No new transactions found; sync complete");
498-
break;
499-
}
500-
initial_tx_count = current_tx_count;
501-
}
502-
None => {
503-
tracing::debug!("No update received, waiting");
504-
if updates_applied {
505-
tracing::info!("No further updates available");
506-
break;
507-
}
508-
// Wait briefly to avoid tight loop
509-
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
510-
}
511-
}
512-
}
513-
514-
tracing::info!(
515-
"Sync completed: tx_count={}, balance={}",
516-
wallet.transactions().count(),
517-
wallet.balance().total().to_sat()
518-
);
423+
sync_kyoto_client(wallet, client).await?;
519424
}
520425
}
521426
Ok(json!({}))
@@ -583,61 +488,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
583488
}
584489
#[cfg(feature = "cbf")]
585490
KyotoClient { client } => {
586-
let LightClient {
587-
requester,
588-
log_subscriber,
589-
warning_subscriber,
590-
mut update_subscriber,
591-
node,
592-
} = client;
593-
594-
let subscriber = tracing_subscriber::FmtSubscriber::new();
595-
tracing::subscriber::set_global_default(subscriber)
596-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
597-
598-
tokio::task::spawn(async move { node.run().await });
599-
tokio::task::spawn(async move {
600-
trace_logger(log_subscriber, warning_subscriber).await
601-
});
602-
603-
requester
604-
.add_revealed_scripts(&wallet)
605-
.await
606-
.map_err(|e| Error::Generic(format!("Failed to add script: {}", e)))?;
607-
608-
let mut updates_applied = false;
609-
let mut initial_tx_count = wallet.transactions().count();
610-
611-
loop {
612-
match update_subscriber.update().await {
613-
Some(update) => {
614-
wallet.apply_update(update).map_err(|e| {
615-
Error::Generic(format!("Failed to apply update: {}", e))
616-
})?;
617-
updates_applied = true;
618-
619-
tracing::info!(
620-
"Chain tip: {}, Transactions: {}",
621-
wallet.local_chain().tip().height(),
622-
wallet.transactions().count()
623-
);
624-
625-
let current_tx_count = wallet.transactions().count();
626-
if current_tx_count == initial_tx_count && updates_applied {
627-
tracing::info!("No new transactions found; sync complete");
628-
break;
629-
}
630-
initial_tx_count = current_tx_count;
631-
}
632-
None => {
633-
if updates_applied {
634-
tracing::info!("No further updates available");
635-
break;
636-
}
637-
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
638-
}
639-
}
640-
}
491+
sync_kyoto_client(wallet, client).await?;
641492
}
642493
}
643494
Ok(json!({}))
@@ -684,109 +535,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
684535

685536
#[cfg(feature = "cbf")]
686537
KyotoClient { client } => {
687-
let LightClient {
688-
requester,
689-
log_subscriber,
690-
warning_subscriber,
691-
mut update_subscriber,
692-
node,
693-
} = client;
694-
695-
let subscriber = tracing_subscriber::FmtSubscriber::new();
696-
tracing::subscriber::set_global_default(subscriber)
697-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;
698-
699-
tokio::task::spawn(async move { node.run().await });
700-
tokio::task::spawn(async move {
701-
trace_logger(log_subscriber, warning_subscriber).await
702-
});
703-
704-
// Wait for peer connections to establish
705-
const PEER_CONNECTION_DELAY: u64 = 60;
706-
tracing::info!(
707-
"Waiting {} seconds for peer connections",
708-
PEER_CONNECTION_DELAY
709-
);
710-
tokio::time::sleep(std::time::Duration::from_secs(PEER_CONNECTION_DELAY)).await;
711-
712-
tracing::info!("Proceeding with transaction broadcast");
713-
714-
// Broadcast the transaction
715-
let broadcasted_txid = tx.compute_txid();
716-
tracing::info!("Broadcasting transaction: {}", broadcasted_txid);
717-
requester
718-
.broadcast_tx(TxBroadcast {
719-
tx: tx.clone(),
720-
broadcast_policy: RandomPeer,
721-
})
722-
.await
723-
.map_err(|e| {
724-
tracing::error!("Failed to broadcast transaction: {}", e);
725-
Error::Generic(format!("Failed to broadcast transaction: {}", e))
726-
})?;
727-
tracing::info!("Transaction broadcasted successfully");
728-
729-
// Perform a sync to ensure transaction propagation
730-
tracing::info!("Starting sync to confirm transaction in mempool",);
731-
requester.add_revealed_scripts(&wallet).await.map_err(|e| {
732-
Error::Generic(format!("Failed to add wallet scripts: {}", e))
733-
})?;
734-
735-
let chain_tip = wallet.local_chain().tip();
736-
tracing::info!(
737-
"Starting sync to confirm transaction {} in mempool from chain tip: height={}, hash={}",
738-
broadcasted_txid,
739-
chain_tip.height(),
740-
chain_tip.hash()
741-
);
742-
743-
requester.add_revealed_scripts(&wallet).await.map_err(|e| {
744-
Error::Generic(format!("Failed to add wallet scripts: {}", e))
745-
})?;
746-
747-
let mut updates_applied = false;
748-
loop {
749-
match update_subscriber.update().await {
750-
Some(update) => {
751-
wallet.apply_update(update).map_err(|e| {
752-
Error::Generic(format!("Failed to apply update: {}", e))
753-
})?;
754-
updates_applied = true;
755-
tracing::info!(
756-
"Applied update: tx_count={}, balance={}, chain_tip={}",
757-
wallet.transactions().count(),
758-
wallet.balance().total().to_sat(),
759-
wallet.local_chain().tip().height()
760-
);
761-
break;
762-
}
763-
None => {
764-
if updates_applied {
765-
tracing::info!("No further updates available during sync");
766-
break;
767-
}
768-
tracing::debug!("No update received, waiting");
769-
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
770-
}
771-
}
772-
}
773-
774-
if !wallet
775-
.transactions()
776-
.any(|tx| tx.tx_node.txid == broadcasted_txid)
777-
{
778-
tracing::warn!(
779-
"Transaction {} not found in wallet after sync; may not be propagated",
780-
broadcasted_txid
781-
);
782-
} else {
783-
tracing::info!(
784-
"Sync completed; transaction {} likely propagated",
785-
broadcasted_txid
786-
);
787-
}
788-
789-
tx.compute_txid()
538+
unimplemented!()
790539
}
791540
};
792541
Ok(json!({ "txid": txid }))

0 commit comments

Comments
 (0)