Skip to content

Commit 97fb89e

Browse files
committed
FFI statistics
1 parent 5931124 commit 97fb89e

4 files changed

Lines changed: 364 additions & 1 deletion

File tree

datafusion/ffi/src/execution_plan.rs

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121

2222
use datafusion_common::config::ConfigOptions;
2323
use datafusion_common::tree_node::TreeNodeRecursion;
24-
use datafusion_common::{DataFusionError, Result};
24+
use datafusion_common::{DataFusionError, Result, Statistics};
2525
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
2626
use datafusion_physical_expr_common::metrics::MetricsSet;
2727
use datafusion_physical_plan::{
@@ -36,6 +36,7 @@ use crate::execution::FFI_TaskContext;
3636
use crate::physical_expr::metrics::FFI_MetricsSet;
3737
use crate::plan_properties::FFI_PlanProperties;
3838
use crate::record_batch_stream::FFI_RecordBatchStream;
39+
use crate::statistics::{deserialize_statistics, serialize_statistics};
3940
use crate::util::{FFI_Option, FFI_Result};
4041
use crate::{df_result, sresult, sresult_return};
4142

@@ -74,6 +75,15 @@ pub struct FFI_ExecutionPlan {
7475
/// underlying [`ExecutionPlan::metrics`] returned `None`.
7576
pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option<FFI_MetricsSet>,
7677

78+
/// Snapshot partition statistics. `partition == None` corresponds to
79+
/// statistics over all partitions; `Some(idx)` corresponds to a specific
80+
/// partition. The returned bytes are a prost-encoded
81+
/// `datafusion_proto_common::Statistics`.
82+
pub partition_statistics: unsafe extern "C" fn(
83+
plan: &Self,
84+
partition: FFI_Option<u64>,
85+
) -> FFI_Result<SVec<u8>>,
86+
7787
/// Used to create a clone on the provider of the execution plan. This should
7888
/// only need to be called by the receiver of the plan.
7989
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
@@ -195,6 +205,17 @@ unsafe extern "C" fn metrics_fn_wrapper(
195205
.into()
196206
}
197207

208+
unsafe extern "C" fn partition_statistics_fn_wrapper(
209+
plan: &FFI_ExecutionPlan,
210+
partition: FFI_Option<u64>,
211+
) -> FFI_Result<SVec<u8>> {
212+
let partition: Option<usize> = Option::<u64>::from(partition).map(|p| p as usize);
213+
plan.inner()
214+
.partition_statistics(partition)
215+
.map(|stats| serialize_statistics(stats.as_ref()).into_iter().collect())
216+
.into()
217+
}
218+
198219
unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
199220
unsafe {
200221
debug_assert!(!plan.private_data.is_null());
@@ -287,6 +308,7 @@ impl FFI_ExecutionPlan {
287308
execute: execute_fn_wrapper,
288309
repartitioned: repartitioned_fn_wrapper,
289310
metrics: metrics_fn_wrapper,
311+
partition_statistics: partition_statistics_fn_wrapper,
290312
clone: clone_fn_wrapper,
291313
release: release_fn_wrapper,
292314
private_data: Box::into_raw(private_data) as *mut c_void,
@@ -454,10 +476,24 @@ impl ExecutionPlan for ForeignExecutionPlan {
454476
unsafe { (self.plan.metrics)(&self.plan) }.into();
455477
ffi.map(MetricsSet::from)
456478
}
479+
480+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
481+
let bytes = df_result!(unsafe {
482+
(self.plan.partition_statistics)(
483+
&self.plan,
484+
partition.map(|p| p as u64).into(),
485+
)
486+
})?;
487+
Ok(Arc::new(deserialize_statistics(bytes.as_slice())?))
488+
}
457489
}
458490

459491
#[cfg(any(test, feature = "integration-tests"))]
460492
pub mod tests {
493+
#[cfg(test)]
494+
use datafusion_common::stats::Precision;
495+
#[cfg(test)]
496+
use datafusion_common::{ColumnStatistics, ScalarValue};
461497
use datafusion_physical_plan::Partitioning;
462498
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
463499

@@ -468,6 +504,7 @@ pub mod tests {
468504
props: Arc<PlanProperties>,
469505
children: Vec<Arc<dyn ExecutionPlan>>,
470506
metrics: Option<MetricsSet>,
507+
statistics: Option<Statistics>,
471508
}
472509

473510
impl EmptyExec {
@@ -481,13 +518,19 @@ pub mod tests {
481518
)),
482519
children: Vec::default(),
483520
metrics: None,
521+
statistics: None,
484522
}
485523
}
486524

487525
pub fn with_metrics(mut self, metrics: MetricsSet) -> Self {
488526
self.metrics = Some(metrics);
489527
self
490528
}
529+
530+
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
531+
self.statistics = Some(statistics);
532+
self
533+
}
491534
}
492535

