Skip to content

Commit bec171e

Browse files
committed
refactor: route regex expressions through codegen dispatcher instead of hand-written UDFs
Replace the six hand-written `RegExp*UDF` / `StringSplitUDF` JVM UDF implementations with the Arrow-direct codegen dispatcher introduced in PR apache#4417 (`CometScalaUDF.emitJvmCodegenDispatch`). The dispatcher Janino-compiles Spark's own `doGenCode` for the expression, so the regex family inherits Spark-identical semantics with no per-expression glue code. Changes: - Delete `spark/src/main/scala/org/apache/comet/udf/RegExp*UDF.scala` and `StringSplitUDF.scala`. Their behavior is now provided by Spark's `doGenCode` running inside the dispatcher. - Rewrite the regex serdes in `strings.scala`. Expressions with no native Rust path (`RegExpExtract`, `RegExpExtractAll`, `RegExpInStr`) share a new `CometRegexpCodegenOnly` base; expressions with a native path (`RLike`, `RegExpReplace`, `StringSplit`) keep an explicit route table where the JVM arm now delegates to `CometScalaUDF.emitJvmCodegenDispatch`. - Drop the `spark.comet.jvmUdf.enabled` config. The codegen dispatcher already has its own master switch (`spark.comet.exec.scalaUDF.codegen.enabled`); gating the regex family on the same flag avoids two flags for the same path. `spark.comet.exec.regexp.engine` keeps the `java`/`rust` selector semantics, and `engine=java` now requires the codegen flag. - Revert the native Rust additions in `jvm_udf/mod.rs` and `jni-bridge/src/lib.rs`. The codegen dispatcher constructs Arrow output fields JVM-side via `CometBatchKernelCodegenOutput.toFfiArrowField`, so the list-vector field-name normalization cast is unnecessary. - Update `CometRegExpJvmSuite`, `CometRegExpBenchmark`, the regex SQL test fixtures, and the regex compatibility doc to reflect the new gating. Test plan: - `CometRegExpJvmSuite`: 45/45 pass (covers all six regex expressions through the codegen dispatcher). - `CometSqlFileTestSuite`: 289/289 pass. - `CometStringExpressionSuite`: 33/33 pass. - `CometCodegenSuite`: 60/60 pass. - `cargo clippy --all-targets --workspace -- -D warnings`: clean.
1 parent 29428e5 commit bec171e

19 files changed

Lines changed: 121 additions & 952 deletions

File tree

docs/source/user-guide/latest/compatibility/regex.md

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,20 @@ under the License.
2020
# Regular Expressions
2121

2222
Comet provides two regexp engines for evaluating regular expressions: a **Rust engine** that uses the Rust
23-
[`regex`] crate natively, and an experimental **Java engine** that calls back into the JVM via Comet's JVM
24-
UDF framework. The engine is selected with `spark.comet.exec.regexp.engine`, which accepts:
23+
[`regex`] crate natively, and an experimental **Java engine** that runs Spark's own `doGenCode` for the
24+
expression inside Comet's Arrow-direct codegen dispatcher (the same dispatcher used by Comet's
25+
`ScalaUDF` codegen path). The engine is selected with `spark.comet.exec.regexp.engine`, which accepts:
2526

2627
- `java` (default) — route through the Java engine for full Spark compatibility. Requires
27-
`spark.comet.jvmUdf.enabled=true`; otherwise regex expressions fall back to Spark with an explanatory
28-
message.
28+
`spark.comet.exec.scalaUDF.codegen.enabled=true`; otherwise regex expressions fall back to Spark with
29+
an explanatory message.
2930
- `rust` — run the Rust engine when an expression has a native implementation. Setting this is itself
3031
the opt-in for the semantic differences between Java and Rust regex (no separate `allowIncompatible`
3132
flag needed). Expressions without a native Rust implementation (`regexp_extract`,
3233
`regexp_extract_all`, `regexp_instr`) fall back to Spark.
3334

