Skip to content

Commit b4a6eb1

Browse files
xudong963 and adriangb authored
Extract parquet push decoder module (apache#22289)
## Which issue does this PR close?

Follow-up to apache#22191.

## Rationale for this change

This is a code organization follow-up suggested during review of apache#22191. The push decoder setup and stream-driving state now live outside `opener.rs`, letting the opener focus on orchestration.

## What changes are included in this PR?

- Move `RowFilterGenerator` to `row_filter.rs`, next to `build_row_filter`.
- Add `push_decoder.rs` for `DecoderBuilderConfig` and `PushDecoderStreamState`.
- Register the new parquet datasource module from `mod.rs`.

No behavior change intended.

## Are these changes tested?

Existing tests cover the behavior.

## Are there any user-facing changes?

No. This is an internal refactor.

---------

Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
1 parent 102da39 commit b4a6eb1

4 files changed

Lines changed: 341 additions & 273 deletions

File tree

datafusion/datasource-parquet/src/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ pub mod metadata;
3030
mod metrics;
3131
mod opener;
3232
mod page_filter;
33+
mod push_decoder;
3334
mod reader;
3435
mod row_filter;
3536
mod row_group_filter;

datafusion/datasource-parquet/src/opener.rs

Lines changed: 37 additions & 273 deletions
Original file line number | Diff line number | Diff line change
@@ -19,16 +19,17 @@
1919
2020
use crate::access_plan::PreparedAccessPlan;
2121
use crate::page_filter::PagePruningAccessPlanFilter;
22-
use crate::row_filter::{self, ParquetReadPlan, build_projection_read_plan};
22+
use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
23+
use crate::row_filter::{RowFilterGenerator, build_projection_read_plan};
2324
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
2425
use crate::{
2526
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
2627
apply_file_schema_type_coercions, coerce_int96_to_resolution,
2728
};
28-
use arrow::array::{RecordBatch, RecordBatchOptions};
29+
use arrow::array::RecordBatch;
2930
use arrow::datatypes::DataType;
3031
use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer};
31-
use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
32+
use datafusion_physical_expr::projection::ProjectionExprs;
3233
use datafusion_physical_expr::utils::reassign_expr_columns;
3334
use datafusion_physical_expr_adapter::replace_columns_with_literals;
3435
use std::collections::{HashMap, VecDeque};
@@ -39,12 +40,10 @@ use std::pin::Pin;
3940
use std::sync::Arc;
4041
use std::task::{Context, Poll};
4142

42-
use arrow::datatypes::{Schema, SchemaRef, TimeUnit};
43+
use arrow::datatypes::{SchemaRef, TimeUnit};
4344
use datafusion_common::encryption::FileDecryptionProperties;
4445
use datafusion_common::stats::Precision;
45-
use datafusion_common::{
46-
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
47-
};
46+
use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_err};
4847
use datafusion_datasource::{PartitionedFile, TableSchema};
4948
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
5049
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
@@ -53,8 +52,8 @@ use datafusion_physical_expr_common::physical_expr::{
5352
};
5453
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5554
use datafusion_physical_plan::metrics::{
56-
BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
57-
MetricCategory, PruningMetrics,
55+
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
56+
PruningMetrics,
5857
};
5958
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
6059

@@ -66,18 +65,14 @@ use futures::{
6665
FutureExt, Stream, StreamExt, future::BoxFuture, ready, stream::BoxStream,
6766
};
6867
use log::debug;
69-
use parquet::DecodeResult;
7068
use parquet::arrow::ParquetRecordBatchStreamBuilder;
7169
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
72-
use parquet::arrow::arrow_reader::{
73-
ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelectionPolicy,
74-
};
70+
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
7571
use parquet::arrow::async_reader::AsyncFileReader;
7672
use parquet::arrow::parquet_column;
77-
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
7873
use parquet::basic::Type;
7974
use parquet::bloom_filter::Sbbf;
80-
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
75+
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
8176

