Skip to content

Commit 1af9bd7

Browse files
authored
Add metrics to FFI_ExecutionPlan (#22136)
## Which issue does this PR close? - Closes #22135 ## Rationale for this change `FFI_ExecutionPlan` exposes most of the `ExecutionPlan` trait but does not expose `metrics()`. As a result, `ForeignExecutionPlan::metrics()` falls through to the trait default (`None`), so anything downstream of an FFI boundary loses metrics. The most visible breakage is `EXPLAIN ANALYZE`, which renders empty metric blocks for foreign plans; anything calling `DisplayableExecutionPlan::with_metrics(...)` on a plan tree containing foreign nodes is similarly affected. This PR makes foreign plans behave the same as local plans for metric reporting. Metrics are passed as a snapshot, and all atomic-backed counters/gauges/timers are read into plain integer fields at marshal time. Correct because none of the in-tree consumers (`AnalyzeExec`, `DisplayableExecutionPlan`) poll metrics during streaming. ## What changes are included in this PR? - New module `datafusion/ffi/src/metrics.rs` with FFI-stable mirrors of `MetricsSet`, `Metric`, `MetricValue` (all 16 variants), `Label`, `MetricType`, `MetricCategory`, `PruningMetrics`, `RatioMetrics`, and `RatioMergeStrategy`, plus bidirectional `From` conversions. - `MetricValue::Custom { value: Arc<dyn CustomMetricValue> }` is marshalled as `(name, Display output, as_usize())`. On the consumer side it is reconstructed as a small `FfiCustomMetricValue` shim that preserves `Display` and `as_usize()`. `aggregate` becomes a no-op (snapshots are not mergeable) and `as_any` only downcasts to the shim — this is the documented compromise. - `FFI_ExecutionPlan` gains a new `metrics` function pointer (appended after `repartitioned`). `ForeignExecutionPlan::metrics()` is implemented to call through it. - Two trivial accessors added to `RatioMetrics`: `merge_strategy()` and `display_raw_values()` — needed to marshal these otherwise-private fields. - `chrono` added as a direct dependency of `datafusion-ffi` (used for `Timestamp` ↔ unix-nanos conversion). ## Are these changes tested? Yes. New tests, all passing: - 7 unit tests in `datafusion/ffi/src/metrics.rs` round-trip every `MetricValue` variant individually, plus a full `Metric` (value + labels + partition + type + category) and a `MetricsSet`. - `test_ffi_execution_plan_metrics_round_trip` in `datafusion/ffi/src/execution_plan.rs` exercises the full FFI path: builds an `ExecutionPlan` with a `MetricsSet`, wraps it in `FFI_ExecutionPlan`, retrieves metrics via `ForeignExecutionPlan::metrics()` through `mock_foreign_marker_id`, and asserts the aggregated value matches. - `EmptyExec` test helper extended with `with_metrics(MetricsSet)`. Existing test suites still pass: `cargo test -p datafusion-ffi --all-features` and `cargo test -p datafusion-ffi --features integration-tests`. ## Are there any user-facing changes? Yes — this PR adds public API and makes a binary-incompatible change to `FFI_ExecutionPlan`. Please add the `api change` label. - **New public types** in `datafusion_ffi::metrics`: `FFI_MetricsSet`, `FFI_Metric`, `FFI_MetricValue`, `FFI_Label`, `FFI_MetricType`, `FFI_MetricCategory`, `FFI_PruningMetrics`, `FFI_RatioMetrics`, `FFI_RatioMergeStrategy`, and `FfiCustomMetricValue`. - **ABI break for `FFI_ExecutionPlan`**: a new `metrics` function pointer field is appended. Producers and consumers must be rebuilt together, as is already enforced by the major-version check via `datafusion_ffi::version()`. - **New public accessors** on `RatioMetrics`: `merge_strategy()` and `display_raw_values()`. Non-breaking additions. - **`MetricValue::Custom` across FFI is lossy by design**: the underlying `dyn CustomMetricValue` is not preserved; only its `Display` output and `as_usize()` snapshot survive. Documented on `FfiCustomMetricValue`.
1 parent 4fac70d commit 1af9bd7

6 files changed

Lines changed: 889 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ arrow = { workspace = true, features = ["ffi"] }
4848
arrow-schema = { workspace = true }
4949
async-ffi = { version = "0.5.0" }
5050
async-trait = { workspace = true }
51+
chrono = { workspace = true }
5152
datafusion-catalog = { workspace = true }
5253
datafusion-common = { workspace = true }
5354
datafusion-datasource = { workspace = true }

datafusion/ffi/src/execution_plan.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use datafusion_common::config::ConfigOptions;
2323
use datafusion_common::tree_node::TreeNodeRecursion;
2424
use datafusion_common::{DataFusionError, Result};
2525
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
26+
use datafusion_physical_expr_common::metrics::MetricsSet;
2627
use datafusion_physical_plan::{
2728
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
2829
};
@@ -32,6 +33,7 @@ use tokio::runtime::Handle;
3233

3334
use crate::config::FFI_ConfigOptions;
3435
use crate::execution::FFI_TaskContext;
36+
use crate::physical_expr::metrics::FFI_MetricsSet;
3537
use crate::plan_properties::FFI_PlanProperties;
3638
use crate::record_batch_stream::FFI_RecordBatchStream;
3739
use crate::util::{FFI_Option, FFI_Result};
@@ -68,6 +70,10 @@ pub struct FFI_ExecutionPlan {
6870
)
6971
-> FFI_Result<FFI_Option<FFI_ExecutionPlan>>,
7072

73+
/// Snapshot the plan's execution metrics. Returns `None` when the
74+
/// underlying [`ExecutionPlan::metrics`] returned `None`.
75+
pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option<FFI_MetricsSet>,
76+
7177
/// Used to create a clone on the provider of the execution plan. This should
7278
/// only need to be called by the receiver of the plan.
7379
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
@@ -179,6 +185,16 @@ unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> SString {
179185
plan.inner().name().into()
180186
}
181187

188+
unsafe extern "C" fn metrics_fn_wrapper(
189+
plan: &FFI_ExecutionPlan,
190+
) -> FFI_Option<FFI_MetricsSet> {
191+
plan.inner()
192+
.metrics()
193+
.as_ref()
194+
.map(FFI_MetricsSet::from)
195+
.into()
196+
}
197+
182198
unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
183199
unsafe {
184200
debug_assert!(!plan.private_data.is_null());
@@ -270,6 +286,7 @@ impl FFI_ExecutionPlan {
270286
name: name_fn_wrapper,
271287
execute: execute_fn_wrapper,
272288
repartitioned: repartitioned_fn_wrapper,
289+
metrics: metrics_fn_wrapper,
273290
clone: clone_fn_wrapper,
274291
release: release_fn_wrapper,
275292
private_data: Box::into_raw(private_data) as *mut c_void,
@@ -431,6 +448,12 @@ impl ExecutionPlan for ForeignExecutionPlan {
431448
.map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
432449
.transpose()
433450
}
451+
452+
fn metrics(&self) -> Option<MetricsSet> {
453+
let ffi: Option<FFI_MetricsSet> =
454+
unsafe { (self.plan.metrics)(&self.plan) }.into();
455+
ffi.map(MetricsSet::from)
456+
}
434457
}
435458

436459
#[cfg(any(test, feature = "integration-tests"))]
@@ -444,6 +467,7 @@ pub mod tests {
444467
pub struct EmptyExec {
445468
props: Arc<PlanProperties>,
446469
children: Vec<Arc<dyn ExecutionPlan>>,
470+
metrics: Option<MetricsSet>,
447471
}
448472

449473
impl EmptyExec {
@@ -456,8 +480,14 @@ pub mod tests {
456480
Boundedness::Bounded,
457481
)),
458482
children: Vec::default(),
483+
metrics: None,
459484
}
460485
}
486+
487+
pub fn with_metrics(mut self, metrics: MetricsSet) -> Self {
488+
self.metrics = Some(metrics);
489+
self
490+
}
461491
}
462492

