Skip to content

Commit 8b93895

Browse files
committed
Isolate PostgreSQL persistence from the node runtime
Co-Authored-By: HAL 9000
1 parent d9fa740 commit 8b93895

1 file changed

Lines changed: 82 additions & 62 deletions

File tree

src/io/postgres_store/mod.rs

Lines changed: 82 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! Objects related to [`PostgresStore`] live here.
99
use std::collections::HashMap;
1010
use std::future::Future;
11-
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11+
use std::sync::atomic::{AtomicU64, Ordering};
1212
use std::sync::{Arc, Mutex};
1313

1414
use lightning::io;
@@ -22,6 +22,7 @@ use tokio_postgres::{Config, Error as PgError};
2222
use self::pool::{make_config_connection, ClientConnection, PgTlsConnector, SmallPool};
2323
use crate::io::utils::check_namespace_key_validity;
2424
use crate::logger::{log_debug, log_info, LdkLogger, Logger};
25+
use crate::runtime::StoreRuntime;
2526

2627
mod migrations;
2728
mod pool;
@@ -41,6 +42,18 @@ const PAGE_SIZE: usize = 50;
4142
// Keep this small while still allowing progress if one runtime worker blocks on sync store access.
4243
const INTERNAL_RUNTIME_WORKERS: usize = 2;
4344

