Skip to content

Commit ed7b3a2

Browse files
committed
Merge remote-tracking branch 'apache/main' into fix-is-not-null-proto-hooks
# Conflicts: # datafusion/proto/src/physical_plan/to_proto.rs
2 parents 5ff21ae + 11a79a6 commit ed7b3a2

51 files changed

Lines changed: 2072 additions & 556 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datafusion/common/src/config.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,8 +1151,13 @@ config_namespace! {
11511151
/// in parallel using the provided `target_partitions` level
11521152
pub repartition_aggregations: bool, default = true
11531153

1154-
/// Minimum total files size in bytes to perform file scan repartitioning.
1155-
pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
1154+
/// Minimum total file size in bytes for file-group byte-range
1155+
/// splitting to fire. Files (or merged file groups) smaller than this
1156+
/// stay as one partition. Lower values produce more, smaller
1157+
/// partitions — better at filling `target_partitions` worth of cores
1158+
/// when files are modestly sized, at the cost of slightly more
1159+
/// per-partition open / metadata-load overhead.
1160+
pub repartition_file_min_size: usize, default = 1024 * 1024
11561161

11571162
/// Should DataFusion repartition data using the join keys to execute joins in parallel
11581163
/// using the provided `target_partitions` level

datafusion/common/src/utils/mod.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,137 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
395395
count
396396
}
397397

398+
/// Splits `vec` at index `n`, returning the first `n` elements and leaving the
/// remaining `vec.len() - n` elements in `vec`.
///
/// Only the smaller side is moved into a fresh allocation, so the new
/// allocation holds `min(n, vec.len() - n)` elements rather than always `n`
/// (as `vec.drain(0..n).collect()` would). This matters when the split emits a
/// prefix under memory pressure, where `n` can be close to `vec.len()`.
///
/// Which side keeps the original backing allocation depends on the branch:
///
/// - `n <= vec.len() / 2`: the prefix is drained into a new `Vec`; `vec` keeps
///   its allocation.
/// - otherwise: the tail is moved into a new `Vec` via [`Vec::split_off`] and
///   the returned prefix takes over the original allocation, including any
///   spare capacity it had.
///
/// # Panics
///
/// Panics if `n > vec.len()`.
pub fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> {
    // `n <= len / 2` selects exactly the same cases as `n * 2 <= len` for
    // unsigned integers, but cannot overflow when `n > usize::MAX / 2`. An
    // out-of-range `n` therefore always reaches `split_off` and fails with its
    // bounds panic instead of an arithmetic-overflow panic (debug) or a wrapped
    // comparison (release).
    if n <= vec.len() / 2 {
        vec.drain(..n).collect()
    } else {
        let remaining = vec.split_off(n);
        std::mem::replace(vec, remaining)
    }
}
413+
414+
#[cfg(test)]
mod split_vec_min_alloc_tests {
    use super::split_vec_min_alloc;

    /// Splits a copy of `input` at `n` and checks both the returned prefix and
    /// the elements left behind in the source vector.
    fn assert_split(input: &[i32], n: usize, want_prefix: &[i32], want_rest: &[i32]) {
        let mut rest = input.to_vec();
        let prefix = split_vec_min_alloc(&mut rest, n);
        assert_eq!(prefix, want_prefix);
        assert_eq!(rest, want_rest);
    }

    #[test]
    fn drain_branch() {
        // Prefix is less than half of the input: the prefix is the side that
        // gets copied out.
        assert_split(&[1, 2, 3, 4, 5, 6], 2, &[1, 2], &[3, 4, 5, 6]);
    }

    #[test]
    fn split_off_branch() {
        // Prefix is more than half of the input: the tail is the side that
        // gets copied out.
        assert_split(&[1, 2, 3, 4, 5, 6], 4, &[1, 2, 3, 4], &[5, 6]);
    }

    #[test]
    fn exactly_half() {
        // Boundary: prefix and tail have the same length.
        assert_split(&[1, 2, 3, 4], 2, &[1, 2], &[3, 4]);
    }

