Skip to content

Commit af50bf5

Browse files
authored
Compression stats and commitlog compression function (clockworklabs#4708)
Adds a way to pass a "closure" when compressing a commitlog segment, such that the source and destination are polymorphic. This allows them to be wrapped externally, which is used to apply bandwidth limiting where necessary. Also returns bytes-in and bytes-out stats from the compressor. # Expected complexity level and risk 1 # Testing Elsewhere.
1 parent 312dfaa commit af50bf5

8 files changed

Lines changed: 133 additions & 32 deletions

File tree

crates/commitlog/src/lib.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use log::trace;
99
use repo::{fs::OnNewSegmentFn, Repo};
1010
use spacetimedb_paths::server::CommitLogDir;
1111

12+
pub use spacetimedb_fs_utils::compression::CompressionStats;
13+
1214
pub mod commit;
1315
pub mod commitlog;
1416
mod index;
@@ -330,16 +332,18 @@ impl<T> Commitlog<T> {
330332
}
331333

332334
/// Compress the segments at the offsets provided, marking them as immutable.
333-
pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
335+
pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<CompressionStats> {
334336
// even though `compress_segment` takes &self, we take an
335337
// exclusive lock to avoid any weirdness happening.
336338
#[allow(clippy::readonly_write_lock)]
337339
let inner = self.inner.write().unwrap();
338340
assert!(!offsets.contains(&inner.head.min_tx_offset()));
339341
// TODO: parallelize, maybe
340-
offsets
341-
.iter()
342-
.try_for_each(|&offset| inner.repo.compress_segment(offset))
342+
let mut stats = <_>::default();
343+
for offset in offsets {
344+
stats += inner.repo.compress_segment(*offset)?;
345+
}
346+
Ok(stats)
343347
}
344348

345349
/// Remove all data from the log and reopen it.

crates/commitlog/src/repo/fs.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use std::io;
44
use std::sync::Arc;
55

66
use log::{debug, warn};
7-
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
7+
use spacetimedb_fs_utils::compression::{CompressReader, CompressionStats};
88
use spacetimedb_fs_utils::lockfile;
99
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
1010
use tempfile::NamedTempFile;
1111

12-
use crate::segment::{self, FileLike};
13-
1412
use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
13+
use crate::repo::CompressOnce;
14+
use crate::segment::{self, FileLike};
1515

1616
const SEGMENT_FILE_EXT: &str = ".stdb.log";
1717

@@ -317,21 +317,18 @@ impl Repo for Fs {
317317
fs::remove_file(self.segment_path(offset))
318318
}
319319

320-
fn compress_segment(&self, offset: u64) -> io::Result<()> {
320+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
321321
let src = self.open_segment_reader(offset)?;
322322
// if it's already compressed, leave it be
323323
let CompressReader::None(mut src) = src.inner else {
324-
return Ok(());
324+
return Ok(<_>::default());
325325
};
326326

327327
let mut dst = NamedTempFile::new_in(&self.root)?;
328-
// bytes per frame. in the future, it might be worth looking into putting
329-
// every commit into its own frame, to make seeking more efficient.
330-
let max_frame_size = 0x1000;
331-
compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
328+
let stats = f.compress(&mut src, &mut dst)?;
332329
dst.persist(self.segment_path(offset))?;
333330

334-
Ok(())
331+
Ok(stats)
335332
}
336333

337334
fn existing_offsets(&self) -> io::Result<Vec<u64>> {

crates/commitlog/src/repo/mem.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::repo::{
1111

1212
mod segment;
1313
pub use segment::{ReadOnlySegment, Segment};
14+
use spacetimedb_fs_utils::compression::CompressionStats;
1415

1516
pub const PAGE_SIZE: usize = 4096;
1617

@@ -105,8 +106,8 @@ impl Repo for Memory {
105106
Ok(())
106107
}
107108

108-
fn compress_segment(&self, _offset: u64) -> io::Result<()> {
109-
Ok(())
109+
fn compress_segment_with(&self, _: u64, _: impl super::CompressOnce) -> io::Result<CompressionStats> {
110+
Ok(<_>::default())
110111
}
111112

112113
fn existing_offsets(&self) -> io::Result<Vec<u64>> {

crates/commitlog/src/repo/mod.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::{
44
};
55

66
use log::{debug, warn};
7+
use spacetimedb_fs_utils::compression::Zstd;
8+
pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats};
79

810
use crate::{
911
commit::Commit,
@@ -121,7 +123,13 @@ pub trait Repo: Clone + fmt::Display {
121123
fn remove_segment(&self, offset: u64) -> io::Result<()>;
122124

123125
/// Compress a segment in storage, marking it as immutable.
124-
fn compress_segment(&self, offset: u64) -> io::Result<()>;
126+
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
127+
self.compress_segment_with(offset, segment_compressor())
128+
}
129+
130+
/// Compress a segment using a supplied [CompressOnce], marking it as
131+
/// immutable.
132+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats>;
125133

126134
/// Traverse all segments in this repository and return list of their
127135
/// offsets, sorted in ascending order.
@@ -164,8 +172,8 @@ impl<T: Repo> Repo for &T {
164172
T::remove_segment(self, offset)
165173
}
166174

167-
fn compress_segment(&self, offset: u64) -> io::Result<()> {
168-
T::compress_segment(self, offset)
175+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
176+
T::compress_segment_with(self, offset, f)
169177
}
170178

171179
fn existing_offsets(&self) -> io::Result<Vec<u64>> {
@@ -408,6 +416,17 @@ pub fn open_segment_reader<R: Repo>(
408416
.map_err(|source| with_segment_context("reading segment header", repo, offset, source))
409417
}
410418

419+
/// Obtain the canonical [CompressOnce] compressor for segments.
420+
///
421+
/// The compressor will create seekable [Zstd] archives with a max frame size
422+
/// of 4KiB. That is, seeking to an arbitrary byte offset (of the uncompressed
423+
/// segment) within the archive will decompress 4KiB of data on average.
424+
pub fn segment_compressor() -> Zstd {
425+
Zstd {
426+
max_frame_size: Some(0x1000),
427+
}
428+
}
429+
411430
fn segment_label<R: Repo>(repo: &R, offset: u64) -> String {
412431
repo.segment_file_path(offset)
413432
.unwrap_or_else(|| format!("offset {offset}"))

crates/commitlog/src/tests/partial.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88

99
use log::{debug, info};
1010
use pretty_assertions::assert_matches;
11+
use spacetimedb_fs_utils::compression::CompressionStats;
1112

1213
use crate::{
1314
commitlog, payload,
@@ -295,8 +296,8 @@ impl Repo for ShortMem {
295296
self.inner.remove_segment(offset)
296297
}
297298

298-
fn compress_segment(&self, offset: u64) -> io::Result<()> {
299-
self.inner.compress_segment(offset)
299+
fn compress_segment_with(&self, offset: u64, f: impl repo::CompressOnce) -> io::Result<CompressionStats> {
300+
self.inner.compress_segment_with(offset, f)
300301
}
301302

302303
fn existing_offsets(&self) -> io::Result<Vec<u64>> {

crates/durability/src/imp/local.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use futures::FutureExt as _;
1111
use itertools::Itertools as _;
1212
use log::{info, trace, warn};
1313
use scopeguard::ScopeGuard;
14-
use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
14+
use spacetimedb_commitlog::{
15+
error, payload::Txdata, Commit, Commitlog, CompressionStats, Decoder, Encode, Transaction,
16+
};
1517
use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile};
1618
use spacetimedb_paths::server::ReplicaDir;
1719
use thiserror::Error;
@@ -184,7 +186,7 @@ impl<T: Send + Sync + 'static> Local<T> {
184186
}
185187

186188
/// Compress the segments at the offsets provided, marking them as immutable.
187-
pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> {
189+
pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<CompressionStats> {
188190
self.clog.compress_segments(offsets)
189191
}
190192

crates/fs-utils/src/compression.rs

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::fs::File;
22
use std::io;
33
use std::io::{BufReader, Read, Seek, SeekFrom};
4+
use std::ops::AddAssign;
45
use tokio::io::AsyncSeek;
56
use zstd_framed;
67
use zstd_framed::{ZstdReader, ZstdWriter};
@@ -149,16 +150,81 @@ pub fn new_zstd_writer<'a, W: io::Write>(inner: W, max_frame_size: Option<u32>)
149150
.build()
150151
}
151152

153+
/// Compress `src` to `dst` in one go.
154+
///
155+
/// Like a `FnOnce` closure, but polymorphic over the arguments.
156+
pub trait CompressOnce {
157+
fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result<CompressionStats>;
158+
}
159+
160+
/// Implements [CompressOnce] for the zstd compression algorithm.
161+
pub struct Zstd {
162+
/// If `Some`, add a seek table with `max_frame_size` to the compressed output.
163+
///
164+
/// See [zstd_framed::writer::ZstdWriterBuilder::with_seek_table].
165+
pub max_frame_size: Option<u32>,
166+
}
167+
168+
impl CompressOnce for Zstd {
169+
fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result<CompressionStats> {
170+
compress_with_zstd(src, dst, self.max_frame_size)
171+
}
172+
}
173+
174+
#[derive(Clone, Copy, Debug, Default)]
175+
pub struct CompressionStats {
176+
pub bytes_read: u64,
177+
pub bytes_written: u64,
178+
}
179+
180+
impl AddAssign for CompressionStats {
181+
fn add_assign(&mut self, rhs: Self) {
182+
self.bytes_read += rhs.bytes_read;
183+
self.bytes_written += rhs.bytes_written;
184+
}
185+
}
186+
152187
pub fn compress_with_zstd<W: io::Write, R: io::Read>(
153188
mut src: R,
154-
mut dst: W,
189+
dst: W,
155190
max_frame_size: Option<u32>,
156-
) -> io::Result<()> {
157-
let mut writer = new_zstd_writer(&mut dst, max_frame_size)?;
158-
io::copy(&mut src, &mut writer)?;
159-
writer.shutdown()?;
160-
drop(writer);
161-
Ok(())
191+
) -> io::Result<CompressionStats> {
192+
/// [io::Write] wrapper that counts how many bytes were written.
193+
struct Writer<W> {
194+
bytes_written: u64,
195+
inner: W,
196+
}
197+
198+
impl<W: io::Write> io::Write for Writer<W> {
199+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
200+
let n = self.inner.write(buf)?;
201+
self.bytes_written += n as u64;
202+
Ok(n)
203+
}
204+
205+
fn flush(&mut self) -> io::Result<()> {
206+
self.inner.flush()
207+
}
208+
}
209+
210+
// Wrap `dst` in [Writer], and use it as the sink for the zstd writer,
211+
// such that we can determine how many (compressed) bytes came out at the end.
212+
let mut dst = Writer {
213+
bytes_written: 0,
214+
inner: dst,
215+
};
216+
let mut zstd_writer = new_zstd_writer(&mut dst, max_frame_size)?;
217+
218+
let bytes_read = io::copy(&mut src, &mut zstd_writer)?;
219+
zstd_writer.shutdown()?;
220+
drop(zstd_writer);
221+
222+
let stats = CompressionStats {
223+
bytes_read,
224+
bytes_written: dst.bytes_written,
225+
};
226+
227+
Ok(stats)
162228
}
163229

164230
pub use async_impls::AsyncCompressReader;

crates/snapshot/src/lib.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,8 @@ impl fmt::Debug for SnapshotSize {
652652
pub struct ObjectCompressionStats {
653653
/// Number of objects freshly compressed.
654654
pub compressed: usize,
655+
/// Cumulative stats of the compressed objects.
656+
pub compression_stats: spacetimedb_fs_utils::compression::CompressionStats,
655657
/// Number of objects hardlinked from a parent repository.
656658
pub hardlinked: usize,
657659
}
@@ -667,8 +669,16 @@ impl ObjectCompressionStats {
667669
}
668670

669671
impl AddAssign for ObjectCompressionStats {
670-
fn add_assign(&mut self, Self { compressed, hardlinked }: Self) {
672+
fn add_assign(
673+
&mut self,
674+
Self {
675+
compressed,
676+
compression_stats,
677+
hardlinked,
678+
}: Self,
679+
) {
671680
self.compressed += compressed;
681+
self.compression_stats += compression_stats;
672682
self.hardlinked += hardlinked;
673683
}
674684
}
@@ -1215,10 +1225,11 @@ impl SnapshotRepository {
12151225
let dst = src.with_extension("_tmp");
12161226
let mut write = BufWriter::new(o_excl().open(&dst)?);
12171227
// The default frame size compress better.
1218-
compress_with_zstd(read, &mut write, None)?;
1228+
let compression_stats = compress_with_zstd(read, &mut write, None)?;
12191229
std::fs::rename(dst, src)?;
12201230
if let Some(stats) = stats {
12211231
stats.compressed += 1;
1232+
stats.compression_stats += compression_stats;
12221233
}
12231234

12241235
Ok(())

0 commit comments

Comments
 (0)