Skip to content

Commit 2e324a8

Browse files
committed
revert changes in OpenSearchIndexScan
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 086e2b6 commit 2e324a8

5 files changed

Lines changed: 77 additions & 48 deletions

File tree

opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public class OpenSearchQueryRequest implements OpenSearchRequest {
8181

8282
@ToString.Exclude private Map<String, Object> afterKey;
8383

84-
@EqualsAndHashCode.Exclude @ToString.Exclude private boolean afterKeyToReset = false;
84+
@ToString.Exclude private boolean calciteEnabled;
8585

8686
@TestOnly
8787
static OpenSearchQueryRequest of(
@@ -108,7 +108,8 @@ public static OpenSearchQueryRequest of(
108108
SearchSourceBuilder sourceBuilder,
109109
OpenSearchExprValueFactory factory,
110110
List<String> includes) {
111-
return new OpenSearchQueryRequest(indexName, sourceBuilder, factory, includes, null, null);
111+
return new OpenSearchQueryRequest(
112+
indexName, sourceBuilder, factory, includes, null, null, false);
112113
}
113114

114115
/** Build an OpenSearchQueryRequest with PIT support. */
@@ -118,9 +119,10 @@ public static OpenSearchQueryRequest pitOf(
118119
OpenSearchExprValueFactory factory,
119120
List<String> includes,
120121
TimeValue cursorKeepAlive,
121-
String pitId) {
122+
String pitId,
123+
boolean calciteEnabled) {
122124
return new OpenSearchQueryRequest(
123-
indexName, sourceBuilder, factory, includes, cursorKeepAlive, pitId);
125+
indexName, sourceBuilder, factory, includes, cursorKeepAlive, pitId, calciteEnabled);
124126
}
125127

126128
/** Do not new it directly, use of() and pitOf() instead. */
@@ -130,13 +132,15 @@ public static OpenSearchQueryRequest pitOf(
130132
OpenSearchExprValueFactory factory,
131133
List<String> includes,
132134
TimeValue cursorKeepAlive,
133-
String pitId) {
135+
String pitId,
136+
boolean calciteEnabled) {
134137
this.indexName = indexName;
135138
this.sourceBuilder = sourceBuilder;
136139
this.exprValueFactory = factory;
137140
this.includes = includes;
138141
this.cursorKeepAlive = cursorKeepAlive;
139142
this.pitId = pitId;
143+
this.calciteEnabled = calciteEnabled;
140144
}
141145

142146
/**
@@ -181,7 +185,7 @@ public OpenSearchResponse search(
181185
Function<SearchRequest, SearchResponse> searchAction,
182186
Function<SearchScrollRequest, SearchResponse> scrollAction) {
183187
if (this.pitId == null) {
184-
return search(searchAction);
188+
return calciteEnabled ? search(searchAction) : searchForV2(searchAction);
185189
} else {
186190
// Search with PIT instead of scroll API
187191
return searchWithPIT(searchAction);
@@ -196,29 +200,41 @@ public boolean isCountAggRequest() {
196200
&& sourceBuilder.trackTotalHitsUpTo() == Integer.MAX_VALUE;
197201
}
198202

203+
/** Call the old search logic for v2, since we don't support paginating aggregate in v2. */
204+
@Deprecated
205+
private OpenSearchResponse searchForV2(Function<SearchRequest, SearchResponse> searchAction) {
206+
// When SearchRequest doesn't contain PitId, fetch single page request
207+
if (needClean) {
208+
return new OpenSearchResponse(
209+
SearchHits.empty(), exprValueFactory, includes, isCountAggRequest());
210+
} else {
211+
// get the value before set needClean = true
212+
boolean isCountAggRequest = isCountAggRequest();
213+
needClean = true;
214+
return new OpenSearchResponse(
215+
searchAction.apply(
216+
new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)),
217+
exprValueFactory,
218+
includes,
219+
isCountAggRequest);
220+
}
221+
}
222+
199223
private OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchAction) {
200224
OpenSearchResponse openSearchResponse;
201225
if (needClean) {
202226
openSearchResponse =
203227
new OpenSearchResponse(
204228
SearchHits.empty(), exprValueFactory, includes, isCountAggRequest());
205229
} else {
206-
// On first call: reset builder to clear any afterKey from other requests
230+
// Set afterKey to request
207231
if (this.sourceBuilder.aggregations() != null) {
208232
this.sourceBuilder.aggregations().getAggregatorFactories().stream()
209233
.filter(b -> b instanceof CompositeAggregationBuilder)
210-
.forEach(
211-
c -> {
212-
if (!afterKeyToReset) {
213-
// First call: reset to clear any previous afterKey from shared builder
214-
// Use null instead of empty map to avoid "[after] has 0 value(s)" error
215-
((CompositeAggregationBuilder) c).aggregateAfter(null);
216-
afterKeyToReset = true;
217-
}
218-
if (afterKey != null && !afterKey.isEmpty()) {
219-
((CompositeAggregationBuilder) c).aggregateAfter(afterKey);
220-
}
221-
});
234+
.forEach(c -> ((CompositeAggregationBuilder) c).aggregateAfter(afterKey));
235+
if (LOG.isDebugEnabled()) {
236+
LOG.debug(sourceBuilder);
237+
}
222238
}
223239

224240
SearchRequest searchRequest =
@@ -230,11 +246,11 @@ private OpenSearchResponse search(Function<SearchRequest, SearchResponse> search
230246
this.searchResponse, exprValueFactory, includes, isCountAggRequest());
231247

232248
needClean = openSearchResponse.isEmpty();
233-
if (openSearchResponse.isCompositeAggregationResponse()) {
234-
InternalComposite compositeAgg =
235-
(InternalComposite) this.searchResponse.getAggregations().asList().get(0);
236-
// Update afterKey from response
237-
afterKey = compositeAgg.afterKey();
249+
// Get afterKey from response
250+
if (openSearchResponse.isAggregationResponse()) {
251+
openSearchResponse.getAggregations().asList().stream()
252+
.filter(b -> b instanceof InternalComposite)
253+
.forEach(c -> afterKey = ((InternalComposite) c).afterKey());
238254
}
239255
}
240256
return openSearchResponse;

opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,18 @@ private OpenSearchRequest buildRequestWithPit(
133133
// Search with PIT request
134134
String pitId = createPit(indexName, cursorKeepAlive, client);
135135
return OpenSearchQueryRequest.pitOf(
136-
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId);
136+
indexName,
137+
sourceBuilder,
138+
exprValueFactory,
139+
includes,
140+
cursorKeepAlive,
141+
pitId,
142+
isCalciteEnabled());
137143
} else {
138144
sourceBuilder.from(startFrom);
139145
sourceBuilder.size(size);
140146
// Search with non-Pit request
141-
return OpenSearchQueryRequest.pitOf(
142-
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, null);
147+
return OpenSearchQueryRequest.of(indexName, sourceBuilder, exprValueFactory, includes);
143148
}
144149
} else {
145150
if (startFrom != 0) {
@@ -149,7 +154,13 @@ private OpenSearchRequest buildRequestWithPit(
149154
// Search with PIT request
150155
String pitId = createPit(indexName, cursorKeepAlive, client);
151156
return OpenSearchQueryRequest.pitOf(
152-
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId);
157+
indexName,
158+
sourceBuilder,
159+
exprValueFactory,
160+
includes,
161+
cursorKeepAlive,
162+
pitId,
163+
isCalciteEnabled());
153164
}
154165
}
155166

