Skip to content

Commit a3136ed

Browse files
author
Kazantsev Maksim
committed
work
1 parent 0ef353c commit a3136ed

3 files changed

Lines changed: 36 additions & 19 deletions

File tree

native/core/src/execution/planner.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ use num::{BigInt, ToPrimitive};
134134
use object_store::path::Path;
135135
use std::cmp::max;
136136
use std::{collections::HashMap, sync::Arc};
137+
use datafusion_functions_nested::map::map;
137138
use url::Url;
138139

139140
// For clippy error on type_complexity.
@@ -676,6 +677,14 @@ impl PhysicalPlanner {
676677
ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new(
677678
MonotonicallyIncreasingId::from_partition_id(self.partition),
678679
)),
680+
ExprStruct::CreateMap(expr) => {
681+
let keys = expr.keys.iter().map(|expr| self.create_expr(expr, Arc::clone(&input_schema)))
682+
.collect::<Vec<_>>();
683+
let values = expr.values.iter().map(|expr| self.create_expr(expr, Arc::clone(&input_schema)))
684+
.collect::<Result<Vec<_>, _>>()?;
685+
let create_map = map(keys, values);
686+
Ok(Arc::new(create_map))
687+
},
679688
expr => Err(GeneralError(format!("Not implemented: {expr:?}"))),
680689
}
681690
}

native/proto/src/proto/expr.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ message Expr {
8585
Rand randn = 62;
8686
EmptyExpr spark_partition_id = 63;
8787
EmptyExpr monotonically_increasing_id = 64;
88+
CreateMap create_map = 65;
8889
}
8990
}
9091

@@ -410,3 +411,8 @@ message ArrayJoin {
410411
message Rand {
411412
int64 seed = 1;
412413
}
414+
415+
message CreateMap {
416+
repeated Expr keys = 1;
417+
repeated Expr values = 2;
418+
}

spark/src/main/scala/org/apache/comet/serde/maps.scala

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919

2020
package org.apache.comet.serde
2121

22+
import scala.jdk.CollectionConverters._
23+
2224
import org.apache.spark.sql.catalyst.expressions._
2325
import org.apache.spark.sql.types.{ArrayType, MapType}
2426

25-
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType}
27+
import org.apache.comet.CometSparkSessionExtensions.withInfo
28+
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType}
2629

2730
object CometMapKeys extends CometExpressionSerde[MapKeys] {
2831

@@ -95,23 +98,22 @@ object CometCreateMap extends CometExpressionSerde[CreateMap] {
9598
expr: CreateMap,
9699
inputs: Seq[Attribute],
97100
binding: Boolean): Option[ExprOuterClass.Expr] = {
98-
val keys = CreateArray(expr.keys)
99-
val values = CreateArray(expr.values)
100-
val keysProtoExpr = exprToProtoInternal(keys, inputs, binding)
101-
val valuesProtoExpr = exprToProtoInternal(values, inputs, binding)
102-
// scalastyle:off println
103-
println(keysProtoExpr)
104-
// scalastyle:on println line=102 column=4
105-
// scalastyle:off println
106-
// println(valuesProtoExpr)
107-
// scalastyle:on println line=103 column=4
108-
val createMapScalarExpr =
109-
scalarFunctionExprToProtoWithReturnType(
110-
"map",
111-
expr.dataType,
112-
false,
113-
keysProtoExpr,
114-
valuesProtoExpr)
115-
optExprWithInfo(createMapScalarExpr, expr, expr.children: _*)
101+
val keysProtoExpr = expr.keys.map(exprToProtoInternal(_, inputs, binding))
102+
val valuesProtoExpr = expr.values.map(exprToProtoInternal(_, inputs, binding))
103+
if (keysProtoExpr.forall(_.isDefined) && valuesProtoExpr.forall(_.isDefined)) {
104+
val createMapProtoExpr = ExprOuterClass.CreateMap
105+
.newBuilder()
106+
.addAllValues(keysProtoExpr.map(_.get).asJava)
107+
.addAllValues(valuesProtoExpr.map(_.get).asJava)
108+
.build()
109+
Some(
110+
ExprOuterClass.Expr
111+
.newBuilder()
112+
.setCreateMap(createMapProtoExpr)
113+
.build())
114+
} else {
115+
withInfo(expr, expr.children: _*)
116+
None
117+
}
116118
}
117119
}

0 commit comments

Comments
 (0)