Skip to content

Commit d07982f

Browse files
authored
Merge branch 'main' into multi-column-null-aware-anti-join
2 parents f6db769 + 936f959 commit d07982f

File tree

32 files changed

+1699
-328
lines changed

32 files changed

+1699
-328
lines changed

datafusion/core/benches/topk_aggregate.rs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,63 @@
1717

1818
mod data_utils;
1919

20+
use arrow::array::Int64Builder;
21+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
22+
use arrow::record_batch::RecordBatch;
2023
use arrow::util::pretty::pretty_format_batches;
2124
use criterion::{Criterion, criterion_group, criterion_main};
2225
use data_utils::make_data;
2326
use datafusion::physical_plan::{collect, displayable};
2427
use datafusion::prelude::SessionContext;
2528
use datafusion::{datasource::MemTable, error::Result};
2629
use datafusion_execution::config::SessionConfig;
30+
use rand::SeedableRng;
31+
use rand::seq::SliceRandom;
2732
use std::hint::black_box;
2833
use std::sync::Arc;
2934
use tokio::runtime::Runtime;
3035

3136
const LIMIT: usize = 10;
3237

38+
/// Create deterministic data for DISTINCT benchmarks with predictable trace_ids
39+
/// This ensures consistent results across benchmark runs
40+
fn make_distinct_data(
41+
partition_cnt: i32,
42+
sample_cnt: i32,
43+
) -> Result<(Arc<Schema>, Vec<Vec<RecordBatch>>)> {
44+
let mut rng = rand::rngs::SmallRng::from_seed([42; 32]);
45+
let total_samples = partition_cnt as usize * sample_cnt as usize;
46+
let mut ids = Vec::new();
47+
for i in 0..total_samples {
48+
ids.push(i as i64);
49+
}
50+
ids.shuffle(&mut rng);
51+
52+
let mut global_idx = 0;
53+
let schema = test_distinct_schema();
54+
let mut partitions = vec![];
55+
for _ in 0..partition_cnt {
56+
let mut id_builder = Int64Builder::new();
57+
58+
for _ in 0..sample_cnt {
59+
let id = ids[global_idx];
60+
id_builder.append_value(id);
61+
global_idx += 1;
62+
}
63+
64+
let id_col = Arc::new(id_builder.finish());
65+
let batch = RecordBatch::try_new(schema.clone(), vec![id_col])?;
66+
partitions.push(vec![batch]);
67+
}
68+
69+
Ok((schema, partitions))
70+
}
71+
72+
/// Returns a Schema for distinct benchmarks with i64 trace_id
73+
fn test_distinct_schema() -> SchemaRef {
74+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
75+
}
76+
3377
async fn create_context(
3478
partition_cnt: i32,
3579
sample_cnt: i32,
@@ -50,6 +94,25 @@ async fn create_context(
5094
Ok(ctx)
5195
}
5296

97+
async fn create_context_distinct(
98+
partition_cnt: i32,
99+
sample_cnt: i32,
100+
use_topk: bool,
101+
) -> Result<SessionContext> {
102+
// Use deterministic data generation for DISTINCT queries to ensure consistent results
103+
let (schema, parts) = make_distinct_data(partition_cnt, sample_cnt).unwrap();
104+
let mem_table = Arc::new(MemTable::try_new(schema, parts).unwrap());
105+
106+
// Create the DataFrame
107+
let mut cfg = SessionConfig::new();
108+
let opts = cfg.options_mut();
109+
opts.optimizer.enable_topk_aggregation = use_topk;
110+
let ctx = SessionContext::new_with_config(cfg);
111+
let _ = ctx.register_table("traces", mem_table)?;
112+
113+
Ok(ctx)
114+
}
115+
53116
fn run(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool, asc: bool) {
54117
black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await })).unwrap();
55118
}
@@ -59,6 +122,17 @@ fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool) {
59122
.unwrap();
60123
}
61124

