Skip to content

Commit 29d9e19

Browse files
authored
Merge branch 'main' into opt_native_shuffle
2 parents e8e438f + c8975b1 commit 29d9e19

9 files changed

Lines changed: 87 additions & 37 deletions

File tree

.github/workflows/codeql.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -49,11 +49,11 @@ jobs:
4949
persist-credentials: false
5050

5151
- name: Initialize CodeQL
52-
uses: github/codeql-action/init@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4
52+
uses: github/codeql-action/init@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4
5353
with:
5454
languages: actions
5555

5656
- name: Perform CodeQL Analysis
57-
uses: github/codeql-action/analyze@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4
57+
uses: github/codeql-action/analyze@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4
5858
with:
5959
category: "/language:actions"

native/Cargo.lock

Lines changed: 17 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/shuffle/src/spark_unsafe/list.rs

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ use arrow::array::{
2424
builder::{
2525
ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
2626
Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder,
27-
ListBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder,
27+
ListBuilder, NullBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder,
2828
},
2929
MapBuilder,
3030
};
@@ -393,6 +393,12 @@ pub fn append_to_builder<const NULLABLE: bool>(
393393
let builder = downcast_builder_ref!(Date32Builder, builder);
394394
array.append_dates_to_builder::<NULLABLE>(builder);
395395
}
396+
DataType::Null => {
397+
let builder = downcast_builder_ref!(NullBuilder, builder);
398+
for _ in 0..array.get_num_elements() {
399+
builder.append_null();
400+
}
401+
}
396402
DataType::Binary => {
397403
add_values!(
398404
BinaryBuilder,

native/shuffle/src/spark_unsafe/row.rs

Lines changed: 13 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,8 @@ use arrow::array::{
2828
builder::{
2929
ArrayBuilder, BinaryBuilder, BinaryDictionaryBuilder, BooleanBuilder, Date32Builder,
3030
Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder,
31-
Int64Builder, Int8Builder, ListBuilder, MapBuilder, StringBuilder, StringDictionaryBuilder,
32-
StructBuilder, TimestampMicrosecondBuilder,
31+
Int64Builder, Int8Builder, ListBuilder, MapBuilder, NullBuilder, StringBuilder,
32+
StringDictionaryBuilder, StructBuilder, TimestampMicrosecondBuilder,
3333
},
3434
types::Int32Type,
3535
Array, ArrayRef, RecordBatch, RecordBatchOptions,
@@ -267,6 +267,10 @@ pub(super) fn append_field(
267267
append_field_to_builder!(Date32Builder, |builder: &mut Date32Builder| builder
268268
.append_value(row.get_date(idx)));
269269
}
270+
DataType::Null => {
271+
let field_builder = get_field_builder!(struct_builder, NullBuilder, idx);
272+
field_builder.append_null();
273+
}
270274
DataType::Timestamp(TimeUnit::Microsecond, _) => {
271275
append_field_to_builder!(
272276
TimestampMicrosecondBuilder,
@@ -1148,6 +1152,12 @@ fn append_columns(
11481152
.append_value(row.get_date(idx))
11491153
);
11501154
}
1155+
DataType::Null => {
1156+
let null_builder = downcast_builder_ref!(NullBuilder, builder);
1157+
for _ in row_start..row_end {
1158+
null_builder.append_null();
1159+
}
1160+
}
11511161
DataType::Timestamp(TimeUnit::Microsecond, _) => {
11521162
append_column_to_builder!(
11531163
TimestampMicrosecondBuilder,
@@ -1252,6 +1262,7 @@ fn make_builders(
12521262
}
12531263
}
12541264
DataType::Date32 => Box::new(Date32Builder::with_capacity(row_num)),
1265+
DataType::Null => Box::new(NullBuilder::new()),
12551266
DataType::Timestamp(TimeUnit::Microsecond, _) => {
12561267
Box::new(TimestampMicrosecondBuilder::with_capacity(row_num).with_data_type(dt.clone()))
12571268
}

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
4040
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
4141
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
4242
import org.apache.spark.sql.internal.SQLConf
43-
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType}
43+
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, NullType, ShortType, StringType, StructType, TimestampNTZType, TimestampType}
4444
import org.apache.spark.sql.vectorized.ColumnarBatch
4545
import org.apache.spark.util.MutablePair
4646
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
@@ -408,7 +408,7 @@ object CometShuffleExchangeExec
408408
def supportedSerializableDataType(dt: DataType): Boolean = dt match {
409409
case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType |
410410
_: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType |
411-
_: TimestampNTZType | _: DecimalType | _: DateType =>
411+
_: TimestampNTZType | _: DecimalType | _: DateType | _: NullType =>
412412
true
413413
case StructType(fields) =>
414414
fields.nonEmpty && fields.forall(f => supportedSerializableDataType(f.dataType))
@@ -531,7 +531,7 @@ object CometShuffleExchangeExec
531531
def supportedSerializableDataType(dt: DataType): Boolean = dt match {
532532
case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType |
533533
_: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType |
534-
_: TimestampNTZType | _: DecimalType | _: DateType =>
534+
_: TimestampNTZType | _: DecimalType | _: DateType | _: NullType =>
535535
true
536536
case StructType(fields) =>
537537
fields.nonEmpty && fields.forall(f => supportedSerializableDataType(f.dataType)) &&

spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -148,6 +148,7 @@ object Utils extends CometTypeShim with Logging {
148148
}
149149
case TimestampNTZType =>
150150
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
151+
case NullType => ArrowType.Null.INSTANCE
151152
case dt if isTimeType(dt) =>
152153
new ArrowType.Time(TimeUnit.NANOSECOND, 64)
153154
case _ =>

spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala

Lines changed: 11 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -22,14 +22,13 @@ package org.apache.comet.exec
2222
import java.nio.file.{Files, Paths}
2323

2424
import scala.reflect.runtime.universe._
25-
import scala.util.Random
2625

2726
import org.scalactic.source.Position
2827
import org.scalatest.Tag
2928

3029
import org.apache.hadoop.fs.Path
3130
import org.apache.spark.{Partitioner, SparkConf}
32-
import org.apache.spark.sql.{CometTestBase, DataFrame, RandomDataGenerator, Row}
31+
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
3332
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager}
3433
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec}
3534
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -94,22 +93,16 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
9493
""".stripMargin))
9594
}
9695

97-
test("Fallback to Spark for unsupported input besides ordering") {
98-
val dataGenerator = RandomDataGenerator
99-
.forType(
100-
dataType = NullType,
101-
nullable = true,
102-
new Random(System.nanoTime()),
103-
validJulianDatetime = false)
104-
.get
105-
106-
val schema = new StructType()
107-
.add("index", IntegerType, nullable = false)
108-
.add("col", NullType, nullable = true)
109-
val rdd =
110-
spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator())))
111-
val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
112-
checkSparkAnswer(df)
96+
test("columnar shuffle with NullType passthrough column") {
97+
val df = sql("SELECT x, y FROM VALUES ('a', null), ('b', null), ('c', null) AS t(x, y)")
98+
val shuffled = df.repartition(2, $"x")
99+
checkShuffleAnswer(shuffled, 1)
100+
}
101+
102+
test("columnar shuffle with Map[_, NullType] column") {
103+
val df = sql("SELECT id, map(id, null) AS m FROM VALUES (1), (2), (3) AS t(id)")
104+
val shuffled = df.repartition(2, $"id")
105+
checkShuffleAnswer(shuffled, 1)
113106
}
114107

115108
test("columnar shuffle on nested struct including nulls") {

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3925,6 +3925,21 @@ class CometExecSuite extends CometTestBase {
39253925
}
39263926
}
39273927

3928+
test("CometLocalTableScanExec handles NullType column") {
3929+
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
3930+
val df = spark.sql("SELECT * FROM VALUES ('a', null), ('b', null) AS t(x, y)")
3931+
checkSparkAnswerAndOperator(df)
3932+
}
3933+
}
3934+
3935+
test("CometLocalTableScanExec handles NullType nested in struct/array/map") {
3936+
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
3937+
checkSparkAnswerAndOperator(
3938+
spark.sql("SELECT named_struct('a', 1, 'b', null) AS s, array(null, null) AS a, " +
3939+
"map('k', null) AS m FROM VALUES (1), (2) AS t(id)"))
3940+
}
3941+
}
3942+
39283943
test("Native_datafusion reports correct files and bytes scanned") {
39293944
val inputFiles = 2
39303945

spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -218,6 +218,24 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
218218
}
219219
}
220220

221+
test("native shuffle with NullType passthrough column") {
222+
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
223+
val df =
224+
spark.sql("SELECT x, y FROM VALUES ('a', null), ('b', null), ('c', null) AS t(x, y)")
225+
val shuffled = df.repartition(2, $"x")
226+
checkShuffleAnswer(shuffled, 1)
227+
}
228+
}
229+
230+
test("native shuffle with Map[_, NullType] column") {
231+
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
232+
val df = spark.sql("SELECT id, map(id, null) AS m FROM VALUES (1), (2), (3) AS t(id)")
233+
val shuffled = df.repartition(2, $"id")
234+
println(shuffled.queryExecution.executedPlan)
235+
checkShuffleAnswer(shuffled, 1)
236+
}
237+
}
238+
221239
test("fix: Comet native shuffle with binary data") {
222240
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
223241
val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 FROM tbl")

0 commit comments

Comments
 (0)