Skip to content

Commit 8a263b2

Browse files
authored
Expose ExecutionPlan statistics across the FFI boundary (#22157)
## Which issue does this PR close? - Closes #22152 ## Rationale for this change `ExecutionPlan::partition_statistics` and `TableProvider::statistics` are not currently transported across the DataFusion FFI boundary, so foreign plans and providers always report `Statistics::new_unknown` / `None`. This blocks optimizer rules that depend on statistics (e.g. join reordering, partition pruning) from working with out-of-process plugins, which defeats the point of exposing those hooks to plugin authors. `Statistics` contains `Precision<ScalarValue>` for column min/max/sum. `ScalarValue` is a large enum that's impractical to mirror in `#[repr(C)]`, so I reuse the existing `datafusion_proto_common::Statistics` prost encoding — the same pattern this crate already uses for filter expressions. ## What changes are included in this PR? - New `datafusion_ffi::statistics` module with `[de]serialize_statistics` helpers wrapping the`datafusion_proto_common::Statistics` round-trip. - New `partition_statistics` field on `FFI_ExecutionPlan` and corresponding `ExecutionPlan::partition_statistics` impl on `ForeignExecutionPlan` - New `statistics` field on `FFI_TableProvider` and corresponding `TableProvider::statistics` impl on `ForeignTableProvider`. Since the trait returns `Option<Statistics>`, the implementation cannot propagate decode errors, it logs a `log::warn!` and triggers a `debug_assert!`. This PR is expected to be merged after #22136 so it includes those changes. ## Are these changes tested? Yes: - Unit tests in `statistics.rs` cover three round-trip cases: `Statistics::new_unknown`, fully-exact statistics with `ScalarValue::Int32`/`Int64`/`Utf8` min/max/sum, and mixed `Precision::Exact`/`Inexact`/`Absent` values. - A new round-trip integration test in `execution_plan.rs` exercises `ForeignExecutionPlan::partition_statistics` with both `None` and `Some(idx)` partitions, against a plan with no statistics (returns `Statistics::new_unknown`) and a plan with concrete statistics. - A new round-trip integration test in `table_provider.rs` uses a thin `TableWithStats` wrapper over `MemTable` to verify both the `None` path and the concrete `Statistics` path through `ForeignTableProvider::statistics`. ## Are there any user-facing changes? This is a breaking ABI change for the `datafusion-ffi` crate: - `FFI_ExecutionPlan` gains a `partition_statistics` field. - `FFI_TableProvider` gains a `statistics` field. Plugins compiled against earlier versions of `datafusion-ffi` will need to be recompiled. There are no breaking changes to the Rust trait surface or to `Statistics` itself; downstream `ExecutionPlan` / `TableProvider` implementations require no changes.
1 parent 2c8cf23 commit 8a263b2

8 files changed

Lines changed: 500 additions & 10 deletions

File tree

datafusion/ffi/src/execution_plan.rs

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121

2222
use datafusion_common::config::ConfigOptions;
2323
use datafusion_common::tree_node::TreeNodeRecursion;
24-
use datafusion_common::{DataFusionError, Result};
24+
use datafusion_common::{DataFusionError, Result, Statistics};
2525
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
2626
use datafusion_physical_expr_common::metrics::MetricsSet;
2727
use datafusion_physical_plan::{
@@ -36,6 +36,7 @@ use crate::execution::FFI_TaskContext;
3636
use crate::physical_expr::metrics::FFI_MetricsSet;
3737
use crate::plan_properties::FFI_PlanProperties;
3838
use crate::record_batch_stream::FFI_RecordBatchStream;
39+
use crate::statistics::{deserialize_statistics, serialize_statistics};
3940
use crate::util::{FFI_Option, FFI_Result};
4041
use crate::{df_result, sresult, sresult_return};
4142

@@ -74,6 +75,15 @@ pub struct FFI_ExecutionPlan {
7475
/// underlying [`ExecutionPlan::metrics`] returned `None`.
7576
pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option<FFI_MetricsSet>,
7677

78+
/// Snapshot partition statistics. `partition == None` corresponds to
79+
/// statistics over all partitions; `Some(idx)` corresponds to a specific
80+
/// partition. The returned bytes are a prost-encoded
81+
/// `datafusion_proto_common::Statistics`.
82+
pub partition_statistics: unsafe extern "C" fn(
83+
plan: &Self,
84+
partition: FFI_Option<usize>,
85+
) -> FFI_Result<SVec<u8>>,
86+
7787
/// Used to create a clone on the provider of the execution plan. This should
7888
/// only need to be called by the receiver of the plan.
7989
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
@@ -195,6 +205,17 @@ unsafe extern "C" fn metrics_fn_wrapper(
195205
.into()
196206
}
197207

208+
unsafe extern "C" fn partition_statistics_fn_wrapper(
209+
plan: &FFI_ExecutionPlan,
210+
partition: FFI_Option<usize>,
211+
) -> FFI_Result<SVec<u8>> {
212+
let partition: Option<usize> = partition.into();
213+
plan.inner()
214+
.partition_statistics(partition)
215+
.map(|stats| SVec::from(serialize_statistics(stats.as_ref()).as_slice()))
216+
.into()
217+
}
218+
198219
unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
199220
unsafe {
200221
debug_assert!(!plan.private_data.is_null());
@@ -287,6 +308,7 @@ impl FFI_ExecutionPlan {
287308
execute: execute_fn_wrapper,
288309
repartitioned: repartitioned_fn_wrapper,
289310
metrics: metrics_fn_wrapper,
311+
partition_statistics: partition_statistics_fn_wrapper,
290312
clone: clone_fn_wrapper,
291313
release: release_fn_wrapper,
292314
private_data: Box::into_raw(private_data) as *mut c_void,
@@ -454,6 +476,13 @@ impl ExecutionPlan for ForeignExecutionPlan {
454476
unsafe { (self.plan.metrics)(&self.plan) }.into();
455477
ffi.map(MetricsSet::from)
456478
}
479+
480+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
481+
let bytes = df_result!(unsafe {
482+
(self.plan.partition_statistics)(&self.plan, partition.into())
483+
})?;
484+
Ok(Arc::new(deserialize_statistics(bytes.as_slice())?))
485+
}
457486
}
458487

459488
#[cfg(any(test, feature = "integration-tests"))]
@@ -468,6 +497,7 @@ pub mod tests {
468497
props: Arc<PlanProperties>,
469498
children: Vec<Arc<dyn ExecutionPlan>>,
470499
metrics: Option<MetricsSet>,
500+
statistics: Option<Statistics>,
471501
}
472502

473503
impl EmptyExec {
@@ -481,13 +511,19 @@ pub mod tests {
481511
)),
482512
children: Vec::default(),
483513
metrics: None,
514+
statistics: None,
484515
}
485516
}
486517

487518
pub fn with_metrics(mut self, metrics: MetricsSet) -> Self {
488519
self.metrics = Some(metrics);
489520
self
490521
}
522+
523+
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
524+
self.statistics = Some(statistics);
525+
self
526+
}
491527
}
492528

