Skip to content

Commit f65cac0

Browse files
committed
Support composite aggregation paginating in HAVING clause
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 885230f commit f65cac0

25 files changed

Lines changed: 472 additions & 75 deletions

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,6 +1271,30 @@ public void testExplainCountsByAgg() throws IOException {
12711271
TEST_INDEX_ACCOUNT)));
12721272
}
12731273

1274+
@Test
1275+
public void testHaving() throws IOException {
1276+
enabledOnlyWhenPushdownIsEnabled();
1277+
String expected = loadExpectedPlan("explain_agg_having1.yaml");
1278+
assertYamlEqualsIgnoreId(
1279+
expected,
1280+
explainQueryYaml(
1281+
"source=opensearch-sql_test_index_account | stats count() as c by"
1282+
+ " state | where c > 10"));
1283+
expected = loadExpectedPlan("explain_agg_having2.yaml");
1284+
assertYamlEqualsIgnoreId(
1285+
expected,
1286+
explainQueryYaml(
1287+
"source=opensearch-sql_test_index_account | stats bucket_nullable = false count() by"
1288+
+ " state | where `count()` > 10"));
1289+
expected = loadExpectedPlan("explain_agg_having3.yaml");
1290+
assertYamlEqualsIgnoreId(
1291+
expected,
1292+
explainQueryYaml(
1293+
"source=opensearch-sql_test_index_account | stats avg(balance) as avg, count() as cnt"
1294+
+ " by state | eval new_avg = avg + 1000, new_cnt = cnt + 1 | where new_avg > 1000"
1295+
+ " or new_cnt > 1"));
1296+
}
1297+
12741298
@Test
12751299
public void testExplainSortOnMeasure() throws IOException {
12761300
enabledOnlyWhenPushdownIsEnabled();

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStatsCommandIT.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
9+
import static org.opensearch.sql.util.MatcherUtils.rows;
10+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
11+
12+
import java.io.IOException;
13+
import org.json.JSONObject;
14+
import org.junit.jupiter.api.Test;
815
import org.opensearch.sql.ppl.StatsCommandIT;
916

1017
public class CalciteStatsCommandIT extends StatsCommandIT {
@@ -14,4 +21,20 @@ public void init() throws Exception {
1421
enableCalcite();
1522
setQuerySizeLimit(2000);
1623
}
24+
25+
@Test
26+
public void testStatsHaving() throws IOException {
27+
try {
28+
setQueryBucketSize(2);
29+
JSONObject response =
30+
executeQuery(
31+
String.format(
32+
"source=%s | stats sum(balance) as a by state | where a > 780000",
33+
TEST_INDEX_ACCOUNT));
34+
System.out.println(response);
35+
verifyDataRows(response, rows(782199, "TX"));
36+
} finally {
37+
resetQueryBucketSize();
38+
}
39+
}
1740
}

integ-test/src/test/resources/expectedOutput/calcite/clickbench/q28.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ calcite:
1414
EnumerableLimit(fetch=[25])
1515
EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
1616
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[100000], expr#4=[>($t1, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
17-
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[URL, CounterID], FILTER->AND(<>($0, ''), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},l=AVG($1),c=COUNT()), PROJECT->[l, c, CounterID]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"URL","boost":1.0}}],"must_not":[{"term":{"URL":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"CounterID","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["URL","CounterID"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"CounterID":{"terms":{"field":"CounterID","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"l":{"avg":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQA/3sKICAib3AiOiB7CiAgICAibmFtZSI6ICJDSEFSX0xFTkdUSCIsCiAgICAia2luZCI6ICJDSEFSX0xFTkdUSCIsCiAgICAic3ludGF4IjogIkZVTkNUSU9OIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMCwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgfQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0],"DIGESTS":["URL"]}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
17+
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[URL, CounterID], FILTER->AND(<>($0, ''), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},l=AVG($1),c=COUNT()), HAVING->>($1, 100000), PROJECT->[l, c, CounterID]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"URL","boost":1.0}}],"must_not":[{"term":{"URL":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"CounterID","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["URL","CounterID"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"CounterID":{"terms":{"field":"CounterID","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"l":{"avg":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQA/3sKICAib3AiOiB7CiAgICAibmFtZSI6ICJDSEFSX0xFTkdUSCIsCiAgICAia2luZCI6ICJDSEFSX0xFTkdUSCIsCiAgICAic3ludGF4IjogIkZVTkNUSU9OIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMCwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgfQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0],"DIGESTS":["URL"]}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0, paginatingAgg=true)])

