Skip to content

Commit c8e235a

Browse files
committed
add row-index virtual column
1 parent fb1c0f3 commit c8e235a

2 files changed

Lines changed: 177 additions & 10 deletions

File tree

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 159 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use std::future::Future;
4444
use std::mem;
4545
use std::sync::Arc;
4646

47-
use arrow::datatypes::{SchemaRef, TimeUnit};
47+
use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit};
4848
#[cfg(feature = "parquet_encryption")]
4949
use datafusion_common::encryption::FileDecryptionProperties;
5050
use datafusion_common::stats::Precision;
@@ -71,6 +71,7 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
7171
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
7272
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
7373
use parquet::arrow::async_reader::AsyncFileReader;
74+
use parquet::arrow::is_virtual_column;
7475
use parquet::arrow::parquet_column;
7576
use parquet::basic::Type;
7677
use parquet::bloom_filter::Sbbf;
@@ -305,6 +306,27 @@ struct MetadataLoadedParquetOpen {
305306
prepared: PreparedParquetOpen,
306307
reader_metadata: ArrowReaderMetadata,
307308
options: ArrowReaderOptions,
309+
virtual_columns: Vec<FieldRef>,
310+
}
311+
312+
/// Split a schema into (real-fields-only schema, virtual-field list).
313+
fn split_virtual_fields(schema: &SchemaRef) -> (SchemaRef, Vec<FieldRef>) {
314+
let mut real = Vec::with_capacity(schema.fields().len());
315+
let mut virt = Vec::new();
316+
for field in schema.fields() {
317+
if is_virtual_column(field) {
318+
virt.push(Arc::clone(field));
319+
} else {
320+
real.push(Arc::clone(field));
321+
}
322+
}
323+
if virt.is_empty() {
324+
return (Arc::clone(schema), virt);
325+
}
326+
(
327+
Arc::new(Schema::new_with_metadata(real, schema.metadata.clone())),
328+
virt,
329+
)
308330
}
309331

310332
/// State of [`ParquetOpenState`]
@@ -723,15 +745,18 @@ impl PreparedParquetOpen {
723745
// unnecessary I/O. We decide later if it is needed to evaluate the
724746
// pruning predicates. Thus default to not requesting it from the
725747
// underlying reader.
726-
let options =
748+
let mut options =
727749
ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
728750
#[cfg(feature = "parquet_encryption")]
729-
let mut options = options;
730-
#[cfg(feature = "parquet_encryption")]
731751
if let Some(fd_val) = &self.file_decryption_properties {
732752
options = options.with_file_decryption_properties(Arc::clone(fd_val));
733753
}
734754

755+
let (_, virtual_columns) = split_virtual_fields(&self.logical_file_schema);
756+
if !virtual_columns.is_empty() {
757+
options = options.with_virtual_columns(virtual_columns.clone())?;
758+
}
759+
735760
let mut metadata_timer = self.file_metrics.metadata_load_time.timer();
736761
// Begin by loading the metadata from the underlying reader (note
737762
// the returned metadata may actually include page indexes as some
@@ -747,6 +772,7 @@ impl PreparedParquetOpen {
747772
prepared: self,
748773
reader_metadata,
749774
options,
775+
virtual_columns,
750776
})
751777
}
752778
}
@@ -759,6 +785,7 @@ impl MetadataLoadedParquetOpen {
759785
mut prepared,
760786
mut reader_metadata,
761787
mut options,
788+
virtual_columns,
762789
} = self;
763790

