Skip to content

Commit ea5dc31

Browse files
authored
fix: handle type mismatches in native c2r conversion (#3583)
1 parent 77fdeb7 commit ea5dc31

3 files changed

Lines changed: 22 additions & 13 deletions

File tree

native/core/src/execution/columnar_to_row.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1068,7 +1068,19 @@ impl ColumnarToRowContext {
10681068
})?;
10691069
Ok(Arc::new(decimal_array))
10701070
}
1071-
_ => Ok(Arc::clone(array)),
1071+
_ => {
1072+
// For any other type mismatch, attempt an Arrow cast.
1073+
// This handles cases like Int32 → Date32 (which can happen when Spark
1074+
// generates default column values using the physical storage type rather
1075+
// than the logical type).
1076+
let options = CastOptions::default();
1077+
cast_with_options(array, schema_type, &options).map_err(|e| {
1078+
CometError::Internal(format!(
1079+
"Failed to cast array from {:?} to {:?}: {}",
1080+
actual_type, schema_type, e
1081+
))
1082+
})
1083+
}
10721084
}
10731085
}
10741086

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,6 @@ case class CometNativeColumnarToRowExec(child: SparkPlan)
232232

233233
object CometNativeColumnarToRowExec {
234234

235-
/**
236-
* Checks if native columnar to row conversion is enabled.
237-
*/
238-
def isEnabled: Boolean = CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.get()
239-
240235
/**
241236
* Checks if the given schema is supported by native columnar to row conversion.
242237
*

spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,16 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
111111

112112
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
113113
pos: Position): Unit = {
114-
Seq("native", "jvm").foreach { shuffleMode =>
115-
super.test(testName + s" ($shuffleMode shuffle)", testTags: _*) {
116-
withSQLConf(
117-
CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
118-
CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
119-
testFun
114+
Seq(("native", "false"), ("jvm", "true"), ("jvm", "false")).foreach {
115+
case (shuffleMode, nativeC2R) =>
116+
super.test(testName + s" ($shuffleMode shuffle, nativeC2R=$nativeC2R)", testTags: _*) {
117+
withSQLConf(
118+
CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key -> nativeC2R,
119+
CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
120+
CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
121+
testFun
122+
}
120123
}
121-
}
122124
}
123125
}
124126

0 commit comments

Comments
 (0)