diff --git a/.github/scripts/run-benchmarks.sh b/.github/scripts/run-benchmarks.sh new file mode 100644 index 0000000000..197bd7030a --- /dev/null +++ b/.github/scripts/run-benchmarks.sh @@ -0,0 +1,107 @@ +#!/usr/bin/env bash +# Benchmark comparison script for pull requests. +# +# Called by .github/workflows/benchmark.yml (run-benchmark job) after the repo +# has been checked out at the PR head. Writes the formatted markdown comparison +# to /tmp/bench-comment.md; the post-comment job picks it up and posts it. +# +# Expects the following environment variables: +# +# COMMENT - the /bench PR comment body +# BASE_REF - base branch ref (e.g. "main") +# HEAD_SHA - full SHA of the PR head commit + +set -euo pipefail +shopt -s extglob + +# --------------------------------------------------------------------------- +# 1. Parse the /bench comment +# Syntax: /bench [--tags ] [--filter ] +# --tags sets BENCH_TAGS (comma-separated tag list); defaults to "base" +# when the comment is just /bench +# --filter Criterion name regex passed as a positional arg to cargo bench +# --------------------------------------------------------------------------- + +ARGS="${COMMENT#/bench}" +ARGS="${ARGS##+( )}" + +TAGS="" +FILTER="" + +if [[ -z "$ARGS" ]]; then + # Bare /bench with no args: default to the "base" tag + TAGS="base" +else + # Normalize: strip /bench prefix, collapse all whitespace (including newlines) + # to spaces, then strip to a safe allowlist before parsing + ARGS=$(printf '%s' "$ARGS" | tr '\n\r\t' ' ' | tr -s ' ' | tr -cd 'a-zA-Z0-9,_./|*+?()[]^$ -') + ARGS="${ARGS## }" # strip leading space + ARGS="${ARGS%% }" # strip trailing space + + read -ra TOKENS <<< "$ARGS" + i=0 + while [[ $i -lt ${#TOKENS[@]} ]]; do + case "${TOKENS[$i]}" in + --tags) i=$((i + 1)); TAGS="${TOKENS[$i]:-}" ;; + --filter) i=$((i + 1)); FILTER="${TOKENS[$i]:-}" ;; + *) echo "Unknown token: '${TOKENS[$i]}'" >&2; exit 1 ;; + esac + i=$((i + 1)) + done +fi + +# Default: if nothing was parsed, run with 
BENCH_TAGS=base +if [[ -z "$TAGS" && -z "$FILTER" ]]; then + TAGS="base" +fi + +echo "Parsed tags: ${TAGS:-}" +echo "Parsed filter: ${FILTER:-}" + +[[ -n "$TAGS" ]] && export BENCH_TAGS="$TAGS" + +# --------------------------------------------------------------------------- +# 2. Benchmark the PR branch (already checked out by the workflow) +# --------------------------------------------------------------------------- +(cd benchmarks && cargo bench --locked --bench workload_bench -- --save-baseline changes "$FILTER") + +# --------------------------------------------------------------------------- +# 3. Switch to the base branch and benchmark it +# The benchmarks/target/ directory is not tracked by git, so the +# "changes" baseline files are preserved across the branch switch. +# --------------------------------------------------------------------------- +git fetch origin -- "$BASE_REF" +git checkout FETCH_HEAD +(cd benchmarks && cargo bench --locked --bench workload_bench -- --save-baseline base "$FILTER") + +# --------------------------------------------------------------------------- +# 4. Compare baselines with critcmp and format as a markdown table. +# - Parses actual duration values (not rank factors) for the % column +# - Bolds the faster duration and % cell when the difference is +# statistically significant (error bounds do not overlap) +# --------------------------------------------------------------------------- +# Use `critcmp` to compare the criterion output for `base` and `changes`. We use `critcmp` instead of manually +# parsing criterion outputs because criterion may update its output format. By using `critcmp`, we inherit all +# updated criterion output parsing. +COMPARISON=$((cd benchmarks && critcmp base changes) | python3 benchmarks/ci/parse_critcmp.py) + +# --------------------------------------------------------------------------- +# 5. 
Write results to /tmp/bench-comment.md +# The post-comment job in benchmark.yml downloads this file as an artifact +# and posts it as a PR comment using a step that holds GH_TOKEN. +# --------------------------------------------------------------------------- +SHORT_SHA="${HEAD_SHA:0:7}" + +SUMMARY="" +[[ -n "$TAGS" ]] && SUMMARY="tags: \`${TAGS}\`" +[[ -n "$FILTER" ]] && SUMMARY+="${SUMMARY:+ | }filter: \`${FILTER}\`" + +{ + echo "## Benchmark for ${SHORT_SHA}" + echo "
" + echo "${SUMMARY}" + echo "" + echo "$COMPARISON" + echo "" + echo "
" +} > /tmp/bench-comment.md diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 46a04b2dea..b214a6ce63 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -6,35 +6,39 @@ on: types: [created, edited] name: Benchmarking PR performance jobs: - runBenchmark: + run-benchmark: name: Run benchmarks if: > github.event.issue.pull_request && (github.event.comment.body == '/bench' || startsWith(github.event.comment.body, '/bench ')) runs-on: ubuntu-latest permissions: - pull-requests: write + contents: read + outputs: + pr_number: ${{ steps.pr.outputs.pr_number }} steps: - - name: Parse benchmark tags - env: - COMMENT: ${{ github.event.comment.body }} - run: | - if [[ "$COMMENT" == "/bench" ]]; then - TAGS="base" - else - TAGS="${COMMENT#/bench }" - TAGS=$(echo "$TAGS" | tr -d '[:space:]') - fi - echo "BENCH_TAGS=$TAGS" >> "$GITHUB_ENV" - echo "Parsed tags: $TAGS" - - name: Get PR HEAD sha + - name: Get PR metadata id: pr - run: | - PR_DATA=$(gh api repos/${{ github.repository }}/pulls/${{ github.event.issue.number }}) - echo "head_sha=$(echo "$PR_DATA" | jq -r .head.sha)" >> "$GITHUB_OUTPUT" - echo "base_ref=$(echo "$PR_DATA" | jq -r .base.ref)" >> "$GITHUB_OUTPUT" env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + REPO: ${{ github.repository }} + PR_NUMBER: ${{ github.event.issue.number }} + run: | + PR_DATA=$(gh api "repos/$REPO/pulls/$PR_NUMBER") + HEAD_SHA=$(echo "$PR_DATA" | jq -r .head.sha) + BASE_REF=$(echo "$PR_DATA" | jq -r .base.ref) + [[ "$HEAD_SHA" == *$'\n'* || "$BASE_REF" == *$'\n'* ]] && { echo "Unexpected newline in API response" >&2; exit 1; } + [[ "$BASE_REF" =~ ^[a-zA-Z0-9/_.-]+$ ]] || { echo "Invalid BASE_REF: $BASE_REF" >&2; exit 1; } + printf 'head_sha=%s\n' "$HEAD_SHA" >> "$GITHUB_OUTPUT" + printf 'base_ref=%s\n' "$BASE_REF" >> "$GITHUB_OUTPUT" + printf 'pr_number=%s\n' "$PR_NUMBER" >> "$GITHUB_OUTPUT" + - name: Install critcmp + # Installed before checkout so the PR's .cargo/config.toml 
cannot + # redirect the registry to a malicious source. The runner's + # pre-installed Rust is sufficient -- no toolchain setup needed here. + # --locked is omitted for cargo install (same exemption as cargo miri + # setup); --version pins the top-level crate. + run: cargo install critcmp --version 0.1.8 - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1 with: ref: ${{ steps.pr.outputs.head_sha }} @@ -45,13 +49,34 @@ jobs: - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 # v2 with: save-if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} - # TODO: This action internally runs `cargo bench` without --locked, bypassing our - # supply chain lockfile policy (see build.yml top-level comment). Replace with - # manual cargo bench --locked + critcmp + gh pr comment steps. - # - uses: boa-dev/criterion-compare-action@adfd3a94634fe2041ce5613eb7df09d247555b87 # v3.2.4 - # with: - # token: ${{ secrets.GITHUB_TOKEN }} - # branchName: ${{ steps.pr.outputs.base_ref }} - # cwd: benchmarks - # benchName: workload_bench - - run: echo "Benchmarking is temporarily disabled. See TODO above." + - name: Run benchmarks + # The comment is posted in the post-comment job after this job completes. 
+ env: + COMMENT: ${{ github.event.comment.body }} + BASE_REF: ${{ steps.pr.outputs.base_ref }} + HEAD_SHA: ${{ steps.pr.outputs.head_sha }} + run: bash .github/scripts/run-benchmarks.sh + - name: Upload benchmark comment + uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 + with: + name: bench-comment + path: /tmp/bench-comment.md + + post-comment: + name: Post benchmark results + needs: run-benchmark + runs-on: ubuntu-latest + permissions: + pull-requests: write + steps: + - name: Download benchmark comment + uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0 + with: + name: bench-comment + path: /tmp/ + - name: Post results as PR comment + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + PR_NUMBER: ${{ needs.run-benchmark.outputs.pr_number }} + REPO: ${{ github.repository }} + run: gh pr comment "$PR_NUMBER" --repo "$REPO" --body-file /tmp/bench-comment.md diff --git a/benchmarks/README.md b/benchmarks/README.md index d98a8ec379..f0a51b12ab 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -85,12 +85,28 @@ BENCH_TAGS=base,my-feature cargo bench -p delta_kernel_benchmarks ### Running benchmarking on a PR -To trigger benchmarks on a pull request, post a comment on the PR with one of the following: +To trigger benchmarks on a pull request, post a comment using the following syntax: -- `/bench` — runs benchmarks with `BENCH_TAGS=base` -- `/bench ` — runs benchmarks with `BENCH_TAGS=` (e.g. `bench base,tag1`) +``` +/bench [--tags ] [--filter ] +``` + +- `--tags` sets `BENCH_TAGS` (comma-separated), controlling which table groupings run. +- `--filter` is a single-token Criterion regex matched against benchmark names. +- Both flags are optional and independent; they can be given in any order. +- When both are specified, they apply as AND: only benchmarks from tables that match the tag filter AND whose name matches the regex are run. 
+- Running just `/bench` (with no flags) defaults to `BENCH_TAGS=base`. If neither flag is parsed, the same default applies.
+
+Examples:
+```
+/bench # BENCH_TAGS=base, all benchmark names
+/bench --tags base,my-tag # BENCH_TAGS=base,my-tag, all benchmark names
+/bench --filter snapshotConstruction # no BENCH_TAGS set, only snapshotConstruction benchmarks
+/bench --tags base --filter 101kAdds.*snapshotConstruction # only snapshotConstruction benchmarks from tables tagged "base"
+/bench --filter 101kAdds|10kAdds # no BENCH_TAGS set, OR two table names
+```
 
-See [By tag (`BENCH_TAGS`)](#by-tag-bench_tags) for details on how tags work. Results are posted automatically as a PR comment, comparing the PR branch against the base branch.
+See [By tag (`BENCH_TAGS`)](#by-tag-bench_tags) for how tags work and [By benchmark name](#by-benchmark-name) for regex pattern examples. Results are posted automatically as a PR comment, comparing the PR branch against the base branch.
 
 ## Workload data layout
 
diff --git a/benchmarks/ci/parse_critcmp.py b/benchmarks/ci/parse_critcmp.py
new file mode 100644
index 0000000000..3a2d7b4c4d
--- /dev/null
+++ b/benchmarks/ci/parse_critcmp.py
@@ -0,0 +1,95 @@
+"""Render `critcmp base changes` output as a markdown comparison table.
+
+Reads critcmp's plain-text table from stdin and prints one markdown row per
+benchmark (name, base duration, PR duration, percent change) to stdout. When the
+difference is significant (see `is_significant`), the faster duration and the
+percent cell are bolded.
+"""
+import sys, re
+
+def to_seconds(value, units):
+    """Convert `value` in `units` ('s', 'ms', 'µs'/'us'/'μs', 'ns') to seconds.
+
+    Unrecognized units are returned unchanged, i.e. treated as seconds.
+    """
+    u = units.strip()
+    if u == 's': return value
+    if u == 'ms': return value / 1e3
+    if u in ('µs', 'us', 'μs'): return value / 1e6
+    if u == 'ns': return value / 1e9
+    return value
+
+def is_significant(chg_dur, chg_err, base_dur, base_err):
+    """Return True when either mean lies outside the other measurement's ± error band.
+
+    All four arguments must be in the same unit. With non-negative errors, equal
+    means are never significant.
+    """
+    if chg_dur < base_dur:
+        return chg_dur + chg_err < base_dur or base_dur - base_err > chg_dur
+    else:
+        return chg_dur - chg_err > base_dur or base_dur + base_err < chg_dur
+
+def parse_duration(s):
+    """Parse a critcmp duration such as '1.23±0.04ms' into (value, error, units).
+
+    Returns None when `s` does not have the value±error+units shape.
+    """
+    m = re.match(r'([0-9.]+)±([0-9.]+)(.+)', s.strip())
+    if not m:
+        return None
+    return float(m.group(1)), float(m.group(2)), m.group(3).strip()
+
+lines = sys.stdin.read().splitlines()
+print("| Test | Base | PR | % |")
+print("|------|--------------|------------------|---|")
+
+for line in lines[2:]:  # skip critcmp header rows
+    if not line.strip():
+        continue
+    # critcmp columns (split on 2+ spaces):
+    #   with throughput: name, baseFactor, baseDuration, baseBandwidth, changesFactor, changesDuration, changesBandwidth
+    #   without throughput: name, baseFactor, baseDuration, changesFactor, changesDuration
+    # Locate duration fields by the presence of "±" rather than hardcoding indices,
+    # so the script works correctly regardless of whether bandwidth columns are present.
+    # The separator is two or more spaces, so a single space inside a column does
+    # not start a new field.
+    fields = re.split(r' {2,}', line)
+    name = fields[0].strip().replace('|', r'\|') if fields else ''
+    dur_fields = [f.strip() for f in fields[1:] if '±' in f]
+    base_dur_str = dur_fields[0] if len(dur_fields) > 0 else None
+    chg_dur_str = dur_fields[1] if len(dur_fields) > 1 else None
+
+    if not name and not base_dur_str and not chg_dur_str:
+        continue
+
+    base_display = base_dur_str or 'N/A'
+    chg_display = chg_dur_str or 'N/A'
+    difference = 'N/A'
+
+    if base_dur_str and chg_dur_str:
+        base_p = parse_duration(base_dur_str)
+        chg_p = parse_duration(chg_dur_str)
+        if base_p and chg_p:
+            base_secs = to_seconds(base_p[0], base_p[2])
+            base_err_secs = to_seconds(base_p[1], base_p[2])
+            chg_secs = to_seconds(chg_p[0], chg_p[2])
+            chg_err_secs = to_seconds(chg_p[1], chg_p[2])
+
+            # A zero base duration has no meaningful relative change; leave the
+            # row as 'N/A' instead of dividing by zero.
+            if base_secs > 0:
+                # (chg / base - 1) yields 0.0 rather than -0.0 for equal durations,
+                # so unchanged benchmarks render as "0.00%" and not "-0.00%".
+                pct = (chg_secs / base_secs - 1) * 100
+                prefix = '' if chg_secs <= base_secs else '+'
+                difference = f'{prefix}{pct:.2f}%'
+
+                if is_significant(chg_secs, chg_err_secs, base_secs, base_err_secs):
+                    if chg_secs < base_secs:
+                        chg_display = f'**{chg_dur_str}**'
+                    elif chg_secs > base_secs:
+                        base_display = f'**{base_dur_str}**'
+                    difference = f'**{difference}**'
+
+    print(f'| {name} | {base_display} | {chg_display} | {difference} |')
diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs
index 46d0c2aa06..938fd967d6 100644
--- a/kernel/src/log_segment.rs
+++ b/kernel/src/log_segment.rs
@@ -298,20 +298,38 @@ impl LogSegment {
             .as_ref()
             .and_then(|hint| hint.checkpoint_schema.clone());
 
-        let listed_files = match 
(checkpoint_hint, time_travel_version) { - (Some(cp), None) => { - LogSegmentFiles::list_with_checkpoint_hint(&cp, storage, &log_root, log_tail, None)? - } - (Some(cp), Some(end_version)) if cp.version <= end_version => { - LogSegmentFiles::list_with_checkpoint_hint( - &cp, - storage, - &log_root, - log_tail, - Some(end_version), - )? - } - _ => LogSegmentFiles::list(storage, &log_root, log_tail, None, time_travel_version)?, + // The end_version is the time_travel_version, if present + // TODO: When max catalog version is implemented, we would use that as end_version if + // time_travel_version is not present + let end_version = time_travel_version; + + // Keep the hint only if it points at or before end_version, or if there is no end_version bound + let usable_hint = checkpoint_hint.filter(|cp| end_version.is_none_or(|v| cp.version <= v)); + + // Cases: + // + // 1. usable_hint present, end_version is Some --> list_with_checkpoint_hint from hint.version TO end_version + // 2. usable_hint present, end_version is None --> list_with_checkpoint_hint from hint.version unbounded + // 3. no usable_hint, end_version is Some --> backward-scan for checkpoint before end_version, + // list from that checkpoint TO end_version + // (falls back to v0 if no checkpoint found) + // 4. 
no usable_hint, end_version is None --> list from v0 unbounded + + let listed_files = match (usable_hint, end_version) { + // Cases 1 and 2 + (Some(cp), end_version) => LogSegmentFiles::list_with_checkpoint_hint( + &cp, + storage, + &log_root, + log_tail, + end_version, + )?, + // Case 3 + (None, Some(end)) => LogSegmentFiles::list_with_backward_checkpoint_scan( + storage, &log_root, log_tail, end, + )?, + // Case 4 + (None, None) => LogSegmentFiles::list(storage, &log_root, log_tail, None, None)?, }; LogSegment::try_new( diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 6da327e36a..d66df9fe75 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -829,6 +829,62 @@ async fn build_snapshot_with_start_checkpoint_and_time_travel_version() { assert_eq!(log_segment.listed.ascending_commit_files[0].version, 4); } +#[rstest::rstest] +#[case::no_hint(None)] +#[case::stale_hint(Some(LastCheckpointHint { + version: 10, // stale: 10 > end_version 5, so it is discarded + size: 10, + parts: None, + size_in_bytes: None, + num_of_add_files: None, + checkpoint_schema: None, + checksum: None, + tags: None, +}))] +#[tokio::test] +async fn build_snapshot_time_travel_no_checkpoint_falls_back_to_v0( + #[case] hint: Option, +) { + let paths: Vec = (0..=5).map(|v| delta_path_for_version(v, "json")).collect(); + let (storage, log_root) = build_log_with_paths_and_checkpoint(&paths, None).await; + + let log_segment = + LogSegment::for_snapshot_impl(storage.as_ref(), log_root, vec![], hint, Some(5)).unwrap(); + + let commit_files = log_segment.listed.ascending_commit_files; + let checkpoint_parts = log_segment.listed.checkpoint_parts; + + assert_eq!(checkpoint_parts.len(), 0); + let versions = commit_files.into_iter().map(|x| x.version).collect_vec(); + assert_eq!(versions, vec![0, 1, 2, 3, 4, 5]); +} + +#[tokio::test] +async fn build_snapshot_time_travel_no_hint_checkpoint_at_end_version_included() { + let (storage, log_root) 
= build_log_with_paths_and_checkpoint( + &[ + delta_path_for_version(0, "json"), + delta_path_for_version(1, "json"), + delta_path_for_version(2, "json"), + delta_path_for_version(3, "json"), + delta_path_for_version(4, "json"), + delta_path_for_version(5, "json"), + delta_path_for_version(5, "checkpoint.parquet"), + ], + None, + ) + .await; + + let log_segment = + LogSegment::for_snapshot_impl(storage.as_ref(), log_root, vec![], None, Some(5)).unwrap(); + + let commit_files = log_segment.listed.ascending_commit_files; + let checkpoint_parts = log_segment.listed.checkpoint_parts; + assert_eq!(checkpoint_parts.len(), 1); + assert_eq!(checkpoint_parts[0].version, 5); + assert_eq!(commit_files.len(), 0); +} + #[tokio::test] async fn build_table_changes_with_commit_versions() { let (storage, log_root) = build_log_with_paths_and_checkpoint( diff --git a/kernel/src/log_segment_files.rs b/kernel/src/log_segment_files.rs index 3355e9af24..cafcf6133a 100644 --- a/kernel/src/log_segment_files.rs +++ b/kernel/src/log_segment_files.rs @@ -1,12 +1,16 @@ //! [`LogSegmentFiles`] is a struct holding the result of listing the delta log. Currently, it -//! exposes three APIs for listing: +//! exposes four APIs for listing: //! 1. [`list_commits`]: Lists all commit files between the provided start and end versions. //! 2. [`list`]: Lists all commit and checkpoint files between the provided start and end versions. //! 3. [`list_with_checkpoint_hint`]: Lists all commit and checkpoint files after the provided //! checkpoint hint. +//! 4. [`list_with_backward_checkpoint_scan`]: Scans backward from an end version in +//! 1000-version windows until a complete checkpoint is found or the log is exhausted. //! //! After listing, one can leverage the [`LogSegmentFiles`] to construct a [`LogSegment`]. //! +//! [`list_with_backward_checkpoint_scan`]: Self::list_with_backward_checkpoint_scan +//! //! [`list_commits`]: Self::list_commits //! [`list`]: Self::list //! 
[`list_with_checkpoint_hint`]: Self::list_with_checkpoint_hint @@ -121,6 +125,24 @@ fn group_checkpoint_parts(parts: Vec) -> HashMap Option { + ascending_files + .iter() + .filter(|f| f.is_checkpoint() && f.location.size > 0) + .chunk_by(|f| f.version) + .into_iter() + .filter_map(|(version, parts)| { + let owned: Vec = parts.cloned().collect(); + group_checkpoint_parts(owned) + .iter() + .any(|(num_parts, part_files)| part_files.len() == *num_parts as usize) + .then_some(version) + }) + .last() +} + /// Accumulates and groups log files during listing. Each "group" consists of all files that /// share the same version number (e.g., commit, checkpoint parts, CRC files). /// @@ -223,15 +245,23 @@ impl ListingAccumulator { } } +/// Number of versions covered by each backward-scan window in +/// `LogSegmentFiles::list_with_backward_checkpoint_scan` +const BACKWARD_SCAN_WINDOW_SIZE: u64 = 1000; + impl LogSegmentFiles { /// Assembles a `LogSegmentFiles` from `fs_files` (an iterator of files /// listed from storage) and `log_tail` (catalog-provided commits). - // This logic is identical to the old `list()` implementation; it was extracted so that - // callers with different listing strategies can share the same core logic. 
- fn build_log_segment_files( + /// + /// - `fs_files`: files listed from storage in ascending version order + /// - `log_tail`: catalog-provided commits + /// - `start_version`: start version of the entire listing range provided; in practice, + /// this is the lower bound (inclusive) for log_tail entries included in the result + /// - `end_version`: upper bound (inclusive) on versions to include, `None` means no bound + pub(crate) fn build_log_segment_files( fs_files: impl Iterator>, log_tail: Vec, - start: Version, + start_version: Version, end_version: Option, ) -> DeltaResult { // check log_tail is only commits @@ -241,8 +271,9 @@ impl LogSegmentFiles { "log_tail should only contain commits" ); - let end = end_version.unwrap_or(Version::MAX); let log_tail_start_version = log_tail.first().map(|f| f.version); + let end = end_version.unwrap_or(Version::MAX); + let mut acc = ListingAccumulator { end_version, ..Default::default() @@ -280,9 +311,12 @@ impl LogSegmentFiles { // Phase 1 maintains ascending version order throughout, which is required by the checkpoint // grouping logic. Note that Phase 1 already skipped filesystem commits at log_tail // versions, so there's no duplication here. + // + // log_tail entries at versions before a checkpoint may still be included + // here - LogSegment::try_new is the safeguard that filters those out unconditionally let filtered_log_tail = log_tail .into_iter() - .filter(|entry| entry.version >= start && entry.version <= end); + .filter(|entry| entry.version >= start_version && entry.version <= end); for file in filtered_log_tail { // Track max published version for published commits from the log_tail if matches!(file.file_type, LogPathFileType::Commit) { @@ -388,7 +422,6 @@ impl LogSegmentFiles { /// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all /// the returned [`ParsedLogPath`]s will have a version less than or equal to the `end_version`. 
- /// See [`list_log_files_with_version`] for details on the return type. pub(crate) fn list_with_checkpoint_hint( checkpoint_metadata: &LastCheckpointHint, storage: &dyn StorageHandler, @@ -405,7 +438,7 @@ impl LogSegmentFiles { )?; let Some(latest_checkpoint) = listed_files.checkpoint_parts.last() else { - // TODO: We could potentially recover here + // Kernel should not compensate for corrupt tables, so we fail if we can't find a checkpoint return Err(Error::invalid_checkpoint( "Had a _last_checkpoint hint but didn't find any checkpoints", )); @@ -425,14 +458,68 @@ impl LogSegmentFiles { } Ok(listed_files) } + + /// Returns a [`LogSegmentFiles`] ending at `end_version`, rooted at the most recent complete + /// checkpoint at or before `end_version`, or rooted at version 0 if no checkpoint is found. + /// + /// To find the checkpoint without a full forward listing from version 0, this scans backward + /// from `end_version` in windows of size [`BACKWARD_SCAN_WINDOW_SIZE`], stopping as soon as + /// a complete checkpoint is found (or version 0 is reached). + /// Then, all files from the windows that were scanned are combined with `log_tail` to produce a log segment + /// rooted at the checkpoint version (or version 0 if no checkpoint) with all commits after the + /// checkpoint version. A log_tail commit at exactly the checkpoint version may be included at this + /// stage but will be filtered out by `LogSegment::try_new`. + /// + /// For example, given the desired end_version = 12500 and a checkpoint at v8900: + /// - Window 1 [11501, 12501): no checkpoint -> continue + /// - Window 2 [10501, 11501): no checkpoint -> continue + /// - Window 3 [9501, 10501): no checkpoint -> continue + /// - Window 4 [8501, 9501): checkpoint at v8900 found -> stop + /// All files from windows 1-4 are combined with `log_tail` to produce a log segment + /// rooted at the checkpoint at v8900 with all commits from v8901 to v12500. 
+ #[instrument(name = "log.list_with_backward_checkpoint_scan", skip_all, fields(end = end_version), err)] + pub(crate) fn list_with_backward_checkpoint_scan( + storage: &dyn StorageHandler, + log_root: &Url, + log_tail: Vec, + end_version: Version, + ) -> DeltaResult { + // Scan backward in 1000-version windows, collecting ALL file types, until a complete + // checkpoint is found or the log is exhausted. + let mut windows: Vec> = Vec::new(); + let mut found_checkpoint_version: Option = None; + // upper is the exclusive upper bound of the next window; adding 1 includes end_version + // in the first window. The inclusive range passed to list_from_storage is [lower, upper - 1]. + let mut upper = end_version + 1; + while upper > 0 { + let lower = upper.saturating_sub(BACKWARD_SCAN_WINDOW_SIZE); + let window_files: Vec<_> = + list_from_storage(storage, log_root, lower, upper - 1)?.try_collect()?; + + found_checkpoint_version = find_complete_checkpoint_version(&window_files); + windows.push(window_files); + + if found_checkpoint_version.is_some() { + break; + } + upper = lower; + } + + let fs_iter = windows.into_iter().rev().flatten().map(Ok); + let start = found_checkpoint_version.unwrap_or(0); + Self::build_log_segment_files(fs_iter, log_tail, start, Some(end_version)) + } } #[cfg(test)] mod list_log_files_with_log_tail_tests { + use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use url::Url; + use rstest::rstest; + use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreStorageHandler; use crate::object_store::{memory::InMemory, path::Path as ObjectPath, ObjectStore}; @@ -545,6 +632,56 @@ mod list_log_files_with_log_tail_tests { ); } + /// A [`StorageHandler`] wrapper that counts the number of `list_from` calls. + /// Used to verify that `list_with_backward_checkpoint_scan` issues the expected + /// number of storage listing requests. 
+ struct CountingStorageHandler { + inner: Box, + list_from_count: AtomicU32, + } + + impl CountingStorageHandler { + fn new(inner: Box) -> Self { + Self { + inner, + list_from_count: AtomicU32::new(0), + } + } + + fn call_count(&self) -> u32 { + self.list_from_count.load(Ordering::Relaxed) + } + } + + impl StorageHandler for CountingStorageHandler { + fn list_from( + &self, + path: &Url, + ) -> DeltaResult>>> { + self.list_from_count.fetch_add(1, Ordering::Relaxed); + self.inner.list_from(path) + } + + fn read_files( + &self, + _files: Vec, + ) -> DeltaResult>>> { + panic!("read_files should not be called during listing"); + } + + fn put(&self, _path: &Url, _data: bytes::Bytes, _overwrite: bool) -> DeltaResult<()> { + panic!("put should not be called during listing"); + } + + fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> { + panic!("copy_atomic should not be called during listing"); + } + + fn head(&self, _path: &Url) -> DeltaResult { + panic!("head should not be called during listing"); + } + } + /// Helper to call `LogSegmentFiles::list()` and destructure the result for assertions. /// Returns (ascending_commit_files, ascending_compaction_files, checkpoint_parts, /// latest_crc_file, latest_commit_file, max_published_version). @@ -575,6 +712,8 @@ mod list_log_files_with_log_tail_tests { ) } + // ===== list() tests ===== + #[tokio::test] async fn test_empty_log_tail() { let log_files = vec![ @@ -901,4 +1040,438 @@ mod list_log_files_with_log_tail_tests { // max_published_version reflects all published commits seen (filesystem 0-5 + log_tail 6-10) assert_eq!(max_pub, Some(10)); } + + // ===== list_with_backward_checkpoint_scan() tests ===== + + // Log from v0 to v1005. Each case places an optional single-part checkpoint and + // verifies the expected commits, checkpoint version, and number of storage listings. 
+ // + // Window boundaries (window size=1000, end_version=1005, exclusive upper): + // Window 1: [6, 1006) covers v6..=v1005 + // Window 2: [0, 6) covers v0..=v5 + // + // A checkpoint at v6+ is found in window 1 (1 listing); at v5 or lower in window 2 + // (2 listings). A checkpoint beyond end_version is never seen. + #[rstest] + // No checkpoint: scan exhausts both windows, all 1006 commits returned + #[case::no_checkpoint(None, 0..=1005, None, 2)] + // Checkpoint beyond end_version is never seen; same behavior as no checkpoint + #[case::checkpoint_beyond_end(Some(1006), 0..=1005, None, 2)] + // Checkpoint at end_version: found in window 1, no commits after it + #[case::checkpoint_at_end(Some(1005), 0..0, Some(1005), 1)] + // Checkpoint at v5: falls in window 2 -> 2 listings; commits 6..=1005 returned. + // Tests the inclusive window boundary: window 1 covers [6, 1006) or [6, 1005] (lower = 1006 - 1000 = 6), + // so v5 falls just outside it and requires a second listing, while v6 (next case) does not. 
+ #[case::checkpoint_in_second_window(Some(5), 6..=1005, Some(5), 2)] + // Checkpoint at v6: falls in window 1 -> 1 listing; commits 7..=1005 returned + #[case::checkpoint_in_first_window(Some(6), 7..=1005, Some(6), 1)] + #[tokio::test] + async fn backward_scan_single_checkpoint_cases( + #[case] checkpoint_version: Option, + #[case] expected_commits: impl Iterator, + #[case] expected_checkpoint: Option, + #[case] expected_listings: u32, + ) { + let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=1005) + .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem)) + .collect(); + + if let Some(cp) = checkpoint_version { + log_files.push(( + cp, + LogPathFileType::SinglePartCheckpoint, + CommitSource::Filesystem, + )); + } + + let (storage, log_root) = create_storage(log_files).await; + let counter = CountingStorageHandler::new(storage); + + let result = + LogSegmentFiles::list_with_backward_checkpoint_scan(&counter, &log_root, vec![], 1005) + .unwrap(); + + assert_eq!(counter.call_count(), expected_listings); + + assert_eq!( + result.checkpoint_parts.len(), + if expected_checkpoint.is_some() { 1 } else { 0 } + ); + if let Some(cp_version) = expected_checkpoint { + assert_eq!(result.checkpoint_parts[0].version, cp_version); + } + + assert!(result + .ascending_commit_files + .iter() + .map(|f| f.version) + .eq(expected_commits)); + } + + /// end_version=3000. Window 2 contains an incomplete 2-of-2 multipart checkpoint (only + /// part 1 present). find_complete_checkpoint_version must return None for window 2, causing + /// the scan to continue to window 3, where a complete single-part checkpoint at v500 is + /// found. Verifies that incomplete parts from window 2 are discarded and do not pollute + /// the result's checkpoint_parts. 
///
/// Window 1 [2001, 3001): commits v2001..=v3000, no checkpoint -> continue
/// Window 2 [1001, 2001): commits v1001..=v2000, v1500 (1-of-2 parts) incomplete -> continue
/// Window 3 [1, 1001): commits v1..=v1000, v500 (complete) -> checkpoint found -> break
fn files_incomplete_in_second_window_complete_in_third_window(
) -> Vec<(Version, LogPathFileType, CommitSource)> {
    // Filesystem commits v0..=v3000.
    let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=3000)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    // Complete single-part checkpoint at v500.
    log_files.push((
        500,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    ));
    // Only part 1 of a 2-part checkpoint at v1500, so this checkpoint is incomplete.
    log_files.push((
        1500,
        LogPathFileType::MultiPartCheckpoint {
            part_num: 1,
            num_parts: 2,
        },
        CommitSource::Filesystem,
    ));
    log_files
}

/// Fixture for the single-window case: commits v0..=v52 plus all three parts of a
/// multi-part checkpoint at v50.
fn multipart_checkpoint_files() -> Vec<(Version, LogPathFileType, CommitSource)> {
    // Log v0..=v52 with a complete 3-part checkpoint at v50.
    // Single window [0, 53): checkpoint found -> stop.
    let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=52)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    // Parts 1, 2 and 3 of the 3-part checkpoint, in that order.
    log_files.extend((1..=3).map(|part_num| {
        (
            50,
            LogPathFileType::MultiPartCheckpoint {
                part_num,
                num_parts: 3,
            },
            CommitSource::Filesystem,
        )
    }));
    log_files
}

/// Expected outcome of one `backward_scan_multipart_checkpoint_cases` case.
struct BackwardScanExpected {
    /// Expected value of `CountingStorageHandler::call_count()` after the scan.
    listings: u32,
    /// Expected number of entries in `checkpoint_parts`.
    checkpoint_parts: usize,
    /// Version that every entry in `checkpoint_parts` must carry.
    checkpoint_version: Version,
    /// Expected number of entries in `ascending_commit_files`.
    commit_count: usize,
    /// Version of the first entry in `ascending_commit_files`.
    first_commit: Version,
    /// Version of the last entry in `ascending_commit_files` and of `latest_commit_file`.
    last_commit: Version,
}

// Case 1: complete 3-part checkpoint at v50, single window needed
// Case 2: incomplete 1-of-2 part at v1500 in window 2, complete checkpoint at v500 in window 3
/// Runs the backward checkpoint scan over a fixture with an empty log tail and checks the
/// listing count, the checkpoint parts found, and the commit range returned.
#[rstest]
#[case::multipart_checkpoint(
    multipart_checkpoint_files(),
    52,
    BackwardScanExpected { listings: 1, checkpoint_parts: 3, checkpoint_version: 50, commit_count: 2, first_commit: 51, last_commit: 52 }
)]
#[case::incomplete_in_second_window_complete_in_third(
    files_incomplete_in_second_window_complete_in_third_window(),
    3000,
    BackwardScanExpected { listings: 3, checkpoint_parts: 1, checkpoint_version: 500, commit_count: 2500, first_commit: 501, last_commit: 3000 }
)]
#[tokio::test]
async fn backward_scan_multipart_checkpoint_cases(
    #[case] log_files: Vec<(Version, LogPathFileType, CommitSource)>,
    #[case] end_version: Version,
    #[case] expected: BackwardScanExpected,
) {
    let (storage, log_root) = create_storage(log_files).await;
    // Wrap the storage so the calls made by the scan can be counted.
    let counter = CountingStorageHandler::new(storage);

    let result = LogSegmentFiles::list_with_backward_checkpoint_scan(
        &counter,
        &log_root,
        vec![],
        end_version,
    )
    .unwrap();

    assert_eq!(counter.call_count(), expected.listings);
    assert_eq!(result.checkpoint_parts.len(), expected.checkpoint_parts);
    assert!(result
        .checkpoint_parts
        .iter()
        .all(|p| p.version == expected.checkpoint_version));
    assert_eq!(result.ascending_commit_files.len(), expected.commit_count);
    assert_eq!(
        result.ascending_commit_files.first().unwrap().version,
        expected.first_commit
    );
    assert_eq!(
        result.ascending_commit_files.last().unwrap().version,
        expected.last_commit
    );
    assert_eq!(
        result.latest_commit_file.unwrap().version,
        expected.last_commit
    );
}

