Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ pub struct ProxyOpts {
#[derive(Debug, Args, Clone, PartialEq, Eq)]
pub struct CompactFilterOpts {
/// Sets the number of parallel node connections.
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))]
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))]
pub conn_count: u8,

/// Optionally skip initial `skip_blocks` blocks.
Expand Down
9 changes: 8 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bdk_wallet::bitcoin::hex::HexToBytesError;
use bdk_wallet::bitcoin::psbt::ExtractTxError;
use bdk_wallet::bitcoin::{base64, consensus};
use thiserror::Error;

Expand Down Expand Up @@ -51,7 +52,7 @@ pub enum BDKCliError {
ParseOutPointError(#[from] bdk_wallet::bitcoin::blockdata::transaction::ParseOutPointError),

#[error("PsbtExtractTxError: {0}")]
PsbtExtractTxError(#[from] bdk_wallet::bitcoin::psbt::ExtractTxError),
PsbtExtractTxError(Box<ExtractTxError>),

#[error("PsbtError: {0}")]
PsbtError(#[from] bdk_wallet::bitcoin::psbt::Error),
Expand Down Expand Up @@ -90,3 +91,9 @@ pub enum BDKCliError {
#[error("BDK-Kyoto error: {0}")]
BuilderError(#[from] bdk_kyoto::builder::BuilderError),
}

impl From<ExtractTxError> for BDKCliError {
fn from(value: ExtractTxError) -> Self {
BDKCliError::PsbtExtractTxError(Box::new(value))
}
}
89 changes: 78 additions & 11 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,17 @@ use std::collections::BTreeMap;
#[cfg(any(feature = "electrum", feature = "esplora"))]
use std::collections::HashSet;
use std::convert::TryFrom;
#[cfg(feature = "repl")]
#[cfg(any(feature = "repl", feature = "electrum", feature = "esplora"))]
use std::io::Write;
use std::str::FromStr;

#[cfg(feature = "electrum")]
use crate::utils::BlockchainClient::Electrum;
#[cfg(feature = "cbf")]
use bdk_kyoto::{Info, LightClient};
use bdk_wallet::bitcoin::base64::prelude::*;
#[cfg(feature = "cbf")]
use tokio::select;
#[cfg(any(
feature = "electrum",
feature = "esplora",
Expand Down Expand Up @@ -507,7 +511,6 @@ pub(crate) async fn handle_online_wallet_subcommand(
(Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"),
(None, None) => panic!("Missing `psbt` and `tx` option"),
};

let txid = match client {
#[cfg(feature = "electrum")]
Electrum {
Expand All @@ -531,8 +534,69 @@ pub(crate) async fn handle_online_wallet_subcommand(
.map_err(|e| Error::Generic(e.to_string()))?,

#[cfg(feature = "cbf")]
KyotoClient { client: _ } => {
unimplemented!()
KyotoClient { client } => {
let LightClient {
requester,
mut log_subscriber,
mut info_subscriber,
mut warning_subscriber,
update_subscriber: _,
node,
} = client;

let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber)
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?;

tokio::task::spawn(async move { node.run().await });
tokio::task::spawn(async move {
select! {
Comment thread
rustaceanrob marked this conversation as resolved.
log = log_subscriber.recv() => {
if let Some(log) = log {
tracing::info!("{log}");
}
},
warn = warning_subscriber.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
}
}
}
});
let txid = tx.compute_txid();
tracing::info!("Waiting for connections to broadcast...");
while let Some(info) = info_subscriber.recv().await {
match info {
Info::ConnectionsMet => {
requester
.broadcast_random(tx.clone())
.map_err(|e| Error::Generic(format!("{}", e)))?;
break;
}
_ => tracing::info!("{info}"),
}
}
tokio::time::timeout(tokio::time::Duration::from_secs(15), async move {
while let Some(info) = info_subscriber.recv().await {
match info {
Info::TxGossiped(wtxid) => {
tracing::info!("Successfully broadcast WTXID: {wtxid}");
break;
}
Info::ConnectionsMet => {
tracing::info!("Rebroadcasting to new connections");
requester.broadcast_random(tx.clone()).unwrap();
}
_ => tracing::info!("{info}"),
}
}
})
.await
.map_err(|_| {
tracing::warn!("Broadcast was unsuccessful");
Error::Generic("Transaction broadcast timed out after 15 seconds".into())
})?;
txid
}
};
Ok(json!({ "txid": txid }))
Expand Down Expand Up @@ -681,11 +745,11 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
} => {
let network = cli_opts.network;
let home_dir = prepare_home_dir(cli_opts.datadir)?;
let wallet_name = &wallet_opts.wallet;
let database_path = prepare_wallet_db_dir(wallet_name, &home_dir)?;
#[cfg(feature = "sqlite")]
let result = {
let home_dir = prepare_home_dir(cli_opts.datadir)?;
let wallet_name = &wallet_opts.wallet;
let database_path = prepare_wallet_db_dir(wallet_name, &home_dir)?;
let mut persister = match &wallet_opts.database_type {
#[cfg(feature = "sqlite")]
DatabaseType::Sqlite => {
Expand All @@ -698,7 +762,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {

let mut wallet = new_persisted_wallet(network, &mut persister, &wallet_opts)?;
let blockchain_client =
new_blockchain_client(&wallet_opts, &wallet, Some(database_path))?;
new_blockchain_client(&wallet_opts, &wallet, database_path)?;

let result = handle_online_wallet_subcommand(
&mut wallet,
Expand All @@ -711,6 +775,9 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
};
#[cfg(not(any(feature = "sqlite")))]
let result = {
let wallet = new_wallet(network, &wallet_opts)?;
let blockchain_client =
crate::utils::new_blockchain_client(&wallet_opts, &wallet, database_path)?;
let mut wallet = new_wallet(network, &wallet_opts)?;
handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand)
.await?
Expand Down Expand Up @@ -807,7 +874,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
&mut wallet,
&wallet_opts,
line,
Some(database_path.clone()),
database_path.clone(),
)
.await;
#[cfg(feature = "sqlite")]
Expand Down Expand Up @@ -840,7 +907,7 @@ async fn respond(
wallet: &mut Wallet,
wallet_opts: &WalletOpts,
line: &str,
_datadir: Option<std::path::PathBuf>,
_datadir: std::path::PathBuf,
) -> Result<bool, String> {
use clap::Parser;

Expand All @@ -857,7 +924,7 @@ async fn respond(
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
} => {
let blockchain =
new_blockchain_client(wallet_opts, &wallet, _datadir).map_err(|e| e.to_string())?;
new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
.await
.map_err(|e| e.to_string())?;
Expand Down
19 changes: 8 additions & 11 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use crate::error::BDKCliError as Error;
use std::str::FromStr;

#[cfg(feature = "sqlite")]
use std::path::{Path, PathBuf};

use crate::commands::WalletOpts;
Expand Down Expand Up @@ -76,11 +75,11 @@ pub(crate) fn parse_address(address_str: &str) -> Result<Address, Error> {
Ok(unchecked_address.assume_checked())
}

#[cfg(feature = "sqlite")]
/// Prepare bdk-cli home directory
///
/// This function is called to check if [`crate::CliOpts`] datadir is set.
/// If not the default home directory is created at `~/.bdk-bitcoin`.
#[allow(dead_code)]
pub(crate) fn prepare_home_dir(home_path: Option<PathBuf>) -> Result<PathBuf, Error> {
let dir = home_path.unwrap_or_else(|| {
let mut dir = PathBuf::new();
Expand All @@ -101,11 +100,11 @@ pub(crate) fn prepare_home_dir(home_path: Option<PathBuf>) -> Result<PathBuf, Er
}

/// Prepare wallet database directory.
#[cfg(feature = "sqlite")]
#[allow(dead_code)]
pub(crate) fn prepare_wallet_db_dir(
wallet_name: &Option<String>,
home_path: &Path,
) -> Result<PathBuf, Error> {
) -> Result<std::path::PathBuf, Error> {
let mut dir = home_path.to_owned();
if let Some(wallet_name) = wallet_name {
dir.push(wallet_name);
Expand Down Expand Up @@ -153,8 +152,8 @@ pub(crate) enum BlockchainClient {
/// Create a new blockchain from the wallet configuration options.
pub(crate) fn new_blockchain_client(
wallet_opts: &WalletOpts,
wallet: &Wallet,
datadir: Option<std::path::PathBuf>,
_wallet: &Wallet,
_datadir: PathBuf,
) -> Result<BlockchainClient, Error> {
#[cfg(any(feature = "electrum", feature = "esplora", feature = "rpc"))]
let url = wallet_opts.url.as_str();
Expand Down Expand Up @@ -200,14 +199,12 @@ pub(crate) fn new_blockchain_client(
None => Sync,
};

let mut builder = NodeBuilder::new(wallet.network());
let builder = NodeBuilder::new(_wallet.network());

if let Some(datadir) = datadir {
builder = builder.data_dir(&datadir);
};
let client = builder
.required_peers(wallet_opts.compactfilter_opts.conn_count)
.build_with_wallet(wallet, scan_type)?;
.data_dir(&_datadir)
.build_with_wallet(_wallet, scan_type)?;

BlockchainClient::KyotoClient { client }
}
Expand Down
Loading