Skip to content

Commit 17679e3

Browse files
committed
refactor(parquet-datasource): mechanical cleanup — split opener.rs, file_format.rs, row_group_filter.rs
A series of pure code-motion splits to make the parquet datasource crate easier to navigate and modify. No public API changes (existing import paths preserved via re-exports), no behavior changes.

Five separate moves:

1. `opener.rs` (~2,700 LOC) → `opener/` module:
   - `opener/early_stop.rs` — `EarlyStoppingStream`, the dynamic-filter early-termination wrapper used at the end of `build_stream`.
   - `opener/encryption.rs` — `EncryptionContext` + the `ParquetMorselizer::get_encryption_context` helpers. Isolates the `#[cfg(feature = "parquet_encryption")]` gating.
   - `opener/push_decoder_stream.rs` — `PushDecoderStreamState`, the inner stream state machine that drives a `ParquetPushDecoder`.
2. `file_format.rs` (~2,000 LOC) split:
   - `sink.rs` — `ParquetSink` + the parallel-write machinery (`column_serializer_task`, `spawn_column_parallel_row_group_writer`, `output_single_parquet_file_parallelized`, etc.). The historical `file_format::ParquetSink` path is preserved via `pub use`.
   - `schema_coercion.rs` — Arrow-schema coercion utilities (`apply_file_schema_type_coercions`, `coerce_int96_to_resolution`, `coerce_file_schema_to_view_type`, `coerce_file_schema_to_string_type`, `transform_schema_to_view`, `transform_binary_to_string`, `field_with_new_type`) and their tests. Re-exported at the crate root for backward compat.
3. `row_group_filter.rs` (~1,900 LOC):
   - `bloom_filter.rs` — `BloomFilterStatistics` (the loaded SBBF data + its `PruningStatistics` adapter). Separates "data we loaded from the file" from "the access-plan filter that consumes it".

After this PR:

- `opener.rs`: 2,717 → 2,433 LOC (-10%; further drop possible by extracting the test module, but tests stay near the code under test for now).
- `file_format.rs`: 2,038 → 633 LOC (-69%).
- `row_group_filter.rs`: 1,929 → 1,756 LOC (-9%; bloom code moved out, the file is now focused entirely on `RowGroupAccessPlanFilter`).
1 parent 0c4ace8 commit 17679e3

10 files changed

Lines changed: 2031 additions & 1825 deletions

