Skip to content

Commit 2af138f

Browse files
authored
commitlog: Truncate segment before resuming (#5116)
`read_exact` doesn't distinguish between a clean EOF and too few remaining bytes to fill the given buffer. We may thus consider a segment valid to be resumed while leaving trailing bytes (fewer than the commit header length) in place. That can cause silent data loss, because appending more data will render the segment corrupt: a restart will then start a new segment, leaving anything after the trailing bytes unreachable. To fix this, truncate the segment to the size determined by `Metadata::extract` before resuming writes.

# Expected complexity level and risk

2

# Testing

Added a test
1 parent 998127c commit 2af138f

5 files changed

Lines changed: 128 additions & 26 deletions

File tree

crates/commitlog/src/repo/fs.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::fmt;
22
use std::fs::{self, File};
3-
use std::io;
3+
use std::io::{self, Seek};
44
use std::sync::Arc;
55

66
use log::{debug, warn};
@@ -277,7 +277,7 @@ impl Repo for Fs {
277277
// The segment file either does not exist, or is of length zero.
278278
// Write the header to a temporary file and atomically move it into place.
279279
let mut tmp = tempfile::Builder::new().make_in(&self.root.0, |tmp_path| {
280-
File::options().read(true).append(true).create_new(true).open(tmp_path)
280+
File::options().read(true).write(true).create_new(true).open(tmp_path)
281281
})?;
282282
header.write(&mut tmp)?;
283283
tmp.as_file_mut().sync_all()?;
@@ -292,7 +292,11 @@ impl Repo for Fs {
292292
}
293293

294294
fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
295-
File::options().read(true).append(true).open(self.segment_path(offset))
295+
// NOTE: We previously used `O_APPEND`, but Windows demands `write(true)`
296+
// so that we can truncate trailing garbage in [super::resume_segment_writer].
297+
let mut file = File::options().read(true).write(true).open(self.segment_path(offset))?;
298+
file.seek(io::SeekFrom::End(0))?;
299+
Ok(file)
296300
}
297301

298302
fn segment_file_path(&self, offset: u64) -> Option<String> {

crates/commitlog/src/repo/mod.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use spacetimedb_fs_utils::compression::Zstd;
88
pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats};
99

1010
use crate::{
11-
commit::Commit,
11+
commit::{self, Commit},
1212
error,
1313
index::{IndexFile, IndexFileMut},
1414
segment::{self, FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer},
@@ -375,16 +375,24 @@ pub fn resume_segment_writer<R: Repo>(
375375
if reader.sealed() {
376376
Ok(ResumedSegment::Sealed(meta))
377377
} else {
378-
let Metadata {
379-
header: _,
380-
tx_range,
381-
size_in_bytes,
382-
max_epoch,
383-
max_commit_offset: _,
384-
max_commit: _,
385-
} = meta;
386378
let mut writer = repo.open_segment_writer(offset)?;
379+
// Ensure that the segment's size is exactly what we determined.
380+
//
381+
// When `Metadata` encounters EOF, it could be that there actually are
382+
// trailing bytes in the segment, but less than the commit header
383+
// length. This is difficult to detect due to the use of `read_exact`,
384+
// so ensure we remove any trailing bytes.
385+
//
386+
// To be extra cautious, check that fewer than the commit header length
387+
// bytes are left over before truncating the segment. This is an assert
388+
// because it would be a bug in `Metadata::extract`.
389+
assert!(
390+
writer.segment_len()? < meta.size_in_bytes + commit::Header::LEN as u64,
391+
"{repo}: trailing bytes exceed commit header length in segment {offset}"
392+
);
393+
writer.ftruncate(meta.tx_range.end, meta.size_in_bytes)?;
387394
// Ensure we have enough space for this segment.
395+
//
388396
// The segment could have been created without the `fallocate` feature
389397
// enabled, so we call this here again to ensure writes can't fail due
390398
// to ENOSPC.
@@ -394,15 +402,15 @@ pub fn resume_segment_writer<R: Repo>(
394402

395403
Ok(ResumedSegment::Resumed(Writer {
396404
commit: Commit {
397-
min_tx_offset: tx_range.end,
405+
min_tx_offset: meta.tx_range.end,
398406
n: 0,
399407
records: Vec::new(),
400-
epoch: max_epoch,
408+
epoch: meta.max_epoch,
401409
},
402410
inner: io::BufWriter::new(writer),
403411

404-
min_tx_offset: tx_range.start,
405-
bytes_written: size_in_bytes,
412+
min_tx_offset: meta.tx_range.start,
413+
bytes_written: meta.size_in_bytes,
406414

407415
offset_index_head: create_offset_index_writer(repo, offset, opts),
408416
}))

crates/commitlog/src/segment.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ impl Header {
4646

4747
pub fn decode<R: io::Read>(mut read: R) -> io::Result<Self> {
4848
let mut buf = [0; Self::LEN];
49-
read.read_exact(&mut buf)?;
49+
read.read_exact(&mut buf).map_err(|e| {
50+
io::Error::new(
51+
e.kind(),
52+
format!("failed to read segment header ({} bytes): {}", Self::LEN, e),
53+
)
54+
})?;
5055

5156
if !buf.starts_with(&MAGIC) {
5257
return Err(io::Error::new(
@@ -612,6 +617,7 @@ impl Metadata {
612617
mut reader: R,
613618
offset_index: Option<&TxOffsetIndex>,
614619
) -> Result<Self, error::SegmentMetadata> {
620+
reader.seek(io::SeekFrom::Start(0))?;
615621
let header = Header::decode(&mut reader)?;
616622
Self::with_header(min_tx_offset, header, reader, offset_index)
617623
}

crates/commitlog/tests/random_payload/mod.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use std::io::Write;
2+
13
use log::info;
2-
use spacetimedb_commitlog::repo::Repo;
4+
use spacetimedb_commitlog::repo::{Repo, SegmentLen};
35
use spacetimedb_commitlog::tests::helpers::enable_logging;
4-
use spacetimedb_commitlog::{commitlog, payload, repo, Commitlog, Options};
6+
use spacetimedb_commitlog::{commit, commitlog, payload, repo, Commitlog, Options};
57
use spacetimedb_paths::server::CommitLogDir;
68
use spacetimedb_paths::FromPathUnchecked;
79
use tempfile::tempdir;
@@ -196,3 +198,60 @@ fn resume_empty_segment() {
196198
}
197199
}
198200
}
201+
202+
/// Tests that resuming a segment that has trailing bytes smaller than a
203+
/// commitlog header causes those trailing bytes to be removed.
204+
///
205+
/// Regression test for https://github.com/clockworklabs/SpacetimeDB/pull/5116
206+
#[test]
207+
fn resume_small_trailing_garbage() {
208+
enable_logging();
209+
210+
let root = tempdir().unwrap();
211+
let path = CommitLogDir::from_path_unchecked(root.path());
212+
213+
let repo = repo::Fs::new(path, None).unwrap();
214+
// Write some data.
215+
{
216+
let mut clog = commitlog::Generic::open(&repo, <_>::default()).unwrap();
217+
for (i, payload) in compressible_payloads().take(1024).enumerate() {
218+
clog.commit([(i as u64, payload)]).unwrap();
219+
clog.flush().unwrap();
220+
clog.sync();
221+
}
222+
}
223+
224+
// Add some extra bytes, less than the commit header length.
225+
let last_segment_size = {
226+
let segments = repo.existing_offsets().unwrap();
227+
let mut last_segment = repo.open_segment_writer(segments.last().copied().unwrap()).unwrap();
228+
last_segment.write_all(&[67u8; commit::Header::LEN - 1]).unwrap();
229+
last_segment.flush().unwrap();
230+
last_segment.sync_all().unwrap();
231+
last_segment.segment_len().unwrap()
232+
};
233+
{
234+
let mut clog = commitlog::Generic::open(&repo, <_>::default()).unwrap();
235+
236+
// The extra bytes should have been truncated away.
237+
let segments = repo.existing_offsets().unwrap();
238+
let mut last_segment = repo.open_segment_writer(segments.last().copied().unwrap()).unwrap();
239+
assert_eq!(
240+
last_segment.segment_len().unwrap(),
241+
last_segment_size - (commit::Header::LEN - 1) as u64
242+
);
243+
244+
// Add some more data.
245+
for (i, payload) in compressible_payloads()
246+
.take(1024)
247+
.enumerate()
248+
.map(|(offset, payload)| (offset + 1024, payload))
249+
{
250+
clog.commit([(i as u64, payload)]).unwrap();
251+
clog.flush().unwrap();
252+
clog.sync();
253+
}
254+
255+
assert_eq!(2048, clog.commits_from(0).map(Result::unwrap).count());
256+
}
257+
}

crates/snapshot/src/lib.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,14 +1492,39 @@ impl FileOrDirPath<'_> {
14921492
/// On Windows, only the file needs to be synced, and it's even an error to
14931493
/// sync a directory. Passing in [Self::Dir] is thus a no-op on Windows.
14941494
fn sync_all(&self) -> io::Result<()> {
1495-
#[cfg(target_os = "windows")]
1496-
if let Self::Dir(_) = self {
1497-
return Ok(());
1495+
match self {
1496+
#[cfg(target_os = "windows")]
1497+
Self::Dir(path) => Ok(()),
1498+
#[cfg(not(target_os = "windows"))]
1499+
Self::Dir(path) => File::open(path)
1500+
.map_err(|e| {
1501+
io::Error::new(
1502+
e.kind(),
1503+
format!("failed to open directory {} for fsync: {}", path.display(), e),
1504+
)
1505+
})?
1506+
.sync_all()
1507+
.map_err(|e| io::Error::new(e.kind(), format!("failed to fsync directory {}: {}", path.display(), e))),
1508+
Self::File(path) => {
1509+
File::options()
1510+
.read(true)
1511+
// Windows needs the file to be writable for `sync_all` to work.
1512+
// Set all the open options explicitly, just for visibility.
1513+
.write(true)
1514+
.truncate(false)
1515+
.create(false)
1516+
.append(false)
1517+
.open(path)
1518+
.map_err(|e| {
1519+
io::Error::new(
1520+
e.kind(),
1521+
format!("failed to open file {} for fsync: {}", path.display(), e),
1522+
)
1523+
})?
1524+
.sync_all()
1525+
.map_err(|e| io::Error::new(e.kind(), format!("failed to fsync file {}: {}", path.display(), e)))
1526+
}
14981527
}
1499-
let (Self::File(path) | Self::Dir(path)) = self;
1500-
File::open(path)
1501-
.and_then(|fd| fd.sync_all())
1502-
.map_err(|e| io::Error::new(e.kind(), format!("failed to fsync {}: {}", path.display(), e)))
15031528
}
15041529
}
15051530

0 commit comments

Comments
 (0)