Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 1 addition & 26 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ object CometConf extends ShimCometConf {
private val TRACING_GUIDE = "For more information, refer to the Comet Tracing " +
"Guide (https://datafusion.apache.org/comet/contributor-guide/tracing.html)"

private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
"Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html)"

/** List of all configs that is used for generating documentation */
val allConfs = new ListBuffer[ConfigEntry[_]]

Expand Down Expand Up @@ -112,7 +109,7 @@ object CometConf extends ShimCometConf {
"feature is highly experimental and only partially implemented. It should not " +
"be used in production.")
.booleanConf
.createWithDefault(false)
.createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)

// Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices
// and does not support complex types. Use native_iceberg_compat or auto instead.
Expand Down Expand Up @@ -552,13 +549,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_PREFIX.debug.memory")
.category(CATEGORY_TESTING)
.doc(s"When enabled, log all native memory pool interactions. $DEBUGGING_GUIDE.")
.booleanConf
.createWithDefault(false)

val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"

Expand Down Expand Up @@ -795,14 +785,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.regexp.allowIncompatible")
.category(CATEGORY_EXEC)
.doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)
Comment on lines -798 to -804
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the config here will remove it from the configuration guide, but maybe that is fine, so long as we are explaining it in the compatibility guide.


val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
conf("spark.comet.metrics.updateInterval")
.category(CATEGORY_EXEC)
Expand All @@ -821,13 +803,6 @@ object CometConf extends ShimCometConf {
.stringConf
.createOptional

val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these configs are used directly from native code. We add them here so that they get documented.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in spark_config.rs:

pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ic, I'll add comments!

conf("spark.comet.maxTempDirectorySize")
.category(CATEGORY_EXEC)
.doc("The maximum amount of data (in bytes) stored inside the temporary directories.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB

val COMET_RESPECT_DATAFUSION_CONFIGS: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.respectDataFusionConfigs")
.category(CATEGORY_TESTING)
Expand Down
2 changes: 1 addition & 1 deletion dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ index 18123a4d6ec..fbe4c766eee 100644
- regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
- Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
- Row("num-num", "400-400", "100") :: Nil)
+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
Expand Down
2 changes: 1 addition & 1 deletion dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ index fa1a64460fc..1d2e215d6a3 100644
- ("100-200", "(\\d+)-(\\d+)", "300"),
- ("100-200", "(\\d+)-(\\d+)", "400"),
- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
Expand Down
2 changes: 1 addition & 1 deletion dev/diffs/4.0.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ index 0df7f806272..52d33d67328 100644
- ("100-200", "(\\d+)-(\\d+)", "300"),
- ("100-200", "(\\d+)-(\\d+)", "400"),
- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
+ withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
+ val df = Seq(
+ ("100-200", "(\\d+)-(\\d+)", "300"),
+ ("100-200", "(\\d+)-(\\d+)", "400"),
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
this can be overridden by setting `spark.comet.expression.regexp.allowIncompatible=true`.

## Window Functions

Expand Down
8 changes: 4 additions & 4 deletions spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ object CometRLike extends CometExpressionSerde[RLike] {
expr.right match {
case Literal(pattern, DataTypes.StringType) =>
if (!RegExp.isSupportedPattern(pattern.toString) &&
!CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
!CometConf.isExprAllowIncompat("regexp")) {
withInfo(
expr,
s"Regexp pattern $pattern is not compatible with Spark. " +
s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " +
"to allow it anyway.")
None
} else {
Expand Down Expand Up @@ -298,11 +298,11 @@ object CometStringLPad extends CometExpressionSerde[StringLPad] {
object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] {
override def getSupportLevel(expr: RegExpReplace): SupportLevel = {
if (!RegExp.isSupportedPattern(expr.regexp.toString) &&
!CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
!CometConf.isExprAllowIncompat("regexp")) {
withInfo(
expr,
s"Regexp pattern ${expr.regexp} is not compatible with Spark. " +
s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " +
"to allow it anyway.")
return Incompatible()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-- under the License.

-- Test regexp_replace() with regexp allowIncompatible enabled (happy path)
-- Config: spark.comet.regexp.allowIncompatible=true
-- Config: spark.comet.expression.regexp.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-- under the License.

-- Test RLIKE with regexp allowIncompatible enabled (happy path)
-- Config: spark.comet.regexp.allowIncompatible=true
-- Config: spark.comet.expression.regexp.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// add repetitive data to trigger dictionary encoding
Range(0, 100).map(_ => "John Smith")
withParquetFile(data.zipWithIndex, withDictionary) { file =>
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
spark.read.parquet(file).createOrReplaceTempView(table)
val query = sql(s"select _2 as id, _1 rlike 'R[a-z]+s [Rr]ose' from $table")
checkSparkAnswerAndOperator(query)
Expand Down Expand Up @@ -996,7 +996,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withTable(table) {
sql(s"create table $table(id int, name varchar(20)) using parquet")
sql(s"insert into $table values(1,'James Smith')")
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
val query2 = sql(s"select id from $table where name rlike name")
val (_, cometPlan) = checkSparkAnswer(query2)
val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
Expand Down Expand Up @@ -1030,7 +1030,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// "Smith$",
"Smith\\Z",
"Smith\\z")
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
patterns.foreach { pattern =>
val query2 = sql(s"select name, '$pattern', name rlike '$pattern' from $table")
checkSparkAnswerAndOperator(query2)
Expand Down Expand Up @@ -1090,7 +1090,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
"\\V")
val qualifiers = Seq("", "+", "*", "?", "{1,}")

withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
// testing every possible combination takes too long, so we pick some
// random combinations
for (_ <- 0 until 100) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class CometFuzzIcebergSuite extends CometFuzzIcebergBase {
}

test("regexp_replace") {
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
val df = spark.table(icebergTableName)
// We want to make sure that the schema generator wasn't modified to accidentally omit
// StringType, since then this test would not run any queries and silently pass.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
}

test("regexp_replace") {
withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
// We want to make sure that the schema generator wasn't modified to accidentally omit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true",
CometConf.getExprAllowIncompatConfigKey("regexp") -> "true",
// enabling COMET_EXPLAIN_NATIVE_ENABLED may add overhead but is useful for debugging
CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "false") {
cometSpark.sql(queryString).noop()
Expand Down
Loading