Skip to content

Commit b795b6c

Browse files
authored
Merge branch 'main' into broadcast_nested_join_loop_support
2 parents eb3fe00 + 2596861 commit b795b6c

17 files changed

Lines changed: 419 additions & 329 deletions

File tree

.github/workflows/codeql.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ jobs:
4949
persist-credentials: false
5050

5151
- name: Initialize CodeQL
52-
uses: github/codeql-action/init@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4
52+
uses: github/codeql-action/init@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4
5353
with:
5454
languages: actions
5555

5656
- name: Perform CodeQL Analysis
57-
uses: github/codeql-action/analyze@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4
57+
uses: github/codeql-action/analyze@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4
5858
with:
5959
category: "/language:actions"

native/Cargo.lock

Lines changed: 17 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/shuffle/src/spark_unsafe/list.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow::array::{
2424
builder::{
2525
ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
2626
Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder,
27-
ListBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder,
27+
ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder,
2828
},
2929
MapBuilder,
3030
};
@@ -393,6 +393,12 @@ pub fn append_to_builder<const NULLABLE: bool>(
393393
let builder = downcast_builder_ref!(Date32Builder, builder);
394394
array.append_dates_to_builder::<NULLABLE>(builder);
395395
}
396+
DataType::Null => {
397+
let builder = downcast_builder_ref!(NullBuilder, builder);
398+
for _ in 0..array.get_num_elements() {
399+
builder.append_null();
400+
}
401+
}
396402
DataType::Binary => {
397403
add_values!(
398404
BinaryBuilder,

native/shuffle/src/spark_unsafe/row.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use arrow::array::{
2828
builder::{
2929
ArrayBuilder, BinaryBuilder, BinaryDictionaryBuilder, BooleanBuilder, Date32Builder,
3030
Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder,
31-
Int64Builder, Int8Builder, ListBuilder, MapBuilder, StringBuilder, StringDictionaryBuilder,
32-
StructBuilder, TimestampMicrosecondBuilder,
31+
Int64Builder, Int8Builder, ListBuilder, MapBuilder, NullBuilder, StringBuilder,
32+
StringDictionaryBuilder, StructBuilder, TimestampMicrosecondBuilder,
3333
},
3434
types::Int32Type,
3535
Array, ArrayRef, RecordBatch, RecordBatchOptions,
@@ -267,6 +267,10 @@ pub(super) fn append_field(
267267
append_field_to_builder!(Date32Builder, |builder: &mut Date32Builder| builder
268268
.append_value(row.get_date(idx)));
269269
}
270+
DataType::Null => {
271+
let field_builder = get_field_builder!(struct_builder, NullBuilder, idx);
272+
field_builder.append_null();
273+
}
270274
DataType::Timestamp(TimeUnit::Microsecond, _) => {
271275
append_field_to_builder!(
272276
TimestampMicrosecondBuilder,
@@ -1148,6 +1152,12 @@ fn append_columns(
11481152
.append_value(row.get_date(idx))
11491153
);
11501154
}
1155+
DataType::Null => {
1156+
let null_builder = downcast_builder_ref!(NullBuilder, builder);
1157+
for _ in row_start..row_end {
1158+
null_builder.append_null();
1159+
}
1160+
}
11511161
DataType::Timestamp(TimeUnit::Microsecond, _) => {
11521162
append_column_to_builder!(
11531163
TimestampMicrosecondBuilder,
@@ -1252,6 +1262,7 @@ fn make_builders(
12521262
}
12531263
}
12541264
DataType::Date32 => Box::new(Date32Builder::with_capacity(row_num)),
1265+
DataType::Null => Box::new(NullBuilder::new()),
12551266
DataType::Timestamp(TimeUnit::Microsecond, _) => {
12561267
Box::new(TimestampMicrosecondBuilder::with_capacity(row_num).with_data_type(dt.clone()))
12571268
}

spark/src/main/scala/org/apache/comet/serde/strings.scala

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,19 @@ object CometStringRepeat extends CometExpressionSerde[StringRepeat] {
5454
class CometCaseConversionBase[T <: Expression](function: String)
5555
extends CometScalarFunction[T](function) {
5656

57-
override def getIncompatibleReasons(): Seq[String] = Seq(
58-
"Results can vary depending on locale and character set." +
59-
s" Requires `${CometConf.COMET_CASE_CONVERSION_ENABLED.key}=true` to enable.")
57+
override def getSupportLevel(expr: T): SupportLevel = Compatible()
6058

6159
override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
62-
if (!CometConf.COMET_CASE_CONVERSION_ENABLED.get()) {
63-
withInfo(
64-
expr,
65-
"Comet is not compatible with Spark for case conversion in " +
66-
s"locale-specific cases. Set ${CometConf.COMET_CASE_CONVERSION_ENABLED.key}=true " +
67-
"to enable it anyway.")
68-
return None
60+
if (CometConf.COMET_CASE_CONVERSION_ENABLED.get()) {
61+
// Native scalar function: faster but does not match Spark for locale-specific characters
62+
// (e.g. Turkish dotted/dotless I). Opt-in.
63+
super.convert(expr, inputs, binding)
64+
} else {
65+
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the
66+
// Comet pipeline. This guarantees Spark-compatible behavior across 3.4 / 3.5 / 4.0.
67+
// Falls through to Spark when the dispatcher is disabled.
68+
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
6969
}
70-
super.convert(expr, inputs, binding)
7170
}
7271
}
7372

@@ -86,20 +85,20 @@ object CometLength extends CometScalarFunction[Length]("length") {
8685

8786
object CometInitCap extends CometScalarFunction[InitCap]("initcap") {
8887

89-
override def getIncompatibleReasons(): Seq[String] = Seq(
90-
"Treats hyphen as a word separator (e.g. `robert rose-smith` produces `Robert Rose-Smith`" +
91-
" instead of Spark's `Robert Rose-smith`)" +
92-
" (https://github.com/apache/datafusion-comet/issues/1052)")
93-
94-
override def getSupportLevel(expr: InitCap): SupportLevel = {
95-
// Behavior differs from Spark. One example is that for the input "robert rose-smith", Spark
96-
// will produce "Robert Rose-smith", but Comet will produce "Robert Rose-Smith".
97-
// https://github.com/apache/datafusion-comet/issues/1052
98-
Incompatible(None)
99-
}
88+
override def getSupportLevel(expr: InitCap): SupportLevel = Compatible()
10089

10190
override def convert(expr: InitCap, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
102-
super.convert(expr, inputs, binding)
91+
if (CometConf.isExprAllowIncompat(getExprConfigName(expr))) {
92+
// Native path: faster but treats hyphen as a word separator (e.g.
93+
// `robert rose-smith` produces `Robert Rose-Smith` instead of Spark's `Robert Rose-smith`).
94+
// https://github.com/apache/datafusion-comet/issues/1052
95+
super.convert(expr, inputs, binding)
96+
} else {
97+
// Default: route through the codegen dispatcher so Spark's own doGenCode runs inside the
98+
// Comet pipeline. This guarantees Spark-compatible behavior across 3.4 / 3.5 / 4.0.
99+
// Falls through to Spark when the dispatcher is disabled.
100+
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
101+
}
103102
}
104103
}
105104

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
4040
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
4141
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
4242
import org.apache.spark.sql.internal.SQLConf
43-
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType}
43+
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, NullType, ShortType, StringType, StructType, TimestampNTZType, TimestampType}
4444
import org.apache.spark.sql.vectorized.ColumnarBatch
4545
import org.apache.spark.util.MutablePair
4646
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
@@ -364,7 +364,7 @@ object CometShuffleExchangeExec
364364
def supportedSerializableDataType(dt: DataType): Boolean = dt match {
365365
case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType |
366366
_: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType |
367-
_: TimestampNTZType | _: DecimalType | _: DateType =>
367+
_: TimestampNTZType | _: DecimalType | _: DateType | _: NullType =>
368368
true
369369
case StructType(fields) =>
370370
fields.nonEmpty && fields.forall(f => supportedSerializableDataType(f.dataType))
@@ -487,7 +487,7 @@ object CometShuffleExchangeExec
487487
def supportedSerializableDataType(dt: DataType): Boolean = dt match {
488488
case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType |
489489
_: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType |
490-
_: TimestampNTZType | _: DecimalType | _: DateType =>
490+
_: TimestampNTZType | _: DecimalType | _: DateType | _: NullType =>
491491
true
492492
case StructType(fields) =>
493493
fields.nonEmpty && fields.forall(f => supportedSerializableDataType(f.dataType)) &&

spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ object Utils extends CometTypeShim with Logging {
148148
}
149149
case TimestampNTZType =>
150150
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
151+
case NullType => ArrowType.Null.INSTANCE
151152
case dt if isTimeType(dt) =>
152153
new ArrowType.Time(TimeUnit.NANOSECOND, 64)
153154
case _ =>

spark/src/test/resources/sql-tests/expressions/string/init_cap.sql

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,30 @@
1515
-- specific language governing permissions and limitations
1616
-- under the License.
1717

18+
-- Routes InitCap through the codegen dispatcher so behavior matches Spark exactly,
19+
-- including the hyphen-as-word-separator case where the Rust scalar function diverges
20+
-- (see https://github.com/apache/datafusion-comet/issues/1052).
21+
-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true
22+
1823
statement
1924
CREATE TABLE test_initcap(s string) USING parquet
2025

2126
statement
2227
INSERT INTO test_initcap VALUES ('hello world'), ('HELLO WORLD'), (''), (NULL), ('hello-world'), ('123abc'), (' spaces ')
2328

24-
query expect_fallback(not fully compatible with Spark)
29+
query
2530
SELECT initcap(s) FROM test_initcap
31+
32+
-- literal arguments
33+
query
34+
SELECT initcap('hello world'), initcap(''), initcap(NULL)
35+
36+
-- hyphen and other word separators - the divergence the codegen dispatcher fixes
37+
statement
38+
CREATE TABLE test_initcap_separators(s string) USING parquet
39+
40+
statement
41+
INSERT INTO test_initcap_separators VALUES ('robert rose-smith'), ('foo.bar'), ('a_b_c'), ("o'reilly")
42+
43+
query
44+
SELECT initcap(s) FROM test_initcap_separators

spark/src/test/resources/sql-tests/expressions/string/lower.sql

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,29 @@
1515
-- specific language governing permissions and limitations
1616
-- under the License.
1717

18+
-- Routes Lower through the codegen dispatcher so behavior matches Spark exactly,
19+
-- including locale-specific case mappings that the Rust scalar function does not implement.
20+
-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true
21+
1822
statement
1923
CREATE TABLE test_lower(s string) USING parquet
2024

2125
statement
2226
INSERT INTO test_lower VALUES ('HELLO'), ('hello'), ('Hello World'), (''), (NULL), ('123ABC')
2327

24-
query expect_fallback(case conversion)
28+
query
2529
SELECT lower(s) FROM test_lower
2630

2731
-- literal arguments
28-
query expect_fallback(case conversion)
32+
query
2933
SELECT lower('HELLO'), lower(''), lower(NULL)
34+
35+
-- locale-sensitive characters: Greek sigma and Turkish dotted I
36+
statement
37+
CREATE TABLE test_lower_unicode(s string) USING parquet
38+
39+
statement
40+
INSERT INTO test_lower_unicode VALUES ('ΣIGMA'), ('İSTANBUL'), ('GROSSE'), ('CAFÉ')
41+
42+
query
43+
SELECT lower(s) FROM test_lower_unicode

spark/src/test/resources/sql-tests/expressions/string/upper.sql

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,29 @@
1515
-- specific language governing permissions and limitations
1616
-- under the License.
1717

18+
-- Routes Upper through the codegen dispatcher so behavior matches Spark exactly,
19+
-- including locale-specific case mappings that the Rust scalar function does not implement.
20+
-- Config: spark.comet.exec.scalaUDF.codegen.enabled=true
21+
1822
statement
1923
CREATE TABLE test_upper(s string) USING parquet
2024

2125
statement
2226
INSERT INTO test_upper VALUES ('hello'), ('HELLO'), ('Hello World'), (''), (NULL), ('123abc')
2327

24-
query expect_fallback(case conversion)
28+
query
2529
SELECT upper(s) FROM test_upper
2630

2731
-- literal arguments
28-
query expect_fallback(case conversion)
32+
query
2933
SELECT upper('hello'), upper(''), upper(NULL)
34+
35+
-- locale-sensitive characters: German sharp s and Turkish dotted/dotless I
36+
statement
37+
CREATE TABLE test_upper_unicode(s string) USING parquet
38+
39+
statement
40+
INSERT INTO test_upper_unicode VALUES ('straße'), ('istanbul'), ('İstanbul'), ('finish')
41+
42+
query
43+
SELECT upper(s) FROM test_upper_unicode

0 commit comments

Comments
 (0)