8277
/// Stateless Parquet morselizer implementation.
8378
///
@@ -108,7 +103,7 @@ pub(super) struct ParquetMorselizer {
108103
/// Factory for instantiating parquet reader
109104
pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
110105
/// Should the filters be evaluated during the parquet scan using
111-
/// [`DataFusionArrowPredicate`](row_filter::DatafusionArrowPredicate)?
106+
/// [`DatafusionArrowPredicate`](crate::row_filter::DatafusionArrowPredicate)?
112107
pub pushdown_filters: bool,
113108
/// Should the filters be reordered to optimize the scan?
114109
pub reorder_filters: bool,
@@ -1158,8 +1153,17 @@ impl RowGroupsPrunedParquetOpen {
11581153
);
11591154

11601155
let (decoder, pending_decoders, remaining_limit) = {
1161-
let mut row_filter_generator =
1162-
RowFilterGenerator::new(&prepared, file_metadata.as_ref());
1156+
let pushdown_predicate = prepared
1157+
.pushdown_filters
1158+
.then_some(prepared.predicate.as_ref())
1159+
.flatten();
1160+
let mut row_filter_generator = RowFilterGenerator::new(
1161+
pushdown_predicate,
1162+
&prepared.physical_file_schema,
1163+
file_metadata.as_ref(),
1164+
prepared.reorder_predicates,
1165+
&prepared.file_metrics,
1166+
);
11631167

11641168
// Split into consecutive runs of row groups that share the same filter
11651169
// requirement. Fully matched row groups skip the RowFilter; others need it.
@@ -1227,272 +1231,32 @@ impl RowGroupsPrunedParquetOpen {
12271231
let output_schema = Arc::clone(&prepared.output_schema);
12281232
let files_ranges_pruned_statistics =
12291233
prepared.file_metrics.files_ranges_pruned_statistics.clone();
1230-
let stream = futures::stream::unfold(
1231-
PushDecoderStreamState {
1232-
decoder,
1233-
pending_decoders,
1234-
remaining_limit,
1235-
reader: prepared.async_file_reader,
1236-
projector,
1237-
output_schema,
1238-
replace_schema,
1239-
arrow_reader_metrics,
1240-
predicate_cache_inner_records,
1241-
predicate_cache_records,
1242-
baseline_metrics: prepared.baseline_metrics,
1243-
},
1244-
|state| async move { state.transition().await },
1245-
)
1246-
.fuse();
1234+
let stream = PushDecoderStreamState {
1235+
decoder,
1236+
pending_decoders,
1237+
remaining_limit,
1238+
reader: prepared.async_file_reader,
1239+
projector,
1240+
output_schema,
1241+
replace_schema,
1242+
arrow_reader_metrics,
1243+
predicate_cache_inner_records,
1244+
predicate_cache_records,
1245+
baseline_metrics: prepared.baseline_metrics,
1246+
}
1247+
.into_stream();
12471248

12481249
// Wrap the stream so a dynamic filter can stop the file scan early.
12491250
if let Some(file_pruner) = prepared.file_pruner {
1250-
let stream = stream.boxed();
12511251
Ok(EarlyStoppingStream::new(
12521252
stream,
12531253
file_pruner,
12541254
files_ranges_pruned_statistics,
12551255
)
12561256
.boxed())
12571257
} else {
1258-
Ok(stream.boxed())
1259-
}
1260-
}
1261-
}
1262-
1263-
/// Builds row filters for decoder runs.
1264-
///
1265-
/// A [`RowFilter`] must be owned by a decoder, so scans split across multiple
1266-
/// decoder runs need a fresh filter for each run that evaluates row predicates.
1267-
struct RowFilterGenerator<'a> {
1268-
predicate: Option<&'a Arc<dyn PhysicalExpr>>,
1269-
physical_file_schema: &'a SchemaRef,
1270-
file_metadata: &'a ParquetMetaData,
1271-
reorder_predicates: bool,
1272-
file_metrics: &'a ParquetFileMetrics,
1273-
first_row_filter: Option<RowFilter>,
1274-
}
1275-
1276-
impl<'a> RowFilterGenerator<'a> {
1277-
fn new(
1278-
prepared: &'a PreparedParquetOpen,
1279-
file_metadata: &'a ParquetMetaData,
1280-
) -> Self {
1281-
let predicate = prepared
1282-
.pushdown_filters
1283-
.then_some(prepared.predicate.as_ref())
1284-
.flatten();
1285-
1286-
let mut generator = Self {
1287-
predicate,
1288-
physical_file_schema: &prepared.physical_file_schema,
1289-
file_metadata,
1290-
reorder_predicates: prepared.reorder_predicates,
1291-
file_metrics: &prepared.file_metrics,
1292-
first_row_filter: None,
1293-
};
1294-
generator.first_row_filter = generator.build_row_filter();
1295-
generator
1296-
}
1297-
1298-
fn has_row_filter(&self) -> bool {
1299-
self.first_row_filter.is_some()
1300-
}
1301-
1302-
fn next_filter(&mut self) -> Option<RowFilter> {
1303-
self.first_row_filter
1304-
.take()
1305-
.or_else(|| self.build_row_filter())
1306-
}
1307-
1308-
fn build_row_filter(&self) -> Option<RowFilter> {
1309-
let predicate = self.predicate?;
1310-
match row_filter::build_row_filter(
1311-
predicate,
1312-
self.physical_file_schema,
1313-
self.file_metadata,
1314-
self.reorder_predicates,
1315-
self.file_metrics,
1316-
) {
1317-
Ok(Some(filter)) => Some(filter),
1318-
Ok(None) => None,
1319-
Err(e) => {
1320-
debug!("Ignoring error building row filter for '{predicate:?}': {e}");
1321-
None
1322-
}
1323-
}
1324-
}
1325-
}
1326-
1327-
/// State shared while building [`ParquetPushDecoder`]s for one file scan.
1328-
///
1329-
/// A scan can be split into multiple decoder runs when row groups have
1330-
/// different filtering requirements. This config holds the options that apply
1331-
/// to every [`ParquetPushDecoderBuilder`], while each run supplies its own
1332-
/// [`PreparedAccessPlan`] and optional row filter.
1333-
struct DecoderBuilderConfig<'a> {
1334-
read_plan: &'a ParquetReadPlan,
1335-
batch_size: usize,
1336-
arrow_reader_metrics: &'a ArrowReaderMetrics,
1337-
force_filter_selections: bool,
1338-
decoder_limit: Option<usize>,
1339-
}
1340-
1341-
impl DecoderBuilderConfig<'_> {
1342-
fn build(
1343-
&self,
1344-
prepared_access_plan: PreparedAccessPlan,
1345-
metadata: ArrowReaderMetadata,
1346-
) -> ParquetPushDecoderBuilder {
1347-
let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
1348-
.with_projection(self.read_plan.projection_mask.clone())
1349-
.with_batch_size(self.batch_size)
1350-
.with_metrics(self.arrow_reader_metrics.clone());
1351-
if self.force_filter_selections {
1352-
builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
1353-
}
1354-
if let Some(row_selection) = prepared_access_plan.row_selection {
1355-
builder = builder.with_row_selection(row_selection);
1356-
}
1357-
builder = builder.with_row_groups(prepared_access_plan.row_group_indexes);
1358-
if let Some(limit) = self.decoder_limit {
1359-
builder = builder.with_limit(limit);
1360-
}
1361-
builder
1362-
}
1363-
}
1364-
1365-
/// State for a stream that decodes a single Parquet file using a push-based decoder.
1366-
///
1367-
/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests
1368-
/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the
1369-
/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is
1370-
/// fully consumed.
1371-
struct PushDecoderStreamState {
1372-
decoder: ParquetPushDecoder,
1373-
/// Additional decoders to process after the current one finishes.
1374-
/// Used when fully matched row groups split the scan into consecutive
1375-
/// runs with different filter configurations, maintaining original order.
1376-
pending_decoders: VecDeque<ParquetPushDecoder>,
1377-
/// Global remaining row limit across all decoder runs.
1378-
///
1379-
/// Decoder-local limits are only safe for single-run scans. When the scan
1380-
/// is split across multiple decoders, the combined stream limit is enforced
1381-
/// here instead.
1382-
remaining_limit: Option<usize>,
1383-
reader: Box<dyn AsyncFileReader>,
1384-
projector: Projector,
1385-
output_schema: Arc<Schema>,
1386-
replace_schema: bool,
1387-
arrow_reader_metrics: ArrowReaderMetrics,
1388-
predicate_cache_inner_records: Gauge,
1389-
predicate_cache_records: Gauge,
1390-
baseline_metrics: BaselineMetrics,
1391-
}
1392-
1393-
impl PushDecoderStreamState {
1394-
/// Advances the decoder state machine until the next [`RecordBatch`] is
1395-
/// produced, the file is fully consumed, or an error occurs.
1396-
///
1397-
/// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]:
1398-
/// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are
1399-
/// fetched from the [`AsyncFileReader`] and fed back into the decoder.
1400-
/// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
1401-
/// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
1402-
///
1403-
/// Takes `self` by value (rather than `&mut self`) so the generated future
1404-
/// owns the state directly. This avoids a Stacked Borrows violation under
1405-
/// miri where `&mut self` creates a single opaque borrow that conflicts
1406-
/// with `unfold`'s ownership across yield points.
1407-
async fn transition(mut self) -> Option<(Result<RecordBatch>, Self)> {
1408-
loop {
1409-
if self.remaining_limit == Some(0) {
1410-
return None;
1411-
}
1412-
match self.decoder.try_decode() {
1413-
Ok(DecodeResult::NeedsData(ranges)) => {
1414-
let data = self
1415-
.reader
1416-
.get_byte_ranges(ranges.clone())
1417-
.await
1418-
.map_err(DataFusionError::from);
1419-
match data {
1420-
Ok(data) => {
1421-
if let Err(e) = self.decoder.push_ranges(ranges, data) {
1422-
return Some((Err(DataFusionError::from(e)), self));
1423-
}
1424-
}
1425-
Err(e) => return Some((Err(e), self)),
1426-
}
1427-
}
1428-
Ok(DecodeResult::Data(batch)) => {
1429-
let batch = if let Some(remaining_limit) = self.remaining_limit {
1430-
if batch.num_rows() > remaining_limit {
1431-
self.remaining_limit = Some(0);
1432-
batch.slice(0, remaining_limit)
1433-
} else {
1434-
self.remaining_limit =
1435-
Some(remaining_limit - batch.num_rows());
1436-
batch
1437-
}
1438-
} else {
1439-
batch
1440-
};
1441-
let mut timer = self.baseline_metrics.elapsed_compute().timer();
1442-
self.copy_arrow_reader_metrics();
1443-
let result = self.project_batch(&batch);
1444-
timer.stop();
1445-
// Release the borrow on baseline_metrics before moving self
1446-
drop(timer);
1447-
return Some((result, self));
1448-
}
1449-
Ok(DecodeResult::Finished) => {
1450-
// If there are pending decoders (e.g. for consecutive runs
1451-
// with different filter configurations), switch to the next.
1452-
if let Some(next) = self.pending_decoders.pop_front() {
1453-
self.decoder = next;
1454-
continue;
1455-
}
1456-
return None;
1457-
}
1458-
Err(e) => {
1459-
return Some((Err(DataFusionError::from(e)), self));
1460-
}
1461-
}
1462-
}
1463-
}
1464-
1465-
/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
1466-
/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
1467-
fn copy_arrow_reader_metrics(&self) {
1468-
if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() {
1469-
self.predicate_cache_inner_records.set(v);
1470-
}
1471-
if let Some(v) = self.arrow_reader_metrics.records_read_from_cache() {
1472-
self.predicate_cache_records.set(v);
1473-
}
1474-
}
1475-
1476-
fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
1477-
let mut batch = self.projector.project_batch(batch)?;
1478-
if self.replace_schema {
1479-
// Ensure the output batch has the expected schema.
1480-
// This handles things like schema level and field level metadata, which may not be present
1481-
// in the physical file schema.
1482-
// It is also possible for nullability to differ; some writers create files with
1483-
// OPTIONAL fields even when there are no nulls in the data.
1484-
// In these cases it may make sense for the logical schema to be `NOT NULL`.
1485-
// RecordBatch::try_new_with_options checks that if the schema is NOT NULL
1486-
// the array cannot contain nulls, amongst other checks.
1487-
let (_stream_schema, arrays, num_rows) = batch.into_parts();
1488-
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
1489-
batch = RecordBatch::try_new_with_options(
1490-
Arc::clone(&self.output_schema),
1491-
arrays,
1492-
&options,
1493-
)?;
1258+
Ok(stream)
14941259
}
1495-
Ok(batch)
14961260
}
14971261
}
14981262

0 commit comments

Comments
 (0)