Skip to content

Commit 2a43867

Browse files
committed
refactor: introduce REGEXP_ENGINE_RUST/REGEXP_ENGINE_JAVA constants
Replace string literals "rust"/"java" used for the regexp engine selector with named constants on CometConf. Tighten CometRLike.getSupportLevel so it only reports Compatible(None) when the pattern is a Literal, matching the actual constraint enforced by the convert path.
1 parent 7d0f25c commit 2a43867

4 files changed

Lines changed: 149 additions & 37 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -383,19 +383,22 @@ object CometConf extends ShimCometConf {
383383
.booleanConf
384384
.createWithDefault(false)
385385

386+
val REGEXP_ENGINE_RUST = "rust"
387+
val REGEXP_ENGINE_JAVA = "java"
388+
386389
val COMET_REGEXP_ENGINE: ConfigEntry[String] =
387390
conf("spark.comet.exec.regexp.engine")
388391
.category(CATEGORY_EXEC)
389392
.doc(
390393
"Experimental. Selects the engine used to evaluate supported regular-expression " +
391-
"expressions. `rust` uses the native DataFusion regexp engine. `java` routes through " +
392-
"a JVM-side UDF (java.util.regex.Pattern) for Spark-compatible semantics, at the " +
393-
"cost of JNI roundtrips per batch. Only RLike is routed today; additional " +
394-
"expressions may opt in over time.")
394+
s"expressions. `$REGEXP_ENGINE_RUST` uses the native DataFusion regexp engine. " +
395+
s"`$REGEXP_ENGINE_JAVA` routes through a JVM-side UDF (java.util.regex.Pattern) for " +
396+
"Spark-compatible semantics, at the cost of JNI roundtrips per batch. Only RLike " +
397+
"is routed today; additional expressions may opt in over time.")
395398
.stringConf
396399
.transform(_.toLowerCase(Locale.ROOT))
397-
.checkValues(Set("rust", "java"))
398-
.createWithDefault("rust")
400+
.checkValues(Set(REGEXP_ENGINE_RUST, REGEXP_ENGINE_JAVA))
401+
.createWithDefault(REGEXP_ENGINE_RUST)
399402

400403
val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
401404
conf("spark.comet.native.shuffle.partitioning.hash.enabled")

spark/src/main/scala/org/apache/comet/serde/strings.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,15 +267,18 @@ object CometRLike extends CometExpressionSerde[RLike] {
267267
"Uses Rust regexp engine, which has different behavior to Java regexp engine")
268268

