Skip to content

Commit a7c2f7d

Browse files
AdamGSxudong963Dandandan
authored
Optimize Parquet metadata row-group level statistics collection (#22462)
## Which issue does this PR close? - Closes #. ## Rationale for this change The current stats aggregation does a bunch of unnecessary work, this PR tries to do the minimal amount of work at every step. ## What changes are included in this PR? In addition to splitting up the summarization logic into some clearer functions and a reusable function for min/max, I've tried to do the minimal amount of work at each step: 1. Only allocate boolean masks if there's a mix of exact/inexact stats between row groups. 2. No need to allocate an Arrow array for null count. 3. No need to re-calculate the parquet column index - its already in `stats_converter`, as far as I can tell its exactly the same code path. 4. No need to recalculate the number of rows - we already know it. I've also included a benchmark, the effect on my laptop is: ``` parquet_metadata_statistics/wide_one_row_group time: [2.9945 ms 3.0313 ms 3.0487 ms] change: [−44.473% −43.790% −43.044%] (p = 0.00 < 0.05) Performance has improved. Benchmarking parquet_metadata_statistics/moderate_width_many_row_groups: Collecting 10 samples in estimated 5 parquet_metadata_statistics/moderate_width_many_row_groups time: [236.75 µs 237.37 µs 238.48 µs] change: [−22.330% −21.550% −20.794%] (p = 0.00 < 0.05) Performance has improved. Found 1 outliers among 10 measurements (10.00%) 1 (10.00%) high severe Benchmarking parquet_metadata_statistics/wide_many_row_groups: Collecting 10 samples in estimated 5.0127 s (7 parquet_metadata_statistics/wide_many_row_groups time: [628.67 µs 636.88 µs 645.79 µs] change: [−29.409% −28.225% −26.999%] (p = 0.00 < 0.05) Performance has improved. Found 1 outliers among 10 measurements (10.00%) 1 (10.00%) high mild ``` ## Are these changes tested? Existing tests and few additional small unit tests. ## Are there any user-facing changes? None --------- Signed-off-by: Adam Gutglick <adamgsal@gmail.com> Co-authored-by: xudong.w <wxd963996380@gmail.com> Co-authored-by: Daniël Heres <danielheres@gmail.com>
1 parent 0e17880 commit a7c2f7d

3 files changed

Lines changed: 597 additions & 92 deletions

File tree

datafusion/datasource-parquet/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,7 @@ harness = false
8787
[[bench]]
8888
name = "parquet_struct_filter_pushdown"
8989
harness = false
90+
91+
[[bench]]
92+
name = "parquet_metadata_statistics"
93+
harness = false
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for deriving DataFusion table statistics from Parquet metadata.
19+
//!
20+
//! This mirrors the structure of Arrow's `arrow_statistics` benchmark: build
21+
//! Parquet metadata once, then repeatedly measure statistics extraction. The
22+
//! benchmark targets the cold planning/statistics path used by listing tables.
23+
24+
use std::hint::black_box;
25+
use std::sync::Arc;
26+
27+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
28+
use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main};
29+
use datafusion_datasource_parquet::metadata::DFParquetMetadata;
30+
use parquet::arrow::ArrowSchemaConverter;
31+
use parquet::data_type::ByteArray;
32+
use parquet::file::metadata::{
33+
ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
34+
};
35+
use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics};
36+
37+
const ROWS_PER_GROUP: usize = 8;
38+
39+
#[derive(Debug, Copy, Clone)]
40+
struct BenchmarkSpec {
41+
columns: usize,
42+
row_groups: usize,
43+
metadata: MetadataState,
44+
}
45+
46+
#[derive(Debug, Copy, Clone)]
47+
enum MetadataState {
48+
Full,
49+
Mixed,
50+
None,
51+
}
52+
53+
impl std::fmt::Display for MetadataState {
54+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55+
match self {
56+
Self::Full => write!(f, "full"),
57+
Self::Mixed => write!(f, "mixed"),
58+
Self::None => write!(f, "none"),
59+
}
60+
}
61+
}
62+
63+
struct BenchmarkCase {
64+
schema: SchemaRef,
65+
metadata: ParquetMetaData,
66+
}
67+
68+
fn parquet_metadata_statistics(c: &mut Criterion) {
69+
let metadata_states = [
70+
MetadataState::Full,
71+
MetadataState::Mixed,
72+
MetadataState::None,
73+
];
74+
let column_counts = [8, 64, 256];
75+
let row_group_counts = [1, 32, 128];
76+
77+
let mut group = c.benchmark_group("parquet_metadata_statistics");
78+
79+
for metadata in metadata_states {
80+
for columns in column_counts {
81+
for row_groups in row_group_counts {
82+
let spec = BenchmarkSpec {
83+
columns,
84+
row_groups,
85+
metadata,
86+
};
87+
group.bench_function(
88+
BenchmarkId::from_parameter(format!(
89+
"metadata_{}_col_{}_rg_{}",
90+
spec.metadata, spec.columns, spec.row_groups,
91+
)),
92+
|b| {
93+
b.iter_batched(
94+
|| BenchmarkCase::new(spec),
95+
|case| {
96+
let statistics =
97+
DFParquetMetadata::statistics_from_parquet_metadata(
98+
black_box(&case.metadata),
99+
black_box(&case.schema),
100+
)
101+
.expect("statistics extraction failed");
102+
black_box(statistics);
103+
},
104+
BatchSize::PerIteration,
105+
);
106+
},
107+
);
108+
}
109+
}
110+
}
111+
112+
group.finish();
113+
}
114+
115+
impl BenchmarkCase {
116+
fn new(spec: BenchmarkSpec) -> Self {
117+
let schema = make_schema(spec.columns);
118+
let metadata = match spec.metadata {
119+
MetadataState::Full => {
120+
make_synthetic_metadata(&schema, spec, full_statistics)
121+
}
122+
MetadataState::Mixed => {
123+
make_synthetic_metadata(&schema, spec, mixed_statistics)
124+
}
125+
MetadataState::None => make_synthetic_metadata(&schema, spec, |_, _, _| None),
126+
};
127+
128+
Self { schema, metadata }
129+
}
130+
}
131+
132+
fn make_synthetic_metadata(
133+
schema: &SchemaRef,
134+
spec: BenchmarkSpec,
135+
statistics: fn(&DataType, usize, usize) -> Option<ParquetStatistics>,
136+
) -> ParquetMetaData {
137+
let schema_descr = Arc::new(
138+
ArrowSchemaConverter::new()
139+
.convert(schema.as_ref())
140+
.expect("failed to convert arrow schema"),
141+
);
142+
let row_groups = (0..spec.row_groups)
143+
.map(|row_group| {
144+
let columns = schema
145+
.fields()
146+
.iter()
147+
.enumerate()
148+
.map(|(column_idx, field)| {
149+
let mut builder =
150+
ColumnChunkMetaData::builder(schema_descr.column(column_idx));
151+
if let Some(statistics) =
152+
statistics(field.data_type(), column_idx, row_group)
153+
{
154+
builder = builder.set_statistics(statistics);
155+
}
156+
builder
157+
.set_num_values(ROWS_PER_GROUP as i64)
158+
.build()
159+
.expect("failed to build column metadata")
160+
})
161+
.collect::<Vec<_>>();
162+
163+
RowGroupMetaData::builder(Arc::clone(&schema_descr))
164+
.set_num_rows(ROWS_PER_GROUP as i64)
165+
.set_total_byte_size((spec.columns * ROWS_PER_GROUP * 8) as i64)
166+
.set_column_metadata(columns)
167+
.build()
168+
.expect("failed to build row group metadata")
169+
})
170+
.collect::<Vec<_>>();
171+
172+
let file_metadata = FileMetaData::new(
173+
1,
174+
(spec.row_groups * ROWS_PER_GROUP) as i64,
175+
Some("datafusion parquet metadata benchmark".to_string()),
176+
None,
177+
schema_descr,
178+
None,
179+
);
180+
181+
ParquetMetaData::new(file_metadata, row_groups)
182+
}
183+
184+
fn full_statistics(
185+
data_type: &DataType,
186+
column_idx: usize,
187+
row_group: usize,
188+
) -> Option<ParquetStatistics> {
189+
Some(statistics(
190+
data_type,
191+
column_idx,
192+
row_group,
193+
true,
194+
true,
195+
Some(null_count_for_rows()),
196+
))
197+
}
198+
199+
fn mixed_statistics(
200+
data_type: &DataType,
201+
column_idx: usize,
202+
row_group: usize,
203+
) -> Option<ParquetStatistics> {
204+
if column_idx.is_multiple_of(16) || row_group.is_multiple_of(5) {
205+
return None;
206+
}
207+
208+
let min_exact = !row_group.is_multiple_of(3);
209+
let max_exact = !row_group.is_multiple_of(4);
210+
let null_count = (!row_group.is_multiple_of(7)).then(null_count_for_rows);
211+
212+
Some(statistics(
213+
data_type, column_idx, row_group, min_exact, max_exact, null_count,
214+
))
215+
}
216+
217+
fn statistics(
218+
data_type: &DataType,
219+
column_idx: usize,
220+
row_group: usize,
221+
min_exact: bool,
222+
max_exact: bool,
223+
null_count: Option<u64>,
224+
) -> ParquetStatistics {
225+
let min_row = first_non_null_row();
226+
let max_row = last_non_null_row();
227+
228+
match data_type {
229+
DataType::Int64 => {
230+
let min = min_row.map(|row| value(column_idx, row_group, row));
231+
let max = max_row.map(|row| value(column_idx, row_group, row));
232+
ParquetStatistics::Int64(
233+
ValueStatistics::new(min, max, None, null_count, false)
234+
.with_min_is_exact(min_exact)
235+
.with_max_is_exact(max_exact),
236+
)
237+
}
238+
DataType::Float64 => {
239+
let min = min_row.map(|row| value(column_idx, row_group, row) as f64 * 1.5);
240+
let max = max_row.map(|row| value(column_idx, row_group, row) as f64 * 1.5);
241+
ParquetStatistics::Double(
242+
ValueStatistics::new(min, max, None, null_count, false)
243+
.with_min_is_exact(min_exact)
244+
.with_max_is_exact(max_exact),
245+
)
246+
}
247+
DataType::Utf8 => {
248+
let min = min_row.map(|row| {
249+
ByteArray::from(string_value(column_idx, row_group, row).into_bytes())
250+
});
251+
let max = max_row.map(|row| {
252+
ByteArray::from(string_value(column_idx, row_group, row).into_bytes())
253+
});
254+
ParquetStatistics::ByteArray(
255+
ValueStatistics::new(min, max, None, null_count, false)
256+
.with_min_is_exact(min_exact)
257+
.with_max_is_exact(max_exact),
258+
)
259+
}
260+
other => unreachable!("unsupported benchmark data type: {other:?}"),
261+
}
262+
}
263+
264+
fn make_schema(columns: usize) -> SchemaRef {
265+
let fields = (0..columns)
266+
.map(|idx| {
267+
let data_type = match idx % 4 {
268+
0 => DataType::Int64,
269+
1 => DataType::Float64,
270+
2 => DataType::Utf8,
271+
_ => DataType::Int64,
272+
};
273+
Field::new(format!("c{idx:04}"), data_type, true)
274+
})
275+
.collect::<Vec<_>>();
276+
277+
Arc::new(Schema::new(fields))
278+
}
279+
280+
fn first_non_null_row() -> Option<usize> {
281+
(0..ROWS_PER_GROUP).find(|row| !row.is_multiple_of(7))
282+
}
283+
284+
fn last_non_null_row() -> Option<usize> {
285+
(0..ROWS_PER_GROUP).rev().find(|row| !row.is_multiple_of(7))
286+
}
287+
288+
fn null_count_for_rows() -> u64 {
289+
(0..ROWS_PER_GROUP)
290+
.filter(|row| row.is_multiple_of(7))
291+
.count() as u64
292+
}
293+
294+
fn value(column_idx: usize, row_group: usize, row: usize) -> i64 {
295+
(column_idx as i64 * 10_000) + (row_group as i64 * 100) + row as i64
296+
}
297+
298+
fn string_value(column_idx: usize, row_group: usize, row: usize) -> String {
299+
format!("s{column_idx:04}_{row_group:04}_{row:04}")
300+
}
301+
302+
criterion_group!(benches, parquet_metadata_statistics);
303+
criterion_main!(benches);

0 commit comments

Comments
 (0)