Skip to content

Commit 08be6f9

Browse files
authored
Support enumerable TopK (#4993)
* Support enumerable TopK Signed-off-by: Lantao Jin <ltjin@amazon.com> * Support merging ELimit+ESort to ETopK Signed-off-by: Lantao Jin <ltjin@amazon.com> * remove the converter rule Signed-off-by: Lantao Jin <ltjin@amazon.com> * address commments Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent a7f238e commit 08be6f9

39 files changed

Lines changed: 334 additions & 217 deletions

File tree

integ-test/src/test/resources/expectedOutput/calcite/agg_composite_autodate_range_metric_sort_agg_measure_not_push.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,5 @@ calcite:
99
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())], value_range=[CASE(<($2, 7000), 'small':VARCHAR, 'great':VARCHAR)])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1, 2},avg(value)=AVG($3),cnt=COUNT()), PROJECT->[avg(value), cnt, timestamp, value_range, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"great","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableTopK(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1, 2},avg(value)=AVG($3),cnt=COUNT()), PROJECT->[avg(value), cnt, timestamp, value_range, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"great","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/agg_composite_autodate_sort_agg_measure_not_push.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,5 @@ calcite:
99
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(value)=AVG($2),cnt=COUNT()), PROJECT->[avg(value), cnt, timestamp, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableTopK(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(value)=AVG($2),cnt=COUNT()), PROJECT->[avg(value), cnt, timestamp, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/agg_composite_multi_terms_autodate_sort_agg_measure_not_push.yaml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ calcite:
99
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14-
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):DOUBLE], avg(value)=[$t4], cnt=[$t3], category=[$t0], value=[$t1], timestamp=[$t2])
15-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1, 2},cnt=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}},{"value":{"terms":{"field":"value","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableTopK(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000])
13+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):DOUBLE], avg(value)=[$t4], cnt=[$t3], category=[$t0], value=[$t1], timestamp=[$t2])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1, 2},cnt=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}},{"value":{"terms":{"field":"value","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/agg_composite_range_sort_agg_measure_not_push.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,5 @@ calcite:
99
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], timestamp=[$3], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], value_range=[CASE(<($2, 7000), 'small':VARCHAR, 'great':VARCHAR)])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(value)=AVG($2),cnt=COUNT()), PROJECT->[avg(value), cnt, value_range, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"great","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableTopK(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(value)=AVG($2),cnt=COUNT()), PROJECT->[avg(value), cnt, value_range, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"great","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/big5/dedup_metrics_size_field.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,5 @@ calcite:
99
LogicalFilter(condition=[IS NOT NULL($28)])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
1111
physical: |
12-
EnumerableLimit(fetch=[10000])
13-
EnumerableSort(sort0=[$7], dir0=[DESC-nulls-last])
14-
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["metrics.size","agent","process","log","message","tags","cloud","input","@timestamp","ecs","data_stream","meta","host","metrics","aws","event"],"excludes":[]},"script_fields":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableTopK(sort0=[$7], dir0=[DESC-nulls-last], fetch=[10000])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["metrics.size","agent","process","log","message","tags","cloud","input","@timestamp","ecs","data_stream","meta","host","metrics","aws","event"],"excludes":[]},"script_fields":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/chart_multiple_group_keys.yaml

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,17 @@ calcite:
1919
LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
2020
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
2121
physical: |
22-
EnumerableLimit(fetch=[10000])
23-
EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
24-
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], proj#0..1=[{exprs}], avg(balance)=[$t8])
25-
EnumerableAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)])
26-
EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NULL($t1)], expr#6=['NULL'], expr#7=[10], expr#8=[<=($t4, $t7)], expr#9=['OTHER'], expr#10=[CASE($t5, $t6, $t8, $t1, $t9)], gender=[$t0], age=[$t10], avg(balance)=[$t2])
27-
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left])
28-
EnumerableSort(sort0=[$1], dir0=[ASC])
29-
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[SAFE_CAST($t1)], gender=[$t0], age=[$t3], avg(balance)=[$t2])
30-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[FILTER->AND(IS NOT NULL($0), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(balance)=AVG($1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"gender","boost":1.0}},{"exists":{"field":"balance","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
31-
EnumerableSort(sort0=[$0], dir0=[ASC])
32-
EnumerableCalc(expr#0..2=[{inputs}], age=[$t0], $1=[$t2])
33-
EnumerableWindow(window#0=[window(order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
34-
EnumerableAggregate(group=[{0}], __grand_total__=[SUM($1)])
35-
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[SAFE_CAST($t0)], expr#3=[IS NOT NULL($t2)], age=[$t2], avg(balance)=[$t1], $condition=[$t3])
36-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[FILTER->AND(IS NOT NULL($0), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(balance)=AVG($1)), PROJECT->[age, avg(balance)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"gender","boost":1.0}},{"exists":{"field":"balance","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
22+
CalciteEnumerableTopK(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[10000])
23+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], proj#0..1=[{exprs}], avg(balance)=[$t8])
24+
EnumerableAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)])
25+
EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NULL($t1)], expr#6=['NULL'], expr#7=[10], expr#8=[<=($t4, $t7)], expr#9=['OTHER'], expr#10=[CASE($t5, $t6, $t8, $t1, $t9)], gender=[$t0], age=[$t10], avg(balance)=[$t2])
26+
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left])
27+
EnumerableSort(sort0=[$1], dir0=[ASC])
28+
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[SAFE_CAST($t1)], gender=[$t0], age=[$t3], avg(balance)=[$t2])
29+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[FILTER->AND(IS NOT NULL($0), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(balance)=AVG($1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"gender","boost":1.0}},{"exists":{"field":"balance","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
30+
EnumerableSort(sort0=[$0], dir0=[ASC])
31+
EnumerableCalc(expr#0..2=[{inputs}], age=[$t0], $1=[$t2])
32+
EnumerableWindow(window#0=[window(order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
33+
EnumerableAggregate(group=[{0}], __grand_total__=[SUM($1)])
34+
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[SAFE_CAST($t0)], expr#3=[IS NOT NULL($t2)], age=[$t2], avg(balance)=[$t1], $condition=[$t3])
35+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[FILTER->AND(IS NOT NULL($0), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(balance)=AVG($1)), PROJECT->[age, avg(balance)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"gender","boost":1.0}},{"exists":{"field":"balance","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)