diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 49eb55479b..25973f14dd 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -78,11 +78,11 @@ object CometConf extends ShimCometConf { val COMET_PREFIX = "spark.comet"; - val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec"; + val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec" - val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression"; + val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression" - val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator"; + val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator" val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled") .category(CATEGORY_EXEC) @@ -112,7 +112,7 @@ object CometConf extends ShimCometConf { "feature is highly experimental and only partially implemented. It should not " + "be used in production.") .booleanConf - .createWithDefault(false) + .createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false) // Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices // and does not support complex types. Use native_iceberg_compat or auto instead. @@ -490,6 +490,13 @@ object CometConf extends ShimCometConf { "Ensure that Comet shuffle memory overhead factor is a double greater than 0") .createWithDefault(1.0) + val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize") + .category(CATEGORY_TUNING) + .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.") + .intConf + .checkValue(v => v > 0, "Batch size must be positive") + .createWithDefault(8192) + val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.columnar.shuffle.batch.size") .category(CATEGORY_SHUFFLE) @@ -497,6 +504,9 @@ object CometConf extends ShimCometConf { "this should not be larger than batch size (i.e., `spark.comet.batchSize`). Otherwise " + "it will produce larger batches than expected in the native operator after shuffle.") .intConf + .checkValue( + v => v <= COMET_BATCH_SIZE.get(), + "Should not be larger than batch size `spark.comet.batchSize`") .createWithDefault(8192) val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] = @@ -552,6 +562,7 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + // Used on native side. Check spark_config.rs how the config is used val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_PREFIX.debug.memory") .category(CATEGORY_TESTING) @@ -610,12 +621,6 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) - val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize") - .category(CATEGORY_TUNING) - .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.") - .intConf - .createWithDefault(8192) - val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = conf("spark.comet.parquet.enable.directBuffer") .category(CATEGORY_PARQUET) @@ -795,14 +800,6 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) - val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = - conf("spark.comet.regexp.allowIncompatible") - .category(CATEGORY_EXEC) - .doc("Comet is not currently fully compatible with Spark for all regular expressions. " + - s"Set this config to true to allow them anyway. $COMPAT_GUIDE.") - .booleanConf - .createWithDefault(false) - val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] = conf("spark.comet.metrics.updateInterval") .category(CATEGORY_EXEC) @@ -821,6 +818,7 @@ object CometConf extends ShimCometConf { .stringConf .createOptional + // Used on native side. Check spark_config.rs how the config is used val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] = conf("spark.comet.maxTempDirectorySize") .category(CATEGORY_EXEC) @@ -845,6 +843,9 @@ object CometConf extends ShimCometConf { .booleanConf .createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false) + val COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT: ConfigEntry[Boolean] = + createOperatorIncompatConfig("DataWritingCommandExec") + /** Create a config to enable a specific operator */ private def createExecEnabledConfig( exec: String, @@ -860,6 +861,25 @@ object CometConf extends ShimCometConf { .createWithDefault(defaultValue) } + /** + * Converts a config key to a valid environment variable name. Example: + * "spark.comet.operator.DataWritingCommandExec.allowIncompatible" -> + * "SPARK_COMET_OPERATOR_DATAWRITINGCOMMANDEXEC_ALLOWINCOMPATIBLE" + */ + private def configKeyToEnvVar(configKey: String): String = + configKey.toUpperCase(Locale.ROOT).replace('.', '_') + + private def createOperatorIncompatConfig(name: String): ConfigEntry[Boolean] = { + val configKey = getOperatorAllowIncompatConfigKey(name) + val envVar = configKeyToEnvVar(configKey) + conf(configKey) + .category(CATEGORY_EXEC) + .doc(s"Whether to allow incompatibility for operator: $name. " + + s"False by default. Can be overridden with $envVar env variable") + .booleanConf + .createWithEnvVarOrDefault(envVar, false) + } + def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = { getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf) } diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 0205888433..5713cacea5 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -1009,7 +1009,7 @@ index 18123a4d6ec..fbe4c766eee 100644 - regexp_extract($"a", "(\\d+)-(\\d+)", 1)), - Row("num-num", "300", "100") :: Row("num-num", "400", "100") :: - Row("num-num", "400-400", "100") :: Nil) -+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { ++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { + val df = Seq( + ("100-200", "(\\d+)-(\\d+)", "300"), + ("100-200", "(\\d+)-(\\d+)", "400"), diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index d2d72e9d68..3aaecdecb1 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -985,7 +985,7 @@ index fa1a64460fc..1d2e215d6a3 100644 - ("100-200", "(\\d+)-(\\d+)", "300"), - ("100-200", "(\\d+)-(\\d+)", "400"), - ("100-200", "(\\d+)", "400")).toDF("a", "b", "c") -+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { ++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { + val df = Seq( + ("100-200", "(\\d+)-(\\d+)", "300"), + ("100-200", "(\\d+)-(\\d+)", "400"), diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index d6694e827f..a1b2506554 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -1165,7 +1165,7 @@ index 0df7f806272..52d33d67328 100644 - ("100-200", "(\\d+)-(\\d+)", "300"), - ("100-200", "(\\d+)-(\\d+)", "400"), - ("100-200", "(\\d+)", "400")).toDF("a", "b", "c") -+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { ++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { + val df = Seq( + ("100-200", "(\\d+)-(\\d+)", "300"), + ("100-200", "(\\d+)-(\\d+)", "400"), diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index c09f6a61e6..21695bdf57 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -62,7 +62,7 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but -this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`. +this can be overridden by setting `spark.comet.expression.regexp.allowIncompatible=true`. ## Window Functions diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 31575138f8..69b9bd5f85 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -60,7 +60,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec case _: ParquetFileFormat => if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString .startsWith("hdfs:")) { - return Unsupported(Some("Only HDFS/local filesystems output paths are supported")) + return Unsupported(Some("Supported output filesystems: local, HDFS")) } if (cmd.bucketSpec.isDefined) { diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index db60709007..ae32b625dc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -223,11 +223,11 @@ object CometRLike extends CometExpressionSerde[RLike] { expr.right match { case Literal(pattern, DataTypes.StringType) => if (!RegExp.isSupportedPattern(pattern.toString) && - !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { + !CometConf.isExprAllowIncompat("regexp")) { withInfo( expr, s"Regexp pattern $pattern is not compatible with Spark. " + - s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " + + s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " + "to allow it anyway.") None } else { @@ -298,11 +298,11 @@ object CometStringLPad extends CometExpressionSerde[StringLPad] { object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] { override def getSupportLevel(expr: RegExpReplace): SupportLevel = { if (!RegExp.isSupportedPattern(expr.regexp.toString) && - !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { + !CometConf.isExprAllowIncompat("regexp")) { withInfo( expr, s"Regexp pattern ${expr.regexp} is not compatible with Spark. " + - s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " + + s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " + "to allow it anyway.") return Incompatible() } diff --git a/spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql b/spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql index 7ea13a18a2..bf8a544515 100644 --- a/spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql +++ b/spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql @@ -16,7 +16,7 @@ -- under the License. -- Test regexp_replace() with regexp allowIncompatible enabled (happy path) --- Config: spark.comet.regexp.allowIncompatible=true +-- Config: spark.comet.expression.regexp.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql b/spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql index 968a2f22f7..822fb3ddb8 100644 --- a/spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql +++ b/spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql @@ -16,7 +16,7 @@ -- under the License. -- Test RLIKE with regexp allowIncompatible enabled (happy path) --- Config: spark.comet.regexp.allowIncompatible=true +-- Config: spark.comet.expression.regexp.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 1ab8d54fd2..415d8c3a19 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -958,7 +958,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // add repetitive data to trigger dictionary encoding Range(0, 100).map(_ => "John Smith") withParquetFile(data.zipWithIndex, withDictionary) { file => - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { spark.read.parquet(file).createOrReplaceTempView(table) val query = sql(s"select _2 as id, _1 rlike 'R[a-z]+s [Rr]ose' from $table") checkSparkAnswerAndOperator(query) @@ -996,7 +996,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withTable(table) { sql(s"create table $table(id int, name varchar(20)) using parquet") sql(s"insert into $table values(1,'James Smith')") - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { val query2 = sql(s"select id from $table where name rlike name") val (_, cometPlan) = checkSparkAnswer(query2) val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) @@ -1030,7 +1030,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // "Smith$", "Smith\\Z", "Smith\\z") - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { patterns.foreach { pattern => val query2 = sql(s"select name, '$pattern', name rlike '$pattern' from $table") checkSparkAnswerAndOperator(query2) @@ -1090,7 +1090,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { "\\V") val qualifiers = Seq("", "+", "*", "?", "{1,}") - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { // testing every possible combination takes too long, so we pick some // random combinations for (_ <- 0 until 100) { diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzIcebergSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzIcebergSuite.scala index 0a188f6cce..f37d997c41 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzIcebergSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzIcebergSuite.scala @@ -133,7 +133,7 @@ class CometFuzzIcebergSuite extends CometFuzzIcebergBase { } test("regexp_replace") { - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { val df = spark.table(icebergTableName) // We want to make sure that the schema generator wasn't modified to accidentally omit // StringType, since then this test would not run any queries and silently pass. diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index 02d13c841d..fe6032414e 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -255,7 +255,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { } test("regexp_replace") { - withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") { val df = spark.read.parquet(filename) df.createOrReplaceTempView("t1") // We want to make sure that the schema generator wasn't modified to accidentally omit diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index e4c405c003..815f03f213 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -48,7 +48,7 @@ class CometParquetWriterSuite extends CometTestBase { withSQLConf( CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax", - CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true") { writeWithCometNativeWriteExec(inputPath, outputPath) @@ -70,7 +70,7 @@ class CometParquetWriterSuite extends CometTestBase { withSQLConf( CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax", - CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true") { withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") { @@ -310,55 +310,6 @@ class CometParquetWriterSuite extends CometTestBase { } } - // ignored: native_comet scan is no longer supported - ignore("native write falls back when scan produces non-Arrow data") { - // This test verifies that when a native scan (like native_comet) doesn't support - // certain data types (complex types), the native write correctly falls back to Spark - // instead of failing at runtime with "Comet execution only takes Arrow Arrays" error. - withTempPath { dir => - val inputPath = new File(dir, "input.parquet").getAbsolutePath - val outputPath = new File(dir, "output.parquet").getAbsolutePath - - // Create data with complex types and write without Comet - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val df = Seq((1, Seq(1, 2, 3)), (2, Seq(4, 5)), (3, Seq(6, 7, 8, 9))) - .toDF("id", "values") - df.write.parquet(inputPath) - } - - // With native Parquet write enabled but using native_comet scan which doesn't - // support complex types, the scan falls back to Spark. The native write should - // detect this and also fall back to Spark instead of failing at runtime. - withSQLConf( - CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", - // Use native_comet which doesn't support complex types - CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_comet") { - - val plan = - captureWritePlan(path => spark.read.parquet(inputPath).write.parquet(path), outputPath) - - // Verify NO CometNativeWriteExec in the plan (should have fallen back to Spark) - val hasNativeWrite = plan.exists { - case _: CometNativeWriteExec => true - case d: DataWritingCommandExec => - d.child.exists(_.isInstanceOf[CometNativeWriteExec]) - case _ => false - } - - assert( - !hasNativeWrite, - "Expected fallback to Spark write (no CometNativeWriteExec), but found native write " + - s"in plan:\n${plan.treeString}") - - // Verify the data was written correctly - val result = spark.read.parquet(outputPath).collect() - assert(result.length == 3, "Expected 3 rows to be written") - } - } - } - test("parquet write complex types fuzz test") { withTempPath { dir => val outputPath = new File(dir, "output.parquet").getAbsolutePath @@ -517,7 +468,7 @@ class CometParquetWriterSuite extends CometTestBase { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true", // enable experimental native writes - CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true", CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", // explicitly set scan impl to override CI defaults CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto", diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index d9c49bc596..5982460a87 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -114,7 +114,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true", + CometConf.getExprAllowIncompatConfigKey("regexp") -> "true", // enabling COMET_EXPLAIN_NATIVE_ENABLED may add overhead but is useful for debugging CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "false") { cometSpark.sql(queryString).noop()