55
66package org .opensearch .sql .legacy .executor .format ;
77
8- import static org .opensearch .sql .common .setting .Settings .Key .SQL_PAGINATION_API_SEARCH_AFTER ;
9-
8+ import java .util .Locale ;
109import java .util .Map ;
1110import java .util .Objects ;
1211import org .apache .logging .log4j .LogManager ;
1514import org .opensearch .action .search .SearchRequestBuilder ;
1615import org .opensearch .action .search .SearchResponse ;
1716import org .opensearch .client .Client ;
18- import org .opensearch .core .common .Strings ;
1917import org .opensearch .core .rest .RestStatus ;
2018import org .opensearch .rest .BytesRestResponse ;
2119import org .opensearch .rest .RestChannel ;
2220import org .opensearch .search .builder .PointInTimeBuilder ;
2321import org .opensearch .sql .legacy .cursor .Cursor ;
2422import org .opensearch .sql .legacy .cursor .DefaultCursor ;
25- import org .opensearch .sql .legacy .esdomain .LocalClusterState ;
2623import org .opensearch .sql .legacy .exception .SqlParseException ;
2724import org .opensearch .sql .legacy .executor .QueryActionElasticExecutor ;
2825import org .opensearch .sql .legacy .executor .RestExecutor ;
26+ import org .opensearch .sql .legacy .metrics .MetricName ;
27+ import org .opensearch .sql .legacy .metrics .Metrics ;
2928import org .opensearch .sql .legacy .pit .PointInTimeHandler ;
3029import org .opensearch .sql .legacy .pit .PointInTimeHandlerImpl ;
3130import org .opensearch .sql .legacy .query .DefaultQueryAction ;
3231import org .opensearch .sql .legacy .query .QueryAction ;
33- import org .opensearch .sql .legacy .query .SqlOpenSearchRequestBuilder ;
3432import org .opensearch .sql .legacy .query .join .BackOffRetryStrategy ;
3533
3634public class PrettyFormatRestExecutor implements RestExecutor {
@@ -40,7 +38,7 @@ public class PrettyFormatRestExecutor implements RestExecutor {
4038 private final String format ;
4139
4240 public PrettyFormatRestExecutor (String format ) {
43- this .format = format .toLowerCase ();
41+ this .format = Objects . requireNonNull ( format , "Format cannot be null" ) .toLowerCase (Locale . ROOT );
4442 }
4543
4644 /** Execute the QueryAction and return the REST response using the channel. */
@@ -76,70 +74,98 @@ public String execute(Client client, Map<String, String> params, QueryAction que
7674 Object queryResult = QueryActionElasticExecutor .executeAnyAction (client , queryAction );
7775 protocol = new Protocol (client , queryAction , queryResult , format , Cursor .NULL_CURSOR );
7876 }
77+ } catch (SqlParseException e ) {
78+ LOG .warn ("SQL parsing error: {}" , e .getMessage (), e );
79+ protocol = new Protocol (e );
80+ } catch (OpenSearchException e ) {
81+ LOG .warn ("An error occurred in OpenSearch engine: {}" , e .getDetailedMessage (), e );
82+ protocol = new Protocol (e );
7983 } catch (Exception e ) {
80- if (e instanceof OpenSearchException ) {
81- LOG .warn (
82- "An error occurred in OpenSearch engine: "
83- + ((OpenSearchException ) e ).getDetailedMessage (),
84- e );
85- } else {
86- LOG .warn ("Error happened in pretty formatter" , e );
87- }
84+ LOG .warn ("Error happened in pretty formatter" , e );
8885 protocol = new Protocol (e );
8986 }
9087
9188 return protocol .format ();
9289 }
9390
9491 /**
95- * QueryActionElasticExecutor.executeAnyAction() returns SearchHits inside SearchResponse. In
96- * order to get scroll ID if any, we need to execute DefaultQueryAction ourselves for
97- * SearchResponse .
92+ * Builds protocol for default query execution.
93+ *
94+ * <p>Routes to pagination or non-pagination execution based on fetch_size parameter .
9895 */
9996 private Protocol buildProtocolForDefaultQuery (Client client , DefaultQueryAction queryAction )
10097 throws SqlParseException {
10198
102- PointInTimeHandler pit = null ;
103- SearchResponse response ;
104- SqlOpenSearchRequestBuilder sqlOpenSearchRequestBuilder = queryAction .explain ();
105- if (LocalClusterState .state ().getSettingValue (SQL_PAGINATION_API_SEARCH_AFTER )) {
106- pit = new PointInTimeHandlerImpl (client , queryAction .getSelect ().getIndexArr ());
107- pit .create ();
108- SearchRequestBuilder searchRequest = queryAction .getRequestBuilder ();
109- searchRequest .setPointInTime (new PointInTimeBuilder (pit .getPitId ()));
110- response = searchRequest .get ();
99+ queryAction .explain ();
100+
101+ Integer fetchSize = queryAction .getSqlRequest ().fetchSize ();
102+ if (fetchSize != null && fetchSize > 0 ) {
103+ return buildProtocolWithPagination (client , queryAction , fetchSize );
111104 } else {
112- response = ( SearchResponse ) sqlOpenSearchRequestBuilder . get ( );
105+ return buildProtocolWithoutPagination ( client , queryAction );
113106 }
107+ }
114108
115- Protocol protocol ;
116- if (isDefaultCursor (response , queryAction )) {
117- DefaultCursor defaultCursor = new DefaultCursor ();
118- defaultCursor .setLimit (queryAction .getSelect ().getRowCount ());
119- defaultCursor .setFetchSize (queryAction .getSqlRequest ().fetchSize ());
120- if (LocalClusterState .state ().getSettingValue (SQL_PAGINATION_API_SEARCH_AFTER )) {
121- defaultCursor .setPitId (pit .getPitId ());
122- defaultCursor .setSearchSourceBuilder (queryAction .getRequestBuilder ().request ().source ());
123- defaultCursor .setSortFields (
124- response .getHits ().getAt (response .getHits ().getHits ().length - 1 ).getSortValues ());
109+ /** Executes query with pagination support using Point-in-Time (PIT). */
110+ private Protocol buildProtocolWithPagination (
111+ Client client , DefaultQueryAction queryAction , Integer fetchSize ) {
112+
113+ PointInTimeHandler pit =
114+ new PointInTimeHandlerImpl (client , queryAction .getSelect ().getIndexArr ());
115+ pit .create ();
116+
117+ try {
118+ SearchRequestBuilder searchRequest = queryAction .getRequestBuilder ();
119+ searchRequest .setPointInTime (new PointInTimeBuilder (pit .getPitId ()));
120+ SearchResponse response = searchRequest .get ();
121+
122+ if (shouldCreateCursor (response , queryAction , fetchSize )) {
123+ DefaultCursor cursor = createCursorWithPit (pit , response , queryAction , fetchSize );
124+ return new Protocol (client , queryAction , response .getHits (), format , cursor );
125125 } else {
126- defaultCursor .setScrollId (response .getScrollId ());
126+ pit .delete ();
127+ return new Protocol (client , queryAction , response .getHits (), format , Cursor .NULL_CURSOR );
127128 }
128- protocol = new Protocol (client , queryAction , response .getHits (), format , defaultCursor );
129- } else {
130- protocol = new Protocol (client , queryAction , response .getHits (), format , Cursor .NULL_CURSOR );
129+ } catch (RuntimeException e ) {
130+ try {
131+ pit .delete ();
132+ } catch (RuntimeException deleteException ) {
133+ LOG .error ("Failed to delete PIT" , deleteException );
134+ Metrics .getInstance ().getNumericalMetric (MetricName .FAILED_REQ_COUNT_SYS ).increment ();
135+ }
136+ throw e ;
131137 }
138+ }
132139
133- return protocol ;
140+ private Protocol buildProtocolWithoutPagination (Client client , DefaultQueryAction queryAction ) {
141+ SearchRequestBuilder searchRequest = queryAction .getRequestBuilder ();
142+ SearchResponse response = searchRequest .get ();
143+ return new Protocol (client , queryAction , response .getHits (), format , Cursor .NULL_CURSOR );
134144 }
135145
136- protected boolean isDefaultCursor (SearchResponse searchResponse , DefaultQueryAction queryAction ) {
137- if (LocalClusterState .state ().getSettingValue (SQL_PAGINATION_API_SEARCH_AFTER )) {
138- return queryAction .getSqlRequest ().fetchSize () != 0
139- && Objects .requireNonNull (searchResponse .getHits ().getTotalHits ()).value
140- >= queryAction .getSqlRequest ().fetchSize ();
141- } else {
142- return !Strings .isNullOrEmpty (searchResponse .getScrollId ());
146+ private DefaultCursor createCursorWithPit (
147+ PointInTimeHandler pit ,
148+ SearchResponse response ,
149+ DefaultQueryAction queryAction ,
150+ Integer fetchSize ) {
151+ DefaultCursor cursor = new DefaultCursor ();
152+ cursor .setLimit (queryAction .getSelect ().getRowCount ());
153+ cursor .setFetchSize (fetchSize );
154+ cursor .setPitId (pit .getPitId ());
155+ cursor .setSearchSourceBuilder (queryAction .getRequestBuilder ().request ().source ());
156+
157+ if (response .getHits ().getHits ().length > 0 ) {
158+ cursor .setSortFields (
159+ response .getHits ().getAt (response .getHits ().getHits ().length - 1 ).getSortValues ());
143160 }
161+
162+ return cursor ;
163+ }
164+
165+ protected boolean shouldCreateCursor (
166+ SearchResponse searchResponse , DefaultQueryAction queryAction , Integer fetchSize ) {
167+ return fetchSize != null
168+ && searchResponse .getHits () != null
169+ && Objects .requireNonNull (searchResponse .getHits ().getTotalHits ()).value >= fetchSize ;
144170 }
145171}
0 commit comments