Skip to content

Commit 772a646

Browse files
authored
Merge branch 'master' into kim/smoketest/tear-down-dc-up
2 parents aec43e1 + a6bc0e5 commit 772a646

4 files changed

Lines changed: 96 additions & 53 deletions

File tree

crates/commitlog/src/commitlog.rs

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ use log::{debug, info, trace, warn};
55

66
use crate::{
77
commit::StoredCommit,
8-
error,
8+
error::{self, source_chain},
9+
index::IndexError,
910
payload::Decoder,
10-
repo::{self, Repo},
11+
repo::{self, Repo, TxOffsetIndex},
1112
segment::{self, FileLike, Transaction, Writer},
1213
Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
1314
};
@@ -476,25 +477,8 @@ fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Res
476477
// Read commit-wise until we find the byte offset.
477478
let mut reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;
478479

479-
let (index_file, mut byte_offset) = repo
480-
.get_offset_index(segment)
481-
.and_then(|index_file| {
482-
let (key, byte_offset) = index_file.key_lookup(offset).map_err(|e| {
483-
io::Error::new(io::ErrorKind::NotFound, format!("Offset index cannot be used: {e:?}"))
484-
})?;
485-
486-
reader.seek_to_offset(&index_file, key).map_err(|e| {
487-
io::Error::new(
488-
io::ErrorKind::InvalidData,
489-
format!("Offset index is not used at offset {key}: {e}"),
490-
)
491-
})?;
492-
493-
Ok((Some(index_file), byte_offset))
494-
})
495-
.inspect_err(|e| {
496-
warn!("commitlog offset index is not used: {e:?}");
497-
})
480+
let (index_file, mut byte_offset) = try_seek_using_offset_index(repo, &mut reader, offset)
481+
.map(|(index_file, byte_offset)| (Some(index_file), byte_offset))
498482
.unwrap_or((None, segment::Header::LEN as u64));
499483

500484
let commits = reader.commits();
@@ -732,18 +716,7 @@ impl<R: Repo> Commits<R> {
732716
/// index to advance the segment reader.
733717
fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader<R::SegmentReader>) {
734718
if let CommitInfo::Initial { next_offset } = &self.last_commit {
735-
let _ = self
736-
.segments
737-
.repo
738-
.get_offset_index(segment.min_tx_offset)
739-
.map_err(Into::into)
740-
.and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset))
741-
.inspect_err(|e| {
742-
warn!(
743-
"commitlog offset index is not used at segment {}: {}",
744-
segment.min_tx_offset, e
745-
);
746-
});
719+
try_seek_using_offset_index(&self.segments.repo, segment, *next_offset);
747720
}
748721
}
749722
}
@@ -794,6 +767,48 @@ impl<R: Repo> Iterator for CommitsWithVersion<R> {
794767
}
795768
}
796769

