Skip to content

Commit c65243a

Browse files
committed
code review
1 parent 9aede6b commit c65243a

2 files changed

Lines changed: 297 additions & 152 deletions

File tree

datafusion/datasource-parquet/benches/parquet_metadata_statistics.rs

Lines changed: 216 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -24,66 +24,89 @@
2424
use std::hint::black_box;
2525
use std::sync::Arc;
2626

27-
use arrow::array::{ArrayRef, Float64Array, Int64Array, RecordBatch, StringArray};
2827
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
29-
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
30-
use datafusion_common::stats::Precision;
28+
use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main};
3129
use datafusion_datasource_parquet::metadata::DFParquetMetadata;
32-
use parquet::arrow::ArrowWriter;
33-
use parquet::file::metadata::ParquetMetaData;
34-
use parquet::file::properties::{EnabledStatistics, WriterProperties};
30+
use parquet::arrow::ArrowSchemaConverter;
31+
use parquet::data_type::ByteArray;
32+
use parquet::file::metadata::{
33+
ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
34+
};
35+
use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics};
36+
37+
/// Row count given to every synthetic row group; also used as each column
/// chunk's value count and as the row range scanned by the null-row helpers.
const ROWS_PER_GROUP: usize = 8;
3538

3639
/// Parameters describing one benchmark input. The type is `Copy`, so a spec
/// can be passed by value to `BenchmarkCase::new`.
#[derive(Debug, Copy, Clone)]
3740
struct BenchmarkSpec {
38-
name: &'static str,
3941
/// Number of columns in the generated schema.
columns: usize,
4042
/// Number of row groups in the generated metadata.
row_groups: usize,
41-
rows_per_group: usize,
43+
/// Which statistics pattern the generated column chunks carry.
metadata: MetadataState,
44+
}
45+
46+
/// Statistics pattern applied when synthesizing Parquet metadata.
#[derive(Debug, Copy, Clone)]
47+
enum MetadataState {
48+
/// Statistics come from `full_statistics` for every column chunk.
Full,
49+
/// Statistics come from `mixed_statistics`, which omits or weakens some of them.
Mixed,
50+
/// No column chunk carries statistics.
None,
51+
}
52+
53+
// Lowercase label for each state; the benchmark interpolates it into the
// `metadata_{}_col_{}_rg_{}` benchmark ID.
impl std::fmt::Display for MetadataState {
54+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55+
match self {
56+
Self::Full => write!(f, "full"),
57+
Self::Mixed => write!(f, "mixed"),
58+
Self::None => write!(f, "none"),
59+
}
60+
}
4261
}
4362

4463
/// The two inputs the benchmark passes to
/// `DFParquetMetadata::statistics_from_parquet_metadata`.
struct BenchmarkCase {
45-
spec: BenchmarkSpec,
4664
/// Arrow schema produced by `make_schema`.
schema: SchemaRef,
4765
/// Parquet metadata built for that schema.
metadata: ParquetMetaData,
4866
}
4967

