Skip to content

Commit e5c67a5

Browse files
committed
The topk algorithm is implemented to optimize the use of both order by and limit
1 parent 13dc38b commit e5c67a5

12 files changed

Lines changed: 308 additions & 12 deletions

File tree

src/db.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ impl<S: Storage> State<S> {
227227
NormalizationRuleImpl::CombineFilter,
228228
],
229229
)
230+
.batch(
231+
"TopK".to_string(),
232+
HepBatchStrategy::once_topdown(),
233+
vec![NormalizationRuleImpl::TopK],
234+
)
230235
.batch(
231236
"Expression Remapper".to_string(),
232237
HepBatchStrategy::once_topdown(),

src/execution/dql/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub(crate) mod seq_scan;
1313
pub(crate) mod show_table;
1414
pub(crate) mod show_view;
1515
pub(crate) mod sort;
16+
pub(crate) mod top_k;
1617
pub(crate) mod union;
1718
pub(crate) mod values;
1819

src/execution/dql/sort.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::pin::Pin;
1515
pub(crate) type BumpVec<'bump, T> = bumpalo::collections::Vec<'bump, T>;
1616

1717
#[derive(Clone)]
18-
pub(crate) struct NullableVec<'a, T>(BumpVec<'a, Option<T>>);
18+
pub(crate) struct NullableVec<'a, T>(pub(crate) BumpVec<'a, Option<T>>);
1919

2020
impl<'a, T> NullableVec<'a, T> {
2121
#[inline]
@@ -49,17 +49,31 @@ impl<'a, T> NullableVec<'a, T> {
4949
}
5050
}
5151

52-
struct RemappingIterator<'a> {
52+
pub struct RemappingIterator<'a> {
5353
pos: usize,
5454
tuples: NullableVec<'a, (usize, Tuple)>,
5555
indices: BumpVec<'a, usize>,
5656
}
5757

58+
impl RemappingIterator<'_> {
59+
pub fn new<'a>(
60+
pos: usize,
61+
tuples: NullableVec<'a, (usize, Tuple)>,
62+
indices: BumpVec<'a, usize>,
63+
) -> RemappingIterator<'a> {
64+
RemappingIterator {
65+
pos,
66+
tuples,
67+
indices,
68+
}
69+
}
70+
}
71+
5872
impl Iterator for RemappingIterator<'_> {
5973
type Item = Tuple;
6074