493536
impl DisplayAs for EmptyExec {
@@ -521,6 +564,7 @@ pub mod tests {
521564
props: Arc::clone(&self.props),
522565
children,
523566
metrics: self.metrics.clone(),
567+
statistics: self.statistics.clone(),
524568
}))
525569
}
526570

@@ -536,6 +580,15 @@ pub mod tests {
536580
self.metrics.clone()
537581
}
538582

583+
fn partition_statistics(
584+
&self,
585+
_partition: Option<usize>,
586+
) -> Result<Arc<Statistics>> {
587+
Ok(Arc::new(self.statistics.clone().unwrap_or_else(|| {
588+
Statistics::new_unknown(self.props.eq_properties.schema())
589+
})))
590+
}
591+
539592
fn apply_expressions(
540593
&self,
541594
f: &mut dyn FnMut(
@@ -659,6 +712,51 @@ pub mod tests {
659712
Ok(())
660713
}
661714

715+
/// Verifies that `partition_statistics` yields the same `Statistics` when it
/// is called through an `FFI_ExecutionPlan` converted back into an
/// `Arc<dyn ExecutionPlan>`.
#[test]
fn test_ffi_execution_plan_partition_statistics_round_trip() -> Result<()> {
    let schema = Arc::new(arrow::datatypes::Schema::new(vec![
        arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Int32, true),
    ]));

    // Case 1: a plan with no explicit statistics reports
    // `Statistics::new_unknown` after crossing the boundary.
    let plain_exec = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let mut plain_ffi = FFI_ExecutionPlan::new(plain_exec, None);
    // NOTE(review): presumably the mock marker makes the conversion below treat
    // the plan as foreign rather than local — confirm against `try_into`.
    plain_ffi.library_marker_id = crate::mock_foreign_marker_id;
    let plain_foreign: Arc<dyn ExecutionPlan> = (&plain_ffi).try_into()?;
    let plain_observed = plain_foreign.partition_statistics(None)?;
    assert_eq!(*plain_observed, Statistics::new_unknown(&schema));

    // Case 2: explicit statistics, including `ScalarValue`-typed min/max,
    // come back unchanged.
    let column = ColumnStatistics {
        null_count: Precision::Exact(1),
        max_value: Precision::Exact(ScalarValue::Int32(Some(10))),
        min_value: Precision::Exact(ScalarValue::Int32(Some(-3))),
        sum_value: Precision::Absent,
        distinct_count: Precision::Inexact(6),
        byte_size: Precision::Exact(28),
    };
    let expected = Statistics {
        num_rows: Precision::Exact(7),
        total_byte_size: Precision::Inexact(128),
        column_statistics: vec![column],
    };
    let populated_exec =
        Arc::new(EmptyExec::new(Arc::clone(&schema)).with_statistics(expected.clone()));
    let mut populated_ffi = FFI_ExecutionPlan::new(populated_exec, None);
    populated_ffi.library_marker_id = crate::mock_foreign_marker_id;
    let populated_foreign: Arc<dyn ExecutionPlan> = (&populated_ffi).try_into()?;

    // Statistics over all partitions.
    let all_partitions = populated_foreign.partition_statistics(None)?;
    assert_eq!(*all_partitions, expected);

    // Statistics for one specific partition index.
    let single_partition = populated_foreign.partition_statistics(Some(1))?;
    assert_eq!(*single_partition, expected);

    Ok(())
}
759+
662760
#[test]
663761
fn test_ffi_execution_plan_local_bypass() {
664762
let schema = Arc::new(arrow::datatypes::Schema::new(vec![

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub mod proto;
4141
pub mod record_batch_stream;
4242
pub mod schema_provider;
4343
pub mod session;
44+
pub mod statistics;
4445
pub mod table_provider;
4546
pub mod table_provider_factory;
4647
pub mod table_source;

datafusion/ffi/src/statistics.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Helpers for moving [`Statistics`] across the FFI boundary as prost-encoded
19+
//! `datafusion_proto_common::Statistics` bytes.
20+
//!
21+
//! [`Statistics`] contains [`Precision<ScalarValue>`] for column min/max/sum,
22+
//! and `ScalarValue` is a large enum that's impractical to mirror in
23+
//! `#[repr(C)]`. The proto round-trip already exists in `datafusion-proto-common`
24+
//! and is the same pattern used to ship filter expressions across the FFI
25+
//! boundary, so we reuse it here.
26+
//!
27+
//! [`Precision<ScalarValue>`]: datafusion_common::stats::Precision
28+
29+
use datafusion_common::{DataFusionError, Result, Statistics};
30+
use prost::Message;
31+
32+
/// Serialize [`Statistics`] to prost-encoded
33+
/// `datafusion_proto_common::Statistics` bytes.
34+
pub(crate) fn serialize_statistics(stats: &Statistics) -> Vec<u8> {
35+
datafusion_proto_common::Statistics::from(stats).encode_to_vec()
36+
}
37+
38+
/// Decode prost-encoded `datafusion_proto_common::Statistics` bytes back into
39+
/// [`Statistics`].
40+
pub(crate) fn deserialize_statistics(bytes: &[u8]) -> Result<Statistics> {
41+
let proto = datafusion_proto_common::Statistics::decode(bytes).map_err(|e| {
42+
DataFusionError::Plan(format!("failed to decode Statistics: {e}"))
43+
})?;
44+
Statistics::try_from(&proto)
45+
}
46+
47+
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::ScalarValue;
    use datafusion_common::stats::Precision;
    use datafusion_common::{ColumnStatistics, Statistics};

    use super::*;

    /// Serializes `original` and immediately decodes the bytes again,
    /// panicking with "decode" if decoding fails.
    fn round_trip(original: &Statistics) -> Statistics {
        let encoded = serialize_statistics(original);
        deserialize_statistics(&encoded).expect("decode")
    }

    /// `Statistics::new_unknown` for a one-column schema survives the round trip.
    #[test]
    fn round_trip_unknown_statistics() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
        let original = Statistics::new_unknown(&schema);

        assert_eq!(round_trip(&original), original);
    }

    /// Exact values, including `Int32`, `Int64` and `Utf8` scalars, survive
    /// the round trip.
    #[test]
    fn round_trip_exact_statistics_with_scalar_values() {
        let int_column = ColumnStatistics {
            null_count: Precision::Exact(2),
            max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
            min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
            sum_value: Precision::Exact(ScalarValue::Int64(Some(1234))),
            distinct_count: Precision::Exact(40),
            byte_size: Precision::Exact(800),
        };
        let string_column = ColumnStatistics {
            null_count: Precision::Exact(0),
            max_value: Precision::Exact(ScalarValue::Utf8(Some("zebra".to_string()))),
            min_value: Precision::Exact(ScalarValue::Utf8(Some("ant".to_string()))),
            sum_value: Precision::Absent,
            distinct_count: Precision::Inexact(95),
            byte_size: Precision::Inexact(2048),
        };
        let original = Statistics {
            num_rows: Precision::Exact(100),
            total_byte_size: Precision::Exact(4096),
            column_statistics: vec![int_column, string_column],
        };

        assert_eq!(round_trip(&original), original);
    }

    /// A mix of `Exact`-free precisions (`Inexact` and `Absent`) survives the
    /// round trip.
    #[test]
    fn round_trip_mixed_precision() {
        let float_column = ColumnStatistics {
            null_count: Precision::Absent,
            max_value: Precision::Inexact(ScalarValue::Float64(Some(1.5))),
            min_value: Precision::Absent,
            sum_value: Precision::Inexact(ScalarValue::Float64(Some(63.0))),
            distinct_count: Precision::Absent,
            byte_size: Precision::Absent,
        };
        let original = Statistics {
            num_rows: Precision::Inexact(42),
            total_byte_size: Precision::Absent,
            column_statistics: vec![float_column],
        };

        assert_eq!(round_trip(&original), original);
    }
}

0 commit comments

Comments
 (0)