    #[test]
    fn take_all() {
        assert_split(&[1, 2, 3], 3, &[1, 2, 3], &[]);
    }

    #[test]
    fn take_none() {
        assert_split(&[1, 2, 3], 0, &[], &[1, 2, 3]);
    }

    #[test]
    fn emitted_prefix_does_not_realloc_on_push() {
        // Pins down why the split-off branch must not shrink the emitted
        // prefix. Callers may push onto the prefix right after the split (the
        // original note cites `multi_group_by/bytes.rs` pushing one extra
        // offset). Because the prefix inherits the source's backing store, that
        // push fits into the spare capacity without a reallocation.

        // Ten elements in a deliberately oversized buffer; splitting at 8
        // leaves only two behind, so the split-off branch is taken.
        let mut source: Vec<u32> = Vec::with_capacity(64);
        source.extend(0..10);
        let capacity_before_split = source.capacity();
        assert!(capacity_before_split >= 64);

        let mut emitted = split_vec_min_alloc(&mut source, 8);
        assert_eq!(emitted, (0..8).collect::<Vec<u32>>());

        // The prefix now owns the original buffer, spare room included.
        assert_eq!(
            emitted.capacity(),
            capacity_before_split,
            "split-off branch must hand the original allocation to the prefix"
        );

        // One more element, as a caller would append, must not move the buffer.
        emitted.push(99);
        assert_eq!(
            emitted.capacity(),
            capacity_before_split,
            "push must reuse the preserved allocation (no realloc)"
        );
        assert!(emitted.capacity() <= capacity_before_split);

        // Contrast case: a tightly sized copy of the same eight elements has to
        // grow on that push and ends up with more capacity than it started with.
        let mut tight: Vec<u32> = emitted[..8].to_vec();
        tight.shrink_to_fit();
        tight.push(99);
        assert!(
            tight.capacity() > 8,
            "shrink-to-fit then push reallocates to a larger capacity"
        );
    }
}
528+
398529
/// Creates single element [`ListArray`], [`LargeListArray`] and
399530
/// [`FixedSizeListArray`] from other arrays
400531
///