integ-test/src/test/resources/expectedOutput/calcite/clickbench/q29.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ calcite:
1515
EnumerableLimit(fetch=[25])
1616
EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
1717
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[100000], expr#5=[>($t1, $t4)], proj#0..3=[{exprs}], $condition=[$t5])
18-
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[Referer], FILTER-><>($0, ''), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},l=AVG($2),c=COUNT(),min(Referer)=MIN($1)), PROJECT->[l, c, min(Referer), k]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"Referer","boost":1.0}}],"must_not":[{"term":{"Referer":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["Referer"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"k":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQCGXsKICAib3AiOiB7CiAgICAibmFtZSI6ICJSRUdFWFBfUkVQTEFDRSIsCiAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAic3ludGF4IjogIkZVTkNUSU9OIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMCwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgfQogICAgfSwKICAgIHsKICAgICAgImR5bmFtaWNQYXJhbSI6IDEsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICB9CiAgICB9LAogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMiwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogZmFsc2UsCiAgICAgICAgInByZWNpc2lvbiI6IDIKICAgICAgfQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0,2,2],"DIGESTS":["Referer","^https?://(?:www\\.)?([^/]+)/.*$","$1"]}},"missing_bucket":false,"order":"asc"}}}]},"aggregations":{"l":{"avg":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQA/3sKICAib3AiOiB7CiAgICAibmFtZSI6ICJDSEFSX0xFTkdUSCIsCiAgICAia2luZCI6ICJDSEFSX0xFTkdUSCIsCiAgICAic3ludGF4IjogIkZVTkNUSU9OIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMCwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgfQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0],"DIGESTS":["Referer"]}}}},"min(Referer)":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"Referer"}],"sort":[{"Referer":{"order":"asc"}}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
18+
CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[Referer], FILTER-><>($0, ''), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},l=AVG($2),c=COUNT(),min(Referer)=MIN($1)), HAVING->>($1, 100000), PROJECT->[l, c, min(Referer), k]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"Referer","boost":1.0}}],"must_not":[{"term":{"Referer":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["Referer"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"k":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQCGXsKICAib3AiOiB7CiAgICAibmFtZSI6ICJSRUdFWFBfUkVQTEFDRSIsCiAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAic3ludGF4IjogIkZVTkNUSU9OIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMCwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgfQogICAgfSwKICAgIHsKICAgICAgImR5bmFtaWNQYXJhbSI6IDEsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICB9CiAgICB9LAogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMiwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogZmFsc2UsCiAgICAgICAgInByZWNpc2lvbiI6IDIKICAgICAgfQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0,2,2],"DIGESTS":["Referer","^https?://(?:www\\.)?([^/]+)/.*$","$1"]}},"missing_bucket":false,"order":"asc"}}}]},"aggregations":{"l":{"avg":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQA/3sKICAib3AiOiB7CiAgICAibmFtZSI6ICJDSEFSX0xFTkdUSCIsCiAgICAia2luZCI6ICJDSEFSX0xFTkdUSCIsCiAgICAic3ludGF4IjogIkZVTkNUSU9OIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMCwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgfQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0],"DIGESTS":["Referer"]}}}},"min(Referer)":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"Referer"}],"sort":[{"Referer":{"order":"asc"}}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0, paginatingAgg=true)])
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalFilter(condition=[>($0, 10)])
5+
LogicalProject(c=[$1], state=[$0])
6+
LogicalAggregate(group=[{0}], c=[COUNT()])
7+
LogicalProject(state=[$7])
8+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9+
physical: |
10+
EnumerableLimit(fetch=[10000])
11+
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[10], expr#3=[>($t0, $t2)], proj#0..1=[{exprs}], $condition=[$t3])
12+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},c=COUNT()), HAVING->>($0, 10), PROJECT->[c, state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0, paginatingAgg=true)])
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalFilter(condition=[>($0, 10)])
5+
LogicalProject(count()=[$1], state=[$0])
6+
LogicalAggregate(group=[{0}], count()=[COUNT()])
7+
LogicalProject(state=[$7])
8+
LogicalFilter(condition=[IS NOT NULL($7)])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10+
physical: |
11+
EnumerableLimit(fetch=[10000])
12+
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[10], expr#3=[>($t0, $t2)], proj#0..1=[{exprs}], $condition=[$t3])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), HAVING->>($0, 10), PROJECT->[count(), state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0, paginatingAgg=true)])
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalFilter(condition=[OR(>($3, 1000), >($4, 1))])
5+
LogicalProject(avg=[$1], cnt=[$2], state=[$0], new_avg=[+($1, 1000)], new_cnt=[+($2, 1)])
6+
LogicalAggregate(group=[{0}], avg=[AVG($1)], cnt=[COUNT()])
7+
LogicalProject(state=[$7], balance=[$3])
8+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9+
physical: |
10+
EnumerableLimit(fetch=[10000])
11+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):DOUBLE], expr#5=[/($t4, $t2)], expr#6=[1000], expr#7=[+($t5, $t6)], expr#8=[1], expr#9=[+($t3, $t8)], expr#10=[>($t7, $t6)], expr#11=[>($t9, $t8)], expr#12=[OR($t10, $t11)], avg=[$t5], cnt=[$t3], state=[$t0], new_avg=[$t7], new_cnt=[$t9], $condition=[$t12])
12+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},agg#0=SUM($1),agg#1=COUNT($1),cnt=COUNT()), HAVING->OR(>(+($1, 1000), 1000), >(+($2, 1), 1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"$f1":{"sum":{"field":"balance"}},"$f2":{"value_count":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0, paginatingAgg=true)])