File tree

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Loaded Parquet Split Block Bloom Filter (SBBF) data, with a
19+
//! [`PruningStatistics`] adapter so the predicate-pruning machinery in
20+
//! [`datafusion_pruning`] can consume it.
21+
22+
use std::collections::{HashMap, HashSet};
23+
24+
use arrow::array::{ArrayRef, BooleanArray};
25+
use datafusion_common::pruning::PruningStatistics;
26+
use datafusion_common::{Column, ScalarValue};
27+
use parquet::basic::Type;
28+
use parquet::bloom_filter::Sbbf;
29+
use parquet::data_type::Decimal;
30+
31+
/// In memory Parquet Split Block Bloom Filters (SBBF).
32+
///
33+
/// This structure implements [`PruningStatistics`] and is used to prune
34+
/// Parquet row groups and data pages based on the query predicate.
35+
#[derive(Debug, Clone, Default)]
36+
pub(crate) struct BloomFilterStatistics {
37+
/// Per-column Bloom filters
38+
/// Key: predicate column name
39+
/// Value:
40+
/// * [`Sbbf`] (Bloom filter),
41+
/// * Parquet physical [`Type`] needed to evaluate literals against the filter
42+
pub(crate) column_sbbf: HashMap<String, (Sbbf, Type)>,
43+
}
44+
45+
impl BloomFilterStatistics {
46+
/// Create an empty [`BloomFilterStatistics`]
47+
pub(crate) fn new() -> Self {
48+
Default::default()
49+
}
50+
51+
/// Create an empty [`BloomFilterStatistics`] with the specified capacity
52+
pub(crate) fn with_capacity(capacity: usize) -> Self {
53+
Self {
54+
column_sbbf: HashMap::with_capacity(capacity),
55+
}
56+
}
57+
58+
/// Add a Bloom filter and type for the specified column
59+
pub(crate) fn insert(&mut self, column: impl Into<String>, sbbf: Sbbf, ty: Type) {
60+
self.column_sbbf.insert(column.into(), (sbbf, ty));
61+
}
62+
63+
/// Helper function for checking if [`Sbbf`] filter contains [`ScalarValue`].
64+
///
65+
/// In case the type of scalar is not supported, returns `true`, assuming that the
66+
/// value may be present.
67+
fn check_scalar(sbbf: &Sbbf, value: &ScalarValue, parquet_type: &Type) -> bool {
68+
match value {
69+
ScalarValue::Utf8(Some(v))
70+
| ScalarValue::Utf8View(Some(v))
71+
| ScalarValue::LargeUtf8(Some(v)) => sbbf.check(&v.as_str()),
72+
ScalarValue::Binary(Some(v))
73+
| ScalarValue::BinaryView(Some(v))
74+
| ScalarValue::LargeBinary(Some(v)) => sbbf.check(v),
75+
ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v),
76+
ScalarValue::Boolean(Some(v)) => sbbf.check(v),
77+
ScalarValue::Float64(Some(v)) => sbbf.check(v),
78+
ScalarValue::Float32(Some(v)) => sbbf.check(v),
79+
ScalarValue::Int64(Some(v)) => sbbf.check(v),
80+
ScalarValue::Int32(Some(v)) => sbbf.check(v),
81+
ScalarValue::UInt64(Some(v)) => sbbf.check(v),
82+
ScalarValue::UInt32(Some(v)) => sbbf.check(v),
83+
ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
84+
Type::INT32 => {
85+
//https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
86+
// All physical type are little-endian
87+
if *p > 9 {
88+
//DECIMAL can be used to annotate the following types:
89+
//
90+
// int32: for 1 <= precision <= 9
91+
// int64: for 1 <= precision <= 18
92+
return true;
93+
}
94+
let b = (*v as i32).to_le_bytes();
95+
// Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
96+
let decimal = Decimal::Int32 {
97+
value: b,
98+
precision: *p as i32,
99+
scale: *s as i32,
100+
};
101+
sbbf.check(&decimal)
102+
}
103+
Type::INT64 => {
104+
if *p > 18 {
105+
return true;
106+
}
107+
let b = (*v as i64).to_le_bytes();
108+
let decimal = Decimal::Int64 {
109+
value: b,
110+
precision: *p as i32,
111+
scale: *s as i32,
112+
};
113+
sbbf.check(&decimal)
114+
}
115+
Type::FIXED_LEN_BYTE_ARRAY => {
116+
// keep with from_bytes_to_i128
117+
let b = v.to_be_bytes().to_vec();
118+
// Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
119+
let decimal = Decimal::Bytes {
120+
value: b.into(),
121+
precision: *p as i32,
122+
scale: *s as i32,
123+
};
124+
sbbf.check(&decimal)
125+
}
126+
_ => true,
127+
},
128+
ScalarValue::Dictionary(_, inner) => {
129+
BloomFilterStatistics::check_scalar(sbbf, inner, parquet_type)
130+
}
131+
_ => true,
132+
}
133+
}
134+
}
135+
136+
impl PruningStatistics for BloomFilterStatistics {
137+
fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
138+
None
139+
}
140+
141+
fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
142+
None
143+
}
144+
145+
fn num_containers(&self) -> usize {
146+
1
147+
}
148+
149+
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
150+
None
151+
}
152+
153+
fn row_counts(&self) -> Option<ArrayRef> {
154+
None
155+
}
156+
157+
/// Use bloom filters to determine if we are sure this column can not
158+
/// possibly contain `values`
159+
///
160+
/// The `contained` API returns false if the bloom filters knows that *ALL*
161+
/// of the values in a column are not present.
162+
fn contained(
163+
&self,
164+
column: &Column,
165+
values: &HashSet<ScalarValue>,
166+
) -> Option<BooleanArray> {
167+
let (sbbf, parquet_type) = self.column_sbbf.get(column.name.as_str())?;
168+
169+
// Bloom filters are probabilistic data structures that can return false
170+
// positives (i.e. it might return true even if the value is not
171+
// present) however, the bloom filter will return `false` if the value is
172+
// definitely not present.
173+
174+
let known_not_present = values
175+
.iter()
176+
.map(|value| BloomFilterStatistics::check_scalar(sbbf, value, parquet_type))
177+
// The row group doesn't contain any of the values if
178+
// all the checks are false
179+
.all(|v| !v);
180+
181+
let contains = if known_not_present {
182+
Some(false)
183+
} else {
184+
// Given the bloom filter is probabilistic, we can't be sure that
185+
// the row group actually contains the values. Return `None` to
186+
// indicate this uncertainty
187+
None
188+
};
189+
190+
Some(BooleanArray::from(vec![contains]))
191+
}
192+
}

0 commit comments

Comments
 (0)