Skip to content

Commit b196f3c

Browse files
committed
perf(parquet): skip RG reorder when stats overlap heavily
Adds an overlap guard to `PreparedAccessPlan::reorder_by_statistics`: when sorted-by-min adjacent row-group `[min, max]` ranges overlap above 50%, reordering cannot enable RG-level pruning (every "later" RG still has values that could appear in TopK results) so the reorder cost — CPU sort, lost IO sequential locality, and parallel scheduling pessimization across workers all pulling "best" RGs first — dominates, producing a net regression. This addresses the ClickBench `hits_partitioned` regressions (Q24, Q25, Q26: 1.11x-1.44x slower) where `EventTime` and `SearchPhrase` are uniformly distributed across row groups via the user-id hash partitioning, so RG min/max ranges fully overlap and reorder cannot help. Sorted / non-overlapping data continues to take the reorder path — verified by `sort_pushdown.slt` Test H still passing. The overlap helper treats null mins/maxes (RGs without statistics) conservatively as overlaps, so missing stats discourage rather than silently disable the guard. Adds 7 unit tests covering: fully disjoint, disjoint after reorder, fully overlapping, partial overlap, null max in previous, null min in next, and the n<2 edge case.
1 parent f7d9156 commit b196f3c

1 file changed

Lines changed: 185 additions & 3 deletions

File tree

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 185 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::sort::reverse_row_selection;
19+
use arrow::array::{Array, ArrayRef, BooleanArray};
1920
use arrow::datatypes::Schema;
2021
use datafusion_common::{Result, assert_eq_or_internal_err};
2122
use datafusion_physical_expr::expressions::Column;
@@ -25,6 +26,18 @@ use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
2526
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
2627
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
2728

