Skip to content

Commit 368b8b4

Browse files
committed
Merge #230: Refactor sync_kyoto_client
974c8d5 refactor(payjoin): add payjoin error variants to BDKCliError (Mshehu5) 35d8313 refactor(payjoin): implement polling-based monitoring with timeout (Mshehu5) b88426a refactor: use BlockchainClient as references (Mshehu5) 99b71fd refactor: use handle pattern for Kyoto client (Mshehu5) Pull request description: <!-- You can erase any parts of this template not applicable to your Pull Request. --> ### Description <!-- Describe the purpose of this PR, what's being adding and/or fixed --> This PR addresses issues encountered while implementing persistence for Payjoin specifically around the BlockchainClient only being an owned variable rather than being able to be borrowed/referenced as &Blockchainclient. While working on persistence I ran into problems while working on resume command which needs a blockchain client to resume states such as monitor_payjoin_proposal (receiver) and process_payjoin_proposal (sender) Because BlockchainClient can only be owned the current design will require a function signature of passing two separate clients to resume sender and receiver states. I initially considered splitting the command into resume_send and resume_receive but this does not fully solve the issue. In particular the sender’s process_payjoin_proposal may call broadcast_transaction and potentially broadcast multiple transactions for persisted send entries stored in the database which still requires reusable access to the client. This Ownership issue was previously mentioned in #200 and is also noted in a comment at the top of monitor_payjoin_proposal. It prevents the function from being able to resync multiple times and reliably detect when a transaction appears in the mempool. The root cause is that the Kyoto client Box<LightClient> is destructured and spawned into other tasks when passed through sync_kyoto_client making it unusable afterward. What this PR changes This PR fixes the issue by refactoring sync_kyoto_client - The logic responsible for running the Kyoto node and logger is moved into new_blockchain_client. This makes it that node is started at start of command and not during every sync - Instead of returning a Box<lightClient> the function now returns a KyotoClientHandle. Previously the boxed client takes ownership when destructured inside sync_kyoto_client, preventing reuse/reference. With the new design sync_kyoto_client takes &KyotoClientHandle, allowing the client to be Refrenced which can be used for syncing and broadcasting transactions without being owned - Additionally monitor_payjoin_proposal is refactored to support resyncing demonstrating that the Kyoto client refactor successfully resolves the original limitations ### Notes to the reviewers <!-- In this section you can include notes directed to the reviewers, like explaining why some parts of the PR were done in a specific way --> After refactor I tested the kyoto client on regtest (Cause I do not have access to a signet) I had to set a trusted peer in the code to connect with a cbf count of 1 this worked and I also made transaction using the steps below: N.B Payjoin was also tested for the monitor_payjoin_proposal refactor using steps in project readme ``` bitcoin.conf regtest=1 server=1 rpcuser=user rpcpassword=password rpcallowip=127.0.0.1 blockfilterindex=1 listen=1 fallbackfee=0.001 [regtest] bind=127.0.0.1 port=18444 peerblockfilters=1 ``` Step 1: Create transaction ``` PSBT=$(cargo run --features cbf,sqlite -- \ --network $NETWORK \ wallet \ --wallet sender_wallet \ --ext-descriptor "$SENDER_EXT_DESC" \ --int-descriptor "$SENDER_INT_DESC" \ --database-type $DATABASE_TYPE \ create_tx --to $RECEIVER_ADDR:50000 --fee_rate 1.0 | jq -r '.psbt') ``` Step 2: Sign transaction ``` SIGNED_PSBT=$(cargo run --features cbf,sqlite -- \ --network $NETWORK \ wallet \ --wallet sender_wallet \ --ext-descriptor "$SENDER_EXT_DESC" \ --int-descriptor "$SENDER_INT_DESC" \ --database-type $DATABASE_TYPE \ sign "$PSBT" | jq -r '.psbt') ``` Step 3: Broadcast transaction ``` cargo run --features cbf,sqlite -- \ --network $NETWORK \ wallet \ --wallet sender_wallet \ --ext-descriptor "$SENDER_EXT_DESC" \ --int-descriptor "$SENDER_INT_DESC" \ --database-type $DATABASE_TYPE \ --client-type $CLIENT_TYPE \ --cbf-peer $CBF_PEER \ --cbf-conn-count $CBF_CONN_COUNT \ broadcast --psbt "$SIGNED_PSBT" ``` Mine a block to confirm `bitcoin-cli -regtest generatetoaddress 1 $(bitcoin-cli -regtest getnewaddress)` Checking Transaction Status After broadcasting, wait a moment and sync your wallet: Sync wallet ``` cargo run --features cbf,sqlite -- \ --network $NETWORK \ wallet \ --wallet receiver_wallet \ --ext-descriptor "$RECEIVER_EXT_DESC" \ --int-descriptor "$RECEIVER_INT_DESC" \ --database-type $DATABASE_TYPE \ --client-type $CLIENT_TYPE \ --cbf-peer $CBF_PEER \ --cbf-conn-count $CBF_CONN_COUNT \ sync ``` Check balance ``` cargo run --features cbf,sqlite -- \ --network $NETWORK \ wallet \ --wallet receiver_wallet \ --ext-descriptor "$RECEIVER_EXT_DESC" \ --int-descriptor "$RECEIVER_INT_DESC" \ --database-type $DATABASE_TYPE \ balance ``` List recent transactions ``` cargo run --features cbf,sqlite -- \ --network $NETWORK \ wallet \ --wallet sender_wallet \ --ext-descriptor "$SENDER_EXT_DESC" \ --int-descriptor "$SENDER_INT_DESC" \ --database-type $DATABASE_TYPE \ transactions ``` ### Checklists #### All Submissions: * [x] I've signed all my commits * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk-cli/blob/master/CONTRIBUTING.md) * [x] I ran `cargo fmt` and `cargo clippy` before committing ACKs for top commit: tvpeter: ACK 974c8d5 notmandatory: ACK 974c8d5 Tree-SHA512: 823eefdde1900fd03fc698466bce414bdba0aceae428d6fbb5af7ef0a0962a0e16948d58badc24b6f99ff9498727c912131d29f46468b421cd54d663e3ec0e8b
2 parents b9cf2ac + 974c8d5 commit 368b8b4

