Skip to content

Commit b1fbbb8

Browse files
committed
Merge branch 'main' into codegen_scala_udf
# Conflicts: # dev/diffs/3.4.3.diff # dev/diffs/3.5.8.diff
2 parents dca8b22 + b1c586a commit b1fbbb8

26 files changed

Lines changed: 1502 additions & 494 deletions

File tree

common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void close() {
128128

129129
protected void initNative() {
130130
LOG.debug("initializing the native column reader");
131-
DataType readType = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() ? type : null;
131+
DataType readType = CometConf.COMET_SCHEMA_EVOLUTION_ENABLED() ? type : null;
132132
boolean useLegacyDateTimestampOrNTZ =
133133
useLegacyDateTimestamp || type == TimestampNTZType$.MODULE$;
134134
nativeHandle =

common/src/main/java/org/apache/comet/parquet/TypeUtil.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkT
130130
PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
131131
LogicalTypeAnnotation logicalTypeAnnotation =
132132
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
133-
boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
133+
boolean allowTypePromotion = CometConf.COMET_SCHEMA_EVOLUTION_ENABLED();
134134

135135
if (sparkType instanceof NullType) {
136136
return;
@@ -150,8 +150,8 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 32)) {
150150
// fallbacks. We read them as long values.
151151
return;
152152
} else if (sparkType == DataTypes.LongType && allowTypePromotion) {
153-
// In Comet we allow schema evolution from int to long, if
154-
// `spark.comet.schemaEvolution.enabled` is enabled.
153+
// INT32 -> LONG widening is allowed when Comet's per-Spark-version
154+
// type-promotion default permits it (Spark 4.x). See ShimCometConf.
155155
return;
156156
} else if (sparkType == DataTypes.ByteType || sparkType == DataTypes.ShortType) {
157157
return;
@@ -198,8 +198,8 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 64)) {
198198
break;
199199
case FLOAT:
200200
if (sparkType == DataTypes.FloatType) return;
201-
// In Comet we allow schema evolution from float to double, if
202-
// `spark.comet.schemaEvolution.enabled` is enabled.
201+
// FLOAT -> DOUBLE widening is allowed when Comet's per-Spark-version
202+
// type-promotion default permits it (Spark 4.x). See ShimCometConf.
203203
if (sparkType == DataTypes.DoubleType && allowTypePromotion) return;
204204
break;
205205
case DOUBLE:

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -738,16 +738,6 @@ object CometConf extends ShimCometConf {
738738
.booleanConf
739739
.createWithDefault(true)
740740

741-
val COMET_SCHEMA_EVOLUTION_ENABLED: ConfigEntry[Boolean] =
742-
conf("spark.comet.schemaEvolution.enabled")
743-
.internal()
744-
.category(CATEGORY_SCAN)
745-
.doc("Whether to enable schema evolution in Comet. For instance, promoting a integer " +
746-
"column to a long column, a float column to a double column, etc. This is automatically" +
747-
"enabled when reading from Iceberg tables.")
748-
.booleanConf
749-
.createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT)
750-
751741
val COMET_ENABLE_PARTIAL_HASH_AGGREGATE: ConfigEntry[Boolean] =
752742
conf("spark.comet.testing.aggregate.partialMode.enabled")
753743
.internal()

common/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,12 @@
2020
package org.apache.comet.shims
2121

2222
trait ShimCometConf {
23-
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = false
23+
24+
/**
25+
* Whether Comet's Parquet scan paths allow widening type promotions (e.g. INT32 → INT64, FLOAT
26+
* → DOUBLE). Spark 3.x's vectorized reader rejects these on read, so Comet matches by
27+
* defaulting to false on 3.x. Reads from the deprecated `spark.comet.schemaEvolution.enabled`
28+
* SQL conf were removed in favor of this per-version constant; see #4298.
29+
*/
30+
val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = false
2431
}

common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,12 @@
2020
package org.apache.comet.shims
2121

2222
trait ShimCometConf {
23-
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
23+
24+
/**
25+
* Whether Comet's Parquet scan paths allow widening type promotions (e.g. INT32 → INT64, FLOAT
26+
* → DOUBLE, INT32 → DOUBLE). Spark 4.x's vectorized reader accepts these by default. Reads from
27+
* the deprecated `spark.comet.schemaEvolution.enabled` SQL conf were removed in favor of this
28+
* per-version constant; see #4298.
29+
*/
30+
val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = true
2431
}

dev/diffs/3.4.3.diff

Lines changed: 9 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,7 @@ index b5b34922694..a72403780c4 100644
918918
protected val baseResourcePath = {
919919
// use the same way as `SQLQueryTestSuite` to get the resource path
920920
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
921-
index 525d97e4998..481e1b0da2a 100644
921+
index 525d97e4998..f600e162da3 100644
922922
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
923923
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
924924
@@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -931,22 +931,7 @@ index 525d97e4998..481e1b0da2a 100644
931931
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
932932
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
933933
}
934-
@@ -1960,8 +1961,13 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
935-
countAcc.add(1)
936-
x
937-
})
938-
+ // Comet's `CometProject` implements cross-sibling subexpression elimination over
939-
+ // `ScalaUDF`, but its aggregation operator does not, so each `ScalaUDF` reference inside
940-
+ // the aggregated expression invokes the UDF body separately. TODO(comet#XXXX): extend the
941-
+ // CometProject CSE to the aggregation operator's input projection.
942-
verifyCallCount(
943-
- df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 1)
944-
+ df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0),
945-
+ if (isCometEnabled) 3 else 1)
946-
947-
verifyCallCount(
948-
df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1)
949-
@@ -3730,7 +3736,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
934+
@@ -3730,7 +3731,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
950935
}
951936
}
952937