764791
// Note about schemas: we are actually dealing with **3 different schemas** here:
@@ -770,6 +797,16 @@ impl MetadataLoadedParquetOpen {
770797
// parquet reader will actually produce.
771798
let mut physical_file_schema = Arc::clone(reader_metadata.schema());
772799

800+
// `with_schema` expects real fields only; virtual columns are tracked
801+
// separately on the options.
802+
let resupply_schema = |schema: &SchemaRef| -> SchemaRef {
803+
if virtual_columns.is_empty() {
804+
Arc::clone(schema)
805+
} else {
806+
split_virtual_fields(schema).0
807+
}
808+
};
809+
773810
// The schema loaded from the file may not be the same as the
774811
// desired schema (for example if we want to instruct the parquet
775812
// reader to read strings using Utf8View instead). Update if necessary
@@ -778,7 +815,7 @@ impl MetadataLoadedParquetOpen {
778815
&physical_file_schema,
779816
) {
780817
physical_file_schema = Arc::new(merged);
781-
options = options.with_schema(Arc::clone(&physical_file_schema));
818+
options = options.with_schema(resupply_schema(&physical_file_schema));
782819
reader_metadata = ArrowReaderMetadata::try_new(
783820
Arc::clone(reader_metadata.metadata()),
784821
options.clone(),
@@ -795,7 +832,7 @@ impl MetadataLoadedParquetOpen {
795832
.coerce()
796833
{
797834
physical_file_schema = Arc::new(merged);
798-
options = options.with_schema(Arc::clone(&physical_file_schema));
835+
options = options.with_schema(resupply_schema(&physical_file_schema));
799836
reader_metadata = ArrowReaderMetadata::try_new(
800837
Arc::clone(reader_metadata.metadata()),
801838
options.clone(),
@@ -855,6 +892,7 @@ impl MetadataLoadedParquetOpen {
855892
prepared,
856893
reader_metadata,
857894
options,
895+
virtual_columns,
858896
},
859897
pruning_predicate,
860898
page_pruning_predicate,
@@ -1081,6 +1119,7 @@ impl RowGroupsPrunedParquetOpen {
10811119
prepared,
10821120
reader_metadata,
10831121
options: _,
1122+
virtual_columns: _,
10841123
} = loaded;
10851124

10861125
let file_metadata = Arc::clone(reader_metadata.metadata());
@@ -1867,6 +1906,120 @@ mod test {
18671906
assert_eq!(num_rows, 0);
18681907
}
18691908

1909+
#[tokio::test]
1910+
async fn test_virtual_row_number_column() {
1911+
use arrow::array::Int64Array;
1912+
use parquet::arrow::RowNumber;
1913+
1914+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1915+
1916+
let batch1 =
1917+
record_batch!(("a", Int32, vec![Some(10), Some(11), Some(12)])).unwrap();
1918+
let batch2 =
1919+
record_batch!(("a", Int32, vec![Some(20), Some(21), Some(22)])).unwrap();
1920+
let props = WriterProperties::builder()
1921+
.set_max_row_group_row_count(Some(3))
1922+
.build();
1923+
let data_size = write_parquet_batches(
1924+
Arc::clone(&store),
1925+
"rownum.parquet",
1926+
vec![batch1, batch2],
1927+
Some(props),
1928+
)
1929+
.await;
1930+
1931+
let row_num_field = Arc::new(
1932+
Field::new("row_num", DataType::Int64, false).with_extension_type(RowNumber),
1933+
);
1934+
let file_schema = Arc::new(Schema::new(vec![
1935+
Arc::new(Field::new("a", DataType::Int32, true)),
1936+
row_num_field,
1937+
]));
1938+
1939+
let file = PartitionedFile::new(
1940+
"rownum.parquet".to_string(),
1941+
u64::try_from(data_size).unwrap(),
1942+
);
1943+
1944+
let morselizer = ParquetMorselizerBuilder::new()
1945+
.with_store(Arc::clone(&store))
1946+
.with_schema(Arc::clone(&file_schema))
1947+
.with_projection_indices(&[0, 1])
1948+
.build();
1949+
let stream = open_file(&morselizer, file).await.unwrap();
1950+
let mut row_nums: Vec<i64> = Vec::new();
1951+
let mut s = stream;
1952+
while let Some(Ok(batch)) = s.next().await {
1953+
let arr = batch
1954+
.column(1)
1955+
.as_any()
1956+
.downcast_ref::<Int64Array>()
1957+
.expect("row_num column is Int64");
1958+
row_nums.extend(arr.values().iter().copied());
1959+
}
1960+
assert_eq!(row_nums, vec![0, 1, 2, 3, 4, 5]);
1961+
}
1962+
1963+
#[tokio::test]
1964+
async fn test_virtual_row_number_column_with_row_group_pruning() {
1965+
use arrow::array::Int64Array;
1966+
use parquet::arrow::RowNumber;
1967+
1968+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1969+
1970+
let batch1 =
1971+
record_batch!(("a", Int32, vec![Some(10), Some(11), Some(12)])).unwrap();
1972+
let batch2 =
1973+
record_batch!(("a", Int32, vec![Some(20), Some(21), Some(22)])).unwrap();
1974+
let props = WriterProperties::builder()
1975+
.set_max_row_group_row_count(Some(3))
1976+
.build();
1977+
let data_size = write_parquet_batches(
1978+
Arc::clone(&store),
1979+
"rownum_prune.parquet",
1980+
vec![batch1, batch2],
1981+
Some(props),
1982+
)
1983+
.await;
1984+
1985+
let row_num_field = Arc::new(
1986+
Field::new("row_num", DataType::Int64, false).with_extension_type(RowNumber),
1987+
);
1988+
let file_schema = Arc::new(Schema::new(vec![
1989+
Arc::new(Field::new("a", DataType::Int32, true)),
1990+
row_num_field,
1991+
]));
1992+
1993+
let file = PartitionedFile::new(
1994+
"rownum_prune.parquet".to_string(),
1995+
u64::try_from(data_size).unwrap(),
1996+
);
1997+
1998+
let predicate =
1999+
logical2physical(&col("a").gt_eq(lit(20i32)), &file_schema);
2000+
2001+
let morselizer = ParquetMorselizerBuilder::new()
2002+
.with_store(Arc::clone(&store))
2003+
.with_schema(Arc::clone(&file_schema))
2004+
.with_projection_indices(&[0, 1])
2005+
.with_predicate(predicate)
2006+
.with_pushdown_filters(true)
2007+
.with_row_group_stats_pruning(true)
2008+
.build();
2009+
2010+
let mut s = open_file(&morselizer, file).await.unwrap();
2011+
let mut row_nums: Vec<i64> = Vec::new();
2012+
while let Some(Ok(batch)) = s.next().await {
2013+
let arr = batch
2014+
.column(1)
2015+
.as_any()
2016+
.downcast_ref::<Int64Array>()
2017+
.expect("row_num column is Int64");
2018+
row_nums.extend(arr.values().iter().copied());
2019+
}
2020+
assert_eq!(row_nums, vec![3, 4, 5]);
2021+
}
2022+
18702023
#[tokio::test]
18712024
async fn test_prune_on_partition_statistics_with_dynamic_expression() {
18722025
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ use arrow::record_batch::RecordBatch;
7575
use datafusion_functions::core::getfield::GetFieldFunc;
7676
use parquet::arrow::ProjectionMask;
7777
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
78+
use parquet::arrow::is_virtual_column;
7879
use parquet::file::metadata::ParquetMetaData;
7980
use parquet::schema::types::SchemaDescriptor;
8081

@@ -560,6 +561,10 @@ pub(crate) fn build_parquet_read_plan(
560561

561562
let root_indices = &required_columns.required_columns;
562563

564+
// Virtual-column roots are silently dropped here (they have no parquet
565+
// leaf to map to); the decoder appends virtual columns to every batch
566+
// regardless of the projection mask, so `projected_schema` stays aligned.
567+
// See `build_projection_read_plan` for the symmetric mask-side filter.
563568
let mut leaf_indices =
564569
leaf_indices_for_roots(root_indices.iter().copied(), schema_descr);
565570

@@ -605,6 +610,15 @@ pub(crate) fn build_projection_read_plan(
605610
file_schema: &Schema,
606611
schema_descr: &SchemaDescriptor,
607612
) -> ParquetReadPlan {
613+
// Virtual columns have no parquet column to mask.
614+
let parquet_roots = |roots: &[usize]| -> Vec<usize> {
615+
roots
616+
.iter()
617+
.copied()
618+
.filter(|i| !is_virtual_column(file_schema.field(*i)))
619+
.collect()
620+
};
621+
608622
// fast path: if every expression is a plain Column reference, skip all
609623
// struct analysis and use root-level projection directly
610624
let exprs = exprs.into_iter().collect::<Vec<_>>();
@@ -619,7 +633,7 @@ pub(crate) fn build_projection_read_plan(
619633
root_indices.dedup();
620634

621635
let projection_mask =
622-
ProjectionMask::roots(schema_descr, root_indices.iter().copied());
636+
ProjectionMask::roots(schema_descr, parquet_roots(&root_indices));
623637
let projected_schema = Arc::new(
624638
file_schema
625639
.project(&root_indices)
@@ -649,7 +663,7 @@ pub(crate) fn build_projection_read_plan(
649663
root_indices.dedup();
650664

651665
let projection_mask =
652-
ProjectionMask::roots(schema_descr, root_indices.iter().copied());
666+
ProjectionMask::roots(schema_descr, parquet_roots(&root_indices));
653667

654668
let projected_schema = Arc::new(
655669
file_schema
@@ -682,7 +696,7 @@ pub(crate) fn build_projection_read_plan(
682696
// to match the performance of the simple path
683697
if all_struct_accesses.is_empty() {
684698
let projection_mask =
685-
ProjectionMask::roots(schema_descr, all_root_indices.iter().copied());
699+
ProjectionMask::roots(schema_descr, parquet_roots(&all_root_indices));
686700
let projected_schema = Arc::new(
687701
file_schema
688702
.project(&all_root_indices)
@@ -697,7 +711,7 @@ pub(crate) fn build_projection_read_plan(
697711

698712
let leaf_indices = {
699713
let mut out =
700-
leaf_indices_for_roots(all_root_indices.iter().copied(), schema_descr);
714+
leaf_indices_for_roots(parquet_roots(&all_root_indices), schema_descr);
701715
let struct_leaf_indices =
702716
resolve_struct_field_leaves(&all_struct_accesses, file_schema, schema_descr);
703717

0 commit comments

Comments
 (0)