Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ object CometConf extends ShimCometConf {

val COMET_PREFIX = "spark.comet";

val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec";
val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec"

val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";
val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression"

val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator";
val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator"

val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.category(CATEGORY_EXEC)
Expand Down Expand Up @@ -112,7 +112,7 @@ object CometConf extends ShimCometConf {
"feature is highly experimental and only partially implemented. It should not " +
"be used in production.")
.booleanConf
.createWithDefault(false)
.createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)

// Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices
// and does not support complex types. Use native_iceberg_compat or auto instead.
Expand Down Expand Up @@ -490,13 +490,23 @@ object CometConf extends ShimCometConf {
"Ensure that Comet shuffle memory overhead factor is a double greater than 0")
.createWithDefault(1.0)

val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
.category(CATEGORY_TUNING)
.doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
.intConf
.checkValue(v => v > 0, "Batch size must be positive")
.createWithDefault(8192)

val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.batch.size")
.category(CATEGORY_SHUFFLE)
.doc("Batch size when writing out sorted spill files on the native side. Note that " +
"this should not be larger than batch size (i.e., `spark.comet.batchSize`). Otherwise " +
"it will produce larger batches than expected in the native operator after shuffle.")
.intConf
.checkValue(
v => v <= COMET_BATCH_SIZE.get(),
"Should not be larger than batch size `spark.comet.batchSize`")
.createWithDefault(8192)

val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
Expand Down Expand Up @@ -552,6 +562,7 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

// Used on native side. Check spark_config.rs how the config is used
val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_PREFIX.debug.memory")
.category(CATEGORY_TESTING)
Expand Down Expand Up @@ -610,12 +621,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
.category(CATEGORY_TUNING)
.doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
.intConf
.createWithDefault(8192)

val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
conf("spark.comet.parquet.enable.directBuffer")
.category(CATEGORY_PARQUET)
Expand Down Expand Up @@ -795,14 +800,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.regexp.allowIncompatible")
.category(CATEGORY_EXEC)
.doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)
Comment on lines -798 to -804
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the config here will remove it from the configuration guide, but maybe that is fine, so long as we are explaining it in the compatibility guide.


val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
conf("spark.comet.metrics.updateInterval")
.category(CATEGORY_EXEC)
Expand All @@ -821,6 +818,7 @@ object CometConf extends ShimCometConf {
.stringConf
.createOptional

// Used on native side. Check spark_config.rs how the config is used
val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these configs are used directly from native code. We add them here so that they get documented.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in spark_config.rs:

pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ic, I'll add comments!

conf("spark.comet.maxTempDirectorySize")
.category(CATEGORY_EXEC)
Expand All @@ -845,6 +843,9 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false)

val COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT: ConfigEntry[Boolean] =
createOperatorIncompatConfig("DataWritingCommandExec")

/** Create a config to enable a specific operator */
private def createExecEnabledConfig(
exec: String,
Expand All @@ -860,6 +861,25 @@ object CometConf extends ShimCometConf {
.createWithDefault(defaultValue)
}

/**
* Converts a config key to a valid environment variable name. Example:
* "spark.comet.operator.DataWritingCommandExec.allowIncompatible" ->
* "SPARK_COMET_OPERATOR_DATAWRITINGCOMMANDEXEC_ALLOWINCOMPATIBLE"
*/
private def configKeyToEnvVar(configKey: String): String =
configKey.toUpperCase(Locale.ROOT).replace('.', '_')

private def createOperatorIncompatConfig(name: String): ConfigEntry[Boolean] = {
val configKey = getOperatorAllowIncompatConfigKey(name)
val envVar = configKeyToEnvVar(configKey)
conf(configKey)
.category(CATEGORY_EXEC)
.doc(s"Whether to allow incompatibility for operator: $name. " +
s"False by default. Can be overridden with $envVar env variable")
.booleanConf
.createWithEnvVarOrDefault(envVar, false)
}

def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = {
getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)
}
Expand Down
2 changes: 1 addition & 1 deletion dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ index 18123a4d6ec..fbe4c766eee 100644
- regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
- Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
- Row("num-num", "400-400", "100") :: Nil)
+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
Expand Down
2 changes: 1 addition & 1 deletion dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ index fa1a64460fc..1d2e215d6a3 100644
- ("100-200", "(\\d+)-(\\d+)", "300"),
- ("100-200", "(\\d+)-(\\d+)", "400"),
- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
Expand Down
2 changes: 1 addition & 1 deletion dev/diffs/4.0.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ index 0df7f806272..52d33d67328 100644
- ("100-200", "(\\d+)-(\\d+)", "300"),
- ("100-200", "(\\d+)-(\\d+)", "400"),
- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
this can be overridden by setting `spark.comet.expression.regexp.allowIncompatible=true`.

## Window Functions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
case _: ParquetFileFormat =>
if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString
.startsWith("hdfs:")) {
return Unsupported(Some("Only HDFS/local filesystems output paths are supported"))
return Unsupported(Some("Supported output filesystems: local, HDFS"))
}

