Skip to content

Commit 4f53d8d

Browse files
committed
lazy stats evaluation
Signed-off-by: Mikhail Kot <to@myrrc.dev>
1 parent 276db60 commit 4f53d8d

8 files changed

Lines changed: 195 additions & 185 deletions

File tree

vortex-duckdb/src/datasource.rs

Lines changed: 154 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
//! to get a blanket [`TableFunction`] implementation covering init, scan, progress, filter
88
//! pushdown, cardinality, partitioning, and virtual columns.
99
10+
use std::any::Any;
11+
use std::cmp::max;
12+
use std::collections::HashSet;
1013
use std::ffi::CString;
1114
use std::fmt::Debug;
1215
use std::sync::Arc;
@@ -26,29 +29,37 @@ use vortex::array::arrays::Struct;
2629
use vortex::array::arrays::StructArray;
2730
use vortex::array::arrays::scalar_fn::ScalarFnArrayExt;
2831
use vortex::array::optimizer::ArrayOptimizer;
32+
use vortex::array::stats::StatsSet;
2933
use vortex::dtype::DType;
3034
use vortex::dtype::FieldNames;
3135
use vortex::error::VortexExpect;
3236
use vortex::error::VortexResult;
37+
use vortex::error::vortex_bail;
3338
use vortex::error::vortex_err;
3439
use vortex::expr::Expression;
3540
use vortex::expr::and_collect;
3641
use vortex::expr::col;
3742
use vortex::expr::root;
3843
use vortex::expr::select;
3944
use vortex::expr::stats::Precision;
40-
use vortex::file::FileStatistics;
45+
use vortex::expr::stats::Stat;
46+
use vortex::file::v2::FileStatsLayoutReader;
4147
use vortex::io::kanal_ext::KanalExt;
4248
use vortex::io::runtime::BlockingRuntime;
4349
use vortex::io::runtime::current::ThreadSafeIterator;
50+
use vortex::layout::scan::multi::MultiLayoutChild;
51+
use vortex::layout::scan::multi::MultiLayoutDataSource;
4452
use vortex::metrics::tracing::get_global_labels;
53+
use vortex::scalar::Scalar;
54+
use vortex::scalar::ScalarValue;
4555
use vortex::scalar_fn::fns::pack::Pack;
56+
use vortex::scan::DataSource;
4657
use vortex::scan::DataSourceRef;
4758
use vortex::scan::ScanRequest;
48-
use vortex_utils::aliases::hash_set::HashSet;
4959

5060
use crate::RUNTIME;
5161
use crate::SESSION;
62+
use crate::convert::ToDuckDBScalar;
5263
use crate::convert::try_from_bound_expression;
5364
use crate::convert::try_from_table_filter;
5465
use crate::duckdb::BindInputRef;
@@ -82,8 +93,6 @@ use crate::exporter::ConversionCache;
8293
static EMPTY_COLUMN_IDX: u64 = 18446744073709551614;
8394
static EMPTY_COLUMN_NAME: &str = "";
8495

85-
pub type DataSourceWithStats = (DataSourceRef, Option<FileStatistics>);
86-
8796
/// A trait for table functions that resolve to a [`DataSourceRef`].
8897
///
8998
/// Implementors only need to define how parameters are declared and how binding produces a
@@ -101,23 +110,25 @@ pub(crate) trait DataSourceTableFunction: Sized + Debug {
101110
}
102111

103112
/// Bind the table function and return a [`DataSourceRef`].
104-
fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult<DataSourceWithStats>;
113+
fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult<MultiLayoutDataSource>;
105114
}
106115

107-
#[derive(Clone)]
108-
pub enum DataSourceStatistics {
109-
All(Vec<ColumnStatistics>),
110-
/// Dummy column to return a reference to
111-
None(ColumnStatistics),
112-
}
116+
/// Stats set reference for a file from FileStatistics.
117+
type FileStatRef = Arc<[StatsSet]>;
113118

114119
/// Bind data produced by a [`DataSourceTableFunction`].
115120
pub struct DataSourceBindData {
116121
data_source: DataSourceRef,
117122
filter_exprs: Vec<Expression>,
118123
column_names: Vec<String>,
119124
column_types: Vec<LogicalType>,
120-
stats: DataSourceStatistics,
125+
126+
/// Unmerged statistics for all files forming this DataSource.
127+
/// If any file is missing stats, this vector is empty.
128+
stats: Vec<FileStatRef>,
129+
/// Merged per-column statistics, aggregated for all files.
130+
//stats_cache: Arc<HashMap<usize, ColumnStatisticsAggregate>>,
131+
column_dtypes: Vec<DType>,
121132
}
122133

