diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index a5855af17a536..a1a45986a7697 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -86,3 +86,7 @@ harness = false [[bench]] name = "parquet_struct_filter_pushdown" harness = false + +[[bench]] +name = "parquet_metadata_statistics" +harness = false diff --git a/datafusion/datasource-parquet/benches/parquet_metadata_statistics.rs b/datafusion/datasource-parquet/benches/parquet_metadata_statistics.rs new file mode 100644 index 0000000000000..46ebd100fde88 --- /dev/null +++ b/datafusion/datasource-parquet/benches/parquet_metadata_statistics.rs @@ -0,0 +1,303 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for deriving DataFusion table statistics from Parquet metadata. +//! +//! This mirrors the structure of Arrow's `arrow_statistics` benchmark: build +//! Parquet metadata once, then repeatedly measure statistics extraction. The +//! benchmark targets the cold planning/statistics path used by listing tables. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_datasource_parquet::metadata::DFParquetMetadata; +use parquet::arrow::ArrowSchemaConverter; +use parquet::data_type::ByteArray; +use parquet::file::metadata::{ + ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData, +}; +use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics}; + +const ROWS_PER_GROUP: usize = 8; + +#[derive(Debug, Copy, Clone)] +struct BenchmarkSpec { + columns: usize, + row_groups: usize, + metadata: MetadataState, +} + +#[derive(Debug, Copy, Clone)] +enum MetadataState { + Full, + Mixed, + None, +} + +impl std::fmt::Display for MetadataState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Full => write!(f, "full"), + Self::Mixed => write!(f, "mixed"), + Self::None => write!(f, "none"), + } + } +} + +struct BenchmarkCase { + schema: SchemaRef, + metadata: ParquetMetaData, +} + +fn parquet_metadata_statistics(c: &mut Criterion) { + let metadata_states = [ + MetadataState::Full, + MetadataState::Mixed, + MetadataState::None, + ]; + let column_counts = [8, 64, 256]; + let row_group_counts = [1, 32, 128]; + + let mut group = c.benchmark_group("parquet_metadata_statistics"); + + for metadata in metadata_states { + for columns in column_counts { + for row_groups in row_group_counts { + let spec = BenchmarkSpec { + columns, + row_groups, + metadata, + }; + group.bench_function( + BenchmarkId::from_parameter(format!( + "metadata_{}_col_{}_rg_{}", + spec.metadata, spec.columns, spec.row_groups, + )), + |b| { + b.iter_batched( + || BenchmarkCase::new(spec), + |case| { + let statistics = + DFParquetMetadata::statistics_from_parquet_metadata( + black_box(&case.metadata), + black_box(&case.schema), + ) + .expect("statistics extraction failed"); + black_box(statistics); + }, + BatchSize::PerIteration, + ); + }, + ); + } + } + } + + group.finish(); +} + +impl BenchmarkCase { + fn new(spec: BenchmarkSpec) -> Self { + let schema = make_schema(spec.columns); + let metadata = match spec.metadata { + MetadataState::Full => { + make_synthetic_metadata(&schema, spec, full_statistics) + } + MetadataState::Mixed => { + make_synthetic_metadata(&schema, spec, mixed_statistics) + } + MetadataState::None => make_synthetic_metadata(&schema, spec, |_, _, _| None), + }; + + Self { schema, metadata } + } +} + +fn make_synthetic_metadata( + schema: &SchemaRef, + spec: BenchmarkSpec, + statistics: fn(&DataType, usize, usize) -> Option, +) -> ParquetMetaData { + let schema_descr = Arc::new( + ArrowSchemaConverter::new() + .convert(schema.as_ref()) + .expect("failed to convert arrow schema"), + ); + let row_groups = (0..spec.row_groups) + .map(|row_group| { + let columns = schema + .fields() + .iter() + .enumerate() + .map(|(column_idx, field)| { + let mut builder = + ColumnChunkMetaData::builder(schema_descr.column(column_idx)); + if let Some(statistics) = + statistics(field.data_type(), column_idx, row_group) + { + builder = builder.set_statistics(statistics); + } + builder + .set_num_values(ROWS_PER_GROUP as i64) + .build() + .expect("failed to build column metadata") + }) + .collect::>(); + + RowGroupMetaData::builder(Arc::clone(&schema_descr)) + .set_num_rows(ROWS_PER_GROUP as i64) + .set_total_byte_size((spec.columns * ROWS_PER_GROUP * 8) as i64) + .set_column_metadata(columns) + .build() + .expect("failed to build row group metadata") + }) + .collect::>(); + + let file_metadata = FileMetaData::new( + 1, + (spec.row_groups * ROWS_PER_GROUP) as i64, + Some("datafusion parquet metadata benchmark".to_string()), + None, + schema_descr, + None, + ); + + ParquetMetaData::new(file_metadata, row_groups) +} + +fn full_statistics( + data_type: &DataType, + column_idx: usize, + row_group: usize, +) -> Option { + Some(statistics( + data_type, + column_idx, + row_group, + true, + true, + Some(null_count_for_rows()), + )) +} + +fn mixed_statistics( + data_type: &DataType, + column_idx: usize, + row_group: usize, +) -> Option { + if column_idx.is_multiple_of(16) || row_group.is_multiple_of(5) { + return None; + } + + let min_exact = !row_group.is_multiple_of(3); + let max_exact = !row_group.is_multiple_of(4); + let null_count = (!row_group.is_multiple_of(7)).then(null_count_for_rows); + + Some(statistics( + data_type, column_idx, row_group, min_exact, max_exact, null_count, + )) +} + +fn statistics( + data_type: &DataType, + column_idx: usize, + row_group: usize, + min_exact: bool, + max_exact: bool, + null_count: Option, +) -> ParquetStatistics { + let min_row = first_non_null_row(); + let max_row = last_non_null_row(); + + match data_type { + DataType::Int64 => { + let min = min_row.map(|row| value(column_idx, row_group, row)); + let max = max_row.map(|row| value(column_idx, row_group, row)); + ParquetStatistics::Int64( + ValueStatistics::new(min, max, None, null_count, false) + .with_min_is_exact(min_exact) + .with_max_is_exact(max_exact), + ) + } + DataType::Float64 => { + let min = min_row.map(|row| value(column_idx, row_group, row) as f64 * 1.5); + let max = max_row.map(|row| value(column_idx, row_group, row) as f64 * 1.5); + ParquetStatistics::Double( + ValueStatistics::new(min, max, None, null_count, false) + .with_min_is_exact(min_exact) + .with_max_is_exact(max_exact), + ) + } + DataType::Utf8 => { + let min = min_row.map(|row| { + ByteArray::from(string_value(column_idx, row_group, row).into_bytes()) + }); + let max = max_row.map(|row| { + ByteArray::from(string_value(column_idx, row_group, row).into_bytes()) + }); + ParquetStatistics::ByteArray( + ValueStatistics::new(min, max, None, null_count, false) + .with_min_is_exact(min_exact) + .with_max_is_exact(max_exact), + ) + } + other => unreachable!("unsupported benchmark data type: {other:?}"), + } +} + +fn make_schema(columns: usize) -> SchemaRef { + let fields = (0..columns) + .map(|idx| { + let data_type = match idx % 4 { + 0 => DataType::Int64, + 1 => DataType::Float64, + 2 => DataType::Utf8, + _ => DataType::Int64, + }; + Field::new(format!("c{idx:04}"), data_type, true) + }) + .collect::>(); + + Arc::new(Schema::new(fields)) +} + +fn first_non_null_row() -> Option { + (0..ROWS_PER_GROUP).find(|row| !row.is_multiple_of(7)) +} + +fn last_non_null_row() -> Option { + (0..ROWS_PER_GROUP).rev().find(|row| !row.is_multiple_of(7)) +} + +fn null_count_for_rows() -> u64 { + (0..ROWS_PER_GROUP) + .filter(|row| row.is_multiple_of(7)) + .count() as u64 +} + +fn value(column_idx: usize, row_group: usize, row: usize) -> i64 { + (column_idx as i64 * 10_000) + (row_group as i64 * 100) + row as i64 +} + +fn string_value(column_idx: usize, row_group: usize, row: usize) -> String { + format!("s{column_idx:04}_{row_group:04}_{row:04}") +} + +criterion_group!(benches, parquet_metadata_statistics); +criterion_main!(benches); diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index c32e45935636f..d3831766a42ab 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -20,9 +20,8 @@ use crate::{Int96Coercer, apply_file_schema_type_coercions}; use arrow::array::{Array, ArrayRef, BooleanArray}; -use arrow::compute::and; use arrow::compute::kernels::cmp::eq; -use arrow::compute::sum; +use arrow::compute::{and, sum}; use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; @@ -46,6 +45,7 @@ use parquet::file::metadata::{ PageIndexPolicy, ParquetMetaData, ParquetMetaDataPushDecoder, RowGroupMetaData, SortingColumn, }; +use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; use std::any::Any; use std::collections::HashMap; @@ -353,13 +353,12 @@ impl<'a> DFParquetMetadata<'a> { distinct_counts_array: &mut distinct_counts_array, }; summarize_column_statistics( - file_metadata.schema_descr(), logical_file_schema, - &physical_file_schema, &mut accumulators, idx, &stats_converter, row_groups_metadata, + num_rows, ) .ok(); } @@ -506,119 +505,208 @@ impl StatisticsAccumulators<'_> { } fn summarize_column_statistics( - parquet_schema: &SchemaDescriptor, logical_file_schema: &Schema, - physical_file_schema: &Schema, accumulators: &mut StatisticsAccumulators, logical_schema_index: usize, stats_converter: &StatisticsConverter, row_groups_metadata: &[RowGroupMetaData], + num_rows: usize, ) -> Result<()> { - let max_values = stats_converter.row_group_maxes(row_groups_metadata)?; - let min_values = stats_converter.row_group_mins(row_groups_metadata)?; - let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; - let is_max_value_exact_stat = - stats_converter.row_group_is_max_value_exact(row_groups_metadata)?; - let is_min_value_exact_stat = - stats_converter.row_group_is_min_value_exact(row_groups_metadata)?; + let parquet_index = stats_converter.parquet_column_index(); if let Some(max_acc) = &mut accumulators.max_accs[logical_schema_index] { - max_acc.update_batch(&[Arc::clone(&max_values)])?; - - // handle the common special case when all row groups have exact statistics - let exactness = &is_max_value_exact_stat; - if !exactness.is_empty() && exactness.null_count() == 0 && !exactness.has_false() - { - accumulators.is_max_value_exact[logical_schema_index] = Some(true); - } else if !exactness.has_true() { - accumulators.is_max_value_exact[logical_schema_index] = Some(false); - } else { - let val = max_acc.evaluate()?; - accumulators.is_max_value_exact[logical_schema_index] = - has_any_exact_match(&val, &max_values, exactness); - } + accumulators.is_max_value_exact[logical_schema_index] = summarize_bound( + max_acc, + &stats_converter.row_group_maxes(row_groups_metadata)?, + parquet_index, + row_groups_metadata, + ParquetStatistics::max_is_exact, + || Ok(stats_converter.row_group_is_max_value_exact(row_groups_metadata)?), + )?; } if let Some(min_acc) = &mut accumulators.min_accs[logical_schema_index] { - min_acc.update_batch(&[Arc::clone(&min_values)])?; + accumulators.is_min_value_exact[logical_schema_index] = summarize_bound( + min_acc, + &stats_converter.row_group_mins(row_groups_metadata)?, + parquet_index, + row_groups_metadata, + ParquetStatistics::min_is_exact, + || Ok(stats_converter.row_group_is_min_value_exact(row_groups_metadata)?), + )?; + } + + accumulators.null_counts_array[logical_schema_index] = + summarize_null_counts(stats_converter, row_groups_metadata)?; + + accumulators.distinct_counts_array[logical_schema_index] = + summarize_distinct_counts(parquet_index, row_groups_metadata); + + let arrow_field = logical_file_schema.field(logical_schema_index); + accumulators.column_byte_sizes[logical_schema_index] = compute_arrow_column_size( + arrow_field.data_type(), + row_groups_metadata, + parquet_index, + num_rows, + ); + + Ok(()) +} - // handle the common special case when all row groups have exact statistics - let exactness = &is_min_value_exact_stat; - if !exactness.is_empty() && exactness.null_count() == 0 && !exactness.has_false() +/// Feed a column's per-row-group min or max `values` into `acc` and decide +/// whether the resulting bound is exact across all row groups. +/// +/// `is_exact` reads the per-row-group exactness flag straight from the raw +/// parquet statistics. `row_group_exactness` rebuilds the exactness as a Boolean +/// array and is only called for the rare case where row groups disagree. +fn summarize_bound( + acc: &mut A, + values: &ArrayRef, + parquet_index: Option, + row_groups_metadata: &[RowGroupMetaData], + is_exact: impl Fn(&ParquetStatistics) -> bool, + row_group_exactness: impl FnOnce() -> Result, +) -> Result> { + acc.update_batch(&[Arc::clone(values)])?; + + Ok( + match summarize_row_group_exactness(parquet_index, row_groups_metadata, is_exact) { - accumulators.is_min_value_exact[logical_schema_index] = Some(true); - } else if !exactness.has_true() { - accumulators.is_min_value_exact[logical_schema_index] = Some(false); - } else { - let val = min_acc.evaluate()?; - accumulators.is_min_value_exact[logical_schema_index] = - has_any_exact_match(&val, &min_values, exactness); - } + ExactnessSummary::AllExact => Some(true), + ExactnessSummary::NoneExact => Some(false), + ExactnessSummary::Mixed => { + let exactness = row_group_exactness()?; + has_any_exact_match(&acc.evaluate()?, values, &exactness) + } + }, + ) +} + +fn summarize_null_counts( + stats_converter: &StatisticsConverter, + row_groups_metadata: &[RowGroupMetaData], +) -> Result> { + if row_groups_metadata.is_empty() { + return Ok(Precision::Exact(0)); } - accumulators.null_counts_array[logical_schema_index] = match sum(&null_counts) { - Some(null_count) => Precision::Exact(null_count as usize), + let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; + + match sum(&null_counts) { + Some(count) => { + // If any row group has an unknown null_count, either because column + // statistics are absent or because the null_count field is omitted, + // report the aggregate as inexact. + if null_counts.null_count() > 0 { + Ok(Precision::Inexact(count as usize)) + } else { + Ok(Precision::Exact(count as usize)) + } + } None => match null_counts.len() { // If sum() returned None we either have no rows or all values are null - 0 => Precision::Exact(0), - _ => Precision::Absent, + 0 => Ok(Precision::Exact(0)), + _ => Ok(Precision::Absent), }, + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum ExactnessSummary { + AllExact, + NoneExact, + Mixed, +} + +fn summarize_row_group_exactness( + parquet_idx: Option, + row_groups_metadata: &[RowGroupMetaData], + exactness: impl Fn(&ParquetStatistics) -> bool, +) -> ExactnessSummary { + let Some(parquet_idx) = parquet_idx else { + return ExactnessSummary::NoneExact; }; - // This is the same logic as parquet_column but we start from arrow schema index - // instead of looking up by name. - let parquet_index = parquet_column( - parquet_schema, - physical_file_schema, - logical_file_schema.field(logical_schema_index).name(), - ) - .map(|(idx, _)| idx); + summarize_exactness(row_groups_metadata.iter().map(|row_group| { + row_group + .columns() + .get(parquet_idx) + .and_then(|column| column.statistics()) + .map(&exactness) + })) +} - // Extract distinct counts from row group column statistics - accumulators.distinct_counts_array[logical_schema_index] = - if let Some(parquet_idx) = parquet_index { - let num_row_groups = row_groups_metadata.len(); - let distinct_counts: Vec = row_groups_metadata - .iter() - .filter_map(|rg| { - rg.columns() - .get(parquet_idx) - .and_then(|col| col.statistics()) - .and_then(|stats| stats.distinct_count_opt()) - }) - .collect(); +fn summarize_exactness(exactness: I) -> ExactnessSummary +where + I: IntoIterator>, +{ + let mut has_true = false; + let mut has_false_or_null = false; + + for exactness in exactness { + match exactness { + Some(true) => has_true = true, + Some(false) | None => has_false_or_null = true, + } - let coverage = distinct_counts.len() as f64 / num_row_groups.max(1) as f64; + if has_true && has_false_or_null { + return ExactnessSummary::Mixed; + } + } - if coverage < PARTIAL_NDV_THRESHOLD { - Precision::Absent - } else if distinct_counts.len() == 1 && num_row_groups == 1 { - // Single row group with distinct count - use exact value - Precision::Exact(distinct_counts[0] as usize) - } else { - // Multiple row groups - use max as a lower bound estimate - // (can't accurately merge NDV since duplicates may exist across row groups) - match distinct_counts.iter().max() { - Some(&max_ndv) => Precision::Inexact(max_ndv as usize), - None => Precision::Absent, - } - } - } else { - Precision::Absent - }; + if has_true { + ExactnessSummary::AllExact + } else { + ExactnessSummary::NoneExact + } +} - let arrow_field = logical_file_schema.field(logical_schema_index); - accumulators.column_byte_sizes[logical_schema_index] = compute_arrow_column_size( - arrow_field.data_type(), - row_groups_metadata, - parquet_index, - row_groups_metadata - .iter() - .map(|rg| rg.num_rows() as usize) - .sum(), - ); +/// Extract distinct counts from row group column statistics. +fn summarize_distinct_counts( + parquet_idx: Option, + row_groups_metadata: &[RowGroupMetaData], +) -> Precision { + let Some(parquet_idx) = parquet_idx else { + return Precision::Absent; + }; - Ok(()) + let num_row_groups = row_groups_metadata.len(); + if num_row_groups == 0 { + return Precision::Absent; + } + + let required_count = (num_row_groups as f64 * PARTIAL_NDV_THRESHOLD).ceil() as usize; + let mut ndv_count = 0; + let mut max_distinct_count: Option = None; + + for (row_group_idx, row_group) in row_groups_metadata.iter().enumerate() { + if let Some(distinct_count) = row_group + .columns() + .get(parquet_idx) + .and_then(|col| col.statistics()) + .and_then(|stats| stats.distinct_count_opt()) + { + ndv_count += 1; + max_distinct_count = Some(match max_distinct_count { + Some(max) => max.max(distinct_count), + None => distinct_count, + }); + } + + // Return early if there's no chance to reach the required coverage. + let remaining = num_row_groups - row_group_idx - 1; + if ndv_count + remaining < required_count { + return Precision::Absent; + } + } + + match max_distinct_count { + Some(distinct_count) if num_row_groups == 1 => { + Precision::Exact(distinct_count as usize) + } + Some(distinct_count) => Precision::Inexact(distinct_count as usize), + None => Precision::Absent, + } } /// Compute the Arrow in-memory size for a single column @@ -866,6 +954,30 @@ mod tests { } } + #[test] + fn test_summarize_exactness() { + assert_eq!( + summarize_exactness([Some(true), Some(true)]), + ExactnessSummary::AllExact + ); + assert_eq!( + summarize_exactness([Some(false), None]), + ExactnessSummary::NoneExact + ); + assert_eq!( + summarize_exactness([Some(true), Some(false)]), + ExactnessSummary::Mixed + ); + assert_eq!( + summarize_exactness([Some(true), None]), + ExactnessSummary::Mixed + ); + assert_eq!( + summarize_exactness(std::iter::empty()), + ExactnessSummary::NoneExact + ); + } + mod ndv_tests { use super::*; use arrow::datatypes::Field; @@ -951,6 +1063,92 @@ mod tests { ParquetMetaData::new(file_meta, row_groups) } + #[test] + fn test_summarize_null_counts() { + let schema_descr = create_schema_descr(1); + let arrow_schema = create_arrow_schema(2); + let stats_with_count = + ParquetStatistics::int32(Some(1), Some(10), None, Some(2), false); + let stats_without_count = + ParquetStatistics::int32(Some(1), Some(10), None, None, false); + + let row_groups = vec![ + create_row_group_with_stats( + &schema_descr, + vec![Some(stats_with_count)], + 10, + ), + create_row_group_with_stats( + &schema_descr, + vec![Some(stats_without_count.clone())], + 10, + ), + create_row_group_with_stats(&schema_descr, vec![None], 10), + ]; + let stats_converter = + StatisticsConverter::try_new("col_0", &arrow_schema, &schema_descr) + .unwrap(); + let missing_column_converter = + StatisticsConverter::try_new("col_1", &arrow_schema, &schema_descr) + .unwrap(); + + assert_eq!( + summarize_null_counts(&stats_converter, &row_groups).unwrap(), + Precision::Inexact(2) + ); + assert_eq!( + summarize_null_counts(&missing_column_converter, &row_groups).unwrap(), + Precision::Absent + ); + assert_eq!( + summarize_null_counts(&stats_converter, &[]).unwrap(), + Precision::Exact(0) + ); + assert_eq!( + summarize_null_counts(&missing_column_converter, &[]).unwrap(), + Precision::Exact(0) + ); + + let missing_counts_unknown_converter = + StatisticsConverter::try_new("col_0", &arrow_schema, &schema_descr) + .unwrap() + .with_missing_null_counts_as_zero(false); + assert_eq!( + summarize_null_counts(&missing_counts_unknown_converter, &row_groups) + .unwrap(), + Precision::Inexact(2) + ); + + let row_groups_without_count = vec![ + create_row_group_with_stats( + &schema_descr, + vec![Some(stats_without_count.clone())], + 10, + ), + create_row_group_with_stats( + &schema_descr, + vec![Some(stats_without_count)], + 10, + ), + ]; + assert_eq!( + summarize_null_counts(&stats_converter, &row_groups_without_count) + .unwrap(), + Precision::Exact(0) + ); + + let missing_counts_unknown_converter = + stats_converter.with_missing_null_counts_as_zero(false); + assert_eq!( + summarize_null_counts( + &missing_counts_unknown_converter, + &row_groups_without_count, + ) + .unwrap(), + Precision::Absent + ); + } + #[test] fn test_distinct_count_single_row_group_with_ndv() { // Single row group with distinct count should return Exact