Commit ae523db

commitlog: Don't lock while compressing (#4981)
Commitlog compression is coordinated externally, and it is safe to compress the same segment concurrently. Therefore, we can release the lock before starting the actual compression work, so that compression does not interfere with writes.

Also, return an error if the mutable segment is requested to be compressed, instead of panicking.

Co-authored-by: joshua-spacetime <josh@clockworklabs.io>
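The locking change described in the commit message can be illustrated with a minimal, self-contained sketch. It is not the crate's code: `Repo`, `Inner`, `Log`, and the `probe` hook are hypothetical stand-ins, and the "compression" is a placeholder computation. The point is only that the read guard is dropped before the per-segment loop starts, so a writer could take the lock while compression runs.

```rust
use std::io;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the segment repository; cheap to clone.
#[derive(Clone)]
struct Repo {
    root: Arc<str>,
}

impl Repo {
    /// Pretends to compress one segment and reports the bytes saved.
    fn compress_segment(&self, offset: u64) -> io::Result<u64> {
        let _ = &self.root; // a real repo would open a file under `root`
        Ok(offset * 10)
    }
}

struct Inner {
    repo: Repo,
}

struct Log {
    inner: RwLock<Inner>,
}

impl Log {
    fn new(root: &str) -> Self {
        let repo = Repo { root: Arc::from(root) };
        Log { inner: RwLock::new(Inner { repo }) }
    }

    /// `probe` is a demonstration-only hook that runs before each segment.
    fn compress(&self, offsets: &[u64], mut probe: impl FnMut()) -> io::Result<u64> {
        // Hold the read lock just long enough to clone the repo handle.
        let repo = {
            let inner = self.inner.read().unwrap();
            inner.repo.clone()
        }; // read guard dropped here

        let mut saved = 0;
        for &offset in offsets {
            probe(); // the lock is not held at this point
            saved += repo.compress_segment(offset)?;
        }
        Ok(saved)
    }
}
```

Passing a `probe` closure that calls `log.inner.try_write()` shows the effect: in this sketch the call succeeds on every iteration, whereas it would fail if the guard were kept alive across the loop.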
1 parent 9abc04b commit ae523db

1 file changed

Lines changed: 30 additions & 7 deletions

crates/commitlog/src/lib.rs

@@ -332,16 +332,39 @@ impl<T> Commitlog<T> {
     }

     /// Compress the segments at the offsets provided, marking them as immutable.
+    ///
+    /// `offsets` must contain the exact segment offsets, no rounding to the
+    /// nearest offset is performed. If a segment is not found on disk, an error
+    /// is returned and no further segments from the list are processed.
+    ///
+    /// The latest, writable segment will not be compressed. If `offsets`
+    /// contains its offset, an error is returned.
+    ///
+    /// This method acquires a read lock on this `Commitlog` instance, but
+    /// releases it once the compression work starts. Concurrent compression
+    /// tasks on the same segment are safe, but external coordination is
+    /// required to avoid duplicate work.
+    ///
+    /// Attempting to compress a segment that is already compressed incurs a
+    /// small overhead to open the file and determining its format, but
+    /// otherwise does nothing.
     pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<CompressionStats> {
-        // even though `compress_segment` takes &self, we take an
-        // exclusive lock to avoid any weirdness happening.
-        #[allow(clippy::readonly_write_lock)]
-        let inner = self.inner.write().unwrap();
-        assert!(!offsets.contains(&inner.head.min_tx_offset()));
-        // TODO: parallelize, maybe
+        let (repo, head_offset) = {
+            let inner = self.inner.read().unwrap();
+            let repo = inner.repo.clone();
+            let head_offset = inner.head.min_tx_offset();
+
+            (repo, head_offset)
+        };
+        if offsets.contains(&head_offset) {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("refusing to compress mutable segment {head_offset}"),
+            ));
+        }
         let mut stats = <_>::default();
         for offset in offsets {
-            stats += inner.repo.compress_segment(*offset)?;
+            stats += repo.compress_segment(*offset)?;
         }
         Ok(stats)
     }
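Because a request for the head segment now returns an error instead of panicking, a caller can recover from it. The following is a rough sketch of one way to do that, under stated assumptions: `compress_segments` here is a hypothetical free function that only mirrors the head check from the diff and returns a count in place of `CompressionStats`, and `try_compress` is an illustrative wrapper, not part of the crate.

```rust
use std::io;

/// Hypothetical free function mirroring the head check from the diff.
/// Returns the number of segments "compressed" as a stand-in for real stats.
fn compress_segments(head_offset: u64, offsets: &[u64]) -> io::Result<usize> {
    if offsets.contains(&head_offset) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("refusing to compress mutable segment {head_offset}"),
        ));
    }
    Ok(offsets.len())
}

/// Illustrative wrapper: treats a rejected request as "nothing done this
/// round" and propagates every other I/O error unchanged.
fn try_compress(head_offset: u64, offsets: &[u64]) -> io::Result<Option<usize>> {
    match compress_segments(head_offset, offsets) {
        Ok(n) => Ok(Some(n)),
        Err(e) if e.kind() == io::ErrorKind::InvalidInput => Ok(None),
        Err(e) => Err(e),
    }
}
```

Whether skipping is the right reaction depends on the external coordinator. A caller that computes `offsets` itself may prefer to treat `InvalidInput` as a bug in its own bookkeeping and surface it.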
