Skip to content

Commit 2ed3d8c

Browse files
committed
Abstract compression function, so we can instrument input and output
1 parent 027547d commit 2ed3d8c

5 files changed

Lines changed: 50 additions & 14 deletions

File tree

crates/commitlog/src/repo/fs.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ use std::io;
44
use std::sync::Arc;
55

66
use log::{debug, warn};
7-
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader, CompressionStats};
7+
use spacetimedb_fs_utils::compression::{CompressReader, CompressionStats};
88
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
99
use tempfile::NamedTempFile;
1010

1111
use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
12+
use crate::repo::CompressOnce;
1213
use crate::segment::FileLike;
1314

1415
const SEGMENT_FILE_EXT: &str = ".stdb.log";
@@ -260,18 +261,15 @@ impl Repo for Fs {
260261
fs::remove_file(self.segment_path(offset))
261262
}
262263

263-
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
264+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
264265
let src = self.open_segment_reader(offset)?;
265266
// if it's already compressed, leave it be
266267
let CompressReader::None(mut src) = src.inner else {
267268
return Ok(<_>::default());
268269
};
269270

270271
let mut dst = NamedTempFile::new_in(&self.root)?;
271-
// bytes per frame. in the future, it might be worth looking into putting
272-
// every commit into its own frame, to make seeking more efficient.
273-
let max_frame_size = 0x1000;
274-
let stats = compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
272+
let stats = f.compress(&mut src, &mut dst)?;
275273
dst.persist(self.segment_path(offset))?;
276274

277275
Ok(stats)

crates/commitlog/src/repo/mem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl Repo for Memory {
104104
Ok(())
105105
}
106106

107-
fn compress_segment(&self, _offset: u64) -> io::Result<CompressionStats> {
107+
fn compress_segment_with(&self, _: u64, _: impl super::CompressOnce) -> io::Result<CompressionStats> {
108108
Ok(<_>::default())
109109
}
110110

crates/commitlog/src/repo/mod.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use std::{
44
};
55

66
use log::{debug, warn};
7-
8-
pub use spacetimedb_fs_utils::compression::CompressionStats;
7+
use spacetimedb_fs_utils::compression::Zstd;
8+
pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats};
99

1010
use crate::{
1111
commit::Commit,
@@ -123,7 +123,13 @@ pub trait Repo: Clone + fmt::Display {
123123
fn remove_segment(&self, offset: u64) -> io::Result<()>;
124124

125125
/// Compress a segment in storage, marking it as immutable.
126-
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats>;
126+
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
127+
self.compress_segment_with(offset, segment_compressor())
128+
}
129+
130+
/// Compress a segment using a supplied [CompressOnce], marking it as
131+
/// immutable.
132+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats>;
127133

128134
/// Traverse all segments in this repository and return list of their
129135
/// offsets, sorted in ascending order.
@@ -166,8 +172,8 @@ impl<T: Repo> Repo for &T {
166172
T::remove_segment(self, offset)
167173
}
168174

169-
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
170-
T::compress_segment(self, offset)
175+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
176+
T::compress_segment_with(self, offset, f)
171177
}
172178

173179
fn existing_offsets(&self) -> io::Result<Vec<u64>> {
@@ -356,6 +362,17 @@ pub fn open_segment_reader<R: Repo>(
356362
.map_err(|source| with_segment_context("reading segment header", repo, offset, source))
357363
}
358364

365+
/// Obtain the canonical [CompressOnce] compressor for segments.
366+
///
367+
/// The compressor will create seekable [Zstd] archives with a max frame size
368+
/// of 4KiB. That is, seeking to an arbitrary byte offset (of the uncompressed
369+
/// segment) within the archive will decompress at most 4KiB of data.
370+
pub fn segment_compressor() -> Zstd {
371+
Zstd {
372+
max_frame_size: Some(0x1000),
373+
}
374+
}
375+
359376
fn segment_label<R: Repo>(repo: &R, offset: u64) -> String {
360377
repo.segment_file_path(offset)
361378
.unwrap_or_else(|| format!("offset {offset}"))

crates/commitlog/src/tests/partial.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,8 @@ impl Repo for ShortMem {
296296
self.inner.remove_segment(offset)
297297
}
298298

299-
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
300-
self.inner.compress_segment(offset)
299+
fn compress_segment_with(&self, offset: u64, f: impl repo::CompressOnce) -> io::Result<CompressionStats> {
300+
self.inner.compress_segment_with(offset, f)
301301
}
302302

303303
fn existing_offsets(&self) -> io::Result<Vec<u64>> {

crates/fs-utils/src/compression.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,27 @@ pub fn new_zstd_writer<'a, W: io::Write>(inner: W, max_frame_size: Option<u32>)
150150
.build()
151151
}
152152

153+
/// Compress `src` to `dst` in one go.
154+
///
155+
/// Like a `FnOnce` closure, but polymorphic over the arguments.
156+
pub trait CompressOnce {
157+
fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result<CompressionStats>;
158+
}
159+
160+
/// Implements [CompressOnce] for the zstd compression algorithm.
161+
pub struct Zstd {
162+
/// If `Some`, add a seek table with `max_frame_size` to the compressed output.
163+
///
164+
/// See [zstd_framed::writer::ZstdWriterBuilder::with_seek_table].
165+
pub max_frame_size: Option<u32>,
166+
}
167+
168+
impl CompressOnce for Zstd {
169+
fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result<CompressionStats> {
170+
compress_with_zstd(src, dst, self.max_frame_size)
171+
}
172+
}
173+
153174
#[derive(Clone, Copy, Debug, Default)]
154175
pub struct CompressionStats {
155176
pub bytes_read: u64,

0 commit comments

Comments
 (0)