Skip to content

Commit 69853fe

Browse files
Add pit for pagination query (#2940)
* Add pit for join queries (#2703) * Add search after for join Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Enable search after by default Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix tests Signed-off-by: Rupal Mahajan <maharup@amazon.com> * ignore joinWithGeoIntersectNL Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Rerun CI with scroll Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Remove unused code and retrigger CI with search_after true Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Address comments Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Remove unused code change Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix scroll condition Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit before query execution Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Move pit from join request builder to executor Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Remove unused methods Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit in parent class's run() Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add comment for fetching subsequent result in NestedLoopsElasticExecutor Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update comment Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add javadoc for pit handler Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit interface Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit handler unit test Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix failed unit test CI Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix spotless error Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Rename pit 
class and add logs Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix pit delete unit test Signed-off-by: Rupal Mahajan <maharup@amazon.com> --------- Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit for multi query (#2753) * Add search after for join Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Enable search after by default Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix tests Signed-off-by: Rupal Mahajan <maharup@amazon.com> * ignore joinWithGeoIntersectNL Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Rerun CI with scroll Signed-off-by: Rupal Mahajan <maharup@amazon.com> * draft Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Remove unused code and retrigger CI with search_after true Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Address comments Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Remove unused code change Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix scroll condition Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit before query execution Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Refactor get response with pit method Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Update remaining scroll search calls Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix integ test failures Signed-off-by: Rupal Mahajan <maharup@amazon.com> * nit Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Move pit from join request builder to executor Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Remove unused methods Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Move pit from request to executor Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Fix pit.delete call missed while merge Signed-off-by: Rupal 
Mahajan <maharup@amazon.com> * Move getResponseWithHits method to util class Signed-off-by: Rupal Mahajan <maharup@amazon.com> * add try catch for create delete pit in minus executor Signed-off-by: Rupal Mahajan <maharup@amazon.com> * move all common fields to ElasticHitsExecutor Signed-off-by: Rupal Mahajan <maharup@amazon.com> * add javadoc for ElasticHitsExecutor Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add missing javadoc Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Forcing an empty commit as last commit is stuck processing updates Signed-off-by: Rupal Mahajan <maharup@amazon.com> --------- Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Add pit to default cursor Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Run CI without pit unit test Signed-off-by: Rupal Mahajan <maharup@amazon.com> * Rerun CI without pit unit test Signed-off-by: Rupal Mahajan <maharup@amazon.com> * FIx unit tests for PIT changes Signed-off-by: Manasvini B S <manasvis@amazon.com> * Addressed comments Signed-off-by: Manasvini B S <manasvis@amazon.com> --------- Signed-off-by: Rupal Mahajan <maharup@amazon.com> Signed-off-by: Manasvini B S <manasvis@amazon.com> Co-authored-by: Rupal Mahajan <maharup@amazon.com>
1 parent 7815c96 commit 69853fe

12 files changed

Lines changed: 513 additions & 110 deletions

File tree

legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,19 @@
55

66
package org.opensearch.sql.legacy.cursor;
77

8+
import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
9+
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;
10+
11+
import com.fasterxml.jackson.core.JsonProcessingException;
12+
import com.fasterxml.jackson.databind.ObjectMapper;
813
import com.google.common.base.Strings;
14+
import java.io.ByteArrayInputStream;
15+
import java.io.ByteArrayOutputStream;
16+
import java.io.IOException;
17+
import java.security.AccessController;
18+
import java.security.PrivilegedAction;
919
import java.util.Base64;
20+
import java.util.Collections;
1021
import java.util.HashMap;
1122
import java.util.List;
1223
import java.util.Map;
@@ -18,6 +29,16 @@
1829
import lombok.Setter;
1930
import org.json.JSONArray;
2031
import org.json.JSONObject;
32+
import org.opensearch.common.settings.Settings;
33+
import org.opensearch.common.xcontent.XContentFactory;
34+
import org.opensearch.common.xcontent.XContentType;
35+
import org.opensearch.core.xcontent.NamedXContentRegistry;
36+
import org.opensearch.core.xcontent.XContentBuilder;
37+
import org.opensearch.core.xcontent.XContentParser;
38+
import org.opensearch.index.query.QueryBuilder;
39+
import org.opensearch.search.SearchModule;
40+
import org.opensearch.search.builder.SearchSourceBuilder;
41+
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
2142
import org.opensearch.sql.legacy.executor.format.Schema;
2243

2344
/**
@@ -40,6 +61,10 @@ public class DefaultCursor implements Cursor {
4061
private static final String SCROLL_ID = "s";
4162
private static final String SCHEMA_COLUMNS = "c";
4263
private static final String FIELD_ALIAS_MAP = "a";
64+
private static final String PIT_ID = "p";
65+
private static final String SEARCH_REQUEST = "r";
66+
private static final String SORT_FIELDS = "h";
67+
private static final ObjectMapper objectMapper = new ObjectMapper();
4368

4469
/**
4570
* To get mappings for index to check if type is date needed for
@@ -70,31 +95,85 @@ public class DefaultCursor implements Cursor {
7095
/** To get next batch of result */
7196
private String scrollId;
7297

98+
/** Point in time (PIT) id, used instead of the scroll id when paginating with search after api */
99+
private String pitId;
100+
101+
/** To get next batch of result with search after api */
102+
private SearchSourceBuilder searchSourceBuilder;
103+
104+
/** To get last sort values */
105+
private Object[] sortFields;
106+
73107
/** To reduce the number of rows left by fetchSize */
74108
@NonNull private Integer fetchSize;
75109

76110
private Integer limit;
77111

112+
/**
113+
* {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder}
114+
* from DSL query string.
115+
*/
116+
private static final NamedXContentRegistry xContentRegistry =
117+
new NamedXContentRegistry(
118+
new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());
119+
78120
@Override
79121
public CursorType getType() {
80122
return type;
81123
}
82124

83125
@Override
84126
public String generateCursorId() {
85-
if (rowsLeft <= 0 || Strings.isNullOrEmpty(scrollId)) {
127+
if (rowsLeft <= 0 || isCursorIdNullOrEmpty()) {
86128
return null;
87129
}
88130
JSONObject json = new JSONObject();
89131
json.put(FETCH_SIZE, fetchSize);
90132
json.put(ROWS_LEFT, rowsLeft);
91133
json.put(INDEX_PATTERN, indexPattern);
92-
json.put(SCROLL_ID, scrollId);
93134
json.put(SCHEMA_COLUMNS, getSchemaAsJson());
94135
json.put(FIELD_ALIAS_MAP, fieldAliasMap);
136+
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
137+
json.put(PIT_ID, pitId);
138+
String sortFieldValue =
139+
AccessController.doPrivileged(
140+
(PrivilegedAction<String>)
141+
() -> {
142+
try {
143+
return objectMapper.writeValueAsString(sortFields);
144+
} catch (JsonProcessingException e) {
145+
throw new RuntimeException(
146+
"Failed to parse sort fields from JSON string.", e);
147+
}
148+
});
149+
json.put(SORT_FIELDS, sortFieldValue);
150+
setSearchRequestString(json, searchSourceBuilder);
151+
} else {
152+
json.put(SCROLL_ID, scrollId);
153+
}
95154
return String.format("%s:%s", type.getId(), encodeCursor(json));
96155
}
97156

157+
private void setSearchRequestString(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) {
158+
try {
159+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
160+
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
161+
sourceBuilder.toXContent(builder, null);
162+
builder.close();
163+
164+
String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());
165+
cursorJson.put("searchSourceBuilder", searchRequestBase64);
166+
} catch (IOException ex) {
167+
throw new RuntimeException("Failed to set search request string on cursor json.", ex);
168+
}
169+
}
170+
171+
private boolean isCursorIdNullOrEmpty() {
172+
return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
173+
? Strings.isNullOrEmpty(pitId)
174+
: Strings.isNullOrEmpty(scrollId);
175+
}
176+
98177
public static DefaultCursor from(String cursorId) {
99178
/**
100179
* It is assumed that cursorId here is the second part of the original cursor passed by the
@@ -105,13 +184,50 @@ public static DefaultCursor from(String cursorId) {
105184
cursor.setFetchSize(json.getInt(FETCH_SIZE));
106185
cursor.setRowsLeft(json.getLong(ROWS_LEFT));
107186
cursor.setIndexPattern(json.getString(INDEX_PATTERN));
108-
cursor.setScrollId(json.getString(SCROLL_ID));
187+
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
188+
populateCursorForPit(json, cursor);
189+
} else {
190+
cursor.setScrollId(json.getString(SCROLL_ID));
191+
}
109192
cursor.setColumns(getColumnsFromSchema(json.getJSONArray(SCHEMA_COLUMNS)));
110193
cursor.setFieldAliasMap(fieldAliasMap(json.getJSONObject(FIELD_ALIAS_MAP)));
111194

112195
return cursor;
113196
}
114197

198+
private static void populateCursorForPit(JSONObject json, DefaultCursor cursor) {
199+
cursor.setPitId(json.getString(PIT_ID));
200+
201+
cursor.setSortFields(getSortFieldsFromJson(json));
202+
203+
// Retrieve and set the SearchSourceBuilder from the JSON field
204+
String searchSourceBuilderBase64 = json.getString("searchSourceBuilder");
205+
byte[] bytes = Base64.getDecoder().decode(searchSourceBuilderBase64);
206+
ByteArrayInputStream streamInput = new ByteArrayInputStream(bytes);
207+
try {
208+
XContentParser parser =
209+
XContentType.JSON
210+
.xContent()
211+
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
212+
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
213+
cursor.setSearchSourceBuilder(sourceBuilder);
214+
} catch (IOException ex) {
215+
throw new RuntimeException("Failed to get searchSourceBuilder from cursor Id", ex);
216+
}
217+
}
218+
219+
private static Object[] getSortFieldsFromJson(JSONObject json) {
220+
return AccessController.doPrivileged(
221+
(PrivilegedAction<Object[]>)
222+
() -> {
223+
try {
224+
return objectMapper.readValue(json.getString(SORT_FIELDS), Object[].class);
225+
} catch (JsonProcessingException e) {
226+
throw new RuntimeException("Failed to parse sort fields from JSON string.", e);
227+
}
228+
});
229+
}
230+
115231
private JSONArray getSchemaAsJson() {
116232
JSONArray schemaJson = new JSONArray();
117233

legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.sql.legacy.executor.cursor;
77

88
import static org.opensearch.core.rest.RestStatus.OK;
9+
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;
910

1011
import java.util.Map;
1112
import org.apache.logging.log4j.LogManager;
@@ -18,8 +19,11 @@
1819
import org.opensearch.rest.RestChannel;
1920
import org.opensearch.sql.legacy.cursor.CursorType;
2021
import org.opensearch.sql.legacy.cursor.DefaultCursor;
22+
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
2123
import org.opensearch.sql.legacy.metrics.MetricName;
2224
import org.opensearch.sql.legacy.metrics.Metrics;
25+
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
26+
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
2327
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;
2428

2529
public class CursorCloseExecutor implements CursorRestExecutor {
@@ -79,14 +83,26 @@ public String execute(Client client, Map<String, String> params) throws Exceptio
7983
}
8084

8185
private String handleDefaultCursorCloseRequest(Client client, DefaultCursor cursor) {
82-
String scrollId = cursor.getScrollId();
83-
ClearScrollResponse clearScrollResponse =
84-
client.prepareClearScroll().addScrollId(scrollId).get();
85-
if (clearScrollResponse.isSucceeded()) {
86-
return SUCCEEDED_TRUE;
86+
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
87+
String pitId = cursor.getPitId();
88+
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId);
89+
try {
90+
pit.delete();
91+
return SUCCEEDED_TRUE;
92+
} catch (RuntimeException e) {
93+
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
94+
return SUCCEEDED_FALSE;
95+
}
8796
} else {
88-
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
89-
return SUCCEEDED_FALSE;
97+
String scrollId = cursor.getScrollId();
98+
ClearScrollResponse clearScrollResponse =
99+
client.prepareClearScroll().addScrollId(scrollId).get();
100+
if (clearScrollResponse.isSucceeded()) {
101+
return SUCCEEDED_TRUE;
102+
} else {
103+
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
104+
return SUCCEEDED_FALSE;
105+
}
90106
}
91107
}
92108
}

legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package org.opensearch.sql.legacy.executor.cursor;
77

88
import static org.opensearch.core.rest.RestStatus.OK;
9+
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
10+
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;
911

1012
import java.util.Arrays;
1113
import java.util.Map;
@@ -14,21 +16,25 @@
1416
import org.json.JSONException;
1517
import org.opensearch.OpenSearchException;
1618
import org.opensearch.action.search.ClearScrollResponse;
19+
import org.opensearch.action.search.SearchRequest;
1720
import org.opensearch.action.search.SearchResponse;
1821
import org.opensearch.client.Client;
1922
import org.opensearch.common.unit.TimeValue;
2023
import org.opensearch.rest.BytesRestResponse;
2124
import org.opensearch.rest.RestChannel;
2225
import org.opensearch.search.SearchHit;
2326
import org.opensearch.search.SearchHits;
24-
import org.opensearch.sql.common.setting.Settings;
27+
import org.opensearch.search.builder.PointInTimeBuilder;
28+
import org.opensearch.search.builder.SearchSourceBuilder;
2529
import org.opensearch.sql.legacy.cursor.CursorType;
2630
import org.opensearch.sql.legacy.cursor.DefaultCursor;
2731
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
2832
import org.opensearch.sql.legacy.executor.Format;
2933
import org.opensearch.sql.legacy.executor.format.Protocol;
3034
import org.opensearch.sql.legacy.metrics.MetricName;
3135
import org.opensearch.sql.legacy.metrics.Metrics;
36+
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
37+
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
3238
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;
3339

3440
public class CursorResultExecutor implements CursorRestExecutor {
@@ -91,14 +97,27 @@ public String execute(Client client, Map<String, String> params) throws Exceptio
9197
}
9298

9399
private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) {
94-
String previousScrollId = cursor.getScrollId();
95100
LocalClusterState clusterState = LocalClusterState.state();
96-
TimeValue scrollTimeout = clusterState.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
97-
SearchResponse scrollResponse =
98-
client.prepareSearchScroll(previousScrollId).setScroll(scrollTimeout).get();
101+
TimeValue paginationTimeout = clusterState.getSettingValue(SQL_CURSOR_KEEP_ALIVE);
102+
103+
SearchResponse scrollResponse = null;
104+
if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
105+
String pitId = cursor.getPitId();
106+
SearchSourceBuilder source = cursor.getSearchSourceBuilder();
107+
source.searchAfter(cursor.getSortFields());
108+
source.pointInTimeBuilder(new PointInTimeBuilder(pitId));
109+
SearchRequest searchRequest = new SearchRequest();
110+
searchRequest.source(source);
111+
scrollResponse = client.search(searchRequest).actionGet();
112+
} else {
113+
String previousScrollId = cursor.getScrollId();
114+
scrollResponse =
115+
client.prepareSearchScroll(previousScrollId).setScroll(paginationTimeout).get();
116+
}
99117
SearchHits searchHits = scrollResponse.getHits();
100118
SearchHit[] searchHitArray = searchHits.getHits();
101119
String newScrollId = scrollResponse.getScrollId();
120+
String newPitId = scrollResponse.pointInTimeId();
102121

103122
int rowsLeft = (int) cursor.getRowsLeft();
104123
int fetch = cursor.getFetchSize();
@@ -124,16 +143,37 @@ private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) {
124143

125144
if (rowsLeft <= 0) {
126145
/** Clear the scroll context or delete the point in time on the last page */
127-
ClearScrollResponse clearScrollResponse =
128-
client.prepareClearScroll().addScrollId(newScrollId).get();
129-
if (!clearScrollResponse.isSucceeded()) {
130-
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
131-
LOG.info("Error closing the cursor context {} ", newScrollId);
146+
if (newScrollId != null) {
147+
ClearScrollResponse clearScrollResponse =
148+
client.prepareClearScroll().addScrollId(newScrollId).get();
149+
if (!clearScrollResponse.isSucceeded()) {
150+
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
151+
LOG.info("Error closing the cursor context {} ", newScrollId);
152+
}
153+
}
154+
if (newPitId != null) {
155+
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, newPitId);
156+
try {
157+
pit.delete();
158+
} catch (RuntimeException e) {
159+
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
160+
LOG.info("Error deleting point in time {} ", newPitId);
161+
}
132162
}
133163
}
134164

135165
cursor.setRowsLeft(rowsLeft);
136-
cursor.setScrollId(newScrollId);
166+
if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
167+
cursor.setPitId(newPitId);
168+
cursor.setSearchSourceBuilder(cursor.getSearchSourceBuilder());
169+
cursor.setSortFields(
170+
scrollResponse
171+
.getHits()
172+
.getAt(scrollResponse.getHits().getHits().length - 1)
173+
.getSortValues());
174+
} else {
175+
cursor.setScrollId(newScrollId);
176+
}
137177
Protocol protocol = new Protocol(client, searchHits, format.name().toLowerCase(), cursor);
138178
return protocol.cursorFormat();
139179
}

0 commit comments

Comments
 (0)