Skip to content

Commit e648fe7

Browse files
authored
Merge pull request #45 from TheBlueMatt-macbook/2025-12-async-opts
2 parents 0474476 + cf3f337 commit e648fe7

3 files changed

Lines changed: 132 additions & 75 deletions

File tree

orange-sdk/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ name = "orange_sdk"
1414

1515
[features]
1616
default = ["spark"]
17-
uniffi = ["dep:uniffi", "spark", "cashu", "rand"]
17+
uniffi = ["dep:uniffi", "spark", "cashu", "rand", "pin-project-lite"]
1818
spark = ["breez-sdk-spark", "uuid", "serde_json"]
1919
cashu = ["cdk", "serde_json"]
2020
_test-utils = ["corepc-node", 'electrsd', "cashu", "uuid/v7", "rand"]
@@ -29,7 +29,7 @@ bitcoin-payment-instructions = { workspace = true, features = ["http"] }
2929
chrono = { version = "0.4", default-features = false }
3030
rand = { version = "0.8.5", optional = true }
3131
breez-sdk-spark = { git = "https://github.com/breez/spark-sdk.git", rev = "5f25eb6185950545e2ca170971667adc43c3544f", default-features = false, features = ["rustls-tls"], optional = true }
32-
tokio = { version = "1.0", default-features = false, features = ["rt-multi-thread", "sync"] }
32+
tokio = { version = "1.0", default-features = false, features = ["rt-multi-thread", "sync", "macros"] }
3333
uuid = { version = "1.0", default-features = false, optional = true }
3434
cdk = { version = "0.14.2", default-features = false, features = ["wallet"], optional = true }
3535
serde_json = { version = "1.0", optional = true }
@@ -43,7 +43,8 @@ cdk-sqlite = { version = "0.14.2", optional = true }
4343
cdk-axum = { version = "0.14.2", optional = true }
4444
axum = { version = "0.8.1", optional = true }
4545

46-
uniffi = { version = "0.29", features = ["cli", "tokio"], optional = true }
46+
uniffi = { version = "0.29", default-features = false, features = ["cli"], optional = true }
47+
pin-project-lite = { version = "0.2", default-features = false, optional = true }
4748

4849
[dev-dependencies]
4950
test-log = "0.2.18"

orange-sdk/src/ffi/orange/wallet.rs

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use crate::ffi::orange::Event;
1010
use crate::ffi::orange::config::{Tunables, WalletConfig};
1111
use crate::ffi::orange::error::{InitFailure, WalletError};
1212
use crate::{impl_from_core_type, impl_into_core_type};
13+
14+
use std::pin::Pin;
1315
use std::sync::Arc;
16+
use std::task::{Context, Poll};
1417

1518
/// Represents the balances of the wallet, including available and pending balances.
1619
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, uniffi::Object)]
@@ -85,52 +88,82 @@ impl SingleUseReceiveUri {
8588
impl_from_core_type!(OrangeSingleUseReceiveUri, SingleUseReceiveUri);
8689
impl_into_core_type!(SingleUseReceiveUri, OrangeSingleUseReceiveUri);
8790

91+
// This is basically the `async-compat` utility that UniFFI uses under
92+
// the hood for its `async_runtime = "tokio"` attribute, except it lets
93+
// us use our own runtime to avoid the redundant runtimes implicit in
94+
// the `async-compat` single-threaded runtime.
95+
pin_project_lite::pin_project! {
96+
struct RTPoller<T> {
97+
#[pin]
98+
inner: T,
99+
rt: Arc<tokio::runtime::Runtime>,
100+
}
101+
}
102+
103+
impl<T> RTPoller<T> {
104+
fn new(inner: T, rt: Arc<tokio::runtime::Runtime>) -> Self {
105+
Self { inner, rt }
106+
}
107+
}
108+
109+
impl<T: Future> Future for RTPoller<T> {
110+
type Output = T::Output;
111+
112+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113+
let _guard = self.rt.enter();
114+
self.project().inner.poll(cx)
115+
}
116+
}
117+
88118
#[derive(Clone, uniffi::Object)]
89119
pub struct Wallet {
90120
inner: Arc<OrangeWallet>,
91-
_rt: Arc<tokio::runtime::Runtime>,
121+
rt: Arc<tokio::runtime::Runtime>,
92122
}
93123

