Skip to content

Commit 273b831

Browse files
kimCentril
authored andcommitted
Append commit instead of individual transactions to commitlog (#4404)
Re-open #4140 (reverted in #4292). The original patch was merged a bit too eagerly. It should go in _after_ 2.0 is released with some confidence.
1 parent 68d9de2 commit 273b831

16 files changed

Lines changed: 895 additions & 623 deletions

File tree

crates/commitlog/src/commitlog.rs

Lines changed: 154 additions & 118 deletions
Large diffs are not rendered by default.

crates/commitlog/src/lib.rs

Lines changed: 47 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
io,
3-
num::{NonZeroU16, NonZeroU64},
3+
num::NonZeroU64,
44
ops::RangeBounds,
55
sync::{Arc, RwLock},
66
};
@@ -17,11 +17,12 @@ pub mod segment;
1717
mod varchar;
1818
mod varint;
1919

20+
use crate::segment::Committed;
2021
pub use crate::{
2122
commit::{Commit, StoredCommit},
2223
commitlog::CommittedMeta,
2324
payload::{Decoder, Encode},
24-
repo::fs::SizeOnDisk,
25+
repo::{fs::SizeOnDisk, TxOffset},
2526
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
2627
varchar::Varchar,
2728
};
@@ -58,14 +59,6 @@ pub struct Options {
5859
/// Default: 1GiB
5960
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))]
6061
pub max_segment_size: u64,
61-
/// The maximum number of records in a commit.
62-
///
63-
/// If this number is exceeded, the commit is flushed to disk even without
64-
/// explicitly calling [`Commitlog::flush`].
65-
///
66-
/// Default: 1
67-
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))]
68-
pub max_records_in_commit: NonZeroU16,
6962
/// Whenever at least this many bytes have been written to the currently
7063
/// active segment, an entry is added to its offset index.
7164
///
@@ -97,6 +90,12 @@ pub struct Options {
9790
/// Has no effect if the `fallocate` feature is not enabled.
9891
#[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))]
9992
pub preallocate_segments: bool,
93+
/// Size in bytes of the memory buffer holding commit data before flushing
94+
/// to storage.
95+
///
96+
/// Default: 8KiB
97+
#[cfg_attr(feature = "serde", serde(default = "Options::default_write_buffer_size"))]
98+
pub write_buffer_size: usize,
10099
}
101100

