Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,23 @@ async fn test_fuzz_topk_filter_pushdown() {
.map(|col| orders.get(**col).unwrap())
.multi_cartesian_product()
{
// Add remaining columns as tiebreakers (ASC NULLS LAST)
// to ensure deterministic results when RG reorder changes
// the read order of rows with equal sort key values.
let tiebreakers: Vec<String> = ["id", "name", "department"]
.iter()
.filter(|c| {
!order_columns
.iter()
.take(num_order_by_columns)
.any(|oc| **oc == **c)
})
.map(|c| format!("{c} ASC NULLS LAST"))
.collect();
let all_orderings =
orderings.into_iter().chain(tiebreakers.iter()).join(", ");
let query = format!(
"SELECT * FROM test_table ORDER BY {} LIMIT {}",
orderings.into_iter().join(", "),
limit
"SELECT * FROM test_table ORDER BY {all_orderings} LIMIT {limit}",
);
queries.push(query);
}
Expand Down
279 changes: 279 additions & 0 deletions datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,28 @@
// under the License.

use crate::sort::reverse_row_selection;
use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::datatypes::Schema;
use datafusion_common::{Result, assert_eq_or_internal_err};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use log::debug;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};

/// Fraction of adjacent (in sorted-by-min order) row group pairs whose
/// `[min, max]` ranges overlap above which `reorder_by_statistics` will
/// bail out without reordering.
///
/// When stats overlap heavily (e.g. unsorted columns like ClickBench's
/// `EventTime` on `hits_partitioned`), reordering by min cannot enable
/// row-group-level pruning — every "later" RG still has values that
/// could appear in TopK. The reorder cost (CPU sort + lost IO sequential
/// locality + parallel scheduling pessimization across workers all
/// pulling "best" RGs first) then dominates, producing a net regression.
const REORDER_OVERLAP_SKIP_THRESHOLD: f64 = 0.5;

/// A selection of rows and row groups within a ParquetFile to decode.
///
/// A `ParquetAccessPlan` is used to limit the row groups and data pages a `DataSourceExec`
Expand Down Expand Up @@ -377,6 +395,120 @@ impl PreparedAccessPlan {
})
}

