Skip to content

Commit 45d9a28

Browse files
committed
Support push down sort on aggregation measure for more than one agg calls (opensearch-project#4759)
Signed-off-by: Lantao Jin <ltjin@amazon.com> (cherry picked from commit 3d548b9)
1 parent 8ead12a commit 45d9a28

31 files changed

Lines changed: 256 additions & 148 deletions
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.plan;
7+
8+
import org.apache.calcite.plan.Contexts;
9+
import org.apache.calcite.plan.RelRule;
10+
import org.apache.calcite.rel.core.RelFactories;
11+
import org.apache.calcite.tools.RelBuilderFactory;
12+
import org.immutables.value.Value;
13+
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
14+
15+
/**
 * Rule configuration base type that overrides {@code relBuilderFactory()} from {@link
 * RelRule.Config} with a default obtained from {@link CalciteToolsHelper#proto}.
 */
public interface OpenSearchRuleConfig extends RelRule.Config {
16+
17+
/**
 * Returns the default {@link RelBuilderFactory} for this configuration: the factory produced by
 * {@link CalciteToolsHelper#proto} for a context wrapping {@link RelFactories#DEFAULT_STRUCT},
 * used for creating OpenSearchRelBuilder instances.
 */
18+
@Override
19+
@Value.Default
20+
default RelBuilderFactory relBuilderFactory() {
21+
return CalciteToolsHelper.proto(Contexts.of(RelFactories.DEFAULT_STRUCT));
22+
}
23+
}

core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggGroupMergeRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec
105105

106106
/** Rule configuration. */
107107
@Value.Immutable
108-
public interface Config extends RelRule.Config {
108+
public interface Config extends OpenSearchRuleConfig {
109109
Config GROUP_MERGE =
110110
ImmutablePPLAggGroupMergeRule.Config.builder()
111111
.build()

core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ private RexNode aliasMaybe(RelBuilder builder, RexNode node, String alias) {
241241

242242
/** Rule configuration. */
243243
@Value.Immutable
244-
public interface Config extends RelRule.Config {
244+
public interface Config extends OpenSearchRuleConfig {
245245
Config SUM_CONVERTER =
246246
ImmutablePPLAggregateConvertRule.Config.builder()
247247
.build()

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.calcite.tools.FrameworkConfig;
8484
import org.apache.calcite.tools.Frameworks;
8585
import org.apache.calcite.tools.RelBuilder;
86+
import org.apache.calcite.tools.RelBuilderFactory;
8687
import org.apache.calcite.tools.RelRunner;
8788
import org.apache.calcite.util.Holder;
8889
import org.apache.calcite.util.Util;
@@ -127,6 +128,10 @@ public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFac
127128
}
128129
}
129130

131+
/**
 * Creates a {@link RelBuilderFactory} whose builders are {@link OpenSearchRelBuilder} instances.
 *
 * @param context context handed to every {@code OpenSearchRelBuilder} the factory creates
 * @return a factory that constructs a new {@code OpenSearchRelBuilder} from {@code context} and
 *     the cluster and schema it is invoked with
 */
public static RelBuilderFactory proto(final Context context) {
132+
return (cluster, schema) -> new OpenSearchRelBuilder(context, cluster, schema);
133+
}
134+
130135
/**
131136
* This method copied from {@link Frameworks#withPrepare(FrameworkConfig,
132137
* Frameworks.BasePrepareAction)}. The purpose is the method {@link

integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ public void test() throws IOException {
7272
}
7373
logger.info("Running Query{}", i);
7474
String ppl = sanitize(loadFromFile("clickbench/queries/q" + i + ".ppl"));
75-
timing(summary, "q" + i, ppl);
7675
// V2 gets unstable scripts, ignore them when comparing plan
7776
if (isCalciteEnabled()) {
7877
String expected = loadExpectedPlan("clickbench/q" + i + ".yaml");
7978
assertYamlEqualsIgnoreId(expected, explainQueryYaml(ppl));
8079
}
80+
timing(summary, "q" + i, ppl);
8181
}
8282
}
8383
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,6 +1187,24 @@ public void testExplainSortOnMeasureMultiTermsWithScript() throws IOException {
11871187
+ " sort `count()`"));
11881188
}
11891189

1190+
/**
 * Compares the YAML explain output of two stats queries, each computing three aggregations and
 * then sorting descending on one of them, against the expected plans stored in
 * explain_agg_sort_on_measure_complex1.yaml and explain_agg_sort_on_measure_complex2.yaml.
 */
@Test
1191+
public void testExplainSortOnMeasureComplex() throws IOException {
1192+
enabledOnlyWhenPushdownIsEnabled();
1193+
// Case 1: single group-by field (state), sorted descending on the count() alias c.
String expected = loadExpectedPlan("explain_agg_sort_on_measure_complex1.yaml");
1194+
assertYamlEqualsIgnoreId(
1195+
expected,
1196+
explainQueryYaml(
1197+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false sum(balance),"
1198+
+ " count() as c, dc(employer) by state | sort - c"));
1199+
// Case 2: two group-by fields (gender and the eval-derived new_state), sorted descending on
// the dc(employer) alias d.
expected = loadExpectedPlan("explain_agg_sort_on_measure_complex2.yaml");
1200+
assertYamlEqualsIgnoreId(
1201+
expected,
1202+
explainQueryYaml(
1203+
"source=opensearch-sql_test_index_account | eval new_state = lower(state) | stats"
1204+
+ " bucket_nullable=false sum(balance), count(), dc(employer) as d by gender,"
1205+
+ " new_state | sort - d"));
1206+
}
1207+
11901208
@Test
11911209
public void testExplainCompositeMultiBucketsAutoDateThenSortOnMeasureNotPushdown()
11921210
throws IOException {
@@ -1239,15 +1257,15 @@ public void testExplainCompositeRangeAutoDateThenSortOnMeasureNotPushdown() thro
12391257
}
12401258

12411259
@Test
1242-
public void testExplainMultipleAggregatorsWithSortOnOneMeasureNotPushDown() throws IOException {
1260+
public void testExplainMultipleCollationsWithSortOnOneMeasureNotPushDown() throws IOException {
12431261
enabledOnlyWhenPushdownIsEnabled();
12441262
String expected =
12451263
loadExpectedPlan("explain_multiple_agg_with_sort_on_one_measure_not_push1.yaml");
12461264
assertYamlEqualsIgnoreId(
12471265
expected,
12481266
explainQueryYaml(
12491267
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
1250-
+ " sum(balance) as s by state | sort c"));
1268+
+ " sum(balance) as s by state | sort c, state"));
12511269
expected = loadExpectedPlan("explain_multiple_agg_with_sort_on_one_measure_not_push2.yaml");
12521270
assertYamlEqualsIgnoreId(
12531271
expected,
@@ -1256,6 +1274,17 @@ public void testExplainMultipleAggregatorsWithSortOnOneMeasureNotPushDown() thro
12561274
+ " sum(balance) as s by state | sort c, s"));
12571275
}
12581276

1277+
/**
 * Compares the YAML explain output of a stats query grouped by state and span(age, 5) and sorted
 * on the count() alias c against explain_agg_sort_on_measure_multi_buckets_not_pushed.yaml.
 *
 * <p>NOTE(review): per the test and resource names the sort is expected not to be pushed down
 * for this bucket combination; the expected plan itself is not visible here - confirm against
 * the YAML resource.
 */
@Test
1278+
public void testExplainSortOnMeasureMultiBucketsNotMultiTermsNotPushDown() throws IOException {
1279+
enabledOnlyWhenPushdownIsEnabled();
1280+
String expected = loadExpectedPlan("explain_agg_sort_on_measure_multi_buckets_not_pushed.yaml");
1281+
assertYamlEqualsIgnoreId(
1282+
expected,
1283+
explainQueryYaml(
1284+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
1285+
+ " sum(balance) as s by state, span(age, 5) | sort c"));
1286+
}
1287+
12591288
@Test
12601289
public void testExplainEvalMax() throws IOException {
12611290
String expected = loadExpectedPlan("explain_eval_max.json");

integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,4 +1166,55 @@ public void testStatsSpanSortOnMeasureMultiTermsWithScript() throws IOException
11661166
resetQueryBucketSize();
11671167
}
11681168
}
1169+
1170+
/**
 * Runs two stats queries that compute three aggregations, sort descending on one of them and
 * keep the first five rows, with the query bucket size set to 5, and verifies the response
 * schema and data rows. The bucket size is reset in the finally block even when an assertion
 * fails.
 */
@Test
1171+
public void testStatsSortOnMeasureComplex() throws IOException {
1172+
try {
1173+
setQueryBucketSize(5);
1174+
// Case 1: grouped by state, sorted descending on the count() alias c.
JSONObject response =
1175+
executeQuery(
1176+
String.format(
1177+
"source=%s | stats bucket_nullable=false sum(balance), count() as c, dc(employer)"
1178+
+ " as d by state | sort - c | head 5",
1179+
TEST_INDEX_ACCOUNT));
1180+
verifySchema(
1181+
response,
1182+
schema("sum(balance)", null, "bigint"),
1183+
schema("c", null, "bigint"),
1184+
schema("d", null, "bigint"),
1185+
schema("state", null, "string"));
1187+
verifyDataRows(
1188+
response,
1189+
rows(782199, 30, 30, "TX"),
1190+
rows(732523, 28, 28, "MD"),
1191+
rows(657957, 27, 27, "ID"),
1192+
rows(541575, 25, 25, "ME"),
1193+
rows(643489, 25, 25, "AL"));
1194+
// Case 2: grouped by gender and the eval-derived new_state, sorted descending on the
// dc(employer) alias d.
response =
1195+
executeQuery(
1196+
String.format(
1197+
"source=%s | eval new_state = lower(state) | stats bucket_nullable=false"
1198+
+ " sum(balance), count() as c, dc(employer) as d by gender, new_state | sort"
1199+
+ " - d | head 5",
1200+
TEST_INDEX_ACCOUNT));
1201+
verifySchema(
1202+
response,
1203+
schema("sum(balance)", null, "bigint"),
1204+
schema("c", null, "bigint"),
1205+
schema("d", null, "bigint"),
1206+
schema("gender", null, "string"),
1207+
schema("new_state", null, "string"));
1209+
verifyDataRows(
1210+
response,
1211+
rows(484567, 18, 18, "M", "md"),
1212+
rows(376394, 17, 17, "M", "id"),
1213+
rows(505688, 17, 17, "F", "tx"),
1214+
rows(375409, 16, 16, "M", "me"),
1215+
rows(432776, 15, 15, "M", "ok"));
1216+
} finally {
1217+
resetQueryBucketSize();
1218+
}
1219+
}
11691220
}

