Skip to content

Commit 1b9e121

Browse files
committed
Add shared store runtime wrapper
Add a crate-local runtime wrapper for store backends that need to keep their I/O isolated while shutting down safely from async contexts. Co-Authored-By: HAL 9000
1 parent 62d2e9c commit 1b9e121

1 file changed

Lines changed: 56 additions & 0 deletions

File tree

src/runtime.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
// accordance with one or both of these licenses.
77

88
use std::future::Future;
9+
use std::io;
10+
use std::sync::atomic::{AtomicUsize, Ordering};
911
use std::sync::{Arc, Mutex};
1012
use std::time::Duration;
1113

@@ -223,6 +225,60 @@ enum RuntimeMode {
223225
Handle(tokio::runtime::Handle),
224226
}
225227

228+
pub(crate) struct StoreRuntime {
229+
runtime: Option<tokio::runtime::Runtime>,
230+
}
231+
232+
impl StoreRuntime {
233+
pub(crate) fn new(
234+
thread_name_prefix: &'static str, worker_threads: usize, runtime_name: &'static str,
235+
) -> io::Result<Self> {
236+
let runtime = tokio::runtime::Builder::new_multi_thread()
237+
.enable_all()
238+
.thread_name_fn(move || {
239+
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
240+
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
241+
format!("{}-{}", thread_name_prefix, id)
242+
})
243+
.worker_threads(worker_threads)
244+
.max_blocking_threads(worker_threads)
245+
.build()
246+
.map_err(|e| {
247+
io::Error::new(
248+
io::ErrorKind::Other,
249+
format!("Failed to build {runtime_name} runtime: {e}"),
250+
)
251+
})?;
252+
Ok(Self { runtime: Some(runtime) })
253+
}
254+
255+
pub(crate) fn handle(&self) -> &tokio::runtime::Handle {
256+
self.runtime.as_ref().expect("store runtime must be available").handle()
257+
}
258+
259+
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
260+
where
261+
F: Future + Send + 'static,
262+
F::Output: Send + 'static,
263+
{
264+
self.handle().spawn(future)
265+
}
266+
267+
pub(crate) fn shutdown_background(mut self) {
268+
if let Some(runtime) = self.runtime.take() {
269+
runtime.shutdown_background();
270+
}
271+
}
272+
}
273+
274+
impl Drop for StoreRuntime {
275+
fn drop(&mut self) {
276+
if let Some(runtime) = self.runtime.take() {
277+
runtime.shutdown_background();
278+
}
279+
}
280+
}
281+
226282
pub(crate) struct RuntimeSpawner {
227283
runtime: Arc<Runtime>,
228284
}

0 commit comments

Comments
 (0)