Skip to content

Commit 2188d26

Browse files
adriangbLiaCastaneda
authored and committed
Fix HashJoinExec sideways information passing for partitioned queries (apache#17197)
(cherry picked from commit 64bc58d)
1 parent 1369c45 commit 2188d26

3 files changed

Lines changed: 699 additions & 102 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 228 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use datafusion_physical_optimizer::{
4343
use datafusion_physical_plan::{
4444
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
4545
coalesce_batches::CoalesceBatchesExec,
46+
coalesce_partitions::CoalescePartitionsExec,
4647
filter::FilterExec,
4748
repartition::RepartitionExec,
4849
sorts::sort::SortExec,
@@ -226,7 +227,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
226227
format_plan_for_test(&plan),
227228
@r"
228229
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
229-
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab]
230+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
230231
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
231232
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
232233
"
@@ -754,7 +755,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
754755
None,
755756
&JoinType::Inner,
756757
None,
757-
PartitionMode::Partitioned,
758+
PartitionMode::CollectLeft,
758759
datafusion_common::NullEquality::NullEqualsNothing,
759760
)
760761
.unwrap(),
@@ -766,12 +767,12 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
766767
@r"
767768
OptimizationTest:
768769
input:
769-
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
770+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
770771
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
771772
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
772773
output:
773774
Ok:
774-
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
775+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
775776
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
776777
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
777778
",
@@ -800,13 +801,233 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
800801
insta::assert_snapshot!(
801802
format!("{}", format_plan_for_test(&plan)),
802803
@r"
803-
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb]
804+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
804805
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
805806
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
806807
"
807808
);
808809
}
809810

