Skip to content

Commit 0e04482

Browse files
committed
Address review comments
1 parent 0a00b4c commit 0e04482

3 files changed

Lines changed: 76 additions & 26 deletions

File tree

crates/core/src/db/durability.rs

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,28 @@
1-
use std::sync::{
2-
atomic::{AtomicU64, Ordering},
3-
Arc,
1+
use std::{
2+
sync::{
3+
atomic::{AtomicU64, Ordering},
4+
Arc,
5+
},
6+
time::Duration,
47
};
58

6-
use anyhow::Context as _;
9+
use log::{info, warn};
710
use spacetimedb_commitlog::payload::{
811
txdata::{Mutations, Ops},
912
Txdata,
1013
};
1114
use spacetimedb_data_structures::map::IntSet;
1215
use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
1316
use spacetimedb_durability::{DurableOffset, TxOffset};
17+
use spacetimedb_lib::Identity;
1418
use spacetimedb_primitives::TableId;
1519
use tokio::{
1620
runtime,
1721
sync::mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
22+
time::{timeout, Instant},
1823
};
1924

20-
use crate::db::persistence::Durability;
25+
use crate::db::{lock_file::LockFile, persistence::Durability};
2126

2227
/// A request to persist a transaction or to terminate the actor.
2328
pub struct DurabilityRequest {
@@ -93,8 +98,12 @@ impl DurabilityWorker {
9398
tx_data: tx_data.clone(),
9499
})
95100
.inspect(|()| {
96-
self.requested_tx_offset
97-
.fetch_max(tx_data.tx_offset().unwrap_or_default(), Ordering::SeqCst);
101+
// If `tx_data` has a `None` tx offset, the actor will ignore it.
102+
// Otherwise, record the offset as requested, so that
103+
// [Self::shutdown] can determine when the queue is drained.
104+
if let Some(tx_offset) = tx_data.tx_offset() {
105+
self.requested_tx_offset.fetch_max(tx_offset, Ordering::SeqCst);
106+
}
98107
})
99108
.expect(HUNG_UP);
100109
}
@@ -115,11 +124,10 @@ impl DurabilityWorker {
115124
/// If [Self::request_durability] is called after [Self::shutdown], the
116125
/// former will panic.
117126
pub async fn shutdown(&self) -> anyhow::Result<TxOffset> {
118-
self.shutdown
119-
.send(())
120-
.await
121-
.context("durability worker already closed")?;
122-
// Wait for the channel to be closed.
127+
// Request the actor to shut down.
128+
// Ignore send errors -- in that case a shutdown is already in progress.
129+
let _ = self.shutdown.try_send(());
130+
// Wait for the request channel to be closed.
123131
self.request_tx.closed().await;
124132
// Load the latest tx offset and wait for it to become durable.
125133
let latest_tx_offset = self.requested_tx_offset.load(Ordering::SeqCst);
@@ -128,9 +136,53 @@ impl DurabilityWorker {
128136
Ok(durable_offset)
129137
}
130138

131-
/// Get a handle to the tokio runtime `self` was constructed with.
132-
pub fn runtime(&self) -> &tokio::runtime::Handle {
133-
&self.runtime
139+
/// Consume `self` and run [Self::shutdown].
140+
///
141+
/// The `lock_file` is not dropped until the shutdown is complete (either
142+
/// successfully or unsuccessfully). This is to prevent the database from being
143+
/// re-opened for writing while there is still an active background task
144+
/// writing to the commitlog.
145+
///
146+
/// The shutdown task will be scheduled onto the tokio runtime provided
147+
/// to [Self::new]. This means that the task may still be running when this
148+
/// method returns.
149+
///
150+
/// `database_identity` is used to associate log records with the database
151+
/// owning this durability worker.
152+
///
153+
/// This method is used in the `Drop` impl for [crate::db::relational_db::RelationalDB].
154+
pub(super) fn spawn_shutdown(self, database_identity: Identity, lock_file: LockFile) {
155+
let rt = self.runtime.clone();
156+
let mut shutdown = rt.spawn(async move { self.shutdown().await });
157+
rt.spawn(async move {
158+
let label = format!("database={database_identity}");
159+
let start = Instant::now();
160+
loop {
161+
// Warn every 5s if the shutdown doesn't appear to make progress.
162+
// The backing durability could still be writing to disk,
163+
// but we can't cancel it from here,
164+
// so dropping the lock file would be unsafe.
165+
match timeout(Duration::from_secs(5), &mut shutdown).await {
166+
Err(_elapsed) => {
167+
let since = start.elapsed().as_secs_f32();
168+
warn!("{label} waiting for durability worker shutdown since {since}s",);
169+
continue;
170+
}
171+
Ok(res) => {
172+
let Ok(done) = res else {
173+
warn!("{label} durability worker shutdown cancelled");
174+
break;
175+
};
176+
match done {
177+
Ok(offset) => info!("{label} durability worker shut down at tx offset: {offset}"),
178+
Err(e) => warn!("{label} error shutting down durability worker: {e:#}"),
179+
}
180+
break;
181+
}
182+
}
183+
}
184+
drop(lock_file);
185+
});
134186
}
135187
}
136188

@@ -145,10 +197,13 @@ impl DurabilityWorkerActor {
145197
async fn run(mut self) {
146198
loop {
147199
tokio::select! {
200+
// Biased towards the shutdown channel,
201+
// so that adding new requests is prevented promptly.
148202
biased;
149203

150204
Some(()) = self.shutdown.recv() => {
151205
self.request_rx.close();
206+
self.shutdown.close();
152207
},
153208

154209
req = self.request_rx.recv() => {

crates/core/src/db/relational_db.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,7 @@ impl Drop for RelationalDB {
147147
// the durability backend.
148148
if let Some(worker) = self.durability.take() {
149149
let lock_file = self.lock_file.clone();
150-
let rt = worker.runtime().clone();
151-
rt.spawn(async move {
152-
if let Err(e) = worker.shutdown().await {
153-
log::warn!("error shutting down durability worker: {e:#}");
154-
}
155-
drop(lock_file);
156-
});
150+
worker.spawn_shutdown(self.database_identity, lock_file);
157151
}
158152
}
159153
}

crates/durability/src/imp/local.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::{
1212
use anyhow::Context as _;
1313
use itertools::Itertools as _;
1414
use log::{info, trace, warn};
15-
use scopeguard::defer_on_unwind;
15+
use scopeguard::defer;
1616
use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
1717
use spacetimedb_paths::server::CommitLogDir;
1818
use tokio::{
@@ -114,7 +114,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
114114
clog: Arc::downgrade(&clog),
115115
period: opts.sync_interval,
116116
offset: durable_tx,
117-
abort: persister_task.abort_handle(),
117+
persister_task: persister_task.abort_handle(),
118118
}
119119
.run(),
120120
);
@@ -254,7 +254,7 @@ struct FlushAndSyncTask<T> {
254254
period: Duration,
255255
offset: watch::Sender<Option<TxOffset>>,
256256
/// Handle to abort the [`PersisterTask`] whenever this task exits (including if fsync panics).
257-
abort: AbortHandle,
257+
persister_task: AbortHandle,
258258
}
259259

260260
impl<T: Send + Sync + 'static> FlushAndSyncTask<T> {
@@ -265,7 +265,7 @@ impl<T: Send + Sync + 'static> FlushAndSyncTask<T> {
265265
let mut interval = interval(self.period);
266266
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
267267

268-
defer_on_unwind!(self.abort.abort());
268+
defer!(self.persister_task.abort());
269269

270270
loop {
271271
interval.tick().await;
@@ -290,6 +290,7 @@ impl<T: Send + Sync + 'static> FlushAndSyncTask<T> {
290290
}
291291
Ok(Err(e)) => {
292292
flush_error("flush-and-sync", e);
293+
break;
293294
}
294295
Ok(Ok(Some(new_offset))) => {
295296
trace!("synced to offset {new_offset}");

0 commit comments

Comments
 (0)