|
8 | 8 | //! Objects related to [`PostgresStore`] live here. |
9 | 9 | use std::collections::HashMap; |
10 | 10 | use std::future::Future; |
11 | | -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; |
| 11 | +use std::sync::atomic::{AtomicU64, Ordering}; |
12 | 12 | use std::sync::{Arc, Mutex}; |
13 | 13 |
|
14 | 14 | use lightning::io; |
@@ -38,9 +38,6 @@ const SCHEMA_VERSION: u16 = 1; |
38 | 38 | // The number of entries returned per page in paginated list operations. |
39 | 39 | const PAGE_SIZE: usize = 50; |
40 | 40 |
|
41 | | -// Keep this small while still allowing progress if one runtime worker blocks on sync store access. |
42 | | -const INTERNAL_RUNTIME_WORKERS: usize = 2; |
43 | | - |
44 | 41 | fn sql_identifier(identifier: &str) -> io::Result<String> { |
45 | 42 | if identifier.is_empty() || identifier.contains('\0') { |
46 | 43 | return Err(io::Error::new( |
@@ -91,18 +88,13 @@ macro_rules! query_with_retry { |
91 | 88 |
|
92 | 89 | /// A [`KVStore`] implementation that writes to and reads from a [PostgreSQL] database. |
93 | 90 | /// |
94 | | -/// Maintains an internal runtime for the underlying tokio-postgres connection drivers. |
95 | | -/// |
96 | 91 | /// [PostgreSQL]: https://www.postgresql.org |
97 | 92 | pub struct PostgresStore { |
98 | 93 | inner: Arc<PostgresStoreInner>, |
99 | 94 |
|
100 | 95 | // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list |
101 | 96 | // operations aren't sensitive to the order of execution. |
102 | 97 | next_write_version: AtomicU64, |
103 | | - |
104 | | - // A store-internal runtime used for setup and connection driver tasks. |
105 | | - internal_runtime: Option<tokio::runtime::Runtime>, |
106 | 98 | } |
107 | 99 |
|
108 | 100 | // tokio::sync::Mutex (used for the DB client) contains UnsafeCell which opts out of |
@@ -145,33 +137,12 @@ impl PostgresStore { |
145 | 137 | connection_string: String, db_name: Option<String>, kv_table_name: Option<String>, |
146 | 138 | certificate_pem: Option<String>, logger: Option<Arc<Logger>>, |
147 | 139 | ) -> io::Result<Self> { |
148 | | - let internal_runtime = tokio::runtime::Builder::new_multi_thread() |
149 | | - .enable_all() |
150 | | - .thread_name_fn(|| { |
151 | | - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); |
152 | | - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); |
153 | | - format!("ldk-node-postgres-runtime-{}", id) |
154 | | - }) |
155 | | - .worker_threads(INTERNAL_RUNTIME_WORKERS) |
156 | | - .max_blocking_threads(INTERNAL_RUNTIME_WORKERS) |
157 | | - .build() |
158 | | - .map_err(|e| { |
159 | | - io::Error::new( |
160 | | - io::ErrorKind::Other, |
161 | | - format!("Failed to build PostgreSQL runtime: {e}"), |
162 | | - ) |
163 | | - })?; |
164 | 140 | let tls = Self::build_tls_connector(certificate_pem)?; |
165 | | - let runtime_handle = internal_runtime.handle(); |
166 | | - let inner = tokio::task::block_in_place(|| { |
167 | | - runtime_handle.block_on(async { |
168 | | - PostgresStoreInner::new(connection_string, db_name, kv_table_name, tls, logger) |
169 | | - .await |
170 | | - }) |
171 | | - })?; |
| 141 | + let inner = |
| 142 | + PostgresStoreInner::new(connection_string, db_name, kv_table_name, tls, logger).await?; |
172 | 143 | let inner = Arc::new(inner); |
173 | 144 | let next_write_version = AtomicU64::new(1); |
174 | | - Ok(Self { inner, next_write_version, internal_runtime: Some(internal_runtime) }) |
| 145 | + Ok(Self { inner, next_write_version }) |
175 | 146 | } |
176 | 147 |
|
177 | 148 | fn build_tls_connector(certificate_pem: Option<String>) -> io::Result<PgTlsConnector> { |
@@ -216,14 +187,6 @@ impl PostgresStore { |
216 | 187 | } |
217 | 188 | } |
218 | 189 |
|
219 | | -impl Drop for PostgresStore { |
220 | | - fn drop(&mut self) { |
221 | | - if let Some(internal_runtime) = self.internal_runtime.take() { |
222 | | - internal_runtime.shutdown_background(); |
223 | | - } |
224 | | - } |
225 | | -} |
226 | | - |
227 | 190 | impl KVStore for PostgresStore { |
228 | 191 | fn read( |
229 | 192 | &self, primary_namespace: &str, secondary_namespace: &str, key: &str, |
@@ -292,15 +255,6 @@ impl KVStore for PostgresStore { |
292 | 255 | } |
293 | 256 | } |
294 | 257 |
|
295 | | -impl PostgresStore { |
296 | | - fn internal_runtime(&self) -> io::Result<&tokio::runtime::Runtime> { |
297 | | - self.internal_runtime.as_ref().ok_or_else(|| { |
298 | | - debug_assert!(false, "Failed to access internal PostgreSQL runtime"); |
299 | | - io::Error::new(io::ErrorKind::Other, "Failed to access internal PostgreSQL runtime") |
300 | | - }) |
301 | | - } |
302 | | -} |
303 | | - |
304 | 258 | impl PaginatedKVStore for PostgresStore { |
305 | 259 | fn list_paginated( |
306 | 260 | &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>, |
|
0 commit comments