Skip to content

Commit d46b56a

Browse files
andygroveclaude
andcommitted
Add NullVector/NullArray support for native columnar-to-row conversion
- Add NullVector to getFieldVector in Utils.scala to allow export - Add DataType::Null handling in columnar_to_row.rs for native C2R - Update withInfo test for new native C2R plan structure This fixes the round test failure when scale is null, which produces a NullArray that needs to be handled by the native C2R path. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 90cc9ba commit d46b56a

3 files changed

Lines changed: 14 additions & 10 deletions

File tree

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import java.nio.channels.Channels
2626
import scala.jdk.CollectionConverters._
2727

2828
import org.apache.arrow.c.CDataDictionaryProvider
29-
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
29+
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
3030
import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
3131
import org.apache.arrow.vector.dictionary.DictionaryProvider
3232
import org.apache.arrow.vector.ipc.ArrowStreamWriter
@@ -282,7 +282,7 @@ object Utils extends CometTypeShim {
282282
_: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
283283
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
284284
_: FixedSizeBinaryVector | _: TimeStampMicroVector | _: StructVector | _: ListVector |
285-
_: MapVector) =>
285+
_: MapVector | _: NullVector) =>
286286
v.asInstanceOf[FieldVector]
287287
case _ =>
288288
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")

native/core/src/execution/columnar_to_row.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const MAX_LONG_DIGITS: u8 = 18;
5151
/// This enum holds references to concrete array types, allowing direct access
5252
/// without repeated downcast_ref calls.
5353
enum TypedArray<'a> {
54+
Null(&'a NullArray),
5455
Boolean(&'a BooleanArray),
5556
Int8(&'a Int8Array),
5657
Int16(&'a Int16Array),
@@ -81,6 +82,11 @@ impl<'a> TypedArray<'a> {
8182
fn from_array(array: &'a ArrayRef, _schema_type: &DataType) -> CometResult<Self> {
8283
let actual_type = array.data_type();
8384
match actual_type {
85+
DataType::Null => Ok(TypedArray::Null(
86+
array.as_any().downcast_ref::<NullArray>().ok_or_else(|| {
87+
CometError::Internal("Failed to downcast to NullArray".to_string())
88+
})?,
89+
)),
8490
DataType::Boolean => Ok(TypedArray::Boolean(
8591
array
8692
.as_any()
@@ -235,6 +241,7 @@ impl<'a> TypedArray<'a> {
235241
#[inline]
236242
fn is_null(&self, row_idx: usize) -> bool {
237243
match self {
244+
TypedArray::Null(_) => true, // Null type is always null
238245
TypedArray::Boolean(arr) => arr.is_null(row_idx),
239246
TypedArray::Int8(arr) => arr.is_null(row_idx),
240247
TypedArray::Int16(arr) => arr.is_null(row_idx),
@@ -292,7 +299,8 @@ impl<'a> TypedArray<'a> {
292299
#[inline]
293300
fn is_variable_length(&self) -> bool {
294301
match self {
295-
TypedArray::Boolean(_)
302+
TypedArray::Null(_)
303+
| TypedArray::Boolean(_)
296304
| TypedArray::Int8(_)
297305
| TypedArray::Int16(_)
298306
| TypedArray::Int32(_)

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.Path
3030
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
3131
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp}
3232
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
33-
import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec}
34-
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec}
33+
import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometProjectExec}
34+
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
3535
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3636
import org.apache.spark.sql.functions._
3737
import org.apache.spark.sql.internal.SQLConf
@@ -958,11 +958,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
958958
val query = sql(s"select cast(id as string) from $table")
959959
val (_, cometPlan) = checkSparkAnswerAndOperator(query)
960960
val project = cometPlan
961-
.asInstanceOf[WholeStageCodegenExec]
962-
.child
963-
.asInstanceOf[CometColumnarToRowExec]
964-
.child
965-
.asInstanceOf[InputAdapter]
961+
.asInstanceOf[CometNativeColumnarToRowExec]
966962
.child
967963
.asInstanceOf[CometProjectExec]
968964
val id = project.expressions.head

0 commit comments

Comments
 (0)