Skip to content

Commit ada3501

Browse files
committed
feat: add CometMapSort serializer
1 parent c514fda commit ada3501

2 files changed

Lines changed: 23 additions & 2 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
131131
classOf[MapValues] -> CometMapValues,
132132
classOf[MapFromArrays] -> CometMapFromArrays,
133133
classOf[MapContainsKey] -> CometMapContainsKey,
134-
classOf[MapFromEntries] -> CometMapFromEntries)
134+
classOf[MapFromEntries] -> CometMapFromEntries,
135+
classOf[MapSort] -> CometMapSort)
135136

136137
private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
137138
classOf[CreateNamedStruct] -> CometCreateNamedStruct,

spark/src/main/scala/org/apache/comet/serde/maps.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
package org.apache.comet.serde
2121

2222
import org.apache.spark.sql.catalyst.expressions._
23+
import org.apache.spark.sql.catalyst.expressions.objects.RowOrdering
2324
import org.apache.spark.sql.types._
2425

25-
import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}
26+
import org.apache.comet.CometSparkSessionExtensions.withInfo
27+
import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType}
2628

2729
object CometMapKeys extends CometExpressionSerde[MapKeys] {
2830

@@ -156,3 +158,21 @@ object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from
156158
Compatible(None)
157159
}
158160
}
161+
162+
object CometMapSort extends CometExpressionSerde[MapSort] {
163+
164+
override def convert(
165+
expr: MapSort,
166+
inputs: Seq[Attribute],
167+
binding: Boolean): Option[ExprOuterClass.Expr] = {
168+
val keyType = expr.base.dataType.asInstanceOf[MapType].keyType
169+
if (!RowOrdering.isOrderable(keyType)) {
170+
withInfo(expr, s"map_sort requires orderable key type, got: $keyType")
171+
return None
172+
}
173+
174+
val childExpr = exprToProtoInternal(expr.base, inputs, binding)
175+
val mapSortScalarExpr = scalarFunctionExprToProto("map_sort", childExpr)
176+
optExprWithInfo(mapSortScalarExpr, expr, expr.children: _*)
177+
}
178+
}

0 commit comments

Comments
 (0)