File tree

4 files changed

+184
-190
lines changed

4 files changed

+184
-190
lines changed

src/error.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,34 @@ pub enum BDKCliError {
112112
))]
113113
#[error("Reqwest error: {0}")]
114114
ReqwestError(#[from] reqwest::Error),
115+
116+
#[cfg(feature = "payjoin")]
117+
#[error("Payjoin URL parse error: {0}")]
118+
PayjoinUrlParse(#[from] payjoin::IntoUrlError),
119+
120+
#[cfg(feature = "payjoin")]
121+
#[error("Payjoin send response error: {0}")]
122+
PayjoinSendResponse(#[from] payjoin::send::ResponseError),
123+
124+
#[cfg(feature = "payjoin")]
125+
#[error("Payjoin sender build error: {0}")]
126+
PayjoinSenderBuild(#[from] payjoin::send::BuildSenderError),
127+
128+
#[cfg(feature = "payjoin")]
129+
#[error("Payjoin receive error: {0}")]
130+
PayjoinReceive(#[from] payjoin::receive::Error),
131+
132+
#[cfg(feature = "payjoin")]
133+
#[error("Payjoin selection error: {0}")]
134+
PayjoinSelection(#[from] payjoin::receive::SelectionError),
135+
136+
#[cfg(feature = "payjoin")]
137+
#[error("Payjoin input contribution error: {0}")]
138+
PayjoinInputContribution(#[from] payjoin::receive::InputContributionError),
139+
140+
#[cfg(feature = "payjoin")]
141+
#[error("Payjoin create request error: {0}")]
142+
PayjoinCreateRequest(#[from] payjoin::send::v2::CreateRequestError),
115143
}
116144

117145
impl From<ExtractTxError> for BDKCliError {

src/handlers.rs

Lines changed: 22 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use crate::config::{WalletConfig, WalletConfigInner};
1515
use crate::error::BDKCliError as Error;
1616
#[cfg(any(feature = "sqlite", feature = "redb"))]
1717
use crate::persister::Persister;
18+
#[cfg(feature = "cbf")]
19+
use crate::utils::BlockchainClient::KyotoClient;
1820
use crate::utils::*;
1921
#[cfg(feature = "redb")]
2022
use bdk_redb::Store as RedbStore;
@@ -46,8 +48,6 @@ use bdk_wallet::{
4648
};
4749
use cli_table::{Cell, CellStruct, Style, Table, format::Justify};
4850
use serde_json::json;
49-
#[cfg(feature = "cbf")]
50-
use {crate::utils::BlockchainClient::KyotoClient, bdk_kyoto::LightClient, tokio::select};
5151

5252
#[cfg(feature = "electrum")]
5353
use crate::utils::BlockchainClient::Electrum;
@@ -605,7 +605,7 @@ pub fn handle_offline_wallet_subcommand(
605605
))]
606606
pub(crate) async fn handle_online_wallet_subcommand(
607607
wallet: &mut Wallet,
608-
client: BlockchainClient,
608+
client: &BlockchainClient,
609609
online_subcommand: OnlineWalletSubCommand,
610610
) -> Result<String, Error> {
611611
match online_subcommand {
@@ -632,7 +632,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
632632
client
633633
.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));
634634

635-
let update = client.full_scan(request, _stop_gap, batch_size, false)?;
635+
let update = client.full_scan(request, _stop_gap, *batch_size, false)?;
636636
wallet.apply_update(update)?;
637637
}
638638
#[cfg(feature = "esplora")]
@@ -641,7 +641,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
641641
parallel_requests,
642642
} => {
643643
let update = client
644-
.full_scan(request, _stop_gap, parallel_requests)
644+
.full_scan(request, _stop_gap, *parallel_requests)
645645
.await
646646
.map_err(|e| *e)?;
647647
wallet.apply_update(update)?;
@@ -658,7 +658,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
658658
hash: genesis_block.block_hash(),
659659
});
660660
let mut emitter = Emitter::new(
661-
&*client,
661+
client.as_ref(),
662662
genesis_cp.clone(),
663663
genesis_cp.height(),
664664
NO_EXPECTED_MEMPOOL_TXS,
@@ -1246,7 +1246,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
12461246

12471247
let result = handle_online_wallet_subcommand(
12481248
&mut wallet,
1249-
blockchain_client,
1249+
&blockchain_client,
12501250
online_subcommand,
12511251
)
12521252
.await?;
@@ -1258,7 +1258,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
12581258
let mut wallet = new_wallet(network, wallet_opts)?;
12591259
let blockchain_client =
12601260
crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?;
1261-
handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand)
1261+
handle_online_wallet_subcommand(&mut wallet, &blockchain_client, online_subcommand)
12621262
.await?
12631263
};
12641264
Ok(result)
@@ -1452,7 +1452,7 @@ async fn respond(
14521452
} => {
14531453
let blockchain =
14541454
new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
1455-
let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
1455+
let value = handle_online_wallet_subcommand(wallet, &blockchain, online_subcommand)
14561456
.await
14571457
.map_err(|e| e.to_string())?;
14581458
Some(value)
@@ -1508,7 +1508,7 @@ async fn respond(
15081508
feature = "rpc"
15091509
))]
15101510
/// Syncs a given wallet using the blockchain client.
1511-
pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
1511+
pub async fn sync_wallet(client: &BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
15121512
#[cfg(any(feature = "electrum", feature = "esplora"))]
15131513
let request = wallet
15141514
.start_sync_with_revealed_spks()
@@ -1523,7 +1523,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
15231523
// already have.
15241524
client.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));
15251525

