Skip to content

Commit f5d374b

Browse files
rkrishn7 authored and LiaCastaneda committed
chore: Update dynamic filter formatting (apache#17647)
* chore: update dynamic filter formatting to indicate expr is placeholder
* update tests
* update tests

(cherry picked from commit d587b8d)
1 parent e0a1211 commit f5d374b

7 files changed

Lines changed: 51 additions & 28 deletions

File tree

datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -232,10 +232,10 @@ impl RunQueryResult {
232232
}
233233
}
234234

235-
/// Iterate over each line in the plan and check that one of them has `DataSourceExec` and `DynamicFilterPhysicalExpr` in the same line.
235+
/// Iterate over each line in the plan and check that one of them has `DataSourceExec` and `DynamicFilter` in the same line.
236236
fn has_dynamic_filter_expr_pushdown(plan: &str) -> bool {
237237
for line in plan.lines() {
238-
if line.contains("DataSourceExec") && line.contains("DynamicFilterPhysicalExpr") {
238+
if line.contains("DataSourceExec") && line.contains("DynamicFilter") {
239239
return true;
240240
}
241241
}

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -248,7 +248,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
248248
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
249249
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
250250
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
251-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] AND DynamicFilterPhysicalExpr [ true ]
251+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
252252
"
253253
);
254254

@@ -709,7 +709,7 @@ async fn test_topk_dynamic_filter_pushdown() {
709709
output:
710710
Ok:
711711
- SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
712-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
712+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
713713
"
714714
);
715715

@@ -735,7 +735,7 @@ async fn test_topk_dynamic_filter_pushdown() {
735735
format!("{}", format_plan_for_test(&plan)),
736736
@r"
737737
- SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false], filter=[b@1 > bd]
738-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 > bd ]
738+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@1 > bd ]
739739
"
740740
);
741741
}
@@ -792,7 +792,7 @@ async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
792792
output:
793793
Ok:
794794
- SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false]
795-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
795+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
796796
"
797797
);
798798

@@ -828,7 +828,7 @@ async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
828828
format!("{}", format_plan_for_test(&plan)),
829829
@r"
830830
- SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false], filter=[b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac)]
831-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ]
831+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ]
832832
"
833833
);
834834
// There should be no more batches
@@ -912,7 +912,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
912912
Ok:
913913
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
914914
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
915-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
915+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
916916
",
917917
);
918918

@@ -1140,7 +1140,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
11401140
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
11411141
- CoalesceBatchesExec: target_batch_size=8192
11421142
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1143-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
1143+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
11441144
"
11451145
);
11461146

@@ -1512,8 +1512,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
15121512
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
15131513
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
15141514
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
1515-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
1516-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
1515+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
1516+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
15171517
",
15181518
);
15191519

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 27 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -98,8 +98,7 @@ impl Eq for DynamicFilterPhysicalExpr {}
9898

9999
impl Display for DynamicFilterPhysicalExpr {
100100
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101-
let inner = self.current().expect("Failed to get current expression");
102-
write!(f, "DynamicFilterPhysicalExpr [ {inner} ]")
101+
self.render(f, |expr, f| write!(f, "{expr}"))
103102
}
104103
}
105104

@@ -173,6 +172,11 @@ impl DynamicFilterPhysicalExpr {
173172
}
174173
}
175174

175+
/// Get the current generation of the expression.
176+
fn current_generation(&self) -> u64 {
177+
self.inner.read().generation
178+
}
179+
176180
/// Get the current expression.
177181
/// This will return the current expression with any children
178182
/// remapped to match calls to [`PhysicalExpr::with_new_children`].
@@ -206,6 +210,26 @@ impl DynamicFilterPhysicalExpr {
206210
};
207211
Ok(())
208212
}
213+
214+
fn render(
215+
&self,
216+
f: &mut std::fmt::Formatter<'_>,
217+
render_expr: impl FnOnce(
218+
Arc<dyn PhysicalExpr>,
219+
&mut std::fmt::Formatter<'_>,
220+
) -> std::fmt::Result,
221+
) -> std::fmt::Result {
222+
let inner = self.current().map_err(|_| std::fmt::Error)?;
223+
let current_generation = self.current_generation();
224+
write!(f, "DynamicFilter [ ")?;
225+
if current_generation == 1 {
226+
write!(f, "empty")?;
227+
} else {
228+
render_expr(inner, f)?;
229+
}
230+
231+
write!(f, " ]")
232+
}
209233
}
210234

211235
impl PhysicalExpr for DynamicFilterPhysicalExpr {
@@ -295,8 +319,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
295319
}
296320