102101
impl Default for Options {
@@ -107,18 +106,18 @@ impl Default for Options {
107106

108107
impl Options {
109108
pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
110-
pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed");
111109
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
112110
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
113111
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;
112+
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 8 * 1024;
114113

115114
pub const DEFAULT: Self = Self {
116115
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
117116
max_segment_size: Self::default_max_segment_size(),
118-
max_records_in_commit: Self::default_max_records_in_commit(),
119117
offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
120118
offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
121119
preallocate_segments: Self::default_preallocate_segments(),
120+
write_buffer_size: Self::default_write_buffer_size(),
122121
};
123122

124123
pub const fn default_log_format_version() -> u8 {
@@ -129,10 +128,6 @@ impl Options {
129128
Self::DEFAULT_MAX_SEGMENT_SIZE
130129
}
131130

132-
pub const fn default_max_records_in_commit() -> NonZeroU16 {
133-
Self::DEFAULT_MAX_RECORDS_IN_COMMIT
134-
}
135-
136131
pub const fn default_offset_index_interval_bytes() -> NonZeroU64 {
137132
Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES
138133
}
@@ -145,6 +140,10 @@ impl Options {
145140
Self::DEFAULT_PREALLOCATE_SEGMENTS
146141
}
147142

143+
pub const fn default_write_buffer_size() -> usize {
144+
Self::DEFAULT_WRITE_BUFFER_SIZE
145+
}
146+
148147
/// Compute the length in bytes of an offset index based on the settings in
149148
/// `self`.
150149
pub fn offset_index_len(&self) -> u64 {
@@ -263,7 +262,7 @@ impl<T> Commitlog<T> {
263262
pub fn flush(&self) -> io::Result<Option<u64>> {
264263
let mut inner = self.inner.write().unwrap();
265264
trace!("flush commitlog");
266-
inner.commit()?;
265+
inner.flush()?;
267266

268267
Ok(inner.max_committed_offset())
269268
}
@@ -283,7 +282,7 @@ impl<T> Commitlog<T> {
283282
pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
284283
let mut inner = self.inner.write().unwrap();
285284
trace!("flush and sync commitlog");
286-
inner.commit()?;
285+
inner.flush()?;
287286
inner.sync();
288287

289288
Ok(inner.max_committed_offset())
@@ -384,57 +383,47 @@ impl<T> Commitlog<T> {
384383
}
385384

386385
impl<T: Encode> Commitlog<T> {
387-
/// Append the record `txdata` to the log.
386+
/// Write `transactions` to the log.
388387
///
389-
/// If the internal buffer exceeds [`Options::max_records_in_commit`], the
390-
/// argument is returned in an `Err`. The caller should [`Self::flush`] the
391-
/// log and try again.
388+
/// This will store all `transactions` as a single [Commit]
389+
/// (note that `transactions` must not yield more than [u16::MAX] elements).
392390
///
393-
/// In case the log is appended to from multiple threads, this may result in
394-
/// a busy loop trying to acquire a slot in the buffer. In such scenarios,
395-
/// [`Self::append_maybe_flush`] is preferable.
396-
pub fn append(&self, txdata: T) -> Result<(), T> {
397-
let mut inner = self.inner.write().unwrap();
398-
inner.append(txdata)
399-
}
400-
401-
/// Append the record `txdata` to the log.
391+
/// Data is buffered internally, call [Self::flush] to force flushing to
392+
/// the underlying storage.
393+
///
394+
/// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed],
395+
/// which contains the offset range and checksum of the commit.
402396
///
403-
/// The `txdata` payload is buffered in memory until either:
397+
/// # Errors
404398
///
405-
/// - [`Self::flush`] is called explicitly, or
406-
/// - [`Options::max_records_in_commit`] is exceeded
399+
/// An `Err` value is returned in the following cases:
407400
///
408-
/// In the latter case, [`Self::append`] flushes implicitly, _before_
409-
/// appending the `txdata` argument.
401+
/// - if the transaction sequence is invalid, e.g. because the transaction
402+
/// offsets are not contiguous.
410403
///
411-
/// I.e. the argument is not guaranteed to be flushed after the method
412-
/// returns. If that is desired, [`Self::flush`] must be called explicitly.
404+
/// In this case, **none** of the `transactions` will be written.
413405
///
414-
/// If writing `txdata` to the commitlog results in a new segment file being opened,
415-
/// we will send a message down `on_new_segment`.
416-
/// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`.
406+
/// - if creating the new segment fails due to an I/O error.
417407
///
418-
/// # Errors
408+
/// # Panics
409+
///
410+
/// The method panics if:
411+
///
412+
/// - `transactions` exceeds [u16::MAX] elements
413+
///
414+
/// - [Self::flush] or writing to the underlying buffered writer fails
419415
///
420-
/// If the log needs to be flushed, but an I/O error occurs, ownership of
421-
/// `txdata` is returned back to the caller alongside the [`io::Error`].
416+
/// This is likely caused by some storage issue. As we cannot tell with
417+
/// certainty how much data (if any) has been written, the internal state
418+
/// becomes invalid and thus a panic is raised.
422419
///
423-
/// The value can then be used to retry appending.
424-
pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
420+
/// - [Self::sync] panics (called when rotating segments)
421+
pub fn commit<U: Into<Transaction<T>>>(
422+
&self,
423+
transactions: impl IntoIterator<Item = U>,
424+
) -> io::Result<Option<Committed>> {
425425
let mut inner = self.inner.write().unwrap();
426-
427-
if let Err(txdata) = inner.append(txdata) {
428-
if let Err(source) = inner.commit() {
429-
return Err(error::Append { txdata, source });
430-
}
431-
432-
// `inner.commit.n` must be zero at this point
433-
let res = inner.append(txdata);
434-
debug_assert!(res.is_ok(), "failed to append while holding write lock");
435-
}
436-
437-
Ok(())
426+
inner.commit(transactions)
438427
}
439428

440429
/// Obtain an iterator which traverses the log from the start, yielding

crates/commitlog/src/repo/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,11 @@ pub fn create_segment_writer<R: Repo>(
214214
records: Vec::new(),
215215
epoch,
216216
},
217-
inner: io::BufWriter::new(storage),
217+
inner: io::BufWriter::with_capacity(opts.write_buffer_size, storage),
218218

219219
min_tx_offset: offset,
220220
bytes_written: Header::LEN as u64,
221221

222-
max_records_in_commit: opts.max_records_in_commit,
223-
224222
offset_index_head: create_offset_index_writer(repo, offset, opts),
225223
})
226224
}
@@ -294,8 +292,6 @@ pub fn resume_segment_writer<R: Repo>(
294292
min_tx_offset: tx_range.start,
295293
bytes_written: size_in_bytes,
296294

297-
max_records_in_commit: opts.max_records_in_commit,
298-
299295
offset_index_head: create_offset_index_writer(repo, offset, opts),
300296
}))
301297
}

0 commit comments

Comments
 (0)