Skip to content

Commit 1ef4611

Browse files
author
Bhargava Vadlamani
committed
native_support_nested_loop_join
1 parent 692cc37 commit 1ef4611

4 files changed

Lines changed: 11 additions & 6 deletions

File tree

native/core/src/execution/planner.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ use arrow::row::{OwnedRow, RowConverter, SortField};
106106
use datafusion::common::utils::SingleRowListArrayBuilder;
107107
use datafusion::common::UnnestOptions;
108108
use datafusion::physical_plan::filter::FilterExec;
109+
use datafusion::physical_plan::joins::NestedLoopJoinExec;
109110
use datafusion::physical_plan::limit::GlobalLimitExec;
110111
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
111112
use datafusion_comet_proto::spark_expression::ListLiteral;
@@ -134,7 +135,6 @@ use num::{BigInt, ToPrimitive};
134135
use object_store::path::Path;
135136
use std::cmp::max;
136137
use std::{collections::HashMap, sync::Arc};
137-
use datafusion::physical_plan::joins::NestedLoopJoinExec;
138138
use url::Url;
139139

140140
// For clippy error on type_complexity.
@@ -1202,12 +1202,12 @@ impl PhysicalPlanner {
12021202
OpStruct::BroadcastNestedLoopJoin(bnlj) => {
12031203
let (join_params, scans, shuffle_scans) = self.parse_join_parameters(
12041204
inputs,
1205-
&[],
1205+
children,
12061206
&[],
12071207
&[],
12081208
bnlj.join_type,
12091209
&bnlj.condition,
1210-
partition_count
1210+
partition_count,
12111211
)?;
12121212

12131213
let left = Arc::clone(&join_params.left.native_plan);
@@ -1218,7 +1218,7 @@ impl PhysicalPlanner {
12181218
right,
12191219
join_params.join_filter,
12201220
&join_params.join_type,
1221-
None
1221+
None,
12221222
)?);
12231223

12241224
Ok((

native/core/src/execution/planner/operator_registry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,6 @@ fn get_operator_type(spark_operator: &Operator) -> Option<OperatorType> {
151151
OpStruct::Explode(_) => None, // Not yet in OperatorType enum
152152
OpStruct::CsvScan(_) => Some(OperatorType::CsvScan),
153153
OpStruct::ShuffleScan(_) => None, // Not yet in OperatorType enum
154-
OpStruct::BroadcastNestedLoopJoin(_) => todo!(),
154+
OpStruct::BroadcastNestedLoopJoin(_) => None,
155155
}
156156
}

spark/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ object CometConf extends ShimCometConf {
254254
createExecEnabledConfig("broadcastExchange", defaultValue = true)
255255
val COMET_EXEC_HASH_JOIN_ENABLED: ConfigEntry[Boolean] =
256256
createExecEnabledConfig("hashJoin", defaultValue = true)
257+
val COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED: ConfigEntry[Boolean] =
258+
createExecEnabledConfig("broadcastNestedLoopJoin", defaultValue = false)
257259
val COMET_EXEC_SORT_MERGE_JOIN_ENABLED: ConfigEntry[Boolean] =
258260
createExecEnabledConfig("sortMergeJoin", defaultValue = true)
259261
val COMET_EXEC_AGGREGATE_ENABLED: ConfigEntry[Boolean] =

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2026,8 +2026,11 @@ object CometBroadcastNestedLoopJoinExec extends CometOperatorSerde[BroadcastNest
20262026

20272027
object CometBroadcastHashJoinExec extends CometOperatorSerde[HashJoin] with CometHashJoin {
20282028

2029+
override def getSupportLevel(operator: HashJoin): SupportLevel = {
2030+
Incompatible(Some("Broadcast nested loop join is experimental"))
2031+
}
20292032
override def enabledConfig: Option[ConfigEntry[Boolean]] =
2030-
Some(CometConf.COMET_EXEC_HASH_JOIN_ENABLED)
2033+
Some(CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED)
20312034

20322035
override def convert(
20332036
join: HashJoin,

0 commit comments

Comments
 (0)