@@ -956,36 +941,6 @@ index 525d97e4998..481e1b0da2a 100644
956941
val sc = spark.sparkContext
957942
val hiveVersion = "2.3.9"
958943
// transitive=false, only download specified jar
959-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
960-
index 2dabcf01be7..9bc0be5d9aa 100644
961-
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
962-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
963-
@@ -491,8 +491,23 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
964-
s"Schema did not match for query #$i\n${expected.sql}: $output") {
965-
output.schema
966-
}
967-
- assertResult(expected.output, s"Result did not match" +
968-
- s" for query #$i\n${expected.sql}") { output.output }
969-
+ // Comet may surface errors as `CometNativeException` instead of the matching Spark
970-
+ // exception class when DataFusion's parquet row filter wraps the typed error via
971-
+ // `format!("{e:?}")`, dropping the JNI bridge's ability to downcast. Same category,
972-
+ // different surface. Collapse both sides to a placeholder when this happens so the
973-
+ // literal compare passes. TODO(comet#XXXX): remove once DataFusion preserves the typed
974-
+ // error end to end.
975-
+ val (expectedOut, actualOut) = if (isCometEnabled &&
976-
+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") &&
977-
+ expected.output.contains("\"DIVIDE_BY_ZERO\"") &&
978-
+ output.output.startsWith("org.apache.comet.CometNativeException") &&
979-
+ output.output.contains("DivideByZero")) {
980-
+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]")
981-
+ } else {
982-
+ (expected.output, output.output)
983-
+ }
984-
+ assertResult(expectedOut, s"Result did not match" +
985-
+ s" for query #$i\n${expected.sql}") { actualOut }
986-
}
987-
}
988-
}
989944
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
990945
index 48ad10992c5..51d1ee65422 100644
991946
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -2166,28 +2121,10 @@ index 104b4e416cd..4adb273170a 100644
21662121
case _ =>
21672122
throw new AnalysisException("Can not match ParquetTable in the query.")
21682123
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
2169-
index 8670d95c65e..9411af57a26 100644
2124+
index 8670d95c65e..b624c3811dd 100644
21702125
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
21712126
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
2172-
@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
2173-
2174-
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
2175-
import org.apache.spark.sql._
2176-
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
2177-
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
2178-
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
2179-
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2180-
@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2181-
}
2182-
}
2183-
2184-
- test("SPARK-35640: int as long should throw schema incompatible error") {
2185-
+ test("SPARK-35640: int as long should throw schema incompatible error",
2186-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
2187-
val data = (1 to 4).map(i => Tuple1(i))
2188-
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
2189-
2190-
@@ -1335,7 +1337,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2127+
@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
21912128
}
21922129
}
21932130

@@ -2198,7 +2135,7 @@ index 8670d95c65e..9411af57a26 100644
21982135
checkAnswer(
21992136
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
22002137
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
2201-
index 29cb224c878..1f7a0ebf0bd 100644
2138+
index 29cb224c878..dcb8a0e9bef 100644
22022139
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
22032140
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
22042141
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -2235,7 +2172,7 @@ index 29cb224c878..1f7a0ebf0bd 100644
22352172

22362173
- test("SPARK-34212 Parquet should read decimals correctly") {
22372174
+ test("SPARK-34212 Parquet should read decimals correctly",
2238-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
2175+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4354")) {
22392176
def readParquet(schema: String, path: File): DataFrame = {
22402177
spark.read.schema(schema).parquet(path.toString)
22412178
}
@@ -2265,7 +2202,7 @@ index 29cb224c878..1f7a0ebf0bd 100644
22652202

22662203
- test("row group skipping doesn't overflow when reading into larger type") {
22672204
+ test("row group skipping doesn't overflow when reading into larger type",
2268-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
2205+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4354")) {
22692206
withTempPath { path =>
22702207
Seq(0).toDF("a").write.parquet(path.toString)
22712208
// The vectorized and non-vectorized readers will produce different exceptions, we don't need
@@ -2354,7 +2291,7 @@ index 5c0b7def039..151184bc98c 100644
23542291
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
23552292
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
23562293
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
2357-
index bf5c51b89bb..dc3aac281c3 100644
2294+
index bf5c51b89bb..7e143a0e0f9 100644
23582295
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
23592296
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
23602297
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
@@ -2381,7 +2318,7 @@ index bf5c51b89bb..dc3aac281c3 100644
23812318

23822319
- test("schema mismatch failure error message for parquet vectorized reader") {
23832320
+ test("schema mismatch failure error message for parquet vectorized reader",
2384-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
2321+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) {
23852322
withTempPath { dir =>
23862323
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
23872324
assert(e.getCause.isInstanceOf[SparkException])

0 commit comments

Comments
 (0)