Skip to content

Commit 152d8c4

Browse files
authored
Add file_row_index UDF to query file-level row indexes from Parquet files (#22604)
## Which issue does this PR close? - Part of #20135 ## Rationale for this change This PR includes the "front end" side of @mbutrovich's #22026, bridging the last mile to allow users to query file row indexes. ## What changes are included in this PR? 1. A new Scalar UDF `file_row_index`, following #20071's example. The function returns 0-based row indexes for Parquet scans. 2. Expands the row-filter PushdownChecker to also check if the predicate contains the new function, denying it from being pushed down if it does. 3. I've added a couple of utilities to find or rewrite ScalarUDF instances in physical expressions trees, I've seen @alamb point this mistake out in multiple PRs (including [here](#20071 (comment))). They can also be used in #20071. They are currently in `schema_rewriter.rs` which was the best place I could think of, but maybe they should be move elsewhere. 4. A dedicated rewrite function for `file_row_index`, which turns it into a `Cast(Column(...))`, which is required to return Int64 values. 5. In `ParquetSource::try_pushdown_projection`, we look for `FileRowIndexFunc`, and if it exists we rewrite it and the source's table schema. ## Are these changes tested? In addition to individual unit tests, I've added a new SLT file (`file_row_index.slt`) that tests for the following cases: 1. Querying `file_row_index` from a table backed by multiple files 2. Filtering on `file_row_index` when its part of the projection 3. Filtering on `file_row_index` when its **not** of the projection, when filter pushdown is either enabled or disabled (this part didn't work in a previous iteration, but figured it out today). ## Are there any user-facing changes? 1. New scalar function type - `FileRowIndexFunc`/`file_row_index`, 5. Rewrite logic in `physical-expr-adapter` - `rewrite_file_row_index_expr` specifically for the new UDF, `rewrite_file_row_index_projection` to rewrite the `ProjectionExprs` and two utility functions that should make it clearer how to manipulate and find ScalarUDFs in physical expressions - `expr_references_scalar_udf` and `rewrite_scalar_udf`. --------- Signed-off-by: Adam Gutglick <adamgsal@gmail.com>
1 parent a1e88e2 commit 152d8c4

9 files changed

Lines changed: 780 additions & 23 deletions

File tree

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use arrow::array::BooleanArray;
7272
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7373
use arrow::error::{ArrowError, Result as ArrowResult};
7474
use arrow::record_batch::RecordBatch;
75+
use datafusion_functions::core::file_row_index::FileRowIndexFunc;
7576
use datafusion_functions::core::getfield::GetFieldFunc;
7677
use parquet::arrow::ProjectionMask;
7778
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
@@ -260,6 +261,9 @@ struct PushdownChecker<'schema> {
260261
non_primitive_columns: bool,
261262
/// Does the expression reference any columns not present in the file schema?
262263
projected_columns: bool,
264+
/// Does the expression references a ScalarUDF that requires some rewrite
265+
/// and therefore can't be pushed down into the row-filter.
266+
has_unpushable_udfs: bool,
263267
/// Indices into the file schema of columns required to evaluate the expression.
264268
/// Does not include struct columns accessed via `get_field`.
265269
required_columns: Vec<usize>,
@@ -276,6 +280,7 @@ impl<'schema> PushdownChecker<'schema> {
276280
Self {
277281
non_primitive_columns: false,
278282
projected_columns: false,
283+
has_unpushable_udfs: false,
279284
required_columns: Vec::new(),
280285
struct_field_accesses: Vec::new(),
281286
allow_list_columns,
@@ -372,7 +377,7 @@ impl<'schema> PushdownChecker<'schema> {
372377

373378
#[inline]
374379
fn prevents_pushdown(&self) -> bool {
375-
self.non_primitive_columns || self.projected_columns
380+
self.non_primitive_columns || self.projected_columns || self.has_unpushable_udfs
376381
}
377382

378383
/// Consumes the checker and returns sorted, deduplicated column indices
@@ -484,6 +489,13 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
484489
return Ok(recursion);
485490
}
486491

492+
if ScalarFunctionExpr::try_downcast_func::<FileRowIndexFunc>(node.as_ref())
493+
.is_some()
494+
{
495+
self.has_unpushable_udfs = true;
496+
return Ok(TreeNodeRecursion::Jump);
497+
}
498+
487499
Ok(TreeNodeRecursion::Continue)
488500
}
489501
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 117 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ use crate::opener::ParquetMorselizer;
2626
use crate::opener::build_pruning_predicates;
2727
use crate::opener::build_virtual_columns_state;
2828
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
29+
use arrow_schema::Fields;
30+
use arrow_schema::extension::ExtensionType;
31+
use arrow_schema::{DataType, Field};
2932
use datafusion_common::config::ConfigOptions;
3033
#[cfg(feature = "parquet_encryption")]
3134
use datafusion_common::config::EncryptionFactoryOptions;
@@ -40,9 +43,14 @@ use datafusion_common::config::TableParquetOptions;
4043
use datafusion_datasource::TableSchema;
4144
use datafusion_datasource::file::FileSource;
4245
use datafusion_datasource::file_scan_config::FileScanConfig;
46+
use datafusion_functions::core::file_row_index::FileRowIndexFunc;
47+
use datafusion_physical_expr::expressions::Column;
4348
use datafusion_physical_expr::projection::ProjectionExprs;
4449
use datafusion_physical_expr::{EquivalenceProperties, conjunction};
45-
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
50+
use datafusion_physical_expr_adapter::expr_references_scalar_udf;
51+
use datafusion_physical_expr_adapter::{
52+
DefaultPhysicalExprAdapterFactory, rewrite_file_row_index_projection,
53+
};
4654
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4755
use datafusion_physical_expr_common::physical_expr::fmt_sql;
4856
use datafusion_physical_plan::DisplayFormatType;
@@ -60,6 +68,7 @@ use datafusion_execution::parquet_encryption::EncryptionFactory;
6068
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
6169
use itertools::Itertools;
6270
use object_store::ObjectStore;
71+
use parquet::arrow::RowNumber;
6372
#[cfg(feature = "parquet_encryption")]
6473
use parquet::encryption::decrypt::FileDecryptionProperties;
6574

@@ -669,7 +678,28 @@ impl FileSource for ParquetSource {
669678
projection: &ProjectionExprs,
670679
) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
671680
let mut source = self.clone();
672-
source.projection = self.projection.try_merge(projection)?;
681+
682+
// If there's no reference to `FileRowIndexFunc` in the projection, we can just merge
683+
// both projections as-is, there's no need to modify the projection first.
684+
if !projection.iter().any(|projection_expr| {
685+
expr_references_scalar_udf::<FileRowIndexFunc>(&projection_expr.expr)
686+
}) {
687+
source.projection = self.projection.try_merge(projection)?;
688+
return Ok(Some(Arc::new(source)));
689+
}
690+
691+
// If we can find a reference to `FileRowIndexFunc`, we add it as a virtual column
692+
// or re-use an existing one in the table's schema.
693+
let (table_schema, row_index_col) =
694+
table_schema_with_row_index_col(self.table_schema());
695+
696+
source.table_schema = table_schema;
697+
source.projection = rewrite_file_row_index_projection(
698+
&self.projection,
699+
projection,
700+
&row_index_col,
701+
)?;
702+
673703
Ok(Some(Arc::new(source)))
674704
}
675705

@@ -952,15 +982,12 @@ impl FileSource for ParquetSource {
952982
reversed_eq_properties.ordering_satisfy(order.iter().cloned())?;
953983
let sort_order = LexOrdering::new(order.iter().cloned());
954984
let column_in_file_schema = sort_order.as_ref().is_some_and(|s| {
955-
s.first()
956-
.expr
957-
.downcast_ref::<datafusion_physical_expr::expressions::Column>()
958-
.is_some_and(|col| {
959-
self.table_schema
960-
.file_schema()
961-
.field_with_name(col.name())
962-
.is_ok()
963-
})
985+
s.first().expr.downcast_ref::<Column>().is_some_and(|col| {
986+
self.table_schema
987+
.file_schema()
988+
.field_with_name(col.name())
989+
.is_ok()
990+
})
964991
});
965992

966993
if !column_in_file_schema && !reversed_satisfies {
@@ -989,6 +1016,69 @@ impl FileSource for ParquetSource {
9891016
}
9901017
}
9911018

1019+
/// Returns the a [`TableSchema`] containing a [`RowNumber`] virtual column and a [`Column`] expression referencing its row index column.
1020+
/// The expression is then merged into a projection.
1021+
///
1022+
/// - If the schema already has a virtual column with the [`RowNumber`] type, it returns the schema unchanged.
1023+
/// - If the schema doesn't have the appropriate virtual column, it returns a modified schema with the virtual column appended to it.
1024+
fn table_schema_with_row_index_col(table_schema: &TableSchema) -> (TableSchema, Column) {
1025+
// If we can find a virtual column with the `RowNumber` type, we just return the schema
1026+
// and create the appropriate `column` we're going to use
1027+
if let Some((idx, field)) =
1028+
table_schema
1029+
.virtual_columns()
1030+
.iter()
1031+
.enumerate()
1032+
.find(|(_, field)| {
1033+
field
1034+
.extension_type_name()
1035+
.is_some_and(|name| name == RowNumber::NAME)
1036+
})
1037+
{
1038+
let virtual_offset = table_schema.file_schema().fields().len()
1039+
+ table_schema.table_partition_cols().len();
1040+
1041+
return (
1042+
table_schema.clone(),
1043+
Column::new(field.name(), virtual_offset + idx),
1044+
);
1045+
}
1046+
1047+
// The hidden field is shared across all files in this scan, but it must
1048+
// have a unique table-schema name because later rewrites resolve it by
1049+
// column name and index.
1050+
let base_row_index_name = "__datafusion_file_row_index";
1051+
let mut row_index_name = base_row_index_name.to_string();
1052+
let mut suffix = 0;
1053+
while table_schema
1054+
.table_schema()
1055+
.field_with_name(&row_index_name)
1056+
.is_ok()
1057+
{
1058+
suffix += 1;
1059+
row_index_name = format!("{base_row_index_name}_{suffix}");
1060+
}
1061+
1062+
let row_index_table_idx = table_schema.table_schema().fields().len();
1063+
let row_index_field = Arc::new(
1064+
Field::new(&row_index_name, DataType::Int64, true).with_extension_type(RowNumber),
1065+
);
1066+
(
1067+
TableSchema::builder(Arc::clone(table_schema.file_schema()))
1068+
.with_table_partition_cols(table_schema.table_partition_cols().clone())
1069+
.with_virtual_columns(
1070+
table_schema
1071+
.virtual_columns()
1072+
.iter()
1073+
.cloned()
1074+
.chain([row_index_field])
1075+
.collect::<Fields>(),
1076+
)
1077+
.build(),
1078+
Column::new(&row_index_name, row_index_table_idx),
1079+
)
1080+
}
1081+
9921082
#[cfg(test)]
9931083
mod tests {
9941084
use super::*;
@@ -1622,7 +1712,9 @@ mod tests {
16221712
use datafusion_common::config::ConfigOptions;
16231713
use datafusion_datasource::TableSchema;
16241714
use datafusion_expr::{col, lit as logical_lit};
1715+
use datafusion_functions::core::expr_fn::file_row_index;
16251716
use datafusion_physical_expr::planner::logical2physical;
1717+
use datafusion_physical_expr_adapter::rewrite_file_row_index_expr;
16261718
use datafusion_physical_plan::filter_pushdown::PushedDown;
16271719
use parquet::arrow::RowNumber;
16281720

@@ -1652,13 +1744,20 @@ mod tests {
16521744
.or(col("value").eq(logical_lit(4i64))),
16531745
full_schema,
16541746
);
1747+
let (_, row_index_col) = table_schema_with_row_index_col(source.table_schema());
1748+
let row_index = rewrite_file_row_index_expr(
1749+
logical2physical(&file_row_index().gt(logical_lit(2i64)), full_schema),
1750+
row_index_col.name(),
1751+
row_index_col.index(),
1752+
)
1753+
.expect("file_row_index should rewrite to the row_number virtual column");
16551754

16561755
let config = ConfigOptions::default();
16571756
let prop = source
1658-
.try_pushdown_filters(vec![pushable, virtual_only, mixed], &config)
1757+
.try_pushdown_filters(vec![pushable, virtual_only, mixed, row_index], &config)
16591758
.expect("try_pushdown_filters must not error");
16601759

1661-
assert_eq!(prop.filters.len(), 3);
1760+
assert_eq!(prop.filters.len(), 4);
16621761
assert!(
16631762
matches!(prop.filters[0], PushedDown::Yes),
16641763
"file-column filter should be pushable"
@@ -1672,5 +1771,10 @@ mod tests {
16721771
"filter mixing a virtual column with a file column must not be \
16731772
pushed down (row filter would silently drop it)"
16741773
);
1774+
assert!(
1775+
matches!(prop.filters[3], PushedDown::No),
1776+
"file_row_index() rewrites to a virtual column and must not be \
1777+
pushed down"
1778+
);
16751779
}
16761780
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Implementation of the `file_row_index` scalar function.
19+
20+
use arrow::datatypes::DataType;
21+
use datafusion_common::utils::take_function_args;
22+
use datafusion_common::{Result, exec_err};
23+
use datafusion_doc::Documentation;
24+
use datafusion_expr::{
25+
ColumnarValue, ExpressionPlacement, ScalarFunctionArgs, ScalarUDFImpl, Signature,
26+
Volatility,
27+
};
28+
use datafusion_macros::user_doc;
29+
30+
/// Scalar UDF implementation for `file_row_index()`.
31+
///
32+
/// File sources that can expose per-file row indexes rewrite this placeholder
33+
/// function into a source-provided physical expression. Direct evaluation
34+
/// returns an error because there is no file context outside a scan.
35+
#[user_doc(
36+
doc_section(label = "Other Functions"),
37+
description = r#"Returns the zero-based row offset within the source file
38+
that produced the current row.
39+
40+
The value is scoped to one file, so rows from different files in the same scan
41+
can have the same row index. This function is intended to be rewritten at
42+
file-scan time. If the input file is not known (for example, if this function
43+
is evaluated outside a file scan, or was not pushed down into one), direct
44+
evaluation returns an error.
45+
"#,
46+
syntax_example = "file_row_index()",
47+
sql_example = r#"```sql
48+
SELECT file_row_index() FROM t;
49+
```"#
50+
)]
51+
#[derive(Debug, PartialEq, Eq, Hash)]
52+
pub struct FileRowIndexFunc {
53+
signature: Signature,
54+
}
55+
56+
impl Default for FileRowIndexFunc {
57+
fn default() -> Self {
58+
Self::new()
59+
}
60+
}
61+
62+
impl FileRowIndexFunc {
63+
pub fn new() -> Self {
64+
Self {
65+
signature: Signature::nullary(Volatility::Volatile),
66+
}
67+
}
68+
}
69+
70+
impl ScalarUDFImpl for FileRowIndexFunc {
71+
fn name(&self) -> &str {
72+
"file_row_index"
73+
}
74+
75+
fn signature(&self) -> &Signature {
76+
&self.signature
77+
}
78+
79+
fn return_type(&self, args: &[DataType]) -> Result<DataType> {
80+
let [] = take_function_args(self.name(), args)?;
81+
Ok(DataType::Int64)
82+
}
83+
84+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
85+
let [] = take_function_args(self.name(), args.args)?;
86+
exec_err!("file_row_index() is source dependent and cannot be evaluated directly")
87+
}
88+
89+
fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement {
90+
ExpressionPlacement::MoveTowardsLeafNodes
91+
}
92+
93+
fn documentation(&self) -> Option<&Documentation> {
94+
self.doc()
95+
}
96+
}

datafusion/functions/src/core/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub mod arrowtypeof;
2828
pub mod cast_to_type;
2929
pub mod coalesce;
3030
pub mod expr_ext;
31+
pub mod file_row_index;
3132
pub mod getfield;
3233
pub mod greatest;
3334
mod greatest_least_utils;
@@ -67,6 +68,7 @@ make_udf_function!(version::VersionFunc, version);
6768
make_udf_function!(arrow_metadata::ArrowMetadataFunc, arrow_metadata);
6869
make_udf_function!(with_metadata::WithMetadataFunc, with_metadata);
6970
make_udf_function!(arrow_field::ArrowFieldFunc, arrow_field);
71+
make_udf_function!(file_row_index::FileRowIndexFunc, file_row_index);
7072

7173
pub mod expr_fn {
7274
use datafusion_expr::{Expr, Literal};
@@ -143,6 +145,9 @@ pub mod expr_fn {
143145
union_tag,
144146
"Returns the name of the currently selected field in the union",
145147
arg1
148+
),(
149+
file_row_index,
150+
"Returns the offset of the row within its source file",
146151
));
147152

148153
#[doc = "Returns the value of the field with the given name from the struct"]
@@ -196,5 +201,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
196201
union_tag(),
197202
version(),
198203
r#struct(),
204+
file_row_index(),
199205
]
200206
}

datafusion/physical-expr-adapter/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ pub mod schema_rewriter;
2929
pub use schema_rewriter::{
3030
BatchAdapter, BatchAdapterFactory, DefaultPhysicalExprAdapter,
3131
DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
32-
replace_columns_with_literals,
32+
expr_references_scalar_udf, replace_columns_with_literals,
33+
rewrite_file_row_index_expr, rewrite_file_row_index_projection,
3334
};

0 commit comments

Comments
 (0)