Skip to content

Commit 5f7fdfb

Browse files
committed
[SPARK-57064][SQL] Widen bucketing rule pattern matches to use FileSourceScanLike trait
1 parent 29fdcef commit 5f7fdfb

2 files changed

Lines changed: 5 additions & 5 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
2424
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
2525
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection}
2626
import org.apache.spark.sql.catalyst.rules.Rule
27-
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
27+
import org.apache.spark.sql.execution.{FileSourceScanExec, FileSourceScanLike, FilterExec, ProjectExec, SparkPlan}
2828
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, SortMergeJoinExec}
2929

3030
/**
@@ -119,13 +119,13 @@ object ExtractJoinWithBuckets {
119119
if (j.buildSide == BuildLeft) hasScanOperation(j.right) else hasScanOperation(j.left)
120120
case j: BroadcastNestedLoopJoinExec =>
121121
if (j.buildSide == BuildLeft) hasScanOperation(j.right) else hasScanOperation(j.left)
122-
case f: FileSourceScanExec => f.relation.bucketSpec.nonEmpty
122+
case f: FileSourceScanLike => f.relation.bucketSpec.nonEmpty
123123
case _ => false
124124
}
125125

126126
private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
127127
plan.collectFirst {
128-
case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
128+
case f: FileSourceScanLike if f.relation.bucketSpec.nonEmpty &&
129129
f.optionalNumCoalescedBuckets.isEmpty =>
130130
f.relation.bucketSpec.get
131131
}

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.bucketing
1919

2020
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution}
2121
import org.apache.spark.sql.catalyst.rules.Rule
22-
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
22+
import org.apache.spark.sql.execution.{FileSourceScanExec, FileSourceScanLike, FilterExec, ProjectExec, SortExec, SparkPlan}
2323
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
2424
import org.apache.spark.sql.execution.exchange.Exchange
2525

@@ -142,7 +142,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
142142

143143
def apply(plan: SparkPlan): SparkPlan = {
144144
lazy val hasBucketedScan = plan.exists {
145-
case scan: FileSourceScanExec => scan.bucketedScan
145+
case scan: FileSourceScanLike => scan.bucketedScan
146146
case _ => false
147147
}
148148

0 commit comments

Comments
 (0)