From 268118d1790f5981dabdcc2c617858a1a417d712 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Sat, 28 Mar 2026 00:15:46 +0000 Subject: [PATCH 01/18] re create functionality of criterion compare action --- .github/scripts/run-benchmarks.sh | 185 ++++++++++++++++++++++++++++++ .github/workflows/benchmark.yml | 84 +++++++++----- benchmarks/README.md | 22 +++- 3 files changed, 258 insertions(+), 33 deletions(-) create mode 100644 .github/scripts/run-benchmarks.sh diff --git a/.github/scripts/run-benchmarks.sh b/.github/scripts/run-benchmarks.sh new file mode 100644 index 0000000000..03b9b68e59 --- /dev/null +++ b/.github/scripts/run-benchmarks.sh @@ -0,0 +1,185 @@ +#!/usr/bin/env bash +# Benchmark comparison script for pull requests. +# +# Called by .github/workflows/benchmark.yml (run-benchmark job) after the repo +# has been checked out at the PR head. Writes the formatted markdown comparison +# to /tmp/bench-comment.md; the post-comment job picks it up and posts it. +# +# Expects the following environment variables: +# +# COMMENT - the /bench PR comment body +# BASE_REF - base branch ref (e.g. "main") +# HEAD_SHA - full SHA of the PR head commit +# +# Usage for local testing: +# COMMENT="/bench --tags base --filter snapshotConstruction" \ +# BASE_REF=main HEAD_SHA=abc1234 \ +# bash .github/scripts/run-benchmarks.sh + +set -euo pipefail +shopt -s extglob + +# --------------------------------------------------------------------------- +# 1. 
Parse the /bench comment +# Syntax: /bench [--tags ] [--filter ] +# --tags sets BENCH_TAGS (comma-separated tag list); defaults to "base" +# when the comment is bare /bench with no recognized args +# --filter Criterion name regex passed as a positional arg to cargo bench +# --------------------------------------------------------------------------- + +ARGS="${COMMENT#/bench}" +ARGS="${ARGS##+( )}" + +TAGS="" +FILTER="" + +if [[ -z "$ARGS" ]]; then + # Bare /bench with no args: default to the "base" tag + TAGS="base" +else + # Parse --tags and --filter flags; each takes the next whitespace-delimited + # token as its value. Unknown tokens are silently ignored. + while [[ -n "$ARGS" ]]; do + TOKEN="${ARGS%% *}" + ARGS="${ARGS#"$TOKEN"}" + ARGS="${ARGS##+( )}" + + if [[ "$TOKEN" == "--tags" ]]; then + TAGS="${ARGS%% *}" + ARGS="${ARGS#"$TAGS"}" + ARGS="${ARGS##+( )}" + elif [[ "$TOKEN" == "--filter" ]]; then + FILTER="${ARGS%% *}" + ARGS="${ARGS#"$FILTER"}" + ARGS="${ARGS##+( )}" + fi + done +fi + +# Sanitize tags: strict allowlist (alphanumeric, comma, underscore, hyphen) +TAGS=$(printf '%s' "$TAGS" | tr -cd 'a-zA-Z0-9,_-') + +# Sanitize filter: strip control characters only, preserving regex metacharacters. +# The filter is always passed double-quoted to cargo bench so no shell injection +# is possible from the preserved printable characters. +FILTER=$(printf '%s' "$FILTER" | tr -d '\000-\037\177') + +# If nothing was parsed (unrecognized tokens, typos, missing values), default to "base" +if [[ -z "$TAGS" && -z "$FILTER" ]]; then + TAGS="base" +fi + +echo "Parsed tags: ${TAGS:-}" +echo "Parsed filter: ${FILTER:-}" + +[[ -n "$TAGS" ]] && export BENCH_TAGS="$TAGS" + +# --------------------------------------------------------------------------- +# 2. 
Benchmark the PR branch (already checked out by the workflow) +# --------------------------------------------------------------------------- +(cd benchmarks && cargo bench --locked --bench workload_bench -- --save-baseline changes "$FILTER") + +# --------------------------------------------------------------------------- +# 3. Switch to the base branch and benchmark it +# The benchmarks/target/ directory is not tracked by git, so the +# "changes" baseline files are preserved across the branch switch. +# --------------------------------------------------------------------------- +git fetch origin -- "$BASE_REF" +git checkout -- "$BASE_REF" +(cd benchmarks && cargo bench --locked --bench workload_bench -- --save-baseline base "$FILTER") + +# --------------------------------------------------------------------------- +# 4. Compare baselines with critcmp and format as a markdown table. +# Replicates criterion-compare-action's output: +# - Parses actual duration values (not rank factors) for the % column +# - Bolds the faster duration and % cell when the difference is +# statistically significant (error bounds do not overlap) +# --------------------------------------------------------------------------- +cat > /tmp/parse_critcmp.py << 'PYEOF' +import sys, re + +def to_seconds(value, units): + u = units.strip() + if u == 's': return value + if u == 'ms': return value / 1e3 + if u in ('µs', 'us', 'μs'): return value / 1e6 + if u == 'ns': return value / 1e9 + return value + +def is_significant(chg_dur, chg_err, base_dur, base_err): + if chg_dur < base_dur: + return chg_dur + chg_err < base_dur or base_dur - base_err > chg_dur + else: + return chg_dur - chg_err > base_dur or base_dur + base_err < chg_dur + +def parse_duration(s): + m = re.match(r'([0-9.]+)±([0-9.]+)(.+)', s.strip()) + if not m: + return None + return float(m.group(1)), float(m.group(2)), m.group(3).strip() + +lines = sys.stdin.read().splitlines() +print("| Test | Base | PR | % |") 
+print("|------|--------------|------------------|---|") + +for line in lines[2:]: # skip critcmp header rows + if not line.strip(): + continue + # critcmp columns (split on 2+ spaces): + # with throughput: name, baseFactor, baseDuration, baseBandwidth, changesFactor, changesDuration, changesBandwidth + # without throughput: name, baseFactor, baseDuration, changesFactor, changesDuration + # Locate duration fields by the presence of "±" rather than hardcoding indices, + # so the script works correctly regardless of whether bandwidth columns are present. + fields = re.split(r' +', line) + name = fields[0].strip().replace('|', r'\|') if fields else '' + dur_fields = [f.strip() for f in fields[1:] if '±' in f] + base_dur_str = dur_fields[0] if len(dur_fields) > 0 else None + chg_dur_str = dur_fields[1] if len(dur_fields) > 1 else None + + if not name and not base_dur_str and not chg_dur_str: + continue + + base_display = base_dur_str or 'N/A' + chg_display = chg_dur_str or 'N/A' + difference = 'N/A' + + if base_dur_str and chg_dur_str: + base_p = parse_duration(base_dur_str) + chg_p = parse_duration(chg_dur_str) + if base_p and chg_p: + base_secs = to_seconds(base_p[0], base_p[2]) + base_err_secs = to_seconds(base_p[1], base_p[2]) + chg_secs = to_seconds(chg_p[0], chg_p[2]) + chg_err_secs = to_seconds(chg_p[1], chg_p[2]) + + pct = -(1 - chg_secs / base_secs) * 100 + prefix = '' if chg_secs <= base_secs else '+' + difference = f'{prefix}{pct:.2f}%' + + if is_significant(chg_secs, chg_err_secs, base_secs, base_err_secs): + if chg_secs < base_secs: + chg_display = f'**{chg_dur_str}**' + elif chg_secs > base_secs: + base_display = f'**{base_dur_str}**' + difference = f'**{difference}**' + + print(f'| {name} | {base_display} | {chg_display} | {difference} |') +PYEOF + +COMPARISON=$((cd benchmarks && critcmp base changes) | python3 /tmp/parse_critcmp.py) + +# --------------------------------------------------------------------------- +# 5. 
Write results to /tmp/bench-comment.md +# The post-comment job in benchmark.yml downloads this file as an artifact +# and posts it as a PR comment using a step that holds GH_TOKEN. +# --------------------------------------------------------------------------- +SHORT_SHA="${HEAD_SHA:0:7}" +{ + echo "## Benchmark for ${SHORT_SHA}" + echo "<details>" + echo "<summary>Click to view benchmark</summary>" + echo "" + echo "$COMPARISON" + echo "" + echo "</details>
" +} > /tmp/bench-comment.md diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 46a04b2dea..cf9a56c524 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -6,35 +6,39 @@ on: types: [created, edited] name: Benchmarking PR performance jobs: - runBenchmark: + run-benchmark: name: Run benchmarks if: > github.event.issue.pull_request && (github.event.comment.body == '/bench' || startsWith(github.event.comment.body, '/bench ')) runs-on: ubuntu-latest permissions: - pull-requests: write + contents: read + outputs: + pr_number: ${{ steps.pr.outputs.pr_number }} steps: - - name: Parse benchmark tags - env: - COMMENT: ${{ github.event.comment.body }} - run: | - if [[ "$COMMENT" == "/bench" ]]; then - TAGS="base" - else - TAGS="${COMMENT#/bench }" - TAGS=$(echo "$TAGS" | tr -d '[:space:]') - fi - echo "BENCH_TAGS=$TAGS" >> "$GITHUB_ENV" - echo "Parsed tags: $TAGS" - - name: Get PR HEAD sha + - name: Get PR metadata id: pr - run: | - PR_DATA=$(gh api repos/${{ github.repository }}/pulls/${{ github.event.issue.number }}) - echo "head_sha=$(echo "$PR_DATA" | jq -r .head.sha)" >> "$GITHUB_OUTPUT" - echo "base_ref=$(echo "$PR_DATA" | jq -r .base.ref)" >> "$GITHUB_OUTPUT" env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + REPO: ${{ github.repository }} + PR_NUMBER: ${{ github.event.issue.number }} + run: | + PR_DATA=$(gh api "repos/$REPO/pulls/$PR_NUMBER") + HEAD_SHA=$(echo "$PR_DATA" | jq -r .head.sha) + BASE_REF=$(echo "$PR_DATA" | jq -r .base.ref) + [[ "$HEAD_SHA" == *$'\n'* || "$BASE_REF" == *$'\n'* ]] && { echo "Unexpected newline in API response" >&2; exit 1; } + [[ "$BASE_REF" =~ ^[a-zA-Z0-9/_.-]+$ ]] || { echo "Invalid BASE_REF: $BASE_REF" >&2; exit 1; } + printf 'head_sha=%s\n' "$HEAD_SHA" >> "$GITHUB_OUTPUT" + printf 'base_ref=%s\n' "$BASE_REF" >> "$GITHUB_OUTPUT" + printf 'pr_number=%s\n' "$PR_NUMBER" >> "$GITHUB_OUTPUT" + - name: Install critcmp + # Installed before checkout so the PR's .cargo/config.toml 
cannot + # redirect the registry to a malicious source. The runner's + # pre-installed Rust is sufficient -- no toolchain setup needed here. + # --locked is omitted for cargo install (same exemption as cargo miri + # setup); --version pins the top-level crate. + run: cargo install critcmp --version 0.1.8 - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 with: ref: ${{ steps.pr.outputs.head_sha }} @@ -45,13 +49,35 @@ jobs: - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 # v2 with: save-if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} - # TODO: This action internally runs `cargo bench` without --locked, bypassing our - # supply chain lockfile policy (see build.yml top-level comment). Replace with - # manual cargo bench --locked + critcmp + gh pr comment steps. - # - uses: boa-dev/criterion-compare-action@adfd3a94634fe2041ce5613eb7df09d247555b87 # v3.2.4 - # with: - # token: ${{ secrets.GITHUB_TOKEN }} - # branchName: ${{ steps.pr.outputs.base_ref }} - # cwd: benchmarks - # benchName: workload_bench - - run: echo "Benchmarking is temporarily disabled. See TODO above." + - name: Run benchmarks + # GH_TOKEN is intentionally absent from this step's env. Untrusted PR code + # (build.rs, bench files) must not have access to the write-capable token. + # The comment is posted in the post-comment job after this job completes. 
+ env: + COMMENT: ${{ github.event.comment.body }} + BASE_REF: ${{ steps.pr.outputs.base_ref }} + HEAD_SHA: ${{ steps.pr.outputs.head_sha }} + run: bash .github/scripts/run-benchmarks.sh + - name: Upload benchmark comment + uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 + with: + name: bench-comment + path: /tmp/bench-comment.md + + post-comment: + name: Post benchmark results + needs: run-benchmark + runs-on: ubuntu-latest + permissions: + pull-requests: write + steps: + - name: Download benchmark comment + uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0 + with: + name: bench-comment + path: /tmp/ + - name: Post results as PR comment + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + PR_NUMBER: ${{ needs.run-benchmark.outputs.pr_number }} + run: gh pr comment "$PR_NUMBER" --body-file /tmp/bench-comment.md diff --git a/benchmarks/README.md b/benchmarks/README.md index d98a8ec379..579432f077 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -85,12 +85,26 @@ BENCH_TAGS=base,my-feature cargo bench -p delta_kernel_benchmarks ### Running benchmarking on a PR -To trigger benchmarks on a pull request, post a comment on the PR with one of the following: +To trigger benchmarks on a pull request, post a comment using the following syntax: -- `/bench` — runs benchmarks with `BENCH_TAGS=base` -- `/bench ` — runs benchmarks with `BENCH_TAGS=` (e.g. `bench base,tag1`) +``` +/bench [--tags ] [--filter ] +``` + +- `--tags` sets `BENCH_TAGS` (comma-separated), controlling which tables run. Defaults to `base` if omitted. +- `--filter` is a single-token Criterion regex matched against benchmark names. Optional. +- Both flags are independent and can be given in any order. 
+ +Examples: +``` +/bench # BENCH_TAGS=base, all benchmark names +/bench --tags base,my-tag # BENCH_TAGS=base,my-tag, all benchmark names +/bench --filter snapshotConstruction # BENCH_TAGS=base, only snapshotConstruction benchmarks +/bench --tags base --filter 101kAdds.*snapshotConstruction # combined: AND pattern +/bench --filter 101kAdds|10kAdds # OR two table names +``` -See [By tag (`BENCH_TAGS`)](#by-tag-bench_tags) for details on how tags work. Results are posted automatically as a PR comment, comparing the PR branch against the base branch. +See [By tag (`BENCH_TAGS`)](#by-tag-bench_tags) for how tags work and [By benchmark name](#by-benchmark-name) for regex pattern examples. Results are posted automatically as a PR comment, comparing the PR branch against the base branch. ## Workload data layout From 7300a25b5baf9db5c961ecab287aeadbf7344aba Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Wed, 18 Mar 2026 22:46:44 +0000 Subject: [PATCH 02/18] refactor list add list with backward checkpoint scan added tests, some restructuring fixes fix restored list() tests for reviewer readability quick fix quick fix fix final tests test fixes fix --- kernel/src/log_segment.rs | 46 ++- kernel/src/log_segment/tests.rs | 56 +++ kernel/src/log_segment_files.rs | 699 +++++++++++++++++++++++++++++++- 3 files changed, 776 insertions(+), 25 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 46d0c2aa06..49c0909ed1 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -298,20 +298,38 @@ impl LogSegment { .as_ref() .and_then(|hint| hint.checkpoint_schema.clone()); - let listed_files = match (checkpoint_hint, time_travel_version) { - (Some(cp), None) => { - LogSegmentFiles::list_with_checkpoint_hint(&cp, storage, &log_root, log_tail, None)? - } - (Some(cp), Some(end_version)) if cp.version <= end_version => { - LogSegmentFiles::list_with_checkpoint_hint( - &cp, - storage, - &log_root, - log_tail, - Some(end_version), - )? 
- } - _ => LogSegmentFiles::list(storage, &log_root, log_tail, None, time_travel_version)?, + // The end_version is the time_travel_version, if present + // TODO: When max catalog version is implemented, we would use that as end_version if time_travel_version is not present + let end_version = time_travel_version; + + // Keep the hint only if it doesn't point past end_version + // If there is no end_version bound, any hint is acceptable + let usable_hint = checkpoint_hint.filter(|cp| end_version.is_none_or(|v| cp.version <= v)); + + // Cases: + // + // 1. usable_hint present, end_version is Some --> list_with_checkpoint_hint from hint.version TO end_version + // 2. usable_hint present, end_version is None --> list_with_checkpoint_hint from hint.version unbounded + // 3. no usable_hint, end_version is Some --> backward-scan for checkpoint before end_version, + // list from that checkpoint TO end_version + // (falls back to v0 if no checkpoint found) + // 4. no usable_hint, end_version is None --> list from v0 unbounded + + let listed_files = match (usable_hint, end_version) { + // Cases 1 and 2 + (Some(cp), end_version) => LogSegmentFiles::list_with_checkpoint_hint( + &cp, + storage, + &log_root, + log_tail, + end_version, + )?, + // Case 3 + (None, Some(end)) => LogSegmentFiles::list_with_backward_checkpoint_scan( + storage, &log_root, log_tail, end, + )?, + // Case 4 + (None, None) => LogSegmentFiles::list(storage, &log_root, log_tail, None, None)?, }; LogSegment::try_new( diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 6da327e36a..d66df9fe75 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -829,6 +829,62 @@ async fn build_snapshot_with_start_checkpoint_and_time_travel_version() { assert_eq!(log_segment.listed.ascending_commit_files[0].version, 4); } +#[rstest::rstest] +#[case::no_hint(None)] +#[case::stale_hint(Some(LastCheckpointHint { + version: 10, // stale: 10 > end_version 5, so it is 
discarded + size: 10, + parts: None, + size_in_bytes: None, + num_of_add_files: None, + checkpoint_schema: None, + checksum: None, + tags: None, +}))] +#[tokio::test] +async fn build_snapshot_time_travel_no_checkpoint_falls_back_to_v0( + #[case] hint: Option, +) { + let paths: Vec = (0..=5).map(|v| delta_path_for_version(v, "json")).collect(); + let (storage, log_root) = build_log_with_paths_and_checkpoint(&paths, None).await; + + let log_segment = + LogSegment::for_snapshot_impl(storage.as_ref(), log_root, vec![], hint, Some(5)).unwrap(); + + let commit_files = log_segment.listed.ascending_commit_files; + let checkpoint_parts = log_segment.listed.checkpoint_parts; + + assert_eq!(checkpoint_parts.len(), 0); + let versions = commit_files.into_iter().map(|x| x.version).collect_vec(); + assert_eq!(versions, vec![0, 1, 2, 3, 4, 5]); +} + +#[tokio::test] +async fn build_snapshot_time_travel_no_hint_checkpoint_at_end_version_included() { + let (storage, log_root) = build_log_with_paths_and_checkpoint( + &[ + delta_path_for_version(0, "json"), + delta_path_for_version(1, "json"), + delta_path_for_version(2, "json"), + delta_path_for_version(3, "json"), + delta_path_for_version(4, "json"), + delta_path_for_version(5, "json"), + delta_path_for_version(5, "checkpoint.parquet"), + ], + None, + ) + .await; + + let log_segment = + LogSegment::for_snapshot_impl(storage.as_ref(), log_root, vec![], None, Some(5)).unwrap(); + + let commit_files = log_segment.listed.ascending_commit_files; + let checkpoint_parts = log_segment.listed.checkpoint_parts; + assert_eq!(checkpoint_parts.len(), 1); + assert_eq!(checkpoint_parts[0].version, 5); + assert_eq!(commit_files.len(), 0); +} + #[tokio::test] async fn build_table_changes_with_commit_versions() { let (storage, log_root) = build_log_with_paths_and_checkpoint( diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index 3355e9af24..198d03bfff 100644 --- a/kernel/src/log_segment_files.rs +++ 
b/kernel/src/log_segment_files.rs @@ -1,12 +1,16 @@ //! [`LogSegmentFiles`] is a struct holding the result of listing the delta log. Currently, it -//! exposes three APIs for listing: +//! exposes four APIs for listing: //! 1. [`list_commits`]: Lists all commit files between the provided start and end versions. //! 2. [`list`]: Lists all commit and checkpoint files between the provided start and end versions. //! 3. [`list_with_checkpoint_hint`]: Lists all commit and checkpoint files after the provided //! checkpoint hint. +//! 4. [`list_with_backward_checkpoint_scan`]: Scans backward from an end version in +//! 1000-version windows until a complete checkpoint is found or the log is exhausted. //! //! After listing, one can leverage the [`LogSegmentFiles`] to construct a [`LogSegment`]. //! +//! [`list_with_backward_checkpoint_scan`]: Self::list_with_backward_checkpoint_scan +//! //! [`list_commits`]: Self::list_commits //! [`list`]: Self::list //! [`list_with_checkpoint_hint`]: Self::list_with_checkpoint_hint @@ -121,6 +125,23 @@ fn group_checkpoint_parts(parts: Vec) -> HashMap bool { + files + .iter() + .filter(|f| f.is_checkpoint() && f.location.size > 0) + .chunk_by(|f| f.version) + .into_iter() + .any(|(_, parts)| { + let owned: Vec = parts.cloned().collect(); + group_checkpoint_parts(owned) + .iter() + .any(|(num_parts, part_files)| part_files.len() == *num_parts as usize) + }) +} + /// Accumulates and groups log files during listing. Each "group" consists of all files that /// share the same version number (e.g., commit, checkpoint parts, CRC files). 
/// @@ -223,15 +244,23 @@ impl ListingAccumulator { } } +/// Size of backward-scan window in LogSegmentFiles::list_with_backward_checkpoint_scan` +/// The range `[upper - 999, upper]` is inclusive on both ends, giving exactly 1000 versions per window +const BACKWARD_SCAN_WINDOW_SIZE: u64 = 999; + impl LogSegmentFiles { /// Assembles a `LogSegmentFiles` from `fs_files` (an iterator of files - /// listed from storage) and `log_tail` (catalog-provided commits). - // This logic is identical to the old `list()` implementation; it was extracted so that - // callers with different listing strategies can share the same core logic. - fn build_log_segment_files( + /// listed from storage) and `log_tail` (catalog-provided commits) + /// + /// `start_version` controls how the log_tail is filtered: + /// - `Some(v)`: include log_tail entries at version >= v + /// - `None`: derive the lower bound from the most recent complete checkpoint found during + /// the filesystem phase; entries at version > cp_version are included (checkpoint wins over + /// commits AT the checkpoint version). Falls back to 0 if no checkpoint was found. + pub(crate) fn build_log_segment_files( fs_files: impl Iterator>, log_tail: Vec, - start: Version, + start_version: Option, end_version: Option, ) -> DeltaResult { // check log_tail is only commits @@ -241,7 +270,6 @@ impl LogSegmentFiles { "log_tail should only contain commits" ); - let end = end_version.unwrap_or(Version::MAX); let log_tail_start_version = log_tail.first().map(|f| f.version); let mut acc = ListingAccumulator { end_version, @@ -274,7 +302,31 @@ impl LogSegmentFiles { acc.process_file(file); } - // Phase 2: Process log_tail entries. We do this after Phase 1 because log_tail commits + // Phase 2: resolve log_tail start version and end version upper bound + let resolved_start = match start_version { + Some(v) => v, + None => { + // Flush the last pending group so that output.checkpoint_parts is populated + // before we inspect it. 
Without this flush, a checkpoint whose parts arrived + // last would remain in pending_checkpoint_parts and the derived start version + // would fall back to 0 instead of the checkpoint version. + if let Some(gv) = acc.group_version { + acc.flush_checkpoint_group(gv); + } + // Use cp_version + 1: a checkpoint wins over a commit at the checkpoint + // version and must not be replayed on top of it. The checkpoint has already + // been flushed from pending_checkpoint_parts into output.checkpoint_parts, so + // there are no pending parts in Phase 3 that would naturally discard it. + acc.output + .checkpoint_parts + .first() + .map(|p| p.version + 1) + .unwrap_or(0) + } + }; + let end = end_version.unwrap_or(Version::MAX); + + // Phase 3: Process log_tail entries. We do this after Phase 1 because log_tail commits // start at log_tail_start_version and are in ascending version order — they always extend // (or overlap with, but supersede) the filesystem-listed commits. Processing them after // Phase 1 maintains ascending version order throughout, which is required by the checkpoint @@ -282,7 +334,7 @@ impl LogSegmentFiles { // versions, so there's no duplication here. let filtered_log_tail = log_tail .into_iter() - .filter(|entry| entry.version >= start && entry.version <= end); + .filter(|entry| entry.version >= resolved_start && entry.version <= end); for file in filtered_log_tail { // Track max published version for published commits from the log_tail if matches!(file.file_type, LogPathFileType::Commit) { @@ -383,12 +435,11 @@ impl LogSegmentFiles { let start = start_version.unwrap_or(0); let end = end_version.unwrap_or(Version::MAX); let fs_iter = list_from_storage(storage, log_root, start, end)?; - Self::build_log_segment_files(fs_iter, log_tail, start, end_version) + Self::build_log_segment_files(fs_iter, log_tail, Some(start), end_version) } /// List all commit and checkpoint files after the provided checkpoint. 
It is guaranteed that all /// the returned [`ParsedLogPath`]s will have a version less than or equal to the `end_version`. - /// See [`list_log_files_with_version`] for details on the return type. pub(crate) fn list_with_checkpoint_hint( checkpoint_metadata: &LastCheckpointHint, storage: &dyn StorageHandler, @@ -425,10 +476,61 @@ impl LogSegmentFiles { } Ok(listed_files) } + + /// Lists log files by scanning backward from `end_version` in 1000-version windows until a + /// complete checkpoint is found or the log is exhausted. The resulting files are combined + /// with the `log_tail` (catalog-provided commits) to build a [`LogSegmentFiles`]. + /// + /// This avoids a full forward listing when we have an upper-bound version but no checkpoint + /// hint: instead of listing from version 0, we walk backward from `end_version` in bounded + /// windows, stopping as soon as we find a complete checkpoint (or reach version 0). + /// + /// For example, given end_version = 12500 and a checkpoint at v8900: + /// - Window 1 [11501, 12500]: no checkpoint -> continue + /// - Window 2 [10501, 11500]: no checkpoint -> continue + /// - Window 3 [9501, 10500]: no checkpoint -> continue + /// - Window 4 [8501, 9500]: checkpoint at v8900 found -> stop + /// All files from windows 1-4 are then passed to `build_log_segment_files`, which + /// returns a log segment with the checkpoint at v8900 and all commits from v8901 to v12500 + #[instrument(name = "log.list_with_backward_checkpoint_scan", skip_all, fields(end = end_version), err)] + pub(crate) fn list_with_backward_checkpoint_scan( + storage: &dyn StorageHandler, + log_root: &Url, + log_tail: Vec, + end_version: Version, + ) -> DeltaResult { + // Scan backward in 1000-version windows, collecting ALL file types, until a complete + // checkpoint is found or the log is exhausted. 
+ let mut windows: Vec> = Vec::new(); + let mut upper = end_version; + + loop { + let lower = upper.saturating_sub(BACKWARD_SCAN_WINDOW_SIZE); + let window_files = list_from_storage(storage, log_root, lower, upper)? + .collect::>>()?; + + let checkpoint_found = has_complete_checkpoint_in(&window_files); + windows.push(window_files); + + if checkpoint_found || lower == 0 { + break; + } + upper = lower - 1; + } + + // Replay all windows in ascending version order. + windows.reverse(); + let fs_iter = windows.into_iter().flatten().map(Ok); + + // Pass None so the log_tail lower bound is derived from the checkpoint found during + // the fs phase. + Self::build_log_segment_files(fs_iter, log_tail, None, Some(end_version)) + } } #[cfg(test)] mod list_log_files_with_log_tail_tests { + use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use url::Url; @@ -545,6 +647,56 @@ mod list_log_files_with_log_tail_tests { ); } + /// A [`StorageHandler`] wrapper that counts the number of `list_from` calls. + /// Used to verify that `list_with_backward_checkpoint_scan` issues the expected + /// number of storage listing requests. 
+ struct CountingStorageHandler { + inner: Box, + list_from_count: AtomicU32, + } + + impl CountingStorageHandler { + fn new(inner: Box) -> Self { + Self { + inner, + list_from_count: AtomicU32::new(0), + } + } + + fn call_count(&self) -> u32 { + self.list_from_count.load(Ordering::Relaxed) + } + } + + impl StorageHandler for CountingStorageHandler { + fn list_from( + &self, + path: &Url, + ) -> DeltaResult>>> { + self.list_from_count.fetch_add(1, Ordering::Relaxed); + self.inner.list_from(path) + } + + fn read_files( + &self, + _files: Vec, + ) -> DeltaResult>>> { + panic!("read_files should not be called during listing"); + } + + fn put(&self, _path: &Url, _data: bytes::Bytes, _overwrite: bool) -> DeltaResult<()> { + panic!("put should not be called during listing"); + } + + fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> { + panic!("copy_atomic should not be called during listing"); + } + + fn head(&self, _path: &Url) -> DeltaResult { + panic!("head should not be called during listing"); + } + } + /// Helper to call `LogSegmentFiles::list()` and destructure the result for assertions. /// Returns (ascending_commit_files, ascending_compaction_files, checkpoint_parts, /// latest_crc_file, latest_commit_file, max_published_version). @@ -575,6 +727,8 @@ mod list_log_files_with_log_tail_tests { ) } + // ===== list() tests ===== + #[tokio::test] async fn test_empty_log_tail() { let log_files = vec![ @@ -901,4 +1055,527 @@ mod list_log_files_with_log_tail_tests { // max_published_version reflects all published commits seen (filesystem 0-5 + log_tail 6-10) assert_eq!(max_pub, Some(10)); } + + // ===== list_with_backward_checkpoint_scan() tests ===== + + #[tokio::test] + async fn backward_scan_no_checkpoint_returns_all_commits_from_v0() { + // When no checkpoint exists, list_with_backward_checkpoint_scan scans all the way to v0 + // and returns every commit. Spans >1000 versions to exercise multiple windows. 
+ // Window 1: [6, 1005] -> v1004, v1005, no checkpoint, lower=6 != 0 -> continue + // Window 2: [0, 5] -> v0-v5, no checkpoint, lower=0 -> break + let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (1, LogPathFileType::Commit, CommitSource::Filesystem), + (2, LogPathFileType::Commit, CommitSource::Filesystem), + (3, LogPathFileType::Commit, CommitSource::Filesystem), + (4, LogPathFileType::Commit, CommitSource::Filesystem), + (5, LogPathFileType::Commit, CommitSource::Filesystem), + (1004, LogPathFileType::Commit, CommitSource::Filesystem), + (1005, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let result = + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 1005) + .unwrap(); + + assert_eq!(counter.call_count(), 2); + + assert!(result.checkpoint_parts.is_empty()); + + // All 8 commits returned in ascending order + assert_eq!(result.ascending_commit_files.len(), 8); + assert_eq!(result.ascending_commit_files[0].version, 0); + assert_eq!(result.ascending_commit_files[1].version, 1); + assert_eq!(result.ascending_commit_files[2].version, 2); + assert_eq!(result.ascending_commit_files[3].version, 3); + assert_eq!(result.ascending_commit_files[4].version, 4); + assert_eq!(result.ascending_commit_files[5].version, 5); + assert_eq!(result.ascending_commit_files[6].version, 1004); + assert_eq!(result.ascending_commit_files[7].version, 1005); + + assert_eq!(result.latest_commit_file.unwrap().version, 1005); + } + + #[tokio::test] + async fn backward_scan_checkpoint_found_in_second_window() { + // end_version=3000, checkpoint at v1500, sparse commits on either side. + // Window 1 [2001, 3000]: no checkpoint -> continue. + // Window 2 [1001, 2000]: checkpoint at v1500 -> stop. 
+ let log_files = vec![ + ( + 1500, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + ), + (1501, LogPathFileType::Commit, CommitSource::Filesystem), + (3000, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let result = + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 3000) + .unwrap(); + + assert_eq!(counter.call_count(), 2); + + assert_eq!(result.checkpoint_parts.len(), 1); + assert_eq!(result.checkpoint_parts[0].version, 1500); + + assert_eq!(result.ascending_commit_files.len(), 2); + assert_eq!(result.ascending_commit_files[0].version, 1501); + assert_eq!(result.ascending_commit_files[1].version, 3000); + + assert_eq!(result.latest_commit_file.unwrap().version, 3000); + } + + #[tokio::test] + async fn backward_scan_incomplete_in_second_window_complete_in_third_three_calls() { + // end_version=3000. Window 2 contains an incomplete 2-of-2 multipart checkpoint (only + // part 1 present). has_complete_checkpoint_in must return false for window 2, causing + // the scan to continue to window 3, where a complete single-part checkpoint at v500 is + // found. Verifies that incomplete parts from window 2 are discarded and do not pollute + // the result's checkpoint_parts. 
+ // + // Window 1 [2001, 3000]: v2500, v3000 — no checkpoint -> lower=2001 != 0 -> continue + // Window 2 [1001, 2000]: v1500 (1-of-2 parts) — incomplete -> lower=1001 != 0 -> continue + // Window 3 [1, 1000]: v500 (complete), v501 — checkpoint found -> break + let log_files = vec![ + ( + 500, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + ), + (501, LogPathFileType::Commit, CommitSource::Filesystem), + ( + 1500, + LogPathFileType::MultiPartCheckpoint { + part_num: 1, + num_parts: 2, + }, + CommitSource::Filesystem, + ), + (2500, LogPathFileType::Commit, CommitSource::Filesystem), + (3000, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let result = + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 3000) + .unwrap(); + + assert_eq!(counter.call_count(), 3); + + // Only the complete checkpoint at v500; the incomplete v1500 part is discarded + assert_eq!(result.checkpoint_parts.len(), 1); + assert_eq!(result.checkpoint_parts[0].version, 500); + + // Commits after the checkpoint: v501 (FS), v2500 (FS), v3000 (FS) + assert_eq!(result.ascending_commit_files.len(), 3); + assert_eq!(result.ascending_commit_files[0].version, 501); + assert_eq!(result.ascending_commit_files[1].version, 2500); + assert_eq!(result.ascending_commit_files[2].version, 3000); + + assert_eq!(result.latest_commit_file.unwrap().version, 3000); + } + + #[tokio::test] + async fn backward_scan_with_multipart_checkpoint() { + // A complete 3-part checkpoint at v50 with commits v51/v52 after it. + // Single window [0, 52]: checkpoint found -> stop. 
+ let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (1, LogPathFileType::Commit, CommitSource::Filesystem), + ( + 50, + LogPathFileType::MultiPartCheckpoint { + part_num: 1, + num_parts: 3, + }, + CommitSource::Filesystem, + ), + ( + 50, + LogPathFileType::MultiPartCheckpoint { + part_num: 2, + num_parts: 3, + }, + CommitSource::Filesystem, + ), + ( + 50, + LogPathFileType::MultiPartCheckpoint { + part_num: 3, + num_parts: 3, + }, + CommitSource::Filesystem, + ), + (51, LogPathFileType::Commit, CommitSource::Filesystem), + (52, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let result = + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 52) + .unwrap(); + + assert_eq!(counter.call_count(), 1); + + assert_eq!(result.checkpoint_parts.len(), 3); + assert!(result.checkpoint_parts.iter().all(|p| p.version == 50)); + + assert_eq!(result.ascending_commit_files.len(), 2); + assert_eq!(result.ascending_commit_files[0].version, 51); + assert_eq!(result.ascending_commit_files[1].version, 52); + + assert_eq!(result.latest_commit_file.unwrap().version, 52); + } + + #[tokio::test] + async fn backward_scan_with_log_tail_derives_lower_bound_from_checkpoint() { + // Backward scan with a non-empty log_tail spanning >1000 versions. The FS contains + // commits v0-v5, a checkpoint at v50, and commits v51-v52. The log_tail provides + // catalog commits v1100-v1102. 
+ // Window 1: [103, 1102] -> no FS checkpoint in range -> continue + // Window 2: [0, 102] -> checkpoint at v50 found -> stop + let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (1, LogPathFileType::Commit, CommitSource::Filesystem), + (2, LogPathFileType::Commit, CommitSource::Filesystem), + (3, LogPathFileType::Commit, CommitSource::Filesystem), + (4, LogPathFileType::Commit, CommitSource::Filesystem), + (5, LogPathFileType::Commit, CommitSource::Filesystem), + ( + 50, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + ), + (51, LogPathFileType::Commit, CommitSource::Filesystem), + (52, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let log_tail = vec![ + make_parsed_log_path_with_source(1100, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(1101, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(1102, LogPathFileType::Commit, CommitSource::Catalog), + ]; + + let result = LogSegmentFiles::list_with_backward_checkpoint_scan( + &counter, &log_root, log_tail, 1102, + ) + .unwrap(); + + assert_eq!(counter.call_count(), 2); + + // Checkpoint at v50 + assert_eq!(result.checkpoint_parts.len(), 1); + assert_eq!(result.checkpoint_parts[0].version, 50); + + // Commits: v51, v52 from FS; v1100, v1101, v1102 from catalog + assert_eq!(result.ascending_commit_files.len(), 5); + assert_eq!(result.ascending_commit_files[0].version, 51); + assert_eq!(result.ascending_commit_files[1].version, 52); + assert_eq!(result.ascending_commit_files[2].version, 1100); + assert_eq!(result.ascending_commit_files[3].version, 1101); + assert_eq!(result.ascending_commit_files[4].version, 1102); + assert_source(&result.ascending_commit_files[0], CommitSource::Filesystem); + assert_source(&result.ascending_commit_files[1], CommitSource::Filesystem); + 
assert_source(&result.ascending_commit_files[2], CommitSource::Catalog); + assert_source(&result.ascending_commit_files[3], CommitSource::Catalog); + assert_source(&result.ascending_commit_files[4], CommitSource::Catalog); + + assert_eq!(result.latest_commit_file.unwrap().version, 1102); + } + + #[tokio::test] + async fn backward_scan_with_log_tail_starting_before_checkpoint() { + // log_tail starts at v30, before the checkpoint at v50. The derived lower bound must + // be cp_version + 1 = 51, so log_tail entries at v30, v40, v50 are excluded. + // Single window [0, 70]: checkpoint at v50 found -> stop. + let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (1, LogPathFileType::Commit, CommitSource::Filesystem), + (2, LogPathFileType::Commit, CommitSource::Filesystem), + ( + 50, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + ), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let log_tail = vec![ + make_parsed_log_path_with_source(30, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(40, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(50, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(60, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(70, LogPathFileType::Commit, CommitSource::Catalog), + ]; + + let result = + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, log_tail, 70) + .unwrap(); + + assert_eq!(counter.call_count(), 1); + + // Checkpoint at v50 found; lower bound = 51, so log_tail v30/v40/v50 are excluded + assert_eq!(result.checkpoint_parts.len(), 1); + assert_eq!(result.checkpoint_parts[0].version, 50); + + assert_eq!(result.ascending_commit_files.len(), 2); + assert_eq!(result.ascending_commit_files[0].version, 60); + 
assert_eq!(result.ascending_commit_files[1].version, 70); + assert_source(&result.ascending_commit_files[0], CommitSource::Catalog); + assert_source(&result.ascending_commit_files[1], CommitSource::Catalog); + + assert_eq!(result.latest_commit_file.unwrap().version, 70); + } + + #[tokio::test] + async fn backward_scan_no_checkpoint_with_log_tail_uses_lower_bound_zero() { + // When the backward scan exhausts all windows to v0 without finding a checkpoint, + // the derived lower bound falls back to 0, so all log_tail entries are included. + // FS has v0/v1/v2, log_tail has v1050/v1051/v1052. Spans >1000 versions. + // Window 1: [53, 1052] -> no checkpoint, lower=53 != 0 -> continue + // Window 2: [0, 52] -> no checkpoint, lower=0 -> break + let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (1, LogPathFileType::Commit, CommitSource::Filesystem), + (2, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let log_tail = vec![ + make_parsed_log_path_with_source(1050, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(1051, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(1052, LogPathFileType::Commit, CommitSource::Catalog), + ]; + + let result = LogSegmentFiles::list_with_backward_checkpoint_scan( + &counter, &log_root, log_tail, 1052, + ) + .unwrap(); + + assert_eq!(counter.call_count(), 2); + + // No checkpoint found; lower bound = 0, so all FS commits (v0-v2) and all log_tail + // entries (v1050-v1052) are included + assert!(result.checkpoint_parts.is_empty()); + assert_eq!(result.ascending_commit_files.len(), 6); + assert_eq!(result.ascending_commit_files[0].version, 0); + assert_eq!(result.ascending_commit_files[1].version, 1); + assert_eq!(result.ascending_commit_files[2].version, 2); + assert_eq!(result.ascending_commit_files[3].version, 1050); 
+ assert_eq!(result.ascending_commit_files[4].version, 1051); + assert_eq!(result.ascending_commit_files[5].version, 1052); + assert_source(&result.ascending_commit_files[0], CommitSource::Filesystem); + assert_source(&result.ascending_commit_files[1], CommitSource::Filesystem); + assert_source(&result.ascending_commit_files[2], CommitSource::Filesystem); + assert_source(&result.ascending_commit_files[3], CommitSource::Catalog); + assert_source(&result.ascending_commit_files[4], CommitSource::Catalog); + assert_source(&result.ascending_commit_files[5], CommitSource::Catalog); + assert_eq!(result.latest_commit_file.unwrap().version, 1052); + } + + #[tokio::test] + async fn backward_scan_log_tail_defines_latest_version() { + // The backward scan only lists up to end_version=1001; FS v1002 is above that bound + // and never seen, so max_published_version=Some(1001) not Some(1002). + // Window 1: [2, 1001] -> v1001(FS commit listed, but skipped since >= log_tail_start); + // no checkpoint -> continue + // Window 2: [0, 1] -> v0(FS commit); no checkpoint, lower=0 -> break + let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (1001, LogPathFileType::Commit, CommitSource::Filesystem), + (1002, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let log_tail = vec![make_parsed_log_path_with_source( + 1001, + LogPathFileType::Commit, + CommitSource::Catalog, + )]; + + let result = LogSegmentFiles::list_with_backward_checkpoint_scan( + &counter, &log_root, log_tail, 1001, + ) + .unwrap(); + + assert_eq!(counter.call_count(), 2); + + assert_eq!(result.ascending_commit_files.len(), 2); + assert_eq!(result.ascending_commit_files[0].version, 0); + assert_eq!(result.ascending_commit_files[1].version, 1001); + assert_source(&result.ascending_commit_files[0], CommitSource::Filesystem); + 
assert_source(&result.ascending_commit_files[1], CommitSource::Catalog); + assert_eq!(result.latest_commit_file.unwrap().version, 1001); + // v1002 is above end_version and never listed, so max_pub=Some(1001) not Some(1002) + assert_eq!(result.max_published_version, Some(1001)); + } + + #[tokio::test] + async fn backward_scan_non_commit_files_at_log_tail_versions_are_preserved() { + // FS has commits v0-v5, checkpoint v7, CRC v8; log_tail v6-v10. + // Checkpoint and CRC are preserved even though their versions are covered by log_tail. + // Single window [0, 10]: checkpoint at v7 found -> stop. + let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (1, LogPathFileType::Commit, CommitSource::Filesystem), + (2, LogPathFileType::Commit, CommitSource::Filesystem), + (3, LogPathFileType::Commit, CommitSource::Filesystem), + (4, LogPathFileType::Commit, CommitSource::Filesystem), + (5, LogPathFileType::Commit, CommitSource::Filesystem), + ( + 7, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + ), + (8, LogPathFileType::Crc, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let log_tail = vec![ + make_parsed_log_path_with_source(6, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(7, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(8, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(9, LogPathFileType::Commit, CommitSource::Catalog), + make_parsed_log_path_with_source(10, LogPathFileType::Commit, CommitSource::Catalog), + ]; + + let result = + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, log_tail, 10) + .unwrap(); + + assert_eq!(counter.call_count(), 1); + + // Checkpoint at version 7 is preserved from filesystem + assert_eq!(result.checkpoint_parts.len(), 1); + 
assert_eq!(result.checkpoint_parts[0].version, 7); + assert!(result.checkpoint_parts[0].is_checkpoint()); + + // CRC at version 8 is preserved from filesystem + let crc = result.latest_crc_file.unwrap(); + assert_eq!(crc.version, 8); + assert!(matches!(crc.file_type, LogPathFileType::Crc)); + + // After checkpoint at v7: resolved_start = 8 (checkpoint version + 1). + // Log_tail commits at v6-v7 are below resolved_start and filtered out; + // only log_tail commits v8-v10 remain (3 commits). + assert_eq!(result.ascending_commit_files.len(), 3); + for (i, commit) in result.ascending_commit_files.iter().enumerate() { + assert_eq!(commit.version, (i + 8) as u64); + assert_source(commit, CommitSource::Catalog); + } + assert_eq!(result.latest_commit_file.unwrap().version, 10); + // max_published_version reflects filesystem commits 0-5 + log_tail commits 6-10 + assert_eq!(result.max_published_version, Some(10)); + } + + #[tokio::test] + async fn backward_scan_window_stops_at_zero_single_call() { + // end_version=500: lower = 500.saturating_sub(999) = 0, so the loop breaks immediately + // after the first window. Proves underflow safety via saturating_sub. + let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (500, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 500) + .unwrap(); + + assert_eq!(counter.call_count(), 1); + } + + #[tokio::test] + async fn backward_scan_window_boundary_999_single_call() { + // end_version=999: lower = 999 - 999 = 0, so exactly one 1000-version window covers + // the entire log and the loop exits after one call. 
+ let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (999, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 999) + .unwrap(); + + assert_eq!(counter.call_count(), 1); + } + + #[tokio::test] + async fn backward_scan_window_boundary_1000_two_calls() { + // end_version=1000: first window lower = 1000 - 999 = 1 (not 0), so the loop + // continues. Second window: upper = 0, lower = 0.saturating_sub(999) = 0, breaks. + // This is the boundary where a second call becomes necessary. + let log_files = vec![ + (0, LogPathFileType::Commit, CommitSource::Filesystem), + (1000, LogPathFileType::Commit, CommitSource::Filesystem), + ]; + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 1000) + .unwrap(); + + assert_eq!(counter.call_count(), 2); + } + + // ===== has_complete_checkpoint_in direct unit tests (other cases for has_complete_checkpoint_in already covered by tests above) ===== + + #[test] + fn has_complete_checkpoint_empty_slice_returns_false() { + assert!(!has_complete_checkpoint_in(&[])); + } + + #[test] + fn has_complete_checkpoint_zero_size_filtered_returns_false() { + // The size > 0 guard must reject zero-size checkpoints to avoid treating empty/corrupt + // files as complete checkpoints. + let mut p = make_parsed_log_path_with_source( + 5, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + ); + // make_parsed_log_path_with_source always sets a non-zero size; override here to + // simulate a corrupt/empty checkpoint file and exercise the size > 0 guard. 
+ p.location.size = 0; + assert!(!has_complete_checkpoint_in(&[p])); + } + + #[test] + fn has_complete_checkpoint_incomplete_at_one_version_complete_at_another_returns_true() { + // An incomplete checkpoint at v5 (1 of 3 parts) followed by a complete checkpoint at v10. + // has_complete_checkpoint_in must continue past the failed group and find the complete one. + let files = vec![ + make_parsed_log_path_with_source( + 5, + LogPathFileType::MultiPartCheckpoint { + part_num: 1, + num_parts: 3, + }, + CommitSource::Filesystem, + ), + make_parsed_log_path_with_source( + 10, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + ), + ]; + assert!(has_complete_checkpoint_in(&files)); + } } From 5f34c71367968f6347d2066115b67ba9a294b0a1 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Tue, 24 Mar 2026 00:26:29 +0000 Subject: [PATCH 03/18] improved testing --- kernel/src/log_segment_files.rs | 742 +++++++++++++------------------- 1 file changed, 310 insertions(+), 432 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index 198d03bfff..1908278cbe 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -244,9 +244,10 @@ impl ListingAccumulator { } } -/// Size of backward-scan window in LogSegmentFiles::list_with_backward_checkpoint_scan` -/// The range `[upper - 999, upper]` is inclusive on both ends, giving exactly 1000 versions per window -const BACKWARD_SCAN_WINDOW_SIZE: u64 = 999; +/// Number of versions covered by each backward-scan window in +/// `LogSegmentFiles::list_with_backward_checkpoint_scan`. The range `[lower, upper)` is +/// half-open, so a window size of 1000 covers exactly 1000 versions. +const BACKWARD_SCAN_WINDOW_SIZE: u64 = 1000; impl LogSegmentFiles { /// Assembles a `LogSegmentFiles` from `fs_files` (an iterator of files @@ -486,10 +487,10 @@ impl LogSegmentFiles { /// windows, stopping as soon as we find a complete checkpoint (or reach version 0). 
/// /// For example, given end_version = 12500 and a checkpoint at v8900: - /// - Window 1 [11501, 12500]: no checkpoint -> continue - /// - Window 2 [10501, 11500]: no checkpoint -> continue - /// - Window 3 [9501, 10500]: no checkpoint -> continue - /// - Window 4 [8501, 9500]: checkpoint at v8900 found -> stop + /// - Window 1 [11501, 12501): no checkpoint -> continue + /// - Window 2 [10501, 11501): no checkpoint -> continue + /// - Window 3 [9501, 10501): no checkpoint -> continue + /// - Window 4 [8501, 9501): checkpoint at v8900 found -> stop /// All files from windows 1-4 are then passed to `build_log_segment_files`, which /// returns a log segment with the checkpoint at v8900 and all commits from v8901 to v12500 #[instrument(name = "log.list_with_backward_checkpoint_scan", skip_all, fields(end = end_version), err)] @@ -502,25 +503,24 @@ impl LogSegmentFiles { // Scan backward in 1000-version windows, collecting ALL file types, until a complete // checkpoint is found or the log is exhausted. let mut windows: Vec> = Vec::new(); - let mut upper = end_version; - - loop { + // upper is the exclusive upper bound of the next window; adding 1 includes end_version + // in the first window. The inclusive range passed to list_from_storage is [lower, upper - 1]. + let mut upper = end_version + 1; + while upper > 0 { let lower = upper.saturating_sub(BACKWARD_SCAN_WINDOW_SIZE); - let window_files = list_from_storage(storage, log_root, lower, upper)? - .collect::>>()?; + let window_files: Vec<_> = + list_from_storage(storage, log_root, lower, upper - 1)?.try_collect()?; let checkpoint_found = has_complete_checkpoint_in(&window_files); windows.push(window_files); - if checkpoint_found || lower == 0 { + if checkpoint_found { break; } - upper = lower - 1; + upper = lower; } - // Replay all windows in ascending version order. 
- windows.reverse(); - let fs_iter = windows.into_iter().flatten().map(Ok); + let fs_iter = windows.into_iter().rev().flatten().map(Ok); // Pass None so the log_tail lower bound is derived from the checkpoint found during // the fs phase. @@ -535,6 +535,8 @@ mod list_log_files_with_log_tail_tests { use url::Url; + use rstest::rstest; + use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreStorageHandler; use crate::object_store::{memory::InMemory, path::Path as ObjectPath, ObjectStore}; @@ -1058,138 +1060,116 @@ mod list_log_files_with_log_tail_tests { // ===== list_with_backward_checkpoint_scan() tests ===== + // Log from v0 to v1005. Each case places an optional single-part checkpoint and + // verifies the expected commits, checkpoint version, and number of storage listings. + // + // Window boundaries (window size=1000, end_version=1005, exclusive upper): + // Window 1: [6, 1006) covers v6..=v1005 + // Window 2: [0, 6) covers v0..=v5 + // + // A checkpoint at v6+ is found in window 1 (1 listing); at v5 or lower in window 2 + // (2 listings). A checkpoint beyond end_version is never seen. + #[rstest] + // No checkpoint: scan exhausts both windows, all 1006 commits returned + #[case::no_checkpoint(None, 1005, (0u64..=1005).collect(), None, 2)] + // Checkpoint beyond end_version is never seen; same behavior as no checkpoint + #[case::checkpoint_beyond_end(Some(1006), 1005, (0u64..=1005).collect(), None, 2)] + // Checkpoint at end_version: found in window 1, no commits after it + #[case::checkpoint_at_end(Some(1005), 1005, vec![], Some(1005), 1)] + // Checkpoint at v5: falls in window 2 -> 2 listings; commits 6..=1005 returned. + // Tests the inclusive window boundary: window 1 covers [6, 1006) or [6, 1005] (lower = 1006 - 1000 = 6), + // so v5 falls just outside it and requires a second listing, while v6 (next case) does not. 
+ #[case::checkpoint_in_second_window(Some(5), 1005, (6u64..=1005).collect(), Some(5), 2)] + // Checkpoint at v6: falls in window 1 -> 1 listing; commits 7..=1005 returned + #[case::checkpoint_in_first_window(Some(6), 1005, (7u64..=1005).collect(), Some(6), 1)] #[tokio::test] - async fn backward_scan_no_checkpoint_returns_all_commits_from_v0() { - // When no checkpoint exists, list_with_backward_checkpoint_scan scans all the way to v0 - // and returns every commit. Spans >1000 versions to exercise multiple windows. - // Window 1: [6, 1005] -> v1004, v1005, no checkpoint, lower=6 != 0 -> continue - // Window 2: [0, 5] -> v0-v5, no checkpoint, lower=0 -> break - let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (1, LogPathFileType::Commit, CommitSource::Filesystem), - (2, LogPathFileType::Commit, CommitSource::Filesystem), - (3, LogPathFileType::Commit, CommitSource::Filesystem), - (4, LogPathFileType::Commit, CommitSource::Filesystem), - (5, LogPathFileType::Commit, CommitSource::Filesystem), - (1004, LogPathFileType::Commit, CommitSource::Filesystem), - (1005, LogPathFileType::Commit, CommitSource::Filesystem), - ]; - let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - - let result = - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 1005) - .unwrap(); - - assert_eq!(counter.call_count(), 2); - - assert!(result.checkpoint_parts.is_empty()); - - // All 8 commits returned in ascending order - assert_eq!(result.ascending_commit_files.len(), 8); - assert_eq!(result.ascending_commit_files[0].version, 0); - assert_eq!(result.ascending_commit_files[1].version, 1); - assert_eq!(result.ascending_commit_files[2].version, 2); - assert_eq!(result.ascending_commit_files[3].version, 3); - assert_eq!(result.ascending_commit_files[4].version, 4); - assert_eq!(result.ascending_commit_files[5].version, 5); - 
assert_eq!(result.ascending_commit_files[6].version, 1004); - assert_eq!(result.ascending_commit_files[7].version, 1005); - - assert_eq!(result.latest_commit_file.unwrap().version, 1005); - } + async fn backward_scan_single_checkpoint_cases( + #[case] checkpoint_version: Option, + #[case] end_version: u64, + #[case] expected_commits: Vec, + #[case] expected_checkpoint: Option, + #[case] expected_listings: u32, + ) { + let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=1005) + .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) + .collect(); + + // Add a checkpoint only if it falls within the log range + if let Some(cp) = checkpoint_version { + if cp <= 1005 { + log_files.push(( + cp, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + )); + } + } - #[tokio::test] - async fn backward_scan_checkpoint_found_in_second_window() { - // end_version=3000, checkpoint at v1500, sparse commits on either side. - // Window 1 [2001, 3000]: no checkpoint -> continue. - // Window 2 [1001, 2000]: checkpoint at v1500 -> stop. 
- let log_files = vec![ - ( - 1500, - LogPathFileType::SinglePartCheckpoint, - CommitSource::Filesystem, - ), - (1501, LogPathFileType::Commit, CommitSource::Filesystem), - (3000, LogPathFileType::Commit, CommitSource::Filesystem), - ]; let (storage, log_root) = create_storage(log_files).await; let counter = CountingStorageHandler::new(storage); - let result = - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 3000) - .unwrap(); - - assert_eq!(counter.call_count(), 2); + let result = LogSegmentFiles::list_with_backward_checkpoint_scan( + &counter, + &log_root, + vec![], + end_version, + ) + .unwrap(); - assert_eq!(result.checkpoint_parts.len(), 1); - assert_eq!(result.checkpoint_parts[0].version, 1500); + assert_eq!(counter.call_count(), expected_listings); - assert_eq!(result.ascending_commit_files.len(), 2); - assert_eq!(result.ascending_commit_files[0].version, 1501); - assert_eq!(result.ascending_commit_files[1].version, 3000); + assert_eq!( + result.checkpoint_parts.len(), + if expected_checkpoint.is_some() { 1 } else { 0 } + ); + if let Some(cp_version) = expected_checkpoint { + assert_eq!(result.checkpoint_parts[0].version, cp_version); + } - assert_eq!(result.latest_commit_file.unwrap().version, 3000); + let actual_commits: Vec = result + .ascending_commit_files + .iter() + .map(|f| f.version) + .collect(); + assert_eq!(actual_commits, expected_commits); } - #[tokio::test] - async fn backward_scan_incomplete_in_second_window_complete_in_third_three_calls() { + fn files_incomplete_in_second_window_complete_in_third_window( + ) -> Vec<(Version, LogPathFileType, CommitSource)> { // end_version=3000. Window 2 contains an incomplete 2-of-2 multipart checkpoint (only // part 1 present). has_complete_checkpoint_in must return false for window 2, causing // the scan to continue to window 3, where a complete single-part checkpoint at v500 is // found. 
Verifies that incomplete parts from window 2 are discarded and do not pollute // the result's checkpoint_parts. // - // Window 1 [2001, 3000]: v2500, v3000 — no checkpoint -> lower=2001 != 0 -> continue - // Window 2 [1001, 2000]: v1500 (1-of-2 parts) — incomplete -> lower=1001 != 0 -> continue - // Window 3 [1, 1000]: v500 (complete), v501 — checkpoint found -> break - let log_files = vec![ - ( - 500, - LogPathFileType::SinglePartCheckpoint, - CommitSource::Filesystem, - ), - (501, LogPathFileType::Commit, CommitSource::Filesystem), - ( - 1500, - LogPathFileType::MultiPartCheckpoint { - part_num: 1, - num_parts: 2, - }, - CommitSource::Filesystem, - ), - (2500, LogPathFileType::Commit, CommitSource::Filesystem), - (3000, LogPathFileType::Commit, CommitSource::Filesystem), - ]; - let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - - let result = - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 3000) - .unwrap(); - - assert_eq!(counter.call_count(), 3); - - // Only the complete checkpoint at v500; the incomplete v1500 part is discarded - assert_eq!(result.checkpoint_parts.len(), 1); - assert_eq!(result.checkpoint_parts[0].version, 500); - - // Commits after the checkpoint: v501 (FS), v2500 (FS), v3000 (FS) - assert_eq!(result.ascending_commit_files.len(), 3); - assert_eq!(result.ascending_commit_files[0].version, 501); - assert_eq!(result.ascending_commit_files[1].version, 2500); - assert_eq!(result.ascending_commit_files[2].version, 3000); - - assert_eq!(result.latest_commit_file.unwrap().version, 3000); + // Window 1 [2001, 3001): commits v2001..=v3000, no checkpoint -> continue + // Window 2 [1001, 2001): commits v1001..=v2000, v1500 (1-of-2 parts) incomplete -> continue + // Window 3 [1, 1001): commits v1..=v1000, v500 (complete) -> checkpoint found -> break + let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=3000) + .map(|v| (v, 
LogPathFileType::Commit, CommitSource::Filesystem)) + .collect(); + log_files.push(( + 500, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + )); + log_files.push(( + 1500, + LogPathFileType::MultiPartCheckpoint { + part_num: 1, + num_parts: 2, + }, + CommitSource::Filesystem, + )); + log_files } - - #[tokio::test] - async fn backward_scan_with_multipart_checkpoint() { - // A complete 3-part checkpoint at v50 with commits v51/v52 after it. - // Single window [0, 52]: checkpoint found -> stop. - let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (1, LogPathFileType::Commit, CommitSource::Filesystem), + fn multipart_checkpoint_files() -> Vec<(Version, LogPathFileType, CommitSource)> { + // Log v0..=v52 with a complete 3-part checkpoint at v50. + // Single window [0, 53): checkpoint found -> stop. + let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=52) + .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) + .collect(); + log_files.extend([ ( 50, LogPathFileType::MultiPartCheckpoint { @@ -1214,368 +1194,266 @@ mod list_log_files_with_log_tail_tests { }, CommitSource::Filesystem, ), - (51, LogPathFileType::Commit, CommitSource::Filesystem), - (52, LogPathFileType::Commit, CommitSource::Filesystem), - ]; + ]); + log_files + } + + // Case 1: complete 3-part checkpoint at v50, single window needed + // Case 2: incomplete 1-of-2 part at v1500 in window 2, complete checkpoint at v500 in window 3 + #[rstest] + #[case::multipart_checkpoint(multipart_checkpoint_files(), 52, 1, 3, 50, 2, 51, 52)] + #[case::incomplete_in_second_window_complete_in_third( + files_incomplete_in_second_window_complete_in_third_window(), + 3000, + 3, + 1, + 500, + 2500, + 501, + 3000 + )] + #[tokio::test] + async fn backward_scan_multipart_checkpoint_cases( + #[case] log_files: Vec<(Version, LogPathFileType, CommitSource)>, + #[case] end_version: Version, + #[case] expected_listings: u32, + #[case] 
expected_checkpoint_parts: usize, + #[case] expected_checkpoint_version: Version, + #[case] expected_commit_count: usize, + #[case] expected_first_commit: Version, + #[case] expected_last_commit: Version, + ) { let (storage, log_root) = create_storage(log_files).await; let counter = CountingStorageHandler::new(storage); - let result = - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 52) - .unwrap(); - - assert_eq!(counter.call_count(), 1); - - assert_eq!(result.checkpoint_parts.len(), 3); - assert!(result.checkpoint_parts.iter().all(|p| p.version == 50)); - - assert_eq!(result.ascending_commit_files.len(), 2); - assert_eq!(result.ascending_commit_files[0].version, 51); - assert_eq!(result.ascending_commit_files[1].version, 52); + let result = LogSegmentFiles::list_with_backward_checkpoint_scan( + &counter, + &log_root, + vec![], + end_version, + ) + .unwrap(); - assert_eq!(result.latest_commit_file.unwrap().version, 52); + assert_eq!(counter.call_count(), expected_listings); + assert_eq!(result.checkpoint_parts.len(), expected_checkpoint_parts); + assert!(result + .checkpoint_parts + .iter() + .all(|p| p.version == expected_checkpoint_version)); + assert_eq!(result.ascending_commit_files.len(), expected_commit_count); + assert_eq!( + result.ascending_commit_files.first().unwrap().version, + expected_first_commit + ); + assert_eq!( + result.ascending_commit_files.last().unwrap().version, + expected_last_commit + ); + assert_eq!( + result.latest_commit_file.unwrap().version, + expected_last_commit + ); } #[tokio::test] async fn backward_scan_with_log_tail_derives_lower_bound_from_checkpoint() { - // Backward scan with a non-empty log_tail spanning >1000 versions. The FS contains - // commits v0-v5, a checkpoint at v50, and commits v51-v52. The log_tail provides - // catalog commits v1100-v1102. 
- // Window 1: [103, 1102] -> no FS checkpoint in range -> continue - // Window 2: [0, 102] -> checkpoint at v50 found -> stop - let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (1, LogPathFileType::Commit, CommitSource::Filesystem), - (2, LogPathFileType::Commit, CommitSource::Filesystem), - (3, LogPathFileType::Commit, CommitSource::Filesystem), - (4, LogPathFileType::Commit, CommitSource::Filesystem), - (5, LogPathFileType::Commit, CommitSource::Filesystem), - ( - 50, - LogPathFileType::SinglePartCheckpoint, - CommitSource::Filesystem, - ), - (51, LogPathFileType::Commit, CommitSource::Filesystem), - (52, LogPathFileType::Commit, CommitSource::Filesystem), - ]; + // FS: commits v0..=v7 + checkpoint at v5. log_tail: catalog commits v8..=v10. + // After finding the checkpoint at v5, the lower bound is derived as v6, so FS commits + // v6 and v7 plus all catalog entries v8..=v10 are included. + let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=7) + .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) + .collect(); + log_files.push(( + 5, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + )); let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - let log_tail = vec![ - make_parsed_log_path_with_source(1100, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(1101, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(1102, LogPathFileType::Commit, CommitSource::Catalog), - ]; + let log_tail: Vec<_> = (8u64..=10) + .map(|v| { + make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Catalog) + }) + .collect(); let result = LogSegmentFiles::list_with_backward_checkpoint_scan( - &counter, &log_root, log_tail, 1102, + storage.as_ref(), + &log_root, + log_tail, + 10, ) .unwrap(); - assert_eq!(counter.call_count(), 2); - - // Checkpoint at v50 
assert_eq!(result.checkpoint_parts.len(), 1); - assert_eq!(result.checkpoint_parts[0].version, 50); + assert_eq!(result.checkpoint_parts[0].version, 5); - // Commits: v51, v52 from FS; v1100, v1101, v1102 from catalog + // FS commits v6, v7 after the checkpoint; catalog commits v8..=v10 assert_eq!(result.ascending_commit_files.len(), 5); - assert_eq!(result.ascending_commit_files[0].version, 51); - assert_eq!(result.ascending_commit_files[1].version, 52); - assert_eq!(result.ascending_commit_files[2].version, 1100); - assert_eq!(result.ascending_commit_files[3].version, 1101); - assert_eq!(result.ascending_commit_files[4].version, 1102); + assert_eq!(result.ascending_commit_files[0].version, 6); assert_source(&result.ascending_commit_files[0], CommitSource::Filesystem); + assert_eq!(result.ascending_commit_files[1].version, 7); assert_source(&result.ascending_commit_files[1], CommitSource::Filesystem); + assert_eq!(result.ascending_commit_files[2].version, 8); assert_source(&result.ascending_commit_files[2], CommitSource::Catalog); + assert_eq!(result.ascending_commit_files[3].version, 9); assert_source(&result.ascending_commit_files[3], CommitSource::Catalog); + assert_eq!(result.ascending_commit_files[4].version, 10); assert_source(&result.ascending_commit_files[4], CommitSource::Catalog); - - assert_eq!(result.latest_commit_file.unwrap().version, 1102); + assert_eq!(result.latest_commit_file.unwrap().version, 10); } #[tokio::test] async fn backward_scan_with_log_tail_starting_before_checkpoint() { - // log_tail starts at v30, before the checkpoint at v50. The derived lower bound must - // be cp_version + 1 = 51, so log_tail entries at v30, v40, v50 are excluded. - // Single window [0, 70]: checkpoint at v50 found -> stop. 
- let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (1, LogPathFileType::Commit, CommitSource::Filesystem), - (2, LogPathFileType::Commit, CommitSource::Filesystem), - ( - 50, - LogPathFileType::SinglePartCheckpoint, - CommitSource::Filesystem, - ), - ]; - let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - - let log_tail = vec![ - make_parsed_log_path_with_source(30, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(40, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(50, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(60, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(70, LogPathFileType::Commit, CommitSource::Catalog), - ]; - - let result = - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, log_tail, 70) - .unwrap(); - - assert_eq!(counter.call_count(), 1); - - // Checkpoint at v50 found; lower bound = 51, so log_tail v30/v40/v50 are excluded - assert_eq!(result.checkpoint_parts.len(), 1); - assert_eq!(result.checkpoint_parts[0].version, 50); - - assert_eq!(result.ascending_commit_files.len(), 2); - assert_eq!(result.ascending_commit_files[0].version, 60); - assert_eq!(result.ascending_commit_files[1].version, 70); - assert_source(&result.ascending_commit_files[0], CommitSource::Catalog); - assert_source(&result.ascending_commit_files[1], CommitSource::Catalog); - - assert_eq!(result.latest_commit_file.unwrap().version, 70); - } - - #[tokio::test] - async fn backward_scan_no_checkpoint_with_log_tail_uses_lower_bound_zero() { - // When the backward scan exhausts all windows to v0 without finding a checkpoint, - // the derived lower bound falls back to 0, so all log_tail entries are included. - // FS has v0/v1/v2, log_tail has v1050/v1051/v1052. Spans >1000 versions. 
- // Window 1: [53, 1052] -> no checkpoint, lower=53 != 0 -> continue - // Window 2: [0, 52] -> no checkpoint, lower=0 -> break - let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (1, LogPathFileType::Commit, CommitSource::Filesystem), - (2, LogPathFileType::Commit, CommitSource::Filesystem), - ]; + // FS: commits v0..=v5 + checkpoint at v5 + CRC at v6. log_tail: catalog commits v3..=v8, + // starting before the checkpoint. The derived lower bound is v6, so log_tail v3..=v5 are + // excluded. Also verifies that the CRC at v6 is preserved even though v6 is a log_tail version. + let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5) + .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) + .collect(); + log_files.push(( + 5, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + )); + log_files.push((6, LogPathFileType::Crc, CommitSource::Filesystem)); let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - let log_tail = vec![ - make_parsed_log_path_with_source(1050, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(1051, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(1052, LogPathFileType::Commit, CommitSource::Catalog), - ]; + let log_tail: Vec<_> = (3u64..=8) + .map(|v| { + make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Catalog) + }) + .collect(); let result = LogSegmentFiles::list_with_backward_checkpoint_scan( - &counter, &log_root, log_tail, 1052, + storage.as_ref(), + &log_root, + log_tail, + 8, ) .unwrap(); - assert_eq!(counter.call_count(), 2); + // Checkpoint at v5; lower bound = 6, so log_tail v3..=v5 are excluded + assert_eq!(result.checkpoint_parts.len(), 1); + assert_eq!(result.checkpoint_parts[0].version, 5); - // No checkpoint found; lower bound = 0, so all FS commits (v0-v2) and all log_tail - // entries 
(v1050-v1052) are included - assert!(result.checkpoint_parts.is_empty()); - assert_eq!(result.ascending_commit_files.len(), 6); - assert_eq!(result.ascending_commit_files[0].version, 0); - assert_eq!(result.ascending_commit_files[1].version, 1); - assert_eq!(result.ascending_commit_files[2].version, 2); - assert_eq!(result.ascending_commit_files[3].version, 1050); - assert_eq!(result.ascending_commit_files[4].version, 1051); - assert_eq!(result.ascending_commit_files[5].version, 1052); - assert_source(&result.ascending_commit_files[0], CommitSource::Filesystem); - assert_source(&result.ascending_commit_files[1], CommitSource::Filesystem); - assert_source(&result.ascending_commit_files[2], CommitSource::Filesystem); - assert_source(&result.ascending_commit_files[3], CommitSource::Catalog); - assert_source(&result.ascending_commit_files[4], CommitSource::Catalog); - assert_source(&result.ascending_commit_files[5], CommitSource::Catalog); - assert_eq!(result.latest_commit_file.unwrap().version, 1052); + // CRC at v6 is preserved even though v6 is covered by the log_tail + let crc = result.latest_crc_file.unwrap(); + assert_eq!(crc.version, 6); + assert!(matches!(crc.file_type, LogPathFileType::Crc)); + + assert_eq!(result.ascending_commit_files.len(), 3); + for (i, commit) in result.ascending_commit_files.iter().enumerate() { + assert_eq!(commit.version, (i + 6) as u64); + assert_source(commit, CommitSource::Catalog); + } + assert_eq!(result.latest_commit_file.unwrap().version, 8); } #[tokio::test] async fn backward_scan_log_tail_defines_latest_version() { - // The backward scan only lists up to end_version=1001; FS v1002 is above that bound - // and never seen, so max_published_version=Some(1001) not Some(1002). 
- // Window 1: [2, 1001] -> v1001(FS commit listed, but skipped since >= log_tail_start); - // no checkpoint -> continue - // Window 2: [0, 1] -> v0(FS commit); no checkpoint, lower=0 -> break - let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (1001, LogPathFileType::Commit, CommitSource::Filesystem), - (1002, LogPathFileType::Commit, CommitSource::Filesystem), - ]; + // FS: commits v0..=v5. log_tail: catalog commit v4. end_version=5. + // FS v4 and v5 are filtered by log_tail_start=4, so ascending_commit_files contains + // v0..=v3 (FS) + v4 (Catalog). max_published_version is Some(5): the highest FS + // commit seen within end_version, even though it was filtered from ascending_commit_files. + let log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5) + .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) + .collect(); let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); let log_tail = vec![make_parsed_log_path_with_source( - 1001, + 4, LogPathFileType::Commit, CommitSource::Catalog, )]; let result = LogSegmentFiles::list_with_backward_checkpoint_scan( - &counter, &log_root, log_tail, 1001, + storage.as_ref(), + &log_root, + log_tail, + 5, ) .unwrap(); - assert_eq!(counter.call_count(), 2); - - assert_eq!(result.ascending_commit_files.len(), 2); + assert_eq!(result.ascending_commit_files.len(), 5); assert_eq!(result.ascending_commit_files[0].version, 0); - assert_eq!(result.ascending_commit_files[1].version, 1001); assert_source(&result.ascending_commit_files[0], CommitSource::Filesystem); - assert_source(&result.ascending_commit_files[1], CommitSource::Catalog); - assert_eq!(result.latest_commit_file.unwrap().version, 1001); - // v1002 is above end_version and never listed, so max_pub=Some(1001) not Some(1002) - assert_eq!(result.max_published_version, Some(1001)); - } - - #[tokio::test] - async fn 
backward_scan_non_commit_files_at_log_tail_versions_are_preserved() { - // FS has commits v0-v5, checkpoint v7, CRC v8; log_tail v6-v10. - // Checkpoint and CRC are preserved even though their versions are covered by log_tail. - // Single window [0, 10]: checkpoint at v7 found -> stop. - let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (1, LogPathFileType::Commit, CommitSource::Filesystem), - (2, LogPathFileType::Commit, CommitSource::Filesystem), - (3, LogPathFileType::Commit, CommitSource::Filesystem), - (4, LogPathFileType::Commit, CommitSource::Filesystem), - (5, LogPathFileType::Commit, CommitSource::Filesystem), - ( - 7, - LogPathFileType::SinglePartCheckpoint, - CommitSource::Filesystem, - ), - (8, LogPathFileType::Crc, CommitSource::Filesystem), - ]; - let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - - let log_tail = vec![ - make_parsed_log_path_with_source(6, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(7, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(8, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(9, LogPathFileType::Commit, CommitSource::Catalog), - make_parsed_log_path_with_source(10, LogPathFileType::Commit, CommitSource::Catalog), - ]; - - let result = - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, log_tail, 10) - .unwrap(); - - assert_eq!(counter.call_count(), 1); - - // Checkpoint at version 7 is preserved from filesystem - assert_eq!(result.checkpoint_parts.len(), 1); - assert_eq!(result.checkpoint_parts[0].version, 7); - assert!(result.checkpoint_parts[0].is_checkpoint()); - - // CRC at version 8 is preserved from filesystem - let crc = result.latest_crc_file.unwrap(); - assert_eq!(crc.version, 8); - assert!(matches!(crc.file_type, LogPathFileType::Crc)); - - // After checkpoint at v7: resolved_start = 8 
(checkpoint version + 1). - // Log_tail commits at v6-v7 are below resolved_start and filtered out; - // only log_tail commits v8-v10 remain (3 commits). - assert_eq!(result.ascending_commit_files.len(), 3); - for (i, commit) in result.ascending_commit_files.iter().enumerate() { - assert_eq!(commit.version, (i + 8) as u64); - assert_source(commit, CommitSource::Catalog); - } - assert_eq!(result.latest_commit_file.unwrap().version, 10); - // max_published_version reflects filesystem commits 0-5 + log_tail commits 6-10 - assert_eq!(result.max_published_version, Some(10)); - } - - #[tokio::test] - async fn backward_scan_window_stops_at_zero_single_call() { - // end_version=500: lower = 500.saturating_sub(999) = 0, so the loop breaks immediately - // after the first window. Proves underflow safety via saturating_sub. - let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (500, LogPathFileType::Commit, CommitSource::Filesystem), - ]; - let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 500) - .unwrap(); - - assert_eq!(counter.call_count(), 1); - } - - #[tokio::test] - async fn backward_scan_window_boundary_999_single_call() { - // end_version=999: lower = 999 - 999 = 0, so exactly one 1000-version window covers - // the entire log and the loop exits after one call. 
- let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (999, LogPathFileType::Commit, CommitSource::Filesystem), - ]; - let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 999) - .unwrap(); - - assert_eq!(counter.call_count(), 1); - } - - #[tokio::test] - async fn backward_scan_window_boundary_1000_two_calls() { - // end_version=1000: first window lower = 1000 - 999 = 1 (not 0), so the loop - // continues. Second window: upper = 0, lower = 0.saturating_sub(999) = 0, breaks. - // This is the boundary where a second call becomes necessary. - let log_files = vec![ - (0, LogPathFileType::Commit, CommitSource::Filesystem), - (1000, LogPathFileType::Commit, CommitSource::Filesystem), - ]; - let (storage, log_root) = create_storage(log_files).await; - let counter = CountingStorageHandler::new(storage); - - LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 1000) - .unwrap(); - - assert_eq!(counter.call_count(), 2); + assert_eq!(result.ascending_commit_files[1].version, 1); + assert_source(&result.ascending_commit_files[1], CommitSource::Filesystem); + assert_eq!(result.ascending_commit_files[2].version, 2); + assert_source(&result.ascending_commit_files[2], CommitSource::Filesystem); + assert_eq!(result.ascending_commit_files[3].version, 3); + assert_source(&result.ascending_commit_files[3], CommitSource::Filesystem); + assert_eq!(result.ascending_commit_files[4].version, 4); + assert_source(&result.ascending_commit_files[4], CommitSource::Catalog); + assert_eq!(result.latest_commit_file.unwrap().version, 4); + assert_eq!(result.max_published_version, Some(5)); } // ===== has_complete_checkpoint_in direct unit tests (other cases for has_complete_checkpoint_in already covered by tests above) ===== - #[test] - fn has_complete_checkpoint_empty_slice_returns_false() { - 
assert!(!has_complete_checkpoint_in(&[])); + fn zero_size_checkpoint_files() -> Vec { + // Commits v0..=5 plus a zero-size checkpoint at v3. make_parsed_log_path_with_source + // always sets a non-zero size; override here to simulate a corrupt/empty checkpoint + // file and exercise the size > 0 guard. + let mut files: Vec = (0..=5) + .map(|v| { + make_parsed_log_path_with_source( + v, + LogPathFileType::Commit, + CommitSource::Filesystem, + ) + }) + .collect(); + let mut cp = make_parsed_log_path_with_source( + 3, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + ); + cp.location.size = 0; + files.push(cp); + files } - #[test] - fn has_complete_checkpoint_zero_size_filtered_returns_false() { - // The size > 0 guard must reject zero-size checkpoints to avoid treating empty/corrupt - // files as complete checkpoints. - let mut p = make_parsed_log_path_with_source( + fn incomplete_then_complete_files() -> Vec { + // Commits v0..=10, an incomplete checkpoint at v5 (1 of 3 parts), and a complete + // checkpoint at v10. has_complete_checkpoint_in must continue past the failed group + // and find the complete one. + let mut files: Vec = (0..=10) + .map(|v| { + make_parsed_log_path_with_source( + v, + LogPathFileType::Commit, + CommitSource::Filesystem, + ) + }) + .collect(); + files.push(make_parsed_log_path_with_source( 5, + LogPathFileType::MultiPartCheckpoint { + part_num: 1, + num_parts: 3, + }, + CommitSource::Filesystem, + )); + files.push(make_parsed_log_path_with_source( + 10, LogPathFileType::SinglePartCheckpoint, CommitSource::Filesystem, - ); - // make_parsed_log_path_with_source always sets a non-zero size; override here to - // simulate a corrupt/empty checkpoint file and exercise the size > 0 guard. 
- p.location.size = 0; - assert!(!has_complete_checkpoint_in(&[p])); + )); + files } - #[test] - fn has_complete_checkpoint_incomplete_at_one_version_complete_at_another_returns_true() { - // An incomplete checkpoint at v5 (1 of 3 parts) followed by a complete checkpoint at v10. - // has_complete_checkpoint_in must continue past the failed group and find the complete one. - let files = vec![ - make_parsed_log_path_with_source( - 5, - LogPathFileType::MultiPartCheckpoint { - part_num: 1, - num_parts: 3, - }, - CommitSource::Filesystem, - ), - make_parsed_log_path_with_source( - 10, - LogPathFileType::SinglePartCheckpoint, - CommitSource::Filesystem, - ), - ]; - assert!(has_complete_checkpoint_in(&files)); + #[rstest] + // Commits v0..=5, no checkpoint files + #[case::no_checkpoint( + (0u64..=5).map(|v| make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Filesystem)).collect(), + false + )] + // Commits v0..=5 plus a zero-size (corrupt) checkpoint at v3 + #[case::zero_size_checkpoint(zero_size_checkpoint_files(), false)] + // Commits v0..=10, incomplete checkpoint at v5, complete checkpoint at v10 + #[case::incomplete_then_complete(incomplete_then_complete_files(), true)] + fn has_complete_checkpoint_in_cases(#[case] files: Vec, #[case] expected: bool) { + assert_eq!(has_complete_checkpoint_in(&files), expected); } } From 73439495e9034533de757da806d9edc0fc901ded Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Tue, 24 Mar 2026 00:31:34 +0000 Subject: [PATCH 04/18] commenting --- kernel/src/log_segment_files.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index 1908278cbe..b1f8031190 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -1258,8 +1258,8 @@ mod list_log_files_with_log_tail_tests { #[tokio::test] async fn backward_scan_with_log_tail_derives_lower_bound_from_checkpoint() { // FS: commits v0..=v7 
+ checkpoint at v5. log_tail: catalog commits v8..=v10. - // After finding the checkpoint at v5, the lower bound is derived as v6, so FS commits - // v6 and v7 plus all catalog entries v8..=v10 are included. + // The checkpoint at v5 sets the lower bound to v6, so FS commits v6 and v7 plus all + // catalog entries v8..=v10 are included. let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=7) .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) .collect(); @@ -1305,8 +1305,9 @@ mod list_log_files_with_log_tail_tests { #[tokio::test] async fn backward_scan_with_log_tail_starting_before_checkpoint() { // FS: commits v0..=v5 + checkpoint at v5 + CRC at v6. log_tail: catalog commits v3..=v8, - // starting before the checkpoint. The derived lower bound is v6, so log_tail v3..=v5 are - // excluded. Also verifies that the CRC at v6 is preserved even though v6 is a log_tail version. + // starting before the checkpoint. The checkpoint at v5 sets the lower bound to v6, so + // log_tail v3..=v5 are excluded. The CRC at v6 is preserved even though v6 is within + // the log_tail range. let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5) .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) .collect(); @@ -1332,11 +1333,10 @@ mod list_log_files_with_log_tail_tests { ) .unwrap(); - // Checkpoint at v5; lower bound = 6, so log_tail v3..=v5 are excluded assert_eq!(result.checkpoint_parts.len(), 1); assert_eq!(result.checkpoint_parts[0].version, 5); - // CRC at v6 is preserved even though v6 is covered by the log_tail + // CRC at v6 is preserved even though v6 is within the log_tail range let crc = result.latest_crc_file.unwrap(); assert_eq!(crc.version, 6); assert!(matches!(crc.file_type, LogPathFileType::Crc)); @@ -1352,9 +1352,9 @@ mod list_log_files_with_log_tail_tests { #[tokio::test] async fn backward_scan_log_tail_defines_latest_version() { // FS: commits v0..=v5. log_tail: catalog commit v4. end_version=5. 
- // FS v4 and v5 are filtered by log_tail_start=4, so ascending_commit_files contains - // v0..=v3 (FS) + v4 (Catalog). max_published_version is Some(5): the highest FS - // commit seen within end_version, even though it was filtered from ascending_commit_files. + // FS v4 and v5 are filtered since log_tail_start=4. max_published_version is Some(5), + // the highest FS commit seen within end_version, even though v5 is not in + // ascending_commit_files. let log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5) .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) .collect(); From 24935fdc59b325bef8ed1427b45fd0bba41e7e13 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Tue, 24 Mar 2026 17:27:43 +0000 Subject: [PATCH 05/18] fixed build error --- kernel/src/log_segment_files.rs | 50 +++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index b1f8031190..85f36859a7 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -1094,15 +1094,12 @@ mod list_log_files_with_log_tail_tests { .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) .collect(); - // Add a checkpoint only if it falls within the log range if let Some(cp) = checkpoint_version { - if cp <= 1005 { - log_files.push(( - cp, - LogPathFileType::SinglePartCheckpoint, - CommitSource::Filesystem, - )); - } + log_files.push(( + cp, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + )); } let (storage, log_root) = create_storage(log_files).await; @@ -1198,31 +1195,42 @@ mod list_log_files_with_log_tail_tests { log_files } + struct BackwardScanExpected { + listings: u32, + checkpoint_parts: usize, + checkpoint_version: Version, + commit_count: usize, + first_commit: Version, + last_commit: Version, + } + // Case 1: complete 3-part checkpoint at v50, single window needed // Case 2: incomplete 1-of-2 part at v1500 in 
window 2, complete checkpoint at v500 in window 3 #[rstest] - #[case::multipart_checkpoint(multipart_checkpoint_files(), 52, 1, 3, 50, 2, 51, 52)] + #[case::multipart_checkpoint( + multipart_checkpoint_files(), + 52, + BackwardScanExpected { listings: 1, checkpoint_parts: 3, checkpoint_version: 50, commit_count: 2, first_commit: 51, last_commit: 52 } + )] #[case::incomplete_in_second_window_complete_in_third( files_incomplete_in_second_window_complete_in_third_window(), 3000, - 3, - 1, - 500, - 2500, - 501, - 3000 + BackwardScanExpected { listings: 3, checkpoint_parts: 1, checkpoint_version: 500, commit_count: 2500, first_commit: 501, last_commit: 3000 } )] #[tokio::test] async fn backward_scan_multipart_checkpoint_cases( #[case] log_files: Vec<(Version, LogPathFileType, CommitSource)>, #[case] end_version: Version, - #[case] expected_listings: u32, - #[case] expected_checkpoint_parts: usize, - #[case] expected_checkpoint_version: Version, - #[case] expected_commit_count: usize, - #[case] expected_first_commit: Version, - #[case] expected_last_commit: Version, + #[case] expected: BackwardScanExpected, ) { + let BackwardScanExpected { + listings: expected_listings, + checkpoint_parts: expected_checkpoint_parts, + checkpoint_version: expected_checkpoint_version, + commit_count: expected_commit_count, + first_commit: expected_first_commit, + last_commit: expected_last_commit, + } = expected; let (storage, log_root) = create_storage(log_files).await; let counter = CountingStorageHandler::new(storage); From 544d0056457e60922b456d0ac251fb623225d357 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Tue, 24 Mar 2026 17:36:03 +0000 Subject: [PATCH 06/18] fixes --- kernel/src/log_segment_files.rs | 46 ++++++++++++++------------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index 85f36859a7..227c4dfd0f 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ 
-1071,22 +1071,21 @@ mod list_log_files_with_log_tail_tests { // (2 listings). A checkpoint beyond end_version is never seen. #[rstest] // No checkpoint: scan exhausts both windows, all 1006 commits returned - #[case::no_checkpoint(None, 1005, (0u64..=1005).collect(), None, 2)] + #[case::no_checkpoint(None, 0..=1005, None, 2)] // Checkpoint beyond end_version is never seen; same behavior as no checkpoint - #[case::checkpoint_beyond_end(Some(1006), 1005, (0u64..=1005).collect(), None, 2)] + #[case::checkpoint_beyond_end(Some(1006), 0..=1005, None, 2)] // Checkpoint at end_version: found in window 1, no commits after it - #[case::checkpoint_at_end(Some(1005), 1005, vec![], Some(1005), 1)] + #[case::checkpoint_at_end(Some(1005), 0..0, Some(1005), 1)] // Checkpoint at v5: falls in window 2 -> 2 listings; commits 6..=1005 returned. // Tests the inclusive window boundary: window 1 covers [6, 1006) or [6, 1005] (lower = 1006 - 1000 = 6), // so v5 falls just outside it and requires a second listing, while v6 (next case) does not. 
- #[case::checkpoint_in_second_window(Some(5), 1005, (6u64..=1005).collect(), Some(5), 2)] + #[case::checkpoint_in_second_window(Some(5), 6..=1005, Some(5), 2)] // Checkpoint at v6: falls in window 1 -> 1 listing; commits 7..=1005 returned - #[case::checkpoint_in_first_window(Some(6), 1005, (7u64..=1005).collect(), Some(6), 1)] + #[case::checkpoint_in_first_window(Some(6), 7..=1005, Some(6), 1)] #[tokio::test] async fn backward_scan_single_checkpoint_cases( #[case] checkpoint_version: Option, - #[case] end_version: u64, - #[case] expected_commits: Vec, + #[case] expected_commits: impl Iterator, #[case] expected_checkpoint: Option, #[case] expected_listings: u32, ) { @@ -1105,13 +1104,9 @@ mod list_log_files_with_log_tail_tests { let (storage, log_root) = create_storage(log_files).await; let counter = CountingStorageHandler::new(storage); - let result = LogSegmentFiles::list_with_backward_checkpoint_scan( - &counter, - &log_root, - vec![], - end_version, - ) - .unwrap(); + let result = + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 1005) + .unwrap(); assert_eq!(counter.call_count(), expected_listings); @@ -1123,25 +1118,24 @@ mod list_log_files_with_log_tail_tests { assert_eq!(result.checkpoint_parts[0].version, cp_version); } - let actual_commits: Vec = result + assert!(result .ascending_commit_files .iter() .map(|f| f.version) - .collect(); - assert_eq!(actual_commits, expected_commits); + .eq(expected_commits)); } + /// end_version=3000. Window 2 contains an incomplete 2-of-2 multipart checkpoint (only + /// part 1 present). has_complete_checkpoint_in must return false for window 2, causing + /// the scan to continue to window 3, where a complete single-part checkpoint at v500 is + /// found. Verifies that incomplete parts from window 2 are discarded and do not pollute + /// the result's checkpoint_parts. 
+ /// + /// Window 1 [2001, 3001): commits v2001..=v3000, no checkpoint -> continue + /// Window 2 [1001, 2001): commits v1001..=v2000, v1500 (1-of-2 parts) incomplete -> continue + /// Window 3 [1, 1001): commits v1..=v1000, v500 (complete) -> checkpoint found -> break fn files_incomplete_in_second_window_complete_in_third_window( ) -> Vec<(Version, LogPathFileType, CommitSource)> { - // end_version=3000. Window 2 contains an incomplete 2-of-2 multipart checkpoint (only - // part 1 present). has_complete_checkpoint_in must return false for window 2, causing - // the scan to continue to window 3, where a complete single-part checkpoint at v500 is - // found. Verifies that incomplete parts from window 2 are discarded and do not pollute - // the result's checkpoint_parts. - // - // Window 1 [2001, 3001): commits v2001..=v3000, no checkpoint -> continue - // Window 2 [1001, 2001): commits v1001..=v2000, v1500 (1-of-2 parts) incomplete -> continue - // Window 3 [1, 1001): commits v1..=v1000, v500 (complete) -> checkpoint found -> break let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=3000) .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) .collect(); From 08f555b47ff98d5548f19a207770859a49da8987 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Tue, 24 Mar 2026 17:50:34 +0000 Subject: [PATCH 07/18] small restructuring of tests --- kernel/src/log_segment_files.rs | 46 +++++++++++++++++---------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index 227c4dfd0f..011cd4a32e 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -1290,17 +1290,18 @@ mod list_log_files_with_log_tail_tests { assert_eq!(result.checkpoint_parts[0].version, 5); // FS commits v6, v7 after the checkpoint; catalog commits v8..=v10 - assert_eq!(result.ascending_commit_files.len(), 5); - assert_eq!(result.ascending_commit_files[0].version, 6); 
- assert_source(&result.ascending_commit_files[0], CommitSource::Filesystem); - assert_eq!(result.ascending_commit_files[1].version, 7); - assert_source(&result.ascending_commit_files[1], CommitSource::Filesystem); - assert_eq!(result.ascending_commit_files[2].version, 8); - assert_source(&result.ascending_commit_files[2], CommitSource::Catalog); - assert_eq!(result.ascending_commit_files[3].version, 9); - assert_source(&result.ascending_commit_files[3], CommitSource::Catalog); - assert_eq!(result.ascending_commit_files[4].version, 10); - assert_source(&result.ascending_commit_files[4], CommitSource::Catalog); + let expected = [ + (6, CommitSource::Filesystem), + (7, CommitSource::Filesystem), + (8, CommitSource::Catalog), + (9, CommitSource::Catalog), + (10, CommitSource::Catalog), + ]; + assert_eq!(result.ascending_commit_files.len(), expected.len()); + for (file, (version, source)) in result.ascending_commit_files.iter().zip(expected) { + assert_eq!(file.version, version); + assert_source(file, source); + } assert_eq!(result.latest_commit_file.unwrap().version, 10); } @@ -1376,17 +1377,18 @@ mod list_log_files_with_log_tail_tests { ) .unwrap(); - assert_eq!(result.ascending_commit_files.len(), 5); - assert_eq!(result.ascending_commit_files[0].version, 0); - assert_source(&result.ascending_commit_files[0], CommitSource::Filesystem); - assert_eq!(result.ascending_commit_files[1].version, 1); - assert_source(&result.ascending_commit_files[1], CommitSource::Filesystem); - assert_eq!(result.ascending_commit_files[2].version, 2); - assert_source(&result.ascending_commit_files[2], CommitSource::Filesystem); - assert_eq!(result.ascending_commit_files[3].version, 3); - assert_source(&result.ascending_commit_files[3], CommitSource::Filesystem); - assert_eq!(result.ascending_commit_files[4].version, 4); - assert_source(&result.ascending_commit_files[4], CommitSource::Catalog); + let expected = [ + (0, CommitSource::Filesystem), + (1, CommitSource::Filesystem), + (2, 
CommitSource::Filesystem), + (3, CommitSource::Filesystem), + (4, CommitSource::Catalog), + ]; + assert_eq!(result.ascending_commit_files.len(), expected.len()); + for (file, (version, source)) in result.ascending_commit_files.iter().zip(expected) { + assert_eq!(file.version, version); + assert_source(file, source); + } assert_eq!(result.latest_commit_file.unwrap().version, 4); assert_eq!(result.max_published_version, Some(5)); } From 2f97d5122cf7326775801a26dfbde690dd71409e Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Tue, 24 Mar 2026 18:27:34 +0000 Subject: [PATCH 08/18] find version of last checkpoint once --- kernel/src/log_segment_files.rs | 110 ++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 49 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index 011cd4a32e..d3120c3181 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -125,21 +125,24 @@ fn group_checkpoint_parts(parts: Vec) -> HashMap bool { +fn find_complete_checkpoint_version(files: &[ParsedLogPath]) -> Option { files .iter() .filter(|f| f.is_checkpoint() && f.location.size > 0) .chunk_by(|f| f.version) .into_iter() - .any(|(_, parts)| { + .filter_map(|(version, parts)| { let owned: Vec = parts.cloned().collect(); group_checkpoint_parts(owned) .iter() .any(|(num_parts, part_files)| part_files.len() == *num_parts as usize) + .then_some(version) }) + .last() } /// Accumulates and groups log files during listing. Each "group" consists of all files that @@ -251,17 +254,16 @@ const BACKWARD_SCAN_WINDOW_SIZE: u64 = 1000; impl LogSegmentFiles { /// Assembles a `LogSegmentFiles` from `fs_files` (an iterator of files - /// listed from storage) and `log_tail` (catalog-provided commits) + /// listed from storage) and `log_tail` (catalog-provided commits). 
/// - /// `start_version` controls how the log_tail is filtered: - /// - `Some(v)`: include log_tail entries at version >= v - /// - `None`: derive the lower bound from the most recent complete checkpoint found during - /// the filesystem phase; entries at version > cp_version are included (checkpoint wins over - /// commits AT the checkpoint version). Falls back to 0 if no checkpoint was found. + /// `start_version` is the inclusive lower bound for log replay: log_tail entries at versions + /// below `start_version` are excluded. Callers are responsible for computing this value — + /// typically `0` when there is no checkpoint, or `checkpoint_version + 1` when there is one + /// (a checkpoint supersedes any commit at the same version). pub(crate) fn build_log_segment_files( fs_files: impl Iterator>, log_tail: Vec, - start_version: Option, + start_version: Version, end_version: Option, ) -> DeltaResult { // check log_tail is only commits @@ -303,31 +305,9 @@ impl LogSegmentFiles { acc.process_file(file); } - // Phase 2: resolve log_tail start version and end version upper bound - let resolved_start = match start_version { - Some(v) => v, - None => { - // Flush the last pending group so that output.checkpoint_parts is populated - // before we inspect it. Without this flush, a checkpoint whose parts arrived - // last would remain in pending_checkpoint_parts and the derived start version - // would fall back to 0 instead of the checkpoint version. - if let Some(gv) = acc.group_version { - acc.flush_checkpoint_group(gv); - } - // Use cp_version + 1: a checkpoint wins over a commit at the checkpoint - // version and must not be replayed on top of it. The checkpoint has already - // been flushed from pending_checkpoint_parts into output.checkpoint_parts, so - // there are no pending parts in Phase 3 that would naturally discard it. 
- acc.output - .checkpoint_parts - .first() - .map(|p| p.version + 1) - .unwrap_or(0) - } - }; let end = end_version.unwrap_or(Version::MAX); - // Phase 3: Process log_tail entries. We do this after Phase 1 because log_tail commits + // Phase 2: Process log_tail entries. We do this after Phase 1 because log_tail commits // start at log_tail_start_version and are in ascending version order — they always extend // (or overlap with, but supersede) the filesystem-listed commits. Processing them after // Phase 1 maintains ascending version order throughout, which is required by the checkpoint @@ -335,7 +315,7 @@ impl LogSegmentFiles { // versions, so there's no duplication here. let filtered_log_tail = log_tail .into_iter() - .filter(|entry| entry.version >= resolved_start && entry.version <= end); + .filter(|entry| entry.version >= start_version && entry.version <= end); for file in filtered_log_tail { // Track max published version for published commits from the log_tail if matches!(file.file_type, LogPathFileType::Commit) { @@ -436,7 +416,7 @@ impl LogSegmentFiles { let start = start_version.unwrap_or(0); let end = end_version.unwrap_or(Version::MAX); let fs_iter = list_from_storage(storage, log_root, start, end)?; - Self::build_log_segment_files(fs_iter, log_tail, Some(start), end_version) + Self::build_log_segment_files(fs_iter, log_tail, start, end_version) } /// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all @@ -503,6 +483,7 @@ impl LogSegmentFiles { // Scan backward in 1000-version windows, collecting ALL file types, until a complete // checkpoint is found or the log is exhausted. let mut windows: Vec> = Vec::new(); + let mut found_checkpoint_version: Option = None; // upper is the exclusive upper bound of the next window; adding 1 includes end_version // in the first window. The inclusive range passed to list_from_storage is [lower, upper - 1]. 
let mut upper = end_version + 1; @@ -511,20 +492,21 @@ impl LogSegmentFiles { let window_files: Vec<_> = list_from_storage(storage, log_root, lower, upper - 1)?.try_collect()?; - let checkpoint_found = has_complete_checkpoint_in(&window_files); + found_checkpoint_version = find_complete_checkpoint_version(&window_files); windows.push(window_files); - if checkpoint_found { + if found_checkpoint_version.is_some() { break; } upper = lower; } let fs_iter = windows.into_iter().rev().flatten().map(Ok); - - // Pass None so the log_tail lower bound is derived from the checkpoint found during - // the fs phase. - Self::build_log_segment_files(fs_iter, log_tail, None, Some(end_version)) + // Log replay starts at cp_version + 1: a checkpoint supersedes any commit at the same + // version, so commits at or before the checkpoint are not needed. Falls back to 0 if no + // checkpoint was found. + let start_version = found_checkpoint_version.map(|v| v + 1).unwrap_or(0); + Self::build_log_segment_files(fs_iter, log_tail, start_version, Some(end_version)) } } @@ -1126,7 +1108,7 @@ mod list_log_files_with_log_tail_tests { } /// end_version=3000. Window 2 contains an incomplete 2-of-2 multipart checkpoint (only - /// part 1 present). has_complete_checkpoint_in must return false for window 2, causing + /// part 1 present). find_complete_checkpoint_version must return None for window 2, causing /// the scan to continue to window 3, where a complete single-part checkpoint at v500 is /// found. Verifies that incomplete parts from window 2 are discarded and do not pollute /// the result's checkpoint_parts. 
@@ -1393,7 +1375,7 @@ mod list_log_files_with_log_tail_tests { assert_eq!(result.max_published_version, Some(5)); } - // ===== has_complete_checkpoint_in direct unit tests (other cases for has_complete_checkpoint_in already covered by tests above) ===== + // ===== find_complete_checkpoint_version direct unit tests (other cases already covered by tests above) ===== fn zero_size_checkpoint_files() -> Vec { // Commits v0..=5 plus a zero-size checkpoint at v3. make_parsed_log_path_with_source @@ -1420,7 +1402,7 @@ mod list_log_files_with_log_tail_tests { fn incomplete_then_complete_files() -> Vec { // Commits v0..=10, an incomplete checkpoint at v5 (1 of 3 parts), and a complete - // checkpoint at v10. has_complete_checkpoint_in must continue past the failed group + // checkpoint at v10. find_complete_checkpoint_version must continue past the failed group // and find the complete one. let mut files: Vec = (0..=10) .map(|v| { @@ -1447,17 +1429,47 @@ mod list_log_files_with_log_tail_tests { files } + fn two_complete_checkpoints_files() -> Vec { + // Commits v0..=10, complete checkpoint at v5 and complete checkpoint at v10. + // The function must return the latest (v10), not the first (v5). 
+ let mut files: Vec = (0..=10) + .map(|v| { + make_parsed_log_path_with_source( + v, + LogPathFileType::Commit, + CommitSource::Filesystem, + ) + }) + .collect(); + files.push(make_parsed_log_path_with_source( + 5, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + )); + files.push(make_parsed_log_path_with_source( + 10, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + )); + files + } + #[rstest] // Commits v0..=5, no checkpoint files #[case::no_checkpoint( (0u64..=5).map(|v| make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Filesystem)).collect(), - false + None )] // Commits v0..=5 plus a zero-size (corrupt) checkpoint at v3 - #[case::zero_size_checkpoint(zero_size_checkpoint_files(), false)] + #[case::zero_size_checkpoint(zero_size_checkpoint_files(), None)] // Commits v0..=10, incomplete checkpoint at v5, complete checkpoint at v10 - #[case::incomplete_then_complete(incomplete_then_complete_files(), true)] - fn has_complete_checkpoint_in_cases(#[case] files: Vec, #[case] expected: bool) { - assert_eq!(has_complete_checkpoint_in(&files), expected); + #[case::incomplete_then_complete(incomplete_then_complete_files(), Some(10))] + // Commits v0..=10, complete checkpoint at v5 and v10: must return v10 (latest) + #[case::two_complete(two_complete_checkpoints_files(), Some(10))] + fn find_complete_checkpoint_version_cases( + #[case] files: Vec, + #[case] expected: Option, + ) { + assert_eq!(find_complete_checkpoint_version(&files), expected); } } From 154f933b2e7d8597d5693974ebc3c20ec86743b4 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Tue, 24 Mar 2026 18:30:23 +0000 Subject: [PATCH 09/18] fix --- kernel/src/log_segment_files.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index d3120c3181..a6f6d69cab 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -255,11 +255,6 
@@ const BACKWARD_SCAN_WINDOW_SIZE: u64 = 1000; impl LogSegmentFiles { /// Assembles a `LogSegmentFiles` from `fs_files` (an iterator of files /// listed from storage) and `log_tail` (catalog-provided commits). - /// - /// `start_version` is the inclusive lower bound for log replay: log_tail entries at versions - /// below `start_version` are excluded. Callers are responsible for computing this value — - /// typically `0` when there is no checkpoint, or `checkpoint_version + 1` when there is one - /// (a checkpoint supersedes any commit at the same version). pub(crate) fn build_log_segment_files( fs_files: impl Iterator>, log_tail: Vec, @@ -274,6 +269,8 @@ impl LogSegmentFiles { ); let log_tail_start_version = log_tail.first().map(|f| f.version); + let end = end_version.unwrap_or(Version::MAX); + let mut acc = ListingAccumulator { end_version, ..Default::default() @@ -305,8 +302,6 @@ impl LogSegmentFiles { acc.process_file(file); } - let end = end_version.unwrap_or(Version::MAX); - // Phase 2: Process log_tail entries. We do this after Phase 1 because log_tail commits // start at log_tail_start_version and are in ascending version order — they always extend // (or overlap with, but supersede) the filesystem-listed commits. 
Processing them after From b43fc6a8169f26e1be75b768c71d894fc69a3029 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Wed, 25 Mar 2026 16:39:33 +0000 Subject: [PATCH 10/18] comment from child PR, closed child PR --- kernel/src/log_segment_files.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index a6f6d69cab..556f127eb0 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -432,7 +432,7 @@ impl LogSegmentFiles { )?; let Some(latest_checkpoint) = listed_files.checkpoint_parts.last() else { - // TODO: We could potentially recover here + // Kernel should not compensate for corrupt tables, so we fail if we can't find a checkpoint return Err(Error::invalid_checkpoint( "Had a _last_checkpoint hint but didn't find any checkpoints", )); From 63d74f28bfb14221b67fea647da3492e690dee05 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Wed, 25 Mar 2026 16:42:15 +0000 Subject: [PATCH 11/18] comment from child PR, closed child PR From 675f4e8d6c78294e89820e22c863e91df93f146e Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Fri, 27 Mar 2026 00:49:17 +0000 Subject: [PATCH 12/18] clarity on start bound for calling build log segment files --- kernel/src/log_segment_files.rs | 35 +++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index 556f127eb0..c8a0afc7eb 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -255,10 +255,16 @@ const BACKWARD_SCAN_WINDOW_SIZE: u64 = 1000; impl LogSegmentFiles { /// Assembles a `LogSegmentFiles` from `fs_files` (an iterator of files /// listed from storage) and `log_tail` (catalog-provided commits). 
+ /// + /// - `fs_files`: files listed from storage in ascending version order + /// - `log_tail`: catalog-provided commits + /// - `log_tail_min_version`: lower bound (inclusive) for log_tail entries included in the + /// result + /// - `end_version`: upper bound (inclusive) on versions to include, `None` means no bound pub(crate) fn build_log_segment_files( fs_files: impl Iterator>, log_tail: Vec, - start_version: Version, + log_tail_min_version: Version, end_version: Option, ) -> DeltaResult { // check log_tail is only commits @@ -270,7 +276,7 @@ impl LogSegmentFiles { let log_tail_start_version = log_tail.first().map(|f| f.version); let end = end_version.unwrap_or(Version::MAX); - + let mut acc = ListingAccumulator { end_version, ..Default::default() @@ -308,9 +314,12 @@ impl LogSegmentFiles { // Phase 1 maintains ascending version order throughout, which is required by the checkpoint // grouping logic. Note that Phase 1 already skipped filesystem commits at log_tail // versions, so there's no duplication here. 
+ // + // log_tail entries at versions before a checkpoint may still be included + // here - LogSegment::try_new is the safeguard that filters those out unconditionally let filtered_log_tail = log_tail .into_iter() - .filter(|entry| entry.version >= start_version && entry.version <= end); + .filter(|entry| entry.version >= log_tail_min_version && entry.version <= end); for file in filtered_log_tail { // Track max published version for published commits from the log_tail if matches!(file.file_type, LogPathFileType::Commit) { @@ -411,6 +420,7 @@ impl LogSegmentFiles { let start = start_version.unwrap_or(0); let end = end_version.unwrap_or(Version::MAX); let fs_iter = list_from_storage(storage, log_root, start, end)?; + // We pass start as the log_tail_min_version to exclude log_tail entries before the listing start Self::build_log_segment_files(fs_iter, log_tail, start, end_version) } @@ -497,11 +507,9 @@ impl LogSegmentFiles { } let fs_iter = windows.into_iter().rev().flatten().map(Ok); - // Log replay starts at cp_version + 1: a checkpoint supersedes any commit at the same - // version, so commits at or before the checkpoint are not needed. Falls back to 0 if no - // checkpoint was found. - let start_version = found_checkpoint_version.map(|v| v + 1).unwrap_or(0); - Self::build_log_segment_files(fs_iter, log_tail, start_version, Some(end_version)) + // We pass the checkpoint version as the log_tail_min_version to exclude log_tail entries before the checkpoint + let log_tail_min_version = found_checkpoint_version.unwrap_or(0); + Self::build_log_segment_files(fs_iter, log_tail, log_tail_min_version, Some(end_version)) } } @@ -1285,9 +1293,9 @@ mod list_log_files_with_log_tail_tests { #[tokio::test] async fn backward_scan_with_log_tail_starting_before_checkpoint() { // FS: commits v0..=v5 + checkpoint at v5 + CRC at v6. log_tail: catalog commits v3..=v8, - // starting before the checkpoint. 
The checkpoint at v5 sets the lower bound to v6, so - // log_tail v3..=v5 are excluded. The CRC at v6 is preserved even though v6 is within - // the log_tail range. + // starting before the checkpoint. The checkpoint at v5 sets the lower bound to v5, so + // log_tail v3..=v4 are excluded. The log_tail commit at v5 passes through (it is at the + // checkpoint version). The CRC at v6 is preserved even though v6 is within the log_tail range. let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5) .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) .collect(); @@ -1321,9 +1329,10 @@ mod list_log_files_with_log_tail_tests { assert_eq!(crc.version, 6); assert!(matches!(crc.file_type, LogPathFileType::Crc)); - assert_eq!(result.ascending_commit_files.len(), 3); + // v5 passes the log_tail_min_version filter (>= 5) and is included here + assert_eq!(result.ascending_commit_files.len(), 4); for (i, commit) in result.ascending_commit_files.iter().enumerate() { - assert_eq!(commit.version, (i + 6) as u64); + assert_eq!(commit.version, (i + 5) as u64); assert_source(commit, CommitSource::Catalog); } assert_eq!(result.latest_commit_file.unwrap().version, 8); From 5614ad4d90cde0233ee65bc7d159736c9b57390d Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Fri, 27 Mar 2026 01:20:57 +0000 Subject: [PATCH 13/18] fixes --- kernel/src/log_segment.rs | 6 +++--- kernel/src/log_segment_files.rs | 30 +++++++++++++++--------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 49c0909ed1..938fd967d6 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -299,11 +299,11 @@ impl LogSegment { .and_then(|hint| hint.checkpoint_schema.clone()); // The end_version is the time_travel_version, if present - // TODO: When max catalog version is implemented, we would use that as end_version if time_travel_version is not present + // TODO: When max catalog version is 
implemented, we would use that as end_version if + // time_travel_version is not present let end_version = time_travel_version; - // Keep the hint only if it doesn't point past end_version - // If there is no end_version bound, any hint is acceptable + // Keep the hint only if it points at or before end_version, or if there is no end_version bound let usable_hint = checkpoint_hint.filter(|cp| end_version.is_none_or(|v| cp.version <= v)); // Cases: diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index c8a0afc7eb..cc5f003d7f 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -127,10 +127,8 @@ fn group_checkpoint_parts(parts: Vec) -> HashMap Option { - files +fn find_complete_checkpoint_version(ascending_files: &[ParsedLogPath]) -> Option { + ascending_files .iter() .filter(|f| f.is_checkpoint() && f.location.size > 0) .chunk_by(|f| f.version) @@ -248,8 +246,7 @@ impl ListingAccumulator { } /// Number of versions covered by each backward-scan window in -/// `LogSegmentFiles::list_with_backward_checkpoint_scan`. The range `[lower, upper)` is -/// half-open, so a window size of 1000 covers exactly 1000 versions. +/// `LogSegmentFiles::list_with_backward_checkpoint_scan` const BACKWARD_SCAN_WINDOW_SIZE: u64 = 1000; impl LogSegmentFiles { @@ -463,21 +460,24 @@ impl LogSegmentFiles { Ok(listed_files) } - /// Lists log files by scanning backward from `end_version` in 1000-version windows until a - /// complete checkpoint is found or the log is exhausted. The resulting files are combined - /// with the `log_tail` (catalog-provided commits) to build a [`LogSegmentFiles`]. + /// Returns a [`LogSegmentFiles`] ending at `end_version`, rooted at the most recent complete + /// checkpoint at or before `end_version`, or rooted at version 0 if no checkpoint is found. 
/// - /// This avoids a full forward listing when we have an upper-bound version but no checkpoint - /// hint: instead of listing from version 0, we walk backward from `end_version` in bounded - /// windows, stopping as soon as we find a complete checkpoint (or reach version 0). + /// To find the checkpoint without a full forward listing from version 0, this scans backward + /// from `end_version` in windows of size [`BACKWARD_SCAN_WINDOW_SIZE`], stopping as soon as + /// a complete checkpoint is found (or version 0 is reached). + /// Then, all files from the windows that were scanned are combined with `log_tail` to produce a log segment + /// rooted at the checkpoint version (or version 0 if no checkpoint) with all commits after the + /// checkpoint version. A log_tail commit at exactly the checkpoint version may be included at this + /// stage but will be filtered out by `LogSegment::try_new`. /// - /// For example, given end_version = 12500 and a checkpoint at v8900: + /// For example, given the desired end_version = 12500 and a checkpoint at v8900: /// - Window 1 [11501, 12501): no checkpoint -> continue /// - Window 2 [10501, 11501): no checkpoint -> continue /// - Window 3 [9501, 10501): no checkpoint -> continue /// - Window 4 [8501, 9501): checkpoint at v8900 found -> stop - /// All files from windows 1-4 are then passed to `build_log_segment_files`, which - /// returns a log segment with the checkpoint at v8900 and all commits from v8901 to v12500 + /// All files from windows 1-4 are combined with `log_tail` to produce a log segment + /// rooted at the checkpoint at v8900 with all commits from v8901 to v12500. 
#[instrument(name = "log.list_with_backward_checkpoint_scan", skip_all, fields(end = end_version), err)] pub(crate) fn list_with_backward_checkpoint_scan( storage: &dyn StorageHandler, From 953fa813966fd7b44d03d654ba2732ed8c60b994 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Fri, 27 Mar 2026 23:12:37 +0000 Subject: [PATCH 14/18] comments, param name change --- kernel/src/log_segment_files.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index cc5f003d7f..cafcf6133a 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -255,13 +255,13 @@ impl LogSegmentFiles { /// /// - `fs_files`: files listed from storage in ascending version order /// - `log_tail`: catalog-provided commits - /// - `log_tail_min_version`: lower bound (inclusive) for log_tail entries included in the - /// result + /// - `start_version`: start version of the entire listing range provided; in practice, + /// this is the lower bound (inclusive) for log_tail entries included in the result /// - `end_version`: upper bound (inclusive) on versions to include, `None` means no bound pub(crate) fn build_log_segment_files( fs_files: impl Iterator>, log_tail: Vec, - log_tail_min_version: Version, + start_version: Version, end_version: Option, ) -> DeltaResult { // check log_tail is only commits @@ -316,7 +316,7 @@ impl LogSegmentFiles { // here - LogSegment::try_new is the safeguard that filters those out unconditionally let filtered_log_tail = log_tail .into_iter() - .filter(|entry| entry.version >= log_tail_min_version && entry.version <= end); + .filter(|entry| entry.version >= start_version && entry.version <= end); for file in filtered_log_tail { // Track max published version for published commits from the log_tail if matches!(file.file_type, LogPathFileType::Commit) { @@ -417,7 +417,6 @@ impl LogSegmentFiles { let start = start_version.unwrap_or(0); let end = 
end_version.unwrap_or(Version::MAX); let fs_iter = list_from_storage(storage, log_root, start, end)?; - // We pass start as the log_tail_min_version to exclude log_tail entries before the listing start Self::build_log_segment_files(fs_iter, log_tail, start, end_version) } @@ -507,9 +506,8 @@ impl LogSegmentFiles { } let fs_iter = windows.into_iter().rev().flatten().map(Ok); - // We pass the checkpoint version as the log_tail_min_version to exclude log_tail entries before the checkpoint - let log_tail_min_version = found_checkpoint_version.unwrap_or(0); - Self::build_log_segment_files(fs_iter, log_tail, log_tail_min_version, Some(end_version)) + let start = found_checkpoint_version.unwrap_or(0); + Self::build_log_segment_files(fs_iter, log_tail, start, Some(end_version)) } } @@ -1329,7 +1327,7 @@ mod list_log_files_with_log_tail_tests { assert_eq!(crc.version, 6); assert!(matches!(crc.file_type, LogPathFileType::Crc)); - // v5 passes the log_tail_min_version filter (>= 5) and is included here + // v5 passes the start version filter (>= 5) and is included here assert_eq!(result.ascending_commit_files.len(), 4); for (i, commit) in result.ascending_commit_files.iter().enumerate() { assert_eq!(commit.version, (i + 5) as u64); From b55bee60afae841d613bd6946e1da13acf1e5fe4 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Mon, 30 Mar 2026 05:55:59 +0000 Subject: [PATCH 15/18] fix: use FETCH_HEAD instead of branch name in git checkout git checkout -- treats the argument as a pathspec, not a branch name, causing "pathspec did not match" errors. Using FETCH_HEAD avoids passing any user-controlled input to the checkout command entirely. 
--- .github/scripts/run-benchmarks.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/scripts/run-benchmarks.sh b/.github/scripts/run-benchmarks.sh index 03b9b68e59..0f21289660 100644 --- a/.github/scripts/run-benchmarks.sh +++ b/.github/scripts/run-benchmarks.sh @@ -85,7 +85,7 @@ echo "Parsed filter: ${FILTER:-}" # "changes" baseline files are preserved across the branch switch. # --------------------------------------------------------------------------- git fetch origin -- "$BASE_REF" -git checkout -- "$BASE_REF" +git checkout FETCH_HEAD (cd benchmarks && cargo bench --locked --bench workload_bench -- --save-baseline base "$FILTER") # --------------------------------------------------------------------------- From c651b7de92c2b7e7d48a1524b12ee5b6b2cb77d4 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Mon, 30 Mar 2026 06:21:45 +0000 Subject: [PATCH 16/18] fix: pass --repo to gh pr comment in post-comment job The post-comment job runs on a fresh runner with no checkout, so gh cannot auto-detect the repository from git. Pass --repo explicitly. 
--- .github/workflows/benchmark.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index cf9a56c524..dcd0973414 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -80,4 +80,5 @@ jobs: env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} PR_NUMBER: ${{ needs.run-benchmark.outputs.pr_number }} - run: gh pr comment "$PR_NUMBER" --body-file /tmp/bench-comment.md + REPO: ${{ github.repository }} + run: gh pr comment "$PR_NUMBER" --repo "$REPO" --body-file /tmp/bench-comment.md From 7a7215a25ac04ee8a46f927782d19650b02c45e8 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Mon, 30 Mar 2026 07:44:18 +0000 Subject: [PATCH 17/18] add summary to comment, readme changes --- .github/scripts/run-benchmarks.sh | 17 ++++++++--------- .github/workflows/benchmark.yml | 2 -- benchmarks/README.md | 14 ++++++++------ 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/.github/scripts/run-benchmarks.sh b/.github/scripts/run-benchmarks.sh index 0f21289660..81a67c85ba 100644 --- a/.github/scripts/run-benchmarks.sh +++ b/.github/scripts/run-benchmarks.sh @@ -10,11 +10,6 @@ # COMMENT - the /bench PR comment body # BASE_REF - base branch ref (e.g. "main") # HEAD_SHA - full SHA of the PR head commit -# -# Usage for local testing: -# COMMENT="/bench --tags base --filter snapshotConstruction" \ -# BASE_REF=main HEAD_SHA=abc1234 \ -# bash .github/scripts/run-benchmarks.sh set -euo pipefail shopt -s extglob @@ -23,7 +18,7 @@ shopt -s extglob # 1. 
Parse the /bench comment # Syntax: /bench [--tags ] [--filter ] # --tags sets BENCH_TAGS (comma-separated tag list); defaults to "base" -# when the comment is bare /bench with no recognized args +# when the comment is just /bench # --filter Criterion name regex passed as a positional arg to cargo bench # --------------------------------------------------------------------------- @@ -60,8 +55,7 @@ fi TAGS=$(printf '%s' "$TAGS" | tr -cd 'a-zA-Z0-9,_-') # Sanitize filter: strip control characters only, preserving regex metacharacters. -# The filter is always passed double-quoted to cargo bench so no shell injection -# is possible from the preserved printable characters. +# The filter is always passed double-quoted to cargo bench. FILTER=$(printf '%s' "$FILTER" | tr -d '\000-\037\177') # If nothing was parsed (unrecognized tokens, typos, missing values), default to "base" @@ -174,10 +168,15 @@ COMPARISON=$((cd benchmarks && critcmp base changes) | python3 /tmp/parse_critcm # and posts it as a PR comment using a step that holds GH_TOKEN. # --------------------------------------------------------------------------- SHORT_SHA="${HEAD_SHA:0:7}" + +SUMMARY="" +[[ -n "$TAGS" ]] && SUMMARY="tags: \`${TAGS}\`" +[[ -n "$FILTER" ]] && SUMMARY+="${SUMMARY:+ | }filter: \`${FILTER}\`" + { echo "## Benchmark for ${SHORT_SHA}" echo "
" - echo "Click to view benchmark" + echo "${SUMMARY}" echo "" echo "$COMPARISON" echo "" diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index dcd0973414..b214a6ce63 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -50,8 +50,6 @@ jobs: with: save-if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} - name: Run benchmarks - # GH_TOKEN is intentionally absent from this step's env. Untrusted PR code - # (build.rs, bench files) must not have access to the write-capable token. # The comment is posted in the post-comment job after this job completes. env: COMMENT: ${{ github.event.comment.body }} diff --git a/benchmarks/README.md b/benchmarks/README.md index 579432f077..f0a51b12ab 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -91,17 +91,19 @@ To trigger benchmarks on a pull request, post a comment using the following synt /bench [--tags ] [--filter ] ``` -- `--tags` sets `BENCH_TAGS` (comma-separated), controlling which tables run. Defaults to `base` if omitted. -- `--filter` is a single-token Criterion regex matched against benchmark names. Optional. -- Both flags are independent and can be given in any order. +- `--tags` sets `BENCH_TAGS` (comma-separated), controlling which table groupings run. +- `--filter` is a single-token Criterion regex matched against benchmark names. +- Both flags are optional and independent; they can be given in any order. +- When both are specified, they apply as AND: only benchmarks from tables that match the tag filter AND whose name matches the regex are run. +- Running just `/bench` (with no flags) defaults to `BENCH_TAGS=base`. If neither flag is parsed, the same default applies. 
Examples: ``` /bench # BENCH_TAGS=base, all benchmark names /bench --tags base,my-tag # BENCH_TAGS=base,my-tag, all benchmark names -/bench --filter snapshotConstruction # BENCH_TAGS=base, only snapshotConstruction benchmarks -/bench --tags base --filter 101kAdds.*snapshotConstruction # combined: AND pattern -/bench --filter 101kAdds|10kAdds # OR two table names +/bench --filter snapshotConstruction # no BENCH_TAGS set, only snapshotConstruction benchmarks +/bench --tags base --filter 101kAdds.*snapshotConstruction # only snapshotConstruction benchmarks from tables tagged "base" +/bench --filter 101kAdds|10kAdds # no BENCH_TAGS set, OR two table names ``` See [By tag (`BENCH_TAGS`)](#by-tag-bench_tags) for how tags work and [By benchmark name](#by-benchmark-name) for regex pattern examples. Results are posted automatically as a PR comment, comparing the PR branch against the base branch. From 47d1fb26634975e7635f1d4e3ccec31b27ea6345 Mon Sep 17 00:00:00 2001 From: Lorena Rosati Date: Mon, 30 Mar 2026 19:27:49 +0000 Subject: [PATCH 18/18] moved python to new file, refactoring --- .github/scripts/run-benchmarks.sh | 117 +++++------------------------- benchmarks/ci/parse_critcmp.py | 68 +++++++++++++++++ 2 files changed, 88 insertions(+), 97 deletions(-) create mode 100644 benchmarks/ci/parse_critcmp.py diff --git a/.github/scripts/run-benchmarks.sh b/.github/scripts/run-benchmarks.sh index 81a67c85ba..197bd7030a 100644 --- a/.github/scripts/run-benchmarks.sh +++ b/.github/scripts/run-benchmarks.sh @@ -32,33 +32,25 @@ if [[ -z "$ARGS" ]]; then # Bare /bench with no args: default to the "base" tag TAGS="base" else - # Parse --tags and --filter flags; each takes the next whitespace-delimited - # token as its value. Unknown tokens are silently ignored. 
- while [[ -n "$ARGS" ]]; do - TOKEN="${ARGS%% *}" - ARGS="${ARGS#"$TOKEN"}" - ARGS="${ARGS##+( )}" - - if [[ "$TOKEN" == "--tags" ]]; then - TAGS="${ARGS%% *}" - ARGS="${ARGS#"$TAGS"}" - ARGS="${ARGS##+( )}" - elif [[ "$TOKEN" == "--filter" ]]; then - FILTER="${ARGS%% *}" - ARGS="${ARGS#"$FILTER"}" - ARGS="${ARGS##+( )}" - fi + # Normalize: strip /bench prefix, collapse all whitespace (including newlines) + # to spaces, then strip to a safe allowlist before parsing + ARGS=$(printf '%s' "$ARGS" | tr '\n\r\t' ' ' | tr -s ' ' | tr -cd 'a-zA-Z0-9,_./|*+?()[]^$ -') + ARGS="${ARGS## }" # strip leading space + ARGS="${ARGS%% }" # strip trailing space + + read -ra TOKENS <<< "$ARGS" + i=0 + while [[ $i -lt ${#TOKENS[@]} ]]; do + case "${TOKENS[$i]}" in + --tags) i=$((i + 1)); TAGS="${TOKENS[$i]:-}" ;; + --filter) i=$((i + 1)); FILTER="${TOKENS[$i]:-}" ;; + *) echo "Unknown token: '${TOKENS[$i]}'" >&2; exit 1 ;; + esac + i=$((i + 1)) done fi -# Sanitize tags: strict allowlist (alphanumeric, comma, underscore, hyphen) -TAGS=$(printf '%s' "$TAGS" | tr -cd 'a-zA-Z0-9,_-') - -# Sanitize filter: strip control characters only, preserving regex metacharacters. -# The filter is always passed double-quoted to cargo bench. -FILTER=$(printf '%s' "$FILTER" | tr -d '\000-\037\177') - -# If nothing was parsed (unrecognized tokens, typos, missing values), default to "base" +# Default: if nothing was parsed, run with BENCH_TAGS=base if [[ -z "$TAGS" && -z "$FILTER" ]]; then TAGS="base" fi @@ -84,83 +76,14 @@ git checkout FETCH_HEAD # --------------------------------------------------------------------------- # 4. Compare baselines with critcmp and format as a markdown table. 
-# Replicates criterion-compare-action's output: # - Parses actual duration values (not rank factors) for the % column # - Bolds the faster duration and % cell when the difference is # statistically significant (error bounds do not overlap) # --------------------------------------------------------------------------- -cat > /tmp/parse_critcmp.py << 'PYEOF' -import sys, re - -def to_seconds(value, units): - u = units.strip() - if u == 's': return value - if u == 'ms': return value / 1e3 - if u in ('µs', 'us', 'μs'): return value / 1e6 - if u == 'ns': return value / 1e9 - return value - -def is_significant(chg_dur, chg_err, base_dur, base_err): - if chg_dur < base_dur: - return chg_dur + chg_err < base_dur or base_dur - base_err > chg_dur - else: - return chg_dur - chg_err > base_dur or base_dur + base_err < chg_dur - -def parse_duration(s): - m = re.match(r'([0-9.]+)±([0-9.]+)(.+)', s.strip()) - if not m: - return None - return float(m.group(1)), float(m.group(2)), m.group(3).strip() - -lines = sys.stdin.read().splitlines() -print("| Test | Base | PR | % |") -print("|------|--------------|------------------|---|") - -for line in lines[2:]: # skip critcmp header rows - if not line.strip(): - continue - # critcmp columns (split on 2+ spaces): - # with throughput: name, baseFactor, baseDuration, baseBandwidth, changesFactor, changesDuration, changesBandwidth - # without throughput: name, baseFactor, baseDuration, changesFactor, changesDuration - # Locate duration fields by the presence of "±" rather than hardcoding indices, - # so the script works correctly regardless of whether bandwidth columns are present. 
- fields = re.split(r' +', line) - name = fields[0].strip().replace('|', r'\|') if fields else '' - dur_fields = [f.strip() for f in fields[1:] if '±' in f] - base_dur_str = dur_fields[0] if len(dur_fields) > 0 else None - chg_dur_str = dur_fields[1] if len(dur_fields) > 1 else None - - if not name and not base_dur_str and not chg_dur_str: - continue - - base_display = base_dur_str or 'N/A' - chg_display = chg_dur_str or 'N/A' - difference = 'N/A' - - if base_dur_str and chg_dur_str: - base_p = parse_duration(base_dur_str) - chg_p = parse_duration(chg_dur_str) - if base_p and chg_p: - base_secs = to_seconds(base_p[0], base_p[2]) - base_err_secs = to_seconds(base_p[1], base_p[2]) - chg_secs = to_seconds(chg_p[0], chg_p[2]) - chg_err_secs = to_seconds(chg_p[1], chg_p[2]) - - pct = -(1 - chg_secs / base_secs) * 100 - prefix = '' if chg_secs <= base_secs else '+' - difference = f'{prefix}{pct:.2f}%' - - if is_significant(chg_secs, chg_err_secs, base_secs, base_err_secs): - if chg_secs < base_secs: - chg_display = f'**{chg_dur_str}**' - elif chg_secs > base_secs: - base_display = f'**{base_dur_str}**' - difference = f'**{difference}**' - - print(f'| {name} | {base_display} | {chg_display} | {difference} |') -PYEOF - -COMPARISON=$((cd benchmarks && critcmp base changes) | python3 /tmp/parse_critcmp.py) +# Use `critcmp` to compare the criterion output for `base` and `changes`. We use `critcmp` instead of manually +# parsing criterion outputs because criterion may update its output format. By using `critcmp`, we inherit all +# updated criterion output parsing. +COMPARISON=$((cd benchmarks && critcmp base changes) | python3 benchmarks/ci/parse_critcmp.py) # --------------------------------------------------------------------------- # 5. 
Write results to /tmp/bench-comment.md
diff --git a/benchmarks/ci/parse_critcmp.py b/benchmarks/ci/parse_critcmp.py
new file mode 100644
index 0000000000..3a2d7b4c4d
--- /dev/null
+++ b/benchmarks/ci/parse_critcmp.py
@@ -0,0 +1,99 @@
+"""Format `critcmp base changes` output (read from stdin) as a markdown table.
+
+Called from .github/scripts/run-benchmarks.sh:
+    critcmp base changes | python3 benchmarks/ci/parse_critcmp.py
+"""
+
+import re
+import sys
+
+# Divisors that convert a value expressed in the given unit to seconds.
+_UNIT_DIVISORS = {'s': 1, 'ms': 1e3, 'µs': 1e6, 'us': 1e6, 'μs': 1e6, 'ns': 1e9}
+
+# critcmp columns are separated by runs of two or more spaces, so a single
+# space inside a field must not split it.
+_COLUMN_SPLIT = re.compile(r' {2,}')
+
+# A duration field such as "12.3±0.45ms": mean, error, unit.
+_DURATION = re.compile(r'([0-9.]+)±([0-9.]+)(.+)')
+
+
+def to_seconds(value, units):
+    """Convert value from units to seconds; unknown units pass through."""
+    divisor = _UNIT_DIVISORS.get(units.strip())
+    return value if divisor is None else value / divisor
+
+
+def is_significant(chg_dur, chg_err, base_dur, base_err):
+    """Return True if either mean lies outside the other's error band."""
+    if chg_dur < base_dur:
+        return chg_dur + chg_err < base_dur or base_dur - base_err > chg_dur
+    return chg_dur - chg_err > base_dur or base_dur + base_err < chg_dur
+
+
+def parse_duration(s):
+    """Parse "mean±errUNIT" into (mean, err, unit), or None if malformed."""
+    m = _DURATION.match(s.strip())
+    if not m:
+        return None
+    try:
+        return float(m.group(1)), float(m.group(2)), m.group(3).strip()
+    except ValueError:  # e.g. "1.2.3" fits the pattern but is not a float
+        return None
+
+
+def format_row(line):
+    """Turn one critcmp data line into a markdown row, or None if empty."""
+    fields = _COLUMN_SPLIT.split(line)
+    name = fields[0].strip().replace('|', r'\|')
+    # Locate durations by "±" rather than by index so the optional
+    # bandwidth columns do not matter.
+    durations = [f.strip() for f in fields[1:] if '±' in f]
+    base_str = durations[0] if durations else None
+    chg_str = durations[1] if len(durations) > 1 else None
+    if not name and not base_str and not chg_str:
+        return None
+
+    base_display = base_str or 'N/A'
+    chg_display = chg_str or 'N/A'
+    difference = 'N/A'
+
+    base_p = parse_duration(base_str) if base_str else None
+    chg_p = parse_duration(chg_str) if chg_str else None
+    if base_p and chg_p:
+        base_secs = to_seconds(base_p[0], base_p[2])
+        base_err = to_seconds(base_p[1], base_p[2])
+        chg_secs = to_seconds(chg_p[0], chg_p[2])
+        chg_err = to_seconds(chg_p[1], chg_p[2])
+        # A zero base duration has no meaningful percentage; leave the row
+        # unannotated instead of raising ZeroDivisionError.
+        if base_secs > 0:
+            pct = -(1 - chg_secs / base_secs) * 100
+            prefix = '' if chg_secs <= base_secs else '+'
+            difference = f'{prefix}{pct:.2f}%'
+            if is_significant(chg_secs, chg_err, base_secs, base_err):
+                # Bold the faster side together with the percentage.
+                if chg_secs < base_secs:
+                    chg_display = f'**{chg_str}**'
+                elif chg_secs > base_secs:
+                    base_display = f'**{base_str}**'
+                difference = f'**{difference}**'
+
+    return f'| {name} | {base_display} | {chg_display} | {difference} |'
+
+
+def main():
+    """Read critcmp output from stdin and print the markdown table."""
+    lines = sys.stdin.read().splitlines()
+    print("| Test | Base | PR | % |")
+    print("|------|--------------|------------------|---|")
+    for line in lines[2:]:  # skip the two critcmp header rows
+        if not line.strip():
+            continue
+        row = format_row(line)
+        if row is not None:
+            print(row)
+
+
+if __name__ == '__main__':
+    main()