integ-test/src/test/resources/expectedOutput/calcite/explain_output.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@ calcite:
1818
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[<=($t1, $t2)], proj#0..1=[{exprs}], $condition=[$t3])
1919
EnumerableWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
2020
EnumerableCalc(expr#0=[{inputs}], expr#1=[2], expr#2=[+($t0, $t1)], expr#3=[IS NOT NULL($t0)], age2=[$t2], $condition=[$t3])
21-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), SORT->[0 ASC FIRST], PROJECT->[avg_age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"age":{"from":30,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},"_source":{"includes":["city","state","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"city":{"terms":{"field":"city.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg_age":{"avg":{"field":"age"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
21+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), PROJECT->[avg_age, state], SORT->[1 ASC FIRST], HAVING->IS NOT NULL($0), PROJECT->[avg_age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"age":{"from":30,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},"_source":{"includes":["city","state","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"city":{"terms":{"field":"city.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg_age":{"avg":{"field":"age"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0, paginatingAgg=true)])
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.opensearch.planner.rules;
7+
8+
import java.util.function.Predicate;
9+
import org.apache.calcite.plan.RelOptRuleCall;
10+
import org.apache.calcite.plan.RelRule;
11+
import org.apache.calcite.rel.logical.LogicalFilter;
12+
import org.apache.calcite.rel.logical.LogicalProject;
13+
import org.immutables.value.Value;
14+
import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig;
15+
import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan;
16+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
17+
18+
@Value.Enclosing
19+
public class HavingPushdownRule extends RelRule<HavingPushdownRule.Config> {
20+
protected HavingPushdownRule(HavingPushdownRule.Config config) {
21+
super(config);
22+
}
23+
24+
@Override
25+
public void onMatch(RelOptRuleCall call) {
26+
final LogicalFilter filter = call.rel(0);
27+
final LogicalProject project = call.rel(1);
28+
final CalciteLogicalIndexScan scan = call.rel(2);
29+
CalciteLogicalIndexScan newScan = scan.pushDownHavingFlag(filter);
30+
if (newScan != null) {
31+
call.transformTo(
32+
call.builder()
33+
.push(newScan)
34+
.project(project.getProjects())
35+
.filter(filter.getCondition())
36+
.build());
37+
}
38+
}
39+
40+
@Value.Immutable
41+
public interface Config extends OpenSearchRuleConfig {
42+
HavingPushdownRule.Config DEFAULT =
43+
ImmutableHavingPushdownRule.Config.builder()
44+
.build()
45+
.withDescription("Filter-Project-TableScan(AggPushed)")
46+
.withOperandSupplier(
47+
b0 ->
48+
b0.operand(LogicalFilter.class)
49+
.oneInput(
50+
b1 ->
51+
b1.operand(LogicalProject.class)
52+
.oneInput(
53+
b2 ->
54+
b2.operand(CalciteLogicalIndexScan.class)
55+
.predicate(
56+
Predicate.not(
57+
AbstractCalciteIndexScan
58+
::noAggregatePushed)
59+
.and(
60+
Predicate.not(
61+
AbstractCalciteIndexScan
62+
::isHavingFlagPushed)))
63+
.noInputs())));
64+
65+
@Override
66+
default HavingPushdownRule toRule() {
67+
return new HavingPushdownRule(this);
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)