integ-test/src/test/resources/expectedOutput/calcite/clickbench/q10.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,4 @@ calcite:
88
LogicalFilter(condition=[IS NOT NULL($68)])
99
CalciteLogicalIndexScan(table=[[OpenSearch, hits]])
1010
physical: |
11-
EnumerableLimit(fetch=[10000])
12-
EnumerableLimit(fetch=[10])
13-
EnumerableSort(sort0=[$1], dir0=[DESC-nulls-last])
14-
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},sum(AdvEngineID)=SUM($0),c=COUNT(),avg(ResolutionWidth)=AVG($2),dc(UserID)=COUNT(DISTINCT $3)), PROJECT->[sum(AdvEngineID), c, avg(ResolutionWidth), dc(UserID), RegionID]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"RegionID":{"terms":{"field":"RegionID","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"sum(AdvEngineID)":{"sum":{"field":"AdvEngineID"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"dc(UserID)":{"cardinality":{"field":"UserID"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},sum(AdvEngineID)=SUM($0),c=COUNT(),avg(ResolutionWidth)=AVG($2),dc(UserID)=COUNT(DISTINCT $3)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[sum(AdvEngineID), c, avg(ResolutionWidth), dc(UserID), RegionID], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"RegionID":{"terms":{"field":"RegionID","size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(AdvEngineID)":{"sum":{"field":"AdvEngineID"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"dc(UserID)":{"cardinality":{"field":"UserID"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/clickbench/q23.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,4 @@ calcite:
99
LogicalFilter(condition=[AND(ILIKE($97, '%Google%', '\'), <>($63, ''), NOT(ILIKE($26, '%.google.%', '\')))])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, hits]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableLimit(fetch=[10])
14-
EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
15-
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[URL, SearchPhrase, UserID, Title], FILTER->AND(ILIKE($3, '%Google%', '\'), <>($1, ''), NOT(ILIKE($0, '%.google.%', '\'))), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},c=COUNT(),dc(UserID)=COUNT(DISTINCT $1)), PROJECT->[c, dc(UserID), SearchPhrase]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"wildcard":{"Title":{"wildcard":"*Google*","case_insensitive":true,"boost":1.0}}},{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"bool":{"must_not":[{"wildcard":{"URL":{"wildcard":"*.google.*","case_insensitive":true,"boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["URL","SearchPhrase","UserID","Title"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"SearchPhrase":{"terms":{"field":"SearchPhrase","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"dc(UserID)":{"cardinality":{"field":"UserID"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[URL, SearchPhrase, UserID, Title], FILTER->AND(ILIKE($3, '%Google%', '\'), <>($1, ''), NOT(ILIKE($0, '%.google.%', '\'))), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},c=COUNT(),dc(UserID)=COUNT(DISTINCT $1)), SORT_AGG_METRICS->[1 DESC LAST], PROJECT->[c, dc(UserID), SearchPhrase], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"wildcard":{"Title":{"wildcard":"*Google*","case_insensitive":true,"boost":1.0}}},{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"bool":{"must_not":[{"wildcard":{"URL":{"wildcard":"*.google.*","case_insensitive":true,"boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["URL","SearchPhrase","UserID","Title"],"excludes":[]},"aggregations":{"SearchPhrase":{"terms":{"field":"SearchPhrase","size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"dc(UserID)":{"cardinality":{"field":"UserID"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/clickbench/q31.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,4 @@ calcite:
99
LogicalFilter(condition=[<>($63, '')])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, hits]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableLimit(fetch=[10])
14-
EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
15-
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[SearchPhrase, SearchEngineID, IsRefresh, ClientIP, ResolutionWidth], FILTER->AND(<>($0, ''), IS NOT NULL($1), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},c=COUNT(),sum(IsRefresh)=SUM($2),avg(ResolutionWidth)=AVG($3)), PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), SearchEngineID, ClientIP]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"SearchEngineID","boost":1.0}},{"exists":{"field":"ClientIP","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["SearchPhrase","SearchEngineID","IsRefresh","ClientIP","ResolutionWidth"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"SearchEngineID":{"terms":{"field":"SearchEngineID","missing_bucket":false,"order":"asc"}}},{"ClientIP":{"terms":{"field":"ClientIP","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[SearchPhrase, SearchEngineID, IsRefresh, ClientIP, ResolutionWidth], FILTER->AND(<>($0, ''), IS NOT NULL($1), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},c=COUNT(),sum(IsRefresh)=SUM($2),avg(ResolutionWidth)=AVG($3)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), SearchEngineID, ClientIP], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"SearchEngineID","boost":1.0}},{"exists":{"field":"ClientIP","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["SearchPhrase","SearchEngineID","IsRefresh","ClientIP","ResolutionWidth"],"excludes":[]},"aggregations":{"SearchEngineID|ClientIP":{"multi_terms":{"terms":[{"field":"SearchEngineID"},{"field":"ClientIP"}],"size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)