Skip to content

Commit 2eb06c9

Browse files
committed
feat: gate JVM UDF framework behind spark.comet.jvmUdf.enabled
Adds a master switch (default false) for the experimental JVM UDF framework so the Java regex engine cannot be activated without an explicit opt-in. With engine=java but jvmUdf.enabled=false, the six regex serdes return Unsupported with a message naming the master switch instead of silently using either path. Also extends Incompatible with optedInBy: Option[String] so a config (e.g. an engine selector) can serve as a per-expression incompatibility opt-in. Existing allowIncompatible flags continue to work; optedInBy is OR'd into the gating check in QueryPlanSerde. No existing serde uses optedInBy yet — this lays the foundation for the config simplification discussed in #4310.
1 parent f6b4096 commit 2eb06c9

19 files changed

Lines changed: 199 additions & 81 deletions

File tree

docs/source/user-guide/latest/compatibility/regex.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ spark.comet.exec.regexp.engine=rust # default
2828
spark.comet.exec.regexp.engine=java # experimental
2929
```
3030

31+
The Java engine is built on Comet's experimental JVM UDF framework, which is disabled by default. To use
32+
the Java engine, both configs must be set:
33+
34+
```
35+
spark.comet.jvmUdf.enabled=true
36+
spark.comet.exec.regexp.engine=java
37+
```
38+
39+
If `spark.comet.exec.regexp.engine=java` is set without `spark.comet.jvmUdf.enabled=true`, the regex
40+
expressions fall back to Spark with an explanatory message.
41+
3142
## Choosing an engine
3243

3344
| | Rust engine | Java engine (experimental) |
@@ -47,8 +58,9 @@ spark.comet.expression.regexp.allowIncompatible=true
4758

4859
The **Java engine** is an experimental option for correctness-sensitive workloads. It evaluates expressions
4960
by passing Arrow vectors to a JVM-side UDF that uses `java.util.regex`, producing identical results to Spark
50-
for all patterns. Because it is experimental, the behavior, configuration, and supported expressions may
51-
change in future releases.
61+
for all patterns. Because the underlying JVM UDF framework is experimental, it is gated behind
62+
`spark.comet.jvmUdf.enabled=true`, and the behavior, configuration, and supported expressions may change in
63+
future releases.
5264

5365
## Why the engines differ
5466

spark/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,18 @@ object CometConf extends ShimCometConf {
381381
.booleanConf
382382
.createWithDefault(false)
383383

384+
val COMET_JVM_UDF_ENABLED: ConfigEntry[Boolean] =
385+
conf("spark.comet.jvmUdf.enabled")
386+
.category(CATEGORY_EXEC)
387+
.doc(
388+
"Master switch for the JVM UDF framework, which lets native execution call back into " +
389+
"the JVM to evaluate selected expressions for full Spark compatibility (at the cost " +
390+
"of a JNI round-trip per batch). The framework is experimental and may change in " +
391+
"future releases. Disabled by default; expressions that would otherwise route " +
392+
"through the JVM UDF bridge fall back to native or to Spark while this is false.")
393+
.booleanConf
394+
.createWithDefault(false)
395+
384396
val REGEXP_ENGINE_RUST = "rust"
385397
val REGEXP_ENGINE_JAVA = "java"
386398

@@ -390,10 +402,12 @@ object CometConf extends ShimCometConf {
390402
.doc(
391403
"Selects the engine used to evaluate supported regular-expression " +
392404
s"expressions. `$REGEXP_ENGINE_RUST` uses the native DataFusion regexp engine. " +
393-
s"`$REGEXP_ENGINE_JAVA` is experimental and routes through a JVM-side UDF " +
394-
"(java.util.regex.Pattern) for Spark-compatible semantics, at the cost of JNI " +
395-
"roundtrips per batch. Expressions routed when set to java: rlike, regexp_extract, " +
396-
"regexp_extract_all, regexp_replace, regexp_instr, and split.")
405+
s"`$REGEXP_ENGINE_JAVA` routes through a JVM-side UDF (java.util.regex.Pattern) " +
406+
"for Spark-compatible semantics, at the cost of JNI roundtrips per batch. The " +
407+
s"`$REGEXP_ENGINE_JAVA` engine additionally requires " +
408+
s"${COMET_JVM_UDF_ENABLED.key}=true and is experimental. Expressions routed when " +
409+
"set to java: rlike, regexp_extract, regexp_extract_all, regexp_replace, " +
410+
"regexp_instr, and split.")
397411
.stringConf
398412
.transform(_.toLowerCase(Locale.ROOT))
399413
.checkValues(Set(REGEXP_ENGINE_RUST, REGEXP_ENGINE_JAVA))
@@ -922,6 +936,15 @@ object CometConf extends ShimCometConf {
922936
getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)
923937
}
924938

939+
/**
940+
* True when the user has selected the experimental Java regexp engine AND opted into the JVM
941+
* UDF framework. Both must be set; otherwise regex expressions should not route through the JVM
942+
* UDF bridge.
943+
*/
944+
def isJavaRegexpEngineActive(conf: SQLConf = SQLConf.get): Boolean = {
945+
COMET_REGEXP_ENGINE.get(conf) == REGEXP_ENGINE_JAVA && COMET_JVM_UDF_ENABLED.get(conf)
946+
}
947+
925948
def getExprEnabledConfigKey(name: String): String = {
926949
s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.enabled"
927950
}

spark/src/main/scala/org/apache/comet/GenerateDocs.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ object GenerateDocs {
303303
annotations += ((fromTypeName, toTypeName, note.trim.replace("(10,2)", "")))
304304
}
305305
"C"
306-
case Incompatible(notes) =>
306+
case Incompatible(notes, _) =>
307307
notes.filter(_.trim.nonEmpty).foreach { note =>
308308
annotations += ((fromTypeName, toTypeName, note.trim.replace("(10,2)", "")))
309309
}

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ case class CometExecRule(session: SparkSession)
709709
case Unsupported(notes) =>
710710
withInfo(op, notes.getOrElse(""))
711711
false
712-
case Incompatible(notes) =>
712+
case Incompatible(notes, _) =>
713713
val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
714714
val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName)
715715
if (allowIncompat) {

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -530,23 +530,29 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
530530
case Unsupported(notes) =>
531531
withInfo(fn, notes.getOrElse(""))
532532
None
533-
case Incompatible(notes) =>
533+
case Incompatible(notes, optedInBy) =>
534534
val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
535-
if (exprAllowIncompat) {
535+
val namedConfOptIn = optedInBy.exists(isOptedInVia)
536+
if (exprAllowIncompat || namedConfOptIn) {
536537
if (notes.isDefined) {
537-
logWarning(
538-
s"Comet supports $fn when " +
539-
s"${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true " +
540-
s"but has notes: ${notes.get}")
538+
val optInDesc = if (namedConfOptIn) {
539+
optedInBy.get
540+
} else {
541+
s"${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true"
542+
}
543+
logWarning(s"Comet supports $fn when $optInDesc but has notes: ${notes.get}")
541544
}
542545
aggHandler.convert(aggExpr, fn, inputs, binding, conf)
543546
} else {
544547
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
548+
val extraOptIn = optedInBy
549+
.map(kv => s" or by setting $kv")
550+
.getOrElse("")
545551
withInfo(
546552
fn,
547553
s"$fn is not fully compatible with Spark$optionalNotes. To enable it anyway, " +
548-
s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true. " +
549-
s"${CometConf.COMPAT_GUIDE}.")
554+
s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true" +
555+
s"$extraOptIn. ${CometConf.COMPAT_GUIDE}.")
550556
None
551557
}
552558
case Compatible(notes) =>
@@ -622,6 +628,21 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
622628
exprToProtoInternal(newExpr, inputs, binding)
623629
}
624630

631+
/**
632+
* True when the current SQLConf has the named config set to the given value. The argument is a
633+
* `key=value` string used by `Incompatible.optedInBy` to declare which config opts the user
634+
* into running an otherwise-incompatible expression. The configured value is compared
635+
* case-insensitively after splitting on the first `=`.
636+
*/
637+
private def isOptedInVia(keyEqualsValue: String): Boolean = {
638+
keyEqualsValue.split("=", 2) match {
639+
case Array(key, expected) =>
640+
Option(SQLConf.get.getConfString(key, null))
641+
.exists(_.equalsIgnoreCase(expected))
642+
case _ => false
643+
}
644+
}
645+
625646
/**
626647
* Convert a Spark expression to a protocol-buffer representation of a native Comet/DataFusion
627648
* expression.
@@ -655,23 +676,29 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
655676
case Unsupported(notes) =>
656677
withInfo(expr, notes.getOrElse(""))
657678
None
658-
case Incompatible(notes) =>
679+
case Incompatible(notes, optedInBy) =>
659680
val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
660-
if (exprAllowIncompat) {
681+
val namedConfOptIn = optedInBy.exists(isOptedInVia)
682+
if (exprAllowIncompat || namedConfOptIn) {
661683
if (notes.isDefined) {
662-
logWarning(
663-
s"Comet supports $expr when " +
664-
s"${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true " +
665-
s"but has notes: ${notes.get}")
684+
val optInDesc = if (namedConfOptIn) {
685+
optedInBy.get
686+
} else {
687+
s"${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true"
688+
}
689+
logWarning(s"Comet supports $expr when $optInDesc but has notes: ${notes.get}")
666690
}
667691
handler.convert(expr, inputs, binding)
668692
} else {
669693
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
694+
val extraOptIn = optedInBy
695+
.map(kv => s" or by setting $kv")
696+
.getOrElse("")
670697
withInfo(
671698
expr,
672699
s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " +
673-
s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true. " +
674-
s"${CometConf.COMPAT_GUIDE}.")
700+
s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true" +
701+
s"$extraOptIn. ${CometConf.COMPAT_GUIDE}.")
675702
None
676703
}
677704
case Compatible(notes) =>

spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,17 @@ case class Compatible(notes: Option[String] = None) extends SupportLevel
3737
*
3838
* Any compatibility differences are noted in the
3939
* [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]].
40+
*
41+
* @param notes
42+
* Optional human-readable notes about the incompatibility.
43+
* @param optedInBy
44+
* Optional `key=value` pair naming a SQLConf entry that, when set to `value`, opts the user
45+
* into running this expression despite the incompatibility, in addition to the per-expression
46+
* `spark.comet.expression.<Name>.allowIncompatible` flag. Use this when a broader config (for
47+
* example, an engine selector) already encodes the user's acceptance of the trade-off.
4048
*/
41-
case class Incompatible(notes: Option[String] = None) extends SupportLevel
49+
case class Incompatible(notes: Option[String] = None, optedInBy: Option[String] = None)
50+
extends SupportLevel
4251

4352
/** Comet does not support this feature */
4453
case class Unsupported(notes: Option[String] = None) extends SupportLevel

0 commit comments

Comments
 (0)