770+
/// Try to advance `reader` to `offset` using the offset index.
771+
///
772+
/// If successful, returns the offset index and the byte position of `reader`.
773+
/// `None` if the position of `reader` is unchanged.
774+
fn try_seek_using_offset_index<R: Repo>(
775+
repo: &R,
776+
reader: &mut segment::Reader<R::SegmentReader>,
777+
offset: u64,
778+
) -> Option<(TxOffsetIndex, u64)> {
779+
let segment_offset = reader.min_tx_offset;
780+
let index = repo
781+
.get_offset_index(segment_offset)
782+
.inspect_err(|e| {
783+
if e.kind() == io::ErrorKind::NotFound {
784+
debug!("offset index does not exist segment={segment_offset}");
785+
} else {
786+
warn!(
787+
"error opening offset index segment={segment_offset}: {e} {}",
788+
source_chain(&e)
789+
);
790+
}
791+
})
792+
.ok()?;
793+
794+
reader
795+
.seek_to_offset(&index, offset)
796+
.inspect_err(|e| match e {
797+
// Can happen if the segment is empty or small, so don't spam the logs.
798+
IndexError::KeyNotFound => {
799+
debug!("offset not found segment={segment_offset} offset={offset}");
800+
}
801+
e => {
802+
warn!(
803+
"error reading index segment={segment_offset} offset={offset}: {e} {}",
804+
source_chain(&e)
805+
);
806+
}
807+
})
808+
.ok()
809+
.map(|pos| (index, pos))
810+
}
811+
797812
#[cfg(test)]
798813
mod tests {
799814
use std::{cell::Cell, iter::repeat};

crates/commitlog/src/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,17 @@ pub enum SegmentMetadata {
7272
#[error(transparent)]
7373
Io(#[from] io::Error),
7474
}
75+
76+
/// Recursively concatenate `e.source()`, separated by ": ".
77+
pub(crate) fn source_chain(e: &impl std::error::Error) -> String {
78+
let mut s = String::new();
79+
let mut source = e.source();
80+
while let Some(cause) = source {
81+
s.push(':');
82+
s.push(' ');
83+
s.push_str(&cause.to_string());
84+
source = cause.source()
85+
}
86+
87+
s
88+
}

crates/commitlog/src/segment.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ impl<R: io::BufRead + io::Seek> Reader<R> {
387387
}
388388
}
389389

390-
pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<(), IndexError> {
390+
pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<u64, IndexError> {
391391
seek_to_offset(&mut self.inner, index_file, start_tx_offset)
392392
}
393393

@@ -424,11 +424,13 @@ impl<R: io::BufRead + io::Seek> Reader<R> {
424424
/// - `segment` - segment reader
425425
/// - `min_tx_offset` - minimum transaction offset in the segment
426426
/// - `start_tx_offset` - transaction offset to advance to
427+
///
428+
/// Returns the byte position `segment` is at after seeking.
427429
pub fn seek_to_offset<R: io::Read + io::Seek>(
428430
mut segment: &mut R,
429431
index_file: &TxOffsetIndex,
430432
start_tx_offset: u64,
431-
) -> Result<(), IndexError> {
433+
) -> Result<u64, IndexError> {
432434
let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?;
433435

434436
// If the index_key is 0, it means the index file is empty, return error without seeking
@@ -440,17 +442,17 @@ pub fn seek_to_offset<R: io::Read + io::Seek>(
440442
debug_assert!(index_key <= start_tx_offset);
441443

442444
// Check if the offset index is pointing to the right commit.
443-
validate_commit_header(&mut segment, byte_offset).map(|hdr| {
444-
if hdr.min_tx_offset == index_key {
445-
// Advance the segment Seek if expected commit is found.
446-
segment
447-
.seek(SeekFrom::Start(byte_offset))
448-
.map(|_| ())
449-
.map_err(Into::into)
450-
} else {
451-
Err(io::Error::new(io::ErrorKind::InvalidData, "mismatch key in index offset file").into())
452-
}
453-
})?
445+
let hdr = validate_commit_header(&mut segment, byte_offset)?;
446+
if hdr.min_tx_offset == index_key {
447+
// Advance the segment Seek if expected commit is found.
448+
segment.seek(SeekFrom::Start(byte_offset))
449+
} else {
450+
Err(io::Error::new(
451+
io::ErrorKind::InvalidData,
452+
"mismatched key in offset index file",
453+
))
454+
}
455+
.map_err(Into::into)
454456
}
455457

456458
/// Try to extract the commit header from the asked position without advancing seek.

crates/commitlog/src/stream/reader.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{io::SeekFrom, ops::RangeBounds};
33
use async_stream::try_stream;
44
use bytes::{Buf as _, Bytes};
55
use futures::Stream;
6-
use log::{debug, trace, warn};
6+
use log::{trace, warn};
77
use tokio::{
88
io::{self, AsyncBufRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _},
99
task::spawn_blocking,
@@ -12,6 +12,8 @@ use tokio_util::io::SyncIoBridge;
1212

1313
use crate::{
1414
commit,
15+
error::source_chain,
16+
index::IndexError,
1517
repo::Repo,
1618
segment::{self, seek_to_offset, CHECKSUM_LEN},
1719
};
@@ -83,13 +85,23 @@ fn read_segment(
8385
segment = spawn_blocking(move || {
8486
let mut segment = SyncIoBridge::new(segment);
8587
if let Ok(offset_index) = repo.get_offset_index(segment_start) {
86-
debug!("seek_to_offset segment={} start={}", segment_start, range.start);
88+
trace!("seek_to_offset segment={} start={}", segment_start, range.start);
8789
seek_to_offset(&mut segment, &offset_index, range.start)
88-
.inspect_err(|e| {
89-
warn!(
90-
"error seeking to offset {} in segment {}: {}",
91-
range.start, segment_start, e
92-
)
90+
.inspect_err(|e| match e {
91+
IndexError::KeyNotFound =>
92+
trace!(
93+
"offset not found segment={} offset={}",
94+
segment_start, range.start
95+
),
96+
e => {
97+
warn!(
98+
"error reading index segment={} offset={}: {} {}",
99+
segment_start,
100+
range.start,
101+
e,
102+
source_chain(&e)
103+
)
104+
}
93105
})
94106
.ok();
95107
}

0 commit comments

Comments
 (0)