Skip to content

Commit 8aa22da

Browse files
authored
commitlog: Improve committed_meta (#4338)
- Extends `commit::Metadata` to include the checksum - Extends `segment::Metadata` to include `Some(commit::Metadata)` containing the last commit in the segment (if there is one) - Changes `committed_meta` to: - ignore empty segments at the end of the log - try harder to provide useful metadata, even if only a prefix of the latest segment is readable. This allows eliminating the remaining `Commitlog::open` calls whose purpose is to query the latest commit (offset). `Commitlog::open` creates an empty segment if the tail of the log is corrupt, which is a non-obvious side-effect that can be confusing when debugging. It also allows eliminating uses where the `commits_from` iterator is used to find the latest full commit. The `Commits` iterator requires the caller to handle the case of a corrupted commit at the end of the log, by advancing the iterator once more after it has yielded an error in order to check that it is exhausted, and then deciding whether to ignore the error. This is easy to forget. `committed_meta` now just does the right thing, preserving information about tail corruption for when that's useful.
1 parent c14c8d1 commit 8aa22da

5 files changed

Lines changed: 189 additions & 30 deletions

File tree

crates/commitlog/src/commit.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -342,30 +342,39 @@ impl StoredCommit {
342342
}
343343
}
344344

345-
/// Numbers needed to compute [`crate::segment::Header`].
345+
/// A [`StoredCommit`] sans the records payload.
346346
#[derive(Clone, Debug, Eq, PartialEq)]
347347
pub struct Metadata {
348348
pub tx_range: Range<u64>,
349349
pub size_in_bytes: u64,
350350
pub epoch: u64,
351+
pub checksum: u32,
351352
}
352353

353354
impl Metadata {
354-
/// Extract the [`Metadata`] of a single [`Commit`] from the given reader.
355+
/// Extract the [`Metadata`] of a single [`StoredCommit`] from the given
356+
/// reader.
355357
///
356358
/// Note that this decodes the commit due to checksum verification.
357-
/// Like [`Commit::decode`], returns `None` if the reader is at EOF already.
359+
/// Like [`StoredCommit::decode`], this method returns `None` if the reader
360+
/// is at EOF already.
358361
pub fn extract<R: io::Read>(reader: R) -> io::Result<Option<Self>> {
359-
Commit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from))
362+
StoredCommit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from))
360363
}
361364
}
362365

363-
impl From<Commit> for Metadata {
364-
fn from(commit: Commit) -> Self {
366+
impl From<StoredCommit> for Metadata {
367+
fn from(commit: StoredCommit) -> Self {
368+
let tx_range = commit.tx_range();
369+
let epoch = commit.epoch;
370+
let checksum = commit.checksum;
371+
let size_in_bytes = Commit::from(commit).encoded_len() as u64;
372+
365373
Self {
366-
tx_range: commit.tx_range(),
367-
size_in_bytes: commit.encoded_len() as u64,
368-
epoch: commit.epoch,
374+
tx_range,
375+
size_in_bytes,
376+
epoch,
377+
checksum,
369378
}
370379
}
371380
}

crates/commitlog/src/commitlog.rs

Lines changed: 140 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
error::{self, source_chain},
1616
index::IndexError,
1717
payload::Decoder,
18-
repo::{self, Repo, TxOffsetIndex},
18+
repo::{self, Repo, SegmentLen as _, TxOffsetIndex},
1919
segment::{self, FileLike, Transaction, Writer},
2020
Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
2121
};
@@ -355,7 +355,142 @@ impl<R: Repo, T> Drop for Generic<R, T> {
355355
}
356356
}
357357

