Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1b1b586
Add Dynamic work scheduling in FileStream
alamb Apr 3, 2026
5287210
fix: typos
alamb Apr 14, 2026
4cefb3e
Update datafusion/datasource/src/file_stream/mod.rs
alamb Apr 15, 2026
a28fbf1
Merge branch 'main' into alamb/reschedule_io
alamb Apr 16, 2026
cd79312
Add test for file opened/closed metrics
alamb Apr 16, 2026
c58a9a8
properly account for limit
alamb Apr 16, 2026
323a5ca
Merge remote-tracking branch 'apache/main' into alamb/reschedule_io
alamb Apr 16, 2026
910c520
feat: reorder row groups by statistics during sort pushdown
zhuqi-lucas Apr 13, 2026
10803e2
test: add SLT tests for row group reorder by statistics
zhuqi-lucas Apr 13, 2026
714477a
test: add EXPLAIN assertions for row group reorder tests
zhuqi-lucas Apr 13, 2026
768e44d
fix: use max statistics for DESC sort reorder
zhuqi-lucas Apr 13, 2026
1d06fe7
fix: prevent reorder+reverse double-reordering of row groups
zhuqi-lucas Apr 14, 2026
d135763
fix: rebase conflicts and compilation errors
zhuqi-lucas Apr 16, 2026
32b1b95
refactor: introduce AccessPlanOptimizer trait for row group reordering
zhuqi-lucas Apr 16, 2026
0a72fe0
chore: remove benchmark from this PR (tracked in #21582)
zhuqi-lucas Apr 16, 2026
ad8f356
fix: resolve doc link for AccessPlanOptimizer
zhuqi-lucas Apr 17, 2026
2eb2351
fix: restore benchmark files from upstream main
zhuqi-lucas Apr 17, 2026
10e6be9
fix: compose reorder and reverse as sequential steps instead of mutua…
zhuqi-lucas Apr 18, 2026
e195044
Merge PR #21580 (row-group reorder by stats) on top of PR #21351 (dyn…
Dandandan Apr 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

356 changes: 355 additions & 1 deletion datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
// under the License.

use crate::sort::reverse_row_selection;
use arrow::datatypes::Schema;
use datafusion_common::{Result, assert_eq_or_internal_err};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use log::debug;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};

Expand Down Expand Up @@ -346,7 +351,6 @@ impl ParquetAccessPlan {
) -> Result<PreparedAccessPlan> {
let row_group_indexes = self.row_group_indexes();
let row_selection = self.into_overall_row_selection(row_group_meta_data)?;

PreparedAccessPlan::new(row_group_indexes, row_selection)
}
}
Expand Down Expand Up @@ -377,6 +381,108 @@ impl PreparedAccessPlan {
})
}