datafusion/core/tests/dataframe/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ async fn test_aggregate_with_pk() -> Result<()> {
840840
let aggr_expr = vec![];
841841
let df = df.aggregate(group_expr, aggr_expr)?;
842842

843-
// Since id and name are functionally dependant, we can use name among
843+
// Since id and name are functionally dependent, we can use name among
844844
// expression even if it is not part of the group by expression and can
845845
// select "name" column even though it wasn't explicitly grouped
846846
let df = df.select(vec![col("id"), col("name")])?;
@@ -895,7 +895,7 @@ async fn test_aggregate_with_pk2() -> Result<()> {
895895
"
896896
);
897897

898-
// Since id and name are functionally dependant, we can use name among expression
898+
// Since id and name are functionally dependent, we can use name among expression
899899
// even if it is not part of the group by expression.
900900
let df_results = df.collect().await?;
901901

@@ -943,7 +943,7 @@ async fn test_aggregate_with_pk3() -> Result<()> {
943943
"
944944
);
945945

946-
// Since id and name are functionally dependant, we can use name among expression
946+
// Since id and name are functionally dependent, we can use name among expression
947947
// even if it is not part of the group by expression.
948948
let df_results = df.collect().await?;
949949

datafusion/core/tests/physical_optimizer/limit_pushdown.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,3 +714,123 @@ fn no_limit_preserves_plan_identity() -> Result<()> {
714714

715715
Ok(())
716716
}
717+
718+
/// Checks that `LimitPushdown` keeps the inner `fetch=8` intact when an outer
/// `skip=1` sits above a sort on a different key.
#[test]
719+
fn outer_offset_does_not_leak_through_sort_into_inner_limit() -> Result<()> {
720+
// Regression test for https://github.com/apache/datafusion/issues/22489
721+
//
722+
// When an outer OFFSET is separated from an inner LIMIT by a SortExec
723+
// with different sort keys, the outer skip must not reduce the inner
724+
// fetch. Before the fix, combine_limit merged them, producing
725+
// GlobalLimitExec(skip=1, fetch=7) instead of preserving the inner
726+
// LIMIT 8.
727+
//
728+
// Plan structure (outermost first, each level wraps the one below it):
729+
//   GlobalLimitExec: skip=1, fetch=None (outer OFFSET 1)
730+
//     SortExec: [c1 DESC] (outer sort — different key)
731+
//       GlobalLimitExec: skip=0, fetch=8 (inner LIMIT 8)
732+
//         SortExec: [c2 ASC] (inner sort — different key)
733+
//           EmptyExec
734+
let schema = create_schema();
735+
let empty = empty_exec(Arc::clone(&schema));
736+
737+
// Inner sort key: c2 with default sort options.
let inner_ordering: LexOrdering = [PhysicalSortExpr {
738+
expr: col("c2", &schema)?,
739+
options: SortOptions::default(),
740+
}]
741+
.into();
742+
let inner_sort = sort_exec(inner_ordering, empty);
743+
// Inner LIMIT 8: skip=0, fetch=Some(8).
let inner_limit = global_limit_exec(inner_sort, 0, Some(8));
744+
745+
// Outer sort key: c1 descending, nulls last — deliberately not the inner key.
let outer_ordering: LexOrdering = [PhysicalSortExpr {
746+
expr: col("c1", &schema)?,
747+
options: SortOptions {
748+
descending: true,
749+
nulls_first: false,
750+
},
751+
}]
752+
.into();
753+
let outer_sort = sort_exec(outer_ordering, inner_limit);
754+
// Outer OFFSET 1: skip=1 with no fetch.
let outer_limit = global_limit_exec(outer_sort, 1, None);
755+
756+
// Snapshot the plan as constructed, before the rule runs.
// NOTE(review): the plan-tree indentation inside the snapshot literals looks
// flattened in this rendering — confirm against the repository file.
let initial = format_plan(&outer_limit);
757+
insta::assert_snapshot!(
758+
initial,
759+
@r"
760+
GlobalLimitExec: skip=1, fetch=None
761+
SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
762+
GlobalLimitExec: skip=0, fetch=8
763+
SortExec: expr=[c2@1 ASC], preserve_partitioning=[false]
764+
EmptyExec
765+
"
766+
);
767+
768+
// Run LimitPushdown with default ConfigOptions. Expected result: the inner
// limit is absorbed into the inner sort as TopK(fetch=8), while the outer
// skip=1 stays above the outer sort and leaves that fetch at 8.
let after_optimize =
769+
LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?;
770+
let optimized = format_plan(&after_optimize);
771+
insta::assert_snapshot!(
772+
optimized,
773+
@r"
774+
GlobalLimitExec: skip=1, fetch=None
775+
SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
776+
SortExec: TopK(fetch=8), expr=[c2@1 ASC], preserve_partitioning=[false]
777+
EmptyExec
778+
"
779+
);
780+
781+
Ok(())
782+
}
783+
784+
/// Checks that `LimitPushdown` still turns the inner `fetch=8` into a TopK sort
/// when the outer and inner sorts share the same key.
#[test]
784+
fn outer_offset_with_same_sort_key_still_pushes_limit() -> Result<()> {
785+
// Companion to outer_offset_does_not_leak_through_sort_into_inner_limit:
786+
// when both sorts use the *same* key, the inner LIMIT should still be
787+
// pushed into the SortExec as TopK.
788+
//
789+
// Plan structure (outermost first, each level wraps the one below it):
790+
//   GlobalLimitExec: skip=1, fetch=None (outer OFFSET 1)
791+
//     SortExec: [c1 ASC] (outer sort — same key)
792+
//       GlobalLimitExec: skip=0, fetch=8 (inner LIMIT 8)
793+
//         SortExec: [c1 ASC] (inner sort — same key)
794+
//           EmptyExec
795+
let schema = create_schema();
796+
let empty = empty_exec(Arc::clone(&schema));
797+
798+
// One ordering (c1, default sort options) shared by both sorts.
let ordering: LexOrdering = [PhysicalSortExpr {
799+
expr: col("c1", &schema)?,
800+
options: SortOptions::default(),
801+
}]
802+
.into();
803+
804+
// The inner sort takes a clone so the same ordering can be moved into the
// outer sort below.
let inner_sort = sort_exec(ordering.clone(), empty);
805+
let inner_limit = global_limit_exec(inner_sort, 0, Some(8));
806+
let outer_sort = sort_exec(ordering, inner_limit);
807+
let outer_limit = global_limit_exec(outer_sort, 1, None);
808+
809+
// Snapshot the plan as constructed, before the rule runs.
// NOTE(review): the plan-tree indentation inside the snapshot literals looks
// flattened in this rendering — confirm against the repository file.
let initial = format_plan(&outer_limit);
810+
insta::assert_snapshot!(
811+
initial,
812+
@r"
813+
GlobalLimitExec: skip=1, fetch=None
814+
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
815+
GlobalLimitExec: skip=0, fetch=8
816+
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
817+
EmptyExec
818+
"
819+
);
820+
821+
// Run LimitPushdown with default ConfigOptions. Expected result: the inner
// limit becomes TopK(fetch=8) on the inner sort and the outer skip=1 is kept.
let after_optimize =
822+
LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?;
823+
let optimized = format_plan(&after_optimize);
824+
insta::assert_snapshot!(
825+
optimized,
826+
@r"
827+
GlobalLimitExec: skip=1, fetch=None
828+
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
829+
SortExec: TopK(fetch=8), expr=[c1@0 ASC], preserve_partitioning=[false]
830+
EmptyExec
831+
"
832+
);
833+
834+
Ok(())
835+
}

datafusion/ffi/src/physical_expr/partitioning.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ impl From<&Partitioning> for FFI_Partitioning {
4545
.collect();
4646
Self::Hash(exprs, *size)
4747
}
48+
// FFI does not yet expose range partition metadata, so a range partitioning
// is reported as `UnknownPartitioning` with the same partition count.
49+
// See https://github.com/apache/datafusion/issues/22394
50+
Partitioning::Range(range) => {
51+
Self::UnknownPartitioning(range.partition_count())
52+
}
4853
Partitioning::UnknownPartitioning(size) => Self::UnknownPartitioning(*size),
4954
}
5055
}

