2020import org .apache .calcite .rex .RexUtil ;
2121import org .apache .calcite .rex .RexWindow ;
2222import org .apache .calcite .sql .SqlKind ;
23+ import org .apache .calcite .sql .type .SqlTypeName ;
2324import org .apache .calcite .tools .RelBuilder ;
2425import org .apache .calcite .util .mapping .Mapping ;
2526import org .apache .logging .log4j .LogManager ;
@@ -76,15 +77,19 @@ protected void apply(
7677 }
7778
7879 List <RexNode > dedupColumns = windows .get (0 ).partitionKeys ;
80+ if (dedupColumns .stream ()
81+ .filter (rex -> rex .isA (SqlKind .INPUT_REF ))
82+ .anyMatch (rex -> rex .getType ().getSqlTypeName () == SqlTypeName .MAP )) {
83+ LOG .debug ("Cannot pushdown the dedup since the dedup fields contains MAP type" );
84+ // TODO https://github.com/opensearch-project/sql/issues/4564
85+ return ;
86+ }
7987 if (projectWithWindow .getProjects ().stream ()
8088 .filter (rex -> !rex .isA (SqlKind .ROW_NUMBER ))
8189 .filter (Predicate .not (dedupColumns ::contains ))
8290 .anyMatch (rex -> !rex .isA (SqlKind .INPUT_REF ))) {
83- // We can push down:
84- // | eval new_gender = lower(gender) | fields new_gender, age | dedup 1 new_gender
85- // But cannot push down:
86- // | eval new_age = age + 1 | fields gender, new_age | dedup 1 gender
8791 // TODO fallback to the approach of Collapse search
92+ // | eval new_age = age + 1 | fields gender, new_age | dedup 1 gender
8893 if (LOG .isDebugEnabled ()) {
8994 LOG .debug (
9095 "Cannot pushdown the dedup since the final outputs contain a column which is not"
@@ -99,8 +104,12 @@ protected void apply(
99104 .filter (rex -> rex instanceof RexCall )
100105 .toList ();
101106 if (!rexCallsExceptWindow .isEmpty ()
102- && hasRexCallAppliedOnDedupColumn (rexCallsExceptWindow , dedupColumns )) {
107+ && dedupColumnsContainRexCall (rexCallsExceptWindow , dedupColumns )) {
103108 // TODO https://github.com/opensearch-project/sql/issues/4789
109+ // | eval new_gender = lower(gender) | fields new_gender, age | dedup 1 new_gender
110+ if (LOG .isDebugEnabled ()) {
111+ LOG .debug ("Cannot pushdown the dedup since the dedup columns contain RexCall" );
112+ }
104113 return ;
105114 }
106115
@@ -114,24 +123,25 @@ && hasRexCallAppliedOnDedupColumn(rexCallsExceptWindow, dedupColumns)) {
114123 // Aggregate(literalAgg(dedupNumer), groups)
115124 // +- Project(groups, remaining)
116125 // +- Scan
117- // 1. push Scan
126+ // 1. Initial a RelBuilder to build aggregate by pushing Scan and Project
118127 RelBuilder relBuilder = call .builder ();
119128 relBuilder .push (scan );
129+ relBuilder .push (projectWithWindow ); // baseline the rowType to handle rename case
120130 Mapping mappingForDedupColumns =
121131 PlanUtils .mapping (dedupColumns , relBuilder .peek ().getRowType ());
122- // Create new Project: grouping first, then remaining finalOutput columns
132+
133+ // 2. Push a Project which groups is first, then remaining finalOutput columns
123134 List <RexNode > reordered = new ArrayList <>(PlanUtils .getInputRefs (dedupColumns ));
124135 projectWithWindow .getProjects ().stream ()
125136 .filter (rex -> !rex .isA (SqlKind .ROW_NUMBER ))
126137 .filter (Predicate .not (dedupColumns ::contains ))
127138 .forEach (reordered ::add );
128- // 2. push Project
129139 relBuilder .project (reordered , ImmutableList .of (), true );
130140 // childProject includes all list of finalOutput columns
131141 LogicalProject childProject = (LogicalProject ) relBuilder .peek ();
132142
143+ // 3. Push an Aggregate
133144 final List <RexNode > newDedupColumns = RexUtil .apply (mappingForDedupColumns , dedupColumns );
134- // 3. push Aggregate
135145 relBuilder .aggregate (relBuilder .groupKey (newDedupColumns ), relBuilder .literalAgg (dedupNumer ));
136146 PlanUtils .addIgnoreNullBucketHintToAggregate (relBuilder );
137147 // peek the aggregate after hint being added
@@ -145,13 +155,12 @@ && hasRexCallAppliedOnDedupColumn(rexCallsExceptWindow, dedupColumns)) {
145155 }
146156 }
147157
148- private static boolean hasRexCallAppliedOnDedupColumn (
158+ private static boolean dedupColumnsContainRexCall (
149159 List <RexNode > calls , List <RexNode > dedupColumns ) {
150160 List <Integer > dedupColumnIndicesFromCall =
151161 PlanUtils .getSelectColumns (calls ).stream ().distinct ().toList ();
152162 List <Integer > dedupColumnsIndicesFromPartitionKeys =
153163 PlanUtils .getSelectColumns (dedupColumns ).stream ().distinct ().toList ();
154- // check dedupColumnIndicesFromCall equals to dedupColumnsIndicesFromPartitionKeys
155164 return dedupColumnsIndicesFromPartitionKeys .stream ()
156165 .anyMatch (dedupColumnIndicesFromCall ::contains );
157166 }
0 commit comments