Skip to content

Commit 7128171

Browse files
author
Matt Corallo
committed
Use our own tokio runtime rather than the UniFFI-default one
When we use `#[uniffi::export(async_runtime = "tokio")]` under the hood it uses the `async-compat` crate to build a single-threaded tokio runtime and then spawn a single thread to poll background tasks. Because `ldk-node` requires a multi-threaded runtime to spawn blocking tasks, however, we build a fresh tokio runtime in the `Wallet::new` constructor and let it run in the background as well. This causes thread blowup and is a bit confusing as we end up polling futures on multiple tokio runtimes, some of which are kinda-single-threaded. Instead, we just copy the 20 relevant lines out of `async-compat` and spawn the `Wallet` futures on the tokio runtime we built for `ldk-node`.
1 parent cc12141 commit 7128171

2 files changed

Lines changed: 63 additions & 19 deletions

File tree

orange-sdk/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ name = "orange_sdk"
1414

1515
[features]
1616
default = ["spark"]
17-
uniffi = ["dep:uniffi", "spark", "cashu", "rand"]
17+
uniffi = ["dep:uniffi", "spark", "cashu", "rand", "pin-project-lite"]
1818
spark = ["breez-sdk-spark", "uuid", "serde_json"]
1919
cashu = ["cdk", "serde_json"]
2020
_test-utils = ["corepc-node", 'electrsd', "cashu", "uuid/v7", "rand"]
@@ -43,7 +43,8 @@ cdk-sqlite = { version = "0.14.2", optional = true }
4343
cdk-axum = { version = "0.14.2", optional = true }
4444
axum = { version = "0.8.1", optional = true }
4545

46-
uniffi = { version = "0.29", features = ["cli", "tokio"], optional = true }
46+
uniffi = { version = "0.29", default-features = false, features = ["cli"], optional = true }
47+
pin-project-lite = { version = "0.2", default-features = false, optional = true }
4748

4849
[dev-dependencies]
4950
test-log = "0.2.18"

orange-sdk/src/ffi/orange/wallet.rs

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use crate::ffi::orange::Event;
1010
use crate::ffi::orange::config::{Tunables, WalletConfig};
1111
use crate::ffi::orange::error::{InitFailure, WalletError};
1212
use crate::{impl_from_core_type, impl_into_core_type};
13+
14+
use std::pin::Pin;
1315
use std::sync::Arc;
16+
use std::task::{Context, Poll};
1417

1518
/// Represents the balances of the wallet, including available and pending balances.
1619
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, uniffi::Object)]
@@ -85,52 +88,80 @@ impl SingleUseReceiveUri {
8588
impl_from_core_type!(OrangeSingleUseReceiveUri, SingleUseReceiveUri);
8689
impl_into_core_type!(SingleUseReceiveUri, OrangeSingleUseReceiveUri);
8790

91+
// This is basically the `async-compat` utility that UniFFI uses under
92+
// the hood for its `async_runtime = "tokio"` attribute, except it lets
93+
// us use our own runtime to avoid the redundant runtimes implicit in
94+
// the `async-compat` single-threaded runtime.
95+
pin_project_lite::pin_project! {
96+
struct RTPoller<T> {
97+
#[pin]
98+
inner: T,
99+
rt: Arc<tokio::runtime::Runtime>,
100+
}
101+
}
102+
103+
impl<T> RTPoller<T> {
104+
fn new(inner: T, rt: Arc<tokio::runtime::Runtime>) -> Self {
105+
Self { inner, rt }
106+
}
107+
}
108+
109+
impl<T: Future> Future for RTPoller<T> {
110+
type Output = T::Output;
111+
112+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113+
let _guard = self.rt.enter();
114+
self.project().inner.poll(cx)
115+
}
116+
}
117+
88118
#[derive(Clone, uniffi::Object)]
89119
pub struct Wallet {
90120
inner: Arc<OrangeWallet>,
91-
_rt: Arc<tokio::runtime::Runtime>,
121+
rt: Arc<tokio::runtime::Runtime>,
92122
}
93123

