Skip to content

Commit 7bf36ff

Browse files
committed
migrate some files to show usage
1 parent feaaadb commit 7bf36ff

6 files changed

Lines changed: 47 additions & 134 deletions

File tree

spark/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.network.util.JavaUtils
2929
import org.apache.spark.sql.comet.util.Utils
3030
import org.apache.spark.sql.internal.SQLConf
3131

32-
import org.apache.comet.shims.ShimCometConf
32+
import org.apache.comet.enableIfVer
3333

3434
/**
3535
* Configurations for a Comet application. Mostly inspired by [[SQLConf]] in Spark.
@@ -43,7 +43,7 @@ import org.apache.comet.shims.ShimCometConf
4343
* which retrieves the config value from the thread-local [[SQLConf]] object. Alternatively, you
4444
* can also explicitly pass a [[SQLConf]] object to the `get` method.
4545
*/
46-
object CometConf extends ShimCometConf {
46+
object CometConf {
4747

4848
val COMPAT_GUIDE: String = "For more information, refer to the Comet Compatibility " +
4949
"Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html)"
@@ -852,6 +852,26 @@ object CometConf extends ShimCometConf {
852852
val COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT: ConfigEntry[Boolean] =
853853
createOperatorIncompatConfig("DataWritingCommandExec")
854854

855+
/**
856+
* Whether Comet's Parquet scan paths allow widening type promotions (e.g. INT32 -> INT64, FLOAT
857+
* -> DOUBLE, INT32 -> DOUBLE). Reads from the deprecated `spark.comet.schemaEvolution.enabled`
858+
* SQL conf were removed in favor of this per-version constant; see #4298.
859+
*/
860+
val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = isCometSchemaEvolutionEnabled
861+
862+
/**
863+
* Spark 3.x's vectorized reader rejects widening type promotions on read, so Comet matches
864+
* that behavior by defaulting to false on 3.x.
865+
*/
866+
@enableIfVer(spark = "3")
867+
private def isCometSchemaEvolutionEnabled: Boolean = false
868+
869+
/**
870+
* Spark 4.x's reader accepts widening type promotions, so this defaults to true on 4.x.
871+
*/
872+
@enableIfVer(spark = "4")
873+
private def isCometSchemaEvolutionEnabled: Boolean = true
874+
855875
/** Create a config to enable a specific operator */
856876
private def createExecEnabledConfig(
857877
exec: String,

spark/src/main/spark-3.5/org/apache/comet/shims/ShimFileFormat.scala renamed to spark/src/main/scala/org/apache/comet/shims/ShimFileFormat.scala

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,36 @@
1919

2020
package org.apache.comet.shims
2121

22-
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
23-
import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
2422
import org.apache.spark.sql.types.StructType
2523

24+
import org.apache.comet.enableIfVer
25+
2626
object ShimFileFormat {
2727
// A name for a temporary column that holds row indexes computed by the file format reader
2828
// until they can be placed in the _metadata struct.
29-
val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
29+
val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = getRowIndexTemporaryColumnName
30+
31+
@enableIfVer(spark = "<3.5.0")
32+
private def getRowIndexTemporaryColumnName: String = {
33+
import org.apache.spark.sql.execution.datasources.FileFormat
34+
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
35+
}
36+
37+
@enableIfVer(spark = ">=3.5.0")
38+
private def getRowIndexTemporaryColumnName: String = {
39+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
40+
ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
41+
}
42+
43+
@enableIfVer(spark = "<3.5.0")
44+
def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
45+
import org.apache.spark.sql.execution.datasources.RowIndexUtil
46+
RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
47+
}
3048

31-
def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
49+
@enableIfVer(spark = ">=3.5.0")
50+
def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
51+
import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
3252
ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
53+
}
3354
}

spark/src/main/spark-3.4/org/apache/comet/shims/ShimFileFormat.scala

Lines changed: 0 additions & 33 deletions
This file was deleted.

spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometConf.scala

Lines changed: 0 additions & 31 deletions
This file was deleted.

spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala

Lines changed: 0 additions & 31 deletions
This file was deleted.

spark/src/main/spark-4.x/org/apache/comet/shims/ShimFileFormat.scala

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)