358-
/// Extract the most recently written [`segment::Metadata`] from the commitlog
358+
/// The most recent non-empty segment in repo `R`.
359+
///
360+
/// Created by [open_newest_non_empty_segment].
361+
struct MostRecentNonEmptySegment<R> {
362+
/// Number of empty segments that were ignored.
363+
empty_segments: usize,
364+
/// Offset of the non-empty segment.
365+
segment_offset: u64,
366+
/// [Repo::SegmentReader] for the non-empty segment.
367+
segment_reader: R,
368+
}
369+
370+
/// Open the most recent segment in `repo` that is larger than
371+
/// [segment::Header::LEN].
372+
///
373+
/// Note that there should be at most one empty segment in the log. We may,
374+
/// however, want to be lenient on this read-only path, so the number of
375+
/// empty segments is tracked in the returned type rather than returning an
376+
/// error.
377+
fn open_newest_non_empty_segment<R: Repo>(repo: R) -> io::Result<Option<MostRecentNonEmptySegment<R::SegmentReader>>> {
378+
let mut segments = repo.existing_offsets()?;
379+
380+
let mut empty_segments = 0;
381+
let mut segment_offset;
382+
let mut segment_reader;
383+
loop {
384+
let Some(last) = segments.pop() else {
385+
return Ok(None);
386+
};
387+
segment_offset = last;
388+
segment_reader = repo.open_segment_reader(segment_offset)?;
389+
if segment_reader.segment_len()? > segment::Header::LEN as u64 {
390+
break;
391+
} else {
392+
empty_segments += 1;
393+
}
394+
}
395+
396+
Ok(Some(MostRecentNonEmptySegment {
397+
empty_segments,
398+
segment_offset,
399+
segment_reader,
400+
}))
401+
}
402+
403+
/// The most recently written [segment::Metadata] for a given [Repo].
404+
///
405+
/// The type preserves the error information in case the most recent segment
406+
/// contains corrupted data at the end (typically due to a torn write).
407+
///
408+
/// Created by [committed_meta].
409+
pub enum CommittedMeta {
410+
/// The most recent segment could not be traversed successfully until the
411+
/// end, i.e. there is trailing garbage in the segment.
412+
///
413+
/// This variant is also returned in case [open_newest_non_empty_segment]
414+
/// finds more than a single empty segment at the end of the log.
415+
Prefix {
416+
/// The metadata of the prefix that could be traversed successfully.
417+
///
418+
/// It is guaranteed that the metadata spans at least one commit.
419+
metadata: segment::Metadata,
420+
/// The error encountered.
421+
error: io::Error,
422+
},
423+
/// The most recent segment could be traversed successfully until the end.
424+
Complete {
425+
/// The segment metadata.
426+
///
427+
/// It is guaranteed that the metadata spans at least one commit.
428+
metadata: segment::Metadata,
429+
},
430+
}
431+
432+
impl CommittedMeta {
433+
pub fn metadata(&self) -> &segment::Metadata {
434+
let (Self::Prefix { metadata, .. } | Self::Complete { metadata }) = self;
435+
metadata
436+
}
437+
438+
fn extract(repo: impl Repo) -> io::Result<Option<Self>> {
439+
let Some(MostRecentNonEmptySegment {
440+
empty_segments,
441+
segment_offset,
442+
mut segment_reader,
443+
}) = open_newest_non_empty_segment(&repo)?
444+
else {
445+
return Ok(None);
446+
};
447+
let offset_index = repo.get_offset_index(segment_offset).ok();
448+
match segment::Metadata::extract(segment_offset, &mut segment_reader, offset_index.as_ref()) {
449+
// Segment is intact.
450+
Ok(metadata) if empty_segments <= 1 => {
451+
assert!(
452+
!metadata.tx_range.is_empty(),
453+
"segment was promised to be non-empty but contains zero transactions"
454+
);
455+
Ok(Some(CommittedMeta::Complete { metadata }))
456+
}
457+
// Segment is good, but there are too many empty segments.
458+
Ok(metadata) => Ok(Some(CommittedMeta::Prefix {
459+
metadata,
460+
error: io::Error::new(
461+
io::ErrorKind::InvalidData,
462+
format!("repo {}: too many empty segments: {}", repo, empty_segments),
463+
),
464+
})),
465+
// Segment is non-empty, but first commit is corrupt.
466+
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) if sofar.tx_range.is_empty() => {
467+
Err(io::Error::new(
468+
io::ErrorKind::InvalidData,
469+
format!(
470+
"repo {}: first commit in the most recent segment is corrupt: {}",
471+
repo, source
472+
),
473+
))
474+
}
475+
// Some prefix of the segment is good.
476+
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => Ok(Some(CommittedMeta::Prefix {
477+
metadata: sofar,
478+
error: source,
479+
})),
480+
// Something went wrong, including out-of-order errors and such.
481+
Err(error::SegmentMetadata::Io(e)) => Err(e),
482+
}
483+
}
484+
}
485+
486+
impl From<CommittedMeta> for segment::Metadata {
487+
fn from(meta: CommittedMeta) -> Self {
488+
let (CommittedMeta::Prefix { metadata, .. } | CommittedMeta::Complete { metadata }) = meta;
489+
metadata
490+
}
491+
}
492+
493+
/// Extract the most recently written [CommittedMeta] from the commitlog
359494
/// in `repo`.
360495
///
361496
/// Returns `None` if the commitlog is empty.
@@ -373,18 +508,12 @@ impl<R: Repo, T> Drop for Generic<R, T> {
373508
/// like so:
374509
///
375510
/// ```ignore
376-
/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
511+
/// let max_offset = committed_meta(..)?.map(|meta| meta.metadata().tx_range.end);
377512
/// ```
378513
///
379514
/// Unlike `open`, no segment will be created in an empty `repo`.
380-
pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
381-
let Some(last) = repo.existing_offsets()?.pop() else {
382-
return Ok(None);
383-
};
384-
385-
let mut storage = repo.open_segment_reader(last)?;
386-
let offset_index = repo.get_offset_index(last).ok();
387-
segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some)
515+
pub fn committed_meta(repo: impl Repo) -> io::Result<Option<CommittedMeta>> {
516+
CommittedMeta::extract(repo)
388517
}
389518