6175
fn next(&mut self) -> Option<Self::Item> {
62-
if self.pos > self.tuples.len() - 1 {
76+
if self.pos > self.indices.len() - 1 {
6377
return None;
6478
}
6579
let (_, tuple) = self.tuples.take(self.indices[self.pos]);
@@ -147,11 +161,7 @@ impl SortBy {
147161
}
148162
let indices = radix_sort(sort_keys, arena);
149163

150-
Ok(Box::new(RemappingIterator {
151-
pos: 0,
152-
tuples,
153-
indices,
154-
}))
164+
Ok(Box::new(RemappingIterator::new(0, tuples, indices)))
155165
}
156166
SortBy::Fast => {
157167
let fn_nulls_first = |nulls_first: bool| {

src/execution/dql/top_k.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
use crate::errors::DatabaseError;
2+
use crate::execution::dql::sort::{BumpVec, NullableVec, RemappingIterator};
3+
use crate::execution::{build_read, Executor, ReadExecutor};
4+
use crate::planner::operator::sort::SortField;
5+
use crate::planner::operator::top_k::TopKOperator;
6+
use crate::planner::LogicalPlan;
7+
use crate::storage::table_codec::BumpBytes;
8+
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
9+
use crate::throw;
10+
use crate::types::tuple::{Schema, Tuple};
11+
use bumpalo::Bump;
12+
use std::cmp::Reverse;
13+
use std::collections::BinaryHeap;
14+
use std::ops::Coroutine;
15+
use std::ops::CoroutineState;
16+
use std::pin::Pin;
17+
18+
fn top_sort<'a>(
19+
arena: &'a Bump,
20+
schema: &Schema,
21+
sort_fields: &[SortField],
22+
tuples: NullableVec<'a, (usize, Tuple)>,
23+
limit: Option<usize>,
24+
offset: Option<usize>,
25+
) -> Result<Box<dyn Iterator<Item = Tuple> + 'a>, DatabaseError> {
26+
let mut sort_keys = BumpVec::with_capacity_in(tuples.len(), arena);
27+
for (i, tuple) in tuples.0.iter().enumerate() {
28+
let mut full_key = BumpVec::new_in(arena);
29+
for SortField {
30+
expr,
31+
nulls_first,
32+
asc,
33+
} in sort_fields
34+
{
35+
let mut key = BumpBytes::new_in(arena);
36+
let tuple = tuple.as_ref().map(|(_, tuple)| tuple).unwrap();
37+
expr.eval(Some((tuple, &**schema)))?
38+
.memcomparable_encode(&mut key)?;
39+
if *asc {
40+
for byte in key.iter_mut() {
41+
*byte ^= 0xFF;
42+
}
43+
}
44+
key.push(if *nulls_first { u8::MIN } else { u8::MAX });
45+
full_key.extend(key);
46+
}
47+
//full_key.extend_from_slice(&(i as u64).to_be_bytes());
48+
sort_keys.push((i, full_key))
49+
}
50+
51+
let keep_count = offset.unwrap_or(0) + limit.unwrap_or(sort_keys.len());
52+
53+
let mut heap: BinaryHeap<Reverse<(&[u8], usize)>> = BinaryHeap::with_capacity(keep_count);
54+
for (i, key) in sort_keys.iter() {
55+
let key = key.as_slice();
56+
if heap.len() < keep_count {
57+
heap.push(Reverse((key, *i)));
58+
} else if let Some(&Reverse((min_key, _))) = heap.peek() {
59+
if key > min_key {
60+
heap.pop();
61+
heap.push(Reverse((key, *i)));
62+
}
63+
}
64+
}
65+
66+
let mut topk: Vec<(Vec<u8>, usize)> = heap
67+
.into_iter()
68+
.map(|Reverse((key, i))| (key.to_vec(), i))
69+
.collect();
70+
topk.sort_by(|(k1, i1), (k2, i2)| k1.cmp(k2).then_with(|| i1.cmp(i2).reverse()));
71+
topk.reverse();
72+
73+
let mut bumped_indices =
74+
BumpVec::with_capacity_in(topk.len().saturating_sub(offset.unwrap_or(0)), arena);
75+
for (_, idx) in topk.into_iter().skip(offset.unwrap_or(0)) {
76+
bumped_indices.push(idx);
77+
}
78+
Ok(Box::new(RemappingIterator::new(0, tuples, bumped_indices)))
79+
}
80+
81+
pub struct TopK {
82+
arena: Bump,
83+
sort_fields: Vec<SortField>,
84+
limit: Option<usize>,
85+
offset: Option<usize>,
86+
input: LogicalPlan,
87+
}
88+
89+
impl From<(TopKOperator, LogicalPlan)> for TopK {
90+
fn from(
91+
(
92+
TopKOperator {
93+
sort_fields,
94+
limit,
95+
offset,
96+
},
97+
input,
98+
): (TopKOperator, LogicalPlan),
99+
) -> Self {
100+
TopK {
101+
arena: Default::default(),
102+
sort_fields,
103+
limit,
104+
offset,
105+
input,
106+
}
107+
}
108+
}
109+
110+
impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for TopK {
111+
fn execute(
112+
self,
113+
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
114+
transaction: *mut T,
115+
) -> Executor<'a> {
116+
Box::new(
117+
#[coroutine]
118+
move || {
119+
let TopK {
120+
arena,
121+
sort_fields,
122+
limit,
123+
offset,
124+
mut input,
125+
} = self;
126+
127+
let arena: *const Bump = &arena;
128+
129+
let mut tuples = NullableVec::new(unsafe { &*arena });
130+
let schema = input.output_schema().clone();
131+
let mut tuple_offset = 0;
132+
133+
let mut coroutine = build_read(input, cache, transaction);
134+
135+
while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
136+
tuples.put((tuple_offset, throw!(tuple)));
137+
tuple_offset += 1;
138+
}
139+
140+
for tuple in throw!(top_sort(
141+
unsafe { &*arena },
142+
&schema,
143+
&sort_fields,
144+
tuples,
145+
limit,
146+
offset
147+
)) {
148+
yield Ok(tuple)
149+
}
150+
},
151+
)
152+
}
153+
}

