Skip to content

Commit 90eb072

Browse files
committed
Pushdown system limit automatically for data-intensive operations
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 435a124 commit 90eb072

20 files changed

Lines changed: 505 additions & 12 deletions

File tree

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public enum Key {
4040
/** Common Settings for SQL and PPL. */
4141
QUERY_MEMORY_LIMIT("plugins.query.memory_limit"),
4242
QUERY_SIZE_LIMIT("plugins.query.size_limit"),
43+
QUERY_SYSTEM_LIMIT("plugins.query.system_limit"),
44+
// Currently, only join can bloat data, so SYSTEM_LIMIT_JOIN is equal to SYSTEM_LIMIT.
45+
// TODO add more specific configs in future when we have other data-bloat operations.
46+
QUERY_SYSTEM_LIMIT_JOIN("plugins.query.system_limit.join"),
4347
ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"),
4448
DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"),
4549
DATASOURCES_LIMIT("plugins.query.datasources.limit"),

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import javax.annotation.Nullable;
2323
import org.apache.calcite.rex.RexInputRef;
24+
import org.apache.calcite.rex.RexLiteral;
2425
import org.apache.calcite.rex.RexNode;
2526
import org.apache.calcite.rex.RexVisitorImpl;
2627
import org.apache.calcite.rex.RexWindowBound;
@@ -353,4 +354,14 @@ public Void visitChildren(Node node, Object context) {
353354
};
354355
node.accept(leafVisitor, null);
355356
}
357+
358+
/**
359+
* Extract the integer value from a RexNode, which is used to extract value from offset and limit
360+
*
361+
* @param r RexNode to extract value from
362+
* @param defaultValue default value could be Integer or null
363+
*/
364+
static Integer intValue(RexNode r, Integer defaultValue) {
365+
return r instanceof RexLiteral ? ((RexLiteral) r).getValueAs(Integer.class) : defaultValue;
366+
}
356367
}

docs/user/admin/settings.rst

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ plugins.query.size_limit
166166
Description
167167
-----------
168168

169-
The new engine fetches a default size of index from OpenSearch set by this setting, the default value equals to max result window in index level (10000 by default). You can change the value to any value not greater than the max result window value in index level (`index.max_result_window`), here is an example::
169+
The size configures the maximum amount of rows to be fetched from query execution results. The default value is: 10000. You can change the value::
170170

171171
>> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{
172172
"transient" : {
@@ -188,6 +188,51 @@ Result set::
188188
}
189189
}
190190

191+
plugins.query.system_limit
192+
==========================
193+
194+
Description
195+
-----------
196+
197+
The size configures the maximum number of documents to be pulled from OpenSearch for data-intensive operations (e.g. join, lookup). The default value is: 50000.
198+
199+
From v3.0.0, PPL introduces commands that may increase data volume. To prevent out-of-memory problems, the system automatically enforces this ``system limit`` in the pushdown context of the physical index scan operator for such commands.
200+
201+
Version
202+
-------
203+
3.1.0
204+
205+
Example
206+
-------
207+
208+
Change the system_limit to 10000::
209+
210+
sh$ curl -sS -H 'Content-Type: application/json' \
211+
... -X PUT localhost:9200/_plugins/_query/settings \
212+
... -d '{"persistent" : {"plugins.query.system_limit" : "10000"}}'
213+
{
214+
"acknowledged": true,
215+
"persistent": {
216+
"plugins": {
217+
"query": {
218+
"system_limit": "10000"
219+
}
220+
}
221+
},
222+
"transient": {}
223+
}
224+
225+
Rollback to default value::
226+
227+
sh$ curl -sS -H 'Content-Type: application/json' \
228+
... -X PUT localhost:9200/_plugins/_query/settings \
229+
... -d '{"persistent" : {"plugins.query.system_limit" : null}}'
230+
{
231+
"acknowledged": true,
232+
"persistent": {},
233+
"transient": {}
234+
}
235+
191236
plugins.query.memory_limit
192237
==========================
193238

docs/user/ppl/admin/settings.rst

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ plugins.query.size_limit
125125
Description
126126
-----------
127127

