Skip to content

Commit b3b7fe3

Browse files
committed
perf(arrow-ord): Avoid full index materialization for small-limit lexsorts
1 parent fd1c5b3 commit b3b7fe3

1 file changed

Lines changed: 134 additions & 28 deletions

File tree

arrow-ord/src/sort.rs

Lines changed: 134 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -949,40 +949,55 @@ pub fn lexsort_to_indices(
949949
));
950950
};
951951

952-
let mut value_indices = (0..row_count).collect::<Vec<usize>>();
953-
let mut len = value_indices.len();
952+
let len = limit.unwrap_or(row_count).min(row_count);
954953

955-
if let Some(limit) = limit {
956-
len = limit.min(len);
954+
if len == 0 {
955+
return Ok(UInt32Array::from(Vec::<u32>::new()));
957956
}
958957

959-
// Instantiate specialized versions of comparisons for small numbers
960-
// of columns as it helps the compiler generate better code.
961-
match columns.len() {
962-
2 => {
963-
sort_fixed_column::<2>(columns, &mut value_indices, len)?;
964-
}
965-
3 => {
966-
sort_fixed_column::<3>(columns, &mut value_indices, len)?;
967-
}
968-
4 => {
969-
sort_fixed_column::<4>(columns, &mut value_indices, len)?;
970-
}
971-
5 => {
972-
sort_fixed_column::<5>(columns, &mut value_indices, len)?;
958+
// The heap path avoids allocating and partially sorting all row indices
959+
// when the requested limit is a small fraction of the input. For larger
960+
// limits, the existing partial-sort path is preferred because heap
961+
// maintenance costs grow with the requested limit.
962+
let value_indices = if limit.is_some() && len <= row_count / 10 {
963+
match columns.len() {
964+
2 => lexsort_topk_fixed::<2>(columns, row_count, len)?,
965+
3 => lexsort_topk_fixed::<3>(columns, row_count, len)?,
966+
4 => lexsort_topk_fixed::<4>(columns, row_count, len)?,
967+
5 => lexsort_topk_fixed::<5>(columns, row_count, len)?,
968+
_ => {
969+
let lexicographical_comparator = LexicographicalComparator::try_new(columns)?;
970+
lexsort_topk(row_count, len, |a, b| {
971+
lexicographical_comparator.compare(a, b)
972+
})
973+
}
973974
}
974-
_ => {
975-
let lexicographical_comparator = LexicographicalComparator::try_new(columns)?;
976-
// uint32 can be sorted unstably
977-
sort_unstable_by(&mut value_indices, len, |a, b| {
978-
lexicographical_comparator.compare(*a, *b)
979-
});
975+
} else {
976+
let mut value_indices = (0..row_count).collect::<Vec<usize>>();
977+
978+
// Instantiate specialized versions of comparisons for small numbers
979+
// of columns as it helps the compiler generate better code.
980+
match columns.len() {
981+
2 => sort_fixed_column::<2>(columns, &mut value_indices, len)?,
982+
3 => sort_fixed_column::<3>(columns, &mut value_indices, len)?,
983+
4 => sort_fixed_column::<4>(columns, &mut value_indices, len)?,
984+
5 => sort_fixed_column::<5>(columns, &mut value_indices, len)?,
985+
_ => {
986+
let lexicographical_comparator = LexicographicalComparator::try_new(columns)?;
987+
sort_unstable_by(&mut value_indices, len, |a, b| {
988+
lexicographical_comparator.compare(*a, *b)
989+
});
990+
}
980991
}
981-
}
992+
993+
value_indices.truncate(len);
994+
value_indices
995+
};
996+
982997
Ok(UInt32Array::from(
983-
value_indices[..len]
984-
.iter()
985-
.map(|i| *i as u32)
998+
value_indices
999+
.into_iter()
1000+
.map(|i| i as u32)
9861001
.collect::<Vec<_>>(),
9871002
))
9881003
}
@@ -1000,6 +1015,97 @@ fn sort_fixed_column<const N: usize>(
10001015
Ok(())
10011016
}
10021017

1018+
// Uses the fixed-column comparator for the bounded heap path.
1019+
fn lexsort_topk_fixed<const N: usize>(
1020+
columns: &[SortColumn],
1021+
row_count: usize,
1022+
limit: usize,
1023+
) -> Result<Vec<usize>, ArrowError> {
1024+
let lexicographical_comparator = FixedLexicographicalComparator::<N>::try_new(columns)?;
1025+
Ok(lexsort_topk(row_count, limit, |a, b| {
1026+
lexicographical_comparator.compare(a, b)
1027+
}))
1028+
}
1029+
1030+
// Keeps the smallest `limit` indices in a bounded max-heap.
1031+
// The root is the largest retained index according to `compare`.
1032+
fn lexsort_topk(
1033+
row_count: usize,
1034+
limit: usize,
1035+
mut compare: impl FnMut(usize, usize) -> Ordering,
1036+
) -> Vec<usize> {
1037+
if limit >= row_count {
1038+
let mut value_indices = (0..row_count).collect::<Vec<_>>();
1039+
value_indices.sort_unstable_by(|a, b| compare(*a, *b));
1040+
return value_indices;
1041+
}
1042+
1043+
let mut heap = Vec::with_capacity(limit);
1044+
1045+
for idx in 0..row_count {
1046+
if heap.len() < limit {
1047+
heap.push(idx);
1048+
let pos = heap.len() - 1;
1049+
sift_up_worst_heap(&mut heap, pos, &mut compare);
1050+
} else if compare(idx, heap[0]) == Ordering::Less {
1051+
heap[0] = idx;
1052+
sift_down_worst_heap(&mut heap, 0, &mut compare);
1053+
}
1054+
}
1055+
1056+
// Return the selected prefix in sorted order.
1057+
heap.sort_unstable_by(|a, b| compare(*a, *b));
1058+
heap
1059+
}
1060+
1061+
// Moves a newly inserted index toward the root while it is larger than its parent.
1062+
fn sift_up_worst_heap(
1063+
heap: &mut [usize],
1064+
mut pos: usize,
1065+
compare: &mut impl FnMut(usize, usize) -> Ordering,
1066+
) {
1067+
while pos > 0 {
1068+
let parent = (pos - 1) / 2;
1069+
1070+
if compare(heap[parent], heap[pos]) != Ordering::Less {
1071+
break;
1072+
}
1073+
1074+
heap.swap(parent, pos);
1075+
pos = parent;
1076+
}
1077+
}
1078+
1079+
// Moves the root down until both children are no larger than it.
1080+
// The larger child is selected at each step so the worst retained row remains
1081+
// at heap[0].
1082+
fn sift_down_worst_heap(
1083+
heap: &mut [usize],
1084+
mut pos: usize,
1085+
compare: &mut impl FnMut(usize, usize) -> Ordering,
1086+
) {
1087+
loop {
1088+
let left = pos * 2 + 1;
1089+
if left >= heap.len() {
1090+
break;
1091+
}
1092+
1093+
let right = left + 1;
1094+
let mut worst = left;
1095+
1096+
if right < heap.len() && compare(heap[left], heap[right]) == Ordering::Less {
1097+
worst = right;
1098+
}
1099+
1100+
if compare(heap[pos], heap[worst]) != Ordering::Less {
1101+
break;
1102+
}
1103+
1104+
heap.swap(pos, worst);
1105+
pos = worst;
1106+
}
1107+
}
1108+
10031109
/// It's unstable_sort, may not preserve the order of equal elements
10041110
pub fn partial_sort<T, F>(v: &mut [T], limit: usize, mut is_less: F)
10051111
where

0 commit comments

Comments
 (0)