Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/datasource-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ all-features = true

[dependencies]
arrow = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store", "parquet"] }
Expand Down
2 changes: 2 additions & 0 deletions datafusion/datasource-parquet/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod row_group_filter;
mod sort;
pub mod source;
mod supported_predicates;
mod virtual_column;
mod writer;

pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
Expand All @@ -46,4 +47,5 @@ pub use reader::*; // Expose so downstream crates can use it
pub use row_filter::build_row_filter;
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use row_group_filter::RowGroupAccessPlanFilter;
pub use virtual_column::ParquetVirtualColumn;
pub use writer::plan_to_parquet;
704 changes: 684 additions & 20 deletions datafusion/datasource-parquet/src/opener.rs

Large diffs are not rendered by default.

89 changes: 87 additions & 2 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use crate::opener::ParquetMorselizer;
use crate::opener::build_pruning_predicates;
use crate::opener::build_virtual_columns_state;
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -553,6 +554,22 @@ impl FileSource for ParquetSource {
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

// Validate virtual columns (extension-type allowlist) and, when
// pushdown is enabled, reject predicates that reference them. Both
// checks depend only on morselizer-level state, so we pay their cost
// once per scan partition rather than per file.
//
// Gating predicate validation on `pushdown_filters` is deliberate:
// when pushdown is off the predicate stays above the scan as a
// `FilterExec` and resolves virtual columns there; the row-filter
// ban only applies to the pushdown path.
let virtual_state = build_virtual_columns_state(
self.table_schema.virtual_columns(),
self.table_schema.file_schema(),
self.predicate.as_ref(),
self.pushdown_filters(),
)?;

Ok(Box::new(ParquetMorselizer {
partition_index: partition,
projection: self.projection.clone(),
Expand Down Expand Up @@ -580,6 +597,7 @@ impl FileSource for ParquetSource {
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
virtual_state,
}))
}

Expand Down Expand Up @@ -678,7 +696,12 @@ impl FileSource for ParquetSource {
filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
let table_schema = self.table_schema.table_schema();
// Use the schema excluding virtual columns: virtual columns (e.g.
// Parquet `row_number`) are produced by the reader itself and cannot
// be referenced inside a RowFilter, so predicates that reference them
// must not be marked as pushed down — otherwise the scan would
// silently drop them and produce wrong results.
let pushable_schema = self.table_schema.schema_without_virtual_columns();
// Determine if based on configs we should push filters down.
// If either the table / scan itself or the config has pushdown enabled,
// we will push down the filters.
Expand All @@ -694,7 +717,7 @@ impl FileSource for ParquetSource {
let filters: Vec<PushedDownPredicate> = filters
.into_iter()
.map(|filter| {
if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
if can_expr_be_pushed_down_with_schemas(&filter, &pushable_schema) {
PushedDownPredicate::supported(filter)
} else {
PushedDownPredicate::unsupported(filter)
Expand Down Expand Up @@ -946,4 +969,66 @@ mod tests {
assert!(source.reverse_row_groups());
assert!(source.filter().is_some());
}

#[test]
fn test_try_pushdown_filters_rejects_virtual_column_refs() {
// Virtual columns are produced by the reader and cannot be referenced
// inside a RowFilter. `try_pushdown_filters` must report such filters
// as `PushedDown::No` so the FilterExec above the scan stays in
// place — otherwise the scan would silently drop the predicate and
// produce wrong results.
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_datasource::TableSchema;
use datafusion_expr::{col, lit as logical_lit};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use parquet::arrow::RowNumber;

let file_schema = Arc::new(Schema::new(vec![Field::new(
"value",
DataType::Int64,
false,
)]));
let row_number_field: FieldRef = Arc::new(
Field::new("row_number", DataType::Int64, false)
.with_extension_type(RowNumber),
);
let table_schema = TableSchema::from_file_schema(file_schema)
.with_virtual_columns(vec![row_number_field]);

let source = ParquetSource::new(table_schema).with_pushdown_filters(true);

let full_schema = source.table_schema.table_schema();

let pushable = logical2physical(&col("value").eq(logical_lit(1i64)), full_schema);
let virtual_only =
logical2physical(&col("row_number").eq(logical_lit(2i64)), full_schema);
let mixed = logical2physical(
&col("row_number")
.eq(logical_lit(2i64))
.or(col("value").eq(logical_lit(4i64))),
full_schema,
);

let config = ConfigOptions::default();
let prop = source
.try_pushdown_filters(vec![pushable, virtual_only, mixed], &config)
.expect("try_pushdown_filters must not error");

assert_eq!(prop.filters.len(), 3);
assert!(
matches!(prop.filters[0], PushedDown::Yes),
"file-column filter should be pushable"
);
assert!(
matches!(prop.filters[1], PushedDown::No),
"filter referencing only a virtual column must not be pushed down"
);
assert!(
matches!(prop.filters[2], PushedDown::No),
"filter mixing a virtual column with a file column must not be \
pushed down (row filter would silently drop it)"
);
}
}
125 changes: 125 additions & 0 deletions datafusion/datasource-parquet/src/virtual_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Typed wrapper for parquet virtual columns.
//!
//! arrow-rs identifies virtual columns via arrow extension types carried on
//! the `FieldRef`. [`ParquetVirtualColumn`] lifts that contract into the type
//! system so callers validate at the boundary (via `TryFrom<&FieldRef>`)
//! rather than string-comparing extension-type names deep inside the reader.

use arrow::datatypes::FieldRef;
use arrow_schema::extension::ExtensionType;
use datafusion_common::{DataFusionError, Result, not_impl_err};
use parquet::arrow::RowNumber;
use std::sync::Arc;

/// A parquet virtual column validated to have a supported arrow extension
/// type.
///
/// Construct via [`TryFrom<&FieldRef>`]; add a new variant (and update the
/// `TryFrom` impl) when DataFusion gains support for another arrow-rs virtual
/// extension type.
#[derive(Debug, Clone)]
pub enum ParquetVirtualColumn {
/// Absolute row number within the parquet file. Backed by arrow-rs's
/// [`RowNumber`] extension type.
RowNumber(FieldRef),
}

impl ParquetVirtualColumn {
pub fn field(&self) -> &FieldRef {
match self {
Self::RowNumber(field) => field,
}
}
}

impl From<ParquetVirtualColumn> for FieldRef {
fn from(col: ParquetVirtualColumn) -> Self {
match col {
ParquetVirtualColumn::RowNumber(field) => field,
}
}
}

impl TryFrom<&FieldRef> for ParquetVirtualColumn {
type Error = DataFusionError;

fn try_from(field: &FieldRef) -> Result<Self> {
let Some(name) = field.extension_type_name() else {
return not_impl_err!(
"Virtual column '{}' is missing an Arrow extension type; \
supported extension types: [{}]",
field.name(),
RowNumber::NAME
);
};
match name {
n if n == RowNumber::NAME => Ok(Self::RowNumber(Arc::clone(field))),
other => not_impl_err!(
"Virtual column '{}' uses unsupported Arrow extension type '{}'; \
supported types: [{}]. Add a ParquetVirtualColumn variant and \
a test for this type before wiring it through.",
field.name(),
other,
RowNumber::NAME
),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field};

#[test]
fn row_number_field_converts() {
let field: FieldRef = Arc::new(
Field::new("row_number", DataType::Int64, false)
.with_extension_type(RowNumber),
);
let col = ParquetVirtualColumn::try_from(&field).expect("valid row_number");
assert!(matches!(col, ParquetVirtualColumn::RowNumber(_)));
assert_eq!(col.field().name(), "row_number");
}

#[test]
fn missing_extension_type_rejected() {
let field: FieldRef = Arc::new(Field::new("plain", DataType::Int64, false));
let err = ParquetVirtualColumn::try_from(&field).unwrap_err();
assert!(
err.to_string().contains("missing an Arrow extension type"),
"got: {err}"
);
}

#[test]
fn unsupported_extension_type_rejected() {
// RowGroupIndex is a real arrow-rs virtual type not yet in our enum.
let field: FieldRef = Arc::new(
Field::new("row_group_index", DataType::Int64, false)
.with_extension_type(parquet::arrow::RowGroupIndex),
);
let err = ParquetVirtualColumn::try_from(&field).unwrap_err();
assert!(
err.to_string().contains("parquet.virtual.row_group_index"),
"error should name the offending extension type, got: {err}"
);
}
}
Loading
Loading