Skip to content

Commit 234f608

Browse files
authored
Pushdown the top rare commands to nested aggregation (opensearch-project#4707)
* Optimize the top rare commands to nested aggregation
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* Fix cost computation
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
---------
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 9953a5a commit 234f608

45 files changed

Lines changed: 552 additions & 269 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
1616
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
1717
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_DEDUP;
18-
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_NAME_MAIN;
19-
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_NAME_SUBSEARCH;
20-
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_NAME_TOP_RARE;
18+
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_MAIN;
19+
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_RARE_TOP;
20+
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_SUBSEARCH;
2121
import static org.opensearch.sql.calcite.utils.PlanUtils.getRelation;
2222
import static org.opensearch.sql.calcite.utils.PlanUtils.getRexCall;
2323
import static org.opensearch.sql.calcite.utils.PlanUtils.transformPlanToAttachChild;
@@ -1643,7 +1643,7 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
16431643
List.of(),
16441644
WindowFrame.toCurrentRow());
16451645
context.relBuilder.projectPlus(
1646-
context.relBuilder.alias(mainRowNumber, ROW_NUMBER_COLUMN_NAME_MAIN));
1646+
context.relBuilder.alias(mainRowNumber, ROW_NUMBER_COLUMN_FOR_MAIN));
16471647

16481648
// 3. build subsearch tree (attach relation to subsearch)
16491649
UnresolvedPlan relation = getRelation(node);
@@ -1661,7 +1661,7 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
16611661
List.of(),
16621662
WindowFrame.toCurrentRow());
16631663
context.relBuilder.projectPlus(
1664-
context.relBuilder.alias(subsearchRowNumber, ROW_NUMBER_COLUMN_NAME_SUBSEARCH));
1664+
context.relBuilder.alias(subsearchRowNumber, ROW_NUMBER_COLUMN_FOR_SUBSEARCH));
16651665

16661666
List<String> subsearchFields = context.relBuilder.peek().getRowType().getFieldNames();
16671667
List<String> mainFields = context.relBuilder.peek(1).getRowType().getFieldNames();
@@ -1675,17 +1675,17 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
16751675
// 7. join with condition `_row_number_main_ = _row_number_subsearch_`
16761676
RexNode joinCondition =
16771677
context.relBuilder.equals(
1678-
context.relBuilder.field(2, 0, ROW_NUMBER_COLUMN_NAME_MAIN),
1679-
context.relBuilder.field(2, 1, ROW_NUMBER_COLUMN_NAME_SUBSEARCH));
1678+
context.relBuilder.field(2, 0, ROW_NUMBER_COLUMN_FOR_MAIN),
1679+
context.relBuilder.field(2, 1, ROW_NUMBER_COLUMN_FOR_SUBSEARCH));
16801680
context.relBuilder.join(
16811681
JoinAndLookupUtils.translateJoinType(Join.JoinType.FULL), joinCondition);
16821682

