Skip to content

Commit 3b08d01

Browse files
committed
feat: route Upper/Lower/InitCap through codegen dispatcher
Routes Spark's Upper, Lower, and InitCap expressions through the existing codegen dispatcher so they execute natively in Comet with Spark-compatible behavior, instead of falling back to Spark for the operator.

Previously these expressions were marked Incompatible:
- Upper/Lower diverge from Spark for locale-specific characters because the Rust scalar function does not implement JVM/ICU case mappings.
- InitCap diverges for hyphen-separated words (issue #1052).

The codegen dispatcher runs Spark's own doGenCode inside the Comet pipeline, guaranteeing identical behavior across Spark 3.4 / 3.5 / 4.0.

The existing native paths remain available as opt-ins:
- spark.comet.caseConversion.enabled=true selects the Rust upper/lower scalar function (faster but locale-incompatible).
- spark.comet.expression.InitCap.allowIncompatible=true selects the Rust initcap scalar function (faster but diverges on hyphens).

The plan stability suite now sets COMET_SCALA_UDF_CODEGEN_ENABLED=true; the TPC-DS q24, q24a, and q24b goldens have been regenerated and no longer carry their case-conversion fallback markers.
1 parent a95a2d4 commit 3b08d01

8 files changed

Lines changed: 332 additions & 292 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/strings.scala

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,19 @@ object CometStringRepeat extends CometExpressionSerde[StringRepeat] {
5454
class CometCaseConversionBase[T <: Expression](function: String)
5555
extends CometScalarFunction[T](function) {
5656

57-
override def getIncompatibleReasons(): Seq[String] = Seq(
58-
"Results can vary depending on locale and character set." +
59-
s" Requires `${CometConf.COMET_CASE_CONVERSION_ENABLED.key}=true` to enable.")
57+
override def getSupportLevel(expr: T): SupportLevel = Compatible()
6058

6159
override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
62-
if (!CometConf.COMET_CASE_CONVERSION_ENABLED.get()) {
63-
withInfo(
64-
expr,
65-
"Comet is not compatible with Spark for case conversion in " +
66-
s"locale-specific cases. Set ${CometConf.COMET_CASE_CONVERSION_ENABLED.key}=true " +
67-
"to enable it anyway.")
68-
return None
60+
if (CometConf.COMET_CASE_CONVERSION_ENABLED.get()) {
61+
// Native scalar function: faster but does not match Spark for locale-specific characters
62+
// (e.g. Turkish dotted/dotless I). Opt-in.
63+
super.convert(expr, inputs, binding)
64+
} else {
65+
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the
66+
// Comet pipeline. This guarantees Spark-compatible behavior across 3.4 / 3.5 / 4.0.
67+
// Falls back to Spark when the dispatcher is disabled.
68+
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
6969
}
70-
super.convert(expr, inputs, binding)
7170
}
7271
}
7372

@@ -86,20 +85,20 @@ object CometLength extends CometScalarFunction[Length]("length") {
8685

8786
object CometInitCap extends CometScalarFunction[InitCap]("initcap") {
8887

89-
override def getIncompatibleReasons(): Seq[String] = Seq(
90-
"Treats hyphen as a word separator (e.g. `robert rose-smith` produces `Robert Rose-Smith`" +
91-
" instead of Spark's `Robert Rose-smith`)" +
92-
" (https://github.com/apache/datafusion-comet/issues/1052)")
93-
94-
override def getSupportLevel(expr: InitCap): SupportLevel = {
95-
// Behavior differs from Spark. One example is that for the input "robert rose-smith", Spark
96-
// will produce "Robert Rose-smith", but Comet will produce "Robert Rose-Smith".
97-
// https://github.com/apache/datafusion-comet/issues/1052
98-
Incompatible(None)
99-
}
88+
override def getSupportLevel(expr: InitCap): SupportLevel = Compatible()
10089

10190
override def convert(expr: InitCap, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
102-
super.convert(expr, inputs, binding)
91+
if (CometConf.isExprAllowIncompat(getExprConfigName(expr))) {
92+
// Native path: faster but treats hyphen as a word separator (e.g.
93+
// `robert rose-smith` produces `Robert Rose-Smith` instead of Spark's `Robert Rose-smith`).
94+
// https://github.com/apache/datafusion-comet/issues/1052
95+
super.convert(expr, inputs, binding)
96+
} else {
97+
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the
98+
// Comet pipeline. This guarantees Spark-compatible behavior across 3.4 / 3.5 / 4.0.
99+
// Falls back to Spark when the dispatcher is disabled.
100+
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
101+
}
103102
}
104103
}
105104

spark/src/test/resources/sql-tests/expressions/string/init_cap.sql

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,30 @@
1515
-- specific language governing permissions and limitations
1616
-- under the License.
1717

18+
-- Routes InitCap through the codegen dispatcher so behavior matches Spark exactly,
19+
-- including the hyphen-as-word-separator case where the Rust scalar function diverges
20+
-- (see https://github.com/apache/datafusion-comet/issues/1052).
21+
-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true
22+
1823
statement
1924
CREATE TABLE test_initcap(s string) USING parquet
2025

2126
statement
2227
INSERT INTO test_initcap VALUES ('hello world'), ('HELLO WORLD'), (''), (NULL), ('hello-world'), ('123abc'), (' spaces ')
2328

24-
query expect_fallback(not fully compatible with Spark)
29+
query
2530
SELECT initcap(s) FROM test_initcap
31+
32+
-- literal arguments
33+
query
34+
SELECT initcap('hello world'), initcap(''), initcap(NULL)
35+
36+
-- hyphen and other word separators - the divergence the codegen dispatcher fixes
37+
statement
38+
CREATE TABLE test_initcap_separators(s string) USING parquet
39+
40+
statement
41+
INSERT INTO test_initcap_separators VALUES ('robert rose-smith'), ('foo.bar'), ('a_b_c'), ("o'reilly")
42+
43+
query
44+
SELECT initcap(s) FROM test_initcap_separators

spark/src/test/resources/sql-tests/expressions/string/lower.sql

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,29 @@
1515
-- specific language governing permissions and limitations
1616
-- under the License.
1717

18+
-- Routes Lower through the codegen dispatcher so behavior matches Spark exactly,
19+
-- including locale-specific case mappings that the Rust scalar function does not implement.
20+
-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true
21+
1822
statement
1923
CREATE TABLE test_lower(s string) USING parquet
2024

2125
statement
2226
INSERT INTO test_lower VALUES ('HELLO'), ('hello'), ('Hello World'), (''), (NULL), ('123ABC')
2327

24-
query expect_fallback(case conversion)
28+
query
2529
SELECT lower(s) FROM test_lower
2630

2731
-- literal arguments
28-
query expect_fallback(case conversion)
32+
query
2933
SELECT lower('HELLO'), lower(''), lower(NULL)
34+
35+
-- locale-sensitive characters: Greek sigma and Turkish dotted I
36+
statement
37+
CREATE TABLE test_lower_unicode(s string) USING parquet
38+
39+
statement
40+
INSERT INTO test_lower_unicode VALUES ('ΣIGMA'), ('İSTANBUL'), ('GROSSE'), ('CAFÉ')
41+
42+
query
43+
SELECT lower(s) FROM test_lower_unicode

spark/src/test/resources/sql-tests/expressions/string/upper.sql

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,29 @@
1515
-- specific language governing permissions and limitations
1616
-- under the License.
1717

18+
-- Routes Upper through the codegen dispatcher so behavior matches Spark exactly,
19+
-- including locale-specific case mappings that the Rust scalar function does not implement.
20+
-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true
21+
1822
statement
1923
CREATE TABLE test_upper(s string) USING parquet
2024

2125
statement
2226
INSERT INTO test_upper VALUES ('hello'), ('HELLO'), ('Hello World'), (''), (NULL), ('123abc')
2327

24-
query expect_fallback(case conversion)
28+
query
2529
SELECT upper(s) FROM test_upper
2630

2731
-- literal arguments
28-
query expect_fallback(case conversion)
32+
query
2933
SELECT upper('hello'), upper(''), upper(NULL)
34+
35+
-- locale-sensitive characters: German sharp s and Turkish dotted/dotless I
36+
statement
37+
CREATE TABLE test_upper_unicode(s string) USING parquet
38+
39+
statement
40+
INSERT INTO test_upper_unicode VALUES ('straße'), ('istanbul'), ('İstanbul'), ('finish')
41+
42+
query
43+
SELECT upper(s) FROM test_upper_unicode
Lines changed: 86 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,90 @@
1-
Filter
2-
: +- Subquery
3-
: +- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats]
4-
: +- Exchange
5-
: +- HashAggregate
6-
: +- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats]
7-
: +- Exchange
8-
: +- HashAggregate
9-
: +- Project
10-
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
11-
: :- CometNativeColumnarToRow
12-
: : +- CometProject
13-
: : +- CometBroadcastHashJoin
14-
: : :- CometProject
15-
: : : +- CometBroadcastHashJoin
16-
: : : :- CometProject
17-
: : : : +- CometBroadcastHashJoin
18-
: : : : :- CometProject
19-
: : : : : +- CometSortMergeJoin
20-
: : : : : :- CometSort
21-
: : : : : : +- CometExchange
22-
: : : : : : +- CometProject
23-
: : : : : : +- CometFilter
24-
: : : : : : +- CometNativeScan parquet spark_catalog.default.store_sales
25-
: : : : : +- CometSort
26-
: : : : : +- CometExchange
27-
: : : : : +- CometProject
28-
: : : : : +- CometFilter
29-
: : : : : +- CometNativeScan parquet spark_catalog.default.store_returns
30-
: : : : +- CometBroadcastExchange
31-
: : : : +- CometProject
32-
: : : : +- CometFilter
33-
: : : : +- CometNativeScan parquet spark_catalog.default.store
34-
: : : +- CometBroadcastExchange
35-
: : : +- CometProject
36-
: : : +- CometFilter
37-
: : : +- CometNativeScan parquet spark_catalog.default.item
38-
: : +- CometBroadcastExchange
39-
: : +- CometProject
40-
: : +- CometFilter
41-
: : +- CometNativeScan parquet spark_catalog.default.customer
42-
: +- BroadcastExchange
43-
: +- CometNativeColumnarToRow
44-
: +- CometProject
45-
: +- CometFilter
46-
: +- CometNativeScan parquet spark_catalog.default.customer_address
47-
+- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats]
48-
+- Exchange
49-
+- HashAggregate
50-
+- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats]
51-
+- Exchange
52-
+- HashAggregate
53-
+- Project
54-
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
55-
:- CometNativeColumnarToRow
56-
: +- CometProject
57-
: +- CometBroadcastHashJoin
58-
: :- CometProject
59-
: : +- CometBroadcastHashJoin
60-
: : :- CometProject
61-
: : : +- CometBroadcastHashJoin
62-
: : : :- CometProject
63-
: : : : +- CometSortMergeJoin
64-
: : : : :- CometSort
65-
: : : : : +- CometExchange
66-
: : : : : +- CometProject
67-
: : : : : +- CometFilter
68-
: : : : : +- CometNativeScan parquet spark_catalog.default.store_sales
69-
: : : : +- CometSort
70-
: : : : +- CometExchange
71-
: : : : +- CometProject
72-
: : : : +- CometFilter
73-
: : : : +- CometNativeScan parquet spark_catalog.default.store_returns
74-
: : : +- CometBroadcastExchange
75-
: : : +- CometProject
76-
: : : +- CometFilter
77-
: : : +- CometNativeScan parquet spark_catalog.default.store
78-
: : +- CometBroadcastExchange
79-
: : +- CometProject
80-
: : +- CometFilter
81-
: : +- CometNativeScan parquet spark_catalog.default.item
82-
: +- CometBroadcastExchange
83-
: +- CometProject
84-
: +- CometFilter
85-
: +- CometNativeScan parquet spark_catalog.default.customer
86-
+- BroadcastExchange
87-
+- CometNativeColumnarToRow
1+
CometNativeColumnarToRow
2+
+- CometFilter
3+
: +- Subquery
4+
: +- CometNativeColumnarToRow
5+
: +- CometHashAggregate
6+
: +- CometExchange
7+
: +- CometHashAggregate
8+
: +- CometHashAggregate
9+
: +- CometExchange
10+
: +- CometHashAggregate
11+
: +- CometProject
12+
: +- CometBroadcastHashJoin
13+
: :- CometProject
14+
: : +- CometBroadcastHashJoin
15+
: : :- CometProject
16+
: : : +- CometBroadcastHashJoin
17+
: : : :- CometProject
18+
: : : : +- CometBroadcastHashJoin
19+
: : : : :- CometProject
20+
: : : : : +- CometSortMergeJoin
21+
: : : : : :- CometSort
22+
: : : : : : +- CometExchange
23+
: : : : : : +- CometProject
24+
: : : : : : +- CometFilter
25+
: : : : : : +- CometNativeScan parquet spark_catalog.default.store_sales
26+
: : : : : +- CometSort
27+
: : : : : +- CometExchange
28+
: : : : : +- CometProject
29+
: : : : : +- CometFilter
30+
: : : : : +- CometNativeScan parquet spark_catalog.default.store_returns
31+
: : : : +- CometBroadcastExchange
32+
: : : : +- CometProject
33+
: : : : +- CometFilter
34+
: : : : +- CometNativeScan parquet spark_catalog.default.store
35+
: : : +- CometBroadcastExchange
36+
: : : +- CometProject
37+
: : : +- CometFilter
38+
: : : +- CometNativeScan parquet spark_catalog.default.item
39+
: : +- CometBroadcastExchange
40+
: : +- CometProject
41+
: : +- CometFilter
42+
: : +- CometNativeScan parquet spark_catalog.default.customer
43+
: +- CometBroadcastExchange
44+
: +- CometProject
45+
: +- CometFilter
46+
: +- CometNativeScan parquet spark_catalog.default.customer_address
47+
+- CometHashAggregate
48+
+- CometExchange
49+
+- CometHashAggregate
50+
+- CometHashAggregate
51+
+- CometExchange
52+
+- CometHashAggregate
53+
+- CometProject
54+
+- CometBroadcastHashJoin
55+
:- CometProject
56+
: +- CometBroadcastHashJoin
57+
: :- CometProject
58+
: : +- CometBroadcastHashJoin
59+
: : :- CometProject
60+
: : : +- CometBroadcastHashJoin
61+
: : : :- CometProject
62+
: : : : +- CometSortMergeJoin
63+
: : : : :- CometSort
64+
: : : : : +- CometExchange
65+
: : : : : +- CometProject
66+
: : : : : +- CometFilter
67+
: : : : : +- CometNativeScan parquet spark_catalog.default.store_sales
68+
: : : : +- CometSort
69+
: : : : +- CometExchange
70+
: : : : +- CometProject
71+
: : : : +- CometFilter
72+
: : : : +- CometNativeScan parquet spark_catalog.default.store_returns
73+
: : : +- CometBroadcastExchange
74+
: : : +- CometProject
75+
: : : +- CometFilter
76+
: : : +- CometNativeScan parquet spark_catalog.default.store
77+
: : +- CometBroadcastExchange
78+
: : +- CometProject
79+
: : +- CometFilter
80+
: : +- CometNativeScan parquet spark_catalog.default.item
81+
: +- CometBroadcastExchange
82+
: +- CometProject
83+
: +- CometFilter
84+
: +- CometNativeScan parquet spark_catalog.default.customer
85+
+- CometBroadcastExchange
8886
+- CometProject
8987
+- CometFilter
9088
+- CometNativeScan parquet spark_catalog.default.customer_address
9189

92-
Comet accelerated 66 out of 86 eligible operators (76%). Final plan contains 4 transitions between Spark and Comet.
90+
Comet accelerated 85 out of 86 eligible operators (98%). Final plan contains 2 transitions between Spark and Comet.

0 commit comments

Comments
 (0)