Skip to content

Commit 373b394

Browse files
authored
BucketAggretationParser should handle more non-composite bucket types (opensearch-project#4706)
* BucketAggretationParser should handle more non-composite bucket types Signed-off-by: Lantao Jin <ltjin@amazon.com> * support multi-terms parser Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix IT Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix typo Signed-off-by: Lantao Jin <ltjin@amazon.com> * Update javadoc Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent d1ffabd commit 373b394

5 files changed

Lines changed: 164 additions & 14 deletions

File tree

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics_multi_terms.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ calcite:
88
LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
99
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1010
physical: |
11-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), SORT_AGG_METRICS->[2 ASC FIRST], PROJECT->[count(), gender, state], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"multi_terms_buckets":{"multi_terms":{"terms":[{"field":"gender.keyword"},{"field":"state.keyword"}],"size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"_count":"desc"},{"_key":"asc"}]},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), SORT_AGG_METRICS->[2 ASC FIRST], PROJECT->[count(), gender, state], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"gender|state":{"multi_terms":{"terms":[{"field":"gender.keyword"},{"field":"state.keyword"}],"size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"_count":"desc"},{"_key":"asc"}]},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/multi_terms_keyword.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ calcite:
99
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-05 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-05 05:00:00':VARCHAR)))])
1010
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
1111
physical: |
12-
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[process.name, cloud.region, @timestamp], FILTER->SEARCH($2, Sarg[['2023-01-05 00:00:00':VARCHAR..'2023-01-05 05:00:00':VARCHAR)]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[count(), process.name, cloud.region], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2023-01-05T00:00:00.000Z","to":"2023-01-05T05:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["process.name","cloud.region","@timestamp"],"excludes":[]},"aggregations":{"multi_terms_buckets":{"multi_terms":{"terms":[{"field":"process.name"},{"field":"cloud.region"}],"size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"_count":"desc"},{"_key":"asc"}]},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
12+
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[process.name, cloud.region, @timestamp], FILTER->SEARCH($2, Sarg[['2023-01-05 00:00:00':VARCHAR..'2023-01-05 05:00:00':VARCHAR)]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[count(), process.name, cloud.region], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2023-01-05T00:00:00.000Z","to":"2023-01-05T05:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["process.name","cloud.region","@timestamp"],"excludes":[]},"aggregations":{"process.name|cloud.region":{"multi_terms":{"terms":[{"field":"process.name"},{"field":"cloud.region"}],"size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"_count":"desc"},{"_key":"asc"}]},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
setup:
2+
- do:
3+
indices.create:
4+
index: test
5+
body:
6+
mappings:
7+
properties:
8+
"dateV":
9+
type: date
10+
"intV":
11+
type: integer
12+
"boolV":
13+
type: boolean
14+
"stringV":
15+
type: keyword
16+
- do:
17+
bulk:
18+
index: test
19+
refresh: true
20+
body:
21+
- '{"index":{}}'
22+
- '{"dateV":"2023-10-08T10:00:00.000Z","intV":10,"boolV":true,"stringV":"hello"}'
23+
- do:
24+
query.settings:
25+
body:
26+
transient:
27+
plugins.calcite.enabled : true
28+
---
29+
teardown:
30+
- do:
31+
query.settings:
32+
body:
33+
transient:
34+
plugins.calcite.enabled : false
35+
36+
---
37+
"String bucket parser should work in non-composite aggregate":
38+
- skip:
39+
features:
40+
- headers
41+
- do:
42+
headers:
43+
Content-Type: 'application/json'
44+
ppl:
45+
body:
46+
query: source=test | stats bucket_nullable=false count() by stringV
47+
48+
- match: { total: 1 }
49+
- match: { datarows: [[1, "hello"]] }
50+
51+
---
52+
"Boolean bucket parser should work in non-composite aggregate":
53+
- skip:
54+
features:
55+
- headers
56+
- do:
57+
headers:
58+
Content-Type: 'application/json'
59+
ppl:
60+
body:
61+
query: source=test | stats bucket_nullable=false count() by boolV
62+
63+
- match: { total: 1 }
64+
- match: { datarows: [[1, true]] }
65+
66+
---
67+
"Integer bucket parser should work in non-composite aggregate":
68+
- skip:
69+
features:
70+
- headers
71+
- do:
72+
headers:
73+
Content-Type: 'application/json'
74+
ppl:
75+
body:
76+
query: source=test | stats bucket_nullable=false count() by intV
77+
78+
- match: { total: 1 }
79+
- match: { datarows: [[1, 10]] }
80+
81+
---
82+
"Date bucket parser should work in non-composite aggregate":
83+
- skip:
84+
features:
85+
- headers
86+
- do:
87+
headers:
88+
Content-Type: 'application/json'
89+
ppl:
90+
body:
91+
query: source=test | stats bucket_nullable=false count() by dateV
92+
93+
- match: { total: 1 }
94+
- match: { datarows: [[1, "2023-10-08 10:00:00"]] }
95+
96+
---
97+
"Data histogram bucket parser should work in non-composite aggregate":
98+
- skip:
99+
features:
100+
- headers
101+
- do:
102+
headers:
103+
Content-Type: 'application/json'
104+
ppl:
105+
body:
106+
query: source=test | stats bucket_nullable=false count() by span(dateV, 1d)
107+
108+
- match: { total: 1 }
109+
- match: { datarows: [[1, "2023-10-08 00:00:00"]] }
110+
111+
---
112+
"Histogram bucket parser should work in non-composite aggregate":
113+
- skip:
114+
features:
115+
- headers
116+
- do:
117+
headers:
118+
Content-Type: 'application/json'
119+
ppl:
120+
body:
121+
query: source=test | stats bucket_nullable=false count() by span(intV, 1)
122+
123+
- match: { total: 1 }
124+
- match: { datarows: [[1, 10]] }
125+
126+
---
127+
"Multi-terms bucket parser should work in non-composite aggregate":
128+
- skip:
129+
features:
130+
- headers
131+
- do:
132+
headers:
133+
Content-Type: 'application/json'
134+
ppl:
135+
body:
136+
query: source=test | stats bucket_nullable=false count() by stringV, intV
137+
138+
- match: { total: 1 }
139+
- match: { datarows: [[1, "hello", 10]] }

opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import java.util.Map;
1111
import java.util.Objects;
1212
import java.util.Optional;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.IntStream;
1315
import lombok.EqualsAndHashCode;
1416
import lombok.Getter;
1517
import org.opensearch.search.SearchHits;
@@ -19,11 +21,11 @@
1921
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
2022
import org.opensearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram;
2123
import org.opensearch.search.aggregations.bucket.range.Range;
22-
import org.opensearch.search.aggregations.bucket.terms.ParsedStringTerms;
24+
import org.opensearch.search.aggregations.bucket.terms.InternalMultiTerms;
2325

2426
/**
25-
* Use BucketAggregationParser only when there is a single group-by key, it returns multiple
26-
* buckets. {@link CompositeAggregationParser} is used for multiple group by keys
27+
* Use BucketAggregationParser for {@link MultiBucketsAggregation}, where it returns multiple
28+
* buckets.
2729
*/
2830
@EqualsAndHashCode
2931
public class BucketAggregationParser implements OpenSearchAggregationResponseParser {
@@ -118,21 +120,22 @@ public List<Map<String, Object>> parse(SearchHits hits) {
118120
* bucket's key.
119121
*
120122
* @param bucket the aggregation bucket to extract data from
121-
* @param name the field name to use for range buckets (ignored for composite buckets)
122-
* @return an Optional containing the extracted key-value pairs, or empty if bucket type is
123-
* unsupported
123+
* @param name the aggregation name
124+
* @return an Optional containing the extracted key-value pairs
124125
*/
125126
protected Optional<Map<String, Object>> extract(
126127
MultiBucketsAggregation.Bucket bucket, String name) {
127128
Map<String, Object> extracted;
128129
if (bucket instanceof CompositeAggregation.Bucket compositeBucket) {
129130
extracted = compositeBucket.getKey();
130-
} else if (bucket instanceof Range.Bucket
131-
|| bucket instanceof InternalAutoDateHistogram.Bucket
132-
|| bucket instanceof ParsedStringTerms.ParsedBucket) {
133-
extracted = Map.of(name, bucket.getKey());
131+
} else if (bucket instanceof InternalMultiTerms.Bucket) {
132+
List<String> keys = Arrays.asList(name.split("\\|"));
133+
extracted =
134+
IntStream.range(0, keys.size())
135+
.boxed()
136+
.collect(Collectors.toMap(keys::get, ((List<Object>) bucket.getKey())::get));
134137
} else {
135-
extracted = null;
138+
extracted = Map.of(name, bucket.getKey());
136139
}
137140
return Optional.ofNullable(extracted);
138141
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.Collections;
1111
import java.util.List;
1212
import java.util.Map;
13+
import java.util.stream.Collectors;
1314
import lombok.EqualsAndHashCode;
1415
import lombok.Getter;
1516
import org.apache.calcite.rel.RelFieldCollation;
@@ -84,6 +85,13 @@ private BucketAggregationParser convertTo(OpenSearchAggregationResponseParser pa
8485
}
8586
}
8687

88+
private String multiTermsBucketNameAsString(CompositeAggregationBuilder composite) {
89+
return composite.sources().stream()
90+
.map(TermsValuesSourceBuilder.class::cast)
91+
.map(TermsValuesSourceBuilder::name)
92+
.collect(Collectors.joining("|")); // PIPE cannot be used in identifier
93+
}
94+
8795
public void pushDownSortAggMetrics(List<RelFieldCollation> collations, List<String> fieldNames) {
8896
if (aggregationBuilder.getLeft().isEmpty()) return;
8997
AggregationBuilder builder = aggregationBuilder.getLeft().getFirst();
@@ -152,7 +160,7 @@ public void pushDownSortAggMetrics(List<RelFieldCollation> collations, List<Stri
152160
src -> src instanceof TermsValuesSourceBuilder terms && !terms.missingBucket())) {
153161
// multi-term agg
154162
MultiTermsAggregationBuilder multiTermsBuilder =
155-
new MultiTermsAggregationBuilder("multi_terms_buckets");
163+
new MultiTermsAggregationBuilder(multiTermsBucketNameAsString(composite));
156164
multiTermsBuilder.size(composite.size());
157165
multiTermsBuilder.terms(
158166
composite.sources().stream()

0 commit comments

Comments
 (0)