94-
#[uniffi::export(async_runtime = "tokio")]
124+
#[uniffi::export]
95125
impl Wallet {
96126
#[uniffi::constructor]
97-
pub fn new(config: WalletConfig) -> Result<Self, InitFailure> {
98-
let rt = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build()?);
127+
pub async fn new(config: WalletConfig) -> Result<Self, InitFailure> {
128+
let rt = Arc::new(
129+
tokio::runtime::Builder::new_multi_thread().worker_threads(2).enable_all().build()?,
130+
);
99131

100132
let config: OrangeWalletConfig = config.try_into()?;
101133

102-
let inner = rt.block_on(async move { OrangeWallet::new(config).await })?;
134+
let inner = RTPoller::new(OrangeWallet::new(config), Arc::clone(&rt)).await?;
103135

104-
Ok(Wallet { inner: Arc::new(inner), _rt: rt })
136+
Ok(Wallet { inner: Arc::new(inner), rt })
105137
}
106138

107139
pub fn node_id(&self) -> String {
108140
self.inner.node_id().to_string()
109141
}
110142

111143
pub async fn get_balance(&self) -> Result<Balances, WalletError> {
112-
let balance = self.inner.get_balance().await?;
144+
let balance = RTPoller::new(self.inner.get_balance(), Arc::clone(&self.rt)).await?;
113145
Ok(balance.into())
114146
}
115147

116-
pub async fn is_connected_to_lsp(&self) -> bool {
148+
pub fn is_connected_to_lsp(&self) -> bool {
117149
self.inner.is_connected_to_lsp()
118150
}
119151

120152
/// Sets whether the wallet should automatically rebalance from trusted/onchain to lightning.
121153
pub async fn set_rebalance_enabled(&self, value: bool) {
122-
self.inner.set_rebalance_enabled(value).await
154+
RTPoller::new(self.inner.set_rebalance_enabled(value), Arc::clone(&self.rt)).await
123155
}
124156

125157
/// Whether the wallet should automatically rebalance from trusted/onchain to lightning.
126158
pub async fn get_rebalance_enabled(&self) -> bool {
127-
self.inner.get_rebalance_enabled().await
159+
RTPoller::new(self.inner.get_rebalance_enabled(), Arc::clone(&self.rt)).await
128160
}
129161

130162
pub async fn list_transactions(
131163
&self,
132164
) -> Result<Vec<std::sync::Arc<crate::ffi::orange::Transaction>>, WalletError> {
133-
let transactions = self.inner.list_transactions().await?;
165+
let transactions =
166+
RTPoller::new(self.inner.list_transactions(), Arc::clone(&self.rt)).await?;
134167
Ok(transactions.into_iter().map(|tx| std::sync::Arc::new(tx.into())).collect())
135168
}
136169

@@ -144,7 +177,11 @@ impl Wallet {
144177
pub async fn parse_payment_instructions(
145178
&self, instructions: String,
146179
) -> Result<PaymentInstructions, ParseError> {
147-
let result = self.inner.parse_payment_instructions(&instructions).await?;
180+
let result = RTPoller::new(
181+
self.inner.parse_payment_instructions(&instructions),
182+
Arc::clone(&self.rt),
183+
)
184+
.await?;
148185
Ok(result.into())
149186
}
150187

@@ -159,7 +196,7 @@ impl Wallet {
159196
pub async fn pay(
160197
&self, payment_info: Arc<PaymentInfo>,
161198
) -> Result<super::PaymentId, WalletError> {
162-
let id = self.inner.pay(&payment_info.0).await?;
199+
let id = RTPoller::new(self.inner.pay(&payment_info.0), Arc::clone(&self.rt)).await?;
163200
Ok(id.into())
164201
}
165202

@@ -170,18 +207,26 @@ impl Wallet {
170207
pub async fn estimate_fee(
171208
&self, payment_info: Arc<PaymentInfo>,
172209
) -> Result<Arc<Amount>, WalletError> {
173-
let fee = self.inner.estimate_fee(&payment_info.0.instructions).await;
210+
let fee = RTPoller::new(
211+
self.inner.estimate_fee(&payment_info.0.instructions),
212+
Arc::clone(&self.rt),
213+
)
214+
.await;
174215
Ok(Arc::new(fee.into()))
175216
}
176217

177218
pub async fn stop(&self) {
178-
self.inner.stop().await
219+
RTPoller::new(self.inner.stop(), Arc::clone(&self.rt)).await
179220
}
180221

181222
pub async fn get_single_use_receive_uri(
182223
&self, amount: Option<Arc<Amount>>,
183224
) -> Result<SingleUseReceiveUri, WalletError> {
184-
let uri = self.inner.get_single_use_receive_uri(amount.map(|a| a.0)).await?;
225+
let uri = RTPoller::new(
226+
self.inner.get_single_use_receive_uri(amount.map(|a| a.0)),
227+
Arc::clone(&self.rt),
228+
)
229+
.await?;
185230
Ok(uri.into())
186231
}
187232

@@ -192,7 +237,7 @@ impl Wallet {
192237

193238
/// List our current channels
194239
pub async fn close_channels(&self) -> Result<(), WalletError> {
195-
self.inner.close_channels().await?;
240+
RTPoller::new(self.inner.close_channels(), Arc::clone(&self.rt)).await?;
196241
Ok(())
197242
}
198243

@@ -230,7 +275,7 @@ impl Wallet {
230275
/// **Caution:** Users must handle events as quickly as possible to prevent a large event backlog,
231276
/// which can increase the memory footprint of [`crate::Wallet`].
232277
pub async fn next_event_async(&self) -> Event {
233-
self.inner.next_event_async().await.into()
278+
RTPoller::new(self.inner.next_event_async(), Arc::clone(&self.rt)).await.into()
234279
}
235280

236281
/// Returns the next event in the event queue.

orange-sdk/src/lib.rs

Lines changed: 65 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ pub enum ChainSource {
208208
}
209209

210210
/// Configuration for initializing the wallet.
211+
#[derive(Clone)]
211212
pub struct WalletConfig {
212213
/// Configuration for wallet storage.
213214
pub storage_config: StorageConfig,
@@ -528,58 +529,66 @@ impl Wallet {
528529

529530
let tx_metadata = TxMetadataStore::new(Arc::clone(&store)).await;
530531

531-
let trusted: Arc<Box<DynTrustedWalletInterface>> = match &config.extra_config {
532-
#[cfg(feature = "spark")]
533-
ExtraConfig::Spark(sp) => Arc::new(Box::new(
534-
Spark::init(
535-
&config,
536-
*sp,
537-
Arc::clone(&store),
538-
Arc::clone(&event_queue),
539-
tx_metadata.clone(),
540-
Arc::clone(&logger),
541-
Arc::clone(&runtime),
542-
)
543-
.await?,
544-
)),
545-
#[cfg(feature = "cashu")]
546-
ExtraConfig::Cashu(cashu) => Arc::new(Box::new(
547-
Cashu::init(
548-
&config,
549-
cashu.clone(),
550-
Arc::clone(&store),
551-
Arc::clone(&event_queue),
552-
tx_metadata.clone(),
553-
Arc::clone(&logger),
554-
Arc::clone(&runtime),
555-
)
556-
.await?,
557-
)),
558-
#[cfg(feature = "_test-utils")]
559-
ExtraConfig::Dummy(cfg) => Arc::new(Box::new(
560-
DummyTrustedWallet::new(
561-
cfg.uuid,
562-
&cfg.lsp,
563-
&cfg.bitcoind,
564-
tx_metadata.clone(),
565-
Arc::clone(&event_queue),
566-
Arc::clone(&runtime),
567-
)
568-
.await,
569-
)),
570-
};
571-
572-
let ln_wallet = Arc::new(
573-
LightningWallet::init(
574-
Arc::clone(&runtime),
575-
config,
576-
Arc::clone(&store),
577-
Arc::clone(&event_queue),
578-
tx_metadata.clone(),
579-
Arc::clone(&logger),
580-
)
581-
.await?,
532+
let (trusted, ln_wallet) = tokio::join!(
533+
async {
534+
let trusted: Arc<Box<DynTrustedWalletInterface>> = match &config.extra_config {
535+
#[cfg(feature = "spark")]
536+
ExtraConfig::Spark(sp) => Arc::new(Box::new(
537+
Spark::init(
538+
&config,
539+
*sp,
540+
Arc::clone(&store),
541+
Arc::clone(&event_queue),
542+
tx_metadata.clone(),
543+
Arc::clone(&logger),
544+
Arc::clone(&runtime),
545+
)
546+
.await?,
547+
)),
548+
#[cfg(feature = "cashu")]
549+
ExtraConfig::Cashu(cashu) => Arc::new(Box::new(
550+
Cashu::init(
551+
&config,
552+
cashu.clone(),
553+
Arc::clone(&store),
554+
Arc::clone(&event_queue),
555+
tx_metadata.clone(),
556+
Arc::clone(&logger),
557+
Arc::clone(&runtime),
558+
)
559+
.await?,
560+
)),
561+
#[cfg(feature = "_test-utils")]
562+
ExtraConfig::Dummy(cfg) => Arc::new(Box::new(
563+
DummyTrustedWallet::new(
564+
cfg.uuid,
565+
&cfg.lsp,
566+
&cfg.bitcoind,
567+
tx_metadata.clone(),
568+
Arc::clone(&event_queue),
569+
Arc::clone(&runtime),
570+
)
571+
.await,
572+
)),
573+
};
574+
Ok::<_, InitFailure>(trusted)
575+
},
576+
async {
577+
Ok::<_, InitFailure>(Arc::new(
578+
LightningWallet::init(
579+
Arc::clone(&runtime),
580+
config.clone(),
581+
Arc::clone(&store),
582+
Arc::clone(&event_queue),
583+
tx_metadata.clone(),
584+
Arc::clone(&logger),
585+
)
586+
.await?,
587+
))
588+
},
582589
);
590+
let trusted = trusted?;
591+
let ln_wallet = ln_wallet?;
583592

584593
let wt = Arc::new(WalletTrusted(Arc::clone(&trusted)));
585594

@@ -669,12 +678,14 @@ impl Wallet {
669678

670679
/// Lists the transactions which have been made.
671680
pub async fn list_transactions(&self) -> Result<Vec<Transaction>, WalletError> {
672-
let trusted_payments = self.inner.trusted.list_payments().await?;
681+
let (trusted_payments, splice_outs) = tokio::join!(
682+
self.inner.trusted.list_payments(),
683+
store::read_splice_outs(self.inner.store.as_ref())
684+
);
685+
let trusted_payments = trusted_payments?;
673686
let mut lightning_payments = self.inner.ln_wallet.list_payments();
674687
lightning_payments.sort_by_key(|l| l.latest_update_timestamp);
675688

676-
let splice_outs = store::read_splice_outs(self.inner.store.as_ref()).await;
677-
678689
let mut res = Vec::with_capacity(
679690
trusted_payments.len() + lightning_payments.len() + splice_outs.len(),
680691
);

0 commit comments

Comments
 (0)