94-
#[uniffi::export(async_runtime = "tokio")]
124+
#[uniffi::export]
95125
impl Wallet {
96126
#[uniffi::constructor]
97-
pub fn new(config: WalletConfig) -> Result<Self, InitFailure> {
127+
pub async fn new(config: WalletConfig) -> Result<Self, InitFailure> {
98128
let rt = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build()?);
99129

100130
let config: OrangeWalletConfig = config.try_into()?;
101131

102-
let inner = rt.block_on(async move { OrangeWallet::new(config).await })?;
132+
let inner = RTPoller::new(OrangeWallet::new(config), Arc::clone(&rt)).await?;
103133

104-
Ok(Wallet { inner: Arc::new(inner), _rt: rt })
134+
Ok(Wallet { inner: Arc::new(inner), rt })
105135
}
106136

107137
pub fn node_id(&self) -> String {
108138
self.inner.node_id().to_string()
109139
}
110140

111141
pub async fn get_balance(&self) -> Result<Balances, WalletError> {
112-
let balance = self.inner.get_balance().await?;
142+
let balance = RTPoller::new(self.inner.get_balance(), Arc::clone(&self.rt)).await?;
113143
Ok(balance.into())
114144
}
115145

116-
pub async fn is_connected_to_lsp(&self) -> bool {
146+
pub fn is_connected_to_lsp(&self) -> bool {
117147
self.inner.is_connected_to_lsp()
118148
}
119149

120150
/// Sets whether the wallet should automatically rebalance from trusted/onchain to lightning.
121151
pub async fn set_rebalance_enabled(&self, value: bool) {
122-
self.inner.set_rebalance_enabled(value).await
152+
RTPoller::new(self.inner.set_rebalance_enabled(value), Arc::clone(&self.rt)).await
123153
}
124154

125155
/// Whether the wallet should automatically rebalance from trusted/onchain to lightning.
126156
pub async fn get_rebalance_enabled(&self) -> bool {
127-
self.inner.get_rebalance_enabled().await
157+
RTPoller::new(self.inner.get_rebalance_enabled(), Arc::clone(&self.rt)).await
128158
}
129159

130160
pub async fn list_transactions(
131161
&self,
132162
) -> Result<Vec<std::sync::Arc<crate::ffi::orange::Transaction>>, WalletError> {
133-
let transactions = self.inner.list_transactions().await?;
163+
let transactions =
164+
RTPoller::new(self.inner.list_transactions(), Arc::clone(&self.rt)).await?;
134165
Ok(transactions.into_iter().map(|tx| std::sync::Arc::new(tx.into())).collect())
135166
}
136167

@@ -144,7 +175,11 @@ impl Wallet {
144175
pub async fn parse_payment_instructions(
145176
&self, instructions: String,
146177
) -> Result<PaymentInstructions, ParseError> {
147-
let result = self.inner.parse_payment_instructions(&instructions).await?;
178+
let result = RTPoller::new(
179+
self.inner.parse_payment_instructions(&instructions),
180+
Arc::clone(&self.rt),
181+
)
182+
.await?;
148183
Ok(result.into())
149184
}
150185

@@ -159,7 +194,7 @@ impl Wallet {
159194
pub async fn pay(
160195
&self, payment_info: Arc<PaymentInfo>,
161196
) -> Result<super::PaymentId, WalletError> {
162-
let id = self.inner.pay(&payment_info.0).await?;
197+
let id = RTPoller::new(self.inner.pay(&payment_info.0), Arc::clone(&self.rt)).await?;
163198
Ok(id.into())
164199
}
165200

@@ -170,18 +205,26 @@ impl Wallet {
170205
pub async fn estimate_fee(
171206
&self, payment_info: Arc<PaymentInfo>,
172207
) -> Result<Arc<Amount>, WalletError> {
173-
let fee = self.inner.estimate_fee(&payment_info.0.instructions).await;
208+
let fee = RTPoller::new(
209+
self.inner.estimate_fee(&payment_info.0.instructions),
210+
Arc::clone(&self.rt),
211+
)
212+
.await;
174213
Ok(Arc::new(fee.into()))
175214
}
176215

177216
pub async fn stop(&self) {
178-
self.inner.stop().await
217+
RTPoller::new(self.inner.stop(), Arc::clone(&self.rt)).await
179218
}
180219

181220
pub async fn get_single_use_receive_uri(
182221
&self, amount: Option<Arc<Amount>>,
183222
) -> Result<SingleUseReceiveUri, WalletError> {
184-
let uri = self.inner.get_single_use_receive_uri(amount.map(|a| a.0)).await?;
223+
let uri = RTPoller::new(
224+
self.inner.get_single_use_receive_uri(amount.map(|a| a.0)),
225+
Arc::clone(&self.rt),
226+
)
227+
.await?;
185228
Ok(uri.into())
186229
}
187230

@@ -192,7 +235,7 @@ impl Wallet {
192235

193236
/// List our current channels
194237
pub async fn close_channels(&self) -> Result<(), WalletError> {
195-
self.inner.close_channels().await?;
238+
RTPoller::new(self.inner.close_channels(), Arc::clone(&self.rt)).await?;
196239
Ok(())
197240
}
198241

@@ -230,7 +273,7 @@ impl Wallet {
230273
/// **Caution:** Users must handle events as quickly as possible to prevent a large event backlog,
231274
/// which can increase the memory footprint of [`crate::Wallet`].
232275
pub async fn next_event_async(&self) -> Event {
233-
self.inner.next_event_async().await.into()
276+
RTPoller::new(self.inner.next_event_async(), Arc::clone(&self.rt)).await.into()
234277
}
235278

236279
/// Returns the next event in the event queue.

0 commit comments

Comments
 (0)