Skip to content

Commit 580b0ab

Browse files
Use leaf level ProjectionMask for parquet projections (#20925)
- Added on from #20913. Please review from the third commit. ## Rationale for this change This PR reuses the `ParquetReadPlan` (introduced for the row filter pushdown) to also resolve projection expressions to parquet leaf column indices. Previously, projecting a single field from a struct with many children would read all leaves of that struct. This aligns the projection path with the row filter path, which already had leaf-level struct pruning.
1 parent 37cd3de commit 580b0ab

3 files changed

Lines changed: 330 additions & 12 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! [`ParquetOpener`] for opening Parquet files
1919
2020
use crate::page_filter::PagePruningAccessPlanFilter;
21+
use crate::row_filter::build_projection_read_plan;
2122
use crate::row_group_filter::RowGroupAccessPlanFilter;
2223
use crate::{
2324
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -59,13 +60,13 @@ use datafusion_execution::parquet_encryption::EncryptionFactory;
5960
use futures::{Stream, StreamExt, ready};
6061
use log::debug;
6162
use parquet::DecodeResult;
63+
use parquet::arrow::ParquetRecordBatchStreamBuilder;
6264
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
6365
use parquet::arrow::arrow_reader::{
6466
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
6567
};
6668
use parquet::arrow::async_reader::AsyncFileReader;
6769
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
68-
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
6970
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
7071

7172
/// Implements [`FileOpener`] for a parquet file
@@ -583,12 +584,14 @@ impl FileOpener for ParquetOpener {
583584
// metrics from the arrow reader itself
584585
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
585586

586-
let indices = projection.column_indices();
587-
let mask =
588-
ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone());
587+
let read_plan = build_projection_read_plan(
588+
projection.expr_iter(),
589+
&physical_file_schema,
590+
reader_metadata.parquet_schema(),
591+
);
589592

590593
let decoder = builder
591-
.with_projection(mask)
594+
.with_projection(read_plan.projection_mask)
592595
.with_metrics(arrow_reader_metrics.clone())
593596
.build()?;
594597

@@ -601,7 +604,7 @@ impl FileOpener for ParquetOpener {
601604
// Rebase column indices to match the narrowed stream schema.
602605
// The projection expressions have indices based on physical_file_schema,
603606
// but the stream only contains the columns selected by the ProjectionMask.
604-
let stream_schema = Arc::new(physical_file_schema.project(&indices)?);
607+
let stream_schema = read_plan.projected_schema;
605608
let replace_schema = stream_schema != output_schema;
606609
let projection = projection
607610
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 246 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ use datafusion_common::cast::as_boolean_array;
8383
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
8484
use datafusion_physical_expr::ScalarFunctionExpr;
8585
use datafusion_physical_expr::expressions::{Column, Literal};
86-
use datafusion_physical_expr::utils::reassign_expr_columns;
86+
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
8787
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
8888

8989
use datafusion_physical_plan::metrics;
@@ -424,10 +424,26 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
424424
.first()
425425
.and_then(|a| a.as_any().downcast_ref::<Column>())
426426
{
427+
// for Map columns, get_field performs a runtime key lookup rather than a
428+
// schema-level field access, so the entire Map column must be read;
429+
// we skip the struct field optimization and defer to normal Column traversal
430+
let is_map_column = self
431+
.file_schema
432+
.index_of(column.name())
433+
.ok()
434+
.map(|idx| {
435+
matches!(
436+
self.file_schema.field(idx).data_type(),
437+
DataType::Map(_, _)
438+
)
439+
})
440+
.unwrap_or(false);
441+
427442
let return_type = func.return_type();
428443

429-
if !DataType::is_nested(return_type)
430-
|| self.is_nested_type_supported(return_type)
444+
if !is_map_column
445+
&& (!DataType::is_nested(return_type)
446+
|| self.is_nested_type_supported(return_type))
431447
{
432448
// try to resolve all field name arguments to string literals
433449
// if any argument is not a string literal, we can not determine the exact
@@ -579,6 +595,136 @@ pub(crate) fn build_parquet_read_plan(
579595
)))
580596
}
581597

598+
/// Builds a unified [`ParquetReadPlan`] for a set of projection expressions
599+
///
600+
/// Unlike [`build_parquet_read_plan`] (which is used for filter pushdown and
601+
/// returns `None` when an expression references unsupported nested types or
602+
/// missing columns), this function always succeeds. It collects every column
603+
/// that *can* be resolved in the file and produces a leaf-level projection
604+
/// mask. Columns missing from the file are silently skipped since the projection
605+
/// layer handles those by inserting nulls.
606+
pub(crate) fn build_projection_read_plan(
607+
exprs: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
608+
file_schema: &Schema,
609+
schema_descr: &SchemaDescriptor,
610+
) -> ParquetReadPlan {
611+
// fast path: if every expression is a plain Column reference, skip all
612+
// struct analysis and use root-level projection directly
613+
let exprs = exprs.into_iter().collect::<Vec<_>>();
614+
let all_plain_columns = exprs
615+
.iter()
616+
.all(|e| e.as_any().downcast_ref::<Column>().is_some());
617+
618+
if all_plain_columns {
619+
let mut root_indices: Vec<usize> = exprs
620+
.iter()
621+
.map(|e| e.as_any().downcast_ref::<Column>().unwrap().index())
622+
.collect();
623+
root_indices.sort_unstable();
624+
root_indices.dedup();
625+
626+
let projection_mask =
627+
ProjectionMask::roots(schema_descr, root_indices.iter().copied());
628+
let projected_schema = Arc::new(
629+
file_schema
630+
.project(&root_indices)
631+
.expect("valid column indices"),
632+
);
633+
634+
return ParquetReadPlan {
635+
projection_mask,
636+
projected_schema,
637+
};
638+
}
639+
640+
// secondary fast path: if the schema has no struct columns, we can skip
641+
// PushdownChecker traversal and use root-level projection
642+
let has_struct_columns = file_schema
643+
.fields()
644+
.iter()
645+
.any(|f| matches!(f.data_type(), DataType::Struct(_)));
646+
647+
if !has_struct_columns {
648+
let mut root_indices = exprs
649+
.into_iter()
650+
.flat_map(|e| collect_columns(&e).into_iter().map(|col| col.index()))
651+
.collect::<Vec<_>>();
652+
653+
root_indices.sort_unstable();
654+
root_indices.dedup();
655+
656+
let projection_mask =
657+
ProjectionMask::roots(schema_descr, root_indices.iter().copied());
658+
659+
let projected_schema = Arc::new(
660+
file_schema
661+
.project(&root_indices)
662+
.expect("valid column indices"),
663+
);
664+
665+
return ParquetReadPlan {
666+
projection_mask,
667+
projected_schema,
668+
};
669+
}
670+
671+
let mut all_root_indices = Vec::new();
672+
let mut all_struct_accesses = Vec::new();
673+
674+
for expr in exprs {
675+
let mut checker = PushdownChecker::new(file_schema, true);
676+
let _ = expr.visit(&mut checker);
677+
let columns = checker.into_sorted_columns();
678+
679+
all_root_indices.extend_from_slice(&columns.required_columns);
680+
all_struct_accesses.extend(columns.struct_field_accesses);
681+
}
682+
683+
all_root_indices.sort_unstable();
684+
all_root_indices.dedup();
685+
686+
// when no struct field accesses were found, fall back to root-level projection
687+
// to match the performance of the simple path
688+
if all_struct_accesses.is_empty() {
689+
let projection_mask =
690+
ProjectionMask::roots(schema_descr, all_root_indices.iter().copied());
691+
let projected_schema = Arc::new(
692+
file_schema
693+
.project(&all_root_indices)
694+
.expect("valid column indices"),
695+
);
696+
697+
return ParquetReadPlan {
698+
projection_mask,
699+
projected_schema,
700+
};
701+
}
702+
703+
let leaf_indices = {
704+
let mut out =
705+
leaf_indices_for_roots(all_root_indices.iter().copied(), schema_descr);
706+
let struct_leaf_indices =
707+
resolve_struct_field_leaves(&all_struct_accesses, file_schema, schema_descr);
708+
709+
out.extend_from_slice(&struct_leaf_indices);
710+
out.sort_unstable();
711+
out.dedup();
712+
713+
out
714+
};
715+
716+
let projection_mask =
717+
ProjectionMask::leaves(schema_descr, leaf_indices.iter().copied());
718+
719+
let projected_schema =
720+
build_filter_schema(file_schema, &all_root_indices, &all_struct_accesses);
721+
722+
ParquetReadPlan {
723+
projection_mask,
724+
projected_schema,
725+
}
726+
}
727+
582728
fn leaf_indices_for_roots<I>(
583729
root_indices: I,
584730
schema_descr: &SchemaDescriptor,
@@ -654,6 +800,8 @@ fn build_filter_schema(
654800
regular_indices: &[usize],
655801
struct_field_accesses: &[StructFieldAccess],
656802
) -> SchemaRef {
803+
let regular_set: BTreeSet<usize> = regular_indices.iter().copied().collect();
804+
657805
let all_indices = regular_indices
658806
.iter()
659807
.copied()
@@ -669,6 +817,15 @@ fn build_filter_schema(
669817
.map(|&idx| {
670818
let field = file_schema.field(idx);
671819

820+
// if this column appears as a regular (whole-column) reference,
821+
// keep the full type
822+
//
823+
// Pruning is only valid when the column is accessed exclusively
824+
// through struct field accesses
825+
if regular_set.contains(&idx) {
826+
return Arc::new(field.clone());
827+
}
828+
672829
// collect all field paths that access this root struct column
673830
let field_paths = struct_field_accesses
674831
.iter()
@@ -683,7 +840,6 @@ fn build_filter_schema(
683840
.collect::<Vec<_>>();
684841

685842
if field_paths.is_empty() {
686-
// its a regular column - use the full type
687843
return Arc::new(field.clone());
688844
}
689845

@@ -696,7 +852,10 @@ fn build_filter_schema(
696852
})
697853
.collect::<Vec<_>>();
698854

699-
Arc::new(Schema::new(fields))
855+
Arc::new(Schema::new_with_metadata(
856+
fields,
857+
file_schema.metadata().clone(),
858+
))
700859
}
701860

702861
fn prune_struct_type(dt: &DataType, paths: &[&[String]]) -> DataType {
@@ -958,6 +1117,8 @@ mod test {
9581117
use parquet::file::reader::{FileReader, SerializedFileReader};
9591118
use tempfile::NamedTempFile;
9601119

1120+
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
1121+
9611122
// List predicates used by the decoder should be accepted for pushdown
9621123
#[test]
9631124
fn test_filter_candidate_builder_supports_list_types() {
@@ -1814,6 +1975,86 @@ mod test {
18141975
assert_eq!(file_metrics.pushdown_rows_matched.value(), 2);
18151976
}
18161977

1978+
#[test]
1979+
fn projection_read_plan_preserves_full_struct() {
1980+
// Schema: id (Int32), s (Struct{value: Int32, label: Utf8})
1981+
// Parquet leaves: id=0, s.value=1, s.label=2
1982+
let struct_fields: Fields = vec![
1983+
Arc::new(Field::new("value", DataType::Int32, false)),
1984+
Arc::new(Field::new("label", DataType::Utf8, false)),
1985+
]
1986+
.into();
1987+
1988+
let schema = Arc::new(Schema::new(vec![
1989+
Field::new("id", DataType::Int32, false),
1990+
Field::new("s", DataType::Struct(struct_fields.clone()), false),
1991+
]));
1992+
1993+
let batch = RecordBatch::try_new(
1994+
Arc::clone(&schema),
1995+
vec![
1996+
Arc::new(Int32Array::from(vec![1, 2, 3])),
1997+
Arc::new(StructArray::new(
1998+
struct_fields,
1999+
vec![
2000+
Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
2001+
Arc::new(StringArray::from(vec!["a", "b", "c"])) as _,
2002+
],
2003+
None,
2004+
)),
2005+
],
2006+
)
2007+
.unwrap();
2008+
2009+
let file = NamedTempFile::new().expect("temp file");
2010+
let mut writer =
2011+
ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None)
2012+
.expect("writer");
2013+
writer.write(&batch).expect("write batch");
2014+
writer.close().expect("close writer");
2015+
2016+
let reader_file = file.reopen().expect("reopen file");
2017+
let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
2018+
.expect("reader builder");
2019+
let metadata = builder.metadata().clone();
2020+
let file_schema = builder.schema().clone();
2021+
let schema_descr = metadata.file_metadata().schema_descr();
2022+
2023+
// Simulate SELECT * output projection: Column("id") and Column("s")
2024+
// Plus a get_field(s, 'value') expression from the pushed-down filter
2025+
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
2026+
Arc::new(PhysicalColumn::new("id", 0)),
2027+
Arc::new(PhysicalColumn::new("s", 1)),
2028+
logical2physical(
2029+
&get_field().call(vec![
2030+
col("s"),
2031+
Expr::Literal(ScalarValue::Utf8(Some("value".to_string())), None),
2032+
]),
2033+
&file_schema,
2034+
),
2035+
];
2036+
2037+
let read_plan = build_projection_read_plan(exprs, &file_schema, schema_descr);
2038+
2039+
// The projected schema must have the FULL struct type because Column("s")
2040+
// is in the projection. It should NOT be narrowed to Struct{value: Int32}.
2041+
let s_field = read_plan.projected_schema.field_with_name("s").unwrap();
2042+
assert_eq!(
2043+
s_field.data_type(),
2044+
&DataType::Struct(
2045+
vec![
2046+
Arc::new(Field::new("value", DataType::Int32, false)),
2047+
Arc::new(Field::new("label", DataType::Utf8, false)),
2048+
]
2049+
.into()
2050+
),
2051+
);
2052+
2053+
// all 3 Parquet leaves should be in the projection mask
2054+
let expected_mask = ProjectionMask::leaves(schema_descr, [0, 1, 2]);
2055+
assert_eq!(read_plan.projection_mask, expected_mask,);
2056+
}
2057+
18172058
/// Sanity check that the given expression could be evaluated against the given schema without any errors.
18182059
/// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
18192060
fn check_expression_can_evaluate_against_schema(

0 commit comments

Comments
 (0)