125+
fn run_distinct(
126+
rt: &Runtime,
127+
ctx: SessionContext,
128+
limit: usize,
129+
use_topk: bool,
130+
asc: bool,
131+
) {
132+
black_box(rt.block_on(async { aggregate_distinct(ctx, limit, use_topk, asc).await }))
133+
.unwrap();
134+
}
135+
62136
async fn aggregate(
63137
ctx: SessionContext,
64138
limit: usize,
@@ -133,6 +207,84 @@ async fn aggregate_string(
133207
Ok(())
134208
}
135209

210+
async fn aggregate_distinct(
211+
ctx: SessionContext,
212+
limit: usize,
213+
use_topk: bool,
214+
asc: bool,
215+
) -> Result<()> {
216+
let order_direction = if asc { "asc" } else { "desc" };
217+
let sql = format!(
218+
"select id from traces group by id order by id {order_direction} limit {limit};"
219+
);
220+
let df = ctx.sql(sql.as_str()).await?;
221+
let plan = df.create_physical_plan().await?;
222+
let actual_phys_plan = displayable(plan.as_ref()).indent(true).to_string();
223+
assert_eq!(
224+
actual_phys_plan.contains(&format!("lim=[{limit}]")),
225+
use_topk
226+
);
227+
let batches = collect(plan, ctx.task_ctx()).await?;
228+
assert_eq!(batches.len(), 1);
229+
let batch = batches.first().unwrap();
230+
assert_eq!(batch.num_rows(), LIMIT);
231+
232+
let actual = format!("{}", pretty_format_batches(&batches)?).to_lowercase();
233+
234+
let expected_asc = r#"
235+
+----+
236+
| id |
237+
+----+
238+
| 0 |
239+
| 1 |
240+
| 2 |
241+
| 3 |
242+
| 4 |
243+
| 5 |
244+
| 6 |
245+
| 7 |
246+
| 8 |
247+
| 9 |
248+
+----+
249+
"#
250+
.trim();
251+
252+
let expected_desc = r#"
253+
+---------+
254+
| id |
255+
+---------+
256+
| 9999999 |
257+
| 9999998 |
258+
| 9999997 |
259+
| 9999996 |
260+
| 9999995 |
261+
| 9999994 |
262+
| 9999993 |
263+
| 9999992 |
264+
| 9999991 |
265+
| 9999990 |
266+
+---------+
267+
"#
268+
.trim();
269+
270+
// Verify exact results match expected values
271+
if asc {
272+
assert_eq!(
273+
actual.trim(),
274+
expected_asc,
275+
"Ascending DISTINCT results do not match expected values"
276+
);
277+
} else {
278+
assert_eq!(
279+
actual.trim(),
280+
expected_desc,
281+
"Descending DISTINCT results do not match expected values"
282+
);
283+
}
284+
285+
Ok(())
286+
}
287+
136288
fn criterion_benchmark(c: &mut Criterion) {
137289
let rt = Runtime::new().unwrap();
138290
let limit = LIMIT;
@@ -253,6 +405,37 @@ fn criterion_benchmark(c: &mut Criterion) {
253405
.as_str(),
254406
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
255407
);
408+
409+
// DISTINCT benchmarks
410+
let ctx = rt.block_on(async {
411+
create_context_distinct(partitions, samples, false)
412+
.await
413+
.unwrap()
414+
});
415+
c.bench_function(
416+
format!("distinct {} rows desc [no TopK]", partitions * samples).as_str(),
417+
|b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, false)),
418+
);
419+
420+
c.bench_function(
421+
format!("distinct {} rows asc [no TopK]", partitions * samples).as_str(),
422+
|b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, true)),
423+
);
424+
425+
let ctx_topk = rt.block_on(async {
426+
create_context_distinct(partitions, samples, true)
427+
.await
428+
.unwrap()
429+
});
430+
c.bench_function(
431+
format!("distinct {} rows desc [TopK]", partitions * samples).as_str(),
432+
|b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, false)),
433+
);
434+
435+
c.bench_function(
436+
format!("distinct {} rows asc [TopK]", partitions * samples).as_str(),
437+
|b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, true)),
438+
);
256439
}
257440

258441
criterion_group!(benches, criterion_benchmark);

