Skip to content

Commit 9f401b5

Browse files
committed
commitlog: Don't lock while compressing
Commitlog compression is coordinated externally, and it is safe to compress the same segment concurrently. Therefore, we can release the lock before starting the actual compression work, thereby avoiding to interfere with writes. Also, just ignore the currently active segment instead of panicking.
1 parent 698bdcb commit 9f401b5

1 file changed

Lines changed: 30 additions & 9 deletions

File tree

crates/commitlog/src/lib.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -330,16 +330,37 @@ impl<T> Commitlog<T> {
330330
}
331331

332332
/// Compress the segments at the offsets provided, marking them as immutable.
333+
///
334+
/// `offsets` must contain the exact segment offsets, no rounding to the
335+
/// nearest offset is performed. If a segment is not found on disk, an error
336+
/// is returned and no further segments from the list are processed.
337+
///
338+
/// The latest, writable segment will not be compressed. If `offsets`
339+
/// contains its offset, an error is returned.
340+
///
341+
/// This method acquires a read lock on this `Commitlog` instance, but
342+
/// releases it once the compression work starts. Concurrent compression
343+
/// tasks on the same segment are safe, but external coordination is
344+
/// required to avoid duplicate work.
345+
///
346+
/// Attempting to compress a segment that is already compressed incurs a
347+
/// small overhead to open the file and determining its format, but
348+
/// otherwise does nothing.
333349
pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
334-
// even though `compress_segment` takes &self, we take an
335-
// exclusive lock to avoid any weirdness happening.
336-
#[allow(clippy::readonly_write_lock)]
337-
let inner = self.inner.write().unwrap();
338-
assert!(!offsets.contains(&inner.head.min_tx_offset()));
339-
// TODO: parallelize, maybe
340-
offsets
341-
.iter()
342-
.try_for_each(|&offset| inner.repo.compress_segment(offset))
350+
let (repo, head_offset) = {
351+
let inner = self.inner.read().unwrap();
352+
let repo = inner.repo.clone();
353+
let head_offset = inner.head.min_tx_offset();
354+
355+
(repo, head_offset)
356+
};
357+
if offsets.contains(&head_offset) {
358+
return Err(io::Error::new(
359+
io::ErrorKind::InvalidInput,
360+
format!("refusing to compress mutable segment {head_offset}"),
361+
));
362+
}
363+
offsets.iter().try_for_each(|&offset| repo.compress_segment(offset))
343364
}
344365

345366
/// Remove all data from the log and reopen it.

0 commit comments

Comments
 (0)