Skip to content

Commit 65c98bc

Browse files
committed
Use BDK's async wallet persister
Persist the on-chain wallet through BDK's AsyncWalletPersister so wallet state writes use the async KVStore path. Existing synchronous wallet APIs keep bridging through the node runtime until their callers are made async. Co-Authored-By: HAL 9000
1 parent b6176b8 commit 65c98bc

3 files changed

Lines changed: 100 additions & 82 deletions

File tree

src/builder.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,17 +1537,18 @@ fn build_with_store_internal(
15371537

15381538
let descriptor = Bip84(xprv, KeychainKind::External);
15391539
let change_descriptor = Bip84(xprv, KeychainKind::Internal);
1540-
let mut wallet_persister = KVStoreWalletPersister::new(
1541-
Arc::clone(&kv_store),
1542-
Arc::clone(&runtime),
1543-
Arc::clone(&logger),
1544-
);
1545-
let wallet_opt = BdkWallet::load()
1546-
.descriptor(KeychainKind::External, Some(descriptor.clone()))
1547-
.descriptor(KeychainKind::Internal, Some(change_descriptor.clone()))
1548-
.extract_keys()
1549-
.check_network(config.network)
1550-
.load_wallet(&mut wallet_persister)
1540+
let mut wallet_persister =
1541+
KVStoreWalletPersister::new(Arc::clone(&kv_store), Arc::clone(&logger));
1542+
let wallet_opt = runtime
1543+
.block_on(async {
1544+
BdkWallet::load()
1545+
.descriptor(KeychainKind::External, Some(descriptor.clone()))
1546+
.descriptor(KeychainKind::Internal, Some(change_descriptor.clone()))
1547+
.extract_keys()
1548+
.check_network(config.network)
1549+
.load_wallet_async(&mut wallet_persister)
1550+
.await
1551+
})
15511552
.map_err(|e| match e {
15521553
bdk_wallet::LoadWithPersistError::InvalidChangeSet(
15531554
bdk_wallet::LoadError::Mismatch(bdk_wallet::LoadMismatch::Network {
@@ -1571,9 +1572,13 @@ fn build_with_store_internal(
15711572
let bdk_wallet = match wallet_opt {
15721573
Some(wallet) => wallet,
15731574
None => {
1574-
let mut wallet = BdkWallet::create(descriptor, change_descriptor)
1575-
.network(config.network)
1576-
.create_wallet(&mut wallet_persister)
1575+
let mut wallet = runtime
1576+
.block_on(async {
1577+
BdkWallet::create(descriptor, change_descriptor)
1578+
.network(config.network)
1579+
.create_wallet_async(&mut wallet_persister)
1580+
.await
1581+
})
15771582
.map_err(|e| {
15781583
log_error!(logger, "Failed to set up wallet: {}", e);
15791584
BuildError::WalletSetupFailed

src/wallet/mod.rs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,12 @@ impl Wallet {
161161
})?;
162162

163163
let mut locked_persister = self.persister.lock().expect("lock");
164-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
165-
log_error!(self.logger, "Failed to persist wallet: {}", e);
166-
Error::PersistenceFailed
167-
})?;
164+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(
165+
|e| {
166+
log_error!(self.logger, "Failed to persist wallet: {}", e);
167+
Error::PersistenceFailed
168+
},
169+
)?;
168170

169171
Ok(())
170172
},
@@ -214,7 +216,7 @@ impl Wallet {
214216
})?;
215217

216218
let mut locked_persister = self.persister.lock().expect("lock");
217-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
219+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(|e| {
218220
log_error!(self.logger, "Failed to persist wallet: {}", e);
219221
Error::PersistenceFailed
220222
})?;
@@ -474,7 +476,7 @@ impl Wallet {
474476
}
475477

476478
let mut locked_persister = self.persister.lock().expect("lock");
477-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
479+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(|e| {
478480
log_error!(self.logger, "Failed to persist wallet: {}", e);
479481
Error::PersistenceFailed
480482
})?;
@@ -492,7 +494,7 @@ impl Wallet {
492494
let mut locked_persister = self.persister.lock().expect("lock");
493495

494496
let address_info = locked_wallet.reveal_next_address(KeychainKind::External);
495-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
497+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(|e| {
496498
log_error!(self.logger, "Failed to persist wallet: {}", e);
497499
Error::PersistenceFailed
498500
})?;
@@ -504,7 +506,7 @@ impl Wallet {
504506
let mut locked_persister = self.persister.lock().expect("lock");
505507

506508
let address_info = locked_wallet.next_unused_address(KeychainKind::Internal);
507-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
509+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(|e| {
508510
log_error!(self.logger, "Failed to persist wallet: {}", e);
509511
Error::PersistenceFailed
510512
})?;
@@ -516,7 +518,7 @@ impl Wallet {
516518
let mut locked_persister = self.persister.lock().expect("lock");
517519

518520
locked_wallet.cancel_tx(tx);
519-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
521+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(|e| {
520522
log_error!(self.logger, "Failed to persist wallet: {}", e);
521523
Error::PersistenceFailed
522524
})?;
@@ -854,10 +856,12 @@ impl Wallet {
854856
}
855857

856858
let mut locked_persister = self.persister.lock().expect("lock");
857-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
858-
log_error!(self.logger, "Failed to persist wallet: {}", e);
859-
Error::PersistenceFailed
860-
})?;
859+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(
860+
|e| {
861+
log_error!(self.logger, "Failed to persist wallet: {}", e);
862+
Error::PersistenceFailed
863+
},
864+
)?;
861865

862866
psbt.extract_tx().map_err(|e| {
863867
log_error!(self.logger, "Failed to extract transaction: {}", e);
@@ -972,10 +976,12 @@ impl Wallet {
972976
.find(|txout| must_pay_to.iter().all(|output| output != txout));
973977

974978
if change_output.is_some() {
975-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
976-
log_error!(self.logger, "Failed to persist wallet: {}", e);
977-
()
978-
})?;
979+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(
980+
|e| {
981+
log_error!(self.logger, "Failed to persist wallet: {}", e);
982+
()
983+
},
984+
)?;
979985
}
980986

981987
Ok(CoinSelection { confirmed_utxos, change_output })
@@ -1080,7 +1086,7 @@ impl Wallet {
10801086
let mut locked_persister = self.persister.lock().expect("lock");
10811087

10821088
let address_info = locked_wallet.next_unused_address(KeychainKind::Internal);
1083-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
1089+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(|e| {
10841090
log_error!(self.logger, "Failed to persist wallet: {}", e);
10851091
()
10861092
})?;
@@ -1399,7 +1405,7 @@ impl Wallet {
13991405
}
14001406

14011407
let mut locked_persister = self.persister.lock().expect("lock");
1402-
locked_wallet.persist(&mut locked_persister).map_err(|e| {
1408+
self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)).map_err(|e| {
14031409
log_error!(self.logger, "Failed to persist wallet after fee bump of {}: {}", txid, e);
14041410
Error::PersistenceFailed
14051411
})?;
@@ -1501,7 +1507,7 @@ impl Listen for Wallet {
15011507
};
15021508

15031509
let mut locked_persister = self.persister.lock().expect("lock");
1504-
match locked_wallet.persist(&mut locked_persister) {
1510+
match self.runtime.block_on(locked_wallet.persist_async(&mut locked_persister)) {
15051511
Ok(_) => (),
15061512
Err(e) => {
15071513
log_error!(self.logger, "Failed to persist on-chain wallet: {}", e);

src/wallet/persist.rs

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,45 +5,39 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
use std::future::Future;
9+
use std::pin::Pin;
810
use std::sync::Arc;
911

1012
use bdk_chain::Merge;
11-
use bdk_wallet::{ChangeSet, WalletPersister};
13+
use bdk_wallet::{AsyncWalletPersister, ChangeSet};
1214

1315
use crate::io::utils::{
1416
read_bdk_wallet_change_set, write_bdk_wallet_change_descriptor, write_bdk_wallet_descriptor,
1517
write_bdk_wallet_indexer, write_bdk_wallet_local_chain, write_bdk_wallet_network,
1618
write_bdk_wallet_tx_graph,
1719
};
1820
use crate::logger::{log_error, LdkLogger, Logger};
19-
use crate::runtime::Runtime;
2021
use crate::types::DynStore;
2122

2223
pub(crate) struct KVStoreWalletPersister {
2324
latest_change_set: Option<ChangeSet>,
2425
kv_store: Arc<DynStore>,
25-
runtime: Arc<Runtime>,
2626
logger: Arc<Logger>,
2727
}
2828

2929
impl KVStoreWalletPersister {
30-
pub(crate) fn new(kv_store: Arc<DynStore>, runtime: Arc<Runtime>, logger: Arc<Logger>) -> Self {
31-
Self { latest_change_set: None, kv_store, runtime, logger }
30+
pub(crate) fn new(kv_store: Arc<DynStore>, logger: Arc<Logger>) -> Self {
31+
Self { latest_change_set: None, kv_store, logger }
3232
}
33-
}
34-
35-
impl WalletPersister for KVStoreWalletPersister {
36-
type Error = std::io::Error;
3733

38-
fn initialize(persister: &mut Self) -> Result<ChangeSet, Self::Error> {
34+
async fn initialize_inner(&mut self) -> Result<ChangeSet, std::io::Error> {
3935
// Return immediately if we have already been initialized.
40-
if let Some(latest_change_set) = persister.latest_change_set.as_ref() {
36+
if let Some(latest_change_set) = self.latest_change_set.as_ref() {
4137
return Ok(latest_change_set.clone());
4238
}
4339

44-
let change_set_opt = persister
45-
.runtime
46-
.block_on(read_bdk_wallet_change_set(&*persister.kv_store, &*persister.logger))?;
40+
let change_set_opt = read_bdk_wallet_change_set(&*self.kv_store, &*self.logger).await?;
4741

4842
let change_set = match change_set_opt {
4943
Some(persisted_change_set) => persisted_change_set,
@@ -54,18 +48,21 @@ impl WalletPersister for KVStoreWalletPersister {
5448
ChangeSet::default()
5549
},
5650
};
57-
persister.latest_change_set = Some(change_set.clone());
51+
self.latest_change_set = Some(change_set.clone());
5852
Ok(change_set)
5953
}
6054

61-
fn persist(persister: &mut Self, change_set: &ChangeSet) -> Result<(), Self::Error> {
55+
async fn persist_inner(&mut self, change_set: &ChangeSet) -> Result<(), std::io::Error> {
6256
if change_set.is_empty() {
6357
return Ok(());
6458
}
6559

60+
let kv_store = Arc::clone(&self.kv_store);
61+
let logger = Arc::clone(&self.logger);
62+
6663
// We're allowed to fail here if we're not initialized, BDK docs state: "This method can fail if the
6764
// persister is not initialized."
68-
let latest_change_set = persister.latest_change_set.as_mut().ok_or_else(|| {
65+
let latest_change_set = self.latest_change_set.as_mut().ok_or_else(|| {
6966
std::io::Error::new(
7067
std::io::ErrorKind::Other,
7168
"Wallet must be initialized before calling persist",
@@ -80,7 +77,7 @@ impl WalletPersister for KVStoreWalletPersister {
8077
{
8178
debug_assert!(false, "Wallet descriptor must never change");
8279
log_error!(
83-
persister.logger,
80+
logger,
8481
"Wallet change set doesn't match persisted descriptor. This should never happen."
8582
);
8683
return Err(std::io::Error::new(
@@ -89,11 +86,7 @@ impl WalletPersister for KVStoreWalletPersister {
8986
));
9087
} else {
9188
latest_change_set.descriptor = Some(descriptor.clone());
92-
persister.runtime.block_on(write_bdk_wallet_descriptor(
93-
&descriptor,
94-
&*persister.kv_store,
95-
&*persister.logger,
96-
))?;
89+
write_bdk_wallet_descriptor(&descriptor, &*kv_store, Arc::clone(&logger)).await?;
9790
}
9891
}
9992

@@ -103,7 +96,7 @@ impl WalletPersister for KVStoreWalletPersister {
10396
{
10497
debug_assert!(false, "Wallet change_descriptor must never change");
10598
log_error!(
106-
persister.logger,
99+
logger,
107100
"Wallet change set doesn't match persisted change_descriptor. This should never happen."
108101
);
109102
return Err(std::io::Error::new(
@@ -112,19 +105,20 @@ impl WalletPersister for KVStoreWalletPersister {
112105
));
113106
} else {
114107
latest_change_set.change_descriptor = Some(change_descriptor.clone());
115-
persister.runtime.block_on(write_bdk_wallet_change_descriptor(
108+
write_bdk_wallet_change_descriptor(
116109
&change_descriptor,
117-
&*persister.kv_store,
118-
&*persister.logger,
119-
))?;
110+
&*kv_store,
111+
Arc::clone(&logger),
112+
)
113+
.await?;
120114
}
121115
}
122116

123117
if let Some(network) = change_set.network {
124118
if latest_change_set.network.is_some() && latest_change_set.network != Some(network) {
125119
debug_assert!(false, "Wallet network must never change");
126120
log_error!(
127-
persister.logger,
121+
logger,
128122
"Wallet change set doesn't match persisted network. This should never happen."
129123
);
130124
return Err(std::io::Error::new(
@@ -133,11 +127,7 @@ impl WalletPersister for KVStoreWalletPersister {
133127
));
134128
} else {
135129
latest_change_set.network = Some(network);
136-
persister.runtime.block_on(write_bdk_wallet_network(
137-
&network,
138-
&*persister.kv_store,
139-
&*persister.logger,
140-
))?;
130+
write_bdk_wallet_network(&network, &*kv_store, Arc::clone(&logger)).await?;
141131
}
142132
}
143133

@@ -157,31 +147,48 @@ impl WalletPersister for KVStoreWalletPersister {
157147
// particular order.
158148
if !change_set.indexer.is_empty() {
159149
latest_change_set.indexer.merge(change_set.indexer.clone());
160-
persister.runtime.block_on(write_bdk_wallet_indexer(
161-
&latest_change_set.indexer,
162-
&*persister.kv_store,
163-
Arc::clone(&persister.logger),
164-
))?;
150+
write_bdk_wallet_indexer(&latest_change_set.indexer, &*kv_store, Arc::clone(&logger))
151+
.await?;
165152
}
166153

167154
if !change_set.tx_graph.is_empty() {
168155
latest_change_set.tx_graph.merge(change_set.tx_graph.clone());
169-
persister.runtime.block_on(write_bdk_wallet_tx_graph(
170-
&latest_change_set.tx_graph,
171-
&*persister.kv_store,
172-
Arc::clone(&persister.logger),
173-
))?;
156+
write_bdk_wallet_tx_graph(&latest_change_set.tx_graph, &*kv_store, Arc::clone(&logger))
157+
.await?;
174158
}
175159

176160
if !change_set.local_chain.is_empty() {
177161
latest_change_set.local_chain.merge(change_set.local_chain.clone());
178-
persister.runtime.block_on(write_bdk_wallet_local_chain(
162+
write_bdk_wallet_local_chain(
179163
&latest_change_set.local_chain,
180-
&*persister.kv_store,
181-
Arc::clone(&persister.logger),
182-
))?;
164+
&*kv_store,
165+
Arc::clone(&logger),
166+
)
167+
.await?;
183168
}
184169

185170
Ok(())
186171
}
187172
}
173+
174+
impl AsyncWalletPersister for KVStoreWalletPersister {
175+
type Error = std::io::Error;
176+
177+
fn initialize<'a>(
178+
persister: &'a mut Self,
179+
) -> Pin<Box<dyn Future<Output = Result<ChangeSet, Self::Error>> + Send + 'a>>
180+
where
181+
Self: 'a,
182+
{
183+
Box::pin(persister.initialize_inner())
184+
}
185+
186+
fn persist<'a>(
187+
persister: &'a mut Self, change_set: &'a ChangeSet,
188+
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'a>>
189+
where
190+
Self: 'a,
191+
{
192+
Box::pin(persister.persist_inner(change_set))
193+
}
194+
}

0 commit comments

Comments
 (0)