if (cmd.bucketSpec.isDefined) {
Expand Down
8 changes: 4 additions & 4 deletions spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ object CometRLike extends CometExpressionSerde[RLike] {
expr.right match {
case Literal(pattern, DataTypes.StringType) =>
if (!RegExp.isSupportedPattern(pattern.toString) &&
!CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
!CometConf.isExprAllowIncompat("regexp")) {
withInfo(
expr,
s"Regexp pattern $pattern is not compatible with Spark. " +
s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " +
"to allow it anyway.")
None
} else {
Expand Down Expand Up @@ -298,11 +298,11 @@ object CometStringLPad extends CometExpressionSerde[StringLPad] {
object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] {
override def getSupportLevel(expr: RegExpReplace): SupportLevel = {
if (!RegExp.isSupportedPattern(expr.regexp.toString) &&
!CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
!CometConf.isExprAllowIncompat("regexp")) {
withInfo(
expr,
s"Regexp pattern ${expr.regexp} is not compatible with Spark. " +
s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " +
"to allow it anyway.")
return Incompatible()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-- under the License.

-- Test regexp_replace() with regexp allowIncompatible enabled (happy path)
-- Config: spark.comet.regexp.allowIncompatible=true
-- Config: spark.comet.expression.regexp.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-- under the License.

-- Test RLIKE with regexp allowIncompatible enabled (happy path)
-- Config: spark.comet.regexp.allowIncompatible=true
-- Config: spark.comet.expression.regexp.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// add repetitive data to trigger dictionary encoding
Range(0, 100).map(_ => "John Smith")
withParquetFile(data.zipWithIndex, withDictionary) { file =>
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
spark.read.parquet(file).createOrReplaceTempView(table)
val query = sql(s"select _2 as id, _1 rlike 'R[a-z]+s [Rr]ose' from $table")
checkSparkAnswerAndOperator(query)
Expand Down Expand Up @@ -996,7 +996,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withTable(table) {
sql(s"create table $table(id int, name varchar(20)) using parquet")
sql(s"insert into $table values(1,'James Smith')")
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
val query2 = sql(s"select id from $table where name rlike name")
val (_, cometPlan) = checkSparkAnswer(query2)
val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
Expand Down Expand Up @@ -1030,7 +1030,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// "Smith$",
"Smith\\Z",
"Smith\\z")
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
patterns.foreach { pattern =>
val query2 = sql(s"select name, '$pattern', name rlike '$pattern' from $table")
checkSparkAnswerAndOperator(query2)
Expand Down Expand Up @@ -1090,7 +1090,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
"\\V")
val qualifiers = Seq("", "+", "*", "?", "{1,}")

withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
// testing every possible combination takes too long, so we pick some
// random combinations
for (_ <- 0 until 100) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class CometFuzzIcebergSuite extends CometFuzzIcebergBase {
}

test("regexp_replace") {
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
val df = spark.table(icebergTableName)
// We want to make sure that the schema generator wasn't modified to accidentally omit
// StringType, since then this test would not run any queries and silently pass.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
}

test("regexp_replace") {
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
// We want to make sure that the schema generator wasn't modified to accidentally omit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class CometParquetWriterSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {

writeWithCometNativeWriteExec(inputPath, outputPath)
Expand All @@ -70,7 +70,7 @@ class CometParquetWriterSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {

withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") {
Expand Down Expand Up @@ -310,55 +310,6 @@ class CometParquetWriterSuite extends CometTestBase {
}
}

// ignored: native_comet scan is no longer supported
ignore("native write falls back when scan produces non-Arrow data") {
// This test verifies that when a native scan (like native_comet) doesn't support
// certain data types (complex types), the native write correctly falls back to Spark
// instead of failing at runtime with "Comet execution only takes Arrow Arrays" error.
withTempPath { dir =>
val inputPath = new File(dir, "input.parquet").getAbsolutePath
val outputPath = new File(dir, "output.parquet").getAbsolutePath

// Create data with complex types and write without Comet
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
val df = Seq((1, Seq(1, 2, 3)), (2, Seq(4, 5)), (3, Seq(6, 7, 8, 9)))
.toDF("id", "values")
df.write.parquet(inputPath)
}

// With native Parquet write enabled but using native_comet scan which doesn't
// support complex types, the scan falls back to Spark. The native write should
// detect this and also fall back to Spark instead of failing at runtime.
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
// Use native_comet which doesn't support complex types
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_comet") {

val plan =
captureWritePlan(path => spark.read.parquet(inputPath).write.parquet(path), outputPath)

// Verify NO CometNativeWriteExec in the plan (should have fallen back to Spark)
val hasNativeWrite = plan.exists {
case _: CometNativeWriteExec => true
case d: DataWritingCommandExec =>
d.child.exists(_.isInstanceOf[CometNativeWriteExec])
case _ => false
}

assert(
!hasNativeWrite,
"Expected fallback to Spark write (no CometNativeWriteExec), but found native write " +
s"in plan:\n${plan.treeString}")

// Verify the data was written correctly
val result = spark.read.parquet(outputPath).collect()
assert(result.length == 3, "Expected 3 rows to be written")
}
}
}

test("parquet write complex types fuzz test") {
withTempPath { dir =>
val outputPath = new File(dir, "output.parquet").getAbsolutePath
Expand Down Expand Up @@ -517,7 +468,7 @@ class CometParquetWriterSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> "true",
// enable experimental native writes
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true",
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
// explicitly set scan impl to override CI defaults
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true",
CometConf.getExprAllowIncompatConfigKey("regexp") -> "true",
// enabling COMET_EXPLAIN_NATIVE_ENABLED may add overhead but is useful for debugging
CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "false") {
cometSpark.sql(queryString).noop()
Expand Down
Loading