Skip to content

Commit f2d6cff

Browse files
committed
remove CometScanExec.scala, CometBatchScanExec.scala, CometScanRule.scala, CometExecRule.scala, and related stuff
1 parent d052a6f commit f2d6cff

40 files changed

Lines changed: 428 additions & 3761 deletions

spark/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,6 @@ object CometConf extends ShimCometConf {
9393
.booleanConf
9494
.createWithEnvVarOrDefault("ENABLE_COMET", true)
9595

96-
val COMET_USE_PLANNER: ConfigEntry[Boolean] = conf("spark.comet.planner.enabled")
97-
.category(CATEGORY_EXEC)
98-
.doc(
99-
"When true, Comet registers the single-rule CometPlanner in place of the legacy " +
100-
"CometScanRule + CometExecRule pair. Default true. Flip to false to run the legacy " +
101-
"path as a rollback. The choice is evaluated once at session extension injection, so " +
102-
"changes after session creation do not switch rules. The selected rules assert on this " +
103-
"flag at entry to surface configuration drift early.")
104-
.booleanConf
105-
.createWithDefault(true)
106-
10796
val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled")
10897
.category(CATEGORY_TESTING)
10998
.doc("Whether to enable native scans. Intended for use in Comet's own test suites to " +

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,24 +33,21 @@ import org.apache.spark.sql.internal.SQLConf
3333

3434
import org.apache.comet.CometConf._
3535
import org.apache.comet.planner.CometPlanner
36-
import org.apache.comet.rules.{CometExecRule, CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, CometScanRule, CometShuffleRule, CometSpark34AqeDppFallbackRule, EliminateRedundantTransitions}
36+
import org.apache.comet.rules.{CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, EliminateRedundantTransitions}
3737
import org.apache.comet.shims.ShimCometSparkSessionExtensions
3838

3939
/**
4040
* CometDriverPlugin will register an instance of this class with Spark.
4141
*
42-
* Comet rules are injected into Spark's rule pipeline at several extension points. The execution
43-
* order differs between AQE and non-AQE paths:
42+
* Comet rules are injected into Spark's rule pipeline at several extension points:
4443
*
4544
* Non-AQE (QueryExecution.preparations):
4645
* {{{
4746
* 1. PlanDynamicPruningFilters -- Spark creates non-AQE DPP (SubqueryBroadcastExec)
4847
* 2. PlanSubqueries -- Spark creates SubqueryExec for scalar subqueries
4948
* 3. EnsureRequirements -- Spark inserts shuffles/sorts
5049
* 4. ApplyColumnarRulesAndInsertTransitions:
51-
* a. preColumnarTransitions: CometScanRule, CometExecRule
52-
* - CometExecRule.convertSubqueryBroadcasts converts SubqueryBroadcastExec to
53-
* CometSubqueryBroadcastExec for exchange reuse with Comet broadcasts
50+
* a. preColumnarTransitions: CometPlanner
5451
* b. insertTransitions: ColumnarToRow/RowToColumnar added
5552
* c. postColumnarTransitions: EliminateRedundantTransitions
5653
* 5. ReuseExchangeAndSubquery -- Spark deduplicates subqueries (sees Comet nodes)
@@ -60,28 +57,27 @@ import org.apache.comet.shims.ShimCometSparkSessionExtensions
6057
* {{{
6158
* Initial plan:
6259
* PlanAdaptiveSubqueries: creates SubqueryAdaptiveBroadcastExec (SAB) for AQE DPP
63-
* queryStagePreparationRules: CometScanRule, CometExecRule
64-
* - CometExecRule.convertSubqueryBroadcasts wraps SABs in
65-
* CometSubqueryAdaptiveBroadcastExec to prevent Spark's
66-
* PlanAdaptiveDynamicPruningFilters from replacing DPP with Literal.TrueLiteral
60+
* queryStagePreparationRules: CometPlanner
61+
* - prePass `Spark34DppFallbackPrePass` is a no-op on 3.5+
6762
*
6863
* Per stage (optimizeQueryStage + postStageCreationRules):
6964
* 1. queryStageOptimizerRules:
70-
* a. PlanAdaptiveDynamicPruningFilters (Spark) -- skips wrapped SABs
65+
* a. PlanAdaptiveDynamicPruningFilters (Spark) -- skips planner-wrapped SABs
7166
* b. ReuseAdaptiveSubquery (Spark)
7267
* c. CometPlanAdaptiveDynamicPruningFilters -- converts wrapped SABs to
7368
* CometSubqueryBroadcastExec with BroadcastQueryStageExec for broadcast reuse
7469
* d. CometReuseSubquery -- deduplicates converted subqueries
7570
* 2. postStageCreationRules -> ApplyColumnarRulesAndInsertTransitions:
76-
* a. preColumnarTransitions: CometScanRule, CometExecRule (no-ops, already converted)
71+
* a. preColumnarTransitions: CometPlanner (no-op, already converted)
7772
* b. insertTransitions
7873
* c. postColumnarTransitions: EliminateRedundantTransitions
7974
* }}}
8075
*
81-
* On Spark 3.4, injectQueryStageOptimizerRule is unavailable. CometExecRule does not wrap SABs,
82-
* and CometPlanAdaptiveDynamicPruningFilters/CometReuseSubquery are not registered. AQE DPP scans
83-
* fall back to Spark so that Spark's PlanAdaptiveDynamicPruningFilters handles them natively
84-
* (with DPP).
76+
* On Spark 3.4, `injectQueryStageOptimizerRule` is unavailable.
77+
* `CometPlanAdaptiveDynamicPruningFilters` and `CometReuseSubquery` are not registered. The
78+
* planner's `Spark34DppFallbackPrePass` arranges for Spark's own
79+
* `PlanAdaptiveDynamicPruningFilters` to succeed by tagging specific nodes with
80+
* `CometTags.SKIP_COMET`, which Phase 2 honors as `Fallback`.
8581
*/
8682
class CometSparkSessionExtensions
8783
extends (SparkSessionExtensions => Unit)
@@ -90,33 +86,17 @@ class CometSparkSessionExtensions
9086
override def apply(extensions: SparkSessionExtensions): Unit = {
9187
extensions.injectColumnar { session => CometScanColumnar(session) }
9288
extensions.injectColumnar { session => CometExecColumnar(session) }
93-
// Pre-3.5 only: tag AQE DPP regions so the conversion rules below leave them Spark-native.
94-
// Registered before CometScanRule/CometExecRule so tags are in place when conversion runs.
95-
// No-op on Spark 3.5+; see CometSpark34AqeDppFallbackRule's class docstring.
96-
injectPreSpark35QueryStagePrepRuleShim(extensions, CometSpark34AqeDppFallbackRule)
97-
if (CometConf.COMET_USE_PLANNER.get()) {
98-
extensions.injectQueryStagePrepRule { session => CometPlanner(session) }
99-
// Covers the legacy `exec=off + shuffle=on` mode that CometPlanner intentionally
100-
// leaves untouched. See CometShuffleRule for the removal plan.
101-
extensions.injectQueryStagePrepRule { session => CometShuffleRule(session) }
102-
} else {
103-
extensions.injectQueryStagePrepRule { session => CometScanRule(session) }
104-
extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
105-
}
89+
extensions.injectQueryStagePrepRule { session => CometPlanner(session) }
10690
injectQueryStageOptimizerRuleShim(extensions, CometPlanAdaptiveDynamicPruningFilters)
10791
injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery)
10892
}
10993

11094
case class CometScanColumnar(session: SparkSession) extends ColumnarRule {
111-
override def preColumnarTransitions: Rule[SparkPlan] =
112-
if (CometConf.COMET_USE_PLANNER.get()) CometPlanner(session)
113-
else CometScanRule(session)
95+
override def preColumnarTransitions: Rule[SparkPlan] = CometPlanner(session)
11496
}
11597

11698
case class CometExecColumnar(session: SparkSession) extends ColumnarRule {
117-
override def preColumnarTransitions: Rule[SparkPlan] =
118-
if (CometConf.COMET_USE_PLANNER.get()) CometShuffleRule(session)
119-
else CometExecRule(session)
99+
override def preColumnarTransitions: Rule[SparkPlan] = CometPlanner(session)
120100

121101
override def postColumnarTransitions: Rule[SparkPlan] =
122102
EliminateRedundantTransitions(session)
@@ -181,7 +161,9 @@ object CometSparkSessionExtensions extends Logging {
181161
}
182162

183163
def isCometScan(op: SparkPlan): Boolean = {
184-
op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
164+
op.isInstanceOf[CometNativeScanExec] ||
165+
op.isInstanceOf[CometIcebergNativeScanExec] ||
166+
op.isInstanceOf[CometCsvNativeScanExec]
185167
}
186168

187169
def isSpark35Plus: Boolean = {

spark/src/main/scala/org/apache/comet/planner/CometPlanner.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import org.apache.comet.CometConf
3232
import org.apache.comet.CometSparkSessionExtensions.isCometLoaded
3333
import org.apache.comet.planner.phases.{NormalizePrePass, Phase1LikelyComet, Phase2Decision, Phase3Emit, Spark34DppFallbackPrePass, SubqueryBroadcastRewrite}
3434
import org.apache.comet.planner.tags.CometTags
35-
import org.apache.comet.rules.RewriteJoin
3635

3736
/**
3837
* Single-pass compiler from Spark physical plans to Comet-accelerated plans. Replaces the
@@ -63,12 +62,6 @@ case class CometPlanner(session: SparkSession) extends Rule[SparkPlan] with Logg
6362
logDebug("CometPlanner skip: Comet extension not loaded")
6463
return plan
6564
}
66-
assert(
67-
CometConf.COMET_USE_PLANNER.get(conf),
68-
s"CometPlanner ran while ${CometConf.COMET_USE_PLANNER.key}=false. The legacy " +
69-
"CometScanRule + CometExecRule should be the sole rules on this path. Either " +
70-
"COMET_USE_PLANNER was flipped after session creation or CometPlanner was registered " +
71-
"by mistake.")
7265

7366
// Comet exec globally disabled OR root already converted (AQE re-entry): skip phase 1/2/3
7467
// but still run convertBlocks. AQE re-planning can graft a previously-emitted CometNativeExec
@@ -345,10 +338,6 @@ case class CometPlanner(session: SparkSession) extends Rule[SparkPlan] with Logg
345338
*/
346339
private def checkPostEmitInvariants(plan: SparkPlan): Unit = {
347340
plan.foreach { node =>
348-
assert(
349-
!node.getClass.getName.contains("CometBatchScanExec"),
350-
"CometBatchScanExec found in emitted plan. CometPlanner should emit " +
351-
s"CometIcebergNativeScanExec / CometCsvNativeScanExec directly. node=$node")
352341
assert(
353342
!node.getClass.getName.endsWith(".CometSinkPlaceHolder") &&
354343
!node.getClass.getName.endsWith(".CometScanWrapper"),
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.planner
21+
22+
import org.apache.spark.sql.execution.datasources.FileFormat
23+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
24+
25+
/**
26+
* File-format predicates used by the V1 scan gate. Kept as a standalone object so the "strict
27+
* class equality" check is unit-testable. Lives outside `V1ScanGate` because that class scopes a
28+
* per-scan classification call; this is a pure type check.
29+
*/
30+
object FileFormats {
31+
32+
/**
33+
* True iff `fileFormat` is exactly Spark's built-in `ParquetFileFormat`. Subclasses (Delta
34+
* Lake's variant, etc.) are rejected — they may extend Parquet but rely on extra read-time
35+
* machinery the native scan doesn't implement.
36+
*/
37+
def isParquetFileFormat(fileFormat: FileFormat): Boolean =
38+
fileFormat.getClass.equals(classOf[ParquetFileFormat])
39+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.planner
21+
22+
import org.apache.spark.sql.comet._
23+
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
24+
import org.apache.spark.sql.execution._
25+
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec}
26+
import org.apache.spark.sql.execution.command.DataWritingCommandExec
27+
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
28+
import org.apache.spark.sql.execution.window.WindowExec
29+
30+
import org.apache.comet.serde.CometOperatorSerde
31+
import org.apache.comet.serde.operator._
32+
33+
/**
34+
* Map from Spark operator class to its CometOperatorSerde. The planner's Phase 1/2/3 use this to
35+
* look up the serde for a given node and drive prediction, decision, and emission via the serde's
36+
* generic interface.
37+
*
38+
* Two categories preserved from the legacy split for documentation only; the planner treats them
39+
* uniformly via `allExecs`:
40+
*
41+
* - `nativeExecs`: serdes that produce a fully native CometNativeExec at emit time.
42+
* - `sinks`: serdes whose native plan is a `Scan` operator (JVM-orchestrated wrappers like
43+
* `CometCollectLimitExec`, `CometUnionExec`).
44+
*/
45+
object OperatorRegistry {
46+
47+
val nativeExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
48+
Map(
49+
classOf[ProjectExec] -> CometProjectExec,
50+
classOf[FilterExec] -> CometFilterExec,
51+
classOf[LocalLimitExec] -> CometLocalLimitExec,
52+
classOf[GlobalLimitExec] -> CometGlobalLimitExec,
53+
classOf[ExpandExec] -> CometExpandExec,
54+
classOf[GenerateExec] -> CometExplodeExec,
55+
classOf[HashAggregateExec] -> CometHashAggregateExec,
56+
classOf[ObjectHashAggregateExec] -> CometObjectHashAggregateExec,
57+
classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoinExec,
58+
classOf[ShuffledHashJoinExec] -> CometHashJoinExec,
59+
classOf[SortMergeJoinExec] -> CometSortMergeJoinExec,
60+
classOf[SortExec] -> CometSortExec,
61+
classOf[LocalTableScanExec] -> CometLocalTableScanExec,
62+
classOf[WindowExec] -> CometWindowExec,
63+
classOf[DataWritingCommandExec] -> CometDataWritingCommand)
64+
65+
val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
66+
Map(
67+
classOf[CoalesceExec] -> CometCoalesceExec,
68+
classOf[CollectLimitExec] -> CometCollectLimitExec,
69+
classOf[TakeOrderedAndProjectExec] -> CometTakeOrderedAndProjectExec,
70+
classOf[UnionExec] -> CometUnionExec)
71+
72+
val allExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = nativeExecs ++ sinks
73+
}

spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala renamed to spark/src/main/scala/org/apache/comet/planner/RewriteJoin.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.comet.rules
20+
package org.apache.comet.planner
2121

2222
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper}
2323
import org.apache.spark.sql.catalyst.plans.LeftSemi
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.planner.gates
21+
22+
import scala.collection.mutable.ListBuffer
23+
24+
import org.apache.spark.sql.types.{DataType, ShortType, StructType}
25+
26+
import org.apache.comet.{CometConf, DataTypeSupport}
27+
import org.apache.comet.serde.QueryPlanSerde.isStringCollationType
28+
import org.apache.comet.shims.CometTypeShim
29+
30+
/**
31+
* Schema-level support check used by both V1 and V2 native scan gates. Moved out of the deleted
32+
* `CometScanRule` companion. Rejects types known to be unsupported by the native scan path
33+
* (unsigned small ints when the safety check is on, collated strings, variant-shredded structs,
34+
* empty structs) and delegates everything else to `DataTypeSupport`.
35+
*/
36+
case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim {
37+
38+
override def isTypeSupported(
39+
dt: DataType,
40+
name: String,
41+
fallbackReasons: ListBuffer[String]): Boolean = {
42+
dt match {
43+
case ShortType if CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
44+
fallbackReasons += "Native Parquet scan may not handle unsigned UINT_8 correctly for " +
45+
s"$dt. Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
46+
"native execution if your data does not contain unsigned small integers. " +
47+
CometConf.COMPAT_GUIDE
48+
false
49+
case dt if isStringCollationType(dt) =>
50+
// we don't need specific support for collation in scans, but this is a convenient
51+
// place to force the whole query to fall back to Spark for now
52+
false
53+
case s: StructType if isVariantStruct(s) =>
54+
// Spark 4.0's PushVariantIntoScan rewrites a VariantType column into a struct of typed
55+
// fields plus per-field VariantMetadata, expecting the scan to honor Parquet variant
56+
// shredding semantics. Comet's native scan does not, so fall back to Spark.
57+
fallbackReasons +=
58+
s"Unsupported $name of type VariantType (shredded; not supported by native scan)"
59+
false
60+
case s: StructType if s.fields.isEmpty =>
61+
false
62+
case _ =>
63+
super.isTypeSupported(dt, name, fallbackReasons)
64+
}
65+
}
66+
}

spark/src/main/scala/org/apache/comet/planner/gates/V1ScanGate.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@ import org.apache.spark.sql.SparkSession
2626
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow}
2727
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
2828
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
29-
import org.apache.spark.sql.comet.CometScanExec
3029
import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SubqueryAdaptiveBroadcastExec}
3130
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
3231
import org.apache.spark.sql.internal.SQLConf
3332

3433
import org.apache.comet.CometConf._
3534
import org.apache.comet.CometSparkSessionExtensions.isSpark35Plus
3635
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
37-
import org.apache.comet.rules.CometScanTypeChecker
36+
import org.apache.comet.planner.FileFormats
3837
import org.apache.comet.serde.operator.CometNativeScan
3938
import org.apache.comet.shims.ShimFileFormat
4039

@@ -82,7 +81,7 @@ object V1ScanGate extends Logging {
8281
return reject(s"Unsupported relation $other")
8382
}
8483

85-
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
84+
if (!FileFormats.isParquetFileFormat(r.fileFormat)) {
8685
return reject(s"Unsupported file format ${r.fileFormat}")
8786
}
8887

0 commit comments

Comments
 (0)