34-
The JVM UDF framework is experimental and disabled by default. With pure defaults
35-
(`engine=java`, `jvmUdf.enabled=false`), all regex expressions fall back to Spark.
35+
The codegen dispatcher is experimental and disabled by default. With pure defaults
36+
(`engine=java`, `scalaUDF.codegen.enabled=false`), all regex expressions fall back to Spark.
3637

3738
## Choosing an engine
3839

@@ -47,9 +48,9 @@ The **Rust engine** is faster but cannot match Java regex semantics for every pa
4748
choice is itself the opt-in, setting `spark.comet.exec.regexp.engine=rust` declares acceptance of those
4849
differences without a separate per-expression flag.
4950

50-
The **Java engine** is the default but the underlying JVM UDF framework is experimental and gated behind
51-
`spark.comet.jvmUdf.enabled=true`; the behavior, configuration, and supported expressions may change in
52-
future releases.
51+
The **Java engine** is the default but the underlying codegen dispatcher is experimental and gated behind
52+
`spark.comet.exec.scalaUDF.codegen.enabled=true`; the behavior, configuration, and supported expressions
53+
may change in future releases.
5354

5455
## Why the engines differ
5556

native/jni-bridge/src/lib.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,7 @@ pub struct JVMClasses<'a> {
231231
/// acquire & release native memory.
232232
pub comet_task_memory_manager: CometTaskMemoryManager<'a>,
233233
/// The CometUdfBridge class used to dispatch JVM scalar UDFs.
234-
/// `None` if the class is not on the classpath; the JVM-UDF dispatch path
235-
/// reports a clear error rather than crashing executor init.
234+
/// `None` if the class is not on the classpath.
236235
pub comet_udf_bridge: Option<CometUdfBridge<'a>>,
237236
}
238237