src/execution/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::execution::dql::seq_scan::SeqScan;
3636
use crate::execution::dql::show_table::ShowTables;
3737
use crate::execution::dql::show_view::ShowViews;
3838
use crate::execution::dql::sort::Sort;
39+
use crate::execution::dql::top_k::TopK;
3940
use crate::execution::dql::union::Union;
4041
use crate::execution::dql::values::Values;
4142
use crate::planner::operator::join::JoinCondition;
@@ -133,6 +134,11 @@ pub fn build_read<'a, T: Transaction + 'a>(
133134

134135
Limit::from((op, input)).execute(cache, transaction)
135136
}
137+
Operator::TopK(op) => {
138+
let input = childrens.pop_only();
139+
140+
TopK::from((op, input)).execute(cache, transaction)
141+
}
136142
Operator::Values(op) => Values::from(op).execute(cache, transaction),
137143
Operator::ShowTable => ShowTables.execute(cache, transaction),
138144
Operator::ShowView => ShowViews.execute(cache, transaction),

src/optimizer/rule/normalization/column_pruning.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ impl ColumnPruning {
111111
| Operator::Join(_)
112112
| Operator::Filter(_)
113113
| Operator::Union(_)
114-
| Operator::Except(_) => {
114+
| Operator::Except(_)
115+
| Operator::TopK(_) => {
115116
let temp_columns = operator.referenced_columns(false);
116117
// why?
117118
let mut column_references = column_references;

src/optimizer/rule/normalization/compilation_in_advance.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ impl ExpressionRemapper {
7676
TryReference::new(output_exprs).visit(&mut sort_field.expr)?;
7777
}
7878
}
79+
Operator::TopK(op) => {
80+
for sort_field in op.sort_fields.iter_mut() {
81+
TryReference::new(output_exprs).visit(&mut sort_field.expr)?;
82+
}
83+
}
7984
Operator::FunctionScan(op) => {
8085
for expr in op.table_function.args.iter_mut() {
8186
TryReference::new(output_exprs).visit(expr)?;
@@ -186,6 +191,11 @@ impl EvaluatorBind {
186191
BindEvaluator.visit(&mut sort_field.expr)?;
187192
}
188193
}
194+
Operator::TopK(op) => {
195+
for sort_field in op.sort_fields.iter_mut() {
196+
BindEvaluator.visit(&mut sort_field.expr)?;
197+
}
198+
}
189199
Operator::FunctionScan(op) => {
190200
for expr in op.table_function.args.iter_mut() {
191201
BindEvaluator.visit(expr)?;

src/optimizer/rule/normalization/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,22 @@ use crate::optimizer::rule::normalization::combine_operators::{
1010
use crate::optimizer::rule::normalization::compilation_in_advance::{
1111
EvaluatorBind, ExpressionRemapper,
1212
};
13+
1314
use crate::optimizer::rule::normalization::pushdown_limit::{
1415
LimitProjectTranspose, PushLimitIntoScan, PushLimitThroughJoin,
1516
};
1617
use crate::optimizer::rule::normalization::pushdown_predicates::PushPredicateIntoScan;
1718
use crate::optimizer::rule::normalization::pushdown_predicates::PushPredicateThroughJoin;
1819
use crate::optimizer::rule::normalization::simplification::ConstantCalculation;
1920
use crate::optimizer::rule::normalization::simplification::SimplifyFilter;
20-
21+
use crate::optimizer::rule::normalization::top_k::TopK;
2122
mod column_pruning;
2223
mod combine_operators;
2324
mod compilation_in_advance;
2425
mod pushdown_limit;
2526
mod pushdown_predicates;
2627
mod simplification;
28+
mod top_k;
2729

2830
#[derive(Debug, Copy, Clone)]
2931
pub enum NormalizationRuleImpl {
@@ -46,6 +48,7 @@ pub enum NormalizationRuleImpl {
4648
// CompilationInAdvance
4749
ExpressionRemapper,
4850
EvaluatorBind,
51+
TopK,
4952
}
5053

5154
impl MatchPattern for NormalizationRuleImpl {
@@ -64,6 +67,7 @@ impl MatchPattern for NormalizationRuleImpl {
6467
NormalizationRuleImpl::ConstantCalculation => ConstantCalculation.pattern(),
6568
NormalizationRuleImpl::ExpressionRemapper => ExpressionRemapper.pattern(),
6669
NormalizationRuleImpl::EvaluatorBind => EvaluatorBind.pattern(),
70+
NormalizationRuleImpl::TopK => TopK.pattern(),
6771
}
6872
}
6973
}
@@ -94,6 +98,7 @@ impl NormalizationRule for NormalizationRuleImpl {
9498
NormalizationRuleImpl::ConstantCalculation => ConstantCalculation.apply(node_id, graph),
9599
NormalizationRuleImpl::ExpressionRemapper => ExpressionRemapper.apply(node_id, graph),
96100
NormalizationRuleImpl::EvaluatorBind => EvaluatorBind.apply(node_id, graph),
101+
NormalizationRuleImpl::TopK => TopK.apply(node_id, graph),
97102
}
98103
}
99104
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use crate::errors::DatabaseError;
2+
use crate::optimizer::core::pattern::Pattern;
3+
use crate::optimizer::core::pattern::PatternChildrenPredicate;
4+
use crate::optimizer::core::rule::{MatchPattern, NormalizationRule};
5+
use crate::optimizer::heuristic::graph::{HepGraph, HepNodeId};
6+
use crate::planner::operator::top_k::TopKOperator;
7+
use crate::planner::operator::Operator;
8+
use std::sync::LazyLock;
9+
10+
static TOP_K_RULE: LazyLock<Pattern> = LazyLock::new(|| Pattern {
11+
predicate: |op| matches!(op, Operator::Limit(_)),
12+
children: PatternChildrenPredicate::Predicate(vec![Pattern {
13+
predicate: |op| matches!(op, Operator::Sort(_)),
14+
children: PatternChildrenPredicate::None,
15+
}]),
16+
});
17+
18+
pub struct TopK;
19+
20+
impl MatchPattern for TopK {
21+
fn pattern(&self) -> &Pattern {
22+
&TOP_K_RULE
23+
}
24+
}
25+
26+
impl NormalizationRule for TopK {
27+
fn apply(&self, node_id: HepNodeId, graph: &mut HepGraph) -> Result<(), DatabaseError> {
28+
if let Operator::Limit(op) = graph.operator(node_id) {
29+
if let Some(child_id) = graph.eldest_child_at(node_id) {
30+
if let Operator::Sort(child_op) = graph.operator(child_id) {
31+
graph.replace_node(
32+
node_id,
33+
Operator::TopK(TopKOperator {
34+
sort_fields: child_op.sort_fields.clone(),
35+
limit: op.limit,
36+
offset: op.offset,
37+
}),
38+
);
39+
graph.remove_node(child_id, false);
40+
}
41+
}
42+
}
43+
44+
Ok(())
45+
}
46+
}

src/planner/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl LogicalPlan {
131131
mut childrens_iter: ChildrensIter,
132132
) -> SchemaOutput {
133133
match operator {
134-
Operator::Filter(_) | Operator::Sort(_) | Operator::Limit(_) => {
134+
Operator::Filter(_) | Operator::Sort(_) | Operator::Limit(_) | Operator::TopK(_) => {
135135
childrens_iter.next().unwrap().output_schema_direct()
136136
}
137137
Operator::Aggregate(op) => SchemaOutput::Schema(

0 commit comments

Comments
 (0)