datafusion/core/src/physical_planner.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::physical_expr::{create_physical_expr, create_physical_exprs};
3939
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
4040
use crate::physical_plan::analyze::AnalyzeExec;
4141
use crate::physical_plan::explain::ExplainExec;
42-
use crate::physical_plan::filter::FilterExec;
42+
use crate::physical_plan::filter::FilterExecBuilder;
4343
use crate::physical_plan::joins::utils as join_utils;
4444
use crate::physical_plan::joins::{
4545
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
@@ -938,8 +938,12 @@ impl DefaultPhysicalPlanner {
938938
input_schema.as_arrow(),
939939
)? {
940940
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
941-
FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
942-
.with_batch_size(session_state.config().batch_size())?
941+
FilterExecBuilder::new(
942+
Arc::clone(&runtime_expr[0]),
943+
physical_input,
944+
)
945+
.with_batch_size(session_state.config().batch_size())
946+
.build()?
943947
}
944948
PlanAsyncExpr::Async(
945949
async_map,
@@ -949,16 +953,17 @@ impl DefaultPhysicalPlanner {
949953
async_map.async_exprs,
950954
physical_input,
951955
)?;
952-
FilterExec::try_new(
956+
FilterExecBuilder::new(
953957
Arc::clone(&runtime_expr[0]),
954958
Arc::new(async_exec),
955-
)?
959+
)
956960
// project the output columns excluding the async functions
957961
// The async functions are always appended to the end of the schema.
958-
.with_projection(Some(
962+
.apply_projection(Some(
959963
(0..input.schema().fields().len()).collect(),
960964
))?
961-
.with_batch_size(session_state.config().batch_size())?
965+
.with_batch_size(session_state.config().batch_size())
966+
.build()?
962967
}
963968
_ => {
964969
return internal_err!(

datafusion/core/tests/execution/coop.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion::physical_expr::aggregate::AggregateExprBuilder;
2424
use datafusion::physical_plan;
2525
use datafusion::physical_plan::ExecutionPlan;
2626
use datafusion::physical_plan::aggregates::{
27-
AggregateExec, AggregateMode, PhysicalGroupBy,
27+
AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy,
2828
};
2929
use datafusion::physical_plan::execution_plan::Boundedness;
3030
use datafusion::prelude::SessionContext;
@@ -233,6 +233,7 @@ async fn agg_grouped_topk_yields(
233233
#[values(false, true)] pretend_infinite: bool,
234234
) -> Result<(), Box<dyn Error>> {
235235
// build session
236+
236237
let session_ctx = SessionContext::new();
237238

238239
// set up a top-k aggregation
@@ -260,7 +261,7 @@ async fn agg_grouped_topk_yields(
260261
inf.clone(),
261262
inf.schema(),
262263
)?
263-
.with_limit(Some(100)),
264+
.with_limit_options(Some(LimitOptions::new(100))),
264265
);
265266

266267
query_yields(aggr, session_ctx.task_ctx()).await

datafusion/core/tests/parquet/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -576,9 +576,9 @@ fn make_uint_batches(start: u8, end: u8) -> RecordBatch {
576576
Field::new("u64", DataType::UInt64, true),
577577
]));
578578
let v8: Vec<u8> = (start..end).collect();
579-
let v16: Vec<u16> = (start as _..end as _).collect();
580-
let v32: Vec<u32> = (start as _..end as _).collect();
581-
let v64: Vec<u64> = (start as _..end as _).collect();
579+
let v16: Vec<u16> = (start as u16..end as u16).collect();
580+
let v32: Vec<u32> = (start as u32..end as u32).collect();
581+
let v64: Vec<u64> = (start as u64..end as u64).collect();
582582
RecordBatch::try_new(
583583
schema,
584584
vec![

datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
3737
use datafusion_physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
3838
use datafusion_physical_plan::ExecutionPlan;
3939
use datafusion_physical_plan::aggregates::{
40-
AggregateExec, AggregateMode, PhysicalGroupBy,
40+
AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy,
4141
};
4242
use datafusion_physical_plan::displayable;
4343
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -260,7 +260,7 @@ fn aggregations_with_limit_combined() -> datafusion_common::Result<()> {
260260
schema,
261261
)
262262
.unwrap()
263-
.with_limit(Some(5)),
263+
.with_limit_options(Some(LimitOptions::new(5))),
264264
);
265265
let plan: Arc<dyn ExecutionPlan> = final_agg;
266266
// should combine the Partial/Final AggregateExecs to a Single AggregateExec

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use datafusion_physical_plan::{
5858
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
5959
coalesce_partitions::CoalescePartitionsExec,
6060
collect,
61-
filter::FilterExec,
61+
filter::{FilterExec, FilterExecBuilder},
6262
repartition::RepartitionExec,
6363
sorts::sort::SortExec,
6464
};
@@ -480,9 +480,10 @@ fn test_filter_with_projection() {
480480
let projection = vec![1, 0];
481481
let predicate = col_lit_predicate("a", "foo", &schema());
482482
let plan = Arc::new(
483-
FilterExec::try_new(predicate, Arc::clone(&scan))
483+
FilterExecBuilder::new(predicate, Arc::clone(&scan))
484+
.apply_projection(Some(projection))
484485
.unwrap()
485-
.with_projection(Some(projection))
486+
.build()
486487
.unwrap(),
487488
);
488489

@@ -505,9 +506,10 @@ fn test_filter_with_projection() {
505506
let projection = vec![1];
506507
let predicate = col_lit_predicate("a", "foo", &schema());
507508
let plan = Arc::new(
508-
FilterExec::try_new(predicate, scan)
509+
FilterExecBuilder::new(predicate, scan)
510+
.apply_projection(Some(projection))
509511
.unwrap()
510-
.with_projection(Some(projection))
512+
.build()
511513
.unwrap(),
512514
);
513515
insta::assert_snapshot!(
@@ -564,9 +566,9 @@ fn test_pushdown_through_aggregates_on_grouping_columns() {
564566
let scan = TestScanBuilder::new(schema()).with_support(true).build();
565567

566568
let filter = Arc::new(
567-
FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), scan)
568-
.unwrap()
569+
FilterExecBuilder::new(col_lit_predicate("a", "foo", &schema()), scan)
569570
.with_batch_size(10)
571+
.build()
570572
.unwrap(),
571573
);
572574

@@ -596,9 +598,9 @@ fn test_pushdown_through_aggregates_on_grouping_columns() {
596598

597599
let predicate = col_lit_predicate("b", "bar", &schema());
598600
let plan = Arc::new(
599-
FilterExec::try_new(predicate, aggregate)
600-
.unwrap()
601+
FilterExecBuilder::new(predicate, aggregate)
601602
.with_batch_size(100)
603+
.build()
602604
.unwrap(),
603605
);
604606

0 commit comments

Comments
 (0)