Skip to content

Commit 62d9203

Browse files
committed
fixup! Abstract compression function, so we can instrument input and output
1 parent 4a97348 commit 62d9203

5 files changed

Lines changed: 14 additions & 50 deletions

File tree

crates/commitlog/src/repo/fs.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@ use std::io;
44
use std::sync::Arc;
55

66
use log::{debug, warn};
7-
use spacetimedb_fs_utils::compression::{CompressReader, CompressionStats};
7+
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader, CompressionStats};
88
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
99
use tempfile::NamedTempFile;
1010

1111
use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
12-
use crate::repo::CompressOnce;
1312
use crate::segment::FileLike;
1413

1514
const SEGMENT_FILE_EXT: &str = ".stdb.log";
@@ -261,15 +260,18 @@ impl Repo for Fs {
261260
fs::remove_file(self.segment_path(offset))
262261
}
263262

264-
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
263+
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
265264
let src = self.open_segment_reader(offset)?;
266265
// if it's already compressed, leave it be
267266
let CompressReader::None(mut src) = src.inner else {
268267
return Ok(<_>::default());
269268
};
270269

271270
let mut dst = NamedTempFile::new_in(&self.root)?;
272-
let stats = f.compress(&mut src, &mut dst)?;
271+
// bytes per frame. in the future, it might be worth looking into putting
272+
// every commit into its own frame, to make seeking more efficient.
273+
let max_frame_size = 0x1000;
274+
let stats = compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
273275
dst.persist(self.segment_path(offset))?;
274276

275277
Ok(stats)

crates/commitlog/src/repo/mem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl Repo for Memory {
104104
Ok(())
105105
}
106106

107-
fn compress_segment_with(&self, _: u64, _: impl super::CompressOnce) -> io::Result<CompressionStats> {
107+
fn compress_segment(&self, _offset: u64) -> io::Result<CompressionStats> {
108108
Ok(<_>::default())
109109
}
110110

crates/commitlog/src/repo/mod.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use std::{
44
};
55

66
use log::{debug, warn};
7-
use spacetimedb_fs_utils::compression::Zstd;
8-
pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats};
7+
8+
pub use spacetimedb_fs_utils::compression::CompressionStats;
99

1010
use crate::{
1111
commit::Commit,
@@ -123,13 +123,7 @@ pub trait Repo: Clone + fmt::Display {
123123
fn remove_segment(&self, offset: u64) -> io::Result<()>;
124124

125125
/// Compress a segment in storage, marking it as immutable.
126-
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
127-
self.compress_segment_with(offset, segment_compressor())
128-
}
129-
130-
/// Compress a segment using a supplied [CompressOnce], marking it as
131-
/// immutable.
132-
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats>;
126+
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats>;
133127

134128
/// Traverse all segments in this repository and return list of their
135129
/// offsets, sorted in ascending order.
@@ -172,8 +166,8 @@ impl<T: Repo> Repo for &T {
172166
T::remove_segment(self, offset)
173167
}
174168

175-
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
176-
T::compress_segment_with(self, offset, f)
169+
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
170+
T::compress_segment(self, offset)
177171
}
178172

179173
fn existing_offsets(&self) -> io::Result<Vec<u64>> {
@@ -362,17 +356,6 @@ pub fn open_segment_reader<R: Repo>(
362356
.map_err(|source| with_segment_context("reading segment header", repo, offset, source))
363357
}
364358

365-
/// Obtain the canonical [CompressOnce] compressor for segments.
366-
///
367-
/// The compressor will create seekable [Zstd] archives with a max frame size
368-
/// of 4KiB. That is, seeking to an arbitrary byte offset (of the uncompressed
369-
/// segment) within the archive will decompress 4KiB of data on average.
370-
pub fn segment_compressor() -> Zstd {
371-
Zstd {
372-
max_frame_size: Some(0x1000),
373-
}
374-
}
375-
376359
fn segment_label<R: Repo>(repo: &R, offset: u64) -> String {
377360
repo.segment_file_path(offset)
378361
.unwrap_or_else(|| format!("offset {offset}"))

crates/commitlog/src/tests/partial.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,8 @@ impl Repo for ShortMem {
296296
self.inner.remove_segment(offset)
297297
}
298298

299-
fn compress_segment_with(&self, offset: u64, f: impl repo::CompressOnce) -> io::Result<CompressionStats> {
300-
self.inner.compress_segment_with(offset, f)
299+
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
300+
self.inner.compress_segment(offset)
301301
}
302302

303303
fn existing_offsets(&self) -> io::Result<Vec<u64>> {

crates/fs-utils/src/compression.rs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -150,27 +150,6 @@ pub fn new_zstd_writer<'a, W: io::Write>(inner: W, max_frame_size: Option<u32>)
150150
.build()
151151
}
152152

153-
/// Compress `src` to `dst` in one go.
154-
///
155-
/// Like a `FnOnce` closure, but polymorphic over the arguments.
156-
pub trait CompressOnce {
157-
fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result<CompressionStats>;
158-
}
159-
160-
/// Implements [CompressOnce] for the zstd compression algorithm.
161-
pub struct Zstd {
162-
/// If `Some`, add a seek table with `max_frame_size` to the compressed output.
163-
///
164-
/// See [zstd_framed::writer::ZstdWriterBuilder::with_seek_table].
165-
pub max_frame_size: Option<u32>,
166-
}
167-
168-
impl CompressOnce for Zstd {
169-
fn compress(self, src: impl io::Read, dst: impl io::Write) -> io::Result<CompressionStats> {
170-
compress_with_zstd(src, dst, self.max_frame_size)
171-
}
172-
}
173-
174153
#[derive(Clone, Copy, Debug, Default)]
175154
pub struct CompressionStats {
176155
pub bytes_read: u64,

0 commit comments

Comments
 (0)