Skip to content

Commit aea33da

Browse files
Add setting plugins.query.executionengine.async_query.enabled (#2510) (#2512)
* Add setting plugins.query.executionengine.async_query.enabled * fix format --------- (cherry picked from commit cddffc6) Signed-off-by: Peng Huo <penghuo@gmail.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 73ca8c6 commit aea33da

7 files changed

Lines changed: 176 additions & 5 deletions

File tree

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ public enum Key {
4545
AUTO_INDEX_MANAGEMENT_ENABLED(
4646
"plugins.query.executionengine.spark.auto_index_management.enabled"),
4747
SESSION_INACTIVITY_TIMEOUT_MILLIS(
48-
"plugins.query.executionengine.spark.session_inactivity_timeout_millis");
48+
"plugins.query.executionengine.spark.session_inactivity_timeout_millis"),
49+
50+
/** Async query Settings * */
51+
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled");
4952

5053
@Getter private final String keyValue;
5154

docs/user/admin/settings.rst

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,4 +562,36 @@ SQL query::
562562
}
563563
}
564564
}
565-
}
565+
}
566+
567+
plugins.query.executionengine.async_query.enabled
568+
===============================
569+
570+
Description
571+
-----------
572+
You can disable submit async query to reject all coming requests.
573+
574+
1. The default value is true.
575+
2. This setting is node scope.
576+
3. This setting can be updated dynamically.
577+
578+
Request::
579+
580+
sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
581+
... -d '{"transient":{"plugins.query.executionengine.async_query.enabled":"false"}}'
582+
{
583+
"acknowledged": true,
584+
"persistent": {},
585+
"transient": {
586+
"plugins": {
587+
"query": {
588+
"executionengine": {
589+
"async_query": {
590+
"enabled": "false"
591+
}
592+
}
593+
}
594+
}
595+
}
596+
}
597+
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.asyncquery;
7+
8+
import static org.hamcrest.Matchers.equalTo;
9+
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
10+
11+
import java.io.IOException;
12+
import java.util.Locale;
13+
import org.json.JSONObject;
14+
import org.junit.Assert;
15+
import org.junit.Test;
16+
import org.opensearch.client.Request;
17+
import org.opensearch.client.RequestOptions;
18+
import org.opensearch.client.Response;
19+
import org.opensearch.client.ResponseException;
20+
import org.opensearch.sql.ppl.PPLIntegTestCase;
21+
import org.opensearch.sql.util.TestUtils;
22+
23+
public class AsyncQueryIT extends PPLIntegTestCase {
24+
25+
public static final String ASYNC_QUERY_ACTION_URL = "/_plugins/_async_query";
26+
27+
@Test
28+
public void asyncQueryEnabledSettingsTest() throws IOException {
29+
String setting = "plugins.query.executionengine.async_query.enabled";
30+
// disable
31+
updateClusterSettings(new ClusterSetting(PERSISTENT, setting, "false"));
32+
33+
String query = "select 1";
34+
Response response = null;
35+
try {
36+
executeAsyncQueryToString(query);
37+
} catch (ResponseException ex) {
38+
response = ex.getResponse();
39+
}
40+
41+
JSONObject result = new JSONObject(TestUtils.getResponseBody(response));
42+
assertThat(result.getInt("status"), equalTo(400));
43+
JSONObject error = result.getJSONObject("error");
44+
assertThat(error.getString("reason"), equalTo("Invalid Request"));
45+
assertThat(
46+
error.getString("details"),
47+
equalTo("plugins.query.executionengine.async_query.enabled setting is false"));
48+
assertThat(error.getString("type"), equalTo("IllegalAccessException"));
49+
50+
// reset the setting
51+
updateClusterSettings(new ClusterSetting(PERSISTENT, setting, null));
52+
}
53+
54+
protected String executeAsyncQueryToString(String query) throws IOException {
55+
Response response = client().performRequest(buildAsyncRequest(query, ASYNC_QUERY_ACTION_URL));
56+
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
57+
return getResponseBody(response, true);
58+
}
59+
60+
protected Request buildAsyncRequest(String query, String endpoint) {
61+
Request request = new Request("POST", endpoint);
62+
request.setJsonEntity(String.format(Locale.ROOT, "{\n" + " \"query\": \"%s\"\n" + "}", query));
63+
request.setJsonEntity(
64+
String.format(
65+
Locale.ROOT,
66+
"{\n"
67+
+ " \"datasource\": \"mys3\",\n"
68+
+ " \"lang\": \"sql\",\n"
69+
+ " \"query\": \"%s\"\n"
70+
+ "}",
71+
query));
72+
73+
RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
74+
restOptionsBuilder.addHeader("Content-Type", "application/json");
75+
request.setOptions(restOptionsBuilder);
76+
return request;
77+
}
78+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ public class OpenSearchSettings extends Settings {
131131
Setting.Property.NodeScope,
132132
Setting.Property.Dynamic);
133133

134+
public static final Setting<Boolean> ASYNC_QUERY_ENABLED_SETTING =
135+
Setting.boolSetting(
136+
Key.ASYNC_QUERY_ENABLED.getKeyValue(),
137+
true,
138+
Setting.Property.NodeScope,
139+
Setting.Property.Dynamic);
140+
134141
public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
135142
Setting.simpleString(
136143
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
@@ -250,6 +257,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
250257
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
251258
DATASOURCE_URI_HOSTS_DENY_LIST,
252259
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
260+
register(
261+
settingBuilder,
262+
clusterSettings,
263+
Key.ASYNC_QUERY_ENABLED,
264+
ASYNC_QUERY_ENABLED_SETTING,
265+
new Updater(Key.ASYNC_QUERY_ENABLED));
253266
register(
254267
settingBuilder,
255268
clusterSettings,
@@ -362,6 +375,7 @@ public static List<Setting<?>> pluginSettings() {
362375
.add(METRICS_ROLLING_WINDOW_SETTING)
363376
.add(METRICS_ROLLING_INTERVAL_SETTING)
364377
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
378+
.add(ASYNC_QUERY_ENABLED_SETTING)
365379
.add(SPARK_EXECUTION_ENGINE_CONFIG)
366380
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
367381
.add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING)

spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,8 @@ private static boolean isClientError(Exception e) {
236236
return e instanceof IllegalArgumentException
237237
|| e instanceof IllegalStateException
238238
|| e instanceof DataSourceNotFoundException
239-
|| e instanceof AsyncQueryNotFoundException;
239+
|| e instanceof AsyncQueryNotFoundException
240+
|| e instanceof IllegalAccessException;
240241
}
241242

242243
private void addSystemErrorMetric(RestRequest.Method requestMethod) {

spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestAction.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77

88
package org.opensearch.sql.spark.transport;
99

10+
import java.util.Locale;
1011
import org.opensearch.action.ActionType;
1112
import org.opensearch.action.support.ActionFilters;
1213
import org.opensearch.action.support.HandledTransportAction;
1314
import org.opensearch.common.inject.Inject;
1415
import org.opensearch.core.action.ActionListener;
16+
import org.opensearch.sql.common.setting.Settings;
17+
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
1518
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
1619
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
1720
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
@@ -26,6 +29,7 @@ public class TransportCreateAsyncQueryRequestAction
2629
extends HandledTransportAction<CreateAsyncQueryActionRequest, CreateAsyncQueryActionResponse> {
2730

2831
private final AsyncQueryExecutorService asyncQueryExecutorService;
32+
private final OpenSearchSettings pluginSettings;
2933

3034
public static final String NAME = "cluster:admin/opensearch/ql/async_query/create";
3135
public static final ActionType<CreateAsyncQueryActionResponse> ACTION_TYPE =
@@ -35,9 +39,11 @@ public class TransportCreateAsyncQueryRequestAction
3539
public TransportCreateAsyncQueryRequestAction(
3640
TransportService transportService,
3741
ActionFilters actionFilters,
38-
AsyncQueryExecutorServiceImpl jobManagementService) {
42+
AsyncQueryExecutorServiceImpl jobManagementService,
43+
OpenSearchSettings pluginSettings) {
3944
super(NAME, transportService, actionFilters, CreateAsyncQueryActionRequest::new);
4045
this.asyncQueryExecutorService = jobManagementService;
46+
this.pluginSettings = pluginSettings;
4147
}
4248

4349
@Override
@@ -46,6 +52,16 @@ protected void doExecute(
4652
CreateAsyncQueryActionRequest request,
4753
ActionListener<CreateAsyncQueryActionResponse> listener) {
4854
try {
55+
if (!(Boolean) pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)) {
56+
listener.onFailure(
57+
new IllegalAccessException(
58+
String.format(
59+
Locale.ROOT,
60+
"%s setting is " + "false",
61+
Settings.Key.ASYNC_QUERY_ENABLED.getKeyValue())));
62+
return;
63+
}
64+
4965
CreateAsyncQueryRequest createAsyncQueryRequest = request.getCreateAsyncQueryRequest();
5066
CreateAsyncQueryResponse createAsyncQueryResponse =
5167
asyncQueryExecutorService.createAsyncQuery(createAsyncQueryRequest);

spark/src/test/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestActionTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.opensearch.sql.spark.transport;
99

1010
import static org.mockito.Mockito.doThrow;
11+
import static org.mockito.Mockito.never;
1112
import static org.mockito.Mockito.times;
1213
import static org.mockito.Mockito.verify;
1314
import static org.mockito.Mockito.when;
@@ -25,6 +26,8 @@
2526
import org.mockito.junit.jupiter.MockitoExtension;
2627
import org.opensearch.action.support.ActionFilters;
2728
import org.opensearch.core.action.ActionListener;
29+
import org.opensearch.sql.common.setting.Settings;
30+
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
2831
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
2932
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
3033
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
@@ -42,6 +45,7 @@ public class TransportCreateAsyncQueryRequestActionTest {
4245
@Mock private AsyncQueryExecutorServiceImpl jobExecutorService;
4346
@Mock private Task task;
4447
@Mock private ActionListener<CreateAsyncQueryActionResponse> actionListener;
48+
@Mock private OpenSearchSettings pluginSettings;
4549

4650
@Captor
4751
private ArgumentCaptor<CreateAsyncQueryActionResponse> createJobActionResponseArgumentCaptor;
@@ -52,7 +56,10 @@ public class TransportCreateAsyncQueryRequestActionTest {
5256
public void setUp() {
5357
action =
5458
new TransportCreateAsyncQueryRequestAction(
55-
transportService, new ActionFilters(new HashSet<>()), jobExecutorService);
59+
transportService,
60+
new ActionFilters(new HashSet<>()),
61+
jobExecutorService,
62+
pluginSettings);
5663
}
5764

5865
@Test
@@ -61,6 +68,7 @@ public void testDoExecute() {
6168
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
6269
CreateAsyncQueryActionRequest request =
6370
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
71+
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
6472
when(jobExecutorService.createAsyncQuery(createAsyncQueryRequest))
6573
.thenReturn(new CreateAsyncQueryResponse("123", null));
6674
action.doExecute(task, request, actionListener);
@@ -78,6 +86,7 @@ public void testDoExecuteWithSessionId() {
7886
"source = my_glue.default.alb_logs", "my_glue", LangType.SQL, MOCK_SESSION_ID);
7987
CreateAsyncQueryActionRequest request =
8088
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
89+
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
8190
when(jobExecutorService.createAsyncQuery(createAsyncQueryRequest))
8291
.thenReturn(new CreateAsyncQueryResponse("123", MOCK_SESSION_ID));
8392
action.doExecute(task, request, actionListener);
@@ -95,6 +104,7 @@ public void testDoExecuteWithException() {
95104
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
96105
CreateAsyncQueryActionRequest request =
97106
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
107+
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
98108
doThrow(new RuntimeException("Error"))
99109
.when(jobExecutorService)
100110
.createAsyncQuery(createAsyncQueryRequest);
@@ -105,4 +115,21 @@ public void testDoExecuteWithException() {
105115
Assertions.assertTrue(exception instanceof RuntimeException);
106116
Assertions.assertEquals("Error", exception.getMessage());
107117
}
118+
119+
@Test
120+
public void asyncQueryDisabled() {
121+
CreateAsyncQueryRequest createAsyncQueryRequest =
122+
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
123+
CreateAsyncQueryActionRequest request =
124+
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
125+
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(false);
126+
action.doExecute(task, request, actionListener);
127+
verify(jobExecutorService, never()).createAsyncQuery(createAsyncQueryRequest);
128+
Mockito.verify(actionListener).onFailure(exceptionArgumentCaptor.capture());
129+
Exception exception = exceptionArgumentCaptor.getValue();
130+
Assertions.assertTrue(exception instanceof IllegalAccessException);
131+
Assertions.assertEquals(
132+
"plugins.query.executionengine.async_query.enabled " + "setting is false",
133+
exception.getMessage());
134+
}
108135
}

0 commit comments

Comments
 (0)