/// Checkpoint on the filesystem plus a catalog log tail that starts after the last
/// filesystem commit.
#[tokio::test]
async fn backward_scan_with_log_tail_derives_lower_bound_from_checkpoint() {
    // FS: commits v0..=v7 + checkpoint at v5. log_tail: catalog commits v8..=v10.
    // The checkpoint at v5 sets the lower bound to v6, so FS commits v6 and v7 plus all
    // catalog entries v8..=v10 are included.
    let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=7)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    log_files.push((
        5,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    ));
    let (storage, log_root) = create_storage(log_files).await;

    let log_tail: Vec<_> = (8u64..=10)
        .map(|v| {
            make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Catalog)
        })
        .collect();

    let result = LogSegmentFiles::list_with_backward_checkpoint_scan(
        storage.as_ref(),
        &log_root,
        log_tail,
        10,
    )
    .unwrap();

    assert_eq!(result.checkpoint_parts.len(), 1);
    assert_eq!(result.checkpoint_parts[0].version, 5);

    // FS commits v6, v7 after the checkpoint; catalog commits v8..=v10
    let expected = [
        (6, CommitSource::Filesystem),
        (7, CommitSource::Filesystem),
        (8, CommitSource::Catalog),
        (9, CommitSource::Catalog),
        (10, CommitSource::Catalog),
    ];
    // The length check comes first because `zip` stops at the shorter side.
    assert_eq!(result.ascending_commit_files.len(), expected.len());
    for (file, (version, source)) in result.ascending_commit_files.iter().zip(expected) {
        assert_eq!(file.version, version);
        assert_source(file, source);
    }
    assert_eq!(result.latest_commit_file.unwrap().version, 10);
}