493529
impl DisplayAs for EmptyExec {
@@ -521,6 +557,7 @@ pub mod tests {
521557
props: Arc::clone(&self.props),
522558
children,
523559
metrics: self.metrics.clone(),
560+
statistics: self.statistics.clone(),
524561
}))
525562
}
526563

@@ -536,6 +573,15 @@ pub mod tests {
536573
self.metrics.clone()
537574
}
538575

576+
fn partition_statistics(
577+
&self,
578+
_partition: Option<usize>,
579+
) -> Result<Arc<Statistics>> {
580+
Ok(Arc::new(self.statistics.clone().unwrap_or_else(|| {
581+
Statistics::new_unknown(self.props.eq_properties.schema())
582+
})))
583+
}
584+
539585
fn apply_expressions(
540586
&self,
541587
f: &mut dyn FnMut(
@@ -659,6 +705,54 @@ pub mod tests {
659705
Ok(())
660706
}
661707

708+
#[test]
709+
fn test_ffi_execution_plan_partition_statistics_round_trip() -> Result<()> {
710+
use datafusion_common::stats::Precision;
711+
use datafusion_common::{ColumnStatistics, ScalarValue};
712+
713+
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
714+
arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Int32, true),
715+
]));
716+
717+
// Plans without explicit statistics return Statistics::new_unknown across
718+
// the boundary.
719+
let bare_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
720+
let mut bare_local = FFI_ExecutionPlan::new(bare_plan, None);
721+
bare_local.library_marker_id = crate::mock_foreign_marker_id;
722+
let bare_foreign: Arc<dyn ExecutionPlan> = (&bare_local).try_into()?;
723+
let bare_stats = bare_foreign.partition_statistics(None)?;
724+
assert_eq!(bare_stats.as_ref(), &Statistics::new_unknown(&schema));
725+
726+
// Plans with statistics round-trip them faithfully, including
727+
// ScalarValue-typed min/max.
728+
let original_stats = Statistics {
729+
num_rows: Precision::Exact(7),
730+
total_byte_size: Precision::Inexact(128),
731+
column_statistics: vec![ColumnStatistics {
732+
null_count: Precision::Exact(1),
733+
max_value: Precision::Exact(ScalarValue::Int32(Some(10))),
734+
min_value: Precision::Exact(ScalarValue::Int32(Some(-3))),
735+
sum_value: Precision::Absent,
736+
distinct_count: Precision::Inexact(6),
737+
byte_size: Precision::Exact(28),
738+
}],
739+
};
740+
let stats_plan = Arc::new(
741+
EmptyExec::new(Arc::clone(&schema)).with_statistics(original_stats.clone()),
742+
);
743+
let mut stats_local = FFI_ExecutionPlan::new(stats_plan, None);
744+
stats_local.library_marker_id = crate::mock_foreign_marker_id;
745+
let stats_foreign: Arc<dyn ExecutionPlan> = (&stats_local).try_into()?;
746+
747+
let observed = stats_foreign.partition_statistics(None)?;
748+
assert_eq!(observed.as_ref(), &original_stats);
749+
750+
let observed_partition = stats_foreign.partition_statistics(Some(1))?;
751+
assert_eq!(observed_partition.as_ref(), &original_stats);
752+
753+
Ok(())
754+
}
755+
662756
#[test]
663757
fn test_ffi_execution_plan_local_bypass() {
664758
let schema = Arc::new(arrow::datatypes::Schema::new(vec![

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub mod proto;
4141
pub mod record_batch_stream;
4242
pub mod schema_provider;
4343
pub mod session;
44+
pub mod statistics;
4445
pub mod table_provider;
4546
pub mod table_provider_factory;
4647
pub mod table_source;

datafusion/ffi/src/physical_expr/metrics.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct FFI_MetricsSet {
5959
pub struct FFI_Metric {
6060
pub value: FFI_MetricValue,
6161
pub labels: SVec<FFI_Label>,
62-
pub partition: FFI_Option<u64>,
62+
pub partition: FFI_Option<usize>,
6363
pub metric_type: FFI_MetricType,
6464
pub metric_category: FFI_Option<FFI_MetricCategory>,
6565
}
@@ -203,7 +203,7 @@ impl From<&Metric> for FFI_Metric {
203203
Self {
204204
value: FFI_MetricValue::from(m.value()),
205205
labels: m.labels().iter().map(FFI_Label::from).collect(),
206-
partition: m.partition().map(|p| p as u64).into(),
206+
partition: m.partition().into(),
207207
metric_type: m.metric_type().into(),
208208
metric_category: m.metric_category().map(FFI_MetricCategory::from).into(),
209209
}
@@ -213,14 +213,10 @@ impl From<&Metric> for FFI_Metric {
213213
impl From<FFI_Metric> for Metric {
214214
fn from(m: FFI_Metric) -> Self {
215215
let labels: Vec<Label> = m.labels.into_iter().map(Label::from).collect();
216-
let partition: Option<u64> = m.partition.into();
216+
let partition: Option<usize> = m.partition.into();
217217
let category: Option<FFI_MetricCategory> = m.metric_category.into();
218-
let mut metric = Metric::new_with_labels(
219-
m.value.into(),
220-
partition.map(|p| p as usize),
221-
labels,
222-
)
223-
.with_type(m.metric_type.into());
218+
let mut metric = Metric::new_with_labels(m.value.into(), partition, labels)
219+
.with_type(m.metric_type.into());
224220
if let Some(c) = category {
225221
metric = metric.with_category(c.into());
226222
}

datafusion/ffi/src/statistics.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Helpers for moving [`Statistics`] across the FFI boundary as prost-encoded
19+
//! `datafusion_proto_common::Statistics` bytes.
20+
//!
21+
//! [`Statistics`] contains [`Precision<ScalarValue>`] for column min/max/sum,
22+
//! and `ScalarValue` is a large enum that's impractical to mirror in
23+
//! `#[repr(C)]`. The proto round-trip already exists in `datafusion-proto-common`
24+
//! and is the same pattern used to ship filter expressions across the FFI
25+
//! boundary, so we reuse it here.
26+
//!
27+
//! [`Precision<ScalarValue>`]: datafusion_common::stats::Precision
28+
29+
use datafusion_common::{DataFusionError, Result, Statistics};
30+
use prost::Message;
31+
32+
/// Serialize [`Statistics`] to prost-encoded
33+
/// `datafusion_proto_common::Statistics` bytes.
34+
pub(crate) fn serialize_statistics(stats: &Statistics) -> Vec<u8> {
35+
datafusion_proto_common::Statistics::from(stats).encode_to_vec()
36+
}
37+
38+
/// Decode prost-encoded `datafusion_proto_common::Statistics` bytes back into
39+
/// [`Statistics`].
40+
pub(crate) fn deserialize_statistics(bytes: &[u8]) -> Result<Statistics> {
41+
let proto = datafusion_proto_common::Statistics::decode(bytes).map_err(|e| {
42+
DataFusionError::Plan(format!("failed to decode Statistics: {e}"))
43+
})?;
44+
Statistics::try_from(&proto)
45+
}
46+
47+
#[cfg(test)]
48+
mod tests {
49+
use std::sync::Arc;
50+
51+
use arrow::datatypes::{DataType, Field, Schema};
52+
use datafusion_common::ScalarValue;
53+
use datafusion_common::stats::Precision;
54+
use datafusion_common::{ColumnStatistics, Statistics};
55+
56+
use super::*;
57+
58+
#[test]
59+
fn round_trip_unknown_statistics() {
60+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
61+
let original = Statistics::new_unknown(&Arc::new(schema));
62+
63+
let bytes = serialize_statistics(&original);
64+
let observed = deserialize_statistics(&bytes).expect("decode");
65+
66+
assert_eq!(observed, original);
67+
}
68+
69+
#[test]
70+
fn round_trip_exact_statistics_with_scalar_values() {
71+
let original = Statistics {
72+
num_rows: Precision::Exact(100),
73+
total_byte_size: Precision::Exact(4096),
74+
column_statistics: vec![
75+
ColumnStatistics {
76+
null_count: Precision::Exact(2),
77+
max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
78+
min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
79+
sum_value: Precision::Exact(ScalarValue::Int64(Some(1234))),
80+
distinct_count: Precision::Exact(40),
81+
byte_size: Precision::Exact(800),
82+
},
83+
ColumnStatistics {
84+
null_count: Precision::Exact(0),
85+
max_value: Precision::Exact(ScalarValue::Utf8(Some(
86+
"zebra".to_string(),
87+
))),
88+
min_value: Precision::Exact(ScalarValue::Utf8(Some(
89+
"ant".to_string(),
90+
))),
91+
sum_value: Precision::Absent,
92+
distinct_count: Precision::Inexact(95),
93+
byte_size: Precision::Inexact(2048),
94+
},
95+
],
96+
};
97+
98+
let bytes = serialize_statistics(&original);
99+
let observed = deserialize_statistics(&bytes).expect("decode");
100+
101+
assert_eq!(observed, original);
102+
}
103+
104+
#[test]
105+
fn round_trip_mixed_precision() {
106+
let original = Statistics {
107+
num_rows: Precision::Inexact(42),
108+
total_byte_size: Precision::Absent,
109+
column_statistics: vec![ColumnStatistics {
110+
null_count: Precision::Absent,
111+
max_value: Precision::Inexact(ScalarValue::Float64(Some(1.5))),
112+
min_value: Precision::Absent,
113+
sum_value: Precision::Inexact(ScalarValue::Float64(Some(63.0))),
114+
distinct_count: Precision::Absent,
115+
byte_size: Precision::Absent,
116+
}],
117+
};
118+
119+
let bytes = serialize_statistics(&original);
120+
let observed = deserialize_statistics(&bytes).expect("decode");
121+
122+
assert_eq!(observed, original);
123+
}
124+
}

0 commit comments

Comments
 (0)