diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index af43cb1d24d..c4ab243766d 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -1,4 +1,11 @@ -use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec}; +use std::{ + fmt::Debug, + io, + marker::PhantomData, + mem, + ops::{Range, RangeBounds}, + vec, +}; use itertools::Itertools; use log::{debug, info, trace, warn}; @@ -301,7 +308,22 @@ impl Generic { D: Decoder, D::Error: From, { - fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset) + fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset..) + } + + pub fn fold_transaction_range(&self, range: impl RangeBounds, decoder: D) -> Result<(), D::Error> + where + D: Decoder, + D::Error: From, + { + use std::ops::Bound::*; + + let start = match range.start_bound() { + Included(x) => *x, + Excluded(x) => x + 1, + Unbounded => 0, + }; + fold_transactions_internal(self.commits_from(start).with_log_format_version(), decoder, range) } } @@ -387,8 +409,29 @@ where D: Decoder, D::Error: From + From, { - let commits = commits_from(repo, max_log_format_version, offset)?; - fold_transactions_internal(commits.with_log_format_version(), de, offset) + fold_transaction_range(repo, max_log_format_version, offset.., de) +} + +pub fn fold_transaction_range( + repo: R, + max_log_format_version: u8, + range: impl RangeBounds, + de: D, +) -> Result<(), D::Error> +where + R: Repo, + D: Decoder, + D::Error: From + From, +{ + use std::ops::Bound::*; + + let start = match range.start_bound() { + Included(x) => *x, + Excluded(x) => x + 1, + Unbounded => 0, + }; + let commits = commits_from(repo, max_log_format_version, start)?; + fold_transactions_internal(commits.with_log_format_version(), de, range) } fn transactions_from_internal<'a, R, D, T>( @@ -409,12 +452,38 @@ where .map(|x| x.and_then(|y| y)) } -fn fold_transactions_internal(mut commits: CommitsWithVersion, de: D, from: u64) -> Result<(), D::Error> +fn fold_transactions_internal( + mut commits: CommitsWithVersion, + de: D, + range: impl RangeBounds, +) -> Result<(), D::Error> where R: Repo, D: Decoder, D::Error: From, { + use std::ops::Bound::*; + + // Avoid reading the first commit if it wouldn't be in the range anyway. + if range_is_empty(&range) { + return Ok(()); + } + + // `true` if `offset` is outside `range`, s.t. it is smaller than the start + // bound. + let before_start = |offset: &u64| match range.start_bound() { + Included(x) => offset < x, + Excluded(x) => offset <= x, + Unbounded => false, + }; + // `true` if `offset` is outside `range`, s.t. it is greater than the end + // bound. + let past_end = |offset: &u64| match range.end_bound() { + Included(x) => offset > x, + Excluded(x) => offset >= x, + Unbounded => false, + }; + while let Some(commit) = commits.next() { let (version, commit) = match commit { Ok(version_and_commit) => version_and_commit, @@ -434,15 +503,18 @@ where trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version); let max_tx_offset = commit.min_tx_offset + commit.n as u64; - if max_tx_offset <= from { + // Skip if no transaction in the commit is in range. + if before_start(&max_tx_offset) { continue; } let records = &mut commit.records.as_slice(); for n in 0..commit.n { let tx_offset = commit.min_tx_offset + n as u64; - if tx_offset < from { + if before_start(&tx_offset) { de.skip_record(version, tx_offset, records)?; + } else if past_end(&tx_offset) { + return Ok(()); } else { de.consume_record(version, tx_offset, records)?; } @@ -809,6 +881,25 @@ fn try_seek_using_offset_index( .map(|pos| (index, pos)) } +// `range_bounds_is_empty` https://github.com/rust-lang/rust/issues/137300 +// +// This is correct for integers, but unsound for arbitrary `T`, so unlikely to +// be stabilized. +fn range_is_empty(range: &impl RangeBounds) -> bool { + use std::ops::Bound::*; + + #[rustfmt::skip] + let not_empty = match (range.start_bound(), range.end_bound()) { + (Unbounded, _) | (_, Unbounded) => true, + (Included(start), Excluded(end)) + | (Excluded(start), Included(end)) + | (Excluded(start), Excluded(end)) => start < end, + (Included(start), Included(end)) => start <= end, + }; + + !not_empty +} + #[cfg(test)] mod tests { use std::{cell::Cell, iter::repeat}; diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index a5d071d4182..f7e6501bf37 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -1,6 +1,7 @@ use std::{ io, num::{NonZeroU16, NonZeroU64}, + ops::RangeBounds, sync::RwLock, }; @@ -609,3 +610,11 @@ where { commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de) } + +pub fn fold_transaction_range(root: CommitLogDir, range: impl RangeBounds, de: D) -> Result<(), D::Error> +where + D: Decoder, + D::Error: From + From, +{ + commitlog::fold_transaction_range(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, range, de) +}