123134
impl Clone for DataSourceBindData {
@@ -128,6 +139,7 @@ impl Clone for DataSourceBindData {
128139
filter_exprs: vec![],
129140
column_names: self.column_names.clone(),
130141
column_types: self.column_types.clone(),
142+
column_dtypes: self.column_dtypes.clone(),
131143
stats: self.stats.clone(),
132144
}
133145
}
@@ -176,6 +188,84 @@ fn progress(bytes_read: &AtomicU64, bytes_total: &AtomicU64) -> f64 {
176188
read as f64 / total as f64 * 100.
177189
}
178190

191+
impl ColumnStatistics {
192+
fn from(stats: &ColumnStatisticsAggregate, dtype: DType) -> Self {
193+
let min = stats.min.as_ref().map(|value| {
194+
Scalar::try_new(dtype.clone(), Some(value.clone()))
195+
.vortex_expect("scalar dtype and value are incompatible")
196+
.try_to_duckdb_scalar()
197+
.vortex_expect("can't convert Scalar to duckdb Value")
198+
});
199+
200+
let max = stats.max.as_ref().map(|value| {
201+
Scalar::try_new(dtype.clone(), Some(value.clone()))
202+
.vortex_expect("scalar dtype and value are incompatible")
203+
.try_to_duckdb_scalar()
204+
.vortex_expect("can't convert Scalar to duckdb Value")
205+
});
206+
207+
let max_string_length = stats
208+
.max_string_length
209+
.map_or(0, |len| (1u64 << 63) | (len as u64));
210+
211+
Self {
212+
min,
213+
max,
214+
max_string_length,
215+
}
216+
}
217+
}
218+
219+
#[derive(Default)]
220+
pub struct ColumnStatisticsAggregate {
221+
pub min: Option<ScalarValue>,
222+
pub max: Option<ScalarValue>,
223+
pub max_string_length: Option<u32>,
224+
}
225+
226+
impl ColumnStatisticsAggregate {
227+
pub fn new(stats: &StatsSet) -> Self {
228+
let min = match stats.get(Stat::Min) {
229+
Some(Precision::Exact(min)) => Some(min),
230+
_ => None,
231+
};
232+
let max = match stats.get(Stat::Max) {
233+
Some(Precision::Exact(max)) => Some(max),
234+
_ => None,
235+
};
236+
237+
let max_string_length =
238+
if let Some(Precision::Exact(value)) = stats.get(Stat::UncompressedSizeInBytes) {
239+
// DuckDB's string length is u32
240+
#[allow(clippy::cast_possible_truncation)]
241+
Some(value.as_primitive().as_u64().vortex_expect("not a u64") as u32)
242+
} else {
243+
None
244+
};
245+
246+
Self {
247+
min,
248+
max,
249+
max_string_length,
250+
}
251+
}
252+
253+
pub fn merge(&mut self, mut other: Self) {
254+
self.min = match (self.min.take(), other.min.take()) {
255+
(Some(left), Some(right)) => Some(if left.lt(&right) { left } else { right }),
256+
_ => None,
257+
};
258+
self.max = match (self.max.take(), other.max.take()) {
259+
(Some(left), Some(right)) => Some(if left.gt(&right) { left } else { right }),
260+
_ => None,
261+
};
262+
self.max_string_length = match (self.max_string_length, other.max_string_length) {
263+
(Some(left), Some(right)) => Some(max(left, right)),
264+
_ => None,
265+
};
266+
}
267+
}
268+
179269
// ---------------------------------------------------------------------------
180270
// Blanket TableFunction implementation for any DataSourceTableFunction
181271
// ---------------------------------------------------------------------------
@@ -202,38 +292,36 @@ impl<T: DataSourceTableFunction> TableFunction for T {
202292
input: &BindInputRef,
203293
result: &mut BindResultRef,
204294
) -> VortexResult<Self::BindData> {
205-
let (data_source, file_stats) = T::bind(ctx, input)?;
206-
let (column_names, column_types) = extract_schema_from_dtype(data_source.dtype())?;
207-
208-
if file_stats.is_none() {
209-
for i in 0..column_names.len() {
210-
result.add_result_column(&column_names[i], &column_types[i]);
211-
}
212-
213-
let stats = DataSourceStatistics::None(ColumnStatistics::default());
214-
return Ok(DataSourceBindData {
215-
data_source,
216-
filter_exprs: vec![],
217-
column_names,
218-
column_types,
219-
stats,
220-
});
295+
let data_source = T::bind(ctx, input)?;
296+
let (column_names, column_types, column_dtypes) =
297+
extract_schema_from_dtype(data_source.dtype())?;
298+
for (column_name, column_type) in column_names.iter().zip(&column_types) {
299+
result.add_result_column(column_name, column_type);
221300
}
222-
let file_stats = file_stats.vortex_expect("no stats");
223301

224-
let mut stats = Vec::with_capacity(column_names.len());
225-
for i in 0..column_names.len() {
226-
result.add_result_column(&column_names[i], &column_types[i]);
227-
let (stats_set, dtype) = file_stats.get(i);
228-
stats.push(ColumnStatistics::new(stats_set, dtype.clone()));
302+
let len = column_names.len();
303+
let mut stats = Vec::with_capacity(len);
304+
for child in &data_source.children {
305+
let MultiLayoutChild::Opened(reader) = child else {
306+
vortex_bail!("Got deferred file in table function bind");
307+
};
308+
match (reader as &dyn Any).downcast_ref::<FileStatsLayoutReader>() {
309+
Some(reader_inner) => {
310+
stats.push(reader_inner.file_stats.stats_sets().clone());
311+
}
312+
None => {
313+
stats.clear();
314+
break;
315+
}
316+
}
229317
}
230-
let stats = DataSourceStatistics::All(stats);
231318

232319
Ok(DataSourceBindData {
233-
data_source,
320+
data_source: Arc::new(data_source) as DataSourceRef,
234321
filter_exprs: vec![],
235322
column_names,
236323
column_types,
324+
column_dtypes,
237325
stats,
238326
})
239327
}
@@ -445,15 +533,29 @@ impl<T: DataSourceTableFunction> TableFunction for T {
445533
Ok(false)
446534
}
447535

448-
fn statistics<'a>(
449-
_client_context: &ClientContextRef,
450-
bind_data: &'a Self::BindData,
536+
fn statistics(
537+
client_context: &ClientContextRef,
538+
bind_data: &Self::BindData,
451539
column_index: usize,
452-
) -> &'a ColumnStatistics {
453-
match &bind_data.stats {
454-
DataSourceStatistics::All(items) => &items[column_index],
455-
DataSourceStatistics::None(dummy) => dummy,
540+
) -> ColumnStatistics {
541+
let cache = client_context.object_cache();
542+
// TODO(myrrc) cursed
543+
let key = format!("{:p}-{}", std::ptr::addr_of!(bind_data), column_index);
544+
if let Some(value) = cache.get(&key) {
545+
return ColumnStatistics::from(value, bind_data.column_dtypes[column_index].clone());
456546
}
547+
548+
if bind_data.stats.is_empty() {
549+
let stats = ColumnStatisticsAggregate::default();
550+
return ColumnStatistics::from(&stats, bind_data.column_dtypes[column_index].clone());
551+
};
552+
let mut stats = ColumnStatisticsAggregate::new(&bind_data.stats[0][column_index]);
553+
for file_stats in &bind_data.stats[1..] {
554+
stats.merge(ColumnStatisticsAggregate::new(&file_stats[column_index]));
555+
}
556+
let stats = cache.put(&key, stats);
557+
let stats = unsafe { &*stats };
558+
ColumnStatistics::from(stats, bind_data.column_dtypes[column_index].clone())
457559
}
458560

459561
fn cardinality(bind_data: &Self::BindData) -> Cardinality {
@@ -497,21 +599,26 @@ impl<T: DataSourceTableFunction> TableFunction for T {
497599
// ---------------------------------------------------------------------------
498600

499601
/// Extracts DuckDB column names and logical types from a Vortex struct DType.
500-
fn extract_schema_from_dtype(dtype: &DType) -> VortexResult<(Vec<String>, Vec<LogicalType>)> {
602+
fn extract_schema_from_dtype(
603+
dtype: &DType,
604+
) -> VortexResult<(Vec<String>, Vec<LogicalType>, Vec<DType>)> {
501605
let struct_dtype = dtype
502606
.as_struct_fields_opt()
503607
.ok_or_else(|| vortex_err!("Vortex file must contain a struct array at the top level"))?;
504608

505-
let mut column_names = Vec::new();
506-
let mut column_types = Vec::new();
609+
let len = struct_dtype.names().len();
610+
let mut column_names = Vec::with_capacity(len);
611+
let mut column_types = Vec::with_capacity(len);
612+
let mut column_dtypes = Vec::with_capacity(len);
507613

508614
for (field_name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) {
509615
let logical_type = LogicalType::try_from(&field_dtype)?;
510616
column_names.push(field_name.to_string());
511617
column_types.push(logical_type);
618+
column_dtypes.push(field_dtype);
512619
}
513620

514-
Ok((column_names, column_types))
621+
Ok((column_names, column_types, column_dtypes))
515622
}
516623

517624
/// Creates a projection expression from raw projection/column ID slices and column names.

0 commit comments

Comments
 (0)