Skip to content

Commit d9fa740

Browse files
committed
Isolate VSS persistence from the node runtime
Co-Authored-By: HAL 9000
1 parent 1b9e121 commit d9fa740

1 file changed

Lines changed: 82 additions & 56 deletions

File tree

src/io/vss_store.rs

Lines changed: 82 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::fmt;
1313
use std::future::Future;
1414
#[cfg(test)]
1515
use std::panic::RefUnwindSafe;
16-
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16+
use std::sync::atomic::{AtomicU64, Ordering};
1717
use std::sync::{Arc, Mutex};
1818
use std::time::Duration;
1919

@@ -45,6 +45,7 @@ use vss_client::util::storable_builder::{EntropySource, StorableBuilder};
4545
use crate::entropy::NodeEntropy;
4646
use crate::io::utils::check_namespace_key_validity;
4747
use crate::lnurl_auth::LNURL_AUTH_HARDENED_CHILD_INDEX;
48+
use crate::runtime::StoreRuntime;
4849

4950
type CustomRetryPolicy = FilteredRetryPolicy<
5051
JitteredRetryPolicy<
@@ -77,6 +78,18 @@ const VSS_SCHEMA_VERSION_KEY: &str = "vss_schema_version";
7778
// would hit a blocking case
7879
const INTERNAL_RUNTIME_WORKERS: usize = 2;
7980

81+
async fn run_on_internal_runtime<T>(
82+
runtime: Arc<StoreRuntime>, future: impl Future<Output = io::Result<T>> + Send + 'static,
83+
) -> io::Result<T>
84+
where
85+
T: Send + 'static,
86+
{
87+
let task = runtime.spawn(future);
88+
task.await.map_err(|e| {
89+
io::Error::new(io::ErrorKind::Other, format!("VSS runtime task failed: {}", e))
90+
})?
91+
}
92+
8093
/// A [`KVStore`] implementation that writes to and reads from a [VSS] backend.
8194
///
8295
/// [VSS]: https://github.com/lightningdevkit/vss-server/blob/main/README.md
@@ -85,13 +98,8 @@ pub struct VssStore {
8598
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
8699
// operations aren't sensitive to the order of execution.
87100
next_version: AtomicU64,
88-
// A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
89-
// blocking task to finish while the blocked thread had acquired the reactor. In particular,
90-
// this works around a previously-hit case where a concurrent call to
91-
// `PeerManager::process_pending_events` -> `ChannelManager::get_and_clear_pending_msg_events`
92-
// would deadlock when trying to acquire sync `Mutex` locks that are held by the thread
93-
// currently being blocked waiting on the VSS operation to finish.
94-
internal_runtime: Option<tokio::runtime::Runtime>,
101+
// A VSS-internal runtime that drives VSS I/O independently from the node runtime.
102+
internal_runtime: Option<Arc<StoreRuntime>>,
95103
}
96104

