Skip to content

Commit 8cb451b

Browse files
Merge origin/master into joshua/remove-old-query-code
Resolve import conflict in relational_db.rs (take master's new test utilities).
2 parents 4d7904b + f181fce commit 8cb451b

16 files changed

Lines changed: 864 additions & 183 deletions

File tree

crates/commitlog/src/repo/fs.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tempfile::NamedTempFile;
1010

1111
use crate::segment::FileLike;
1212

13-
use super::{Repo, SegmentLen, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
13+
use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
1414

1515
const SEGMENT_FILE_EXT: &str = ".stdb.log";
1616

@@ -154,9 +154,52 @@ impl FileLike for NamedTempFile {
154154
}
155155
}
156156

157+
/// A file-backed, read-only segment.
158+
///
159+
/// Transparently handles reading compressed segments.
160+
/// [Self::sealed] returns `true` if the segment is compressed.
161+
pub struct ReadOnlySegment {
162+
inner: CompressReader,
163+
}
164+
165+
impl SegmentReader for ReadOnlySegment {
166+
#[inline]
167+
fn sealed(&self) -> bool {
168+
self.inner.is_compressed()
169+
}
170+
}
171+
172+
impl io::Read for ReadOnlySegment {
173+
#[inline]
174+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
175+
self.inner.read(buf)
176+
}
177+
}
178+
179+
impl io::BufRead for ReadOnlySegment {
180+
#[inline]
181+
fn fill_buf(&mut self) -> io::Result<&[u8]> {
182+
self.inner.fill_buf()
183+
}
184+
185+
#[inline]
186+
fn consume(&mut self, amount: usize) {
187+
self.inner.consume(amount);
188+
}
189+
}
190+
191+
impl io::Seek for ReadOnlySegment {
192+
#[inline]
193+
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
194+
self.inner.seek(pos)
195+
}
196+
}
197+
198+
impl SegmentLen for ReadOnlySegment {}
199+
157200
impl Repo for Fs {
158201
type SegmentWriter = File;
159-
type SegmentReader = CompressReader;
202+
type SegmentReader = ReadOnlySegment;
160203

161204
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
162205
File::options()
@@ -198,8 +241,9 @@ impl Repo for Fs {
198241
}
199242

200243
fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
244+
debug!("fs: open segment at {}", self.segment_path(offset).display());
201245
let file = File::open(self.segment_path(offset))?;
202-
CompressReader::new(file)
246+
CompressReader::new(file).map(|inner| ReadOnlySegment { inner })
203247
}
204248