29+
/// Fraction of adjacent (in sorted-by-min order) row group pairs whose
30+
/// `[min, max]` ranges overlap above which `reorder_by_statistics` will
31+
/// bail out without reordering.
32+
///
33+
/// When stats overlap heavily (e.g. unsorted columns like ClickBench's
34+
/// `EventTime` on `hits_partitioned`), reordering by min cannot enable
35+
/// row-group-level pruning — every "later" RG still has values that
36+
/// could appear in TopK. The reorder cost (CPU sort + lost IO sequential
37+
/// locality + parallel scheduling pessimization across workers all
38+
/// pulling "best" RGs first) then dominates, producing a net regression.
39+
const REORDER_OVERLAP_SKIP_THRESHOLD: f64 = 0.5;
40+
2841
/// A selection of rows and row groups within a ParquetFile to decode.
2942
///
3043
/// A `ParquetAccessPlan` is used to limit the row groups and data pages a `DataSourceExec`
@@ -441,28 +454,50 @@ impl PreparedAccessPlan {
441454
.map(|&idx| file_metadata.row_group(idx))
442455
.collect();
443456

444-
let stat_values = match converter.row_group_mins(rg_metadata.iter().copied()) {
457+
let stat_mins = match converter.row_group_mins(rg_metadata.iter().copied()) {
445458
Ok(vals) => vals,
446459
Err(e) => {
447460
debug!("Skipping RG reorder: cannot get min values: {e}");
448461
return Ok(self);
449462
}
450463
};
464+
let stat_maxes = match converter.row_group_maxes(rg_metadata.iter().copied()) {
465+
Ok(vals) => vals,
466+
Err(e) => {
467+
debug!("Skipping RG reorder: cannot get max values: {e}");
468+
return Ok(self);
469+
}
470+
};
451471

452472
let sort_options = arrow::compute::SortOptions {
453473
descending: false,
454474
nulls_first: first_sort_expr.options.nulls_first,
455475
};
456476
let sorted_indices =
457-
match arrow::compute::sort_to_indices(&stat_values, Some(sort_options), None)
458-
{
477+
match arrow::compute::sort_to_indices(&stat_mins, Some(sort_options), None) {
459478
Ok(indices) => indices,
460479
Err(e) => {
461480
debug!("Skipping RG reorder: sort failed: {e}");
462481
return Ok(self);
463482
}
464483
};
465484

485+
// Bail out when adjacent ranges overlap heavily: reordering by min
486+
// would not enable row-group-level pruning and the reorder cost
487+
// (sort CPU + lost IO locality + parallel scheduling pessimization)
488+
// dominates. See [`REORDER_OVERLAP_SKIP_THRESHOLD`].
489+
match adjacent_overlap_ratio(&stat_mins, &stat_maxes, &sorted_indices) {
490+
Some(ratio) if ratio >= REORDER_OVERLAP_SKIP_THRESHOLD => {
491+
debug!(
492+
"Skipping RG reorder: adjacent stats overlap {:.0}% (>= {:.0}% threshold)",
493+
ratio * 100.0,
494+
REORDER_OVERLAP_SKIP_THRESHOLD * 100.0
495+
);
496+
return Ok(self);
497+
}
498+
_ => {}
499+
}
500+
466501
// Apply the reordering
467502
let original_indexes = self.row_group_indexes.clone();
468503
self.row_group_indexes = sorted_indices
@@ -495,15 +530,162 @@ impl PreparedAccessPlan {
495530
}
496531
}
497532

533+
/// Compute the fraction of adjacent (in sorted-by-min order) row group
534+
/// pairs whose `[min, max]` ranges overlap.
535+
///
536+
/// Two ranges overlap when the later RG's min is `<=` the earlier RG's
537+
/// max. Null mins or maxes (RGs without statistics) are treated as
538+
/// overlapping (conservative — discourages reorder when stats are missing).
539+
///
540+
/// Returns `None` if there are fewer than two row groups, or if the
541+
/// arrow comparison fails (in which case the caller should treat the
542+
/// outcome as "do not skip").
543+
fn adjacent_overlap_ratio(
544+
mins: &ArrayRef,
545+
maxes: &ArrayRef,
546+
sorted_indices: &arrow::array::UInt32Array,
547+
) -> Option<f64> {
548+
let n = sorted_indices.len();
549+
if n < 2 {
550+
return None;
551+
}
552+
// Reorder mins and maxes into sorted-by-min order so we can compare
553+
// adjacent pairs directly.
554+
let mins_sorted = arrow::compute::take(mins.as_ref(), sorted_indices, None).ok()?;
555+
let maxes_sorted = arrow::compute::take(maxes.as_ref(), sorted_indices, None).ok()?;
556+
557+
// Compare mins_sorted[1..n] against maxes_sorted[0..n-1]: an overlap
558+
// exists when the next min is <= the previous max.
559+
let mins_next: ArrayRef = mins_sorted.slice(1, n - 1);
560+
let maxes_prev: ArrayRef = maxes_sorted.slice(0, n - 1);
561+
let cmp =
562+
arrow::compute::kernels::cmp::lt_eq(&mins_next.as_ref(), &maxes_prev.as_ref())
563+
.ok()?;
564+
let overlap_count = overlap_count_with_null_overlap(&cmp, &mins_next, &maxes_prev);
565+
Some(overlap_count as f64 / (n - 1) as f64)
566+
}
567+
568+
/// Count adjacent overlaps, treating null comparisons (caused by null mins
569+
/// or maxes) as overlaps so that missing statistics do not silently disable
570+
/// the overlap guard.
571+
fn overlap_count_with_null_overlap(
572+
cmp: &BooleanArray,
573+
mins_next: &ArrayRef,
574+
maxes_prev: &ArrayRef,
575+
) -> usize {
576+
let n = cmp.len();
577+
let mut overlaps = 0;
578+
for i in 0..n {
579+
let either_null = mins_next.is_null(i) || maxes_prev.is_null(i);
580+
if either_null {
581+
overlaps += 1;
582+
continue;
583+
}
584+
// cmp.value(i) is meaningful since neither input was null.
585+
if !cmp.is_null(i) && cmp.value(i) {
586+
overlaps += 1;
587+
}
588+
}
589+
overlaps
590+
}
591+
498592
#[cfg(test)]
499593
mod test {
500594
use super::*;
595+
use arrow::array::{Int32Array, UInt32Array};
501596
use datafusion_common::assert_contains;
502597
use parquet::basic::LogicalType;
503598
use parquet::file::metadata::ColumnChunkMetaData;
504599
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
505600
use std::sync::{Arc, LazyLock};
506601

602+
fn ratio_for(mins: Vec<Option<i32>>, maxes: Vec<Option<i32>>) -> Option<f64> {
603+
let mins: ArrayRef = Arc::new(Int32Array::from(mins));
604+
let maxes: ArrayRef = Arc::new(Int32Array::from(maxes));
605+
let sorted_indices = arrow::compute::sort_to_indices(
606+
&mins,
607+
Some(arrow::compute::SortOptions {
608+
descending: false,
609+
nulls_first: false,
610+
}),
611+
None,
612+
)
613+
.unwrap();
614+
adjacent_overlap_ratio(&mins, &maxes, &sorted_indices)
615+
}
616+
617+
#[test]
618+
fn overlap_ratio_disjoint_sorted() {
619+
// [0,10] [20,30] [40,50] — already sorted, no overlap
620+
let r = ratio_for(
621+
vec![Some(0), Some(20), Some(40)],
622+
vec![Some(10), Some(30), Some(50)],
623+
);
624+
assert_eq!(r, Some(0.0));
625+
}
626+
627+
#[test]
628+
fn overlap_ratio_disjoint_after_reorder() {
629+
// [40,50] [0,10] [20,30] — fully overlapping in original order, but
630+
// sorted-by-min order is disjoint; the helper sees the sorted view.
631+
let r = ratio_for(
632+
vec![Some(40), Some(0), Some(20)],
633+
vec![Some(50), Some(10), Some(30)],
634+
);
635+
assert_eq!(r, Some(0.0));
636+
}
637+
638+
#[test]
639+
fn overlap_ratio_fully_overlapping() {
640+
// All RGs cover [0, 100] — every adjacent pair in sorted order overlaps
641+
let r = ratio_for(
642+
vec![Some(0), Some(0), Some(0), Some(0)],
643+
vec![Some(100), Some(100), Some(100), Some(100)],
644+
);
645+
assert_eq!(r, Some(1.0));
646+
}
647+
648+
#[test]
649+
fn overlap_ratio_partial() {
650+
// Sorted-by-min: [0,15] [10,25] [30,40] [35,50]
651+
// pair 0: 10 <= 15 -> overlap
652+
// pair 1: 30 <= 25 -> no
653+
// pair 2: 35 <= 40 -> overlap
654+
// 2 / 3
655+
let r = ratio_for(
656+
vec![Some(0), Some(10), Some(30), Some(35)],
657+
vec![Some(15), Some(25), Some(40), Some(50)],
658+
);
659+
let r = r.unwrap();
660+
assert!((r - 2.0 / 3.0).abs() < 1e-9, "expected ~0.667, got {r}");
661+
}
662+
663+
#[test]
664+
fn overlap_ratio_null_max_in_prev_is_overlap() {
665+
// Sorted-by-min order: [0, null] [20, 30]. The first RG's max is
666+
// unknown, so we cannot prove the pair is disjoint and conservatively
667+
// count it as an overlap.
668+
let r = ratio_for(vec![Some(0), Some(20)], vec![None, Some(30)]);
669+
assert_eq!(r, Some(1.0));
670+
}
671+
672+
#[test]
673+
fn overlap_ratio_null_min_in_next_is_overlap() {
674+
// Sorted-by-min order: [0, 10] [null, 20]. The second RG's min is
675+
// unknown, so the comparison is null and conservatively counts as
676+
// overlap.
677+
let r = ratio_for(vec![Some(0), None], vec![Some(10), Some(20)]);
678+
assert_eq!(r, Some(1.0));
679+
}
680+
681+
#[test]
682+
fn overlap_ratio_too_few_rgs() {
683+
let mins: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)]));
684+
let maxes: ArrayRef = Arc::new(Int32Array::from(vec![Some(10)]));
685+
let sorted = UInt32Array::from(vec![0u32]);
686+
assert!(adjacent_overlap_ratio(&mins, &maxes, &sorted).is_none());
687+
}
688+
507689
#[test]
508690
fn test_only_scans() {
509691
let access_plan = ParquetAccessPlan::new(vec![

0 commit comments

Comments
 (0)