|
20 | 20 | package org.apache.comet.serde |
21 | 21 |
|
22 | 22 | import org.apache.spark.sql.catalyst.expressions._ |
| 23 | +import org.apache.spark.sql.catalyst.expressions.objects.RowOrdering |
23 | 24 | import org.apache.spark.sql.types.{ArrayType, MapType} |
24 | 25 |
|
| 26 | +import org.apache.comet.CometSparkSessionExtensions.withInfo |
25 | 27 | import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} |
26 | 28 |
|
27 | 29 | object CometMapKeys extends CometExpressionSerde[MapKeys] { |
@@ -89,3 +91,21 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { |
89 | 91 | optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) |
90 | 92 | } |
91 | 93 | } |
| 94 | + |
| 95 | +object CometMapSort extends CometExpressionSerde[MapSort] { |
| 96 | + |
| 97 | + override def convert( |
| 98 | + expr: MapSort, |
| 99 | + inputs: Seq[Attribute], |
| 100 | + binding: Boolean): Option[ExprOuterClass.Expr] = { |
| 101 | + val keyType = expr.base.dataType.asInstanceOf[MapType].keyType |
| 102 | + if (!RowOrdering.isOrderable(keyType)) { |
| 103 | + withInfo(expr, s"map_sort requires orderable key type, got: $keyType") |
| 104 | + return None |
| 105 | + } |
| 106 | + |
| 107 | + val childExpr = exprToProtoInternal(expr.base, inputs, binding) |
| 108 | + val mapSortScalarExpr = scalarFunctionExprToProto("map_sort", childExpr) |
| 109 | + optExprWithInfo(mapSortScalarExpr, expr, expr.children: _*) |
| 110 | + } |
| 111 | +} |
0 commit comments