16831683
if (!node.isOverride()) {
16841684
// 8. if override = false, drop both _row_number_ columns
16851685
context.relBuilder.projectExcept(
16861686
List.of(
1687-
context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_MAIN),
1688-
context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_SUBSEARCH)));
1687+
context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_MAIN),
1688+
context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_SUBSEARCH)));
16891689
return context.relBuilder.peek();
16901690
} else {
16911691
// 9. if override = true, override the duplicated columns in main by subsearch values
@@ -1697,11 +1697,11 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
16971697
mainFields.stream().filter(subsearchFields::contains).collect(Collectors.toSet());
16981698
RexNode caseCondition =
16991699
context.relBuilder.equals(
1700-
context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_MAIN),
1701-
context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_SUBSEARCH));
1700+
context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_MAIN),
1701+
context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_SUBSEARCH));
17021702
for (int mainFieldIndex = 0; mainFieldIndex < mainFields.size(); mainFieldIndex++) {
17031703
String mainFieldName = mainFields.get(mainFieldIndex);
1704-
if (mainFieldName.equals(ROW_NUMBER_COLUMN_NAME_MAIN)) {
1704+
if (mainFieldName.equals(ROW_NUMBER_COLUMN_FOR_MAIN)) {
17051705
continue;
17061706
}
17071707
finalFieldNames.add(mainFieldName);
@@ -1726,7 +1726,7 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
17261726
subsearchFieldIndex < subsearchFields.size();
17271727
subsearchFieldIndex++) {
17281728
String subsearchFieldName = subsearchFields.get(subsearchFieldIndex);
1729-
if (subsearchFieldName.equals(ROW_NUMBER_COLUMN_NAME_SUBSEARCH)) {
1729+
if (subsearchFieldName.equals(ROW_NUMBER_COLUMN_FOR_SUBSEARCH)) {
17301730
continue;
17311731
}
17321732
if (!duplicatedFields.contains(subsearchFieldName)) {
@@ -1887,14 +1887,15 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
18871887
addIgnoreNullBucketHintToAggregate(context);
18881888
}
18891889

1890-
// 2. add a window column
1890+
// 2. add count() column with sort direction
18911891
List<RexNode> partitionKeys = rexVisitor.analyze(node.getGroupExprList(), context);
18921892
RexNode countField;
18931893
if (node.getCommandType() == RareTopN.CommandType.TOP) {
18941894
countField = context.relBuilder.desc(context.relBuilder.field(countFieldName));
18951895
} else {
18961896
countField = context.relBuilder.field(countFieldName);
18971897
}
1898+
18981899
RexNode rowNumberWindowOver =
18991900
PlanUtils.makeOver(
19001901
context,
@@ -1905,22 +1906,22 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
19051906
List.of(countField),
19061907
WindowFrame.toCurrentRow());
19071908
context.relBuilder.projectPlus(
1908-
context.relBuilder.alias(rowNumberWindowOver, ROW_NUMBER_COLUMN_NAME_TOP_RARE));
1909+
context.relBuilder.alias(rowNumberWindowOver, ROW_NUMBER_COLUMN_FOR_RARE_TOP));
19091910

19101911
// 3. filter row_number() <= k in each partition
19111912
int k = node.getNoOfResults();
19121913
context.relBuilder.filter(
19131914
context.relBuilder.lessThanOrEqual(
1914-
context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_TOP_RARE),
1915+
context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_RARE_TOP),
19151916
context.relBuilder.literal(k)));
19161917