205249
fn remove_segment(&self, offset: u64) -> io::Result<()> {
@@ -215,7 +259,7 @@ impl Repo for Fs {
215259
fn compress_segment(&self, offset: u64) -> io::Result<()> {
216260
let src = self.open_segment_reader(offset)?;
217261
// if it's already compressed, leave it be
218-
let CompressReader::None(mut src) = src else {
262+
let CompressReader::None(mut src) = src.inner else {
219263
return Ok(());
220264
};
221265

crates/commitlog/src/repo/mem.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::repo::{
1010
};
1111

1212
mod segment;
13-
pub use segment::Segment;
13+
pub use segment::{ReadOnlySegment, Segment};
1414

1515
pub const PAGE_SIZE: usize = 4096;
1616

@@ -52,7 +52,7 @@ impl fmt::Display for Memory {
5252

5353
impl Repo for Memory {
5454
type SegmentWriter = Segment;
55-
type SegmentReader = io::BufReader<Segment>;
55+
type SegmentReader = ReadOnlySegment;
5656

5757
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
5858
let mut inner = self.segments.write().unwrap();
@@ -88,7 +88,7 @@ impl Repo for Memory {
8888
}
8989

9090
fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
91-
self.open_segment_writer(offset).map(io::BufReader::new)
91+
self.open_segment_writer(offset).map(Into::into)
9292
}
9393

9494
fn remove_segment(&self, offset: u64) -> io::Result<()> {

crates/commitlog/src/repo/mem/segment.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
use crate::{
88
repo::{
99
mem::{SpaceOnDevice, PAGE_SIZE},
10-
SegmentLen,
10+
SegmentLen, SegmentReader,
1111
},
1212
segment::FileLike,
1313
};
@@ -318,3 +318,50 @@ mod async_impls {
318318
}
319319
}
320320
}
321+
322+
pub struct ReadOnlySegment {
323+
inner: io::BufReader<Segment>,
324+
}
325+
326+
impl From<Segment> for ReadOnlySegment {
327+
fn from(inner: Segment) -> Self {
328+
Self {
329+
inner: io::BufReader::new(inner),
330+
}
331+
}
332+
}
333+
334+
impl SegmentReader for ReadOnlySegment {
335+
/// Memory segments dont' support compression, so are never sealed.
336+
fn sealed(&self) -> bool {
337+
false
338+
}
339+
}
340+
341+
impl io::Read for ReadOnlySegment {
342+
#[inline]
343+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
344+
self.inner.read(buf)
345+
}
346+
}
347+
348+
impl io::BufRead for ReadOnlySegment {
349+
#[inline]
350+
fn fill_buf(&mut self) -> io::Result<&[u8]> {
351+
self.inner.fill_buf()
352+
}
353+
354+
#[inline]
355+
fn consume(&mut self, amount: usize) {
356+
self.inner.consume(amount);
357+
}
358+
}
359+
360+
impl io::Seek for ReadOnlySegment {
361+
#[inline]
362+
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
363+
self.inner.seek(pos)
364+
}
365+
}
366+
367+
impl SegmentLen for ReadOnlySegment {}

crates/commitlog/src/repo/mod.rs

Lines changed: 52 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::{fmt, io};
1+
use std::{
2+
fmt,
3+
io::{self, Seek},
4+
};
25

36
use log::{debug, warn};
47

@@ -52,8 +55,14 @@ pub trait SegmentLen: io::Seek {
5255
}
5356
}
5457

55-
pub trait SegmentReader: io::BufRead + SegmentLen + Send + Sync {}
56-
impl<T: io::BufRead + SegmentLen + Send + Sync> SegmentReader for T {}
58+
pub trait SegmentReader: io::BufRead + SegmentLen + Send + Sync {
59+
/// Whether the segment is considered immutable.
60+
///
61+
/// Currently, this is true when the segment is compressed.
62+
/// [resume_segment_writer] uses this method to indicate that a new segment
63+
/// should be created when opening a commitlog.
64+
fn sealed(&self) -> bool;
65+
}
5766

5867
pub trait SegmentWriter: FileLike + io::Read + io::Write + SegmentLen + Send + Sync {}
5968
impl<T: FileLike + io::Read + io::Write + SegmentLen + Send + Sync> SegmentWriter for T {}
@@ -243,21 +252,9 @@ pub fn resume_segment_writer<R: Repo>(
243252
opts: Options,
244253
offset: u64,
245254
) -> io::Result<Result<Writer<R::SegmentWriter>, Metadata>> {
246-
let mut storage = repo.open_segment_writer(offset)?;
247-
// Ensure we have enough space for this segment.
248-
// The segment could have been created without the `fallocate` feature
249-
// enabled, so we call this here again to ensure writes can't fail due to
250-
// ENOSPC.
251-
fallocate(&mut storage, &opts)?;
255+
let mut reader = repo.open_segment_reader(offset)?;
252256
let offset_index = repo.get_offset_index(offset).ok();
253-
let Metadata {
254-
header,
255-
tx_range,
256-
size_in_bytes,
257-
max_epoch,
258-
max_commit_offset: _,
259-
max_commit: _,
260-
} = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
257+
let meta = match Metadata::extract(offset, &mut reader, offset_index.as_ref()) {
261258
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
262259
warn!("invalid commit in segment {offset}: {source}");
263260
debug!("sofar={sofar:?}");
@@ -266,34 +263,55 @@ pub fn resume_segment_writer<R: Repo>(
266263
Err(error::SegmentMetadata::Io(e)) => return Err(e),
267264
Ok(meta) => meta,
268265
};
269-
header
266+
meta.header
270267
.ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM)
271268
.map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
272269
// When resuming, the log format version must be equal.
273-
if header.log_format_version != opts.log_format_version {
270+
if meta.header.log_format_version != opts.log_format_version {
274271
return Err(io::Error::new(
275272
io::ErrorKind::InvalidData,
276273
format!(
277274
"log format version mismatch: current={} segment={}",
278-
opts.log_format_version, header.log_format_version
275+
opts.log_format_version, meta.header.log_format_version
279276
),
280277
));
281278
}
282279

283-
Ok(Ok(Writer {
284-
commit: Commit {
285-
min_tx_offset: tx_range.end,
286-
n: 0,
287-
records: Vec::new(),
288-
epoch: max_epoch,
289-
},
290-
inner: io::BufWriter::new(storage),
291-
292-
min_tx_offset: tx_range.start,
293-
bytes_written: size_in_bytes,
294-
295-
offset_index_head: create_offset_index_writer(repo, offset, opts),
296-
}))
280+
if reader.sealed() {
281+
Ok(Err(meta))
282+
} else {
283+
let Metadata {
284+
header: _,
285+
tx_range,
286+
size_in_bytes,
287+
max_epoch,
288+
max_commit_offset: _,
289+
max_commit: _,
290+
} = meta;
291+
let mut writer = repo.open_segment_writer(offset)?;
292+
// Ensure we have enough space for this segment.
293+
// The segment could have been created without the `fallocate` feature
294+
// enabled, so we call this here again to ensure writes can't fail due
295+
// to ENOSPC.
296+
fallocate(&mut writer, &opts)?;
297+
// We use `O_APPEND`, but make the file offset consistent regardless.
298+
writer.seek(io::SeekFrom::End(0))?;
299+
300+
Ok(Ok(Writer {
301+
commit: Commit {
302+
min_tx_offset: tx_range.end,
303+
n: 0,
304+
records: Vec::new(),
305+
epoch: max_epoch,
306+
},
307+
inner: io::BufWriter::new(writer),
308+
309+
min_tx_offset: tx_range.start,
310+
bytes_written: size_in_bytes,
311+
312+
offset_index_head: create_offset_index_writer(repo, offset, opts),
313+
}))
314+
}
297315
}
298316

299317
/// Open the existing segment at `offset` for reading.

crates/commitlog/src/tests/partial.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ impl fmt::Display for ShortMem {
271271

272272
impl Repo for ShortMem {
273273
type SegmentWriter = ShortSegment;
274-
type SegmentReader = io::BufReader<repo::mem::Segment>;
274+
type SegmentReader = repo::mem::ReadOnlySegment;
275275

276276
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
277277
self.inner.create_segment(offset).map(|inner| ShortSegment {

crates/commitlog/tests/random_payload/mod.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use log::info;
2+
use spacetimedb_commitlog::repo::Repo;
23
use spacetimedb_commitlog::tests::helpers::enable_logging;
3-
use spacetimedb_commitlog::{payload, Commitlog, Options};
4+
use spacetimedb_commitlog::{commitlog, payload, repo, Commitlog, Options};
45
use spacetimedb_paths::server::CommitLogDir;
56
use spacetimedb_paths::FromPathUnchecked;
67
use tempfile::tempdir;
@@ -75,6 +76,12 @@ fn resets() {
7576
}
7677
}
7778

79+
/// Try to generate commitlogs that will be amenable to compression -
80+
/// random data doesn't compress well, so try and have there be repetition
81+
fn compressible_payloads() -> impl Iterator<Item = [u8; 256]> {
82+
(0..4).map(|_| gen_payload()).cycle()
83+
}
84+
7885
#[test]
7986
fn compression() {
8087
enable_logging();
@@ -90,9 +97,7 @@ fn compression() {
9097
)
9198
.unwrap();
9299

93-
// try to generate commitlogs that will be amenable to compression -
94-
// random data doesn't compress well, so try and have there be repetition
95-
let payloads = (0..4).map(|_| gen_payload()).cycle().take(1024).collect::<Vec<_>>();
100+
let payloads = compressible_payloads().take(1024).collect::<Vec<_>>();
96101
for (i, payload) in payloads.iter().enumerate() {
97102
clog.commit([(i as u64, *payload)]).unwrap();
98103
}
@@ -114,3 +119,45 @@ fn compression() {
114119
.enumerate()
115120
.all(|(i, x)| x.offset == i as u64 && x.txdata == payloads[i]));
116121
}
122+
123+
/// When restoring an archived commitlog, all segments are compressed and should
124+
/// remain immutable.
125+
///
126+
/// Tests that this is upheld, i.e. a fresh segment is created when resuming
127+
/// writes.
128+
#[test]
129+
fn all_segments_sealed() {
130+
enable_logging();
131+
132+
let root = tempdir().unwrap();
133+
let path = CommitLogDir::from_path_unchecked(root.path());
134+
let opts = Options {
135+
max_segment_size: 64 * 1024,
136+
..<_>::default()
137+
};
138+
let num_commits = 1024;
139+
let repo = repo::Fs::new(path, None).unwrap();
140+
{
141+
let mut clog = commitlog::Generic::open(&repo, opts).unwrap();
142+
for (i, payload) in compressible_payloads().take(num_commits).enumerate() {
143+
clog.commit([(i as u64, payload)]).unwrap();
144+
}
145+
clog.flush().unwrap();
146+
clog.sync();
147+
}
148+
149+
let segments = repo.existing_offsets().unwrap();
150+
let num_segments = segments.len();
151+
152+
// Compress all segments via the `repo`,
153+
// to not trigger the assert that the head segment cannot be compressed.
154+
for segment in segments {
155+
repo.compress_segment(segment).unwrap();
156+
}
157+
158+
// Re-opening the commitlog should create a fresh segment at offset `num_commits`.
159+
let _ = commitlog::Generic::<_, [u8; 256]>::open(&repo, opts).unwrap();
160+
let segments = repo.existing_offsets().unwrap();
161+
assert_eq!(num_segments + 1, segments.len());
162+
assert_eq!(segments.last().copied(), Some(num_commits as u64));
163+
}

0 commit comments

Comments
 (0)