5068
fn parquet_metadata_statistics(c: &mut Criterion) {
51-
let specs = [
52-
BenchmarkSpec {
53-
name: "wide_one_row_group",
54-
columns: 1024,
55-
row_groups: 1,
56-
rows_per_group: 16,
57-
},
58-
BenchmarkSpec {
59-
name: "moderate_width_many_row_groups",
60-
columns: 64,
61-
row_groups: 128,
62-
rows_per_group: 8,
63-
},
64-
BenchmarkSpec {
65-
name: "wide_many_row_groups",
66-
columns: 256,
67-
row_groups: 32,
68-
rows_per_group: 8,
69-
},
69+
let metadata_states = [
70+
MetadataState::Full,
71+
MetadataState::Mixed,
72+
MetadataState::None,
7073
];
74+
let column_counts = [8, 64, 256];
75+
let row_group_counts = [1, 32, 128];
7176

72-
let cases: Vec<_> = specs.into_iter().map(BenchmarkCase::new).collect();
7377
let mut group = c.benchmark_group("parquet_metadata_statistics");
74-
group.sample_size(10);
75-
76-
for case in &cases {
77-
group.bench_function(BenchmarkId::from_parameter(case.spec.name), |b| {
78-
b.iter(|| {
79-
let statistics = DFParquetMetadata::statistics_from_parquet_metadata(
80-
black_box(&case.metadata),
81-
black_box(&case.schema),
82-
)
83-
.expect("statistics extraction failed");
84-
black_box(statistics);
85-
});
86-
});
78+
79+
for metadata in metadata_states {
80+
for columns in column_counts {
81+
for row_groups in row_group_counts {
82+
let spec = BenchmarkSpec {
83+
columns,
84+
row_groups,
85+
metadata,
86+
};
87+
group.bench_function(
88+
BenchmarkId::from_parameter(format!(
89+
"metadata_{}_col_{}_rg_{}",
90+
spec.metadata, spec.columns, spec.row_groups,
91+
)),
92+
|b| {
93+
b.iter_batched(
94+
|| BenchmarkCase::new(spec),
95+
|case| {
96+
let statistics =
97+
DFParquetMetadata::statistics_from_parquet_metadata(
98+
black_box(&case.metadata),
99+
black_box(&case.schema),
100+
)
101+
.expect("statistics extraction failed");
102+
black_box(statistics);
103+
},
104+
BatchSize::PerIteration,
105+
);
106+
},
107+
);
108+
}
109+
}
87110
}
88111

89112
group.finish();
@@ -92,115 +115,189 @@ fn parquet_metadata_statistics(c: &mut Criterion) {
92115
impl BenchmarkCase {
93116
/// Builds the schema for `spec.columns` columns, then synthesizes metadata
/// using the statistics generator that matches `spec.metadata`.
///
/// The `-` lines interleaved below are the previous implementation, which
/// wrote a temporary Parquet file with `ArrowWriter` and validated the result.
fn new(spec: BenchmarkSpec) -> Self {
94117
let schema = make_schema(spec.columns);
95-
let props = WriterProperties::builder()
96-
.set_max_row_group_row_count(Some(spec.rows_per_group))
97-
.set_statistics_enabled(EnabledStatistics::Chunk)
98-
.build();
99-
let file = tempfile::Builder::new()
100-
.prefix("parquet_metadata_statistics")
101-
.suffix(".parquet")
102-
.tempfile()
103-
.expect("failed to create temporary parquet file");
104-
let mut writer = ArrowWriter::try_new(
105-
file.reopen().expect("failed to reopen temporary file"),
106-
Arc::clone(&schema),
107-
Some(props),
108-
)
109-
.expect("failed to create parquet writer");
110-
111-
for row_group in 0..spec.row_groups {
112-
writer
113-
.write(&make_batch(&schema, row_group, spec.rows_per_group))
114-
.expect("failed to write benchmark row group");
115-
}
118+
// Pick the per-chunk statistics generator; the `None` state passes a
// closure that never yields statistics.
let metadata = match spec.metadata {
119+
MetadataState::Full => {
120+
make_synthetic_metadata(&schema, spec, full_statistics)
121+
}
122+
MetadataState::Mixed => {
123+
make_synthetic_metadata(&schema, spec, mixed_statistics)
124+
}
125+
MetadataState::None => make_synthetic_metadata(&schema, spec, |_, _, _| None),
126+
};
116127

117-
let metadata = writer.close().expect("failed to close parquet writer");
118-
assert_eq!(metadata.row_groups().len(), spec.row_groups);
119-
120-
let statistics =
121-
DFParquetMetadata::statistics_from_parquet_metadata(&metadata, &schema)
122-
.expect("failed to validate benchmark metadata");
123-
assert_eq!(statistics.column_statistics.len(), spec.columns);
124-
assert_eq!(
125-
statistics.num_rows,
126-
Precision::Exact(spec.row_groups * spec.rows_per_group)
127-
);
128-
129-
Self {
130-
spec,
131-
schema,
132-
metadata,
133-
}
128+
Self { schema, metadata }
134129
}
135130
}
136131

137-
fn make_schema(columns: usize) -> SchemaRef {
138-
let fields = (0..columns)
139-
.map(|idx| {
140-
let data_type = match idx % 4 {
141-
0 => DataType::Int64,
142-
1 => DataType::Float64,
143-
2 => DataType::Utf8,
144-
_ => DataType::Int64,
145-
};
146-
Field::new(format!("c{idx:04}"), data_type, true)
132+
/// Builds a `ParquetMetaData` value directly from builders.
///
/// Produces `spec.row_groups` row groups, each holding one column chunk per
/// field of `schema`. For every chunk, `statistics` is called with the field's
/// data type, the column index and the row-group index; a `Some` result is
/// attached to the chunk and a `None` result leaves it without statistics.
///
/// Panics (via `expect`) if schema conversion or any builder fails.
fn make_synthetic_metadata(
133+
schema: &SchemaRef,
134+
spec: BenchmarkSpec,
135+
statistics: fn(&DataType, usize, usize) -> Option<ParquetStatistics>,
136+
) -> ParquetMetaData {
137+
// Convert the Arrow schema into the Parquet schema descriptor shared by
// every row group and by the file metadata.
let schema_descr = Arc::new(
138+
ArrowSchemaConverter::new()
139+
.convert(schema.as_ref())
140+
.expect("failed to convert arrow schema"),
141+
);
142+
let row_groups = (0..spec.row_groups)
143+
.map(|row_group| {
144+
let columns = schema
145+
.fields()
146+
.iter()
147+
.enumerate()
148+
.map(|(column_idx, field)| {
149+
let mut builder =
150+
ColumnChunkMetaData::builder(schema_descr.column(column_idx));
151+
if let Some(statistics) =
152+
statistics(field.data_type(), column_idx, row_group)
153+
{
154+
builder = builder.set_statistics(statistics);
155+
}
156+
builder
157+
.set_num_values(ROWS_PER_GROUP as i64)
158+
.build()
159+
.expect("failed to build column metadata")
160+
})
161+
.collect::<Vec<_>>();
162+
163+
// The byte size is a made-up figure: columns * rows * 8.
RowGroupMetaData::builder(Arc::clone(&schema_descr))
164+
.set_num_rows(ROWS_PER_GROUP as i64)
165+
.set_total_byte_size((spec.columns * ROWS_PER_GROUP * 8) as i64)
166+
.set_column_metadata(columns)
167+
.build()
168+
.expect("failed to build row group metadata")
147169
})
148170
.collect::<Vec<_>>();
149171

150-
Arc::new(Schema::new(fields))
172+
// NOTE(review): the positional arguments are presumably version, total row
// count, created-by string, key/value metadata, schema descriptor and
// column orders - confirm against the parquet crate's `FileMetaData::new`.
let file_metadata = FileMetaData::new(
173+
1,
174+
(spec.row_groups * ROWS_PER_GROUP) as i64,
175+
Some("datafusion parquet metadata benchmark".to_string()),
176+
None,
177+
schema_descr,
178+
None,
179+
);
180+
181+
ParquetMetaData::new(file_metadata, row_groups)
151182
}
152183

153-
fn make_batch(
154-
schema: &SchemaRef,
184+
/// Statistics generator for `MetadataState::Full`.
///
/// Always returns `Some`: min and max are both flagged exact and the null
/// count is `null_count_for_rows()`. The `-` lines interleaved below belong
/// to the removed `make_batch` function.
fn full_statistics(
185+
data_type: &DataType,
186+
column_idx: usize,
155187
row_group: usize,
156-
rows_per_group: usize,
157-
) -> RecordBatch {
158-
let columns = schema
159-
.fields()
160-
.iter()
161-
.enumerate()
162-
.map(|(column_idx, field)| {
163-
make_array(field.data_type(), column_idx, row_group, rows_per_group)
164-
})
165-
.collect::<Vec<_>>();
188+
) -> Option<ParquetStatistics> {
189+
Some(statistics(
190+
data_type,
191+
column_idx,
192+
row_group,
193+
true,
194+
true,
195+
Some(null_count_for_rows()),
196+
))
197+
}
166198

167-
RecordBatch::try_new(Arc::clone(schema), columns)
168-
.expect("failed to create benchmark record batch")
199+
/// Statistics generator for `MetadataState::Mixed`.
///
/// Returns `None` when `column_idx` is a multiple of 16 or `row_group` is a
/// multiple of 5. Otherwise returns statistics whose min is flagged inexact
/// when `row_group` is a multiple of 3, whose max is flagged inexact when it
/// is a multiple of 4, and whose null count is omitted when it is a multiple
/// of 7.
///
/// NOTE(review): 0 is a multiple of every divisor, so row group 0 (and
/// column 0) never gets statistics. With `row_groups = 1` this state yields
/// the same metadata as `MetadataState::None` - confirm that is intended.
fn mixed_statistics(
200+
data_type: &DataType,
201+
column_idx: usize,
202+
row_group: usize,
203+
) -> Option<ParquetStatistics> {
204+
// Chunks that carry no statistics at all.
if column_idx.is_multiple_of(16) || row_group.is_multiple_of(5) {
205+
return None;
206+
}
207+
208+
// Vary exactness flags and null-count presence by row-group index.
let min_exact = !row_group.is_multiple_of(3);
209+
let max_exact = !row_group.is_multiple_of(4);
210+
let null_count = (!row_group.is_multiple_of(7)).then(null_count_for_rows);
211+
212+
Some(statistics(
213+
data_type, column_idx, row_group, min_exact, max_exact, null_count,
214+
))
169215
}
170216

171-
fn make_array(
217+
/// Builds typed Parquet statistics for one synthetic column chunk.
///
/// Min is computed from `first_non_null_row()` and max from
/// `last_non_null_row()`; either is `None` if no such row exists. `Int64`
/// uses `value(..)` directly, `Float64` uses `value(..) as f64 * 1.5`, and
/// `Utf8` uses the bytes of `string_value(..)`. `min_exact` / `max_exact` set
/// the exactness flags and `null_count` is passed through unchanged.
///
/// Panics via `unreachable!` for any other data type. The `-` lines
/// interleaved below belong to the removed `make_array` function.
///
/// NOTE(review): in `ValueStatistics::new` the third argument (`None`) is
/// presumably the distinct count and the fifth (`false`) the deprecated
/// min/max flag - confirm against the parquet crate.
fn statistics(
172218
data_type: &DataType,
173219
column_idx: usize,
174220
row_group: usize,
175-
rows_per_group: usize,
176-
) -> ArrayRef {
221+
min_exact: bool,
222+
max_exact: bool,
223+
null_count: Option<u64>,
224+
) -> ParquetStatistics {
225+
// `value` and `string_value` grow with the row index, so the first and
// last non-null rows give the min and max respectively.
let min_row = first_non_null_row();
226+
let max_row = last_non_null_row();
227+
177228
match data_type {
178229
DataType::Int64 => {
179-
Arc::new(Int64Array::from_iter((0..rows_per_group).map(|row| {
180-
nullable_value(row, value(column_idx, row_group, row))
181-
})))
230+
let min = min_row.map(|row| value(column_idx, row_group, row));
231+
let max = max_row.map(|row| value(column_idx, row_group, row));
232+
ParquetStatistics::Int64(
233+
ValueStatistics::new(min, max, None, null_count, false)
234+
.with_min_is_exact(min_exact)
235+
.with_max_is_exact(max_exact),
236+
)
182237
}
183238
DataType::Float64 => {
184-
Arc::new(Float64Array::from_iter((0..rows_per_group).map(|row| {
185-
nullable_value(row, value(column_idx, row_group, row) as f64 * 1.5)
186-
})))
239+
let min = min_row.map(|row| value(column_idx, row_group, row) as f64 * 1.5);
240+
let max = max_row.map(|row| value(column_idx, row_group, row) as f64 * 1.5);
241+
ParquetStatistics::Double(
242+
ValueStatistics::new(min, max, None, null_count, false)
243+
.with_min_is_exact(min_exact)
244+
.with_max_is_exact(max_exact),
245+
)
187246
}
188247
DataType::Utf8 => {
189-
Arc::new(StringArray::from_iter((0..rows_per_group).map(|row| {
190-
nullable_value(row, format!("s{column_idx}_{row_group}_{row}"))
191-
})))
248+
let min = min_row.map(|row| {
249+
ByteArray::from(string_value(column_idx, row_group, row).into_bytes())
250+
});
251+
let max = max_row.map(|row| {
252+
ByteArray::from(string_value(column_idx, row_group, row).into_bytes())
253+
});
254+
ParquetStatistics::ByteArray(
255+
ValueStatistics::new(min, max, None, null_count, false)
256+
.with_min_is_exact(min_exact)
257+
.with_max_is_exact(max_exact),
258+
)
192259
}
193260
other => unreachable!("unsupported benchmark data type: {other:?}"),
194261
}
195262
}
196263

197-
fn nullable_value<T>(row: usize, value: T) -> Option<T> {
198-
(!row.is_multiple_of(7)).then_some(value)
264+
/// Builds an Arrow schema with `columns` nullable fields named `c0000`,
/// `c0001`, ... (index zero-padded to four digits).
///
/// Field types cycle by `idx % 4`: Int64, Float64, Utf8, Int64.
fn make_schema(columns: usize) -> SchemaRef {
265+
let fields = (0..columns)
266+
.map(|idx| {
267+
let data_type = match idx % 4 {
268+
0 => DataType::Int64,
269+
1 => DataType::Float64,
270+
2 => DataType::Utf8,
271+
_ => DataType::Int64,
272+
};
273+
Field::new(format!("c{idx:04}"), data_type, true)
274+
})
275+
.collect::<Vec<_>>();
276+
277+
Arc::new(Schema::new(fields))
278+
}
279+
280+
/// Lowest row index in `0..ROWS_PER_GROUP` that is not a multiple of 7, or
/// `None` if every index in the range is a multiple of 7.
fn first_non_null_row() -> Option<usize> {
281+
(0..ROWS_PER_GROUP).find(|row| !row.is_multiple_of(7))
282+
}
283+
284+
/// Highest row index in `0..ROWS_PER_GROUP` that is not a multiple of 7, or
/// `None` if every index in the range is a multiple of 7.
fn last_non_null_row() -> Option<usize> {
285+
(0..ROWS_PER_GROUP).rev().find(|row| !row.is_multiple_of(7))
286+
}
287+
288+
/// Number of row indices in `0..ROWS_PER_GROUP` that are multiples of 7
/// (index 0 counts). Used as the null count in the synthetic statistics.
fn null_count_for_rows() -> u64 {
289+
(0..ROWS_PER_GROUP)
290+
.filter(|row| row.is_multiple_of(7))
291+
.count() as u64
199292
}
200293

201294
/// Deterministic integer for a cell:
/// `column_idx * 10_000 + row_group * 100 + row`.
/// For a fixed column and row group the result increases with `row`.
fn value(column_idx: usize, row_group: usize, row: usize) -> i64 {
202295
(column_idx as i64 * 10_000) + (row_group as i64 * 100) + row as i64
203296
}
204297

298+
/// Deterministic string for a cell, formatted as `s{column}_{row_group}_{row}`
/// with each number zero-padded to at least four digits.
fn string_value(column_idx: usize, row_group: usize, row: usize) -> String {
299+
format!("s{column_idx:04}_{row_group:04}_{row:04}")
300+
}
301+
205302
criterion_group!(benches, parquet_metadata_statistics);
206303
criterion_main!(benches);

0 commit comments

Comments
 (0)