19171918
// 4. project final output. the default output is group by list + field list
19181919
Boolean showCount = (Boolean) argumentMap.get(RareTopN.Option.showCount.name()).getValue();
19191920
if (showCount) {
1920-
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_TOP_RARE));
1921+
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_RARE_TOP));
19211922
} else {
19221923
context.relBuilder.projectExcept(
1923-
context.relBuilder.field(ROW_NUMBER_COLUMN_NAME_TOP_RARE),
1924+
context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_RARE_TOP),
19241925
context.relBuilder.field(countFieldName));
19251926
}
19261927
return context.relBuilder.peek();

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ public interface PlanUtils {
6262
/** this is only for dedup command, do not reuse it in other command */
6363
String ROW_NUMBER_COLUMN_FOR_DEDUP = "_row_number_dedup_";
6464

65-
String ROW_NUMBER_COLUMN_NAME_TOP_RARE = "_row_number_top_rare_";
66-
String ROW_NUMBER_COLUMN_NAME_MAIN = "_row_number_main_";
67-
String ROW_NUMBER_COLUMN_NAME_SUBSEARCH = "_row_number_subsearch_";
65+
String ROW_NUMBER_COLUMN_FOR_RARE_TOP = "_row_number_rare_top_";
66+
String ROW_NUMBER_COLUMN_FOR_MAIN = "_row_number_main_";
67+
String ROW_NUMBER_COLUMN_FOR_SUBSEARCH = "_row_number_subsearch_";
6868

6969
static SpanUnit intervalUnitToSpanUnit(IntervalUnit unit) {
7070
return switch (unit) {
@@ -447,10 +447,18 @@ static RexNode derefMapCall(RexNode rexNode) {
447447
return rexNode;
448448
}
449449

450-
/** Check if contains RexOver */
450+
/** Check whether the project contains a ROW_NUMBER RexOver introduced by the dedup command */
451451
static boolean containsRowNumberDedup(LogicalProject project) {
452452
return project.getProjects().stream()
453-
.anyMatch(p -> p instanceof RexOver && p.getKind() == SqlKind.ROW_NUMBER);
453+
.anyMatch(p -> p instanceof RexOver && p.getKind() == SqlKind.ROW_NUMBER)
454+
&& project.getRowType().getFieldNames().contains(ROW_NUMBER_COLUMN_FOR_DEDUP);
455+
}
456+
457+
/** Check whether the project contains a ROW_NUMBER RexOver introduced by the rare/top command */
458+
static boolean containsRowNumberRareTop(LogicalProject project) {
459+
return project.getProjects().stream()
460+
.anyMatch(p -> p instanceof RexOver && p.getKind() == SqlKind.ROW_NUMBER)
461+
&& project.getRowType().getFieldNames().contains(ROW_NUMBER_COLUMN_FOR_RARE_TOP);
454462
}
455463

456464
/** Get all RexWindow list from LogicalProject */

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,29 +1049,29 @@ public void testExplainCountsByAgg() throws IOException {
10491049
}
10501050

10511051
@Test
1052-
public void testExplainSortOnMetrics() throws IOException {
1052+
public void testExplainSortOnMeasure() throws IOException {
10531053
enabledOnlyWhenPushdownIsEnabled();
1054-
String expected = loadExpectedPlan("explain_agg_sort_on_metrics1.yaml");
1054+
String expected = loadExpectedPlan("explain_agg_sort_on_measure1.yaml");
10551055
assertYamlEqualsIgnoreId(
10561056
expected,
10571057
explainQueryYaml(
10581058
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
10591059
+ " state | sort `count()`"));
1060-
expected = loadExpectedPlan("explain_agg_sort_on_metrics2.yaml");
1060+
expected = loadExpectedPlan("explain_agg_sort_on_measure2.yaml");
10611061
assertYamlEqualsIgnoreId(
10621062
expected,
10631063
explainQueryYaml(
10641064
"source=opensearch-sql_test_index_account | stats bucket_nullable=false sum(balance)"
10651065
+ " as sum by state | sort - sum"));
10661066
// TODO limit should pushdown to non-composite agg
1067-
expected = loadExpectedPlan("explain_agg_sort_on_metrics3.yaml");
1067+
expected = loadExpectedPlan("explain_agg_sort_on_measure3.yaml");
10681068
assertYamlEqualsIgnoreId(
10691069
expected,
10701070
explainQueryYaml(
10711071
String.format(
10721072
"source=%s | stats count() as cnt by span(birthdate, 1d) | sort - cnt",
10731073
TEST_INDEX_BANK)));
1074-
expected = loadExpectedPlan("explain_agg_sort_on_metrics4.yaml");
1074+
expected = loadExpectedPlan("explain_agg_sort_on_measure4.yaml");
10751075
assertYamlEqualsIgnoreId(
10761076
expected,
10771077
explainQueryYaml(
@@ -1082,9 +1082,9 @@ public void testExplainSortOnMetrics() throws IOException {
10821082
}
10831083

10841084
@Test
1085-
public void testExplainSortOnMetricsMultiTerms() throws IOException {
1085+
public void testExplainSortOnMeasureMultiTerms() throws IOException {
10861086
enabledOnlyWhenPushdownIsEnabled();
1087-
String expected = loadExpectedPlan("explain_agg_sort_on_metrics_multi_terms.yaml");
1087+
String expected = loadExpectedPlan("explain_agg_sort_on_measure_multi_terms.yaml");
10881088
assertYamlEqualsIgnoreId(
10891089
expected,
10901090
explainQueryYaml(
@@ -1093,11 +1093,11 @@ public void testExplainSortOnMetricsMultiTerms() throws IOException {
10931093
}
10941094

10951095
@Test
1096-
public void testExplainCompositeMultiBucketsAutoDateThenSortOnMetricsNotPushdown()
1096+
public void testExplainCompositeMultiBucketsAutoDateThenSortOnMeasureNotPushdown()
10971097
throws IOException {
10981098
enabledOnlyWhenPushdownIsEnabled();
10991099
assertYamlEqualsIgnoreId(
1100-
loadExpectedPlan("agg_composite_multi_terms_autodate_sort_agg_metric_not_push.yaml"),
1100+
loadExpectedPlan("agg_composite_multi_terms_autodate_sort_agg_measure_not_push.yaml"),
11011101
explainQueryYaml(
11021102
String.format(
11031103
"source=%s | bin timestamp bins=3 | stats bucket_nullable=false avg(value), count()"
@@ -1106,10 +1106,10 @@ public void testExplainCompositeMultiBucketsAutoDateThenSortOnMetricsNotPushdown
11061106
}
11071107

11081108
@Test
1109-
public void testExplainCompositeRangeThenSortOnMetricsNotPushdown() throws IOException {
1109+
public void testExplainCompositeRangeThenSortOnMeasureNotPushdown() throws IOException {
11101110
enabledOnlyWhenPushdownIsEnabled();
11111111
assertYamlEqualsIgnoreId(
1112-
loadExpectedPlan("agg_composite_range_sort_agg_metric_not_push.yaml"),
1112+
loadExpectedPlan("agg_composite_range_sort_agg_measure_not_push.yaml"),
11131113
explainQueryYaml(
11141114
String.format(
11151115
"source=%s | eval value_range = case(value < 7000, 'small'"
@@ -1119,10 +1119,10 @@ public void testExplainCompositeRangeThenSortOnMetricsNotPushdown() throws IOExc
11191119
}
11201120

11211121
@Test
1122-
public void testExplainCompositeAutoDateThenSortOnMetricsNotPushdown() throws IOException {
1122+
public void testExplainCompositeAutoDateThenSortOnMeasureNotPushdown() throws IOException {
11231123
enabledOnlyWhenPushdownIsEnabled();
11241124
assertYamlEqualsIgnoreId(
1125-
loadExpectedPlan("agg_composite_autodate_sort_agg_metric_not_push.yaml"),
1125+
loadExpectedPlan("agg_composite_autodate_sort_agg_measure_not_push.yaml"),
11261126
explainQueryYaml(
11271127
String.format(
11281128
"source=%s | bin timestamp bins=3 | stats bucket_nullable=false avg(value), count()"
@@ -1131,10 +1131,10 @@ public void testExplainCompositeAutoDateThenSortOnMetricsNotPushdown() throws IO
11311131
}
11321132

11331133
@Test
1134-
public void testExplainCompositeRangeAutoDateThenSortOnMetricsNotPushdown() throws IOException {
1134+
public void testExplainCompositeRangeAutoDateThenSortOnMeasureNotPushdown() throws IOException {
11351135
enabledOnlyWhenPushdownIsEnabled();
11361136
assertYamlEqualsIgnoreId(
1137-
loadExpectedPlan("agg_composite_autodate_range_metric_sort_agg_metric_not_push.yaml"),
1137+
loadExpectedPlan("agg_composite_autodate_range_metric_sort_agg_measure_not_push.yaml"),
11381138
explainQueryYaml(
11391139
String.format(
11401140
"source=%s | bin timestamp bins=3 | eval value_range = case(value < 7000, 'small'"
@@ -1144,16 +1144,16 @@ public void testExplainCompositeRangeAutoDateThenSortOnMetricsNotPushdown() thro
11441144
}
11451145

11461146
@Test
1147-
public void testExplainMultipleAggregatorsWithSortOnOneMetricNotPushDown() throws IOException {
1147+
public void testExplainMultipleAggregatorsWithSortOnOneMeasureNotPushDown() throws IOException {
11481148
enabledOnlyWhenPushdownIsEnabled();
11491149
String expected =
1150-
loadExpectedPlan("explain_multiple_agg_with_sort_on_one_metric_not_push1.yaml");
1150+
loadExpectedPlan("explain_multiple_agg_with_sort_on_one_measure_not_push1.yaml");
11511151
assertYamlEqualsIgnoreId(
11521152
expected,
11531153
explainQueryYaml(
11541154
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
11551155
+ " sum(balance) as s by state | sort c"));
1156-
expected = loadExpectedPlan("explain_multiple_agg_with_sort_on_one_metric_not_push2.yaml");
1156+
expected = loadExpectedPlan("explain_multiple_agg_with_sort_on_one_measure_not_push2.yaml");
11571157
assertYamlEqualsIgnoreId(
11581158
expected,
11591159
explainQueryYaml(

integ-test/src/test/resources/expectedOutput/calcite/agg_composite_autodate_range_metric_sort_agg_metric_not_push.yaml renamed to integ-test/src/test/resources/expectedOutput/calcite/agg_composite_autodate_range_metric_sort_agg_measure_not_push.yaml

File renamed without changes.

integ-test/src/test/resources/expectedOutput/calcite/agg_composite_autodate_sort_agg_metric_not_push.yaml renamed to integ-test/src/test/resources/expectedOutput/calcite/agg_composite_autodate_sort_agg_measure_not_push.yaml

File renamed without changes.

integ-test/src/test/resources/expectedOutput/calcite/agg_composite_multi_terms_autodate_sort_agg_metric_not_push.yaml renamed to integ-test/src/test/resources/expectedOutput/calcite/agg_composite_multi_terms_autodate_sort_agg_measure_not_push.yaml

File renamed without changes.

integ-test/src/test/resources/expectedOutput/calcite/agg_composite_range_sort_agg_metric_not_push.yaml renamed to integ-test/src/test/resources/expectedOutput/calcite/agg_composite_range_sort_agg_measure_not_push.yaml

File renamed without changes.

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics1.yaml renamed to integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure1.yaml

File renamed without changes.

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics2.yaml renamed to integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure2.yaml

File renamed without changes.

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics3.yaml renamed to integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure3.yaml

File renamed without changes.

0 commit comments

Comments
 (0)