/// Reorder row groups by their min statistics for the given sort order.
///
/// This helps TopK queries find optimal values first. Row groups are
/// always sorted by min values in ASC order — direction (DESC) is
/// handled separately by `reverse()` which is applied after reorder.
///
/// Gracefully skips reordering when:
/// - There is a row_selection (too complex to remap)
/// - 0 or 1 row groups (nothing to reorder)
/// - Sort expression is not a simple column reference
/// - Statistics are unavailable
pub(crate) fn reorder_by_statistics(
mut self,
sort_order: &LexOrdering,
file_metadata: &ParquetMetaData,
arrow_schema: &Schema,
) -> Result<Self> {
// Skip if row_selection present (too complex to remap)
if self.row_selection.is_some() {
debug!("Skipping RG reorder: row_selection present");
return Ok(self);
}

// Nothing to reorder
if self.row_group_indexes.len() <= 1 {
return Ok(self);
}

let first_sort_expr = sort_order.first();

// Extract column name from sort expression
let column: &Column = match first_sort_expr.expr.downcast_ref::<Column>() {
Some(col) => col,
None => {
debug!("Skipping RG reorder: sort expr is not a simple column");
return Ok(self);
}
};

// Build statistics converter for this column
let converter = match StatisticsConverter::try_new(
column.name(),
arrow_schema,
file_metadata.file_metadata().schema_descr(),
) {
Ok(c) => c,
Err(e) => {
debug!("Skipping RG reorder: cannot create stats converter: {e}");
return Ok(self);
}
};

// Always sort ASC by min values — direction is handled by reverse
let rg_metadata: Vec<&RowGroupMetaData> = self
.row_group_indexes
.iter()
.map(|&idx| file_metadata.row_group(idx))
.collect();

let stat_mins = match converter.row_group_mins(rg_metadata.iter().copied()) {
Ok(vals) => vals,
Err(e) => {
debug!("Skipping RG reorder: cannot get min values: {e}");
return Ok(self);
}
};
let stat_maxes = match converter.row_group_maxes(rg_metadata.iter().copied()) {
Ok(vals) => vals,
Err(e) => {
debug!("Skipping RG reorder: cannot get max values: {e}");
return Ok(self);
}
};

let sort_options = arrow::compute::SortOptions {
descending: false,
nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices =
match arrow::compute::sort_to_indices(&stat_mins, Some(sort_options), None) {
Ok(indices) => indices,
Err(e) => {
debug!("Skipping RG reorder: sort failed: {e}");
return Ok(self);
}
};

// Bail out when adjacent ranges overlap heavily: reordering by min
// would not enable row-group-level pruning and the reorder cost
// (sort CPU + lost IO locality + parallel scheduling pessimization)
// dominates. See [`REORDER_OVERLAP_SKIP_THRESHOLD`].
match adjacent_overlap_ratio(&stat_mins, &stat_maxes, &sorted_indices) {
Some(ratio) if ratio >= REORDER_OVERLAP_SKIP_THRESHOLD => {
debug!(
"Skipping RG reorder: adjacent stats overlap {:.0}% (>= {:.0}% threshold)",
ratio * 100.0,
REORDER_OVERLAP_SKIP_THRESHOLD * 100.0
);
return Ok(self);
}
_ => {}
}

// Apply the reordering
let original_indexes = self.row_group_indexes.clone();
self.row_group_indexes = sorted_indices
.values()
.iter()
.map(|&i| original_indexes[i as usize])
.collect();

Ok(self)
}

/// Reverse the access plan for reverse scanning
pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
// Get the row group indexes before reversing
Expand All @@ -398,15 +530,162 @@ impl PreparedAccessPlan {
}
}

/// Compute the fraction of adjacent (in sorted-by-min order) row group
/// pairs whose `[min, max]` ranges overlap.
///
/// Two ranges overlap when the later RG's min is `<=` the earlier RG's
/// max. Null mins or maxes (RGs without statistics) are treated as
/// overlapping (conservative — discourages reorder when stats are missing).
///
/// Returns `None` if there are fewer than two row groups, or if the
/// arrow comparison fails (in which case the caller should treat the
/// outcome as "do not skip").
fn adjacent_overlap_ratio(
mins: &ArrayRef,
maxes: &ArrayRef,
sorted_indices: &arrow::array::UInt32Array,
) -> Option<f64> {
let n = sorted_indices.len();
if n < 2 {
return None;
}
// Reorder mins and maxes into sorted-by-min order so we can compare
// adjacent pairs directly.
let mins_sorted = arrow::compute::take(mins.as_ref(), sorted_indices, None).ok()?;
let maxes_sorted = arrow::compute::take(maxes.as_ref(), sorted_indices, None).ok()?;

// Compare mins_sorted[1..n] against maxes_sorted[0..n-1]: an overlap
// exists when the next min is <= the previous max.
let mins_next: ArrayRef = mins_sorted.slice(1, n - 1);
let maxes_prev: ArrayRef = maxes_sorted.slice(0, n - 1);
let cmp =
arrow::compute::kernels::cmp::lt_eq(&mins_next.as_ref(), &maxes_prev.as_ref())
.ok()?;
let overlap_count = overlap_count_with_null_overlap(&cmp, &mins_next, &maxes_prev);
Some(overlap_count as f64 / (n - 1) as f64)
}

/// Count adjacent overlaps, treating null comparisons (caused by null mins
/// or maxes) as overlaps so that missing statistics do not silently disable
/// the overlap guard.
fn overlap_count_with_null_overlap(
cmp: &BooleanArray,
mins_next: &ArrayRef,
maxes_prev: &ArrayRef,
) -> usize {
let n = cmp.len();
let mut overlaps = 0;
for i in 0..n {
let either_null = mins_next.is_null(i) || maxes_prev.is_null(i);
if either_null {
overlaps += 1;
continue;
}
// cmp.value(i) is meaningful since neither input was null.
if !cmp.is_null(i) && cmp.value(i) {
overlaps += 1;
}
}
overlaps
}