811+
#[tokio::test]
812+
async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
813+
use datafusion_common::JoinType;
814+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
815+
816+
// Rough plan we're trying to recreate:
817+
// COPY (select i as k from generate_series(1, 10000000) as t(i))
818+
// TO 'test_files/scratch/push_down_filter/t1.parquet'
819+
// STORED AS PARQUET;
820+
// COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
821+
// TO 'test_files/scratch/push_down_filter/t2.parquet'
822+
// STORED AS PARQUET;
823+
// create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet';
824+
// create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet';
825+
// explain
826+
// select *
827+
// from t1
828+
// join t2 on t1.k = t2.k;
829+
// +---------------+------------------------------------------------------------+
830+
// | plan_type | plan |
831+
// +---------------+------------------------------------------------------------+
832+
// | physical_plan | ┌───────────────────────────┐ |
833+
// | | │ CoalesceBatchesExec │ |
834+
// | | │ -------------------- │ |
835+
// | | │ target_batch_size: │ |
836+
// | | │ 8192 │ |
837+
// | | └─────────────┬─────────────┘ |
838+
// | | ┌─────────────┴─────────────┐ |
839+
// | | │ HashJoinExec │ |
840+
// | | │ -------------------- ├──────────────┐ |
841+
// | | │ on: (k = k) │ │ |
842+
// | | └─────────────┬─────────────┘ │ |
843+
// | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
844+
// | | │ CoalesceBatchesExec ││ CoalesceBatchesExec │ |
845+
// | | │ -------------------- ││ -------------------- │ |
846+
// | | │ target_batch_size: ││ target_batch_size: │ |
847+
// | | │ 8192 ││ 8192 │ |
848+
// | | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
849+
// | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
850+
// | | │ RepartitionExec ││ RepartitionExec │ |
851+
// | | │ -------------------- ││ -------------------- │ |
852+
// | | │ partition_count(in->out): ││ partition_count(in->out): │ |
853+
// | | │ 12 -> 12 ││ 12 -> 12 │ |
854+
// | | │ ││ │ |
855+
// | | │ partitioning_scheme: ││ partitioning_scheme: │ |
856+
// | | │ Hash([k@0], 12) ││ Hash([k@0], 12) │ |
857+
// | | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
858+
// | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
859+
// | | │ DataSourceExec ││ DataSourceExec │ |
860+
// | | │ -------------------- ││ -------------------- │ |
861+
// | | │ files: 12 ││ files: 12 │ |
862+
// | | │ format: parquet ││ format: parquet │ |
863+
// | | │ ││ predicate: true │ |
864+
// | | └───────────────────────────┘└───────────────────────────┘ |
865+
// | | |
866+
// +---------------+------------------------------------------------------------+
867+
868+
// Create build side with limited values
869+
let build_batches = vec![record_batch!(
870+
("a", Utf8, ["aa", "ab"]),
871+
("b", Utf8, ["ba", "bb"]),
872+
("c", Float64, [1.0, 2.0]) // Extra column not used in join
873+
)
874+
.unwrap()];
875+
let build_side_schema = Arc::new(Schema::new(vec![
876+
Field::new("a", DataType::Utf8, false),
877+
Field::new("b", DataType::Utf8, false),
878+
Field::new("c", DataType::Float64, false),
879+
]));
880+
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
881+
.with_support(true)
882+
.with_batches(build_batches)
883+
.build();
884+
885+
// Create probe side with more values
886+
let probe_batches = vec![record_batch!(
887+
("a", Utf8, ["aa", "ab", "ac", "ad"]),
888+
("b", Utf8, ["ba", "bb", "bc", "bd"]),
889+
("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
890+
)
891+
.unwrap()];
892+
let probe_side_schema = Arc::new(Schema::new(vec![
893+
Field::new("a", DataType::Utf8, false),
894+
Field::new("b", DataType::Utf8, false),
895+
Field::new("e", DataType::Float64, false),
896+
]));
897+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
898+
.with_support(true)
899+
.with_batches(probe_batches)
900+
.build();
901+
902+
// Create RepartitionExec nodes for both sides with hash partitioning on join keys
903+
let partition_count = 12;
904+
905+
// Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
906+
let build_hash_exprs = vec![
907+
col("a", &build_side_schema).unwrap(),
908+
col("b", &build_side_schema).unwrap(),
909+
];
910+
let build_repartition = Arc::new(
911+
RepartitionExec::try_new(
912+
build_scan,
913+
Partitioning::Hash(build_hash_exprs, partition_count),
914+
)
915+
.unwrap(),
916+
);
917+
let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));
918+
919+
// Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
920+
let probe_hash_exprs = vec![
921+
col("a", &probe_side_schema).unwrap(),
922+
col("b", &probe_side_schema).unwrap(),
923+
];
924+
let probe_repartition = Arc::new(
925+
RepartitionExec::try_new(
926+
probe_scan,
927+
Partitioning::Hash(probe_hash_exprs, partition_count),
928+
)
929+
.unwrap(),
930+
);
931+
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));
932+
933+
// Create HashJoinExec with partitioned inputs
934+
let on = vec![
935+
(
936+
col("a", &build_side_schema).unwrap(),
937+
col("a", &probe_side_schema).unwrap(),
938+
),
939+
(
940+
col("b", &build_side_schema).unwrap(),
941+
col("b", &probe_side_schema).unwrap(),
942+
),
943+
];
944+
let hash_join = Arc::new(
945+
HashJoinExec::try_new(
946+
build_coalesce,
947+
probe_coalesce,
948+
on,
949+
None,
950+
&JoinType::Inner,
951+
None,
952+
PartitionMode::Partitioned,
953+
datafusion_common::NullEquality::NullEqualsNothing,
954+
)
955+
.unwrap(),
956+
);
957+
958+
// Top-level CoalesceBatchesExec
959+
let cb =
960+
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
961+
// Top-level CoalescePartitionsExec
962+
let plan = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
963+
964+
// expect the predicate to be pushed down into the probe side DataSource
965+
insta::assert_snapshot!(
966+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
967+
@r"
968+
OptimizationTest:
969+
input:
970+
- CoalescePartitionsExec
971+
- CoalesceBatchesExec: target_batch_size=8192
972+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
973+
- CoalesceBatchesExec: target_batch_size=8192
974+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
975+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
976+
- CoalesceBatchesExec: target_batch_size=8192
977+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
978+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
979+
output:
980+
Ok:
981+
- CoalescePartitionsExec
982+
- CoalesceBatchesExec: target_batch_size=8192
983+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
984+
- CoalesceBatchesExec: target_batch_size=8192
985+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
986+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
987+
- CoalesceBatchesExec: target_batch_size=8192
988+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
989+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
990+
"
991+
);
992+
993+
// Actually apply the optimization to the plan and execute to see the filter in action
994+
let mut config = ConfigOptions::default();
995+
config.execution.parquet.pushdown_filters = true;
996+
config.optimizer.enable_dynamic_filter_pushdown = true;
997+
let plan = FilterPushdown::new_post_optimization()
998+
.optimize(plan, &config)
999+
.unwrap();
1000+
let config = SessionConfig::new().with_batch_size(10);
1001+
let session_ctx = SessionContext::new_with_config(config);
1002+
session_ctx.register_object_store(
1003+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
1004+
Arc::new(InMemory::new()),
1005+
);
1006+
let state = session_ctx.state();
1007+
let task_ctx = state.task_ctx();
1008+
let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
1009+
// Iterate one batch
1010+
if let Some(batch_result) = stream.next().await {
1011+
batch_result.unwrap();
1012+
}
1013+
1014+
// Now check what our filter looks like
1015+
insta::assert_snapshot!(
1016+
format!("{}", format_plan_for_test(&plan)),
1017+
@r"
1018+
- CoalescePartitionsExec
1019+
- CoalesceBatchesExec: target_batch_size=8192
1020+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1021+
- CoalesceBatchesExec: target_batch_size=8192
1022+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1023+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1024+
- CoalesceBatchesExec: target_batch_size=8192
1025+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1026+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
1027+
"
1028+
);
1029+
}
1030+
8101031
#[tokio::test]
8111032
async fn test_nested_hashjoin_dynamic_filter_pushdown() {
8121033
use datafusion_common::JoinType;
@@ -946,9 +1167,9 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
9461167
insta::assert_snapshot!(
9471168
format!("{}", format_plan_for_test(&plan)),
9481169
@r"
949-
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab]
1170+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
9501171
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
951-
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce]
1172+
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
9521173
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ]
9531174
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ]
9541175
"

0 commit comments

Comments
 (0)