@@ -305,9 +304,6 @@ impl JVMClasses<'_> {
305304
comet_shuffle_block_iterator: CometShuffleBlockIterator::new(env).unwrap(),
306305
comet_task_memory_manager: CometTaskMemoryManager::new(env).unwrap(),
307306
comet_udf_bridge: {
308-
// Optional: if the bridge class is absent (e.g. comet shading
309-
// dropped org.apache.comet.udf.*), record None and clear the
310-
// pending JVM exception so other JNI calls keep working.
311307
let bridge = CometUdfBridge::new(env).ok();
312308
if env.exception_check() {
313309
env.exception_clear();

native/spark-expr/src/jvm_udf/mod.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,7 @@ impl PhysicalExpr for JvmScalarUdfExpr {
179179
let bridge = JVMClasses::get().comet_udf_bridge.as_ref().ok_or_else(|| {
180180
CometError::from(ExecutionError::GeneralError(
181181
"JVM UDF bridge unavailable: org.apache.comet.udf.CometUdfBridge \
182-
class was not found on the JVM classpath. Set \
183-
spark.comet.exec.regexp.engine=rust to disable this path."
182+
class was not found on the JVM classpath."
184183
.to_string(),
185184
))
186185
})?;
@@ -244,19 +243,7 @@ impl PhysicalExpr for JvmScalarUdfExpr {
244243
// exactly once when the Box drops at end of scope.
245244
let result_data = unsafe { from_ffi(*out_array, &out_schema) }
246245
.map_err(|e| CometError::Arrow { source: e })?;
247-
let result_array = make_array(result_data);
248-
249-
// The JVM may produce arrays with different field names (e.g. Arrow Java's
250-
// ListVector uses "$data$" for child fields) than what DataFusion expects
251-
// (e.g. "item"). Cast to the declared return_type to normalize schema.
252-
let result_array = if result_array.data_type() != &self.return_type {
253-
arrow::compute::cast(&result_array, &self.return_type)
254-
.map_err(|e| CometError::Arrow { source: e })?
255-
} else {
256-
result_array
257-
};
258-
259-
Ok(ColumnarValue::Array(result_array))
246+
Ok(ColumnarValue::Array(make_array(result_data)))
260247
}
261248

262249
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {

spark/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -362,15 +362,15 @@ object CometConf extends ShimCometConf {
362362
.booleanConf
363363
.createWithDefault(false)
364364

365-
val COMET_JVM_UDF_ENABLED: ConfigEntry[Boolean] =
366-
conf("spark.comet.jvmUdf.enabled")
365+
val COMET_SCALA_UDF_CODEGEN_ENABLED: ConfigEntry[Boolean] =
366+
conf("spark.comet.exec.scalaUDF.codegen.enabled")
367367
.category(CATEGORY_EXEC)
368-
.doc(
369-
"Master switch for the JVM UDF framework, which lets native execution call back into " +
370-
"the JVM to evaluate selected expressions for full Spark compatibility (at the cost " +
371-
"of a JNI round-trip per batch). The framework is experimental and may change in " +
372-
"future releases. Disabled by default; expressions that would otherwise route " +
373-
"through the JVM UDF bridge fall back to native or to Spark while this is false.")
368+
.doc("Experimental. Whether to route Spark `ScalaUDF` expressions through Comet's " +
369+
"Arrow-direct codegen dispatcher. When enabled, a supported ScalaUDF is compiled into " +
370+
"a per-batch kernel that reads and writes Arrow vectors directly from native " +
371+
"execution. When disabled, plans containing a ScalaUDF fall back to Spark for the " +
372+
"enclosing operator. The same dispatcher backs `spark.comet.exec.regexp.engine=java` " +
373+
"so the regex family routes through it as well.")
374374
.booleanConf
375375
.createWithDefault(false)
376376

@@ -382,9 +382,9 @@ object CometConf extends ShimCometConf {
382382
.category(CATEGORY_EXEC)
383383
.doc(
384384
"Selects the engine used to evaluate Spark regular-expression expressions. " +
385-
s"`$REGEXP_ENGINE_JAVA` (default) routes through a JVM-side UDF " +
386-
"(java.util.regex.Pattern) for Spark-compatible semantics, at the cost of JNI " +
387-
s"roundtrips per batch; this requires ${COMET_JVM_UDF_ENABLED.key}=true and " +
385+
s"`$REGEXP_ENGINE_JAVA` (default) routes through the Arrow-direct codegen dispatcher " +
386+
"so Spark's own `doGenCode` (backed by `java.util.regex.Pattern`) runs inside the " +
387+
s"Comet pipeline; this requires ${COMET_SCALA_UDF_CODEGEN_ENABLED.key}=true and " +
388388
s"falls back to Spark otherwise. `$REGEXP_ENGINE_RUST` runs the native DataFusion " +
389389
"regexp engine when an implementation exists; setting this is itself the opt-in " +
390390
"for the semantic differences between Java and Rust regex. Affected expressions: " +
@@ -396,18 +396,6 @@ object CometConf extends ShimCometConf {
396396
.checkValues(Set(REGEXP_ENGINE_RUST, REGEXP_ENGINE_JAVA))
397397
.createWithDefault(REGEXP_ENGINE_JAVA)
398398

399-
val COMET_SCALA_UDF_CODEGEN_ENABLED: ConfigEntry[Boolean] =
400-
conf("spark.comet.exec.scalaUDF.codegen.enabled")
401-
.category(CATEGORY_EXEC)
402-
.doc("Experimental. Whether to route Spark `ScalaUDF` expressions through Comet's " +
403-
"Arrow-direct codegen dispatcher. When enabled, a supported ScalaUDF is compiled into " +
404-
"a per-batch kernel that reads and writes Arrow vectors directly from native " +
405-
"execution. When disabled, plans containing a ScalaUDF fall back to Spark for the " +
406-
"enclosing operator.")
407-
.booleanConf
408-
.createWithDefault(false)
409-
410-
411399
val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
412400
conf("spark.comet.native.shuffle.partitioning.hash.enabled")
413401
.category(CATEGORY_SHUFFLE)

0 commit comments

Comments
 (0)