/// Catalog log tail that begins below the checkpoint version, with a CRC file inside the
/// log tail's version range.
#[tokio::test]
async fn backward_scan_with_log_tail_starting_before_checkpoint() {
    // FS: commits v0..=v5 + checkpoint at v5 + CRC at v6. log_tail: catalog commits v3..=v8,
    // starting before the checkpoint. The checkpoint at v5 sets the lower bound to v5, so
    // log_tail v3..=v4 are excluded. The log_tail commit at v5 passes through (it is at the
    // checkpoint version). The CRC at v6 is preserved even though v6 is within the log_tail range.
    let mut log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    log_files.push((
        5,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    ));
    log_files.push((6, LogPathFileType::Crc, CommitSource::Filesystem));
    let (storage, log_root) = create_storage(log_files).await;

    let log_tail: Vec<_> = (3u64..=8)
        .map(|v| {
            make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Catalog)
        })
        .collect();

    let result = LogSegmentFiles::list_with_backward_checkpoint_scan(
        storage.as_ref(),
        &log_root,
        log_tail,
        8,
    )
    .unwrap();

    assert_eq!(result.checkpoint_parts.len(), 1);
    assert_eq!(result.checkpoint_parts[0].version, 5);

    // CRC at v6 is preserved even though v6 is within the log_tail range
    let crc = result.latest_crc_file.unwrap();
    assert_eq!(crc.version, 6);
    assert!(matches!(crc.file_type, LogPathFileType::Crc));

    // v5 passes the start version filter (>= 5) and is included here
    assert_eq!(result.ascending_commit_files.len(), 4);
    // Expected versions are v5, v6, v7, v8; pairing with `5u64..` avoids an index cast.
    for (commit, expected_version) in result.ascending_commit_files.iter().zip(5u64..) {
        assert_eq!(commit.version, expected_version);
        assert_source(commit, CommitSource::Catalog);
    }
    assert_eq!(result.latest_commit_file.unwrap().version, 8);
}