463493
impl DisplayAs for EmptyExec {
@@ -490,6 +520,7 @@ pub mod tests {
490520
Ok(Arc::new(EmptyExec {
491521
props: Arc::clone(&self.props),
492522
children,
523+
metrics: self.metrics.clone(),
493524
}))
494525
}
495526

@@ -501,6 +532,10 @@ pub mod tests {
501532
unimplemented!()
502533
}
503534

535+
fn metrics(&self) -> Option<MetricsSet> {
536+
self.metrics.clone()
537+
}
538+
504539
fn apply_expressions(
505540
&self,
506541
f: &mut dyn FnMut(
@@ -587,6 +622,43 @@ pub mod tests {
587622
Ok(())
588623
}
589624

625+
#[test]
626+
fn test_ffi_execution_plan_metrics_round_trip() -> Result<()> {
627+
use datafusion_physical_expr_common::metrics::{Count, Metric, MetricValue};
628+
629+
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
630+
arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
631+
]));
632+
633+
// Plans without metrics still return None across the boundary.
634+
let bare_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
635+
let mut bare_local = FFI_ExecutionPlan::new(bare_plan, None);
636+
bare_local.library_marker_id = crate::mock_foreign_marker_id;
637+
let bare_foreign: Arc<dyn ExecutionPlan> = (&bare_local).try_into()?;
638+
assert!(bare_foreign.metrics().is_none());
639+
640+
// Plans with metrics produce equivalent MetricsSets after a round trip.
641+
let mut original_metrics = MetricsSet::new();
642+
let c0 = Count::new();
643+
c0.add(11);
644+
original_metrics
645+
.push(Arc::new(Metric::new(MetricValue::OutputRows(c0), Some(0))));
646+
let c1 = Count::new();
647+
c1.add(31);
648+
original_metrics
649+
.push(Arc::new(Metric::new(MetricValue::OutputRows(c1), Some(1))));
650+
651+
let metric_plan = Arc::new(EmptyExec::new(schema).with_metrics(original_metrics));
652+
let mut metric_local = FFI_ExecutionPlan::new(metric_plan, None);
653+
metric_local.library_marker_id = crate::mock_foreign_marker_id;
654+
let metric_foreign: Arc<dyn ExecutionPlan> = (&metric_local).try_into()?;
655+
656+
let observed = metric_foreign.metrics().expect("metrics should be present");
657+
assert_eq!(observed.output_rows(), Some(42));
658+
659+
Ok(())
660+
}
661+
590662
#[test]
591663
fn test_ffi_execution_plan_local_bypass() {
592664
let schema = Arc::new(arrow::datatypes::Schema::new(vec![

0 commit comments

Comments
 (0)