Skip to content

Commit 0480e82

Browse files
committed
fix: pass return type for MapSort serde
The MapSort serde for Spark 4.0 called scalarFunctionExprToProto without a return type. The Rust planner then looked up "map_sort" in the session UDF registry to infer the type, but map_sort is only handled via the create_comet_physical_fun match dispatch, not registered as a UDF, causing "There is no UDF named 'map_sort' in the registry" at execution time (e.g., group-by on a map column in CollationSuite). Pass ms.dataType explicitly via scalarFunctionExprToProtoWithReturnType, matching the pattern used by ceil, floor, and other scalar functions.
1 parent f99ba23 commit 0480e82

2 files changed

Lines changed: 19 additions & 2 deletions

File tree

spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.comet.CometSparkSessionExtensions.withInfo
3030
import org.apache.comet.expressions.{CometCast, CometEvalMode}
3131
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
3232
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
33-
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}
33+
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType}
3434

3535
/**
3636
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
@@ -130,7 +130,11 @@ trait CometExprShim extends CommonStringExprs {
130130

131131
case ms: MapSort =>
132132
val childExpr = exprToProtoInternal(ms.child, inputs, binding)
133-
val mapSortExpr = scalarFunctionExprToProto("map_sort", childExpr)
133+
val mapSortExpr = scalarFunctionExprToProtoWithReturnType(
134+
"map_sort",
135+
ms.dataType,
136+
failOnError = false,
137+
childExpr)
134138
optExprWithInfo(mapSortExpr, ms, ms.child)
135139

136140
case _ => None

spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.sql.functions._
2727
import org.apache.spark.sql.internal.SQLConf
2828
import org.apache.spark.sql.types.BinaryType
2929

30+
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
3031
import org.apache.comet.serde.CometMapFromEntries
3132
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
3233

@@ -221,6 +222,18 @@ class CometMapExpressionSuite extends CometTestBase {
221222
}
222223
}
223224

225+
test("group by map column with string values") {
226+
assume(isSpark40Plus, "Spark 4.0 inserts MapSort for group-by on map keys")
227+
withTable("t_map_group") {
228+
sql("""
229+
|CREATE TABLE t_map_group USING parquet AS
230+
|SELECT map(cast(id as string), cast(id + 100 as string)) as m
231+
|FROM range(5)
232+
""".stripMargin)
233+
checkSparkAnswer(sql("SELECT m, count(*) FROM t_map_group GROUP BY m"))
234+
}
235+
}
236+
224237
test("map_from_entries - fallback for binary type") {
225238
def fallbackReason(reason: String) = reason
226239
val table = "t2"

0 commit comments

Comments
 (0)