From 6e7331da50fc47cb8a897dfb7e454f773980cc37 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 6 Aug 2025 15:52:10 +0200 Subject: [PATCH] commitlog: Provide folding over a range of tx offsets When using iterators, the consumer can decide if and when to abort the traversal. With folds, the only way would be to make the decoder return an error, which is not always convenient. Instead, allow folding over a range of transaction offsets. --- crates/commitlog/src/commitlog.rs | 105 ++++++++++++++++++++++++++++-- crates/commitlog/src/lib.rs | 9 +++ 2 files changed, 107 insertions(+), 7 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index af43cb1d24d..c4ab243766d 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -1,4 +1,11 @@ -use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec}; +use std::{ + fmt::Debug, + io, + marker::PhantomData, + mem, + ops::{Range, RangeBounds}, + vec, +}; use itertools::Itertools; use log::{debug, info, trace, warn}; @@ -301,7 +308,22 @@ impl Generic { D: Decoder, D::Error: From, { - fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset) + fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset..) + } + + pub fn fold_transaction_range(&self, range: impl RangeBounds, decoder: D) -> Result<(), D::Error> + where + D: Decoder, + D::Error: From, + { + use std::ops::Bound::*; + + let start = match range.start_bound() { + Included(x) => *x, + Excluded(x) => x + 1, + Unbounded => 0, + }; + fold_transactions_internal(self.commits_from(start).with_log_format_version(), decoder, range) } } @@ -387,8 +409,29 @@ where D: Decoder, D::Error: From + From, { - let commits = commits_from(repo, max_log_format_version, offset)?; - fold_transactions_internal(commits.with_log_format_version(), de, offset) + fold_transaction_range(repo, max_log_format_version, offset.., de) +} + +pub fn fold_transaction_range( + repo: R, + max_log_format_version: u8, + range: impl RangeBounds, + de: D, +) -> Result<(), D::Error> +where + R: Repo, + D: Decoder, + D::Error: From + From, +{ + use std::ops::Bound::*; + + let start = match range.start_bound() { + Included(x) => *x, + Excluded(x) => x + 1, + Unbounded => 0, + }; + let commits = commits_from(repo, max_log_format_version, start)?; + fold_transactions_internal(commits.with_log_format_version(), de, range) } fn transactions_from_internal<'a, R, D, T>( @@ -409,12 +452,38 @@ where .map(|x| x.and_then(|y| y)) } -fn fold_transactions_internal(mut commits: CommitsWithVersion, de: D, from: u64) -> Result<(), D::Error> +fn fold_transactions_internal( + mut commits: CommitsWithVersion, + de: D, + range: impl RangeBounds, +) -> Result<(), D::Error> where R: Repo, D: Decoder, D::Error: From, { + use std::ops::Bound::*; + + // Avoid reading the first commit if it wouldn't be in the range anyway. + if range_is_empty(&range) { + return Ok(()); + } + + // `true` if `offset` is outside `range`, s.t. it is smaller than the start + // bound. + let before_start = |offset: &u64| match range.start_bound() { + Included(x) => offset < x, + Excluded(x) => offset <= x, + Unbounded => false, + }; + // `true` if `offset` is outside `range`, s.t. it is greater than the end + // bound. + let past_end = |offset: &u64| match range.end_bound() { + Included(x) => offset > x, + Excluded(x) => offset >= x, + Unbounded => false, + }; + while let Some(commit) = commits.next() { let (version, commit) = match commit { Ok(version_and_commit) => version_and_commit, @@ -434,15 +503,18 @@ where trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version); let max_tx_offset = commit.min_tx_offset + commit.n as u64; - if max_tx_offset <= from { + // Skip if no transaction in the commit is in range. + if before_start(&max_tx_offset) { continue; } let records = &mut commit.records.as_slice(); for n in 0..commit.n { let tx_offset = commit.min_tx_offset + n as u64; - if tx_offset < from { + if before_start(&tx_offset) { de.skip_record(version, tx_offset, records)?; + } else if past_end(&tx_offset) { + return Ok(()); } else { de.consume_record(version, tx_offset, records)?; } @@ -809,6 +881,25 @@ fn try_seek_using_offset_index( .map(|pos| (index, pos)) } +// `range_bounds_is_empty` https://github.com/rust-lang/rust/issues/137300 +// +// This is correct for integers, but unsound for arbitrary `T`, so unlikely to +// be stabilized. +fn range_is_empty(range: &impl RangeBounds) -> bool { + use std::ops::Bound::*; + + #[rustfmt::skip] + let not_empty = match (range.start_bound(), range.end_bound()) { + (Unbounded, _) | (_, Unbounded) => true, + (Included(start), Excluded(end)) + | (Excluded(start), Included(end)) + | (Excluded(start), Excluded(end)) => start < end, + (Included(start), Included(end)) => start <= end, + }; + + !not_empty +} + #[cfg(test)] mod tests { use std::{cell::Cell, iter::repeat}; diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index a5d071d4182..f7e6501bf37 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -1,6 +1,7 @@ use std::{ io, num::{NonZeroU16, NonZeroU64}, + ops::RangeBounds, sync::RwLock, }; @@ -609,3 +610,11 @@ where { commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de) } + +pub fn fold_transaction_range(root: CommitLogDir, range: impl RangeBounds, de: D) -> Result<(), D::Error> +where + D: Decoder, + D::Error: From + From, +{ + commitlog::fold_transaction_range(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, range, de) +}