97105
impl VssStore {
@@ -100,52 +108,46 @@ impl VssStore {
100108
header_provider: Arc<dyn VssHeaderProvider>,
101109
) -> io::Result<Self> {
102110
let next_version = AtomicU64::new(1);
103-
let internal_runtime = tokio::runtime::Builder::new_multi_thread()
104-
.enable_all()
105-
.thread_name_fn(|| {
106-
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
107-
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
108-
format!("ldk-node-vss-runtime-{}", id)
109-
})
110-
.worker_threads(INTERNAL_RUNTIME_WORKERS)
111-
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
112-
.build()
113-
.map_err(|e| {
114-
io::Error::new(io::ErrorKind::Other, format!("Failed to build VSS runtime: {}", e))
115-
})?;
111+
let internal_runtime =
112+
Arc::new(StoreRuntime::new("ldk-node-vss-runtime", INTERNAL_RUNTIME_WORKERS, "VSS")?);
116113

117114
let (data_encryption_key, obfuscation_master_key) =
118115
derive_data_encryption_and_obfuscation_keys(&vss_seed);
119116
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
117+
let setup_key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
120118

121119
let mut entropy_seed = [0u8; 32];
122120
getrandom::fill(&mut entropy_seed).expect("Failed to generate random bytes");
123121
let entropy_source = RandomBytes::new(entropy_seed);
122+
let setup_entropy_source = RandomBytes::new(entropy_seed);
124123

125-
let sync_retry_policy = retry_policy();
126-
let blocking_client = VssClient::new_with_headers(
124+
let setup_retry_policy = retry_policy();
125+
let setup_client = VssClient::new_with_headers(
127126
base_url.clone(),
128-
sync_retry_policy,
129-
header_provider.clone(),
127+
setup_retry_policy,
128+
Arc::clone(&header_provider),
130129
);
131130

132-
let runtime_handle = internal_runtime.handle();
133-
let schema_version = tokio::task::block_in_place(|| {
131+
let async_retry_policy = retry_policy();
132+
let async_client =
133+
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
134+
135+
let setup_store_id = store_id.clone();
136+
let runtime_handle = internal_runtime.handle().clone();
137+
let schema_version = std::thread::spawn(move || {
134138
runtime_handle.block_on(async {
135139
determine_and_write_schema_version(
136-
&blocking_client,
137-
&store_id,
140+
&setup_client,
141+
&setup_store_id,
138142
data_encryption_key,
139-
&key_obfuscator,
140-
&entropy_source,
143+
&setup_key_obfuscator,
144+
&setup_entropy_source,
141145
)
142146
.await
143147
})
144-
})?;
145-
146-
let async_retry_policy = retry_policy();
147-
let async_client =
148-
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
148+
})
149+
.join()
150+
.map_err(|_| io::Error::new(io::ErrorKind::Other, "VSS schema setup task panicked"))??;
149151

150152
let inner = Arc::new(VssStoreInner::new(
151153
schema_version,
@@ -158,6 +160,10 @@ impl VssStore {
158160

159161
Ok(Self { inner, next_version, internal_runtime: Some(internal_runtime) })
160162
}
163+
164+
fn internal_runtime(&self) -> Arc<StoreRuntime> {
165+
Arc::clone(self.internal_runtime.as_ref().expect("VSS runtime must be available"))
166+
}
161167
/// Returns a [`VssStoreBuilder`] allowing to build a [`VssStore`].
162168
pub fn builder(
163169
node_entropy: NodeEntropy, vss_url: String, store_id: String, network: Network,
@@ -200,10 +206,14 @@ impl KVStore for VssStore {
200206
let secondary_namespace = secondary_namespace.to_string();
201207
let key = key.to_string();
202208
let inner = Arc::clone(&self.inner);
209+
let runtime = self.internal_runtime();
203210
async move {
204-
inner
205-
.read_internal(&inner.async_client, primary_namespace, secondary_namespace, key)
206-
.await
211+
run_on_internal_runtime(runtime, async move {
212+
inner
213+
.read_internal(&inner.async_client, primary_namespace, secondary_namespace, key)
214+
.await
215+
})
216+
.await
207217
}
208218
}
209219
fn write(
@@ -215,19 +225,23 @@ impl KVStore for VssStore {
215225
let secondary_namespace = secondary_namespace.to_string();
216226
let key = key.to_string();
217227
let inner = Arc::clone(&self.inner);
228+
let runtime = self.internal_runtime();
218229
async move {
219-
inner
220-
.write_internal(
221-
&inner.async_client,
222-
inner_lock_ref,
223-
locking_key,
224-
version,
225-
primary_namespace,
226-
secondary_namespace,
227-
key,
228-
buf,
229-
)
230-
.await
230+
run_on_internal_runtime(runtime, async move {
231+
inner
232+
.write_internal(
233+
&inner.async_client,
234+
inner_lock_ref,
235+
locking_key,
236+
version,
237+
primary_namespace,
238+
secondary_namespace,
239+
key,
240+
buf,
241+
)
242+
.await
243+
})
244+
.await
231245
}
232246
}
233247
fn remove(
@@ -239,6 +253,7 @@ impl KVStore for VssStore {
239253
let secondary_namespace = secondary_namespace.to_string();
240254
let key = key.to_string();
241255
let inner = Arc::clone(&self.inner);
256+
let runtime = self.internal_runtime();
242257
let fut = async move {
243258
inner
244259
.remove_internal(
@@ -254,10 +269,12 @@ impl KVStore for VssStore {
254269
};
255270
async move {
256271
if lazy {
257-
tokio::task::spawn(async move { fut.await });
272+
runtime.spawn(async move {
273+
let _ = fut.await;
274+
});
258275
Ok(())
259276
} else {
260-
fut.await
277+
run_on_internal_runtime(runtime, fut).await
261278
}
262279
}
263280
}
@@ -267,16 +284,25 @@ impl KVStore for VssStore {
267284
let primary_namespace = primary_namespace.to_string();
268285
let secondary_namespace = secondary_namespace.to_string();
269286
let inner = Arc::clone(&self.inner);
287+
let runtime = self.internal_runtime();
270288
async move {
271-
inner.list_internal(&inner.async_client, primary_namespace, secondary_namespace).await
289+
run_on_internal_runtime(runtime, async move {
290+
inner
291+
.list_internal(&inner.async_client, primary_namespace, secondary_namespace)
292+
.await
293+
})
294+
.await
272295
}
273296
}
274297
}
275298

276299
impl Drop for VssStore {
277300
fn drop(&mut self) {
278-
let internal_runtime = self.internal_runtime.take();
279-
tokio::task::block_in_place(move || drop(internal_runtime));
301+
if let Some(runtime) = self.internal_runtime.take() {
302+
if let Ok(runtime) = Arc::try_unwrap(runtime) {
303+
runtime.shutdown_background();
304+
}
305+
}
280306
}
281307
}
282308

0 commit comments

Comments
 (0)