269269
override def getSupportLevel(expr: RLike): SupportLevel = {
270-
if (CometConf.COMET_REGEXP_ENGINE.get() == "java") {
271-
Compatible(None)
270+
if (CometConf.COMET_REGEXP_ENGINE.get() == CometConf.REGEXP_ENGINE_JAVA) {
271+
expr.right match {
272+
case _: Literal => Compatible(None)
273+
case _ => Unsupported(Some("Only scalar regexp patterns are supported"))
274+
}
272275
} else {
273276
super.getSupportLevel(expr)
274277
}
275278
}
276279

277280
override def convert(expr: RLike, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
278-
if (CometConf.COMET_REGEXP_ENGINE.get() == "java") {
281+
if (CometConf.COMET_REGEXP_ENGINE.get() == CometConf.REGEXP_ENGINE_JAVA) {
279282
convertViaJvmUdf(expr, inputs, binding)
280283
} else {
281284
convertViaNativeRegex(expr, inputs, binding)

spark/src/test/scala/org/apache/comet/CometRegExpJvmSuite.scala

Lines changed: 132 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,52 +21,157 @@ package org.apache.comet
2121

2222
import org.apache.spark.SparkConf
2323
import org.apache.spark.sql.CometTestBase
24-
import org.apache.spark.sql.comet.CometProjectExec
24+
import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec}
2525
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2626

2727
class CometRegExpJvmSuite extends CometTestBase with AdaptiveSparkPlanHelper {
2828

2929
override protected def sparkConf: SparkConf =
30-
super.sparkConf.set("spark.comet.exec.regexp.engine", "java")
30+
super.sparkConf.set(CometConf.COMET_REGEXP_ENGINE.key, CometConf.REGEXP_ENGINE_JAVA)
3131

32-
test("rlike: Java regex semantics, with flag on") {
32+
// Java regex patterns. The Rust regex crate rejects backreferences and lookaround, so a
33+
// Comet operator on those shows the JVM path ran. Embedded flags and named groups are
34+
// valid in both engines, so those cases only check result parity with Spark.
35+
private val backreference = "^(\\\\w)\\\\1$"
36+
private val lookahead = "foo(?=bar)"
37+
private val lookbehind = "(?<=foo)bar"
38+
private val embeddedFlags = "(?i)foo"
39+
private val namedGroup = "(?<digit>\\\\d)"
40+
41+
private def withSubjects(values: String*)(f: => Unit): Unit = {
3342
withTable("t") {
3443
sql("CREATE TABLE t (s STRING) USING parquet")
35-
sql("INSERT INTO t VALUES ('abc123'), ('no digits'), (NULL), ('mixed_42_data')")
44+
val rows = values
45+
.map(v => if (v == null) "(NULL)" else s"('${v.replace("'", "''")}')")
46+
.mkString(", ")
47+
sql(s"INSERT INTO t VALUES $rows")
48+
f
49+
}
50+
}
51+
52+
test("rlike: projection produces Java regex semantics with null handling") {
53+
withSubjects("abc123", "no digits", null, "mixed_42_data") {
3654
val df = sql("SELECT s, s rlike '\\\\d+' AS m FROM t")
37-
val rows = df.collect().map { r =>
38-
val matched: Any = if (r.isNullAt(1)) null else r.getBoolean(1)
39-
(Option(r.getString(0)), matched)
40-
}
55+
checkSparkAnswerAndOperator(df)
56+
}
57+
}
58+
59+
test("rlike: predicate filters rows using Java regex semantics") {
60+
withSubjects("abc123", "no digits", null, "mixed_42_data") {
61+
val df = sql("SELECT s FROM t WHERE s rlike '\\\\d+'")
62+
checkSparkAnswerAndOperator(df)
63+
}
64+
}
65+
66+
test("rlike: backreference in projection (Java-only construct)") {
67+
withSubjects("aa", "ab", "xyzzy", null) {
68+
val df = sql(s"SELECT s, s rlike '$backreference' FROM t")
69+
checkSparkAnswerAndOperator(df)
70+
val plan = df.queryExecution.executedPlan
4171
assert(
42-
rows.toSet === Set(
43-
(Some("abc123"), true),
44-
(Some("no digits"), false),
45-
(None, null),
46-
(Some("mixed_42_data"), true)))
72+
collect(plan) { case p: CometProjectExec => p }.nonEmpty,
73+
s"Expected CometProjectExec in:\n$plan")
4774
}
4875
}
4976

50-
test("rlike: plan contains CometProjectExec when flag is on") {
51-
withTable("t") {
52-
sql("CREATE TABLE t (s STRING) USING parquet")
53-
sql("INSERT INTO t VALUES ('a1')")
54-
val df = sql("SELECT s rlike '[a-z]\\\\d' FROM t")
55-
df.collect()
56-
val cometProjects = collect(df.queryExecution.executedPlan) { case p: CometProjectExec =>
57-
p
58-
}
77+
test("rlike: backreference in predicate (Java-only construct)") {
78+
withSubjects("aa", "ab", "xyzzy", null) {
79+
val df = sql(s"SELECT s FROM t WHERE s rlike '$backreference'")
80+
checkSparkAnswerAndOperator(df)
81+
val plan = df.queryExecution.executedPlan
82+
assert(
83+
collect(plan) { case f: CometFilterExec => f }.nonEmpty,
84+
s"Expected CometFilterExec in:\n$plan")
85+
}
86+
}
87+
88+
test("rlike: lookahead pattern (Java-only construct)") {
89+
withSubjects("foobar", "foobaz", "barfoo", null) {
90+
checkSparkAnswerAndOperator(sql(s"SELECT s, s rlike '$lookahead' FROM t"))
91+
checkSparkAnswerAndOperator(sql(s"SELECT s FROM t WHERE s rlike '$lookahead'"))
92+
}
93+
}
94+
95+
test("rlike: lookbehind pattern (Java-only construct)") {
96+
withSubjects("foobar", "barbar", "foofoo", null) {
97+
checkSparkAnswerAndOperator(sql(s"SELECT s, s rlike '$lookbehind' FROM t"))
98+
}
99+
}
100+
101+
test("rlike: embedded case-insensitive flag (Java-only construct)") {
102+
withSubjects("FOO", "foo", "fOO", "bar") {
103+
checkSparkAnswerAndOperator(sql(s"SELECT s, s rlike '$embeddedFlags' FROM t"))
104+
}
105+
}
106+
107+
test("rlike: named groups (Java-only construct)") {
108+
withSubjects("a1", "ab", "9z", null) {
109+
checkSparkAnswerAndOperator(sql(s"SELECT s, s rlike '$namedGroup' FROM t"))
110+
}
111+
}
112+
113+
test("rlike: empty pattern matches every non-null row") {
114+
withSubjects("abc", "", null) {
115+
checkSparkAnswerAndOperator(sql("SELECT s, s rlike '' FROM t"))
116+
}
117+
}
118+
119+
test("rlike: empty subject string is handled correctly") {
120+
withSubjects("", "x", null) {
121+
checkSparkAnswerAndOperator(sql("SELECT s, s rlike '^$' FROM t"))
122+
}
123+
}
124+
125+
test("rlike: all-null subject column produces all-null result") {
126+
withSubjects(null, null, null) {
127+
checkSparkAnswerAndOperator(sql("SELECT s rlike '\\\\d+' FROM t"))
128+
}
129+
}
130+
131+
test("rlike: null literal pattern falls back to Spark") {
132+
withSubjects("a", "b", null) {
133+
// Convert path rejects Literal(null) pattern; query must still produce
134+
// Spark-compatible all-null output via the Spark fallback.
135+
checkSparkAnswer(sql("SELECT s rlike CAST(NULL AS STRING) FROM t"))
136+
}
137+
}
138+
139+
test("rlike: invalid pattern falls back to Spark") {
140+
withSubjects("a") {
141+
// Convert path catches PatternSyntaxException at planning time and declines; Spark's
142+
// own RLike then runs on the JVM and raises its usual invalid-pattern error.
143+
val ex = intercept[Throwable](sql("SELECT s rlike '[' FROM t").collect())
59144
assert(
60-
cometProjects.nonEmpty,
61-
s"Expected at least one CometProjectExec in:\n${df.queryExecution.executedPlan}")
145+
ex.getMessage.toLowerCase.contains("regex") ||
146+
ex.getMessage.contains("PatternSyntax") ||
147+
ex.getMessage.contains("Unclosed"),
148+
s"Unexpected error: ${ex.getMessage}")
149+
}
150+
}
151+
152+
test("rlike: combines with filter, projection, and aggregate") {
153+
withTable("t") {
154+
sql("CREATE TABLE t (s STRING, k INT) USING parquet")
155+
sql("""INSERT INTO t VALUES
156+
| ('aa', 1), ('ab', 1), ('aa', 2), ('xyzzy', 2), ('aa', 3), (NULL, 3)""".stripMargin)
157+
val df = sql(s"""SELECT k, COUNT(*) AS c
158+
|FROM t
159+
|WHERE s rlike '$backreference'
160+
|GROUP BY k
161+
|ORDER BY k""".stripMargin)
162+
checkSparkAnswerAndOperator(df)
62163
}
63164
}
64165

65-
test("rlike: result matches Spark for arbitrary patterns") {
166+
test("rlike: many rows spanning multiple batches") {
66167
withTable("t") {
67168
sql("CREATE TABLE t (s STRING) USING parquet")
68-
sql("INSERT INTO t VALUES ('123'), ('abc'), ('mix3d')")
69-
checkSparkAnswer(sql("SELECT s, s rlike '\\\\d+' FROM t"))
169+
val values = (0 until 5000)
170+
.map(i => if (i % 7 == 0) "(NULL)" else s"('row_${i}_aa')")
171+
.mkString(", ")
172+
sql(s"INSERT INTO t VALUES $values")
173+
checkSparkAnswerAndOperator(sql(s"SELECT s, s rlike '$backreference' FROM t"))
174+
checkSparkAnswerAndOperator(sql(s"SELECT s FROM t WHERE s rlike '$backreference'"))
70175
}
71176
}
72177
}

spark/src/test/scala/org/apache/spark/sql/benchmark/CometRegExpBenchmark.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ object CometRegExpBenchmark extends CometBenchmarkBase {
110110
}
111111

112112
benchmark.addCase("Comet (Exec, JVM regex)") { _ =>
113-
val configs = baseExec ++ Map(CometConf.COMET_REGEXP_ENGINE.key -> "java")
113+
val configs =
114+
baseExec ++ Map(CometConf.COMET_REGEXP_ENGINE.key -> CometConf.REGEXP_ENGINE_JAVA)
114115
withSQLConf(configs.toSeq: _*) {
115116
spark.sql(query).noop()
116117
}

0 commit comments

Comments
 (0)