390519
pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {

crates/commitlog/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod varint;
1919

2020
pub use crate::{
2121
commit::{Commit, StoredCommit},
22+
commitlog::CommittedMeta,
2223
payload::{Decoder, Encode},
2324
repo::fs::SizeOnDisk,
2425
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
@@ -558,7 +559,7 @@ impl<T: Encode> Commitlog<T> {
558559
/// ```
559560
///
560561
/// Unlike `open`, no segment will be created in an empty `repo`.
561-
pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
562+
pub fn committed_meta(root: CommitLogDir) -> io::Result<Option<CommittedMeta>> {
562563
commitlog::committed_meta(repo::Fs::new(root, None)?)
563564
}
564565

crates/commitlog/src/repo/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ pub fn resume_segment_writer<R: Repo>(
258258
size_in_bytes,
259259
max_epoch,
260260
max_commit_offset: _,
261+
max_commit: _,
261262
} = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
262263
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
263264
warn!("invalid commit in segment {offset}: {source}");

crates/commitlog/src/segment.rs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,7 @@ pub struct Metadata {
594594
/// `max_commit_offset..tx_range.end` is the range of
595595
/// transactions contained in it.
596596
pub max_commit_offset: u64,
597+
pub max_commit: Option<commit::Metadata>,
597598
}
598599

599600
impl Metadata {
@@ -627,6 +628,7 @@ impl Metadata {
627628
size_in_bytes: Header::LEN as u64,
628629
max_epoch: u64::default(),
629630
max_commit_offset: min_tx_offset,
631+
max_commit: None,
630632
});
631633

632634
reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
@@ -663,6 +665,7 @@ impl Metadata {
663665
// TODO: Should it be an error to encounter an epoch going backwards?
664666
sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
665667
sofar.max_commit_offset = commit.tx_range.start;
668+
sofar.max_commit = Some(commit);
666669
}
667670

668671
Ok(sofar)
@@ -695,6 +698,7 @@ impl Metadata {
695698
size_in_bytes: byte_offset + commit.size_in_bytes,
696699
max_epoch: commit.epoch,
697700
max_commit_offset: commit.tx_range.start,
701+
max_commit: Some(commit),
698702
});
699703
}
700704

@@ -833,18 +837,33 @@ mod tests {
833837
writer.commit().unwrap();
834838

835839
let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
836-
let metadata = reader.metadata().unwrap();
840+
let Metadata {
841+
header,
842+
tx_range,
843+
size_in_bytes,
844+
max_epoch,
845+
max_commit_offset,
846+
max_commit,
847+
} = reader.metadata().unwrap();
837848

838849
assert_eq!(
839-
metadata,
840-
Metadata {
841-
header: Header::default(),
842-
tx_range: Range { start: 0, end: 5 },
850+
(
851+
header,
852+
tx_range,
853+
size_in_bytes,
854+
max_epoch,
855+
max_commit_offset,
856+
max_commit.is_some_and(|meta| meta.tx_range == (3..5))
857+
),
858+
(
859+
Header::default(),
860+
0..5,
843861
// header + 5 txs + 3 commits
844-
size_in_bytes: (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64,
845-
max_epoch: Commit::DEFAULT_EPOCH,
846-
max_commit_offset: 3
847-
}
862+
(Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64,
863+
Commit::DEFAULT_EPOCH,
864+
3,
865+
true
866+
)
848867
);
849868
}
850869

0 commit comments

Comments
 (0)