/// Single-entry catalog log tail below `end_version`, with no checkpoint present.
#[tokio::test]
async fn backward_scan_log_tail_defines_latest_version() {
    // FS: commits v0..=v5. log_tail: catalog commit v4. end_version=5.
    // FS v4 and v5 are filtered since log_tail_start=4. max_published_version is Some(5),
    // the highest FS commit seen within end_version, even though v5 is not in
    // ascending_commit_files.
    let log_files: Vec<(Version, LogPathFileType, CommitSource)> = (0u64..=5)
        .map(|v| (v, LogPathFileType::Commit, CommitSource::Filesystem))
        .collect();
    let (storage, log_root) = create_storage(log_files).await;

    let log_tail = vec![make_parsed_log_path_with_source(
        4,
        LogPathFileType::Commit,
        CommitSource::Catalog,
    )];

    let result = LogSegmentFiles::list_with_backward_checkpoint_scan(
        storage.as_ref(),
        &log_root,
        log_tail,
        5,
    )
    .unwrap();

    let expected = [
        (0, CommitSource::Filesystem),
        (1, CommitSource::Filesystem),
        (2, CommitSource::Filesystem),
        (3, CommitSource::Filesystem),
        (4, CommitSource::Catalog),
    ];
    // The length check comes first because `zip` stops at the shorter side.
    assert_eq!(result.ascending_commit_files.len(), expected.len());
    for (file, (version, source)) in result.ascending_commit_files.iter().zip(expected) {
        assert_eq!(file.version, version);
        assert_source(file, source);
    }
    assert_eq!(result.latest_commit_file.unwrap().version, 4);
    assert_eq!(result.max_published_version, Some(5));
}

