
Commit d8c1667

feat: push down IS NULL / IS NOT NULL on struct columns into Parquet scan
1 parent c17c87c commit d8c1667

3 files changed

Lines changed: 746 additions & 19 deletions


datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 87 additions & 0 deletions
@@ -26,6 +26,8 @@
 //! select * from data limit 10;
 //! ```

+use std::sync::Arc;
+
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
 use datafusion::physical_plan::collect;
@@ -617,6 +619,91 @@ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
     }
 }

+/// Verifies that `struct_col IS NOT NULL` is pushed into the parquet scan
+/// and produces correct results, including when the struct is non-null
+/// but all leaf fields are null.
+#[tokio::test]
+async fn struct_is_not_null_pushdown() {
+    use arrow::array::{Int32Array, StructArray};
+    use arrow::datatypes::{DataType, Field, Fields, Schema};
+
+    let fields = Fields::from(vec![
+        Field::new("a", DataType::Int32, true),
+        Field::new("b", DataType::Int32, true),
+        Field::new("c", DataType::Int32, true),
+    ]);
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("s", DataType::Struct(fields.clone()), true),
+    ]));
+
+    // Row 0: s={a:1, b:10, c:100} → struct NOT null, leaves NOT null
+    // Row 1: s={a:NULL, b:NULL, c:NULL} → struct NOT null, ALL leaves null
+    // Row 2: s=NULL → struct IS null
+    // Row 3: s={a:3, b:NULL, c:300} → struct NOT null, mixed leaves
+    let ids = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+    let a = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(3)]));
+    let b = Arc::new(Int32Array::from(vec![Some(10), None, None, None]));
+    let c = Arc::new(Int32Array::from(vec![Some(100), None, None, Some(300)]));
+    let struct_array = StructArray::try_new(
+        fields,
+        vec![a, b, c],
+        Some(vec![true, true, false, true].into()),
+    )
+    .unwrap();
+    let batch =
+        RecordBatch::try_new(schema.clone(), vec![ids, Arc::new(struct_array)]).unwrap();
+
+    let tempdir = TempDir::new().unwrap();
+    let props = WriterProperties::builder().build();
+    let test_file = TestParquetFile::try_new(
+        tempdir.path().join("struct.parquet"),
+        props,
+        vec![batch],
+    )
+    .unwrap();
+
+    let scan_options = ParquetScanOptions {
+        pushdown_filters: true,
+        enable_page_index: false,
+        reorder_filters: false,
+    };
+    let ctx = SessionContext::new_with_config(scan_options.config());
+    let filter = col("s").is_not_null();
+    let exec = test_file.create_scan(&ctx, Some(filter)).await.unwrap();
+    let result = collect(exec.clone(), ctx.task_ctx()).await.unwrap();
+
+    // Verify correct rows: ids 1, 2, 4 (id 2 has a non-null struct whose leaves are all null)
+    let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(
+        total_rows, 3,
+        "Expected 3 rows (struct non-null), got {total_rows}"
+    );
+
+    let batch = concat_batches(&test_file.schema(), &result).unwrap();
+    let id_col = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap();
+    let ids: Vec<i32> = (0..id_col.len()).map(|i| id_col.value(i)).collect();
+    assert_eq!(
+        ids,
+        vec![1, 2, 4],
+        "Row with id=2 (all-null leaves, non-null struct) must be included"
+    );
+
+    // Verify pushdown metrics: filter was evaluated at the row level
+    let metrics = TestParquetFile::parquet_metrics(&exec).expect("found metrics");
+    let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");
+    let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
+    assert_eq!(
+        pushdown_rows_matched, 3,
+        "Expected 3 rows matched by pushdown"
+    );
+    assert_eq!(pushdown_rows_pruned, 1, "Expected 1 row pruned by pushdown");
+}
+
 #[tokio::test]
 async fn predicate_cache_default() -> datafusion_common::Result<()> {
     let ctx = SessionContext::new();
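
For context, and not part of the commit itself: a minimal sketch of exercising the new pushdown end to end through SQL. It assumes the existing `datafusion.execution.parquet.pushdown_filters` configuration option (row-level filter pushdown is off by default) and a hypothetical `struct.parquet` file with the same `id`/`s` schema as the test above.

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Enable row-level filter pushdown so `s IS NOT NULL` is evaluated
    // inside the Parquet scan rather than in a separate FilterExec.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical file with the same shape as the test data:
    // id INT NOT NULL, s STRUCT<a INT, b INT, c INT> (nullable).
    ctx.register_parquet("t", "struct.parquet", ParquetReadOptions::default())
        .await?;

    // Rows whose struct is null are pruned during the scan; a struct that is
    // present but has all-null leaves still satisfies IS NOT NULL.
    let df = ctx.sql("SELECT id FROM t WHERE s IS NOT NULL").await?;
    df.show().await?;
    Ok(())
}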

datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs

Lines changed: 220 additions & 1 deletion
@@ -349,5 +349,224 @@ fn build_struct_batch(
     )?)
 }

-criterion_group!(benches, parquet_struct_filter_pushdown);
+// ---------------------------------------------------------------------------
+// Benchmark: struct IS NOT NULL pushdown
+//
+// Uses a separate dataset with a NULLABLE struct containing many leaf fields.
+// Compares scanning with vs without row-level pushdown for `s IS NOT NULL`.
+//
+// The key metric: with pushdown, only 1 leaf column is read for the null
+// check (via definition levels); without pushdown, ALL leaf columns are
+// decoded to materialize the struct, then nullability is checked post-scan.
+// ---------------------------------------------------------------------------
+
+const NULLABLE_STRUCT_COLUMN_NAME: &str = "s";
+/// Number of leaf fields inside the nullable struct.
+/// More fields = bigger difference between pushdown and no-pushdown.
+const NUM_STRUCT_FIELDS: usize = 12;
+/// Fraction of rows where the struct is null (~10%).
+const NULL_FRACTION: usize = 10;
+
+struct NullableBenchmarkDataset {
+    _tempdir: TempDir,
+    file_path: PathBuf,
+}
+
+impl NullableBenchmarkDataset {
+    fn path(&self) -> &Path {
+        &self.file_path
+    }
+}
+
+static NULLABLE_DATASET: LazyLock<NullableBenchmarkDataset> = LazyLock::new(|| {
+    create_nullable_dataset()
+        .expect("failed to prepare nullable struct benchmark dataset")
+});
+
+fn nullable_struct_schema() -> SchemaRef {
+    let struct_fields: Vec<Field> = (0..NUM_STRUCT_FIELDS)
+        .map(|i| Field::new(format!("f{i}"), DataType::Utf8, true))
+        .collect();
+    Arc::new(Schema::new(vec![
+        Field::new(ID_COLUMN_NAME, DataType::Int32, false),
+        Field::new(
+            NULLABLE_STRUCT_COLUMN_NAME,
+            DataType::Struct(Fields::from(struct_fields)),
+            true,
+        ),
+    ]))
+}
+
+fn create_nullable_dataset() -> datafusion_common::Result<NullableBenchmarkDataset> {
+    let tempdir = TempDir::new()?;
+    let file_path = tempdir.path().join("struct_nullable_filter.parquet");
+
+    let schema = nullable_struct_schema();
+    let writer_props = WriterProperties::builder()
+        .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT))
+        .build();
+
+    let mut writer = ArrowWriter::try_new(
+        std::fs::File::create(&file_path)?,
+        Arc::clone(&schema),
+        Some(writer_props),
+    )?;
+
+    for rg_idx in 0..TOTAL_ROW_GROUPS {
+        let batch = build_nullable_struct_batch(&schema, rg_idx, ROW_GROUP_ROW_COUNT)?;
+        writer.write(&batch)?;
+    }
+
+    writer.close()?;
+    Ok(NullableBenchmarkDataset {
+        _tempdir: tempdir,
+        file_path,
+    })
+}
+
+fn build_nullable_struct_batch(
+    schema: &SchemaRef,
+    _rg_idx: usize,
+    len: usize,
+) -> datafusion_common::Result<RecordBatch> {
+    use arrow::array::NullBufferBuilder;
+
+    let large_string: String = "x".repeat(LARGE_STRING_LEN);
+    let id_array = Arc::new(Int32Array::from_iter_values(0..len as i32));
+
+    // Build struct fields — each leaf is a large string column
+    let fields: Vec<(Arc<Field>, Arc<dyn arrow::array::Array>)> = (0..NUM_STRUCT_FIELDS)
+        .map(|i| {
+            let mut builder = StringBuilder::new();
+            for _ in 0..len {
+                builder.append_value(&large_string);
+            }
+            (
+                Arc::new(Field::new(format!("f{i}"), DataType::Utf8, true)),
+                Arc::new(builder.finish()) as Arc<dyn arrow::array::Array>,
+            )
+        })
+        .collect();
+
+    // ~10% of rows have null struct
+    let mut null_buffer = NullBufferBuilder::new(len);
+    for row in 0..len {
+        null_buffer.append(row % NULL_FRACTION != 0);
+    }
+    let struct_array = StructArray::try_new(
+        Fields::from(
+            fields
+                .iter()
+                .map(|(f, _)| Arc::clone(f))
+                .collect::<Vec<_>>(),
+        ),
+        fields.into_iter().map(|(_, a)| a).collect(),
+        null_buffer.finish(),
+    )?;
+
+    Ok(RecordBatch::try_new(
+        Arc::clone(schema),
+        vec![id_array, Arc::new(struct_array)],
+    )?)
+}
+
+/// `s IS NOT NULL`
+fn struct_is_not_null_expr() -> Expr {
+    col(NULLABLE_STRUCT_COLUMN_NAME).is_not_null()
+}
+
+/// `s IS NULL`
+fn struct_is_null_expr() -> Expr {
+    col(NULLABLE_STRUCT_COLUMN_NAME).is_null()
+}
+
+fn expected_non_null_rows() -> usize {
+    // rows where row % NULL_FRACTION != 0
+    TOTAL_ROWS - TOTAL_ROWS / NULL_FRACTION
+}
+
+fn expected_null_rows() -> usize {
+    TOTAL_ROWS / NULL_FRACTION
+}
+
+fn parquet_struct_null_check_pushdown(c: &mut Criterion) {
+    let dataset_path = NULLABLE_DATASET.path().to_owned();
+    let mut group = c.benchmark_group("parquet_struct_null_check_pushdown");
+    group.throughput(Throughput::Elements(TOTAL_ROWS as u64));
+
+    // Scenario 1: SELECT * FROM t WHERE s IS NOT NULL — no pushdown
+    // Without pushdown, ALL 12 leaf columns of the struct are decoded
+    // to materialize the struct, then IS NOT NULL is checked post-scan.
+    group.bench_function("select_star/no_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
+                .expect("scan succeeded");
+            assert_eq!(matched, expected_non_null_rows());
+        });
+    });
+
+    // Scenario 2: SELECT * FROM t WHERE s IS NOT NULL — with pushdown
+    // With pushdown, only 1 leaf column is read for the null check.
+    // Remaining leaves are read only for matched rows.
+    group.bench_function("select_star/with_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, true, ProjectionMask::all())
+                .expect("scan succeeded");
+            assert_eq!(matched, expected_non_null_rows());
+        });
+    });
+
+    // Scenario 3: SELECT id FROM t WHERE s IS NOT NULL — no pushdown
+    // Without pushdown we must read all columns to materialize the struct
+    // for post-scan IS NOT NULL evaluation, so ProjectionMask::all() is
+    // correct here even though the query only needs `id` in the output.
+    group.bench_function("select_id/no_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
+                .expect("scan succeeded");
+            assert_eq!(matched, expected_non_null_rows());
+        });
+    });
+
+    // Scenario 4: SELECT id FROM t WHERE s IS NOT NULL — with pushdown
+    // Best case: pushdown reads 1 leaf for null check, output reads only `id`.
+    // The 12 struct leaves are never decoded at all.
+    group.bench_function("select_id/with_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_is_not_null_expr(), &file_schema);
+        let id_only = id_projection(&dataset_path);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, true, id_only.clone())
+                .expect("scan succeeded");
+            assert_eq!(matched, expected_non_null_rows());
+        });
+    });
+
+    // Scenario 5: SELECT id FROM t WHERE s IS NULL — with pushdown
+    // Verify IS NULL pushdown works symmetrically with IS NOT NULL.
+    group.bench_function("select_id_is_null/with_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_is_null_expr(), &file_schema);
+        let id_only = id_projection(&dataset_path);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, true, id_only.clone())
+                .expect("scan succeeded");
+            assert_eq!(matched, expected_null_rows());
+        });
+    });
+
+    group.finish();
+}
+
+criterion_group!(
+    benches,
+    parquet_struct_filter_pushdown,
+    parquet_struct_null_check_pushdown
+);
 criterion_main!(benches);
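
The tests and benchmarks above hinge on the distinction between a null struct and a non-null struct whose leaves are all null. Here is a small standalone sketch (not part of the commit) of that semantic using the arrow compute kernels; the array shape mirrors the test data, with a single leaf `a` for brevity:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StructArray};
use arrow::buffer::NullBuffer;
use arrow::compute::is_not_null;
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // Rows 0, 1, 3 have a valid struct (row 1 with an all-null leaf);
    // row 2 has a null struct, matching the test data above.
    let fields = Fields::from(vec![Field::new("a", DataType::Int32, true)]);
    let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(3)]));
    let s = StructArray::try_new(
        fields,
        vec![a],
        Some(NullBuffer::from(vec![true, true, false, true])),
    )
    .unwrap();

    // IS NOT NULL consults only the struct-level validity buffer, so the
    // all-null-leaves row still matches: 3 rows, not 2.
    let mask = is_not_null(&s).unwrap();
    assert_eq!(mask.true_count(), 3);
}

In the Parquet encoding the same distinction lives in the definition levels: roughly, a null struct gives its leaves definition level 0, while a present struct with a null leaf gives definition level 1, which is why the pushdown can answer `s IS NOT NULL` from the definition levels of a single leaf column without decoding any values.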

0 commit comments
