@@ -190,3 +190,371 @@ impl PruningStatistics for BloomFilterStatistics {
190190 Some ( BooleanArray :: from ( vec ! [ contains] ) )
191191 }
192192}
193+
194+ #[ cfg( test) ]
195+ mod tests {
196+ use super :: * ;
197+
198+ use std:: sync:: Arc ;
199+
200+ use crate :: reader:: ParquetFileReader ;
201+ use crate :: test_util:: ExpectedPruning ;
202+ use crate :: { ParquetAccessPlan , ParquetFileMetrics , RowGroupAccessPlanFilter } ;
203+
204+ use arrow:: datatypes:: { DataType , Field , Schema } ;
205+ use datafusion_common:: Result ;
206+ use datafusion_expr:: { Expr , col, lit} ;
207+ use datafusion_physical_expr:: planner:: logical2physical;
208+ use datafusion_physical_plan:: metrics:: ExecutionPlanMetricsSet ;
209+ use datafusion_pruning:: PruningPredicate ;
210+ use object_store:: ObjectStoreExt ;
211+ use parquet:: arrow:: ParquetRecordBatchStreamBuilder ;
212+ use parquet:: arrow:: async_reader:: ParquetObjectReader ;
213+
214+ #[ tokio:: test]
215+ async fn test_row_group_bloom_filter_pruning_predicate_simple_expr ( ) {
216+ BloomFilterTest :: new_data_index_bloom_encoding_stats ( )
217+ . with_expect_all_pruned ( )
218+ // generate pruning predicate `(String = "Hello_Not_exists")`
219+ . run ( col ( r#""String""# ) . eq ( lit ( "Hello_Not_Exists" ) ) )
220+ . await
221+ }
222+
223+ #[ tokio:: test]
224+ async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr ( ) {
225+ BloomFilterTest :: new_data_index_bloom_encoding_stats ( )
226+ . with_expect_all_pruned ( )
227+ // generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")`
228+ . run (
229+ lit ( "1" ) . eq ( lit ( "1" ) ) . and (
230+ col ( r#""String""# )
231+ . eq ( lit ( "Hello_Not_Exists" ) )
232+ . or ( col ( r#""String""# ) . eq ( lit ( "Hello_Not_Exists2" ) ) ) ,
233+ ) ,
234+ )
235+ . await
236+ }
237+
238+ #[ tokio:: test]
239+ async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr_view ( ) {
240+ BloomFilterTest :: new_data_index_bloom_encoding_stats ( )
241+ . with_expect_all_pruned ( )
242+ // generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")`
243+ . run (
244+ lit ( "1" ) . eq ( lit ( "1" ) ) . and (
245+ col ( r#""String""# )
246+ . eq ( Expr :: Literal (
247+ ScalarValue :: Utf8View ( Some ( String :: from ( "Hello_Not_Exists" ) ) ) ,
248+ None ,
249+ ) )
250+ . or ( col ( r#""String""# ) . eq ( Expr :: Literal (
251+ ScalarValue :: Utf8View ( Some ( String :: from (
252+ "Hello_Not_Exists2" ,
253+ ) ) ) ,
254+ None ,
255+ ) ) ) ,
256+ ) ,
257+ )
258+ . await
259+ }
260+
261+ #[ tokio:: test]
262+ async fn test_row_group_bloom_filter_pruning_predicate_sql_in ( ) {
263+ // load parquet file
264+ let testdata = datafusion_common:: test_util:: parquet_test_data ( ) ;
265+ let file_name = "data_index_bloom_encoding_stats.parquet" ;
266+ let path = format ! ( "{testdata}/{file_name}" ) ;
267+ let data = bytes:: Bytes :: from ( std:: fs:: read ( path) . unwrap ( ) ) ;
268+
269+ // generate pruning predicate
270+ let schema = Schema :: new ( vec ! [ Field :: new( "String" , DataType :: Utf8 , false ) ] ) ;
271+
272+ let expr = col ( r#""String""# ) . in_list (
273+ ( 1 ..25 )
274+ . map ( |i| lit ( format ! ( "Hello_Not_Exists{i}" ) ) )
275+ . collect :: < Vec < _ > > ( ) ,
276+ false ,
277+ ) ;
278+ let expr = logical2physical ( & expr, & schema) ;
279+ let pruning_predicate =
280+ PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
281+
282+ let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate (
283+ file_name,
284+ data,
285+ & pruning_predicate,
286+ )
287+ . await
288+ . unwrap ( ) ;
289+ assert ! (
290+ pruned_row_groups
291+ . access_plan( )
292+ . row_group_indexes( )
293+ . is_empty( )
294+ ) ;
295+ }
296+
297+ #[ tokio:: test]
298+ async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value ( ) {
299+ BloomFilterTest :: new_data_index_bloom_encoding_stats ( )
300+ . with_expect_none_pruned ( )
301+ // generate pruning predicate `(String = "Hello")`
302+ . run ( col ( r#""String""# ) . eq ( lit ( "Hello" ) ) )
303+ . await
304+ }
305+
306+ #[ tokio:: test]
307+ async fn test_row_group_bloom_filter_pruning_predicate_with_exists_2_values ( ) {
308+ BloomFilterTest :: new_data_index_bloom_encoding_stats ( )
309+ . with_expect_none_pruned ( )
310+ // generate pruning predicate `(String = "Hello") OR (String = "the quick")`
311+ . run (
312+ col ( r#""String""# )
313+ . eq ( lit ( "Hello" ) )
314+ . or ( col ( r#""String""# ) . eq ( lit ( "the quick" ) ) ) ,
315+ )
316+ . await
317+ }
318+
319+ #[ tokio:: test]
320+ async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values ( ) {
321+ BloomFilterTest :: new_data_index_bloom_encoding_stats ( )
322+ . with_expect_none_pruned ( )
323+ // generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")`
324+ . run (
325+ col ( r#""String""# )
326+ . eq ( lit ( "Hello" ) )
327+ . or ( col ( r#""String""# ) . eq ( lit ( "the quick" ) ) )
328+ . or ( col ( r#""String""# ) . eq ( lit ( "are you" ) ) ) ,
329+ )
330+ . await
331+ }
332+
333+ #[ tokio:: test]
334+ async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values_view ( ) {
335+ BloomFilterTest :: new_data_index_bloom_encoding_stats ( )
336+ . with_expect_none_pruned ( )
337+ // generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")`
338+ . run (
339+ col ( r#""String""# )
340+ . eq ( Expr :: Literal (
341+ ScalarValue :: Utf8View ( Some ( String :: from ( "Hello" ) ) ) ,
342+ None ,
343+ ) )
344+ . or ( col ( r#""String""# ) . eq ( Expr :: Literal (
345+ ScalarValue :: Utf8View ( Some ( String :: from ( "the quick" ) ) ) ,
346+ None ,
347+ ) ) )
348+ . or ( col ( r#""String""# ) . eq ( Expr :: Literal (
349+ ScalarValue :: Utf8View ( Some ( String :: from ( "are you" ) ) ) ,
350+ None ,
351+ ) ) ) ,
352+ )
353+ . await
354+ }
355+
356+ #[ tokio:: test]
357+ async fn test_row_group_bloom_filter_pruning_predicate_with_or_not_eq ( ) {
358+ BloomFilterTest :: new_data_index_bloom_encoding_stats ( )
359+ . with_expect_none_pruned ( )
360+ // generate pruning predicate `(String = "foo") OR (String != "bar")`
361+ . run (
362+ col ( r#""String""# )
363+ . not_eq ( lit ( "foo" ) )
364+ . or ( col ( r#""String""# ) . not_eq ( lit ( "bar" ) ) ) ,
365+ )
366+ . await
367+ }
368+
369+ #[ tokio:: test]
370+ async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter ( ) {
371+ // generate pruning predicate on a column without a bloom filter
372+ BloomFilterTest :: new_all_types ( )
373+ . with_expect_none_pruned ( )
374+ . run ( col ( r#""string_col""# ) . eq ( lit ( "0" ) ) )
375+ . await
376+ }
377+
378+ struct BloomFilterTest {
379+ file_name : String ,
380+ schema : Schema ,
381+ // which row groups are expected to be left after pruning
382+ post_pruning_row_groups : ExpectedPruning ,
383+ }
384+
385+ impl BloomFilterTest {
386+ /// Return a test for data_index_bloom_encoding_stats.parquet
387+ /// Note the values in the `String` column are:
388+ /// ```sql
389+ /// > select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet';
390+ /// +-----------+
391+ /// | String |
392+ /// +-----------+
393+ /// | Hello |
394+ /// | This is |
395+ /// | a |
396+ /// | test |
397+ /// | How |
398+ /// | are you |
399+ /// | doing |
400+ /// | today |
401+ /// | the quick |
402+ /// | brown fox |
403+ /// | jumps |
404+ /// | over |
405+ /// | the lazy |
406+ /// | dog |
407+ /// +-----------+
408+ /// ```
409+ fn new_data_index_bloom_encoding_stats ( ) -> Self {
410+ Self {
411+ file_name : String :: from ( "data_index_bloom_encoding_stats.parquet" ) ,
412+ schema : Schema :: new ( vec ! [ Field :: new( "String" , DataType :: Utf8 , false ) ] ) ,
413+ post_pruning_row_groups : ExpectedPruning :: None ,
414+ }
415+ }
416+
417+ // Return a test for alltypes_plain.parquet
418+ fn new_all_types ( ) -> Self {
419+ Self {
420+ file_name : String :: from ( "alltypes_plain.parquet" ) ,
421+ schema : Schema :: new ( vec ! [ Field :: new(
422+ "string_col" ,
423+ DataType :: Utf8 ,
424+ false ,
425+ ) ] ) ,
426+ post_pruning_row_groups : ExpectedPruning :: None ,
427+ }
428+ }
429+
430+ /// Expect all row groups to be pruned
431+ pub fn with_expect_all_pruned ( mut self ) -> Self {
432+ self . post_pruning_row_groups = ExpectedPruning :: All ;
433+ self
434+ }
435+
436+ /// Expect all row groups not to be pruned
437+ pub fn with_expect_none_pruned ( mut self ) -> Self {
438+ self . post_pruning_row_groups = ExpectedPruning :: None ;
439+ self
440+ }
441+
442+ /// Prune this file using the specified expression and check that the expected row groups are left
443+ async fn run ( self , expr : Expr ) {
444+ let Self {
445+ file_name,
446+ schema,
447+ post_pruning_row_groups,
448+ } = self ;
449+
450+ let testdata = datafusion_common:: test_util:: parquet_test_data ( ) ;
451+ let path = format ! ( "{testdata}/{file_name}" ) ;
452+ let data = bytes:: Bytes :: from ( std:: fs:: read ( path) . unwrap ( ) ) ;
453+
454+ let expr = logical2physical ( & expr, & schema) ;
455+ let pruning_predicate =
456+ PruningPredicate :: try_new ( expr, Arc :: new ( schema) ) . unwrap ( ) ;
457+
458+ let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate (
459+ & file_name,
460+ data,
461+ & pruning_predicate,
462+ )
463+ . await
464+ . unwrap ( ) ;
465+
466+ post_pruning_row_groups. assert ( & pruned_row_groups) ;
467+ }
468+ }
469+
470+ /// Evaluates the pruning predicate on the specified row groups and returns the row groups that are left
471+ async fn test_row_group_bloom_filter_pruning_predicate (
472+ file_name : & str ,
473+ data : bytes:: Bytes ,
474+ pruning_predicate : & PruningPredicate ,
475+ ) -> Result < RowGroupAccessPlanFilter > {
476+ use datafusion_datasource:: PartitionedFile ;
477+ use object_store:: ObjectMeta ;
478+
479+ let object_meta = ObjectMeta {
480+ location : object_store:: path:: Path :: parse ( file_name) . expect ( "creating path" ) ,
481+ last_modified : chrono:: DateTime :: from ( std:: time:: SystemTime :: now ( ) ) ,
482+ size : data. len ( ) as u64 ,
483+ e_tag : None ,
484+ version : None ,
485+ } ;
486+ let in_memory = object_store:: memory:: InMemory :: new ( ) ;
487+ in_memory
488+ . put ( & object_meta. location , data. into ( ) )
489+ . await
490+ . expect ( "put parquet file into in memory object store" ) ;
491+
492+ let metrics = ExecutionPlanMetricsSet :: new ( ) ;
493+ let file_metrics =
494+ ParquetFileMetrics :: new ( 0 , object_meta. location . as_ref ( ) , & metrics) ;
495+ let inner =
496+ ParquetObjectReader :: new ( Arc :: new ( in_memory) , object_meta. location . clone ( ) )
497+ . with_file_size ( object_meta. size ) ;
498+
499+ let partitioned_file = PartitionedFile :: new_from_meta ( object_meta) ;
500+
501+ let reader = ParquetFileReader {
502+ inner,
503+ file_metrics : file_metrics. clone ( ) ,
504+ partitioned_file,
505+ } ;
506+ let mut builder = ParquetRecordBatchStreamBuilder :: new ( reader) . await . unwrap ( ) ;
507+
508+ let access_plan = ParquetAccessPlan :: new_all ( builder. metadata ( ) . num_row_groups ( ) ) ;
509+ let mut pruned_row_groups = RowGroupAccessPlanFilter :: new ( access_plan) ;
510+ let literal_columns = pruning_predicate. literal_columns ( ) ;
511+ let parquet_columns: Vec < _ > = literal_columns
512+ . into_iter ( )
513+ . filter_map ( |column_name| {
514+ let ( column_idx, _) = parquet:: arrow:: parquet_column (
515+ builder. parquet_schema ( ) ,
516+ pruning_predicate. schema ( ) ,
517+ & column_name,
518+ ) ?;
519+ Some ( (
520+ column_name. to_string ( ) ,
521+ column_idx,
522+ builder. parquet_schema ( ) . column ( column_idx) . physical_type ( ) ,
523+ ) )
524+ } )
525+ . collect :: < Vec < _ > > ( ) ;
526+ let mut row_group_bloom_filters =
527+ Vec :: with_capacity ( builder. metadata ( ) . num_row_groups ( ) ) ;
528+ row_group_bloom_filters. resize_with (
529+ builder. metadata ( ) . num_row_groups ( ) ,
530+ BloomFilterStatistics :: new,
531+ ) ;
532+ for idx in pruned_row_groups. row_group_indexes ( ) {
533+ let mut bloom_filters =
534+ BloomFilterStatistics :: with_capacity ( parquet_columns. len ( ) ) ;
535+ for ( column_name, column_idx, physical_type) in & parquet_columns {
536+ let bf = match builder
537+ . get_row_group_column_bloom_filter ( idx, * column_idx)
538+ . await
539+ {
540+ Ok ( Some ( bf) ) => bf,
541+ Ok ( None ) => continue ,
542+ Err ( e) => {
543+ log:: debug!( "Ignoring error reading bloom filter: {e}" ) ;
544+ file_metrics. predicate_evaluation_errors . add ( 1 ) ;
545+ continue ;
546+ }
547+ } ;
548+ bloom_filters. insert ( column_name. clone ( ) , bf, * physical_type) ;
549+ }
550+ row_group_bloom_filters[ idx] = bloom_filters;
551+ }
552+ pruned_row_groups. prune_by_bloom_filters (
553+ pruning_predicate,
554+ & file_metrics,
555+ & row_group_bloom_filters,
556+ ) ;
557+
558+ Ok ( pruned_row_groups)
559+ }
560+ }
0 commit comments