Skip to content

Commit 7984974

Browse files
committed
Follow ups
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent a4049b2 commit 7984974

15 files changed

Lines changed: 142 additions & 159 deletions

File tree

java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Schema arrowSchema(BufferAllocator allocator) {
8080
}
8181

8282
/**
83-
* Row count along with the precision of that estimate. Mirrors the Rust {@code Option<Precision<u64>>} returned by
83+
* Row count along with the precision of that estimate. Mirrors the Rust {@code Precision<u64>} returned by
8484
* {@code DataSource::row_count}: {@link RowCount.Unknown} when no estimate is available, {@link RowCount.Estimate}
8585
* for an inexact hint, {@link RowCount.Exact} when the count is authoritative.
8686
*/

vortex-array/src/arrays/primitive/compute/cast.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use crate::dtype::DType;
2222
use crate::dtype::NativePType;
2323
use crate::dtype::Nullability;
2424
use crate::dtype::PType;
25-
use crate::expr::stats::Precision;
2625
use crate::expr::stats::Stat;
2726
use crate::expr::stats::StatsProvider;
2827
use crate::match_each_native_ptype;
@@ -224,8 +223,8 @@ fn values_fit_in(
224223
/// stats cache, otherwise `None`.
225224
fn cached_values_fit_in(array: ArrayView<'_, Primitive>, target_dtype: &DType) -> Option<bool> {
226225
let stats = array.array().statistics();
227-
let min = stats.get(Stat::Min).and_then(Precision::as_exact)?;
228-
let max = stats.get(Stat::Max).and_then(Precision::as_exact)?;
226+
// `?` makes the function return `None` early when `as_exact()` yields no value for the stat.
let min = stats.get(Stat::Min).as_exact()?;
227+
let max = stats.get(Stat::Max).as_exact()?;
229228
// Report a fit only when both the min and the max cast to `target_dtype` without error.
Some(min.cast(target_dtype).is_ok() && max.cast(target_dtype).is_ok())
230229
}
231230

vortex-array/src/expr/stats/precision.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,6 @@ pub enum Precision<T> {
3030
Absent,
3131
}
3232

33-
impl<T> Precision<Option<T>> {
34-
/// Transpose the `Precision<Option<T>>` into `Option<Precision<T>>`.
35-
pub fn transpose(self) -> Option<Precision<T>> {
36-
use Precision::*;
37-
38-
match self {
39-
Exact(Some(x)) => Some(Exact(x)),
40-
Inexact(Some(x)) => Some(Inexact(x)),
41-
Absent => Some(Absent),
42-
Exact(None) | Inexact(None) => None,
43-
}
44-
}
45-
}
46-
4733
impl<T, E> Precision<Result<T, E>> {
4834
/// Transpose a `Precision<Result<T, E>>` into a `Result<Precision<T>, E>`.
4935
pub fn transpose(self) -> Result<Precision<T>, E> {
@@ -138,6 +124,22 @@ impl<T> Precision<T> {
138124
}
139125
}
140126

127+
/// Apply a fallible mapping `f` to the wrapped value.
///
/// The `Exact` / `Inexact` variant is kept when `f` returns `Some`. If `f` returns `None`
/// the result collapses to `Absent`. An `Absent` input stays `Absent` and `f` is not called.
pub fn and_then<U, F: FnOnce(T) -> Option<U>>(self, f: F) -> Precision<U> {
128+
use Precision::*;
129+
130+
match self {
131+
Exact(value) => match f(value) {
132+
Some(v) => Exact(v),
133+
None => Absent,
134+
},
135+
Inexact(value) => match f(value) {
136+
Some(v) => Inexact(v),
137+
None => Absent,
138+
},
139+
Absent => Absent,
140+
}
141+
}
142+
141143
/// Zip two `Precision` values into a tuple, keeping the inexactness if any.
142144
pub fn zip<U>(self, other: Precision<U>) -> Precision<(T, U)> {
143145
use Precision::*;
@@ -232,10 +234,15 @@ impl<T: PartialOrd + Clone> StatBound<T> for Precision<T> {
232234
}
233235

234236
fn union(&self, other: &Self) -> Option<Self> {
235-
self.clone()
237+
match self
238+
.clone()
236239
.zip(other.clone())
237240
.map(|(lhs, rhs)| partial_min(&lhs, &rhs).cloned())
238-
.transpose()
241+
{
242+
// `v?` short-circuits `union` to `None` when the mapped value is `None`.
Precision::Exact(v) => Some(Precision::Exact(v?)),
243+
Precision::Inexact(v) => Some(Precision::Inexact(v?)),
244+
// NOTE(review): the removed `Precision<Option<T>>::transpose` returned `Some(Absent)`
// for `Absent`; this arm returns `None` instead. Confirm the change is intended.
Precision::Absent => None,
245+
}
239246
}
240247

241248
fn intersection(&self, other: &Self) -> Option<IntersectionResult<Self>> {

vortex-datafusion/src/convert/stats.rs

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -27,52 +27,43 @@ pub(crate) fn stats_set_to_df(
2727
// TODO(connor): There's a lot that can go wrong here, should probably handle this
2828
// more gracefully...
2929
// Find the min statistic.
30-
let min = stats_set
31-
.get(Stat::Min)
32-
.map(|stat_val| {
33-
Scalar::try_new(
34-
Stat::Min
35-
.dtype(dtype)
36-
.vortex_expect("must have a valid dtype"),
37-
Some(stat_val),
38-
)
39-
.vortex_expect("`Stat::Min` somehow had an incompatible `DType`")
40-
.try_to_df()
41-
.ok()
42-
})
43-
.transpose();
30+
let min = stats_set.get(Stat::Min).and_then(|stat_val| {
31+
Scalar::try_new(
32+
Stat::Min
33+
.dtype(dtype)
34+
.vortex_expect("must have a valid dtype"),
35+
Some(stat_val),
36+
)
37+
.vortex_expect("`Stat::Min` somehow had an incompatible `DType`")
38+
.try_to_df()
39+
.ok()
40+
});
4441

4542
// Find the max statistic.
46-
let max = stats_set
47-
.get(Stat::Max)
48-
.map(|stat_val| {
49-
Scalar::try_new(
50-
Stat::Max
51-
.dtype(dtype)
52-
.vortex_expect("must have a valid dtype"),
53-
Some(stat_val),
54-
)
55-
.vortex_expect("`Stat::Max` somehow had an incompatible `DType`")
56-
.try_to_df()
57-
.ok()
58-
})
59-
.transpose();
43+
let max = stats_set.get(Stat::Max).and_then(|stat_val| {
44+
Scalar::try_new(
45+
Stat::Max
46+
.dtype(dtype)
47+
.vortex_expect("must have a valid dtype"),
48+
Some(stat_val),
49+
)
50+
.vortex_expect("`Stat::Max` somehow had an incompatible `DType`")
51+
.try_to_df()
52+
.ok()
53+
});
6054

6155
// Find the sum statistic
62-
let sum = stats_set
63-
.get(Stat::Sum)
64-
.map(|stat_val| {
65-
Scalar::try_new(
66-
Stat::Sum
67-
.dtype(dtype)
68-
.vortex_expect("must have a valid dtype"),
69-
Some(stat_val),
70-
)
71-
.vortex_expect("`Stat::Sum` somehow had an incompatible `DType`")
72-
.try_to_df()
73-
.ok()
74-
})
75-
.transpose();
56+
let sum = stats_set.get(Stat::Sum).and_then(|stat_val| {
57+
Scalar::try_new(
58+
Stat::Sum
59+
.dtype(dtype)
60+
.vortex_expect("must have a valid dtype"),
61+
Some(stat_val),
62+
)
63+
.vortex_expect("`Stat::Sum` somehow had an incompatible `DType`")
64+
.try_to_df()
65+
.ok()
66+
});
7667

7768
let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
7869

vortex-datafusion/src/lib.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,6 @@ where
124124
}
125125
}
126126

127-
impl<T> PrecisionExt<T> for Option<Precision<T>>
128-
where
129-
T: Debug + Clone + PartialEq + Eq + PartialOrd,
130-
{
131-
fn to_df(self) -> DFPrecision<T> {
132-
match self {
133-
Some(v) => v.to_df(),
134-
None => DFPrecision::Absent,
135-
}
136-
}
137-
}
138-
139127
#[cfg(test)]
140128
mod common_tests {
141129
use std::sync::Arc;

vortex-datafusion/src/persistent/format.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use datafusion_common::config_namespace;
2121
use datafusion_common::internal_datafusion_err;
2222
use datafusion_common::not_impl_err;
2323
use datafusion_common::parsers::CompressionTypeVariant;
24-
use datafusion_common::stats::Precision;
24+
use datafusion_common::stats::Precision as DFPrecision;
2525
use datafusion_common_runtime::SpawnedTask;
2626
use datafusion_datasource::TableSchema;
2727
use datafusion_datasource::file::FileSource;
@@ -52,7 +52,7 @@ use vortex::dtype::PType;
5252
use vortex::error::VortexExpect;
5353
use vortex::error::VortexResult;
5454
use vortex::error::vortex_err;
55-
use vortex::expr::stats;
55+
use vortex::expr::stats::Precision;
5656
use vortex::expr::stats::Stat;
5757
use vortex::file::EOF_SIZE;
5858
use vortex::file::MAX_POSTSCRIPT_SIZE;
@@ -499,12 +499,12 @@ impl FileFormat for VortexFormat {
499499
let Some(file_stats) = file_stats else {
500500
// If the file has no column stats, the best we can do is return a row count.
501501
return Ok(Statistics {
502-
num_rows: Precision::Exact(
502+
num_rows: DFPrecision::Exact(
503503
usize::try_from(row_count)
504504
.map_err(|_| vortex_err!("Row count overflow"))
505505
.vortex_expect("Row count overflow"),
506506
),
507-
total_byte_size: Precision::Absent,
507+
total_byte_size: DFPrecision::Absent,
508508
column_statistics: vec![
509509
ColumnStatistics::default();
510510
table_schema.fields().len()
@@ -544,6 +544,7 @@ impl FileFormat for VortexFormat {
544544
stats_dtype,
545545
&target_dtype,
546546
);
547+
547548
let max = scalar_stat_to_df(
548549
Stat::Max,
549550
stats_set.get(Stat::Max),
@@ -557,7 +558,7 @@ impl FileFormat for VortexFormat {
557558
null_count: null_count.to_df(),
558559
min_value: min.to_df(),
559560
max_value: max.to_df(),
560-
sum_value: Precision::Absent,
561+
sum_value: DFPrecision::Absent,
561562
distinct_count: is_constant_to_distinct_count(
562563
stats_set.get_as::<bool>(
563564
Stat::IsConstant,
@@ -570,10 +571,10 @@ impl FileFormat for VortexFormat {
570571

571572
let total_byte_size = column_statistics
572573
.iter()
573-
.fold(Precision::Exact(0), |acc, cs| acc.add(&cs.byte_size));
574+
.fold(DFPrecision::Exact(0), |acc, cs| acc.add(&cs.byte_size));
574575

575576
Ok(Statistics {
576-
num_rows: Precision::Exact(
577+
num_rows: DFPrecision::Exact(
577578
usize::try_from(row_count)
578579
.map_err(|_| vortex_err!("Row count overflow"))
579580
.vortex_expect("Row count overflow"),
@@ -639,11 +640,13 @@ impl FileFormat for VortexFormat {
639640

640641
fn scalar_stat_to_df(
641642
stat: Stat,
642-
value: stats::Precision<VortexScalarValue>,
643+
value: Precision<VortexScalarValue>,
643644
stats_dtype: &DType,
644645
target_dtype: &DType,
645-
) -> Option<stats::Precision<DFScalarValue>> {
646-
let stat_dtype = stat.dtype(stats_dtype)?;
646+
) -> Precision<DFScalarValue> {
647+
let Some(stat_dtype) = stat.dtype(stats_dtype) else {
648+
return Precision::Absent;
649+
};
647650

648651
value
649652
.map(|stat_value| {
@@ -652,7 +655,7 @@ fn scalar_stat_to_df(
652655
.try_to_df()
653656
})
654657
.transpose()
655-
.ok()
658+
.unwrap_or(Precision::Absent)
656659
}
657660

658661
#[cfg(test)]

vortex-datafusion/src/v2/source.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -512,10 +512,10 @@ impl DataSource for VortexDataSource {
512512
fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Statistics> {
513513
// FIXME(ngates): this should be adjusted based on filters. See DuckDB for heuristics,
514514
// and in the future, store the selectivity stats in the session.
515-
let num_rows = estimate_to_df_precision(self.data_source.row_count().as_ref());
515+
let num_rows = estimate_to_df_precision(&self.data_source.row_count());
516516

517517
// FIXME(ngates): byte size should be adjusted for the initial projection...
518-
let total_byte_size = estimate_to_df_precision(self.data_source.byte_size().as_ref());
518+
let total_byte_size = estimate_to_df_precision(&self.data_source.byte_size());
519519

520520
// Column statistics must match the output schema (leftover_schema), which may differ
521521
// from the initial schema after try_swapping_with_projection adds computed columns.
@@ -663,12 +663,10 @@ impl DataSource for VortexDataSource {
663663
/// [`DataFusionPrecision`].
664664
///
665665
/// [`DataFusionPrecision`]: datafusion_common::stats::Precision
666-
fn estimate_to_df_precision(est: Option<&Precision<u64>>) -> DFPrecision<usize> {
666+
fn estimate_to_df_precision(est: &Precision<u64>) -> DFPrecision<usize> {
667667
match est {
668-
Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
669-
Some(Precision::Inexact(v)) => {
670-
DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX))
671-
}
672-
Some(Precision::Absent) | None => DFPrecision::Absent,
668+
// A `u64` that does not fit in `usize` saturates to `usize::MAX`.
// NOTE(review): the saturated value is still reported as `Exact`; consider whether it
// should be downgraded to `Inexact` in that case.
Precision::Exact(v) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
669+
Precision::Inexact(v) => DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX)),
670+
Precision::Absent => DFPrecision::Absent,
673671
}
674672
}

vortex-datafusion/src/v2/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,14 @@ impl TableProvider for VortexTable {
157157
// planning over stats from the physical plan?
158158
fn statistics(&self) -> Option<Statistics> {
159159
let num_rows = match self.data_source.row_count() {
160-
Some(VortexPrecision::Exact(v)) => {
160+
VortexPrecision::Exact(v) => {
161161
usize::try_from(v).map(Precision::Exact).unwrap_or_default()
162162
}
163163
_ => Precision::Absent,
164164
};
165165

166166
let total_byte_size = match self.data_source.byte_size() {
167-
Some(VortexPrecision::Exact(v)) => {
167+
VortexPrecision::Exact(v) => {
168168
usize::try_from(v).map(Precision::Exact).unwrap_or_default()
169169
}
170170
_ => Precision::Absent,

vortex-ffi/src/data_source.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,15 +117,15 @@ pub unsafe extern "C-unwind" fn vx_data_source_get_row_count(
117117
) {
118118
let rc = unsafe { &mut *row_count };
119119
match vx_data_source::as_ref(ds).row_count() {
120-
Some(Exact(rows)) => {
120+
Exact(rows) => {
121121
rc.r#type = vx_estimate_type::VX_ESTIMATE_EXACT;
122122
rc.estimate = rows;
123123
}
124-
Some(Inexact(rows)) => {
124+
Inexact(rows) => {
125125
rc.r#type = vx_estimate_type::VX_ESTIMATE_INEXACT;
126126
rc.estimate = rows;
127127
}
128-
Some(Absent) | None => {
128+
Absent => {
129129
rc.r#type = vx_estimate_type::VX_ESTIMATE_UNKNOWN;
130130
}
131131
}

vortex-ffi/src/scan.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -194,17 +194,17 @@ fn scan_request(opts: *const vx_scan_options) -> VortexResult<ScanRequest> {
194194
})
195195
}
196196

197-
fn write_estimate<T: Into<u64>>(estimate: Option<Precision<T>>, out: &mut vx_estimate) {
197+
fn write_estimate<T: Into<u64>>(estimate: Precision<T>, out: &mut vx_estimate) {
198198
match estimate {
199-
Some(Precision::Exact(value)) => {
199+
Precision::Exact(value) => {
200200
out.r#type = vx_estimate_type::VX_ESTIMATE_EXACT;
201201
out.estimate = value.into();
202202
}
203-
Some(Precision::Inexact(value)) => {
203+
Precision::Inexact(value) => {
204204
out.r#type = vx_estimate_type::VX_ESTIMATE_INEXACT;
205205
out.estimate = value.into();
206206
}
207-
Some(Precision::Absent) | None => {
207+
Precision::Absent => {
208208
out.r#type = vx_estimate_type::VX_ESTIMATE_UNKNOWN;
209209
}
210210
}
@@ -234,14 +234,9 @@ pub unsafe extern "C-unwind" fn vx_data_source_scan(
234234
RUNTIME.block_on(async {
235235
let scan = vx_data_source::as_ref(data_source).scan(request).await?;
236236
if !estimate.is_null() {
237-
write_estimate(
238-
scan.partition_count().map(|x| match x {
239-
Precision::Exact(v) => Precision::Exact(v as u64),
240-
Precision::Inexact(v) => Precision::Inexact(v as u64),
241-
Precision::Absent => Precision::Absent,
242-
}),
243-
unsafe { &mut *estimate },
244-
);
237+
write_estimate(scan.partition_count().map(|v| v as u64), unsafe {
238+
&mut *estimate
239+
});
245240
}
246241
Ok(vx_scan::new(VxScan::Pending(scan)))
247242
})

0 commit comments

Comments
 (0)