Skip to content

Commit 276db60

Browse files
committed
initial
Signed-off-by: Mikhail Kot <to@myrrc.dev>
1 parent 4a5b7d7 commit 276db60

16 files changed

Lines changed: 437 additions & 39 deletions

File tree

benchmarks/datafusion-bench/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ async fn register_v2_tables<B: Benchmark + ?Sized>(
316316
None => format!("*.{}", format.ext()),
317317
};
318318

319-
let multi_ds = MultiFileDataSource::new(SESSION.clone())
319+
let (multi_ds, _) = MultiFileDataSource::new(SESSION.clone())
320320
.with_glob(glob_pattern, Some(fs))
321321
.build()
322322
.await?;

vortex-array/src/stats/stats_set.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -459,11 +459,28 @@ impl MutTypedStatsSetRef<'_, '_> {
459459
) {
460460
(Some(m1), Some(m2)) => {
461461
// If the combine sum is exact, then we can sum them.
462-
if let Some(scalar_value) = m1.zip(m2).as_exact().and_then(|(s1, s2)| {
463-
s1.as_primitive()
464-
.checked_add(&s2.as_primitive())
465-
.and_then(|pscalar| pscalar.pvalue().map(ScalarValue::Primitive))
466-
}) {
462+
if let Some(scalar_value) =
463+
m1.zip(m2).as_exact().and_then(|(s1, s2)| match s1.dtype() {
464+
DType::Primitive(..) => s1
465+
.as_primitive()
466+
.checked_add(&s2.as_primitive())
467+
.and_then(|pscalar| pscalar.pvalue().map(ScalarValue::Primitive)),
468+
DType::Decimal(..) => s1
469+
.as_decimal()
470+
.checked_binary_numeric(
471+
&s2.as_decimal(),
472+
crate::scalar::NumericOperator::Add,
473+
)
474+
.map(|scalar| {
475+
ScalarValue::Decimal(
476+
scalar
477+
.decimal_value()
478+
.vortex_expect("no decimal value in scalar"),
479+
)
480+
}),
481+
_ => None,
482+
})
483+
{
467484
self.set(Stat::Sum, Precision::Exact(scalar_value));
468485
}
469486
}

vortex-duckdb/cpp/include/duckdb_vx/table_function.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,15 @@ typedef struct {
9797
bool has_max_cardinality;
9898
} duckdb_vx_node_statistics;
9999

100+
// Per-column statistics filled in by the table function's `statistics` callback.
//
// The C++ bridge zero-initialises this struct before invoking the callback, treats a null
// `min` / `max` as "bound unknown", and destroys any non-null value after reading it.
typedef struct {
101+
// Lower bound of the column's values, or null when unknown.
duckdb_value min;
102+
// Upper bound of the column's values, or null when unknown.
duckdb_value max;
103+
// Packed field: bit 63 is the "length is set" flag; when that flag is set, the low 32 bits
// hold DuckDB's max string length. Only consulted for string-like columns.
104+
uint64_t max_string_length;
105+
} duckdb_column_statistics;
106+
107+
// Column index type for this C interface, aliased to DuckDB's idx_t.
typedef idx_t column_t;
108+
100109
// A transparent DuckDB table function vtable, which can be used to configure a table function.
101110
// See duckdb/include/function/tfunc.hpp for details on each field.
102111
typedef struct {
@@ -137,7 +146,11 @@ typedef struct {
137146

138147
// void *in_out_function;
139148
// void *in_out_function_final;
140-
void *statistics;
149+
150+
void (*statistics)(duckdb_client_context context,
151+
const void *bind_data,
152+
size_t column_index,
153+
duckdb_column_statistics *stats_out);
141154

142155
// void *dependency;
143156
void (*cardinality)(void *bind_data, duckdb_vx_node_statistics *node_stats_out);

vortex-duckdb/cpp/table_function.cpp

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
#include "duckdb_vx/table_function.h"
45
#include "duckdb_vx/duckdb_diagnostics.h"
56

67
DUCKDB_INCLUDES_BEGIN
@@ -30,8 +31,10 @@ struct CTableFunctionInfo final : TableFunctionInfo {
3031
};
3132

3233
struct CTableBindData final : TableFunctionData {
33-
CTableBindData(unique_ptr<CTableFunctionInfo> info_p, unique_ptr<vortex::CData> ffi_data_p)
34-
: info(std::move(info_p)), ffi_data(std::move(ffi_data_p)) {
34+
CTableBindData(unique_ptr<CTableFunctionInfo> info_p,
35+
unique_ptr<vortex::CData> ffi_data_p,
36+
const vector<LogicalType> &types)
37+
: info(std::move(info_p)), ffi_data(std::move(ffi_data_p)), types(types) {
3538
}
3639

3740
unique_ptr<FunctionData> Copy() const override {
@@ -43,11 +46,13 @@ struct CTableBindData final : TableFunctionData {
4346
throw BinderException(IntoErrString(error_out));
4447
}
4548
return make_uniq<CTableBindData>(make_uniq<CTableFunctionInfo>(info->vtab),
46-
unique_ptr<CData>(reinterpret_cast<CData *>(copied_ffi_data)));
49+
unique_ptr<CData>(reinterpret_cast<CData *>(copied_ffi_data)),
50+
types);
4751
}
4852

4953
unique_ptr<CTableFunctionInfo> info;
5054
unique_ptr<CData> ffi_data;
55+
vector<LogicalType> types;
5156
};
5257

5358
struct CTableGlobalData final : GlobalTableFunctionState {
@@ -88,6 +93,79 @@ double c_table_scan_progress(ClientContext &context,
8893
return bind.info->vtab.table_scan_progress(c_ctx, c_bind_data, c_global_state);
8994
}
9095

96+
// Reinterprets an opaque C-API `duckdb_value` handle as the C++ `Value` it points to.
// The handle must be non-null; callers check for null before unwrapping.
static Value &UnwrapValue(duckdb_value value) {
    auto *unwrapped = reinterpret_cast<Value *>(value);
    return *unwrapped;
}
99+
100+
// Builds DuckDB numeric statistics for a column of `type` from the bounds reported over FFI.
//
// `stats.min` / `stats.max` may each be null, in which case that bound is left unset.
// Non-null values are destroyed here once they have been read.
unique_ptr<BaseStatistics> numeric_stats(duckdb_column_statistics &stats, LogicalType type) {
    // Numeric columns must start from NumericStats: SetMin/SetMax below write the numeric
    // payload, which a StringStats-initialised object does not set up.
    BaseStatistics out = NumericStats::CreateUnknown(type);
    if (stats.min) {
        NumericStats::SetMin(out, UnwrapValue(stats.min));
        duckdb_destroy_value(&stats.min);
    }
    if (stats.max) {
        NumericStats::SetMax(out, UnwrapValue(stats.max));
        duckdb_destroy_value(&stats.max);
    }
    return out.ToUnique();
}
112+
113+
// Builds DuckDB string statistics for a column of `type` from the values reported over FFI.
//
// Null `stats.min` / `stats.max` leave the corresponding bound unset; non-null values are
// destroyed after being read. The maximum string length is applied only when the flag in
// bit 63 of `stats.max_string_length` is set, using the low 32 bits as the length.
unique_ptr<BaseStatistics> string_stats(duckdb_column_statistics &stats, LogicalType type) {
    BaseStatistics result = StringStats::CreateUnknown(type);

    if (stats.min != nullptr) {
        const Value &lower = UnwrapValue(stats.min);
        StringStats::SetMin(result, StringValue::Get(lower));
        duckdb_destroy_value(&stats.min);
    }

    if (stats.max != nullptr) {
        const Value &upper = UnwrapValue(stats.max);
        StringStats::SetMax(result, StringValue::Get(upper));
        duckdb_destroy_value(&stats.max);
    }

    const bool length_is_set = (stats.max_string_length >> 63) != 0;
    if (length_is_set) {
        const auto max_length = static_cast<uint32_t>(stats.max_string_length);
        StringStats::SetMaxStringLength(result, max_length);
    }

    return result.ToUnique();
}
128+
129+
// DuckDB `statistics` callback: returns statistics for one column of the bound table function.
//
// Numeric and string-like columns are answered by the FFI vtable's `statistics` callback and
// converted with numeric_stats / string_stats. Every other type gets "unknown" statistics of
// the column's type. Column identifiers that are not positions in the bound schema (the empty
// column, and anything else out of range such as virtual columns) get "unknown" statistics of
// INVALID type, matching the pre-existing handling of COLUMN_IDENTIFIER_EMPTY.
unique_ptr<BaseStatistics>
c_statistics(ClientContext &context, const FunctionData *bind_data, column_t column_index) {
    if (column_index == COLUMN_IDENTIFIER_EMPTY) {
        return BaseStatistics::CreateUnknown(LogicalTypeId::INVALID).ToUnique();
    }

    const auto &bind = bind_data->Cast<CTableBindData>();

    // `types` only covers the bound result columns; guard before indexing so an identifier
    // outside the schema cannot read past the end of the vector.
    if (column_index >= bind.types.size()) {
        return BaseStatistics::CreateUnknown(LogicalTypeId::INVALID).ToUnique();
    }
    const LogicalType &type = bind.types[column_index];

    bool is_string = false;
    switch (type.id()) {
    case LogicalTypeId::BOOLEAN:
    case LogicalTypeId::TINYINT:
    case LogicalTypeId::SMALLINT:
    case LogicalTypeId::INTEGER:
    case LogicalTypeId::BIGINT:
    case LogicalTypeId::FLOAT:
    case LogicalTypeId::DOUBLE:
    case LogicalTypeId::UTINYINT:
    case LogicalTypeId::USMALLINT:
    case LogicalTypeId::UINTEGER:
    case LogicalTypeId::UBIGINT:
    case LogicalTypeId::UHUGEINT:
    case LogicalTypeId::HUGEINT:
        break;
    case LogicalTypeId::VARCHAR:
    case LogicalTypeId::BLOB:
        is_string = true;
        break;
    default:
        return BaseStatistics::CreateUnknown(type).ToUnique();
    }

    // A vtable without a statistics callback simply has nothing to report.
    if (!bind.info->vtab.statistics) {
        return BaseStatistics::CreateUnknown(type).ToUnique();
    }

    void *const ffi_bind = bind.ffi_data->DataPtr();
    duckdb_client_context c_ctx = reinterpret_cast<duckdb_client_context>(&context);

    // Zero-initialised so that fields the callback leaves untouched read as "not set".
    duckdb_column_statistics statistics = {};
    bind.info->vtab.statistics(c_ctx, ffi_bind, column_index, &statistics);

    return is_string ? string_stats(statistics, type) : numeric_stats(statistics, type);
}
168+
91169
unique_ptr<FunctionData> c_bind(ClientContext &context,
92170
TableFunctionBindInput &input,
93171
vector<LogicalType> &return_types,
@@ -111,7 +189,8 @@ unique_ptr<FunctionData> c_bind(ClientContext &context,
111189
}
112190

113191
return make_uniq<CTableBindData>(make_uniq<CTableFunctionInfo>(info.vtab),
114-
unique_ptr<CData>(reinterpret_cast<CData *>(ffi_bind_data)));
192+
unique_ptr<CData>(reinterpret_cast<CData *>(ffi_bind_data)),
193+
return_types);
115194
}
116195

117196
unique_ptr<GlobalTableFunctionState> c_init_global(ClientContext &context, TableFunctionInitInput &input) {
@@ -363,6 +442,7 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
363442
tf.get_virtual_columns = c_get_virtual_columns;
364443
tf.to_string = c_to_string;
365444
tf.table_scan_progress = c_table_scan_progress;
445+
tf.statistics = c_statistics;
366446

367447
// Set up the parameters
368448
tf.arguments.reserve(vtab->parameter_count);

vortex-duckdb/src/datasource.rs

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use vortex::expr::col;
3737
use vortex::expr::root;
3838
use vortex::expr::select;
3939
use vortex::expr::stats::Precision;
40+
use vortex::file::FileStatistics;
4041
use vortex::io::kanal_ext::KanalExt;
4142
use vortex::io::runtime::BlockingRuntime;
4243
use vortex::io::runtime::current::ThreadSafeIterator;
@@ -54,6 +55,7 @@ use crate::duckdb::BindInputRef;
5455
use crate::duckdb::BindResultRef;
5556
use crate::duckdb::Cardinality;
5657
use crate::duckdb::ClientContextRef;
58+
use crate::duckdb::ColumnStatistics;
5759
use crate::duckdb::DataChunkRef;
5860
use crate::duckdb::ExpressionRef;
5961
use crate::duckdb::LogicalType;
@@ -80,6 +82,8 @@ use crate::exporter::ConversionCache;
8082
static EMPTY_COLUMN_IDX: u64 = 18446744073709551614;
8183
static EMPTY_COLUMN_NAME: &str = "";
8284

85+
/// A bound [`DataSourceRef`] paired with file-level statistics, when the source has them.
pub type DataSourceWithStats = (DataSourceRef, Option<FileStatistics>);
86+
8387
/// A trait for table functions that resolve to a [`DataSourceRef`].
8488
///
8589
/// Implementors only need to define how parameters are declared and how binding produces a
@@ -97,7 +101,14 @@ pub(crate) trait DataSourceTableFunction: Sized + Debug {
97101
}
98102

99103
/// Bind the table function and return a [`DataSourceRef`].
100-
fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult<DataSourceRef>;
104+
fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult<DataSourceWithStats>;
105+
}
106+
107+
/// Column statistics kept in the bind data of a data-source table function.
#[derive(Clone)]
108+
pub enum DataSourceStatistics {
109+
/// One [`ColumnStatistics`] per result column, indexed by column position.
All(Vec<ColumnStatistics>),
110+
/// No file statistics were available at bind time. Holds a single default-constructed
/// placeholder so that a reference can still be returned for any column index.
111+
None(ColumnStatistics),
101112
}
102113

103114
/// Bind data produced by a [`DataSourceTableFunction`].
@@ -106,6 +117,7 @@ pub struct DataSourceBindData {
106117
filter_exprs: Vec<Expression>,
107118
column_names: Vec<String>,
108119
column_types: Vec<LogicalType>,
120+
stats: DataSourceStatistics,
109121
}
110122

111123
impl Clone for DataSourceBindData {
@@ -116,6 +128,7 @@ impl Clone for DataSourceBindData {
116128
filter_exprs: vec![],
117129
column_names: self.column_names.clone(),
118130
column_types: self.column_types.clone(),
131+
stats: self.stats.clone(),
119132
}
120133
}
121134
}
@@ -189,19 +202,39 @@ impl<T: DataSourceTableFunction> TableFunction for T {
189202
input: &BindInputRef,
190203
result: &mut BindResultRef,
191204
) -> VortexResult<Self::BindData> {
192-
let data_source = T::bind(ctx, input)?;
193-
205+
let (data_source, file_stats) = T::bind(ctx, input)?;
194206
let (column_names, column_types) = extract_schema_from_dtype(data_source.dtype())?;
195207

196-
for (column_name, column_type) in column_names.iter().zip(&column_types) {
197-
result.add_result_column(column_name, column_type);
208+
if file_stats.is_none() {
209+
for i in 0..column_names.len() {
210+
result.add_result_column(&column_names[i], &column_types[i]);
211+
}
212+
213+
let stats = DataSourceStatistics::None(ColumnStatistics::default());
214+
return Ok(DataSourceBindData {
215+
data_source,
216+
filter_exprs: vec![],
217+
column_names,
218+
column_types,
219+
stats,
220+
});
198221
}
222+
let file_stats = file_stats.vortex_expect("no stats");
223+
224+
let mut stats = Vec::with_capacity(column_names.len());
225+
for i in 0..column_names.len() {
226+
result.add_result_column(&column_names[i], &column_types[i]);
227+
let (stats_set, dtype) = file_stats.get(i);
228+
stats.push(ColumnStatistics::new(stats_set, dtype.clone()));
229+
}
230+
let stats = DataSourceStatistics::All(stats);
199231

200232
Ok(DataSourceBindData {
201233
data_source,
202234
filter_exprs: vec![],
203235
column_names,
204236
column_types,
237+
stats,
205238
})
206239
}
207240

@@ -412,6 +445,17 @@ impl<T: DataSourceTableFunction> TableFunction for T {
412445
Ok(false)
413446
}
414447

448+
fn statistics<'a>(
449+
_client_context: &ClientContextRef,
450+
bind_data: &'a Self::BindData,
451+
column_index: usize,
452+
) -> &'a ColumnStatistics {
453+
match &bind_data.stats {
454+
DataSourceStatistics::All(items) => &items[column_index],
455+
DataSourceStatistics::None(dummy) => dummy,
456+
}
457+
}
458+
415459
fn cardinality(bind_data: &Self::BindData) -> Cardinality {
416460
match bind_data.data_source.row_count() {
417461
Some(Precision::Exact(v)) => Cardinality::Maximum(v),

0 commit comments

Comments
 (0)