-
Notifications
You must be signed in to change notification settings - Fork 1k
Expand file tree
/
Copy pathlocal.rs
More file actions
418 lines (364 loc) · 13.5 KB
/
local.rs
File metadata and controls
418 lines (364 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
use std::{
io,
num::NonZeroUsize,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc,
},
};
use futures::{FutureExt as _, TryFutureExt as _};
use itertools::Itertools as _;
use log::{info, trace, warn};
use scopeguard::ScopeGuard;
use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile};
use spacetimedb_paths::server::ReplicaDir;
use thiserror::Error;
use tokio::{
sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify},
task::{spawn_blocking, AbortHandle},
};
use tracing::{instrument, Span};
use crate::{Close, Durability, DurableOffset, History, TxOffset};
pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk};
/// [`Local`] configuration.
#[derive(Clone, Copy, Debug)]
pub struct Options {
    /// Initial capacity of the buffer the durability actor uses to batch
    /// transactions.
    ///
    /// The actor receives all transactions currently in the queue at once, so
    /// the buffer may grow beyond this capacity during a burst. It is shrunk
    /// back to `batch_capacity` once a subsequently received batch is smaller
    /// than `batch_capacity`.
    ///
    /// The internal queue of [`Local`] is bounded to `4 * batch_capacity`.
    ///
    /// Default: [`Options::DEFAULT_BATCH_CAPACITY`] (4096)
    pub batch_capacity: NonZeroUsize,
    /// [`Commitlog`] configuration.
    pub commitlog: spacetimedb_commitlog::Options,
}
impl Options {
    /// Default value of [`Options::batch_capacity`]: 4096 transactions.
    pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = match NonZeroUsize::new(4096) {
        Some(capacity) => capacity,
        None => unreachable!(),
    };
}
impl Default for Options {
    /// Uses [`Options::DEFAULT_BATCH_CAPACITY`] and the commitlog's own
    /// default options.
    fn default() -> Self {
        let batch_capacity = Self::DEFAULT_BATCH_CAPACITY;
        let commitlog = Default::default();
        Self {
            batch_capacity,
            commitlog,
        }
    }
}
#[derive(Debug, Error)]
pub enum OpenError {
#[error("commitlog directory is locked")]
Lock(#[from] LockError),
#[error("failed to open commitlog")]
Commitlog(#[from] io::Error),
}
/// Reply channel handed to the durability actor when requesting shutdown.
///
/// The actor answers with an [`OwnedNotified`] future that completes once the
/// actor's file lock has been dropped.
type ShutdownReply = oneshot::Sender<OwnedNotified>;
/// [`Durability`] implementation backed by a [`Commitlog`] on local storage.
///
/// The commitlog is constrained to store the canonical [`Txdata`] payload,
/// where the generic parameter `T` is the type of the row data stored in
/// the mutations section.
///
/// `T` is left generic in order to allow bypassing the `ProductValue`
/// intermediate representation in the future.
///
/// Note, however, that instantiating `T` to a different type may require to
/// change the log format version!
pub struct Local<T> {
    /// The [`Commitlog`] this [`Durability`] and [`History`] impl wraps.
    clog: Arc<Commitlog<Txdata<T>>>,
    /// The durable transaction offset, as published by the background
    /// durability actor after each successful flush-and-sync.
    durable_offset: watch::Receiver<Option<TxOffset>>,
    /// Backlog of transactions to be written to disk by the background
    /// durability actor.
    ///
    /// The queue is bounded to `4 * Options::batch_capacity`.
    queue: mpsc::Sender<Transaction<Txdata<T>>>,
    /// How many transactions are sitting in the `queue`.
    ///
    /// This is mainly for observability purposes, and can thus be updated with
    /// relaxed memory ordering.
    queue_depth: Arc<AtomicU64>,
    /// Channel to request the actor to exit.
    shutdown: mpsc::Sender<ShutdownReply>,
    /// [`AbortHandle`] to force cancellation of the background actor task.
    abort: AbortHandle,
}
impl<T: Encode + Send + Sync + 'static> Local<T> {
    /// Open the local durability layer rooted at `replica_dir`.
    ///
    /// `replica_dir` must already exist.
    ///
    /// An advisory lock is acquired on `<replica_dir>/db.lock` before the
    /// commitlog is opened. The background durability actor is then spawned
    /// onto `rt`, and it holds that lock for as long as it runs.
    ///
    /// `on_new_segment` is passed through to [`Commitlog::open`]. It is a
    /// callback for whenever a new commitlog segment is started, used to
    /// capture a snapshot for each new segment.
    ///
    /// # Errors
    ///
    /// - [`OpenError::Lock`] if the lock file cannot be acquired.
    /// - [`OpenError::Commitlog`] if the commitlog cannot be opened.
    pub fn open(
        replica_dir: ReplicaDir,
        rt: tokio::runtime::Handle,
        opts: Options,
        on_new_segment: Option<Arc<OnNewSegmentFn>>,
    ) -> Result<Self, OpenError> {
        info!("open local durability");

        // Locking the commitlog directory itself would suffice, but the
        // `db.lock` file is kept for backwards-compatibility.
        let lock_path = replica_dir.0.join("db.lock");
        let lock = Lock::create(lock_path)?;

        let commitlog = Commitlog::open(replica_dir.commit_log(), opts.commitlog, on_new_segment)?;
        let clog = Arc::new(commitlog);

        let queue_capacity = 4 * opts.batch_capacity.get();
        let (queue, txdata_rx) = mpsc::channel(queue_capacity);
        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
        let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
        let queue_depth = Arc::new(AtomicU64::new(0));

        let actor = Actor {
            clog: Arc::clone(&clog),
            durable_offset: durable_tx,
            queue_depth: Arc::clone(&queue_depth),
            batch_capacity: opts.batch_capacity,
            lock,
        };
        let task = rt.spawn(actor.run(txdata_rx, shutdown_rx));
        let abort = task.abort_handle();

        Ok(Self {
            clog,
            durable_offset: durable_rx,
            queue,
            queue_depth,
            shutdown: shutdown_tx,
            abort,
        })
    }

    /// Obtain a read-only handle to the durable state that implements [`History`].
    pub fn as_history(&self) -> impl History<TxData = Txdata<T>> + use<T> {
        Arc::clone(&self.clog)
    }
}
impl<T: Send + Sync + 'static> Local<T> {
/// Inspect how many transactions added via [`Self::append_tx`] are pending
/// to be applied to the underlying [`Commitlog`].
pub fn queue_depth(&self) -> u64 {
self.queue_depth.load(Relaxed)
}
/// Obtain an iterator over the [`Commit`]s in the underlying log.
pub fn commits_from(&self, offset: TxOffset) -> impl Iterator<Item = Result<Commit, error::Traversal>> + use<T> {
self.clog.commits_from(offset).map_ok(Commit::from)
}
/// Get a list of segment offsets, sorted in ascending order.
pub fn existing_segment_offsets(&self) -> io::Result<Vec<TxOffset>> {
self.clog.existing_segment_offsets()
}
/// Compress the segments at the offsets provided, marking them as immutable.
pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> {
self.clog.compress_segments(offsets)
}
/// Get the size on disk of the underlying [`Commitlog`].
pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> {
self.clog.size_on_disk()
}
}
/// State owned by the background task that writes, flushes and syncs the
/// transactions queued by [`Local`].
struct Actor<T> {
    /// The commitlog, shared with [`Local`].
    clog: Arc<Commitlog<Txdata<T>>>,
    /// Publishes the offset up to which the commitlog has been synced.
    durable_offset: watch::Sender<Option<TxOffset>>,
    /// Shared with [`Local`]; decremented as transactions are received.
    queue_depth: Arc<AtomicU64>,
    /// See [`Options::batch_capacity`].
    batch_capacity: NonZeroUsize,
    /// Lock on `db.lock`, held for as long as the actor is alive.
    ///
    /// Dropping it also completes the futures handed out on shutdown
    /// (see `Lock::notified`), which is how `close` learns the actor exited.
    lock: Lock,
}
impl<T: Encode + Send + Sync + 'static> Actor<T> {
    /// Main loop of the background durability actor.
    ///
    /// Repeatedly receives all transactions currently queued on
    /// `transactions_rx` and commits them to the [`Commitlog`] on a blocking
    /// thread. It then flushes and syncs the log, which publishes the new
    /// durable offset via `self.durable_offset`.
    ///
    /// A message on `shutdown_rx` closes `transactions_rx`, so no new
    /// transactions are accepted. It is answered with a future that completes
    /// when `self.lock` is dropped, i.e. after this method returns and the
    /// actor is dropped. Transactions already in the queue are still drained
    /// before the loop ends.
    ///
    /// # Panics
    ///
    /// If committing to the commitlog fails, or if the blocking commit task
    /// panics.
    #[instrument(name = "durability::local::actor", skip_all)]
    async fn run(
        self,
        mut transactions_rx: mpsc::Receiver<Transaction<Txdata<T>>>,
        mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
    ) {
        info!("starting durability actor");
        // Batch buffer, reused across iterations. It may grow during bursts;
        // see the capacity reclamation below.
        let mut tx_buf = Vec::with_capacity(self.batch_capacity.get());
        // Whether to run a final `flush_and_sync` after the loop exits.
        // Cleared when a `flush_and_sync` inside the loop fails, in which
        // case the actor exits without trying again.
        let mut sync_on_exit = true;
        loop {
            tokio::select! {
                // Biased towards the shutdown channel,
                // so that we stop accepting new data promptly after
                // `Durability::close` was called.
                biased;
                // Closing the receiver makes further sends fail. Already
                // queued transactions are still drained by the branch below,
                // which breaks out of the loop once the queue is empty.
                Some(reply) = shutdown_rx.recv() => {
                    transactions_rx.close();
                    let _ = reply.send(self.lock.notified());
                },
                // Pop as many elements from the channel as possible,
                // potentially requiring the `tx_buf` to allocate additional
                // capacity.
                // We'll reclaim capacity in excess of `self.batch_capacity` below.
                n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => {
                    // With a non-zero limit, 0 means: closed and empty.
                    if n == 0 {
                        break;
                    }
                    self.queue_depth.fetch_sub(n as u64, Relaxed);
                    let clog = self.clog.clone();
                    // Commit each transaction on a blocking thread and hand
                    // the (now drained) buffer back for reuse.
                    tx_buf = spawn_blocking(move || -> io::Result<Vec<Transaction<Txdata<T>>>> {
                        for tx in tx_buf.drain(..) {
                            clog.commit([tx])?;
                        }
                        Ok(tx_buf)
                    })
                    .await
                    .expect("commitlog write panicked")
                    .expect("commitlog write failed");
                    if self.flush_and_sync().await.is_err() {
                        sync_on_exit = false;
                        break;
                    }
                    // Reclaim burst capacity. This only happens once a batch
                    // smaller than `batch_capacity` was received; while
                    // batches stay large, the grown buffer is kept.
                    if n < self.batch_capacity.get() {
                        tx_buf.shrink_to(self.batch_capacity.get());
                    }
                },
            }
        }
        if sync_on_exit {
            let _ = self.flush_and_sync().await;
        }
        info!("exiting durability actor");
    }

    /// Flush the commitlog and sync it to disk on a blocking thread.
    ///
    /// The call to [`Commitlog::flush_and_sync`] is skipped, returning
    /// `Ok(None)`, if the log's max committed offset equals the durable offset
    /// published last.
    ///
    /// Otherwise the result of [`Commitlog::flush_and_sync`] is returned.
    /// Errors are logged with `warn!`. A `Some(offset)` result is published
    /// to `self.durable_offset`.
    ///
    /// # Panics
    ///
    /// If the blocking flush-and-sync task panics.
    #[instrument(skip_all)]
    async fn flush_and_sync(&self) -> io::Result<Option<TxOffset>> {
        // Skip if nothing changed. If either offset is `None` (e.g. nothing
        // published yet), we do not skip.
        if let Some((committed, durable)) = self.clog.max_committed_offset().zip(*self.durable_offset.borrow())
            && committed == durable
        {
            return Ok(None);
        }
        let clog = self.clog.clone();
        // Carry the current tracing span into the blocking task.
        let span = Span::current();
        spawn_blocking(move || {
            let _span = span.enter();
            clog.flush_and_sync()
        })
        .await
        .expect("commitlog flush-and-sync blocking task panicked")
        .inspect_err(|e| warn!("error flushing commitlog: {e:#}"))
        .inspect(|maybe_offset| {
            if let Some(new_offset) = maybe_offset {
                trace!("synced to offset {new_offset}");
                self.durable_offset.send_modify(|val| {
                    val.replace(*new_offset);
                });
            }
        })
    }
}
/// Advisory lock on a file, held by the durability actor.
///
/// Dropping it releases the file lock and then wakes every future obtained
/// from `Lock::notified`.
struct Lock {
    /// The locked file. It is `Some` after construction and is taken out in
    /// `Drop`.
    file: Option<LockedFile>,
    /// Woken via `notify_waiters` once `file` has been released.
    notify_on_drop: Arc<Notify>,
}
impl Lock {
pub fn create(path: PathBuf) -> Result<Self, LockError> {
let file = LockedFile::lock(path).map(Some)?;
let notify_on_drop = Arc::new(Notify::new());
Ok(Self { file, notify_on_drop })
}
pub fn notified(&self) -> OwnedNotified {
self.notify_on_drop.clone().notified_owned()
}
}
impl Drop for Lock {
    fn drop(&mut self) {
        // Release the file lock *before* waking waiters, so that the lock is
        // no longer held by the time they observe the notification.
        drop(self.file.take());
        self.notify_on_drop.notify_waiters();
    }
}
impl<T: Send + Sync + 'static> Durability for Local<T> {
    type TxData = Txdata<T>;

    /// Hand `tx` to the background actor, which commits it to the commitlog
    /// and then flushes and syncs.
    ///
    /// If the queue has a free slot, this does not block. Otherwise the
    /// calling thread blocks until a slot becomes available. Inside a tokio
    /// runtime this goes through [`tokio::task::block_in_place`]; outside of
    /// one, it is a plain blocking send.
    ///
    /// # Panics
    ///
    /// - If the actor's queue is closed, either because the actor exited or
    ///   because [`Durability::close`] was called.
    /// - NOTE(review): `block_in_place` panics on a current-thread runtime, so
    ///   hitting a full queue from such a runtime panics as well.
    fn append_tx(&self, tx: Transaction<Self::TxData>) {
        // Count the transaction *before* it becomes visible to the actor.
        // The actor subtracts from `queue_depth` right after receiving.
        // Incrementing only after the send would let the actor's `fetch_sub`
        // be ordered first and wrap the counter to a value near `u64::MAX`.
        // The channel send/receive orders this `fetch_add` before the actor's
        // `fetch_sub`, even with `Relaxed`.
        self.queue_depth.fetch_add(1, Relaxed);
        match self.queue.try_reserve() {
            Ok(permit) => permit.send(tx),
            Err(mpsc::error::TrySendError::Closed(_)) => {
                panic!("durability actor crashed");
            }
            Err(mpsc::error::TrySendError::Full(_)) => {
                let send = || self.queue.blocking_send(tx);
                let sent = if tokio::runtime::Handle::try_current().is_ok() {
                    tokio::task::block_in_place(send)
                } else {
                    send()
                };
                sent.expect("durability actor crashed");
            }
        }
    }

    /// A handle observing the durable offset published by the background actor.
    fn durable_tx_offset(&self) -> DurableOffset {
        self.durable_offset.clone().into()
    }

    /// Shut down the background actor and wait for it to exit.
    ///
    /// The returned future first asks the actor to stop accepting new
    /// transactions. It then waits until the actor has exited, which is
    /// observed through the actor's file lock being dropped. Before exiting,
    /// the actor drains its queue and performs a final flush-and-sync
    /// (unless an earlier one failed). The future resolves to the last
    /// durable offset seen.
    ///
    /// If the returned future is dropped before it completes, the actor task
    /// is aborted.
    fn close(&self) -> Close {
        info!("close local durability");
        let durable_offset = self.durable_tx_offset();
        let shutdown = self.shutdown.clone();
        // Abort actor if shutdown future is dropped.
        let abort = scopeguard::guard(self.abort.clone(), |actor| {
            warn!("close future dropped, aborting durability actor");
            actor.abort();
        });
        async move {
            let (done_tx, done_rx) = oneshot::channel();
            // Ignore channel errors - those just mean the actor is already gone.
            let _ = shutdown
                .send(done_tx)
                .map_err(drop)
                .and_then(|()| done_rx.map_err(drop))
                .and_then(|done| async move {
                    done.await;
                    Ok(())
                })
                .await;
            // Don't abort if we completed normally.
            let _ = ScopeGuard::into_inner(abort);
            durable_offset.last_seen()
        }
        .boxed()
    }
}
impl<T: Encode + 'static> History for Commitlog<Txdata<T>> {
    type TxData = Txdata<T>;

    /// Delegates to the inherent [`Commitlog::fold_transactions_from`].
    fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        // Method-call syntax resolves to the inherent method, not this trait method.
        self.fold_transactions_from(offset, decoder)
    }

    /// Delegates to the inherent [`Commitlog::transactions_from`].
    fn transactions_from<'a, D>(
        &self,
        offset: TxOffset,
        decoder: &'a D,
    ) -> impl Iterator<Item = Result<Transaction<Self::TxData>, D::Error>>
    where
        D: Decoder<Record = Self::TxData>,
        D::Error: From<error::Traversal>,
        Self::TxData: 'a,
    {
        self.transactions_from(offset, decoder)
    }

    /// Range of transaction offsets present in the log.
    ///
    /// The lower bound falls back to `TxOffset::default()` when the log
    /// reports no minimum committed offset.
    fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
        (
            self.min_committed_offset().unwrap_or_default(),
            self.max_committed_offset(),
        )
    }
}