@@ -451,4 +462,10 @@ private Supplier<NestedQueryBuilder> createEmptyNestedQuery(String path) {
451462
private BoolQueryBuilder query() {
452463
return (BoolQueryBuilder) sourceBuilder.query();
453464
}
465+
466+
private boolean isCalciteEnabled() {
467+
return settings == null
468+
|| settings.getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED) == null
469+
|| Boolean.TRUE.equals(settings.getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED));
470+
}
454471
}

opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Map;
2121
import java.util.stream.Collectors;
2222
import lombok.EqualsAndHashCode;
23+
import lombok.Getter;
2324
import lombok.ToString;
2425
import org.jetbrains.annotations.TestOnly;
2526
import org.opensearch.action.search.SearchResponse;
@@ -47,7 +48,7 @@ public class OpenSearchResponse implements Iterable<ExprValue> {
4748
private final SearchHits hits;
4849

4950
/** Search aggregation result. */
50-
private final Aggregations aggregations;
51+
@Getter private final Aggregations aggregations;
5152

5253
/** List of requested include fields. */
5354
private final List<String> includes;

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,6 @@ public TableScanBuilder createScanBuilder() {
228228
new OpenSearchIndexScan(
229229
client,
230230
requestBuilder.getMaxResponseSize(),
231-
requestBuilder.getMaxResultWindow(),
232-
this.getQueryBucketSize(),
233231
requestBuilder.build(
234232
indexName, cursorKeepAlive, client, cachedFieldOpenSearchTypes.isEmpty()));
235233
return new OpenSearchIndexScanBuilder(builder, createScanOperator);

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.sql.opensearch.client.OpenSearchClient;
2323
import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest;
2424
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
25+
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
2526
import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine;
2627
import org.opensearch.sql.planner.SerializablePlan;
2728
import org.opensearch.sql.storage.TableScanOperator;
@@ -43,33 +44,28 @@ public class OpenSearchIndexScan extends TableScanOperator implements Serializab
4344
/** Number of rows returned. */
4445
private Integer queryCount;
4546

47+
/** Search response for current batch. */
4648
private Iterator<ExprValue> iterator;
4749

48-
private BackgroundSearchScanner bgScanner;
49-
5050
/** Creates index scan based on a provided OpenSearchRequestBuilder. */
5151
public OpenSearchIndexScan(
52-
OpenSearchClient client,
53-
int maxResponseSize,
54-
int maxResultWindow,
55-
int queryBucketSize,
56-
OpenSearchRequest request) {
52+
OpenSearchClient client, int maxResponseSize, OpenSearchRequest request) {
5753
this.maxResponseSize = maxResponseSize;
5854
this.client = client;
5955
this.request = request;
60-
this.bgScanner = new BackgroundSearchScanner(client, maxResultWindow, queryBucketSize);
6156
}
6257

6358
@TestOnly
6459
public OpenSearchIndexScan(OpenSearchClient client, OpenSearchRequest request) {
65-
this(client, Integer.MAX_VALUE, 10000, 10000, request);
60+
this(client, Integer.MAX_VALUE, request);
6661
}
6762

6863
@Override
6964
public void open() {
7065
super.open();
66+
iterator = Collections.emptyIterator();
7167
queryCount = 0;
72-
this.bgScanner.startScanning(request);
68+
fetchNextBatch();
7369
}
7470

7571
@Override
@@ -84,8 +80,8 @@ public boolean hasNext() {
8480
return false;
8581
}
8682

87-
if (iterator == null || (!iterator.hasNext() && !this.bgScanner.isScanDone())) {
88-
iterator = fetchNextBatch();
83+
if (!iterator.hasNext()) {
84+
fetchNextBatch();
8985
}
9086
return iterator.hasNext();
9187
}
@@ -101,16 +97,17 @@ public ExprValue next() {
10197
return iterator.next();
10298
}
10399

104-
private Iterator<ExprValue> fetchNextBatch() {
105-
BackgroundSearchScanner.SearchBatchResult result = bgScanner.fetchNextBatch(request);
106-
return result.iterator();
100+
private void fetchNextBatch() {
101+
OpenSearchResponse response = client.search(request);
102+
if (!response.isEmpty()) {
103+
iterator = response.iterator();
104+
}
107105
}
108106

109107
@Override
110108
public void close() {
111109
super.close();
112-
iterator = Collections.emptyIterator();
113-
bgScanner.close();
110+
114111
client.cleanup(request);
115112
}
116113

0 commit comments

Comments
 (0)