Skip to content

Commit 997b5a6

Browse files
authored
commitlog: Handle empty tail segments upon resumption (#4863)
In #4338, the read-only path was made resilient against empty segments at the end of the log, but the corresponding logic was not applied to re-opening the commitlog for writing.

This patch rectifies that by ignoring and removing segments from the tail of the log if they contain `segment::Header::LEN` bytes or fewer.

Additionally, zero-sized segments are eliminated entirely by ensuring that the header is written before the segment is atomically moved into place. The benefit of this is not huge, but it could simplify commitlog-consuming code, which would no longer have to worry about empty (zero-sized) segments. Happy to revert if that is deemed too small a benefit.

# Expected complexity level and risk

2

# Testing

Adds a test.
1 parent 1f0e127 commit 997b5a6

11 files changed

Lines changed: 275 additions & 115 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/commitlog/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ itertools.workspace = true
2626
log.workspace = true
2727
memmap2 = "0.9.4"
2828
nix = { workspace = true, optional = true, features = ["fs"] }
29+
scopeguard.workspace = true
2930
serde = { workspace = true, optional = true }
3031
spacetimedb-fs-utils.workspace = true
3132
spacetimedb-paths.workspace = true

crates/commitlog/src/commitlog.rs

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -59,33 +59,27 @@ impl<R: Repo, T> Generic<R, T> {
5959
if !tail.is_empty() {
6060
debug!("segments: {tail:?}");
6161
}
62-
let head = if let Some(last) = tail.pop() {
63-
debug!("resuming last segment: {last}");
64-
// Resume the last segment for writing, or create a new segment
65-
// starting from the last good commit + 1.
66-
repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
67-
// The first commit in the last segment being corrupt is an
68-
// edge case: we'd try to start a new segment with an offset
69-
// equal to the already existing one, which would fail.
70-
//
71-
// We cannot just skip it either, as we don't know the reason
72-
// for the corruption (there could be more, potentially
73-
// recoverable commits in the segment).
74-
//
75-
// Thus, provide some context about what is wrong and refuse to
76-
// start.
77-
if meta.tx_range.is_empty() {
78-
return Err(io::Error::new(
79-
io::ErrorKind::InvalidData,
80-
format!("repo {}: first commit in resumed segment {} is corrupt", repo, last),
81-
));
62+
63+
// Resume the last segment for writing, or
64+
// create a new segment starting from the last good commit + 1.
65+
let head = loop {
66+
if let Some(last) = tail.pop() {
67+
info!("repo {}: resuming last segment: {}", repo, last);
68+
match repo::resume_segment_writer(&repo, opts, last)? {
69+
repo::ResumedSegment::Empty => {
70+
repo.remove_segment(last)?;
71+
continue;
72+
}
73+
repo::ResumedSegment::Resumed(writer) => break writer,
74+
repo::ResumedSegment::Sealed(meta) | repo::ResumedSegment::Corrupted(meta) => {
75+
tail.push(meta.tx_range.start);
76+
break repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)?;
77+
}
8278
}
83-
tail.push(meta.tx_range.start);
84-
repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
85-
})?
86-
} else {
87-
debug!("starting fresh log");
88-
repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
79+
} else {
80+
info!("repo {}: starting fresh log", repo);
81+
break repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?;
82+
}
8983
};
9084

9185
Ok(Self {

crates/commitlog/src/repo/fs.rs

Lines changed: 86 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ use std::sync::Arc;
55

66
use log::{debug, warn};
77
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
8+
use spacetimedb_fs_utils::lockfile;
89
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
910
use tempfile::NamedTempFile;
1011

11-
use crate::segment::FileLike;
12+
use crate::segment::{self, FileLike};
1213

1314
use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
1415

@@ -160,6 +161,7 @@ impl FileLike for NamedTempFile {
160161
/// [Self::sealed] returns `true` if the segment is compressed.
161162
pub struct ReadOnlySegment {
162163
inner: CompressReader,
164+
len: u64,
163165
}
164166

165167
impl SegmentReader for ReadOnlySegment {
@@ -195,45 +197,98 @@ impl io::Seek for ReadOnlySegment {
195197
}
196198
}
197199

198-
impl SegmentLen for ReadOnlySegment {}
200+
impl SegmentLen for ReadOnlySegment {
201+
fn segment_len(&mut self) -> io::Result<u64> {
202+
// If the segment is compressed, we guarantee that it is immutable,
203+
// so use the file length as determined when opening the reader.
204+
// Seeking would be somewhat expensive in this case, as the zstd reader
205+
// translates to uncompressed offsets and thus must decompress at least
206+
// some frames.
207+
//
208+
// If the segment is not compressed, we may be reading the active
209+
// segment, so immutability is not guaranteed. Use the default seek
210+
// strategy thus.
211+
if self.inner.is_compressed() {
212+
Ok(self.len)
213+
} else {
214+
use io::Seek as _;
215+
216+
let old_pos = self.stream_position()?;
217+
let len = self.seek(io::SeekFrom::End(0))?;
218+
219+
// Avoid seeking a third time when we were already at the end of the
220+
// stream. The branch is usually way cheaper than a seek operation.
221+
if old_pos != len {
222+
self.seek(io::SeekFrom::Start(old_pos))?;
223+
}
224+
225+
Ok(len)
226+
}
227+
}
228+
}
199229

200230
impl Repo for Fs {
201231
type SegmentWriter = File;
202232
type SegmentReader = ReadOnlySegment;
203233

204-
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
205-
File::options()
206-
.read(true)
207-
.append(true)
208-
.create_new(true)
209-
.open(self.segment_path(offset))
210-
.or_else(|e| {
211-
if e.kind() == io::ErrorKind::AlreadyExists {
212-
debug!("segment {offset} already exists");
213-
// If the segment is completely empty, we can resume writing.
214-
let file = self.open_segment_writer(offset)?;
215-
if file.metadata()?.len() == 0 {
216-
debug!("segment {offset} is empty");
217-
return Ok(file);
218-
}
219-
220-
// Otherwise, provide some context.
234+
fn create_segment(&self, offset: u64, header: segment::Header) -> io::Result<Self::SegmentWriter> {
235+
let path = self.segment_path(offset);
236+
237+
// We need to check if the segment already exists,
238+
// so use file locking to prevent a TOCTOU race.
239+
// Using `flock` means we don't need to worry about stale lockfiles.
240+
let lock_path = path.0.with_extension("lock");
241+
let _lock = scopeguard::guard(
242+
lockfile::advisory::LockedFile::lock(&lock_path)
243+
.map_err(|e| io::Error::new(e.source.kind(), format!("repo {}: {}: {}", self, e, e.source)))?,
244+
|lockfile| {
245+
if let Err(e) = lockfile.release(true) {
246+
// It's ok if removing the file fails, but print a warning
247+
// anyways.
248+
warn!("repo {}: failed to remove {}: {}", self, lock_path.display(), e);
249+
}
250+
},
251+
);
252+
253+
// Check whether the segment already exists.
254+
// Overwrite it if its length is zero.
255+
match fs::metadata(&path) {
256+
Ok(stat) => {
257+
if stat.len() > 0 {
221258
return Err(io::Error::new(
222259
io::ErrorKind::AlreadyExists,
223260
format!("repo {}: segment {} already exists and is non-empty", self, offset),
224261
));
225262
}
226-
227-
Err(e)
228-
})
229-
.inspect(|_| {
230-
// We're rotating commitlog segments, so we should also take a snapshot at the earliest opportunity.
231-
if let Some(on_new_segment) = self.on_new_segment.as_ref() {
232-
// No need to handle the error here: if the snapshot worker is closed we'll eventually close too,
233-
// and we don't want to die prematurely if there are still TXes to write.
234-
on_new_segment();
263+
}
264+
Err(e) => {
265+
if e.kind() != io::ErrorKind::NotFound {
266+
return Err(io::Error::new(
267+
e.kind(),
268+
format!(
269+
"repo {}: error getting file metadata for segment {}: {}",
270+
self, offset, e
271+
),
272+
));
235273
}
236-
})
274+
}
275+
}
276+
277+
// The segment file either does not exist, or is of length zero.
278+
// Write the header to a temporary file and atomically move it into place.
279+
let mut tmp = tempfile::Builder::new().make_in(&self.root.0, |tmp_path| {
280+
File::options().read(true).append(true).create_new(true).open(tmp_path)
281+
})?;
282+
header.write(&mut tmp)?;
283+
tmp.as_file_mut().sync_all()?;
284+
let segment = tmp.persist(path)?;
285+
286+
// Notify subscribers.
287+
if let Some(on_new_segment) = self.on_new_segment.as_ref() {
288+
on_new_segment();
289+
}
290+
291+
Ok(segment)
237292
}
238293

239294
fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
@@ -248,7 +303,8 @@ impl Repo for Fs {
248303
let path = self.segment_path(offset);
249304
debug!("fs: open segment at {}", path.display());
250305
let file = File::open(&path)?;
251-
CompressReader::new(file).map(|inner| ReadOnlySegment { inner })
306+
let len = file.metadata()?.len();
307+
CompressReader::new(file).map(|inner| ReadOnlySegment { inner, len })
252308
}
253309

254310
fn remove_segment(&self, offset: u64) -> io::Result<()> {
@@ -315,8 +371,6 @@ impl Repo for Fs {
315371
}
316372
}
317373

318-
impl SegmentLen for CompressReader {}
319-
320374
#[cfg(feature = "streaming")]
321375
impl crate::stream::AsyncRepo for Fs {
322376
type AsyncSegmentWriter = tokio::io::BufWriter<tokio::fs::File>;

crates/commitlog/src/repo/mem.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,26 +54,28 @@ impl Repo for Memory {
5454
type SegmentWriter = Segment;
5555
type SegmentReader = ReadOnlySegment;
5656

57-
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
57+
fn create_segment(&self, offset: u64, header: crate::segment::Header) -> io::Result<Self::SegmentWriter> {
5858
let mut inner = self.segments.write().unwrap();
59-
match inner.entry(offset) {
59+
let mut segment = match inner.entry(offset) {
6060
btree_map::Entry::Occupied(entry) => {
6161
let entry = entry.get();
62-
let read_guard = entry.read().unwrap();
63-
if read_guard.is_empty() {
64-
Ok(Segment::from_shared(self.space.clone(), entry.clone()))
62+
if entry.read().unwrap().is_empty() {
63+
Segment::from_shared(self.space.clone(), entry.clone())
6564
} else {
66-
Err(io::Error::new(
65+
return Err(io::Error::new(
6766
io::ErrorKind::AlreadyExists,
6867
format!("segment {offset} already exists"),
69-
))
68+
));
7069
}
7170
}
7271
btree_map::Entry::Vacant(entry) => {
73-
let segment = entry.insert(Arc::new(RwLock::new(Storage::new())));
74-
Ok(Segment::from_shared(self.space.clone(), segment.clone()))
72+
let storage = entry.insert(Arc::new(RwLock::new(Storage::new())));
73+
Segment::from_shared(self.space.clone(), storage.clone())
7574
}
76-
}
75+
};
76+
header.write(&mut segment)?;
77+
78+
Ok(segment)
7779
}
7880

7981
fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {

0 commit comments

Comments
 (0)