297321
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298-
let inner = self.current().map_err(|_| std::fmt::Error)?;
299-
inner.fmt_sql(f)
322+
self.render(f, |expr, f| expr.fmt_sql(f))
300323
}
301324

302325
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -392,9 +392,9 @@ physical_plan
392392
44)-----------------------------│ -------------------- ││ -------------------- │
393393
45)-----------------------------│ files: 1 ││ partition_count(in->out): │
394394
46)-----------------------------│ format: parquet ││ 1 -> 4 │
395-
47)-----------------------------│ predicate: true ││ │
396-
48)-----------------------------│ ││ partitioning_scheme: │
397-
49)-----------------------------│ ││ RoundRobinBatch(4) │
395+
47)-----------------------------│ ││ │
396+
48)-----------------------------│ predicate: ││ partitioning_scheme: │
397+
49)-----------------------------│ DynamicFilter [ empty ] ││ RoundRobinBatch(4) │
398398
50)-----------------------------└───────────────────────────┘└─────────────┬─────────────┘
399399
51)----------------------------------------------------------┌─────────────┴─────────────┐
400400
52)----------------------------------------------------------│ DataSourceExec │

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -853,7 +853,7 @@ physical_plan
853853
01)ProjectionExec: expr=[1 as foo]
854854
02)--SortPreservingMergeExec: [part_key@0 ASC NULLS LAST], fetch=1
855855
03)----SortExec: TopK(fetch=1), expr=[part_key@0 ASC NULLS LAST], preserve_partitioning=[true]
856-
04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-2.parquet]]}, projection=[part_key], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
856+
04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-2.parquet]]}, projection=[part_key], file_type=parquet, predicate=DynamicFilter [ empty ]
857857

858858
query I
859859
with selection as (

datafusion/sqllogictest/test_files/push_down_filter.slt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -310,7 +310,7 @@ physical_plan
310310
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
311311
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
312312
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
313-
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
313+
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
314314

315315
statement ok
316316
drop table small_table;

datafusion/sqllogictest/test_files/topk.slt

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -316,7 +316,7 @@ explain select number, letter, age from partial_sorted order by number desc, let
316316
----
317317
physical_plan
318318
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST]
319-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
319+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
320320

321321

322322
# Explain variations of the above query with different orderings, and different sort prefixes.
@@ -326,43 +326,43 @@ explain select number, letter, age from partial_sorted order by age desc limit 3
326326
----
327327
physical_plan
328328
01)SortExec: TopK(fetch=3), expr=[age@2 DESC], preserve_partitioning=[false]
329-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
329+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
330330

331331
query TT
332332
explain select number, letter, age from partial_sorted order by number desc, letter desc limit 3;
333333
----
334334
physical_plan
335335
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC]
336-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
336+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
337337

338338
query TT
339339
explain select number, letter, age from partial_sorted order by number asc limit 3;
340340
----
341341
physical_plan
342342
01)SortExec: TopK(fetch=3), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false]
343-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
343+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
344344

345345
query TT
346346
explain select number, letter, age from partial_sorted order by letter asc, number desc limit 3;
347347
----
348348
physical_plan
349349
01)SortExec: TopK(fetch=3), expr=[letter@1 ASC NULLS LAST, number@0 DESC], preserve_partitioning=[false]
350-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
350+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
351351

352352
# Explicit NULLS ordering cases (reversing the order of the NULLS on the number and letter orderings)
353353
query TT
354354
explain select number, letter, age from partial_sorted order by number desc, letter asc NULLS FIRST limit 3;
355355
----
356356
physical_plan
357357
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC], preserve_partitioning=[false], sort_prefix=[number@0 DESC]
358-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
358+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
359359

360360
query TT
361361
explain select number, letter, age from partial_sorted order by number desc NULLS LAST, letter asc limit 3;
362362
----
363363
physical_plan
364364
01)SortExec: TopK(fetch=3), expr=[number@0 DESC NULLS LAST, letter@1 ASC NULLS LAST], preserve_partitioning=[false]
365-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
365+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
366366

367367

368368
# Verify that the sort prefix is correctly computed on the normalized ordering (removing redundant aliased columns)
@@ -372,7 +372,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa
372372
physical_plan
373373
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST]
374374
02)--ProjectionExec: expr=[number@0 as number, letter@1 as letter, age@2 as age, number@0 as column4, letter@1 as column5]
375-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
375+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
376376

377377
# Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age)
378378
query TT

0 commit comments

Comments
 (0)