Skip to content

Commit fc1bd09

Browse files
committed
fix
Signed-off-by: Mikhail Kot <to@myrrc.dev>
1 parent c59e16f commit fc1bd09

3 files changed

Lines changed: 75 additions & 18 deletions

File tree

vortex-duckdb/cpp/include/duckdb_vx/table_function.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, cons
4242

4343
// Input data passed into the init_global and init_local callbacks.
4444
typedef struct {
45-
const void *bind_data;
45+
void *bind_data;
4646

4747
/**
4848
* Projected columns that are requested to be read. These are not

vortex-duckdb/src/datasource.rs

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//! to get a blanket [`TableFunction`] implementation covering init, scan, progress, filter
88
//! pushdown, cardinality, and partitioning.
99
10+
use std::cmp::max;
1011
use std::fmt::Debug;
1112
use std::ops::Range;
1213
use std::sync::Arc;
@@ -51,6 +52,8 @@ use vortex::layout::scan::multi::MultiLayoutDataSource;
5152
use vortex::metrics::tracing::get_global_labels;
5253
use vortex::scalar::Scalar;
5354
use vortex::scalar::ScalarValue;
55+
use vortex::scalar_fn::fns::binary::Binary;
56+
use vortex::scalar_fn::fns::operators::Operator;
5457
use vortex::scalar_fn::fns::pack::Pack;
5558
use vortex::scan::DataSource;
5659
use vortex::scan::ScanRequest;
@@ -74,6 +77,7 @@ use crate::duckdb::DuckdbStringMapRef;
7477
use crate::duckdb::ExpressionRef;
7578
use crate::duckdb::LogicalType;
7679
use crate::duckdb::PartitionData;
80+
use crate::duckdb::TableFilterClass;
7781
use crate::duckdb::TableFilterSetRef;
7882
use crate::duckdb::TableFunction;
7983
use crate::duckdb::TableInitInput;
@@ -118,6 +122,9 @@ pub struct DataSourceBindData {
118122
data_source: Arc<MultiLayoutDataSource>,
119123
filter_exprs: Vec<Expression>,
120124
column_fields: Vec<DuckdbField>,
125+
// True when at least one non-optional table filter exists, or at least
126+
// one complex filter has been pushed down.
127+
has_non_optional_filter: bool,
121128
}
122129

123130
impl Clone for DataSourceBindData {
@@ -127,6 +134,7 @@ impl Clone for DataSourceBindData {
127134
// filter_exprs are consumed once in `init_global`.
128135
filter_exprs: vec![],
129136
column_fields: self.column_fields.clone(),
137+
has_non_optional_filter: self.has_non_optional_filter,
130138
}
131139
}
132140
}
@@ -252,6 +260,26 @@ impl ColumnStatisticsAggregate {
252260
}
253261
}
254262

263+
// DuckDB requires post-filter cardinality estimates; otherwise the join
264+
// planner may flip join sides, which is a huge regression for some
265+
// queries, e.g. 1000x for TPC-DS query 85.
266+
//
267+
// See duckdb/src/optimizer/join_order/relation_statistics_helper.cpp
268+
// As we don't report distinct values (same as Parquet), the only
269+
// heuristic DuckDB uses is a 0.2 selectivity factor if there is any
270+
// non-optional filter. We mimic it here.
271+
const DEFAULT_SELECTIVITY: f64 = 0.2;
272+
fn postfilter_cardinality(initial_cardinality: u64, has_non_optional_filter: bool) -> u64 {
273+
if has_non_optional_filter {
274+
let post_cardinality = initial_cardinality as f64 * DEFAULT_SELECTIVITY;
275+
// Intentional: the f64 -> u64 conversion clamps/saturates rather than
// wrapping — presumably relying on Rust's saturating float-to-int `as`
// cast semantics; confirm `as_()` matches.
276+
let post_cardinality: u64 = post_cardinality.as_();
277+
max(1, post_cardinality)
278+
} else {
279+
initial_cardinality
280+
}
281+
}
282+
255283
impl<T: DataSourceTableFunction> TableFunction for T {
256284
type BindData = DataSourceBindData;
257285
type GlobalState = DataSourceGlobal;
@@ -275,6 +303,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
275303
data_source: Arc::new(data_source),
276304
filter_exprs: vec![],
277305
column_fields,
306+
has_non_optional_filter: false,
278307
})
279308
}
280309

@@ -297,13 +326,15 @@ impl<T: DataSourceTableFunction> TableFunction for T {
297326
row_range,
298327
file_selection,
299328
file_range,
329+
has_non_optional_filter,
300330
} = extract_table_filter_expr(
301331
init_input.table_filter_set(),
302332
column_ids,
303333
&bind_data.column_fields,
304334
&bind_data.filter_exprs,
305335
bind_data.data_source.dtype(),
306336
)?;
337+
bind_data.has_non_optional_filter |= has_non_optional_filter;
307338

308339
debug!(
309340
%projection,
@@ -502,22 +533,39 @@ impl<T: DataSourceTableFunction> TableFunction for T {
502533
expr: &ExpressionRef,
503534
) -> VortexResult<bool> {
504535
debug!(%expr, "pushing down expression");
536+
505537
let Some(expr) = try_from_bound_expression(expr)? else {
506538
debug!(%expr, "failed to push down expression");
507539
return Ok(false);
508540
};
509-
debug!(%expr, "pushed down expression");
510-
bind_data.filter_exprs.push(expr);
511541

512-
// NOTE(ngates): Vortex does indeed run exact filters, so in theory we should return `true`
513-
// here to tell DuckDB we've handled the filter. However, DuckDB applies some crude
514-
// cardinality estimation heuristics (e.g. an equality filter => 20% selectivity) that
515-
// means by returning false, DuckDB runs an additional filter (a little bit of overhead)
516-
// but tends to end up with a better query plan.
517-
// If we plumb row count estimation into the layout tree, perhaps we could use zone maps
518-
// etc. to return estimates. But this function is probably called too late anyway. Maybe
519-
// we need our own cardinality heuristics.
520-
Ok(false)
542+
// DuckDB calls pushdown_complex_filter during the planning phase.
543+
// If all filters are pushed down, DuckDB enables a LEFT_DELIM_JOIN ->
544+
// COMPARISON_JOIN (HASH_JOIN) optimization:
545+
// duckdb/src/optimizer/deliminator.cpp: Deliminator::HasSelection,
546+
// Deliminator::Optimize.
547+
//
548+
// This leads to a massive regression on tpch sf=10 q17 and other
549+
// benchmarks.
550+
//
551+
// This bug has been reported to DuckDB:
552+
// https://github.com/duckdb/duckdb/issues/22669
553+
//
554+
// As a hack, report equality filters as not pushed.
555+
// We could instead report only the first filter as not pushed, but that
556+
// has a negative performance impact.
557+
let report_pushed = !expr
558+
.as_opt::<Binary>()
559+
.map(|op| *op == Operator::Eq)
560+
.unwrap_or(false);
561+
562+
// Only table filters may be optional; any complex filter is
563+
// non-optional by definition.
564+
bind_data.has_non_optional_filter = true;
565+
566+
debug!(%expr, report_pushed, "pushed down expression");
567+
bind_data.filter_exprs.push(expr);
568+
Ok(report_pushed)
521569
}
522570

523571
/// Get column-wise statistics. Available only if we're reading a single
@@ -545,8 +593,10 @@ impl<T: DataSourceTableFunction> TableFunction for T {
545593

546594
fn cardinality(bind_data: &Self::BindData) -> Cardinality {
547595
match bind_data.data_source.row_count() {
548-
Some(Precision::Exact(v)) => Cardinality::Maximum(v),
549-
Some(Precision::Inexact(v)) => Cardinality::Estimate(v),
596+
Some(Precision::Exact(v) | Precision::Inexact(v)) => {
597+
// Post-filter estimate is always a heuristic.
598+
Cardinality::Estimate(postfilter_cardinality(v, bind_data.has_non_optional_filter))
599+
}
550600
None => Cardinality::Unknown,
551601
}
552602
}
@@ -565,8 +615,8 @@ impl<T: DataSourceTableFunction> TableFunction for T {
565615
fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef) {
566616
map.push("Function", "Vortex Scan");
567617
if !bind_data.filter_exprs.is_empty() {
568-
let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{}", f));
569-
map.push("Filters", &filters.join(" /\\\n"));
618+
let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{f}"));
619+
map.push("Filters", &filters.join("\n"));
570620
}
571621
}
572622
}
@@ -687,6 +737,7 @@ struct FilterWithVirtualColumns {
687737
row_range: Option<Range<u64>>,
688738
file_selection: Selection,
689739
file_range: Option<Range<u64>>,
740+
has_non_optional_filter: bool,
690741
}
691742

692743
/// Creates a table filter expression, row selection, and row range from the table filter set,
@@ -698,6 +749,8 @@ fn extract_table_filter_expr(
698749
additional_filters: &[Expression],
699750
dtype: &DType,
700751
) -> VortexResult<FilterWithVirtualColumns> {
752+
let mut has_non_optional_filter = false;
753+
701754
let mut table_filter_exprs: HashSet<Expression> = if let Some(filter) = table_filter_set {
702755
filter
703756
.into_iter()
@@ -706,6 +759,8 @@ fn extract_table_filter_expr(
706759
!is_virtual_column(column_ids[idx_u])
707760
})
708761
.map(|(idx, ex)| {
762+
has_non_optional_filter |= !matches!(ex.as_class(), TableFilterClass::Optional(_));
763+
709764
let idx_u: usize = idx.as_();
710765
let col_idx: usize = column_ids[idx_u].as_();
711766
let name = &column_fields.get(col_idx).vortex_expect("exists").name;
@@ -741,6 +796,7 @@ fn extract_table_filter_expr(
741796
row_range,
742797
file_selection,
743798
file_range,
799+
has_non_optional_filter,
744800
};
745801
Ok(out)
746802
}

vortex-duckdb/src/duckdb/table_function/init.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,10 @@ impl<'a, T: TableFunction> TableInitInput<'a, T> {
7474
}
7575
}
7676

77+
#[allow(clippy::mut_from_ref)]
7778
/// Returns the bind data for the table function.
78-
pub fn bind_data(&self) -> &T::BindData {
79-
unsafe { &*self.input.bind_data.cast::<T::BindData>() }
79+
pub fn bind_data(&self) -> &mut T::BindData {
80+
unsafe { &mut *self.input.bind_data.cast::<T::BindData>() }
8081
}
8182

8283
pub fn column_ids(&self) -> &[u64] {

0 commit comments

Comments
 (0)