/// Reorder row groups by ascending min statistics of the leading sort column.
///
/// Only the first expression of `sort_order` is considered. Row groups are
/// always arranged by ascending min value, regardless of whether the
/// requested sort is ASC or DESC: descending scans are produced by applying
/// `ReverseRowGroups` AFTER this step, so the two steps compose instead of
/// double-flipping the order.
///
/// This helps TopK queries find optimal values first.
///
/// The plan is returned unchanged (this is never reported as an error) when:
/// - There is a row_selection (too complex to remap)
/// - 0 or 1 row groups (nothing to reorder)
/// - The first sort expression is not a simple column reference
/// - A statistics converter cannot be created for that column
/// - Min values cannot be extracted from the row group metadata
/// - Sorting the min values fails
pub(crate) fn reorder_by_statistics(
    mut self,
    sort_order: &LexOrdering,
    file_metadata: &ParquetMetaData,
    arrow_schema: &Schema,
) -> Result<Self> {
    // Skip if row_selection present (too complex to remap)
    if self.row_selection.is_some() {
        debug!("Skipping RG reorder: row_selection present");
        return Ok(self);
    }

    // Nothing to reorder
    if self.row_group_indexes.len() <= 1 {
        return Ok(self);
    }

    // LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
    let first_sort_expr = sort_order.first();

    // Only a plain column reference can be mapped onto parquet statistics
    let Some(column) = first_sort_expr.expr.downcast_ref::<Column>() else {
        debug!("Skipping RG reorder: sort expr is not a simple column");
        return Ok(self);
    };

    // Build statistics converter for this column
    let converter = match StatisticsConverter::try_new(
        column.name(),
        arrow_schema,
        file_metadata.file_metadata().schema_descr(),
    ) {
        Ok(c) => c,
        Err(e) => {
            debug!("Skipping RG reorder: cannot create stats converter: {e}");
            return Ok(self);
        }
    };

    // Min value of every *selected* row group, in the plan's current order.
    let selected_row_groups = self
        .row_group_indexes
        .iter()
        .map(|&idx| file_metadata.row_group(idx));
    let stat_values = match converter.row_group_mins(selected_row_groups) {
        Ok(vals) => vals,
        Err(e) => {
            debug!("Skipping RG reorder: cannot get min values: {e}");
            return Ok(self);
        }
    };

    // Always sort by min values in ASC order to align row groups with
    // the file's declared output ordering. Direction (DESC) is handled
    // separately by ReverseRowGroups which is applied AFTER reorder.
    //
    // This composable design avoids the problem where reorder(DESC)
    // followed by reverse would double-flip the order, and ensures
    // that for already-sorted data, reorder is a no-op and reverse
    // gives the correct DESC order (including placing small tail RGs first).
    //
    // NOTE(review): `nulls_first` is copied from the query's sort options
    // even though the sort here is always ascending; confirm this is the
    // intended placement of null min values once a reverse is applied.
    let sort_options = arrow::compute::SortOptions {
        descending: false,
        nulls_first: first_sort_expr.options.nulls_first,
    };
    let sorted_indices =
        match arrow::compute::sort_to_indices(&stat_values, Some(sort_options), None)
        {
            Ok(indices) => indices,
            Err(e) => {
                debug!("Skipping RG reorder: sort failed: {e}");
                return Ok(self);
            }
        };

    // Apply the permutation. `sorted_indices` holds positions into the
    // previous `row_group_indexes`, so move the old vector out (no clone)
    // and rebuild the field from it.
    let original_indexes = std::mem::take(&mut self.row_group_indexes);
    self.row_group_indexes = sorted_indices
        .values()
        .iter()
        .map(|&i| original_indexes[i as usize])
        .collect();

    Ok(self)
}

/// Reverse the access plan for reverse scanning
pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
// Get the row group indexes before reversing
Expand Down Expand Up @@ -614,4 +720,252 @@ mod test {
.unwrap();
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}

// ---- reorder_by_statistics tests ----

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use parquet::basic::Type as PhysicalType;
use parquet::file::metadata::FileMetaData;
use parquet::file::statistics::Statistics;
use parquet::schema::types::Type as SchemaType;

/// Build a `ParquetMetaData` fixture with one required-by-default INT32
/// column named "id" and one row group per `(min, max)` pair.
///
/// Every row group reports 100 rows / 100 values and carries Int32
/// statistics with the given min and max.
fn make_metadata_with_stats(min_max_pairs: &[(i32, i32)]) -> ParquetMetaData {
    let id_field = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
        .build()
        .unwrap();
    let root = SchemaType::group_type_builder("schema")
        .with_fields(vec![Arc::new(id_field)])
        .build()
        .unwrap();
    let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(root)));

    // One row group per requested (min, max) pair, in the order given.
    let mut row_groups: Vec<RowGroupMetaData> = Vec::with_capacity(min_max_pairs.len());
    for &(min, max) in min_max_pairs {
        let stats = Statistics::int32(Some(min), Some(max), None, Some(100), false);
        let column_chunk = ColumnChunkMetaData::builder(schema_descr.column(0))
            .set_num_values(100)
            .set_statistics(stats)
            .build()
            .unwrap();
        let row_group = RowGroupMetaData::builder(schema_descr.clone())
            .set_num_rows(100)
            .set_column_metadata(vec![column_chunk])
            .build()
            .unwrap();
        row_groups.push(row_group);
    }

    // File-level row count is 100 rows for each row group.
    let total_rows = min_max_pairs.len() as i64 * 100;
    let file_meta = FileMetaData::new(1, total_rows, None, None, schema_descr, None);
    ParquetMetaData::new(file_meta, row_groups)
}

