Skip to content

Commit e2b4113

Browse files
kimCentril
andauthored
Async shutdown for database / durability (#3880)
Controlled shutdown of a database should drain the outstanding transactions queue(s) and flush them to the durability layer. With the introduction of another queueing layer in #3868, it became harder to observe when or if this process is completed. This patch thus introduces an explicit (async) shutdown method for `RelationalDB` and below, which will wait until all submitted transactions are either reported durable, or an error occurs in the durability layer. `RelationalDB` is made `!Clone`, such that shutdown can be initiated in the `Drop` impl. Note that this requires access to a tokio runtime, which we thread through via the `Persistence` services in order to allow control over which of the various runtimes is being used for durability-related tasks. Also moves `RelationalDB::open` to a blocking thread when a persistence-enabled database is constructed by the `HostController` -- this process performs heavy I/O and can take a substantial amount of time, during which we don't want to block a worker thread. # API and ABI breaking changes None # Expected complexity level and risk 3 # Testing - [ ] some testing added - [ ] existing tests still pass - [ ] `impl Drop for RelationalDB` difficult to test, extra eyeballs needed --------- Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
1 parent 70628fb commit e2b4113

19 files changed

Lines changed: 591 additions & 234 deletions

File tree

crates/core/src/client/client_connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl DurableOffsetSupply for watch::Receiver<ModuleHost> {
134134
}
135135
}
136136

