Skip to content

Commit 7e71535

Browse files
committed
Fix doctest and IT
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 4ea6724 commit 7e71535

8 files changed

Lines changed: 45 additions & 14 deletions

File tree

core/src/main/java/org/opensearch/sql/exception/NonFallbackCalciteException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,8 @@ public class NonFallbackCalciteException extends QueryEngineException {
1111
public NonFallbackCalciteException(String message) {
1212
super(message);
1313
}
14+
15+
public NonFallbackCalciteException(String message, Throwable cause) {
16+
super(message, cause);
17+
}
1418
}

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ private Settings defaultSettings() {
113113
new ImmutableMap.Builder<Key, Object>()
114114
.put(Key.QUERY_SIZE_LIMIT, 200)
115115
.put(Key.QUERY_BUCKET_SIZE, 1000)
116+
.put(Key.SEARCH_MAX_BUCKETS, 65535)
116117
.put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1))
117118
.put(Key.FIELD_TYPE_TOLERANCE, true)
118119
.put(Key.CALCITE_ENGINE_ENABLED, true)

integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ private Settings defaultSettings() {
167167
new ImmutableMap.Builder<Key, Object>()
168168
.put(Key.QUERY_SIZE_LIMIT, 200)
169169
.put(Key.QUERY_BUCKET_SIZE, 1000)
170+
.put(Key.SEARCH_MAX_BUCKETS, 65535)
170171
.put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1))
171172
.put(Key.FIELD_TYPE_TOLERANCE, true)
172173
.build();

opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ public class OpenSearchQueryRequest implements OpenSearchRequest {
7979

8080
private SearchResponse searchResponse = null;
8181

82-
@ToString.Exclude private Map<String, Object> afterKey = Collections.emptyMap();
82+
@ToString.Exclude private Map<String, Object> afterKey;
83+
84+
@EqualsAndHashCode.Exclude @ToString.Exclude private boolean afterKeyToReset = false;
8385

8486
@TestOnly
8587
static OpenSearchQueryRequest of(
@@ -201,13 +203,22 @@ private OpenSearchResponse search(Function<SearchRequest, SearchResponse> search
201203
new OpenSearchResponse(
202204
SearchHits.empty(), exprValueFactory, includes, isCountAggRequest());
203205
} else {
206+
// On first call: reset builder to clear any afterKey from other requests
204207
if (this.sourceBuilder.aggregations() != null) {
205208
this.sourceBuilder.aggregations().getAggregatorFactories().stream()
206209
.filter(b -> b instanceof CompositeAggregationBuilder)
207-
.forEach(c -> ((CompositeAggregationBuilder) c).aggregateAfter(afterKey));
208-
if (LOG.isDebugEnabled()) {
209-
LOG.debug(sourceBuilder);
210-
}
210+
.forEach(
211+
c -> {
212+
if (!afterKeyToReset) {
213+
// First call: reset to clear any previous afterKey from shared builder
214+
// Use null instead of empty map to avoid "[after] has 0 value(s)" error
215+
((CompositeAggregationBuilder) c).aggregateAfter(null);
216+
afterKeyToReset = true;
217+
}
218+
if (afterKey != null && !afterKey.isEmpty()) {
219+
((CompositeAggregationBuilder) c).aggregateAfter(afterKey);
220+
}
221+
});
211222
}
212223

213224
SearchRequest searchRequest =

opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,16 @@ public boolean isCompositeAggregationResponse() {
122122
&& (aggregations.asList().get(0) instanceof InternalComposite);
123123
}
124124

125+
/**
126+
* Get the size of composite aggregation bucket. Must be called after
127+
* isCompositeAggregationResponse() is true.
128+
*/
125129
public int getCompositeBucketSize() {
126130
if (isCompositeAggregationResponse()) {
127131
return ((InternalComposite) aggregations.asList().get(0)).getBuckets().size();
128132
}
129-
throw new RuntimeException("Should not go to here");
133+
assert false : "Should never call here";
134+
return -1;
130135
}
131136

132137
public boolean isCountResponse() {

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.opensearch.storage;
77

8+
import static org.opensearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;
9+
810
import com.google.common.annotations.VisibleForTesting;
911
import java.util.HashMap;
1012
import java.util.LinkedHashMap;
@@ -199,9 +201,15 @@ public Integer getMaxResultWindow() {
199201
}
200202

201203
public Integer getQueryBucketSize() {
202-
return Math.min(
203-
settings.getSettingValue(Settings.Key.QUERY_BUCKET_SIZE),
204-
settings.getSettingValue(Settings.Key.SEARCH_MAX_BUCKETS));
204+
return Math.min(settings.getSettingValue(Settings.Key.QUERY_BUCKET_SIZE), getMaxBuckets());
205+
}
206+
207+
public Integer getMaxBuckets() {
208+
try {
209+
return settings.getSettingValue(Settings.Key.SEARCH_MAX_BUCKETS);
210+
} catch (Exception e) {
211+
return DEFAULT_MAX_BUCKETS;
212+
}
205213
}
206214

207215
/** TODO: Push down operations to index scan operator as much as possible in future. */

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ private OpenSearchResponse getCurrentResponse(OpenSearchRequest request) {
123123
throw new NonFallbackCalciteException(
124124
"Failed to fetch data from the index: the background task failed or interrupted.\n"
125125
+ " Inner error: "
126-
+ e.getMessage());
126+
+ e.getMessage(),
127+
e);
127128
}
128129
} else {
129130
return client.search(request);

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.opensearch.storage.scan.context;
77

8+
import static org.opensearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;
9+
810
import java.util.ArrayList;
911
import java.util.Collection;
1012
import java.util.Collections;
@@ -43,8 +45,6 @@
4345
@Getter
4446
@EqualsAndHashCode
4547
public class AggPushDownAction implements OSRequestBuilderAction {
46-
private static final int MAX_BUCKET_SIZE = 65535;
47-
4848
private Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser> builderAndParser;
4949
private final Map<String, OpenSearchDataType> extendedTypeMapping;
5050
private final long scriptCount;
@@ -186,13 +186,13 @@ public void rePushDownRareTop(RareTopDigest digest) {
186186
for (int i = 0; i < composite.sources().size(); i++) {
187187
TermsValuesSourceBuilder terms = (TermsValuesSourceBuilder) composite.sources().get(i);
188188
if (i == 0) { // first
189-
aggregationBuilder = buildTermsAggregationBuilder(terms, null, MAX_BUCKET_SIZE);
189+
aggregationBuilder = buildTermsAggregationBuilder(terms, null, DEFAULT_MAX_BUCKETS);
190190
} else if (i == composite.sources().size() - 1) { // last
191191
aggregationBuilder.subAggregation(
192192
buildTermsAggregationBuilder(terms, bucketOrder, digest.number()));
193193
} else {
194194
aggregationBuilder.subAggregation(
195-
buildTermsAggregationBuilder(terms, null, MAX_BUCKET_SIZE));
195+
buildTermsAggregationBuilder(terms, null, DEFAULT_MAX_BUCKETS));
196196
}
197197
}
198198
} else {

0 commit comments

Comments
 (0)