Skip to content

Commit 0fd52b7

Browse files
authored
chore: remove dead native_iceberg_compat code path (#4363)
1 parent fbc3d2f commit 0fd52b7

97 files changed

Lines changed: 315 additions & 21124 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

benchmarks/tpc/engines/comet-hashjoin.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,5 @@ driver_class_path = ["$COMET_JAR"]
3030
"spark.executor.extraClassPath" = "$COMET_JAR"
3131
"spark.plugins" = "org.apache.spark.CometPlugin"
3232
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
33-
"spark.comet.scan.impl" = "native_datafusion"
3433
"spark.comet.exec.replaceSortMergeJoin" = "true"
3534
"spark.comet.expression.Cast.allowIncompatible" = "true"

benchmarks/tpc/engines/comet.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,4 @@ driver_class_path = ["$COMET_JAR"]
3030
"spark.executor.extraClassPath" = "$COMET_JAR"
3131
"spark.plugins" = "org.apache.spark.CometPlugin"
3232
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
33-
"spark.comet.scan.impl" = "native_datafusion"
3433
"spark.comet.expression.Cast.allowIncompatible" = "true"

dev/diffs/3.4.3.diff

Lines changed: 27 additions & 64 deletions
Large diffs are not rendered by default.

dev/diffs/3.5.8.diff

Lines changed: 35 additions & 84 deletions
Large diffs are not rendered by default.

dev/diffs/4.0.2.diff

Lines changed: 26 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -739,10 +739,10 @@ index 9c529d14221..ab2850b5d68 100644
739739
}
740740
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
741741
new file mode 100644
742-
index 00000000000..5691536c114
742+
index 00000000000..4b31bea33de
743743
--- /dev/null
744744
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
745-
@@ -0,0 +1,45 @@
745+
@@ -0,0 +1,42 @@
746746
+/*
747747
+ * Licensed to the Apache Software Foundation (ASF) under one or more
748748
+ * contributor license agreements. See the NOTICE file distributed with
@@ -771,9 +771,6 @@ index 00000000000..5691536c114
771771
+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
772772
+ */
773773
+case class IgnoreComet(reason: String) extends Tag("DisableComet")
774-
+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
775-
+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
776-
+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
777774
+
778775
+/**
779776
+ * Helper trait that disables Comet for all tests regardless of default config values.
@@ -1199,7 +1196,7 @@ index 0df7f806272..92390bd819f 100644
11991196

12001197
test("non-matching optional group") {
12011198
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
1202-
index 2e33f6505ab..54f5081e10a 100644
1199+
index 2e33f6505ab..fc1a2c8f964 100644
12031200
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
12041201
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
12051202
@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
@@ -1220,7 +1217,7 @@ index 2e33f6505ab..54f5081e10a 100644
12201217
_.asInstanceOf[FileScanRDD].filePartitions.forall(
12211218
_.files.forall(_.urlEncodedPath.contains("p=0"))))
12221219
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
1223-
+ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
1220+
+ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
12241221
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
12251222
+ fs.inputRDDs().forall(
12261223
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
@@ -2544,14 +2541,14 @@ index cd6f41b4ef4..4b6a17344bc 100644
25442541
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
25452542
)
25462543
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
2547-
index 6080a5e8e4b..f5dadef89ae 100644
2544+
index 6080a5e8e4b..23a451d5bcf 100644
25482545
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
25492546
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
25502547
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
25512548

25522549
import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
25532550
import org.apache.spark.sql._
2554-
+import org.apache.spark.sql.IgnoreCometNativeScan
2551+
+import org.apache.spark.sql.IgnoreComet
25552552
import org.apache.spark.sql.catalyst.dsl.expressions._
25562553
import org.apache.spark.sql.catalyst.expressions._
25572554
import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
@@ -2574,7 +2571,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
25742571

25752572
- test("Filters should be pushed down for vectorized Parquet reader at row group level") {
25762573
+ test("Filters should be pushed down for vectorized Parquet reader at row group level",
2577-
+ IgnoreCometNativeScan("Native scans do not support the tested accumulator")) {
2574+
+ IgnoreComet("Native scans do not support the tested accumulator")) {
25782575
import testImplicits._
25792576

25802577
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
@@ -2609,7 +2606,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
26092606
}
26102607

26112608
- test("filter pushdown - StringPredicate") {
2612-
+ test("filter pushdown - StringPredicate", IgnoreCometNativeScan("cannot be pushed down")) {
2609+
+ test("filter pushdown - StringPredicate", IgnoreComet("cannot be pushed down")) {
26132610
import testImplicits._
26142611
// keep() should take effect on StartsWith/EndsWith/Contains
26152612
Seq(
@@ -2619,7 +2616,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
26192616

26202617
- test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
26212618
+ test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
2622-
+ IgnoreCometNativeScan("Comet has different push-down behavior")) {
2619+
+ IgnoreComet("Comet has different push-down behavior")) {
26232620
val schema = StructType(Seq(
26242621
StructField("a", IntegerType, nullable = false)
26252622
))
@@ -2666,7 +2663,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
26662663

26672664
- test("SPARK-34562: Bloom filter push down") {
26682665
+ test("SPARK-34562: Bloom filter push down",
2669-
+ IgnoreCometNativeScan("Native scans do not support the tested accumulator")) {
2666+
+ IgnoreComet("Native scans do not support the tested accumulator")) {
26702667
withTempPath { dir =>
26712668
val path = dir.getCanonicalPath
26722669
spark.range(100).selectExpr("id * 2 AS id")
@@ -2773,32 +2770,21 @@ index 30503af0fab..1491f4bc2d5 100644
27732770

27742771
import testImplicits._
27752772
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2776-
index 08fd8a9ecb5..27aee839b8c 100644
2773+
index 08fd8a9ecb5..306958da489 100644
27772774
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
27782775
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2779-
@@ -20,6 +20,7 @@ import java.io.File
2780-
2781-
import scala.jdk.CollectionConverters._
2782-
2783-
+import org.apache.comet.CometConf
2784-
import org.apache.hadoop.fs.Path
2785-
import org.apache.parquet.column.ParquetProperties._
2786-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2787-
@@ -27,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
2776+
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
27882777

27892778
import org.apache.spark.SparkException
27902779
import org.apache.spark.sql.QueryTest
2791-
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
2780+
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec}
27922781
import org.apache.spark.sql.execution.FileSourceScanExec
27932782
import org.apache.spark.sql.execution.datasources.FileFormat
27942783
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2795-
@@ -245,6 +247,17 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2784+
@@ -245,6 +246,14 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
27962785
case f: FileSourceScanExec =>
27972786
numPartitions += f.inputRDD.partitions.length
27982787
numOutputRows += f.metrics("numOutputRows").value
2799-
+ case b: CometScanExec =>
2800-
+ numPartitions += b.inputRDD.partitions.length
2801-
+ numOutputRows += b.metrics("numOutputRows").value
28022788
+ case b: CometBatchScanExec =>
28032789
+ numPartitions += b.inputRDD.partitions.length
28042790
+ numOutputRows += b.metrics("numOutputRows").value
@@ -2810,16 +2796,13 @@ index 08fd8a9ecb5..27aee839b8c 100644
28102796
case _ =>
28112797
}
28122798
assert(numPartitions > 0)
2813-
@@ -303,6 +316,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2799+
@@ -303,6 +312,9 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
28142800
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
28152801

28162802
test(s"invalid row index column type - ${conf.desc}") {
28172803
+ // https://github.com/apache/datafusion-comet/issues/3886
28182804
+ // Comet throws RuntimeException instead of SparkException
2819-
+ assume(!Seq(
2820-
+ CometConf.SCAN_NATIVE_DATAFUSION,
2821-
+ CometConf.SCAN_AUTO
2822-
+ ).contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()))
2805+
+ assume(false)
28232806
withSQLConf(conf.sqlConfs: _*) {
28242807
withTempPath{ path =>
28252808
val df = spark.range(0, 10, 1, 1).toDF("id")
@@ -2844,15 +2827,15 @@ index 5c0b7def039..151184bc98c 100644
28442827
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
28452828
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
28462829
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
2847-
index 0acb21f3e6f..e7c65429119 100644
2830+
index 0acb21f3e6f..15bd866d8aa 100644
28482831
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
28492832
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
28502833
@@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
28512834
import org.apache.parquet.schema.Type._
28522835

28532836
import org.apache.spark.SparkException
28542837
-import org.apache.spark.sql.{AnalysisException, Row}
2855-
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, Row}
2838+
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
28562839
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
28572840
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
28582841
import org.apache.spark.sql.functions.desc
@@ -2872,7 +2855,7 @@ index 0acb21f3e6f..e7c65429119 100644
28722855

28732856
- test("schema mismatch failure error message for parquet vectorized reader") {
28742857
+ test("schema mismatch failure error message for parquet vectorized reader",
2875-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) {
2858+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4316")) {
28762859
withTempPath { dir =>
28772860
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
28782861
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
@@ -3460,69 +3443,39 @@ index 86c4e49f6f6..2e639e5f38d 100644
34603443
val tblTargetName = "tbl_target"
34613444
val tblSourceQualified = s"default.$tblSourceName"
34623445
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
3463-
index f0f3f94b811..be5e113c3ed 100644
3446+
index f0f3f94b811..b7d18771314 100644
34643447
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
34653448
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
3466-
@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
3467-
import scala.language.implicitConversions
3468-
import scala.util.control.NonFatal
3469-
3470-
+import org.apache.comet.CometConf
3471-
import org.apache.hadoop.fs.Path
3472-
import org.scalactic.source.Position
3473-
import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
3449+
@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
34743450
import org.scalatest.concurrent.Eventually
34753451

34763452
import org.apache.spark.SparkFunSuite
34773453
-import org.apache.spark.sql.{AnalysisException, Row}
3478-
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row}
3454+
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
34793455
import org.apache.spark.sql.catalyst.FunctionIdentifier
34803456
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
34813457
import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
3482-
@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
3458+
@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
34833459
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
34843460
import org.apache.spark.sql.catalyst.util._
34853461
import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits}
34863462
+import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec}
34873463
import org.apache.spark.sql.execution.FilterExec
34883464
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
34893465
import org.apache.spark.sql.execution.datasources.DataSourceUtils
3490-
@@ -121,6 +123,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
3466+
@@ -121,6 +122,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
34913467

34923468
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
34933469
(implicit pos: Position): Unit = {
34943470
+ // Check Comet skip tags first, before DisableAdaptiveExecution handling
34953471
+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
34963472
+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
34973473
+ return
3498-
+ }
3499-
+ if (isCometEnabled) {
3500-
+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
3501-
+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
3502-
+ cometScanImpl == CometConf.SCAN_AUTO
3503-
+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
3504-
+ cometScanImpl == CometConf.SCAN_AUTO
3505-
+ if (isNativeIcebergCompat &&
3506-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
3507-
+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun)
3508-
+ return
3509-
+ }
3510-
+ if (isNativeDataFusion &&
3511-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
3512-
+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
3513-
+ return
3514-
+ }
3515-
+ if ((isNativeDataFusion || isNativeIcebergCompat) &&
3516-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
3517-
+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)",
3518-
+ testTags: _*)(testFun)
3519-
+ return
3520-
+ }
35213474
+ }
35223475
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
35233476
super.test(testName, testTags: _*) {
35243477
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
3525-
@@ -248,8 +278,15 @@ private[sql] trait SQLTestUtilsBase
3478+
@@ -248,8 +254,15 @@ private[sql] trait SQLTestUtilsBase
35263479
override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter
35273480
}
35283481

@@ -3538,7 +3491,7 @@ index f0f3f94b811..be5e113c3ed 100644
35383491
super.withSQLConf(pairs: _*)(f)
35393492
}
35403493

3541-
@@ -451,6 +488,8 @@ private[sql] trait SQLTestUtilsBase
3494+
@@ -451,6 +464,8 @@ private[sql] trait SQLTestUtilsBase
35423495
val schema = df.schema
35433496
val withoutFilters = df.queryExecution.executedPlan.transform {
35443497
case FilterExec(_, child) => child

0 commit comments

Comments
 (0)