1526-
let update = client.sync(request, batch_size, false)?;
1526+
let update = client.sync(request, *batch_size, false)?;
15271527
wallet
15281528
.apply_update(update)
15291529
.map_err(|e| Error::Generic(e.to_string()))
@@ -1534,7 +1534,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
15341534
parallel_requests,
15351535
} => {
15361536
let update = client
1537-
.sync(request, parallel_requests)
1537+
.sync(request, *parallel_requests)
15381538
.await
15391539
.map_err(|e| *e)?;
15401540
wallet
@@ -1549,7 +1549,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
15491549
// reload the last 200 blocks in case of a reorg
15501550
let emitter_height = wallet_cp.height().saturating_sub(200);
15511551
let mut emitter = Emitter::new(
1552-
&*client,
1552+
client.as_ref(),
15531553
wallet_cp,
15541554
emitter_height,
15551555
wallet
@@ -1600,7 +1600,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
16001600
))]
16011601
/// Broadcasts a given transaction using the blockchain client.
16021602
pub async fn broadcast_transaction(
1603-
client: BlockchainClient,
1603+
client: &BlockchainClient,
16041604
tx: Transaction,
16051605
) -> Result<Txid, Error> {
16061606
match client {
@@ -1627,38 +1627,15 @@ pub async fn broadcast_transaction(
16271627

16281628
#[cfg(feature = "cbf")]
16291629
KyotoClient { client } => {
1630-
let LightClient {
1631-
requester,
1632-
mut info_subscriber,
1633-
mut warning_subscriber,
1634-
update_subscriber: _,
1635-
node,
1636-
} = *client;
1637-
1638-
let subscriber = tracing_subscriber::FmtSubscriber::new();
1639-
tracing::subscriber::set_global_default(subscriber)
1640-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?;
1641-
1642-
tokio::task::spawn(async move { node.run().await });
1643-
tokio::task::spawn(async move {
1644-
select! {
1645-
info = info_subscriber.recv() => {
1646-
if let Some(info) = info {
1647-
tracing::info!("{info}");
1648-
}
1649-
},
1650-
warn = warning_subscriber.recv() => {
1651-
if let Some(warn) = warn {
1652-
tracing::warn!("{warn}");
1653-
}
1654-
}
1655-
}
1656-
});
16571630
let txid = tx.compute_txid();
1658-
let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| {
1659-
tracing::warn!("Broadcast was unsuccessful");
1660-
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
1661-
})?;
1631+
let wtxid = client
1632+
.requester
1633+
.broadcast_random(tx.clone())
1634+
.await
1635+
.map_err(|_| {
1636+
tracing::warn!("Broadcast was unsuccessful");
1637+
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
1638+
})?;
16621639
tracing::info!("Successfully broadcast WTXID: {wtxid}");
16631640
Ok(txid)
16641641
}

0 commit comments

Comments
 (0)