137-
impl DurableOffsetSupply for RelationalDB {
137+
impl DurableOffsetSupply for Arc<RelationalDB> {
138138
fn durable_offset(&mut self) -> Result<Option<DurableOffset>, NoSuchModule> {
139139
Ok(self.durable_tx_offset())
140140
}

crates/core/src/db/durability.rs

Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
use std::{
2+
sync::{
3+
atomic::{AtomicU64, Ordering},
4+
Arc,
5+
},
6+
time::Duration,
7+
};
8+
9+
use log::{error, info, warn};
10+
use spacetimedb_commitlog::payload::{
11+
txdata::{Mutations, Ops},
12+
Txdata,
13+
};
14+
use spacetimedb_data_structures::map::IntSet;
15+
use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
16+
use spacetimedb_durability::{DurableOffset, TxOffset};
17+
use spacetimedb_lib::Identity;
18+
use spacetimedb_primitives::TableId;
19+
use tokio::{
20+
runtime,
21+
sync::mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
22+
time::{timeout, Instant},
23+
};
24+
25+
use crate::db::{lock_file::LockFile, persistence::Durability};
26+
27+
/// A request to persist a transaction or to terminate the actor.
28+
pub struct DurabilityRequest {
29+
reducer_context: Option<ReducerContext>,
30+
tx_data: Arc<TxData>,
31+
}
32+
33+
/// Represents a handle to a background task that persists transactions
34+
/// according to the [`Durability`] policy provided.
35+
///
36+
/// This exists to avoid holding a transaction lock while
37+
/// preparing the [TxData] for processing by the [Durability] layer.
38+
pub struct DurabilityWorker {
39+
request_tx: UnboundedSender<DurabilityRequest>,
40+
requested_tx_offset: AtomicU64,
41+
shutdown: Sender<()>,
42+
durability: Arc<Durability>,
43+
runtime: runtime::Handle,
44+
}
45+
46+
/// Those who run seem to have all the fun... 🎶
47+
const HUNG_UP: &str = "durability actor hung up / panicked";
48+
49+
impl DurabilityWorker {
50+
/// Create a new [`DurabilityWorker`] using the given `durability` policy.
51+
///
52+
/// Background tasks will be spawned onto to provided tokio `runtime`.
53+
pub fn new(durability: Arc<Durability>, runtime: runtime::Handle) -> Self {
54+
let (request_tx, request_rx) = unbounded_channel();
55+
let (shutdown_tx, shutdown_rx) = channel(1);
56+
57+
let actor = DurabilityWorkerActor {
58+
request_rx,
59+
shutdown: shutdown_rx,
60+
durability: durability.clone(),
61+
};
62+
let _enter = runtime.enter();
63+
tokio::spawn(actor.run());
64+
65+
Self {
66+
request_tx,
67+
requested_tx_offset: AtomicU64::new(0),
68+
shutdown: shutdown_tx,
69+
durability,
70+
runtime,
71+
}
72+
}
73+
74+
/// Request that a transaction be made durable.
75+
/// That is, if `(tx_data, ctx)` should be appended to the commitlog, do so.
76+
///
77+
/// Note that by this stage
78+
/// [`spacetimedb_datastore::locking_tx_datastore::committed_state::tx_consumes_offset`]
79+
/// has already decided based on the reducer and operations whether the transaction should be appended;
80+
/// this method is responsible only for reading its decision out of the `tx_data`
81+
/// and calling `durability.append_tx`.
82+
///
83+
/// This method does not block,
84+
/// and sends the work to an actor that collects data and calls `durability.append_tx`.
85+
///
86+
/// # Panics
87+
///
88+
/// Panics if the durability worker has already closed the receive end of
89+
/// its queue. This may happen if
90+
///
91+
/// - the backing [Durability] has panicked, or
92+
/// - [Self::shutdown] was called
93+
///
94+
pub fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
95+
self.request_tx
96+
.send(DurabilityRequest {
97+
reducer_context,
98+
tx_data: tx_data.clone(),
99+
})
100+
.inspect(|()| {
101+
// If `tx_data` has a `None` tx offset, the actor will ignore it.
102+
// Otherwise, record the offset as requested, so that
103+
// [Self::shutdown] can determine when the queue is drained.
104+
if let Some(tx_offset) = tx_data.tx_offset() {
105+
self.requested_tx_offset.fetch_max(tx_offset, Ordering::SeqCst);
106+
}
107+
})
108+
.expect(HUNG_UP);
109+
}
110+
111+
/// Get the [`DurableOffset`] of this database.
112+
pub fn durable_tx_offset(&self) -> DurableOffset {
113+
self.durability.durable_tx_offset()
114+
}
115+
116+
/// Shut down the worker without dropping it,
117+
/// flushing outstanding transaction.
118+
///
119+
/// Closes the internal channel, then waits for the [DurableOffset] to
120+
/// report the offset of the most recently enqueued transaction as durable.
121+
///
122+
/// # Panics
123+
///
124+
/// If [Self::request_durability] is called after [Self::shutdown], the
125+
/// former will panic.
126+
pub async fn shutdown(&self) -> anyhow::Result<TxOffset> {
127+
// Request the actor to shutdown.
128+
// Ignore send errors -- in that case a shutdown is already in progress.
129+
let _ = self.shutdown.try_send(());
130+
// Wait for the request channel to be closed.
131+
self.request_tx.closed().await;
132+
// Load the latest tx offset and wait for it to become durable.
133+
let latest_tx_offset = self.requested_tx_offset.load(Ordering::SeqCst);
134+
let durable_offset = self.durable_tx_offset().wait_for(latest_tx_offset).await?;
135+
136+
Ok(durable_offset)
137+
}
138+
139+
/// Consume `self` and run [Self::shutdown].
140+
///
141+
/// The `lock_file` is not dropped until the shutdown is complete (either
142+
/// successfully or unsuccessfully). This is to prevent the database to be
143+
/// re-opened for writing while there is still an active background task
144+
/// writing to the commitlog.
145+
///
146+
/// The shutdown task will be spawned onto the tokio runtime provided to
147+
/// [Self::new]. This means that the task may still be running when this
148+
/// method returns.
149+
///
150+
/// `database_identity` is used to associate log records with the database
151+
/// owning this durability worker.
152+
///
153+
/// This method is used in the `Drop` impl for [crate::db::relational_db::RelationalDB].
154+
pub(super) fn spawn_shutdown(self, database_identity: Identity, lock_file: LockFile) {
155+
let rt = self.runtime.clone();
156+
let mut shutdown = rt.spawn(async move { self.shutdown().await });
157+
rt.spawn(async move {
158+
let label = format!("[{database_identity}]");
159+
let start = Instant::now();
160+
loop {
161+
// Warn every 5s if the shutdown doesn't appear to make progress.
162+
// The backing durability could still be writing to disk,
163+
// but we can't cancel it from here,
164+
// so dropping the lock file would be unsafe.
165+
match timeout(Duration::from_secs(5), &mut shutdown).await {
166+
Err(_elapsed) => {
167+
let since = start.elapsed().as_secs_f32();
168+
error!("{label} waiting for durability worker shutdown since {since}s",);
169+
continue;
170+
}
171+
Ok(res) => {
172+
let Ok(done) = res else {
173+
warn!("{label} durability worker shutdown cancelled");
174+
break;
175+
};
176+
match done {
177+
Ok(offset) => info!("{label} durability worker shut down at tx offset: {offset}"),
178+
Err(e) => warn!("{label} error shutting down durability worker: {e:#}"),
179+
}
180+
break;
181+
}
182+
}
183+
}
184+
drop(lock_file);
185+
});
186+
}
187+
}
188+
189+
pub struct DurabilityWorkerActor {
190+
request_rx: UnboundedReceiver<DurabilityRequest>,
191+
shutdown: Receiver<()>,
192+
durability: Arc<Durability>,
193+
}
194+
195+
impl DurabilityWorkerActor {
196+
/// Processes requests to do durability.
197+
async fn run(mut self) {
198+
loop {
199+
tokio::select! {
200+
// Biased towards the shutdown channel,
201+
// so that adding new requests is prevented promptly.
202+
biased;
203+
204+
Some(()) = self.shutdown.recv() => {
205+
self.request_rx.close();
206+
self.shutdown.close();
207+
},
208+
209+
req = self.request_rx.recv() => {
210+
let Some(DurabilityRequest { reducer_context, tx_data }) = req else {
211+
break;
212+
};
213+
Self::do_durability(&*self.durability, reducer_context, &tx_data);
214+
}
215+
}
216+
}
217+
}
218+
219+
pub fn do_durability(durability: &Durability, reducer_context: Option<ReducerContext>, tx_data: &TxData) {
220+
if tx_data.tx_offset().is_none() {
221+
let name = reducer_context.as_ref().map(|rcx| &*rcx.name);
222+
debug_assert!(
223+
!tx_data.has_rows_or_connect_disconnect(name),
224+
"tx_data has no rows but has connect/disconnect: `{name:?}`"
225+
);
226+
return;
227+
}
228+
229+
let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) };
230+
231+
let inserts: Box<_> = tx_data
232+
.inserts()
233+
// Skip ephemeral tables
234+
.filter(|(table_id, _)| is_persistent_table(table_id))
235+
.map(|(table_id, rowdata)| Ops {
236+
table_id: *table_id,
237+
rowdata: rowdata.clone(),
238+
})
239+
.collect();
240+
241+
let truncates: IntSet<TableId> = tx_data.truncates().collect();
242+
243+
let deletes: Box<_> = tx_data
244+
.deletes()
245+
.filter(|(table_id, _)| is_persistent_table(table_id))
246+
.map(|(table_id, rowdata)| Ops {
247+
table_id: *table_id,
248+
rowdata: rowdata.clone(),
249+
})
250+
// filter out deletes for tables that are truncated in the same transaction.
251+
.filter(|ops| !truncates.contains(&ops.table_id))
252+
.collect();
253+
254+
let truncates: Box<_> = truncates.into_iter().filter(is_persistent_table).collect();
255+
256+
let inputs = reducer_context.map(|rcx| rcx.into());
257+
258+
debug_assert!(
259+
!(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()),
260+
"empty transaction"
261+
);
262+
263+
let txdata = Txdata {
264+
inputs,
265+
outputs: None,
266+
mutations: Some(Mutations {
267+
inserts,
268+
deletes,
269+
truncates,
270+
}),
271+
};
272+
273+
// TODO: Should measure queuing time + actual write
274+
// This does not block, as per trait docs.
275+
durability.append_tx(txdata);
276+
}
277+
}
278+
279+
#[cfg(test)]
280+
mod tests {
281+
use std::{pin::pin, task::Poll};
282+
283+
use pretty_assertions::assert_matches;
284+
use spacetimedb_sats::product;
285+
use tokio::sync::watch;
286+
287+
use super::*;
288+
use crate::db::relational_db::Txdata;
289+
290+
#[derive(Default)]
291+
struct CountingDurability {
292+
appended: watch::Sender<Option<TxOffset>>,
293+
durable: watch::Sender<Option<TxOffset>>,
294+
}
295+
296+
impl CountingDurability {
297+
async fn mark_durable(&self, offset: TxOffset) {
298+
self.appended
299+
.subscribe()
300+
.wait_for(|x| x.is_some_and(|appended_offset| appended_offset >= offset))
301+
.await
302+
.unwrap();
303+
self.durable.send_modify(|durable_offset| {
304+
durable_offset.replace(offset);
305+
});
306+
}
307+
}
308+
309+
impl spacetimedb_durability::Durability for CountingDurability {
310+
type TxData = Txdata;
311+
312+
fn append_tx(&self, _tx: Self::TxData) {
313+
self.appended.send_modify(|offset| {
314+
*offset = offset.map(|x| x + 1).or(Some(0));
315+
});
316+
}
317+
318+
fn durable_tx_offset(&self) -> DurableOffset {
319+
self.durable.subscribe().into()
320+
}
321+
}
322+
323+
#[tokio::test]
324+
async fn shutdown_waits_until_durable() {
325+
let durability = Arc::new(CountingDurability::default());
326+
let worker = DurabilityWorker::new(durability.clone(), runtime::Handle::current());
327+
328+
for i in 0..=10 {
329+
let mut txdata = TxData::default();
330+
txdata.set_tx_offset(i);
331+
// Ensure the transaction is non-empty.
332+
txdata.set_inserts_for_table(4000.into(), "foo", [product![42u8]].into());
333+
334+
worker.request_durability(None, &Arc::new(txdata));
335+
}
336+
assert_eq!(
337+
10,
338+
worker.requested_tx_offset.load(Ordering::Relaxed),
339+
"worker should have requested up to tx offset 10"
340+
);
341+
342+
let shutdown = worker.shutdown();
343+
let mut shutdown_fut = pin!(shutdown);
344+
assert_matches!(
345+
futures::poll!(&mut shutdown_fut),
346+
Poll::Pending,
347+
"shutdown should be pending because requested > durable"
348+
);
349+
350+
durability.mark_durable(5).await;
351+
assert_matches!(
352+
futures::poll!(&mut shutdown_fut),
353+
Poll::Pending,
354+
"shutdown should be pending because requested > durable"
355+
);
356+
357+
durability.mark_durable(10).await;
358+
assert_matches!(
359+
futures::poll!(&mut shutdown_fut),
360+
Poll::Ready(Ok(10)),
361+
"shutdown returns, reporting durable offset at 10"
362+
);
363+
assert_eq!(
364+
Some(10),
365+
*durability.appended.borrow(),
366+
"durability should have appended up to tx offset 10"
367+
);
368+
}
369+
}

0 commit comments

Comments
 (0)