// ===== find_complete_checkpoint_version direct unit tests (other cases already covered by tests above) =====

// NOTE(review): the fixtures below spell their element type as `ParsedLogPath`, presumed to
// be what `make_parsed_log_path_with_source` returns — confirm against its definition.

/// Fixture: commits v0..=v5 plus a single-part checkpoint at v3 whose size is forced to 0.
fn zero_size_checkpoint_files() -> Vec<ParsedLogPath> {
    // Commits v0..=5 plus a zero-size checkpoint at v3. make_parsed_log_path_with_source
    // always sets a non-zero size; override here to simulate a corrupt/empty checkpoint
    // file and exercise the size > 0 guard.
    let mut files: Vec<ParsedLogPath> = (0..=5)
        .map(|v| {
            make_parsed_log_path_with_source(
                v,
                LogPathFileType::Commit,
                CommitSource::Filesystem,
            )
        })
        .collect();
    let mut cp = make_parsed_log_path_with_source(
        3,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    );
    cp.location.size = 0;
    files.push(cp);
    files
}

/// Fixture: commits v0..=v10, part 1 of a 3-part checkpoint at v5, and a single-part
/// checkpoint at v10.
fn incomplete_then_complete_files() -> Vec<ParsedLogPath> {
    // Commits v0..=10, an incomplete checkpoint at v5 (1 of 3 parts), and a complete
    // checkpoint at v10. find_complete_checkpoint_version must continue past the failed group
    // and find the complete one.
    let mut files: Vec<ParsedLogPath> = (0..=10)
        .map(|v| {
            make_parsed_log_path_with_source(
                v,
                LogPathFileType::Commit,
                CommitSource::Filesystem,
            )
        })
        .collect();
    files.push(make_parsed_log_path_with_source(
        5,
        LogPathFileType::MultiPartCheckpoint {
            part_num: 1,
            num_parts: 3,
        },
        CommitSource::Filesystem,
    ));
    files.push(make_parsed_log_path_with_source(
        10,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    ));
    files
}