#[cfg(test)]
mod test {
use super::*;
use arrow::array::{Int32Array, UInt32Array};
use datafusion_common::assert_contains;
use parquet::basic::LogicalType;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
use std::sync::{Arc, LazyLock};

fn ratio_for(mins: Vec<Option<i32>>, maxes: Vec<Option<i32>>) -> Option<f64> {
let mins: ArrayRef = Arc::new(Int32Array::from(mins));
let maxes: ArrayRef = Arc::new(Int32Array::from(maxes));
let sorted_indices = arrow::compute::sort_to_indices(
&mins,
Some(arrow::compute::SortOptions {
descending: false,
nulls_first: false,
}),
None,
)
.unwrap();
adjacent_overlap_ratio(&mins, &maxes, &sorted_indices)
}

#[test]
fn overlap_ratio_disjoint_sorted() {
// [0,10] [20,30] [40,50] — already sorted, no overlap
let r = ratio_for(
vec![Some(0), Some(20), Some(40)],
vec![Some(10), Some(30), Some(50)],
);
assert_eq!(r, Some(0.0));
}

#[test]
fn overlap_ratio_disjoint_after_reorder() {
// [40,50] [0,10] [20,30] — fully overlapping in original order, but
// sorted-by-min order is disjoint; the helper sees the sorted view.
let r = ratio_for(
vec![Some(40), Some(0), Some(20)],
vec![Some(50), Some(10), Some(30)],
);
assert_eq!(r, Some(0.0));
}

#[test]
fn overlap_ratio_fully_overlapping() {
// All RGs cover [0, 100] — every adjacent pair in sorted order overlaps
let r = ratio_for(
vec![Some(0), Some(0), Some(0), Some(0)],
vec![Some(100), Some(100), Some(100), Some(100)],
);
assert_eq!(r, Some(1.0));
}

#[test]
fn overlap_ratio_partial() {
// Sorted-by-min: [0,15] [10,25] [30,40] [35,50]
// pair 0: 10 <= 15 -> overlap
// pair 1: 30 <= 25 -> no
// pair 2: 35 <= 40 -> overlap
// 2 / 3
let r = ratio_for(
vec![Some(0), Some(10), Some(30), Some(35)],
vec![Some(15), Some(25), Some(40), Some(50)],
);
let r = r.unwrap();
assert!((r - 2.0 / 3.0).abs() < 1e-9, "expected ~0.667, got {r}");
}

#[test]
fn overlap_ratio_null_max_in_prev_is_overlap() {
// Sorted-by-min order: [0, null] [20, 30]. The first RG's max is
// unknown, so we cannot prove the pair is disjoint and conservatively
// count it as an overlap.
let r = ratio_for(vec![Some(0), Some(20)], vec![None, Some(30)]);
assert_eq!(r, Some(1.0));
}

#[test]
fn overlap_ratio_null_min_in_next_is_overlap() {
// Sorted-by-min order: [0, 10] [null, 20]. The second RG's min is
// unknown, so the comparison is null and conservatively counts as
// overlap.
let r = ratio_for(vec![Some(0), None], vec![Some(10), Some(20)]);
assert_eq!(r, Some(1.0));
}

#[test]
fn overlap_ratio_too_few_rgs() {
let mins: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)]));
let maxes: ArrayRef = Arc::new(Int32Array::from(vec![Some(10)]));
let sorted = UInt32Array::from(vec![0u32]);
assert!(adjacent_overlap_ratio(&mins, &maxes, &sorted).is_none());
}

#[test]
fn test_only_scans() {
let access_plan = ParquetAccessPlan::new(vec![
Expand Down
Loading
Loading