Skip to content

Commit 095265f

Browse files
authored
fix: Parquet bloom filter pruning can incorrectly filter decimals encoded as FIXED_LEN_BYTE_ARRAY (#22995)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #22994. ## Rationale for this change Parquet bloom filter pruning can incorrectly prune decimal columns encoded as `FIXED_LEN_BYTE_ARRAY`. Bloom filters are checked against the physical bytes stored in the Parquet file. For `FIXED_LEN_BYTE_ARRAY`, the byte width comes from the Parquet column descriptor's `type_length`. DataFusion was checking decimal literals using a fixed-width integer byte representation, which can differ from thefile's fixed byte width and cause false negatives. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Carry the Parquet column `type_length` together with the bloom filter metadata. - Use `type_length` when checking decimal literals against `FIXED_LEN_BYTE_ARRAY` bloom filters. - Fall back to conservative pruning behavior when the fixed byte length cannot be represented safely. - Add a regression test for fixed-length decimal bloom filter pruning. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes. ## Are there any user-facing changes? No API changes. This fixes incorrect query results when Parquet bloom filter pruning is enabled for fixed-length decimal columns. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 88273eb commit 095265f

2 files changed

Lines changed: 187 additions & 21 deletions

File tree

datafusion/datasource-parquet/src/bloom_filter.rs

Lines changed: 176 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,18 @@ use parquet::data_type::Decimal;
3434
/// Parquet row groups and data pages based on the query predicate.
3535
#[derive(Debug, Clone, Default)]
3636
pub(crate) struct BloomFilterStatistics {
37-
/// Per-column Bloom filters
38-
/// Key: predicate column name
39-
/// Value:
40-
/// * [`Sbbf`] (Bloom filter),
41-
/// * Parquet physical [`Type`] needed to evaluate literals against the filter
42-
column_sbbf: HashMap<String, (Sbbf, Type)>,
37+
/// Per-column Bloom filters keyed by predicate column name.
38+
column_sbbf: HashMap<String, ColumnBloomFilter>,
39+
}
40+
41+
#[derive(Debug, Clone)]
42+
struct ColumnBloomFilter {
43+
/// [`Sbbf`] (Bloom filter).
44+
sbbf: Sbbf,
45+
/// Parquet physical [`Type`] needed to evaluate literals against the filter.
46+
physical_type: Type,
47+
/// Type length from the Parquet column descriptor.
48+
type_length: i32,
4349
}
4450

4551
impl BloomFilterStatistics {
@@ -56,15 +62,33 @@ impl BloomFilterStatistics {
5662
}
5763

5864
/// Add a Bloom filter and type for the specified column
59-
pub(crate) fn insert(&mut self, column: impl Into<String>, sbbf: Sbbf, ty: Type) {
60-
self.column_sbbf.insert(column.into(), (sbbf, ty));
65+
pub(crate) fn insert(
66+
&mut self,
67+
column: impl Into<String>,
68+
sbbf: Sbbf,
69+
ty: Type,
70+
type_length: i32,
71+
) {
72+
self.column_sbbf.insert(
73+
column.into(),
74+
ColumnBloomFilter {
75+
sbbf,
76+
physical_type: ty,
77+
type_length,
78+
},
79+
);
6180
}
6281

6382
/// Helper function for checking if [`Sbbf`] filter contains [`ScalarValue`].
6483
///
6584
/// In case the type of scalar is not supported, returns `true`, assuming that the
6685
/// value may be present.
67-
fn check_scalar(sbbf: &Sbbf, value: &ScalarValue, parquet_type: &Type) -> bool {
86+
fn check_scalar(
87+
sbbf: &Sbbf,
88+
value: &ScalarValue,
89+
parquet_type: &Type,
90+
type_length: i32,
91+
) -> bool {
6892
match value {
6993
ScalarValue::Utf8(Some(v))
7094
| ScalarValue::Utf8View(Some(v))
@@ -113,8 +137,14 @@ impl BloomFilterStatistics {
113137
sbbf.check(&decimal)
114138
}
115139
Type::FIXED_LEN_BYTE_ARRAY => {
116-
// keep with from_bytes_to_i128
117-
let b = v.to_be_bytes().to_vec();
140+
let Ok(type_length) = usize::try_from(type_length) else {
141+
return true;
142+
};
143+
if type_length == 0 || type_length > 16 {
144+
return true;
145+
}
146+
let b = v.to_be_bytes();
147+
let b = b[(b.len() - type_length)..].to_vec();
118148
// Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
119149
let decimal = Decimal::Bytes {
120150
value: b.into(),
@@ -125,9 +155,12 @@ impl BloomFilterStatistics {
125155
}
126156
_ => true,
127157
},
128-
ScalarValue::Dictionary(_, inner) => {
129-
BloomFilterStatistics::check_scalar(sbbf, inner, parquet_type)
130-
}
158+
ScalarValue::Dictionary(_, inner) => BloomFilterStatistics::check_scalar(
159+
sbbf,
160+
inner,
161+
parquet_type,
162+
type_length,
163+
),
131164
_ => true,
132165
}
133166
}
@@ -164,7 +197,7 @@ impl PruningStatistics for BloomFilterStatistics {
164197
column: &Column,
165198
values: &HashSet<ScalarValue>,
166199
) -> Option<BooleanArray> {
167-
let (sbbf, parquet_type) = self.column_sbbf.get(column.name.as_str())?;
200+
let column_bloom_filter = self.column_sbbf.get(column.name.as_str())?;
168201

169202
// Bloom filters are probabilistic data structures that can return false
170203
// positives (i.e. it might return true even if the value is not
@@ -173,7 +206,14 @@ impl PruningStatistics for BloomFilterStatistics {
173206

174207
let known_not_present = values
175208
.iter()
176-
.map(|value| BloomFilterStatistics::check_scalar(sbbf, value, parquet_type))
209+
.map(|value| {
210+
BloomFilterStatistics::check_scalar(
211+
&column_bloom_filter.sbbf,
212+
value,
213+
&column_bloom_filter.physical_type,
214+
column_bloom_filter.type_length,
215+
)
216+
})
177217
// The row group doesn't contain any of the values if
178218
// all the checks are false
179219
.all(|v| !v);
@@ -201,15 +241,19 @@ mod tests {
201241
use crate::test_util::ExpectedPruning;
202242
use crate::{ParquetAccessPlan, ParquetFileMetrics, RowGroupAccessPlanFilter};
203243

244+
use arrow::array::Decimal128Array;
204245
use arrow::datatypes::{DataType, Field, Schema};
246+
use bytes::{BufMut, BytesMut};
205247
use datafusion_common::Result;
206248
use datafusion_expr::{Expr, col, lit};
207249
use datafusion_physical_expr::planner::logical2physical;
208250
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
209251
use datafusion_pruning::PruningPredicate;
210252
use object_store::ObjectStoreExt;
253+
use parquet::arrow::ArrowWriter;
211254
use parquet::arrow::ParquetRecordBatchStreamBuilder;
212255
use parquet::arrow::async_reader::ParquetObjectReader;
256+
use parquet::file::properties::{EnabledStatistics, WriterProperties};
213257

214258
#[tokio::test]
215259
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
@@ -375,6 +419,82 @@ mod tests {
375419
.await
376420
}
377421

422+
#[tokio::test]
423+
async fn test_row_group_bloom_filter_pruning_predicate_decimal128() {
424+
for precision in [19, 20, 21, 28, 38] {
425+
let scale = 2;
426+
let data = parquet_decimal128_with_bloom_filter(
427+
precision,
428+
scale,
429+
vec![100, 200, 300, 400, 500, 600],
430+
);
431+
let schema = Schema::new(vec![Field::new(
432+
"decimal_col",
433+
DataType::Decimal128(precision, scale),
434+
true,
435+
)]);
436+
let expr = col("decimal_col").eq(Expr::Literal(
437+
ScalarValue::Decimal128(Some(500), precision, scale),
438+
None,
439+
));
440+
let expr = logical2physical(&expr, &schema);
441+
let pruning_predicate =
442+
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
443+
444+
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
445+
&format!("decimal128-{precision}.parquet"),
446+
data,
447+
&pruning_predicate,
448+
)
449+
.await
450+
.unwrap();
451+
452+
assert_eq!(
453+
pruned_row_groups.access_plan().row_group_indexes(),
454+
vec![2],
455+
"precision {precision}"
456+
);
457+
}
458+
}
459+
460+
#[tokio::test]
461+
async fn test_row_group_bloom_filter_pruning_predicate_negative_decimal128() {
462+
for precision in [19, 20, 21, 28, 38] {
463+
let scale = 2;
464+
let data = parquet_decimal128_with_bloom_filter(
465+
precision,
466+
scale,
467+
vec![-100, -200, -300, -400, -500, -600],
468+
);
469+
let schema = Schema::new(vec![Field::new(
470+
"decimal_col",
471+
DataType::Decimal128(precision, scale),
472+
true,
473+
)]);
474+
let expr = col("decimal_col").eq(Expr::Literal(
475+
ScalarValue::Decimal128(Some(-500), precision, scale),
476+
None,
477+
));
478+
let expr = logical2physical(&expr, &schema);
479+
let pruning_predicate =
480+
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
481+
482+
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
483+
&format!("negative-decimal128-{precision}.parquet"),
484+
data,
485+
&pruning_predicate,
486+
)
487+
.await
488+
.unwrap();
489+
490+
assert_eq!(
491+
pruned_row_groups.access_plan().row_group_indexes(),
492+
vec![2],
493+
"precision {precision}"
494+
);
495+
}
496+
}
497+
378498
struct BloomFilterTest {
379499
file_name: String,
380500
schema: Schema,
@@ -467,6 +587,37 @@ mod tests {
467587
}
468588
}
469589

590+
fn parquet_decimal128_with_bloom_filter(
591+
precision: u8,
592+
scale: i8,
593+
values: Vec<i128>,
594+
) -> bytes::Bytes {
595+
let schema = Arc::new(Schema::new(vec![Field::new(
596+
"decimal_col",
597+
DataType::Decimal128(precision, scale),
598+
true,
599+
)]));
600+
let array = Arc::new(
601+
Decimal128Array::from(values)
602+
.with_precision_and_scale(precision, scale)
603+
.unwrap(),
604+
) as ArrayRef;
605+
let batch =
606+
arrow::array::RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
607+
let props = WriterProperties::builder()
608+
.set_max_row_group_row_count(Some(2))
609+
.set_bloom_filter_enabled(true)
610+
.set_statistics_enabled(EnabledStatistics::None)
611+
.build();
612+
let mut out = BytesMut::new().writer();
613+
{
614+
let mut writer = ArrowWriter::try_new(&mut out, schema, Some(props)).unwrap();
615+
writer.write(&batch).unwrap();
616+
writer.finish().unwrap();
617+
}
618+
out.into_inner().freeze()
619+
}
620+
470621
/// Evaluates the pruning predicate on the specified row groups and returns the row groups that are left
471622
async fn test_row_group_bloom_filter_pruning_predicate(
472623
file_name: &str,
@@ -520,6 +671,7 @@ mod tests {
520671
column_name.to_string(),
521672
column_idx,
522673
builder.parquet_schema().column(column_idx).physical_type(),
674+
builder.parquet_schema().column(column_idx).type_length(),
523675
))
524676
})
525677
.collect::<Vec<_>>();
@@ -532,7 +684,8 @@ mod tests {
532684
for idx in pruned_row_groups.row_group_indexes() {
533685
let mut bloom_filters =
534686
BloomFilterStatistics::with_capacity(parquet_columns.len());
535-
for (column_name, column_idx, physical_type) in &parquet_columns {
687+
for (column_name, column_idx, physical_type, type_length) in &parquet_columns
688+
{
536689
let bf = match builder
537690
.get_row_group_column_bloom_filter(idx, *column_idx)
538691
.await
@@ -545,7 +698,12 @@ mod tests {
545698
continue;
546699
}
547700
};
548-
bloom_filters.insert(column_name.clone(), bf, *physical_type);
701+
bloom_filters.insert(
702+
column_name.clone(),
703+
bf,
704+
*physical_type,
705+
*type_length,
706+
);
549707
}
550708
row_group_bloom_filters[idx] = bloom_filters;
551709
}

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@ impl RowGroupsPrunedParquetOpen {
11701170
mem::replace(&mut prepared.async_file_reader, replacement_reader),
11711171
reader_metadata,
11721172
);
1173-
let parquet_columns: Vec<(String, usize, Type)> = predicate
1173+
let parquet_columns: Vec<(String, usize, Type, i32)> = predicate
11741174
.literal_columns()
11751175
.into_iter()
11761176
.filter_map(|column_name| {
@@ -1184,14 +1184,17 @@ impl RowGroupsPrunedParquetOpen {
11841184
column_name,
11851185
column_idx,
11861186
parquet_schema.column(column_idx).physical_type(),
1187+
parquet_schema.column(column_idx).type_length(),
11871188
))
11881189
})
11891190
.collect();
11901191

11911192
for idx in self.row_groups.row_group_indexes() {
11921193
let mut row_group_filters =
11931194
BloomFilterStatistics::with_capacity(parquet_columns.len());
1194-
for (column_name, column_idx, physical_type) in &parquet_columns {
1195+
for (column_name, column_idx, physical_type, type_length) in
1196+
&parquet_columns
1197+
{
11951198
let bf: Sbbf = match builder
11961199
.get_row_group_column_bloom_filter(idx, *column_idx)
11971200
.await
@@ -1204,7 +1207,12 @@ impl RowGroupsPrunedParquetOpen {
12041207
continue;
12051208
}
12061209
};
1207-
row_group_filters.insert(column_name, bf, *physical_type);
1210+
row_group_filters.insert(
1211+
column_name,
1212+
bf,
1213+
*physical_type,
1214+
*type_length,
1215+
);
12081216
}
12091217
row_group_bloom_filters[idx] = row_group_filters;
12101218
}

0 commit comments

Comments
 (0)