From 4afd916409e11159583eeaf0d684f1424f88490c Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 26 May 2025 08:06:03 +0200 Subject: [PATCH] commitlog: Reduce log noise when offset index cannot be used Much noise is e.g. created when the index is empty. Also consolidate the logic a bit. --- crates/commitlog/src/commitlog.rs | 81 ++++++++++++++++----------- crates/commitlog/src/error.rs | 14 +++++ crates/commitlog/src/segment.rs | 28 ++++----- crates/commitlog/src/stream/reader.rs | 26 ++++++--- 4 files changed, 96 insertions(+), 53 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index b5c92545685..33638de2749 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -5,9 +5,10 @@ use log::{debug, info, trace, warn}; use crate::{ commit::StoredCommit, - error, + error::{self, source_chain}, + index::IndexError, payload::Decoder, - repo::{self, Repo}, + repo::{self, Repo, TxOffsetIndex}, segment::{self, FileLike, Transaction, Writer}, Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION, }; @@ -476,25 +477,8 @@ fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Res // Read commit-wise until we find the byte offset. 
let mut reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?; - let (index_file, mut byte_offset) = repo - .get_offset_index(segment) - .and_then(|index_file| { - let (key, byte_offset) = index_file.key_lookup(offset).map_err(|e| { - io::Error::new(io::ErrorKind::NotFound, format!("Offset index cannot be used: {e:?}")) - })?; - - reader.seek_to_offset(&index_file, key).map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("Offset index is not used at offset {key}: {e}"), - ) - })?; - - Ok((Some(index_file), byte_offset)) - }) - .inspect_err(|e| { - warn!("commitlog offset index is not used: {e:?}"); - }) + let (index_file, mut byte_offset) = try_seek_using_offset_index(repo, &mut reader, offset) + .map(|(index_file, byte_offset)| (Some(index_file), byte_offset)) .unwrap_or((None, segment::Header::LEN as u64)); let commits = reader.commits(); @@ -732,18 +716,7 @@ impl Commits { /// index to advance the segment reader. fn try_seek_to_initial_offset(&self, segment: &mut segment::Reader) { if let CommitInfo::Initial { next_offset } = &self.last_commit { - let _ = self - .segments - .repo - .get_offset_index(segment.min_tx_offset) - .map_err(Into::into) - .and_then(|index_file| segment.seek_to_offset(&index_file, *next_offset)) - .inspect_err(|e| { - warn!( - "commitlog offset index is not used at segment {}: {}", - segment.min_tx_offset, e - ); - }); + try_seek_using_offset_index(&self.segments.repo, segment, *next_offset); } } } @@ -794,6 +767,48 @@ impl Iterator for CommitsWithVersion { } } +/// Try to advance `reader` to `offset` using the offset index. +/// +/// If successful, returns the offset index and the byte position of `reader`. +/// Returns `None` if the index could not be used, in which case the position of `reader` is unchanged.
+fn try_seek_using_offset_index( + repo: &R, + reader: &mut segment::Reader, + offset: u64, +) -> Option<(TxOffsetIndex, u64)> { + let segment_offset = reader.min_tx_offset; + let index = repo + .get_offset_index(segment_offset) + .inspect_err(|e| { + if e.kind() == io::ErrorKind::NotFound { + debug!("offset index does not exist segment={segment_offset}"); + } else { + warn!( + "error opening offset index segment={segment_offset}: {e} {}", + source_chain(&e) + ); + } + }) + .ok()?; + + reader + .seek_to_offset(&index, offset) + .inspect_err(|e| match e { + // Can happen if the segment is empty or small, so don't spam the logs. + IndexError::KeyNotFound => { + debug!("offset not found segment={segment_offset} offset={offset}"); + } + e => { + warn!( + "error reading index segment={segment_offset} offset={offset}: {e} {}", + source_chain(&e) + ); + } + }) + .ok() + .map(|pos| (index, pos)) +} + #[cfg(test)] mod tests { use std::{cell::Cell, iter::repeat}; diff --git a/crates/commitlog/src/error.rs b/crates/commitlog/src/error.rs index 655ffcb0f1f..0d72303b649 100644 --- a/crates/commitlog/src/error.rs +++ b/crates/commitlog/src/error.rs @@ -72,3 +72,17 @@ pub enum SegmentMetadata { #[error(transparent)] Io(#[from] io::Error), } + +/// Concatenate the `Display` text of every error in `e`'s `source()` chain, each prefixed by ": ".
+pub(crate) fn source_chain(e: &impl std::error::Error) -> String { + let mut s = String::new(); + let mut source = e.source(); + while let Some(cause) = source { + s.push(':'); + s.push(' '); + s.push_str(&cause.to_string()); + source = cause.source() + } + + s +} diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7d21e20e615..1c1411f3215 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -387,7 +387,7 @@ impl Reader { } } - pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result<(), IndexError> { + pub fn seek_to_offset(&mut self, index_file: &TxOffsetIndex, start_tx_offset: u64) -> Result { seek_to_offset(&mut self.inner, index_file, start_tx_offset) } @@ -424,11 +424,13 @@ impl Reader { /// - `segment` - segment reader /// - `min_tx_offset` - minimum transaction offset in the segment /// - `start_tx_offset` - transaction offset to advance to +/// +/// Returns the byte position `segment` is at after seeking. pub fn seek_to_offset( mut segment: &mut R, index_file: &TxOffsetIndex, start_tx_offset: u64, -) -> Result<(), IndexError> { +) -> Result { let (index_key, byte_offset) = index_file.key_lookup(start_tx_offset)?; // If the index_key is 0, it means the index file is empty, return error without seeking @@ -440,17 +442,17 @@ pub fn seek_to_offset( debug_assert!(index_key <= start_tx_offset); // Check if the offset index is pointing to the right commit. - validate_commit_header(&mut segment, byte_offset).map(|hdr| { - if hdr.min_tx_offset == index_key { - // Advance the segment Seek if expected commit is found. - segment - .seek(SeekFrom::Start(byte_offset)) - .map(|_| ()) - .map_err(Into::into) - } else { - Err(io::Error::new(io::ErrorKind::InvalidData, "mismatch key in index offset file").into()) - } - })? 
+ let hdr = validate_commit_header(&mut segment, byte_offset)?; + if hdr.min_tx_offset == index_key { + // Advance the segment Seek if expected commit is found. + segment.seek(SeekFrom::Start(byte_offset)) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "mismatched key in offset index file", + )) + } + .map_err(Into::into) } /// Try to extract the commit header from the asked position without advancing seek. diff --git a/crates/commitlog/src/stream/reader.rs b/crates/commitlog/src/stream/reader.rs index 8c2c9898574..9868380b934 100644 --- a/crates/commitlog/src/stream/reader.rs +++ b/crates/commitlog/src/stream/reader.rs @@ -3,7 +3,7 @@ use std::{io::SeekFrom, ops::RangeBounds}; use async_stream::try_stream; use bytes::{Buf as _, Bytes}; use futures::Stream; -use log::{debug, trace, warn}; +use log::{trace, warn}; use tokio::{ io::{self, AsyncBufRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _}, task::spawn_blocking, @@ -12,6 +12,8 @@ use tokio_util::io::SyncIoBridge; use crate::{ commit, + error::source_chain, + index::IndexError, repo::Repo, segment::{self, seek_to_offset, CHECKSUM_LEN}, }; @@ -83,13 +85,23 @@ fn read_segment( segment = spawn_blocking(move || { let mut segment = SyncIoBridge::new(segment); if let Ok(offset_index) = repo.get_offset_index(segment_start) { - debug!("seek_to_offset segment={} start={}", segment_start, range.start); + trace!("seek_to_offset segment={} start={}", segment_start, range.start); seek_to_offset(&mut segment, &offset_index, range.start) - .inspect_err(|e| { - warn!( - "error seeking to offset {} in segment {}: {}", - range.start, segment_start, e - ) + .inspect_err(|e| match e { + IndexError::KeyNotFound => + trace!( + "offset not found segment={} offset={}", + segment_start, range.start + ), + e => { + warn!( + "error reading index segment={} offset={}: {} {}", + segment_start, + range.start, + e, + source_chain(&e) + ) + } }) .ok(); }