Skip to content

Commit 76a52ca

Browse files
authored
Use Offset Index on Meta extract (#2549)
1 parent c39f7fa commit 76a52ca

7 files changed

Lines changed: 218 additions & 59 deletions

File tree

crates/commitlog/src/commitlog.rs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{io, marker::PhantomData, mem, ops::Range, vec};
1+
use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec};
22

33
use itertools::Itertools;
44
use log::{debug, info, trace, warn};
@@ -333,7 +333,8 @@ pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, erro
333333
};
334334

335335
let mut storage = repo.open_segment_reader(last)?;
336-
segment::Metadata::extract(last, &mut storage).map(Some)
336+
let offset_index = repo.get_offset_index(last).ok();
337+
segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some)
337338
}
338339

339340
pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
@@ -464,29 +465,61 @@ fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Res
464465
repo.remove_segment(segment)?;
465466
} else {
466467
// Read commit-wise until we find the byte offset.
467-
let reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;
468+
let mut reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;
469+
470+
let (index_file, mut byte_offset) = repo
471+
.get_offset_index(segment)
472+
.and_then(|index_file| {
473+
let (key, byte_offset) = index_file.key_lookup(offset).map_err(|e| {
474+
io::Error::new(io::ErrorKind::NotFound, format!("Offset index cannot be used: {e:?}"))
475+
})?;
476+
477+
reader.seek_to_offset(&index_file, key).map_err(|e| {
478+
io::Error::new(
479+
io::ErrorKind::InvalidData,
480+
format!("Offset index is not used at offset {key}: {e}"),
481+
)
482+
})?;
483+
484+
Ok((Some(index_file), byte_offset))
485+
})
486+
.inspect_err(|e| {
487+
warn!("commitlog offset index is not used: {e:?}");
488+
})
489+
.unwrap_or((None, segment::Header::LEN as u64));
490+
468491
let commits = reader.commits();
469492

470-
let mut bytes_read = 0;
471493
for commit in commits {
472494
let commit = commit?;
473495
if commit.min_tx_offset > offset {
474496
break;
475497
}
476-
bytes_read += Commit::from(commit).encoded_len() as u64;
498+
byte_offset += Commit::from(commit).encoded_len() as u64;
477499
}
478500

479-
if bytes_read == 0 {
501+
if byte_offset == segment::Header::LEN as u64 {
480502
// Segment is empty, just remove it.
481503
repo.remove_segment(segment)?;
482504
} else {
483-
let byte_offset = segment::Header::LEN as u64 + bytes_read;
484505
debug!("truncating segment {segment} to {offset} at {byte_offset}");
485506
let mut file = repo.open_segment_writer(segment)?;
486-
// Note: The offset index truncates equal or greater,
487-
// inclusive. We'd like to retain `offset` in the index, as
488-
// the commit is also retained in the log.
489-
file.ftruncate(offset + 1, byte_offset)?;
507+
508+
if let Some(mut index_file) = index_file {
509+
let index_file = index_file.as_mut();
510+
// Note: The offset index truncates equal or greater,
511+
// inclusive. We'd like to retain `offset` in the index, as
512+
// the commit is also retained in the log.
513+
index_file.ftruncate(offset + 1, byte_offset).map_err(|e| {
514+
io::Error::new(
515+
io::ErrorKind::InvalidData,
516+
format!("Failed to truncate offset index: {e}"),
517+
)
518+
})?;
519+
index_file.async_flush()?;
520+
}
521+
522+
file.ftruncate(offset, byte_offset)?;
490523
// Some filesystems require fsync after ftruncate.
491524
file.fsync()?;
492525
break;

crates/commitlog/src/index/indexfile.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,9 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
159159
/// - `IndexError::OutOfMemory`: Append after index file is already full.
160160
pub fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError> {
161161
let key = key.into();
162-
if self.last_key()? >= key {
163-
return Err(IndexError::InvalidInput);
162+
let last_key = self.last_key()?;
163+
if last_key >= key {
164+
return Err(IndexError::InvalidInput(last_key, key));
164165
}
165166

166167
let start = self.num_entries * ENTRY_SIZE;
@@ -186,7 +187,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
186187
}
187188

188189
/// Truncates the index file starting from the entry with a key greater than or equal to the given key.
189-
pub fn truncate(&mut self, key: Key) -> Result<(), IndexError> {
190+
pub(crate) fn truncate(&mut self, key: Key) -> Result<(), IndexError> {
190191
let key = key.into();
191192
let (found_key, index) = self.find_index(Key::from(key))?;
192193

crates/commitlog/src/index/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ pub enum IndexError {
1616
#[error("Asked key is smaller than the first entry in the index")]
1717
KeyNotFound,
1818

19-
#[error("Key should be monotnously increasing")]
20-
InvalidInput,
19+
#[error("Key should be monotonically increasing: input: {1}, last: {0}")]
20+
InvalidInput(u64, u64),
2121

2222
#[error("index file is not readable")]
2323
InvalidFormat,

crates/commitlog/src/repo/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ impl<T: Repo> Repo for &T {
147147
fn existing_offsets(&self) -> io::Result<Vec<u64>> {
148148
T::existing_offsets(self)
149149
}
150+
151+
fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
152+
T::create_offset_index(self, offset, cap)
153+
}
154+
155+
/// Remove [`TxOffsetIndexMut`] named with `offset`.
156+
fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
157+
T::remove_offset_index(self, offset)
158+
}
159+
160+
/// Get [`TxOffsetIndex`] for the given `offset`.
161+
fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
162+
T::get_offset_index(self, offset)
163+
}
150164
}
151165

152166
impl<T: SegmentLen> SegmentLen for io::BufReader<T> {
@@ -223,12 +237,13 @@ pub fn resume_segment_writer<R: Repo>(
223237
offset: u64,
224238
) -> io::Result<Result<Writer<R::SegmentWriter>, Metadata>> {
225239
let mut storage = repo.open_segment_writer(offset)?;
240+
let offset_index = repo.get_offset_index(offset).ok();
226241
let Metadata {
227242
header,
228243
tx_range,
229244
size_in_bytes,
230245
max_epoch,
231-
} = match Metadata::extract(offset, &mut storage) {
246+
} = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
232247
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
233248
warn!("invalid commit in segment {offset}: {source}");
234249
debug!("sofar={sofar:?}");

crates/commitlog/src/segment.rs

Lines changed: 119 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use log::{debug, warn};
1010
use crate::{
1111
commit::{self, Commit, StoredCommit},
1212
error,
13-
index::IndexError,
13+
index::{IndexError, IndexFileMut},
1414
payload::Encode,
1515
repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
1616
Options,
@@ -332,11 +332,31 @@ impl FileLike for OffsetIndexWriter {
332332

333333
fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
334334
self.reset();
335-
let _ = self.head.truncate(tx_offset);
335+
self.head
336+
.truncate(tx_offset)
337+
.inspect_err(|e| {
338+
warn!("failed to truncate offset index at {tx_offset}: {e:?}");
339+
})
340+
.ok();
336341
Ok(())
337342
}
338343
}
339344

345+
impl FileLike for IndexFileMut<TxOffset> {
346+
fn fsync(&mut self) -> io::Result<()> {
347+
self.async_flush()
348+
}
349+
350+
fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
351+
self.truncate(tx_offset).map_err(|e| {
352+
io::Error::new(
353+
ErrorKind::Other,
354+
format!("failed to truncate offset index at {tx_offset}: {e:?}"),
355+
)
356+
})
357+
}
358+
}
359+
340360
#[derive(Debug)]
341361
pub struct Reader<R> {
342362
pub header: Header,
@@ -393,7 +413,7 @@ impl<R: io::BufRead + io::Seek> Reader<R> {
393413

394414
#[cfg(test)]
395415
pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
396-
Metadata::with_header(self.min_tx_offset, self.header, self.inner)
416+
Metadata::with_header(self.min_tx_offset, self.header, self.inner, None)
397417
}
398418
}
399419

@@ -513,30 +533,38 @@ pub struct Metadata {
513533
}
514534

515535
impl Metadata {
516-
/// Read and validate metadata from a segment.
536+
/// Reads and validates metadata from a segment.
537+
/// It will look for last commit index offset and then traverse the segment
517538
///
518-
/// This traverses the entire segment, consuming thre `reader.
519-
/// Doing so is necessary to determine `max_tx_offset`, `size_in_bytes` and
520-
/// `max_epoch`.
521-
pub(crate) fn extract<R: io::Read>(min_tx_offset: u64, mut reader: R) -> Result<Self, error::SegmentMetadata> {
539+
/// Determines `max_tx_offset`, `size_in_bytes`, and `max_epoch` from the segment.
540+
pub(crate) fn extract<R: io::Read + io::Seek>(
541+
min_tx_offset: TxOffset,
542+
mut reader: R,
543+
offset_index: Option<&TxOffsetIndex>,
544+
) -> Result<Self, error::SegmentMetadata> {
522545
let header = Header::decode(&mut reader)?;
523-
Self::with_header(min_tx_offset, header, reader)
546+
Self::with_header(min_tx_offset, header, reader, offset_index)
524547
}
525548

526-
fn with_header<R: io::Read>(
549+
fn with_header<R: io::Read + io::Seek>(
527550
min_tx_offset: u64,
528551
header: Header,
529552
mut reader: R,
553+
offset_index: Option<&TxOffsetIndex>,
530554
) -> Result<Self, error::SegmentMetadata> {
531-
let mut sofar = Self {
532-
header,
533-
tx_range: Range {
534-
start: min_tx_offset,
535-
end: min_tx_offset,
536-
},
537-
size_in_bytes: Header::LEN as u64,
538-
max_epoch: Commit::DEFAULT_EPOCH,
539-
};
555+
let mut sofar = offset_index
556+
.and_then(|index| Self::find_valid_indexed_commit(min_tx_offset, header, &mut reader, index).ok())
557+
.unwrap_or_else(|| Self {
558+
header,
559+
tx_range: Range {
560+
start: min_tx_offset,
561+
end: min_tx_offset,
562+
},
563+
size_in_bytes: Header::LEN as u64,
564+
max_epoch: u64::default(),
565+
});
566+
567+
reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
540568

541569
fn commit_meta<R: io::Read>(
542570
reader: &mut R,
@@ -573,6 +601,78 @@ impl Metadata {
573601

574602
Ok(sofar)
575603
}
604+
605+
/// Finds the last valid commit in the segment using the offset index.
606+
/// It traverses the index in reverse order, starting from the last key.
607+
///
608+
/// Returns
609+
/// * `Ok((Metadata)` - If a valid commit is found containing the commit, It adds a default
610+
/// header, which should be replaced with the actual header.
611+
/// * `Err` - If no valid commit is found or if the index is empty
612+
fn find_valid_indexed_commit<R: io::Read + io::Seek>(
613+
min_tx_offset: u64,
614+
header: Header,
615+
reader: &mut R,
616+
offset_index: &TxOffsetIndex,
617+
) -> io::Result<Metadata> {
618+
let mut candidate_last_key = TxOffset::MAX;
619+
620+
while let Ok((key, byte_offset)) = offset_index.key_lookup(candidate_last_key) {
621+
match Self::validate_commit_at_offset(reader, key, byte_offset) {
622+
Ok(commit) => {
623+
return Ok(Metadata {
624+
header,
625+
tx_range: Range {
626+
start: min_tx_offset,
627+
end: commit.tx_range.end,
628+
},
629+
size_in_bytes: byte_offset + commit.size_in_bytes,
630+
max_epoch: commit.epoch,
631+
});
632+
}
633+
634+
// `TxOffset` at `byte_offset` is not valid, so try with previous entry
635+
Err(_) => {
636+
candidate_last_key = key.saturating_sub(1);
637+
if candidate_last_key == 0 {
638+
break;
639+
}
640+
}
641+
}
642+
}
643+
644+
Err(io::Error::new(
645+
ErrorKind::InvalidData,
646+
format!("No valid commit found in index up to key: {}", candidate_last_key),
647+
))
648+
}
649+
650+
/// Validates and decodes a commit at `byte_offset` in the segment.
651+
///
652+
/// # Returns
653+
/// * `Ok(commit::Metadata)` - If a valid commit is found with matching transaction offset
654+
/// * `Err` - If commit can't be decoded or has mismatched transaction offset
655+
fn validate_commit_at_offset<R: io::Read + io::Seek>(
656+
reader: &mut R,
657+
tx_offset: TxOffset,
658+
byte_offset: u64,
659+
) -> io::Result<commit::Metadata> {
660+
reader.seek(SeekFrom::Start(byte_offset))?;
661+
let commit = commit::Metadata::extract(reader)?
662+
.ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "failed to decode commit"))?;
663+
664+
if commit.tx_range.start != tx_offset {
665+
return Err(io::Error::new(
666+
ErrorKind::InvalidData,
667+
format!(
668+
"mismatch key in index offset file: expected={} actual={}",
669+
tx_offset, commit.tx_range.start
670+
),
671+
));
672+
}
673+
674+
Ok(commit)
675+
}
576676
}
577677

578678
#[cfg(test)]

crates/commitlog/src/stream/reader.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
use std::ops::RangeBounds;
1+
use std::{io::SeekFrom, ops::RangeBounds};
22

33
use async_stream::try_stream;
44
use bytes::{Buf as _, Bytes};
55
use futures::Stream;
6-
use log::{debug, info, trace, warn};
7-
use tokio::io::{self, AsyncBufRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, SeekFrom};
8-
use tokio::task::spawn_blocking;
6+
use log::{debug, trace, warn};
7+
use tokio::{
8+
io::{self, AsyncBufRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _},
9+
task::spawn_blocking,
10+
};
911
use tokio_util::io::SyncIoBridge;
1012

1113
use crate::{
@@ -64,7 +66,7 @@ fn read_segment(
6466
range: RangeFromMaybeToInclusive,
6567
) -> impl Stream<Item = io::Result<Bytes>> {
6668
try_stream! {
67-
info!("reading segment {segment_start}");
69+
trace!("reading segment {segment_start}");
6870
let (segment_header, segment_header_bytes) = {
6971
let mut buf = [0u8; segment::Header::LEN];
7072
segment.read_exact(&mut buf).await?;

0 commit comments

Comments
 (0)