128-
The size configures the maximum amount of rows to be fetched from PPL execution results. The default value is: 10000
128+
The size configures the maximum amount of rows to be fetched from query execution results. The default value is: 10000
129129

130130
Example
131131
-------
@@ -159,3 +159,48 @@ Rollback to default value::
159159
}
160160

161161
Note: the legacy settings of ``opendistro.query.size_limit`` is deprecated, it will fallback to the new settings if you request an update with the legacy name.
162+
163+
plugins.query.system_limit
164+
==========================
165+
166+
Description
167+
-----------
168+
169+
The size configures the maximum number of documents to be pulled from OpenSearch for data-intensive operations (e.g. join, lookup). The default value is: 50000.
170+
171+
From v3.0.0, PPL introduces commands that may increase data volume. To prevent out-of-memory problems, the system automatically enforces this ``system limit`` in the pushdown context of the physical index scan operator for such commands.
172+
173+
Version
174+
-------
175+
3.1.0
176+
177+
Example
178+
-------
179+
180+
Change the system_limit to 10000::
181+
182+
sh$ curl -sS -H 'Content-Type: application/json' \
183+
... -X PUT localhost:9200/_plugins/_query/settings \
184+
... -d '{"persistent" : {"plugins.query.system_limit" : "10000"}}'
185+
{
186+
"acknowledged": true,
187+
"persistent": {
188+
"plugins": {
189+
"query": {
190+
"system_limit": "10000"
191+
}
192+
}
193+
},
194+
"transient": {}
195+
}
196+
197+
Rollback to default value::
198+
199+
sh$ curl -sS -H 'Content-Type: application/json' \
200+
... -X PUT localhost:9200/_plugins/_query/settings \
201+
... -d '{"persistent" : {"plugins.query.system_limit" : null}}'
202+
{
203+
"acknowledged": true,
204+
"persistent": {},
205+
"transient": {}
206+
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,14 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_HOBBIES;
9+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_OCCUPATION;
10+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY;
11+
import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId;
12+
813
import java.io.IOException;
914
import org.junit.Ignore;
15+
import org.junit.jupiter.api.Test;
1016
import org.opensearch.sql.ppl.ExplainIT;
1117

1218
public class CalciteExplainIT extends ExplainIT {
@@ -15,6 +21,9 @@ public void init() throws Exception {
1521
super.init();
1622
enableCalcite();
1723
disallowCalciteFallback();
24+
loadIndex(Index.STATE_COUNTRY);
25+
loadIndex(Index.OCCUPATION);
26+
loadIndex(Index.HOBBIES);
1827
}
1928

2029
@Override
@@ -43,6 +52,65 @@ public void testTrendlineWithSortPushDownExplain() throws Exception {
4352
"https://github.com/opensearch-project/sql/issues/3466");
4453
}
4554

55+
@Test
56+
public void testPushDownSystemLimitForJoinExplain() throws Exception {
57+
// the SYSTEM LIMIT should apply to each table of join
58+
String expected = loadFromFile("expectedOutput/calcite/explain_join_push_system_limit.json");
59+
60+
assertJsonEqualsIgnoreId(
61+
expected,
62+
explainQueryToString(
63+
String.format(
64+
"source=%s | inner join left=a, right=b ON a.name = b.name %s | fields "
65+
+ "a.name, a.age, a.state, a.country, b.occupation, b.country, b.salary",
66+
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)));
67+
}
68+
69+
@Test
70+
public void testPushDownSystemLimitForMultipleJoinExplain() throws Exception {
71+
// the SYSTEM LIMIT should apply to each table of multi-join
72+
String expected =
73+
loadFromFile("expectedOutput/calcite/explain_multi_join_push_system_limit.json");
74+
75+
assertJsonEqualsIgnoreId(
76+
expected,
77+
explainQueryToString(
78+
String.format(
79+
"source=%s | inner join left=a, right=b ON a.name = b.name %s"
80+
+ " | left join left=a, right=b ON a.name = b.name %s",
81+
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION, TEST_INDEX_HOBBIES)));
82+
}
83+
84+
@Test
85+
public void testPushDownSystemLimitForExistsSubqueryExplain() throws Exception {
86+
// the SYSTEM LIMIT should apply to each table of exists subquery
87+
// since exists subquery translates to hash join
88+
String expected =
89+
loadFromFile("expectedOutput/calcite/explain_exists_subquery_push_system_limit.json");
90+
91+
assertJsonEqualsIgnoreId(
92+
expected,
93+
explainQueryToString(
94+
String.format(
95+
"source = %s exists [ source = %s | where name = %s.name ]",
96+
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION, TEST_INDEX_STATE_COUNTRY)));
97+
}
98+
99+
@Test
100+
public void testPushDownSystemLimitForInSubqueryExplain() throws Exception {
101+
// the SYSTEM LIMIT should apply to each table of in subquery
102+
// since in subquery translates to hash join
103+
String expected =
104+
loadFromFile("expectedOutput/calcite/explain_in_subquery_push_system_limit.json");
105+
106+
assertJsonEqualsIgnoreId(
107+
expected,
108+
explainQueryToString(
109+
String.format(
110+
"source = %s | where name in [ source = %s | fields name ]",
111+
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)));
112+
}
113+
46114
@Override
47115
@Ignore("test only in v2")
48116
public void testExplainModeUnsupportedInV2() throws IOException {}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSettingsIT.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,14 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
9+
import static org.opensearch.sql.util.MatcherUtils.rows;
10+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
11+
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
12+
813
import java.io.IOException;
14+
import org.json.JSONObject;
15+
import org.junit.jupiter.api.Test;
916
import org.opensearch.sql.common.setting.Settings.Key;
1017
import org.opensearch.sql.ppl.SettingsIT;
1118

@@ -30,4 +37,52 @@ public void testQuerySizeLimit_NoPushdown() throws IOException {
3037
}
3138
});
3239
}
40+
41+
@Test
42+
public void testQuerySystemLimit() throws IOException {
43+
// system limit won't impact data-intensive operations
44+
setQuerySystemLimit(2);
45+
JSONObject result =
46+
executeQuery(String.format("search source=%s age>35 | fields firstname", TEST_INDEX_BANK));
47+
verifyDataRows(result, rows("Hattie"), rows("Elinor"), rows("Virginia"));
48+
49+
// for non data-intensive operations, the rows still control by query.size_limit
50+
setQuerySizeLimit(1);
51+
result =
52+
executeQuery(String.format("search source=%s age>35 | fields firstname", TEST_INDEX_BANK));
53+
verifyDataRows(result, rows("Hattie"));
54+
resetQuerySizeLimit();
55+
resetQuerySystemLimit();
56+
}
57+
58+
@Test
59+
public void testQuerySystemLimitWithJoin() throws IOException {
60+
JSONObject result =
61+
executeQuery(
62+
String.format(
63+
"search source=%s age>35 | fields firstname"
64+
+ " | full join left=l right=r on l.firstname=r.firstname"
65+
+ " [ search source=%s | fields firstname ]",
66+
TEST_INDEX_BANK, TEST_INDEX_BANK));
67+
verifyNumOfRows(result, 7);
68+
69+
// system limit will impact data-intensive operations
70+
setQuerySystemLimit(2);
71+
result =
72+
executeQuery(
73+
String.format(
74+
"search source=%s age>35 | fields firstname"
75+
+ " | full join left=l right=r on l.firstname=r.firstname"
76+
+ " [ search source=%s | fields firstname ]",
77+
TEST_INDEX_BANK, TEST_INDEX_BANK));
78+
verifyNumOfRows(result, 3);
79+
80+
// amount of final result should equals to query.size_limit
81+
setQuerySizeLimit(1);
82+
result =
83+
executeQuery(String.format("search source=%s age>35 | fields firstname", TEST_INDEX_BANK));
84+
verifyNumOfRows(result, 1);
85+
resetQuerySizeLimit();
86+
resetQuerySystemLimit();
87+
}
3388
}

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ private Settings defaultSettings() {
117117
.put(Key.CALCITE_FALLBACK_ALLOWED, false)
118118
.put(Key.CALCITE_PUSHDOWN_ENABLED, false)
119119
.put(Key.CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR, 0.9)
120+
.put(Key.QUERY_SYSTEM_LIMIT, 5000)
120121
.put(Key.DEFAULT_PATTERN_METHOD, "SIMPLE_PATTERN")
121122
.build();
122123

@@ -144,6 +145,7 @@ protected Settings enablePushdown() {
144145
.put(Key.CALCITE_FALLBACK_ALLOWED, false)
145146
.put(Key.CALCITE_PUSHDOWN_ENABLED, true)
146147
.put(Key.CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR, 0.9)
148+
.put(Key.QUERY_SYSTEM_LIMIT, 5000)
147149
.put(Key.DEFAULT_PATTERN_METHOD, "SIMPLE_PATTERN")
148150
.build();
149151

integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public abstract class SQLIntegTestCase extends OpenSearchSQLRestTestCase {
9292
Integer.parseInt(System.getProperty("defaultQuerySizeLimit", "200"));
9393
public static final Integer DEFAULT_MAX_RESULT_WINDOW =
9494
Integer.parseInt(System.getProperty("defaultMaxResultWindow", "10000"));
95+
// Value that resetQuerySystemLimit() writes back for plugins.query.system_limit. Read from its
// own "defaultQuerySystemLimit" system property (falling back to 50000) so that overriding
// "defaultMaxResultWindow" does not silently change this default as well.
public static final Integer DEFAULT_QUERY_SYSTEM_LIMIT =
    Integer.parseInt(System.getProperty("defaultQuerySystemLimit", "50000"));
9597

9698
public boolean shouldResetQuerySizeLimit() {
9799
return true;
@@ -187,6 +189,20 @@ protected void resetQuerySizeLimit() throws IOException {
187189
DEFAULT_QUERY_SIZE_LIMIT.toString()));
188190
}
189191

192+
protected void setQuerySystemLimit(Integer limit) throws IOException {
193+
updateClusterSettings(
194+
new ClusterSetting(
195+
"transient", Settings.Key.QUERY_SYSTEM_LIMIT.getKeyValue(), limit.toString()));
196+
}
197+
198+
protected void resetQuerySystemLimit() throws IOException {
199+
updateClusterSettings(
200+
new ClusterSetting(
201+
"transient",
202+
Settings.Key.QUERY_SYSTEM_LIMIT.getKeyValue(),
203+
DEFAULT_QUERY_SYSTEM_LIMIT.toString()));
204+
}
205+
190206
@SneakyThrows
191207
protected void setDataSourcesEnabled(String clusterSettingType, boolean value) {
192208
updateClusterSettings(

integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ public void init() throws Exception {
2525
loadIndex(Index.ACCOUNT);
2626
}
2727

28+
protected String loadFromFile(String filename) throws Exception {
29+
URI uri = Resources.getResource(filename).toURI();
30+
return new String(Files.readAllBytes(Paths.get(uri)));
31+
}
32+
2833
@Test
2934
public void testExplain() throws Exception {
3035
String expected =
@@ -247,11 +252,6 @@ public void testTrendlineWithSortPushDownExplain() throws Exception {
247252
+ "| fields ageTrend"));
248253
}
249254

250-
String loadFromFile(String filename) throws Exception {
251-
URI uri = Resources.getResource(filename).toURI();
252-
return new String(Files.readAllBytes(Paths.get(uri)));
253-
}
254-
255255
@Test
256256
public void testExplainModeUnsupportedInV2() throws IOException {
257257
try {

integ-test/src/test/java/org/opensearch/sql/ppl/SettingsIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public void testQuerySizeLimit() throws IOException {
3333
result =
3434
executeQuery(String.format("search source=%s age>35 | fields firstname", TEST_INDEX_BANK));
3535
verifyDataRows(result, rows("Hattie"));
36+
resetQuerySizeLimit();
3637
}
3738

3839
@Test
@@ -62,5 +63,6 @@ public void testQuerySizeLimit_NoPushdown() throws IOException {
6263
"search source=%s | eval a = 1 | where age>35 | fields firstname",
6364
TEST_INDEX_BANK));
6465
verifyDataRows(result, rows("Hattie"));
66+
resetQuerySizeLimit();
6567
}
6668
}

0 commit comments

Comments
 (0)