/// Sort order over column "id" (index 0) built with
/// `PhysicalSortExpr::new_default`.
fn make_sort_order_asc() -> LexOrdering {
    let id_column = Arc::new(Column::new("id", 0));
    let sort_expr = PhysicalSortExpr::new_default(id_column);
    LexOrdering::new(vec![sort_expr]).unwrap()
}

/// Sort order over column "id" (index 0), descending with nulls last.
fn make_sort_order_desc() -> LexOrdering {
    let options = arrow::compute::SortOptions {
        descending: true,
        nulls_first: false,
    };
    let id_column = Arc::new(Column::new("id", 0));
    let sort_expr = PhysicalSortExpr::new(id_column, options);
    LexOrdering::new(vec![sort_expr]).unwrap()
}

/// Arrow schema with a single non-nullable Int32 field named "id".
fn make_arrow_schema() -> Schema {
    let id_field = Field::new("id", DataType::Int32, false);
    Schema::new(vec![id_field])
}

#[test]
fn test_reorder_by_statistics_asc() {
    // Physical layout is out of order: RG0 = 50-99, RG1 = 200-299, RG2 = 1-30
    let file_meta = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
    let arrow_schema = make_arrow_schema();
    let ordering = make_sort_order_asc();

    let reordered = PreparedAccessPlan::new(vec![0, 1, 2], None)
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    // Expected scan order by ascending min: RG2(1-30), RG0(50-99), RG1(200-299)
    assert_eq!(reordered.row_group_indexes, vec![2, 0, 1]);
}

#[test]
fn test_reorder_by_statistics_desc_sorts_asc() {
    // Even for a DESC sort order, reorder_by_statistics arranges row groups
    // by ascending min. Flipping for DESC is the job of ReverseRowGroups,
    // which runs after reorder in the optimizer pipeline.
    //
    // Layout: RG0 = 50-99, RG1 = 200-299, RG2 = 1-30
    let file_meta = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
    let arrow_schema = make_arrow_schema();
    let ordering = make_sort_order_desc();

    let reordered = PreparedAccessPlan::new(vec![0, 1, 2], None)
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    // Ascending by min: RG2(min=1), RG0(min=50), RG1(min=200).
    // The reverse step for DESC queries is not exercised here.
    assert_eq!(reordered.row_group_indexes, vec![2, 0, 1]);
}

#[test]
fn test_reorder_by_statistics_single_rg() {
    // A plan with exactly one row group has nothing to reorder.
    let file_meta = make_metadata_with_stats(&[(1, 100)]);
    let arrow_schema = make_arrow_schema();
    let ordering = make_sort_order_asc();

    let reordered = PreparedAccessPlan::new(vec![0], None)
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    assert_eq!(reordered.row_group_indexes, vec![0]);
}

#[test]
fn test_reorder_by_statistics_with_skipped_rgs() {
    // The file has 4 row groups, but the plan only selects 0, 2 and 3
    // (RG1 was pruned), so indices must be remapped through the selection.
    let file_meta =
        make_metadata_with_stats(&[(300, 400), (100, 200), (1, 50), (50, 99)]);
    let arrow_schema = make_arrow_schema();
    let ordering = make_sort_order_asc();

    let reordered = PreparedAccessPlan::new(vec![0, 2, 3], None)
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    // Selected row groups by ascending min: RG2(1-50), RG3(50-99), RG0(300-400)
    assert_eq!(reordered.row_group_indexes, vec![2, 3, 0]);
}