datafusion/functions-aggregate/src/min_max/min_max_bytes.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls:
2727
use std::mem::size_of;
2828
use std::sync::Arc;
2929

30+
use datafusion_common::utils::split_vec_min_alloc;
31+
3032
/// Implements fast Min/Max [`GroupsAccumulator`] for "bytes" types ([`StringArray`],
3133
/// [`BinaryArray`], [`StringViewArray`], etc)
3234
///
@@ -493,7 +495,7 @@ impl MinMaxBytesState {
493495
)
494496
}
495497
EmitTo::First(n) => {
496-
let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
498+
let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n);
497499
let first_data_capacity: usize = first_min_maxes
498500
.iter()
499501
.map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))

datafusion/functions-aggregate/src/min_max/min_max_struct.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use datafusion_common::{
3030
use datafusion_expr::{EmitTo, GroupsAccumulator};
3131
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;
3232

33+
use datafusion_common::utils::split_vec_min_alloc;
34+
3335
/// Accumulator for MIN/MAX operations on Struct data types.
3436
///
3537
/// This accumulator tracks the minimum or maximum struct value encountered
@@ -282,7 +284,7 @@ impl MinMaxStructState {
282284
)
283285
}
284286
EmitTo::First(n) => {
285-
let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
287+
let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n);
286288
let first_data_capacity: usize = first_min_maxes
287289
.iter()
288290
.map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))

0 commit comments

Comments
 (0)