From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 1/7] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" + val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" + + private def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false + } + } + + override def getSupportLevel(expr: MapFromEntries): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } + Compatible(None) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..01b9744ed6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType +import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("map_from_entries") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (field <- df.schema.fieldNames) { + checkSparkAnswerAndOperator( + spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) + } + } + } + + test("map_from_entries - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), + CometMapFromEntries.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(0, c1))) from $table"), + CometMapFromEntries.valueUnsupportedReason) + } + } + } From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 16 Dec 2025 16:10:43 +0400 Subject: [PATCH 2/7] Revert "impl map_from_entries" This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34. --- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def containsBinary(dataType: DataType): Boolean = { - dataType match { - case BinaryType => true - case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) - case ArrayType(elementType, _) => containsBinary(elementType) - case _ => false - } - } - - override def getSupportLevel(expr: MapFromEntries): SupportLevel = { - if (containsBinary(expr.dataType.keyType)) { - return Incompatible(Some(keyUnsupportedReason)) - } - if (containsBinary(expr.dataType.valueType)) { - return Incompatible(Some(valueUnsupportedReason)) - } - Compatible(None) - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 01b9744ed6..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.BinaryType -import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase { } } - test("map_from_entries") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val schemaGenOptions = - SchemaGenOptions( - generateArray = true, - generateStruct = true, - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) - val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - schemaGenOptions, - dataGenOptions) - } - val df = spark.read.parquet(filename) - df.createOrReplaceTempView("t1") - for (field <- df.schema.fieldNames) { - checkSparkAnswerAndOperator( - spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) - } - } - } - - test("map_from_entries - fallback for binary type") { - val table = "t2" - withTable(table) { - sql( - s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), - CometMapFromEntries.keyUnsupportedReason) - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(0, c1))) from $table"), - CometMapFromEntries.valueUnsupportedReason) - } - } - } From cfc751aec6f6fea9120d21aa239d930bb703a9f3 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Thu, 18 Jun 2026 18:43:43 +0400 Subject: [PATCH 3/7] Feat: add empty2Null inner spark function --- native/spark-expr/src/comet_scalar_funcs.rs | 5 +- .../src/string_funcs/empty_to_null.rs | 151 ++++++++++++++++++ native/spark-expr/src/string_funcs/mod.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../org/apache/comet/serde/strings.scala | 6 +- 5 files changed, 163 insertions(+), 4 deletions(-) create mode 100644 native/spark-expr/src/string_funcs/empty_to_null.rs diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 62eeaa2b1d..6c5d9ea0b7 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -27,8 +27,8 @@ use crate::{ spark_isnan, spark_lpad, spark_make_decimal, spark_month_name, spark_read_side_padding, spark_round, spark_rpad, spark_to_time, spark_unhex, spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc, SparkArraySlice, SparkArraysOverlap, SparkContains, - SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkMakeTime, - SparkNextDay, SparkSecondsToTimestamp, SparkSizeFunc, + SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkEmpty2Null, SparkMakeDate, + SparkMakeTime, SparkNextDay, SparkSecondsToTimestamp, SparkSizeFunc, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -249,6 +249,7 @@ fn all_scalar_functions() -> Vec> { Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())), Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())), Arc::new(ScalarUDF::new_from_impl(JsonArrayLength::default())), + Arc::new(ScalarUDF::new_from_impl(SparkEmpty2Null::default())), ] } diff --git a/native/spark-expr/src/string_funcs/empty_to_null.rs b/native/spark-expr/src/string_funcs/empty_to_null.rs new file mode 100644 index 0000000000..53b68920ab --- /dev/null +++ b/native/spark-expr/src/string_funcs/empty_to_null.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, OffsetSizeTrait, StringArray}; +use arrow::datatypes::DataType; +use datafusion::common::cast::{as_generic_string_array, as_string_view_array}; +use datafusion::common::{exec_err, Result, ScalarValue}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +/// Spark `Empty2Null`: returns NULL if the string is NULL or empty (`""`), +/// otherwise returns the string itself. Used in partitioned writes to ensure +/// that an empty string ends up in `__HIVE_DEFAULT_PARTITION__`, just like NULL. +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkEmpty2Null { + signature: Signature, +} + +impl Default for SparkEmpty2Null { + fn default() -> Self { + Self::new() + } +} + +impl SparkEmpty2Null { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for SparkEmpty2Null { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "empty2Null" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + Ok(arg_types[0].clone()) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + spark_empty_to_null(&args.args) + } +} + +fn spark_empty_to_null(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return exec_err!("spark empty2Null takes exactly one argument"); + } + match &args[0] { + ColumnarValue::Array(array) => { + let result = spark_empty_to_null_array(&array)?; + Ok(ColumnarValue::Array(result)) + } + ColumnarValue::Scalar(scalar) => { + let result = spark_empty_to_null_scalar(&scalar)?; + Ok(ColumnarValue::Scalar(result)) + } + } +} + +fn spark_empty_to_null_array(array: &ArrayRef) -> Result { + match array.data_type() { + DataType::Utf8 => spark_empty_to_null_string_array::(array), + DataType::LargeUtf8 => spark_empty_to_null_string_array::(array), + DataType::Utf8View => spark_empty_to_null_string_view(array), + other => { + exec_err!("unsupported data type {other:?} for function `empty2Null`") + } + } +} + +fn spark_empty_to_null_scalar(scalar: &ScalarValue) -> Result { + match scalar { + ScalarValue::Utf8(v) => Ok(ScalarValue::Utf8(v.clone().filter(|s| !s.is_empty()))), + ScalarValue::LargeUtf8(v) => { + Ok(ScalarValue::LargeUtf8(v.clone().filter(|s| !s.is_empty()))) + } + ScalarValue::Utf8View(v) => Ok(ScalarValue::Utf8View(v.clone().filter(|s| !s.is_empty()))), + other => { + exec_err!("unsupported data type {other:?} for function `empty2Null`") + } + } +} + +fn spark_empty_to_null_string_array(array: &ArrayRef) -> Result { + let str_array = as_generic_string_array::(array)?; + let result = str_array + .iter() + .map(|opt| opt.filter(|s| !s.is_empty())) + .collect::(); + Ok(Arc::new(result)) +} + +fn spark_empty_to_null_string_view(str_view: &ArrayRef) -> Result { + let str_array = as_string_view_array(str_view)?; + let result = str_array + .iter() + .map(|opt| opt.filter(|s| !s.is_empty())) + .collect::(); + Ok(Arc::new(result)) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, StringArray}; + + #[test] + fn test_empty_to_null_utf8() { + let input = Arc::new(StringArray::from(vec![ + Some("a"), + Some(""), + None, + Some("b"), + ])) as ArrayRef; + let result = spark_empty_to_null_array(&input).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result.value(0), "a"); + assert!(result.is_null(1)); // "" -> null + assert!(result.is_null(2)); // null -> null + assert_eq!(result.value(3), "b"); + } +} + diff --git a/native/spark-expr/src/string_funcs/mod.rs b/native/spark-expr/src/string_funcs/mod.rs index ce1b8009a8..fe48cb75ba 100644 --- a/native/spark-expr/src/string_funcs/mod.rs +++ b/native/spark-expr/src/string_funcs/mod.rs @@ -16,6 +16,7 @@ // under the License. mod contains; +mod empty_to_null; mod get_json_object; mod regexp_extract; mod regexp_extract_all; @@ -24,6 +25,7 @@ mod split; mod substring; pub use contains::SparkContains; +pub use empty_to_null::SparkEmpty2Null; pub use get_json_object::spark_get_json_object; pub use regexp_extract::spark_regexp_extract; pub use regexp_extract_all::spark_regexp_extract_all; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 7d5f13bb1c..b9fefb021a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -256,7 +256,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[ToCharacter] -> CometToCharacter, classOf[ToNumber] -> CometToNumber, classOf[TryToNumber] -> CometTryToNumber, - classOf[Mask] -> CometMask) + classOf[Mask] -> CometMask, + classOf[Empty2Null] -> CometEmpty2Null) base ++ sparkVersionSpecificStringExpressions } diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 05448ba653..9de630122e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -19,7 +19,7 @@ package org.apache.comet.serde -import org.apache.spark.sql.catalyst.expressions.{Attribute, BitLength, Cast, Concat, ConcatWs, Elt, Expression, FindInSet, FormatNumber, FormatString, GetJsonObject, If, InitCap, IsNull, Left, Length, Levenshtein, Like, Literal, Lower, Mask, OctetLength, Overlay, RegExpExtract, RegExpExtractAll, RegExpInStr, RegExpReplace, Right, RLike, SoundEx, StringLocate, StringLPad, StringRepeat, StringReplace, StringRPad, StringSplit, StringTranslate, Substring, SubstringIndex, ToCharacter, ToNumber, TryToNumber, UnBase64, Upper} +import org.apache.spark.sql.catalyst.expressions.{Attribute, BitLength, Cast, Concat, ConcatWs, Elt, Empty2Null, Expression, FindInSet, FormatNumber, FormatString, GetJsonObject, If, InitCap, IsNull, Left, Length, Levenshtein, Like, Literal, Lower, Mask, OctetLength, Overlay, RegExpExtract, RegExpExtractAll, RegExpInStr, RegExpReplace, Right, RLike, SoundEx, StringLocate, StringLPad, StringRepeat, StringReplace, StringRPad, StringSplit, StringTranslate, Substring, SubstringIndex, ToCharacter, ToNumber, TryToNumber, UnBase64, Upper} import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType} import org.apache.spark.unsafe.types.UTF8String @@ -620,6 +620,10 @@ object CometGetJsonObject extends CometCodegenDispatch[GetJsonObject] { } } +// A internal function that converts the empty string to null for partition values. +// This function should be only used in V1Writes. +object CometEmpty2Null extends CometScalarFunction[Empty2Null]("empty2Null") + // Expressions routed through the JVM codegen dispatcher: no native implementation, so Spark's own // doGenCode runs inside the Comet pipeline, matching Spark exactly. object CometLevenshtein extends CometCodegenDispatch[Levenshtein] From 9ba7a8effd22ac7270e93d4fbe545a6ed27ad866 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Fri, 19 Jun 2026 19:53:57 +0400 Subject: [PATCH 4/7] fix pr issues --- native/spark-expr/src/comet_scalar_funcs.rs | 5 +- .../src/string_funcs/empty_to_null.rs | 151 ------------------ native/spark-expr/src/string_funcs/mod.rs | 2 - .../org/apache/comet/serde/strings.scala | 8 +- 4 files changed, 6 insertions(+), 160 deletions(-) delete mode 100644 native/spark-expr/src/string_funcs/empty_to_null.rs diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 6c5d9ea0b7..62eeaa2b1d 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -27,8 +27,8 @@ use crate::{ spark_isnan, spark_lpad, spark_make_decimal, spark_month_name, spark_read_side_padding, spark_round, spark_rpad, spark_to_time, spark_unhex, spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc, SparkArraySlice, SparkArraysOverlap, SparkContains, - SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkEmpty2Null, SparkMakeDate, - SparkMakeTime, SparkNextDay, SparkSecondsToTimestamp, SparkSizeFunc, + SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkMakeTime, + SparkNextDay, SparkSecondsToTimestamp, SparkSizeFunc, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -249,7 +249,6 @@ fn all_scalar_functions() -> Vec> { Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())), Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())), Arc::new(ScalarUDF::new_from_impl(JsonArrayLength::default())), - Arc::new(ScalarUDF::new_from_impl(SparkEmpty2Null::default())), ] } diff --git a/native/spark-expr/src/string_funcs/empty_to_null.rs b/native/spark-expr/src/string_funcs/empty_to_null.rs deleted file mode 100644 index 53b68920ab..0000000000 --- a/native/spark-expr/src/string_funcs/empty_to_null.rs +++ /dev/null @@ -1,151 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::{ArrayRef, OffsetSizeTrait, StringArray}; -use arrow::datatypes::DataType; -use datafusion::common::cast::{as_generic_string_array, as_string_view_array}; -use datafusion::common::{exec_err, Result, ScalarValue}; -use datafusion::logical_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, -}; -use std::any::Any; -use std::sync::Arc; - -/// Spark `Empty2Null`: returns NULL if the string is NULL or empty (`""`), -/// otherwise returns the string itself. Used in partitioned writes to ensure -/// that an empty string ends up in `__HIVE_DEFAULT_PARTITION__`, just like NULL. -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct SparkEmpty2Null { - signature: Signature, -} - -impl Default for SparkEmpty2Null { - fn default() -> Self { - Self::new() - } -} - -impl SparkEmpty2Null { - pub fn new() -> Self { - Self { - signature: Signature::variadic_any(Volatility::Immutable), - } - } -} - -impl ScalarUDFImpl for SparkEmpty2Null { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "empty2Null" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(arg_types[0].clone()) - } - - fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - spark_empty_to_null(&args.args) - } -} - -fn spark_empty_to_null(args: &[ColumnarValue]) -> Result { - if args.len() != 1 { - return exec_err!("spark empty2Null takes exactly one argument"); - } - match &args[0] { - ColumnarValue::Array(array) => { - let result = spark_empty_to_null_array(&array)?; - Ok(ColumnarValue::Array(result)) - } - ColumnarValue::Scalar(scalar) => { - let result = spark_empty_to_null_scalar(&scalar)?; - Ok(ColumnarValue::Scalar(result)) - } - } -} - -fn spark_empty_to_null_array(array: &ArrayRef) -> Result { - match array.data_type() { - DataType::Utf8 => spark_empty_to_null_string_array::(array), - DataType::LargeUtf8 => spark_empty_to_null_string_array::(array), - DataType::Utf8View => spark_empty_to_null_string_view(array), - other => { - exec_err!("unsupported data type {other:?} for function `empty2Null`") - } - } -} - -fn spark_empty_to_null_scalar(scalar: &ScalarValue) -> Result { - match scalar { - ScalarValue::Utf8(v) => Ok(ScalarValue::Utf8(v.clone().filter(|s| !s.is_empty()))), - ScalarValue::LargeUtf8(v) => { - Ok(ScalarValue::LargeUtf8(v.clone().filter(|s| !s.is_empty()))) - } - ScalarValue::Utf8View(v) => Ok(ScalarValue::Utf8View(v.clone().filter(|s| !s.is_empty()))), - other => { - exec_err!("unsupported data type {other:?} for function `empty2Null`") - } - } -} - -fn spark_empty_to_null_string_array(array: &ArrayRef) -> Result { - let str_array = as_generic_string_array::(array)?; - let result = str_array - .iter() - .map(|opt| opt.filter(|s| !s.is_empty())) - .collect::(); - Ok(Arc::new(result)) -} - -fn spark_empty_to_null_string_view(str_view: &ArrayRef) -> Result { - let str_array = as_string_view_array(str_view)?; - let result = str_array - .iter() - .map(|opt| opt.filter(|s| !s.is_empty())) - .collect::(); - Ok(Arc::new(result)) -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::array::{Array, StringArray}; - - #[test] - fn test_empty_to_null_utf8() { - let input = Arc::new(StringArray::from(vec![ - Some("a"), - Some(""), - None, - Some("b"), - ])) as ArrayRef; - let result = spark_empty_to_null_array(&input).unwrap(); - let result = result.as_any().downcast_ref::().unwrap(); - assert_eq!(result.value(0), "a"); - assert!(result.is_null(1)); // "" -> null - assert!(result.is_null(2)); // null -> null - assert_eq!(result.value(3), "b"); - } -} - diff --git a/native/spark-expr/src/string_funcs/mod.rs b/native/spark-expr/src/string_funcs/mod.rs index fe48cb75ba..ce1b8009a8 100644 --- a/native/spark-expr/src/string_funcs/mod.rs +++ b/native/spark-expr/src/string_funcs/mod.rs @@ -16,7 +16,6 @@ // under the License. mod contains; -mod empty_to_null; mod get_json_object; mod regexp_extract; mod regexp_extract_all; @@ -25,7 +24,6 @@ mod split; mod substring; pub use contains::SparkContains; -pub use empty_to_null::SparkEmpty2Null; pub use get_json_object::spark_get_json_object; pub use regexp_extract::spark_regexp_extract; pub use regexp_extract_all::spark_regexp_extract_all; diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 9de630122e..06c9113299 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -620,10 +620,6 @@ object CometGetJsonObject extends CometCodegenDispatch[GetJsonObject] { } } -// A internal function that converts the empty string to null for partition values. -// This function should be only used in V1Writes. -object CometEmpty2Null extends CometScalarFunction[Empty2Null]("empty2Null") - // Expressions routed through the JVM codegen dispatcher: no native implementation, so Spark's own // doGenCode runs inside the Comet pipeline, matching Spark exactly. object CometLevenshtein extends CometCodegenDispatch[Levenshtein] @@ -651,3 +647,7 @@ object CometToNumber extends CometCodegenDispatch[ToNumber] object CometTryToNumber extends CometCodegenDispatch[TryToNumber] object CometMask extends CometCodegenDispatch[Mask] + +// A internal function that converts the empty string to null for partition values. +// This function should be only used in V1Writes. +object CometEmpty2Null extends CometCodegenDispatch[Empty2Null] From 85583e58479b58c529b51e7e36459bd88151ee81 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 21 Jun 2026 15:40:36 +0400 Subject: [PATCH 5/7] add tests --- .../comet/SparkInternalFunctionsSuite.scala | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 spark/src/test/scala/org/apache/spark/sql/comet/SparkInternalFunctionsSuite.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/SparkInternalFunctionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/SparkInternalFunctionsSuite.scala new file mode 100644 index 0000000000..496228070a --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/comet/SparkInternalFunctionsSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import scala.util.Using + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.sql.{CometTestBase, Row} +import org.apache.spark.sql.catalyst.expressions.Empty2Null +import org.apache.spark.sql.classic.ExpressionUtils +import org.apache.spark.sql.functions._ + +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus + +class SparkInternalFunctionsSuite extends CometTestBase { + import testImplicits._ + + test("empty2null is offloaded to Comet") { + assume(isSpark40Plus) + withParquetTable(Seq("", "a", null, "b").map(Tuple1(_)), "tbl") { + val df = sql("select _1 from tbl") + .select(ExpressionUtils.column(Empty2Null(ExpressionUtils.expression(col("_1")))).as("p")) + checkSparkAnswerAndOperator(df) + } + } + + test("partitioned write with empty string partition value") { + withTempPath { path => + Seq(("", 1), ("a", 2)) + .toDF("part", "value") + .write + .partitionBy("part") + .parquet(path.toString) + Using(FileSystem.get(spark.sparkContext.hadoopConfiguration)) { fs => + val partitions = fs + .listStatus(new Path(path.toString)) + .filter(_.isDirectory) + .map(_.getPath.getName) + .sorted + assert(partitions.contains("part=a")) + assert(!partitions.contains("part=")) + assert(partitions.count(_.startsWith("part=__HIVE_DEFAULT_PARTITION__")) == 1) + } + checkAnswer(spark.read.parquet(path.toString), Row(1, null) :: Row(2, "a") :: Nil) + } + } +} From 8422c8c1969623b98543fbe36dad5cf959d2b189 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 21 Jun 2026 15:43:50 +0400 Subject: [PATCH 6/7] add tests --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + ...ctionsSuite.scala => CometSparkInternalFunctionsSuite.scala} | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) rename spark/src/test/scala/org/apache/spark/sql/comet/{SparkInternalFunctionsSuite.scala => CometSparkInternalFunctionsSuite.scala} (97%) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 502d96a205..16edade639 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -362,6 +362,7 @@ jobs: org.apache.spark.sql.CometCollationSuite org.apache.comet.CometFuzzAggregateSuite org.apache.spark.sql.comet.execution.arrow.CometArrowStreamSuite + org.apache.spark.sql.comet.CometSparkInternalFunctionsSuite - name: "expressions" value: | org.apache.comet.CometExpressionSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index f2a3e84fa1..a5ea513a9e 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -178,6 +178,7 @@ jobs: org.apache.spark.sql.CometCollationSuite org.apache.comet.CometFuzzAggregateSuite org.apache.spark.sql.comet.execution.arrow.CometArrowStreamSuite + org.apache.spark.sql.comet.CometSparkInternalFunctionsSuite - name: "expressions" value: | org.apache.comet.CometExpressionSuite diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/SparkInternalFunctionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometSparkInternalFunctionsSuite.scala similarity index 97% rename from spark/src/test/scala/org/apache/spark/sql/comet/SparkInternalFunctionsSuite.scala rename to spark/src/test/scala/org/apache/spark/sql/comet/CometSparkInternalFunctionsSuite.scala index 496228070a..eff4f99b55 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/SparkInternalFunctionsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometSparkInternalFunctionsSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.functions._ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus -class SparkInternalFunctionsSuite extends CometTestBase { +class CometSparkInternalFunctionsSuite extends CometTestBase { import testImplicits._ test("empty2null is offloaded to Comet") { From ebbaf7268aca562a70b77366e1266cc6b28568c5 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 21 Jun 2026 17:08:51 +0400 Subject: [PATCH 7/7] add tests --- .../parquet/CometParquetWriterSuite.scala | 24 ++++++++++++++++- .../CometSparkInternalFunctionsSuite.scala | 27 +------------------ 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index f6795b91a3..0a33a50450 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -21,8 +21,9 @@ package org.apache.comet.parquet import java.io.File -import scala.util.Random +import scala.util.{Random, Using} +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometNativeWriteExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, SparkPlan} @@ -37,6 +38,27 @@ class CometParquetWriterSuite extends CometTestBase { import testImplicits._ + test("partitioned write with empty string partition value") { + withTempPath { path => + Seq(("", 1), ("a", 2)) + .toDF("part", "value") + .write + .partitionBy("part") + .parquet(path.toString) + Using(FileSystem.get(spark.sparkContext.hadoopConfiguration)) { fs => + val partitions = fs + .listStatus(new Path(path.toString)) + .filter(_.isDirectory) + .map(_.getPath.getName) + .sorted + assert(partitions.contains("part=a")) + assert(!partitions.contains("part=")) + assert(partitions.count(_.startsWith("part=__HIVE_DEFAULT_PARTITION__")) == 1) + } + checkAnswer(spark.read.parquet(path.toString), Row(1, null) :: Row(2, "a") :: Nil) + } + } + test("basic parquet write") { withTempPath { dir => val outputPath = new File(dir, "output.parquet").getAbsolutePath diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometSparkInternalFunctionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometSparkInternalFunctionsSuite.scala index eff4f99b55..5ca4b2060d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometSparkInternalFunctionsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometSparkInternalFunctionsSuite.scala @@ -19,10 +19,7 @@ package org.apache.spark.sql.comet -import scala.util.Using - -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.{CometTestBase, Row} +import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.catalyst.expressions.Empty2Null import org.apache.spark.sql.classic.ExpressionUtils import org.apache.spark.sql.functions._ @@ -30,7 +27,6 @@ import org.apache.spark.sql.functions._ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus class CometSparkInternalFunctionsSuite extends CometTestBase { - import testImplicits._ test("empty2null is offloaded to Comet") { assume(isSpark40Plus) @@ -40,25 +36,4 @@ class CometSparkInternalFunctionsSuite extends CometTestBase { checkSparkAnswerAndOperator(df) } } - - test("partitioned write with empty string partition value") { - withTempPath { path => - Seq(("", 1), ("a", 2)) - .toDF("part", "value") - .write - .partitionBy("part") - .parquet(path.toString) - Using(FileSystem.get(spark.sparkContext.hadoopConfiguration)) { fs => - val partitions = fs - .listStatus(new Path(path.toString)) - .filter(_.isDirectory) - .map(_.getPath.getName) - .sorted - assert(partitions.contains("part=a")) - assert(!partitions.contains("part=")) - assert(partitions.count(_.startsWith("part=__HIVE_DEFAULT_PARTITION__")) == 1) - } - checkAnswer(spark.read.parquet(path.toString), Row(1, null) :: Row(2, "a") :: Nil) - } - } }