/// Fixture: commits v0..=v10 with single-part checkpoints at both v5 and v10.
fn two_complete_checkpoints_files() -> Vec<ParsedLogPath> {
    // Commits v0..=10, complete checkpoint at v5 and complete checkpoint at v10.
    // The function must return the latest (v10), not the first (v5).
    let mut files: Vec<ParsedLogPath> = (0..=10)
        .map(|v| {
            make_parsed_log_path_with_source(
                v,
                LogPathFileType::Commit,
                CommitSource::Filesystem,
            )
        })
        .collect();
    files.push(make_parsed_log_path_with_source(
        5,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    ));
    files.push(make_parsed_log_path_with_source(
        10,
        LogPathFileType::SinglePartCheckpoint,
        CommitSource::Filesystem,
    ));
    files
}

/// Calls `find_complete_checkpoint_version` directly on each fixture and compares the
/// returned version with the expected one.
#[rstest]
// Commits v0..=5, no checkpoint files
#[case::no_checkpoint(
    (0u64..=5).map(|v| make_parsed_log_path_with_source(v, LogPathFileType::Commit, CommitSource::Filesystem)).collect(),
    None
)]
// Commits v0..=5 plus a zero-size (corrupt) checkpoint at v3
#[case::zero_size_checkpoint(zero_size_checkpoint_files(), None)]
// Commits v0..=10, incomplete checkpoint at v5, complete checkpoint at v10
#[case::incomplete_then_complete(incomplete_then_complete_files(), Some(10))]
// Commits v0..=10, complete checkpoint at v5 and v10: must return v10 (latest)
#[case::two_complete(two_complete_checkpoints_files(), Some(10))]
fn find_complete_checkpoint_version_cases(
    #[case] files: Vec<ParsedLogPath>,
    #[case] expected: Option<Version>,
) {
    assert_eq!(find_complete_checkpoint_version(&files), expected);
}
// NOTE(review): the brace below closes a scope opened before this chunk (presumably the
// enclosing test module) — keep it.
}