45+
async fn run_on_internal_runtime<T>(
46+
runtime: Arc<StoreRuntime>, future: impl Future<Output = io::Result<T>> + Send + 'static,
47+
) -> io::Result<T>
48+
where
49+
T: Send + 'static,
50+
{
51+
let task = runtime.spawn(future);
52+
task.await.map_err(|e| {
53+
io::Error::new(io::ErrorKind::Other, format!("PostgreSQL runtime task failed: {}", e))
54+
})?
55+
}
56+
4457
fn sql_identifier(identifier: &str) -> io::Result<String> {
4558
if identifier.is_empty() || identifier.contains('\0') {
4659
return Err(io::Error::new(
@@ -101,8 +114,8 @@ pub struct PostgresStore {
101114
// operations aren't sensitive to the order of execution.
102115
next_write_version: AtomicU64,
103116

104-
// A store-internal runtime used for setup and connection driver tasks.
105-
internal_runtime: Option<tokio::runtime::Runtime>,
117+
// A store-internal runtime that drives PostgreSQL I/O independently from the node runtime.
118+
internal_runtime: Option<Arc<StoreRuntime>>,
106119
}
107120

108121
// tokio::sync::Mutex (used for the DB client) contains UnsafeCell which opts out of
@@ -145,30 +158,16 @@ impl PostgresStore {
145158
connection_string: String, db_name: Option<String>, kv_table_name: Option<String>,
146159
certificate_pem: Option<String>, logger: Option<Arc<Logger>>,
147160
) -> io::Result<Self> {
148-
let internal_runtime = tokio::runtime::Builder::new_multi_thread()
149-
.enable_all()
150-
.thread_name_fn(|| {
151-
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
152-
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
153-
format!("ldk-node-postgres-runtime-{}", id)
154-
})
155-
.worker_threads(INTERNAL_RUNTIME_WORKERS)
156-
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
157-
.build()
158-
.map_err(|e| {
159-
io::Error::new(
160-
io::ErrorKind::Other,
161-
format!("Failed to build PostgreSQL runtime: {e}"),
162-
)
163-
})?;
161+
let internal_runtime = Arc::new(StoreRuntime::new(
162+
"ldk-node-postgres-runtime",
163+
INTERNAL_RUNTIME_WORKERS,
164+
"PostgreSQL",
165+
)?);
164166
let tls = Self::build_tls_connector(certificate_pem)?;
165-
let runtime_handle = internal_runtime.handle();
166-
let inner = tokio::task::block_in_place(|| {
167-
runtime_handle.block_on(async {
168-
PostgresStoreInner::new(connection_string, db_name, kv_table_name, tls, logger)
169-
.await
170-
})
171-
})?;
167+
let inner = run_on_internal_runtime(Arc::clone(&internal_runtime), async move {
168+
PostgresStoreInner::new(connection_string, db_name, kv_table_name, tls, logger).await
169+
})
170+
.await?;
172171
let inner = Arc::new(inner);
173172
let next_write_version = AtomicU64::new(1);
174173
Ok(Self { inner, next_write_version, internal_runtime: Some(internal_runtime) })
@@ -214,12 +213,18 @@ impl PostgresStore {
214213

215214
(inner_lock_ref, version)
216215
}
216+
217+
fn internal_runtime(&self) -> Arc<StoreRuntime> {
218+
Arc::clone(self.internal_runtime.as_ref().expect("PostgreSQL runtime must be available"))
219+
}
217220
}
218221

219222
impl Drop for PostgresStore {
220223
fn drop(&mut self) {
221224
if let Some(internal_runtime) = self.internal_runtime.take() {
222-
internal_runtime.shutdown_background();
225+
if let Ok(internal_runtime) = Arc::try_unwrap(internal_runtime) {
226+
internal_runtime.shutdown_background();
227+
}
223228
}
224229
}
225230
}
@@ -232,7 +237,13 @@ impl KVStore for PostgresStore {
232237
let secondary_namespace = secondary_namespace.to_string();
233238
let key = key.to_string();
234239
let inner = Arc::clone(&self.inner);
235-
async move { inner.read_internal(&primary_namespace, &secondary_namespace, &key).await }
240+
let runtime = self.internal_runtime();
241+
async move {
242+
run_on_internal_runtime(runtime, async move {
243+
inner.read_internal(&primary_namespace, &secondary_namespace, &key).await
244+
})
245+
.await
246+
}
236247
}
237248

238249
fn write(
@@ -244,18 +255,22 @@ impl KVStore for PostgresStore {
244255
let secondary_namespace = secondary_namespace.to_string();
245256
let key = key.to_string();
246257
let inner = Arc::clone(&self.inner);
258+
let runtime = self.internal_runtime();
247259
async move {
248-
inner
249-
.write_internal(
250-
inner_lock_ref,
251-
locking_key,
252-
version,
253-
&primary_namespace,
254-
&secondary_namespace,
255-
&key,
256-
buf,
257-
)
258-
.await
260+
run_on_internal_runtime(runtime, async move {
261+
inner
262+
.write_internal(
263+
inner_lock_ref,
264+
locking_key,
265+
version,
266+
&primary_namespace,
267+
&secondary_namespace,
268+
&key,
269+
buf,
270+
)
271+
.await
272+
})
273+
.await
259274
}
260275
}
261276

@@ -268,17 +283,21 @@ impl KVStore for PostgresStore {
268283
let secondary_namespace = secondary_namespace.to_string();
269284
let key = key.to_string();
270285
let inner = Arc::clone(&self.inner);
286+
let runtime = self.internal_runtime();
271287
async move {
272-
inner
273-
.remove_internal(
274-
inner_lock_ref,
275-
locking_key,
276-
version,
277-
&primary_namespace,
278-
&secondary_namespace,
279-
&key,
280-
)
281-
.await
288+
run_on_internal_runtime(runtime, async move {
289+
inner
290+
.remove_internal(
291+
inner_lock_ref,
292+
locking_key,
293+
version,
294+
&primary_namespace,
295+
&secondary_namespace,
296+
&key,
297+
)
298+
.await
299+
})
300+
.await
282301
}
283302
}
284303

@@ -288,16 +307,13 @@ impl KVStore for PostgresStore {
288307
let primary_namespace = primary_namespace.to_string();
289308
let secondary_namespace = secondary_namespace.to_string();
290309
let inner = Arc::clone(&self.inner);
291-
async move { inner.list_internal(&primary_namespace, &secondary_namespace).await }
292-
}
293-
}
294-
295-
impl PostgresStore {
296-
fn internal_runtime(&self) -> io::Result<&tokio::runtime::Runtime> {
297-
self.internal_runtime.as_ref().ok_or_else(|| {
298-
debug_assert!(false, "Failed to access internal PostgreSQL runtime");
299-
io::Error::new(io::ErrorKind::Other, "Failed to access internal PostgreSQL runtime")
300-
})
310+
let runtime = self.internal_runtime();
311+
async move {
312+
run_on_internal_runtime(runtime, async move {
313+
inner.list_internal(&primary_namespace, &secondary_namespace).await
314+
})
315+
.await
316+
}
301317
}
302318
}
303319

@@ -308,10 +324,14 @@ impl PaginatedKVStore for PostgresStore {
308324
let primary_namespace = primary_namespace.to_string();
309325
let secondary_namespace = secondary_namespace.to_string();
310326
let inner = Arc::clone(&self.inner);
327+
let runtime = self.internal_runtime();
311328
async move {
312-
inner
313-
.list_paginated_internal(&primary_namespace, &secondary_namespace, page_token)
314-
.await
329+
run_on_internal_runtime(runtime, async move {
330+
inner
331+
.list_paginated_internal(&primary_namespace, &secondary_namespace, page_token)
332+
.await
333+
})
334+
.await
315335
}
316336
}
317337
}

0 commit comments

Comments
 (0)