Skip to content

Commit 0d46563

Browse files
committed
Re-index collations when pushing down projection
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent 4c7f759 commit 0d46563

1 file changed

Lines changed: 47 additions & 23 deletions

File tree

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77

88
import com.google.common.collect.ImmutableList;
99
import java.util.ArrayList;
10-
import java.util.LinkedList;
10+
import java.util.LinkedHashMap;
1111
import java.util.List;
1212
import java.util.Map;
13-
import java.util.OptionalInt;
13+
import java.util.Objects;
1414
import java.util.stream.Collectors;
15-
import java.util.stream.IntStream;
1615
import lombok.Getter;
1716
import org.apache.calcite.plan.Convention;
1817
import org.apache.calcite.plan.RelOptCluster;
@@ -131,13 +130,30 @@ public CalciteLogicalIndexScan pushDownProject(List<Integer> selectedColumns) {
131130
builder.add(fieldList.get(project));
132131
}
133132
RelDataType newSchema = builder.build();
134-
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(newSchema);
133+
134+
// Projection may alter the index of the collations.
135+
// E.g. For sort age
136+
// `Sort($1)\n TableScan(name, age)` may become
137+
// `Sort($0)\n Project(age)\n TableScan(name, age)` after projection.
138+
RelTraitSet traitSetWithReIndexedCollations = reIndexCollations(selectedColumns);
139+
140+
CalciteLogicalIndexScan newScan =
141+
new CalciteLogicalIndexScan(
142+
getCluster(),
143+
traitSetWithReIndexedCollations,
144+
hints,
145+
table,
146+
osIndex,
147+
newSchema,
148+
pushDownContext.clone());
149+
135150
Map<String, String> aliasMapping = this.osIndex.getAliasMapping();
136151
// For alias types, we need to push down its original path instead of the alias name.
137152
List<String> projectedFields =
138153
newSchema.getFieldNames().stream()
139154
.map(fieldName -> aliasMapping.getOrDefault(fieldName, fieldName))
140155
.toList();
156+
141157
newScan.pushDownContext.add(
142158
PushDownAction.of(
143159
PushDownType.PROJECT,
@@ -146,6 +162,24 @@ public CalciteLogicalIndexScan pushDownProject(List<Integer> selectedColumns) {
146162
return newScan;
147163
}
148164

165+
private RelTraitSet reIndexCollations(List<Integer> selectedColumns) {
166+
RelTraitSet newTraitSet;
167+
RelCollation relCollation = getTraitSet().getCollation();
168+
if (!Objects.isNull(relCollation) && !relCollation.getFieldCollations().isEmpty()) {
169+
List<RelFieldCollation> newCollations =
170+
relCollation.getFieldCollations().stream()
171+
.filter(collation -> selectedColumns.contains(collation.getFieldIndex()))
172+
.map(
173+
collation ->
174+
collation.withFieldIndex(selectedColumns.indexOf(collation.getFieldIndex())))
175+
.collect(Collectors.toList());
176+
newTraitSet = getTraitSet().plus(RelCollations.of(newCollations));
177+
} else {
178+
newTraitSet = getTraitSet();
179+
}
180+
return newTraitSet;
181+
}
182+
149183
public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate) {
150184
try {
151185
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(aggregate.getRowType());
@@ -267,25 +301,15 @@ public CalciteLogicalIndexScan pushDownSort(List<RelFieldCollation> collations)
267301
*/
268302
private static List<RelFieldCollation> mergeCollations(
269303
List<RelFieldCollation> existingCollations, List<RelFieldCollation> newCollations) {
270-
// We add new collations first, then existing collations
271-
// Consider `sort a | sort b`, the second sort should take precedence
272-
List<RelFieldCollation> concatenatedCollations = new ArrayList<>(newCollations);
273-
concatenatedCollations.addAll(existingCollations);
274-
// Within the same group of collations, the first occurrence of each field index
275-
// should take precedence, so we need to remove duplicates while preserving the order
276-
LinkedList<RelFieldCollation> mergedCollations = new LinkedList<>();
277-
for (RelFieldCollation collation : concatenatedCollations) {
278-
// If the collation is already in the merged list, remove it from the list before adding
279-
// This is because the sort that comes later in the list should take precedence
280-
OptionalInt index =
281-
IntStream.range(0, mergedCollations.size())
282-
.filter(i -> mergedCollations.get(i).getFieldIndex() == collation.getFieldIndex())
283-
.findFirst();
284-
if (index.isPresent()) {
285-
mergedCollations.remove(index.getAsInt());
286-
}
287-
mergedCollations.add(collation);
304+
Map<Integer, RelFieldCollation> mergedCollations = new LinkedHashMap<>();
305+
306+
for (RelFieldCollation collation : newCollations) {
307+
mergedCollations.put(collation.getFieldIndex(), collation);
308+
}
309+
310+
for (RelFieldCollation collation : existingCollations) {
311+
mergedCollations.put(collation.getFieldIndex(), collation);
288312
}
289-
return mergedCollations;
313+
return new ArrayList<>(mergedCollations.values());
290314
}
291315
}

0 commit comments

Comments
 (0)