#[test]
fn test_reorder_by_statistics_skips_with_row_selection() {
    let file_meta = make_metadata_with_stats(&[(50, 99), (1, 30)]);
    let arrow_schema = make_arrow_schema();
    let ordering = make_sort_order_asc();

    // Any row selection at all makes the plan ineligible for reordering.
    let selectors = vec![
        RowSelector::select(50),
        RowSelector::skip(50),
        RowSelector::select(100),
    ];
    let row_selection = RowSelection::from(selectors);

    let result = PreparedAccessPlan::new(vec![0, 1], Some(row_selection))
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    // Order is untouched even though RG1 has the smaller min.
    assert_eq!(result.row_group_indexes, vec![0, 1]);
}

#[test]
fn test_reorder_by_statistics_already_sorted() {
    // Row groups are already laid out by ascending min.
    let file_meta = make_metadata_with_stats(&[(1, 30), (50, 99), (200, 299)]);
    let arrow_schema = make_arrow_schema();
    let ordering = make_sort_order_asc();

    let reordered = PreparedAccessPlan::new(vec![0, 1, 2], None)
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    // Reordering sorted input leaves the order as it was.
    assert_eq!(reordered.row_group_indexes, vec![0, 1, 2]);
}

#[test]
fn test_reorder_by_statistics_skips_non_column_expr() {
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::BinaryExpr;

    let file_meta = make_metadata_with_stats(&[(50, 99), (1, 30)]);
    let arrow_schema = make_arrow_schema();

    // Sort key is the computed expression `id + 1`, not a bare column.
    let id_column = Arc::new(Column::new("id", 0));
    let one = Arc::new(datafusion_physical_expr::expressions::Literal::new(
        datafusion_common::ScalarValue::Int32(Some(1)),
    ));
    let id_plus_one = Arc::new(BinaryExpr::new(id_column, Operator::Plus, one));
    let ordering =
        LexOrdering::new(vec![PhysicalSortExpr::new_default(id_plus_one)]).unwrap();

    let result = PreparedAccessPlan::new(vec![0, 1], None)
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    // No reorder: the sort expression is not a simple column reference.
    assert_eq!(result.row_group_indexes, vec![0, 1]);
}

#[test]
fn test_reorder_by_statistics_overlapping_rgs_sorts_asc() {
    // Row groups whose value ranges overlap are still ordered purely by min:
    //   RG0: 50-60
    //   RG1: 40-100 (lower min, wider range)
    //   RG2: 20-30  (lowest min)
    //
    // A DESC sort order is passed on purpose: the result is still ascending
    // by min, and ReverseRowGroups is what flips it afterwards.
    let file_meta = make_metadata_with_stats(&[(50, 60), (40, 100), (20, 30)]);
    let arrow_schema = make_arrow_schema();
    let ordering = make_sort_order_desc();

    let reordered = PreparedAccessPlan::new(vec![0, 1, 2], None)
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    // Ascending by min: RG2(min=20), RG1(min=40), RG0(min=50)
    assert_eq!(reordered.row_group_indexes, vec![2, 1, 0]);
}

#[test]
fn test_reorder_by_statistics_skips_missing_column() {
    let file_meta = make_metadata_with_stats(&[(50, 99), (1, 30)]);
    // The schema only contains "id"; the sort order names "nonexistent".
    let arrow_schema = make_arrow_schema();
    let missing_column = Arc::new(Column::new("nonexistent", 99));
    let ordering =
        LexOrdering::new(vec![PhysicalSortExpr::new_default(missing_column)]).unwrap();

    let result = PreparedAccessPlan::new(vec![0, 1], None)
        .unwrap()
        .reorder_by_statistics(&ordering, &file_meta, &arrow_schema)
        .unwrap();

    // No reorder: the sort column cannot be found in the schema.
    assert_eq!(result.row_group_indexes, vec![0, 1]);
}
}
Loading
Loading