Skip to content

Commit 435a124

Browse files
authored
Support ResourceMonitor with Calcite (opensearch-project#3738)
* Support ResourceMonitor with Calcite Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix IT Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix UT related to decimal literal Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent f1e3b5b commit 435a124

8 files changed

Lines changed: 54 additions & 11 deletions

File tree

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.exception;
7+
8+
/** Non-fallback to v2 exception for Calcite. */
9+
public class NonFallbackCalciteException extends QueryEngineException {
10+
11+
public NonFallbackCalciteException(String message) {
12+
super(message);
13+
}
14+
}

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.sql.common.setting.Settings.Key;
4242
import org.opensearch.sql.datasource.DataSourceService;
4343
import org.opensearch.sql.exception.CalciteUnsupportedException;
44+
import org.opensearch.sql.exception.NonFallbackCalciteException;
4445
import org.opensearch.sql.planner.PlanContext;
4546
import org.opensearch.sql.planner.Planner;
4647
import org.opensearch.sql.planner.logical.LogicalPaginate;
@@ -107,7 +108,7 @@ public void executeWithCalcite(
107108
return null;
108109
});
109110
} catch (Throwable t) {
110-
if (isCalciteFallbackAllowed()) {
111+
if (isCalciteFallbackAllowed() && !(t instanceof NonFallbackCalciteException)) {
111112
log.warn("Fallback to V2 query engine since got exception", t);
112113
executeWithLegacy(plan, queryType, listener, Optional.of(t));
113114
} else {

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteResourceMonitorIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8-
import org.junit.Ignore;
98
import org.opensearch.sql.ppl.ResourceMonitorIT;
109

11-
@Ignore("https://github.com/opensearch-project/sql/issues/3454")
1210
public class CalciteResourceMonitorIT extends ResourceMonitorIT {
1311
@Override
1412
public void init() throws Exception {

opensearch/src/main/java/org/opensearch/sql/opensearch/monitor/OpenSearchResourceMonitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public OpenSearchResourceMonitor(Settings settings, OpenSearchMemoryHealthy memo
4848
public boolean isHealthy() {
4949
try {
5050
ByteSizeValue limit = settings.getSettingValue(Settings.Key.QUERY_MEMORY_LIMIT);
51+
if (limit == null) {
52+
// undefined, be always healthy, this is useful in Calcite standalone ITs
53+
// since AlwaysHealthyMonitor is not work within Calcite tests.
54+
return true;
55+
}
5156
Supplier<Boolean> booleanSupplier =
5257
Retry.decorateSupplier(retry, () -> memoryMonitor.isMemoryHealthy(limit.getBytes()));
5358
return booleanSupplier.get();

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.opensearch.sql.opensearch.client.OpenSearchClient;
2626
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
2727
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
28+
import org.opensearch.sql.opensearch.monitor.OpenSearchMemoryHealthy;
29+
import org.opensearch.sql.opensearch.monitor.OpenSearchResourceMonitor;
2830
import org.opensearch.sql.opensearch.planner.physical.ADOperator;
2931
import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator;
3032
import org.opensearch.sql.opensearch.planner.physical.MLOperator;
@@ -261,6 +263,10 @@ public OpenSearchRequestBuilder createRequestBuilder() {
261263
return new OpenSearchRequestBuilder(createExprValueFactory(), settings);
262264
}
263265

266+
public OpenSearchResourceMonitor createOpenSearchResourceMonitor() {
267+
return new OpenSearchResourceMonitor(getSettings(), new OpenSearchMemoryHealthy());
268+
}
269+
264270
public OpenSearchRequest buildRequest(OpenSearchRequestBuilder requestBuilder) {
265271
final TimeValue cursorKeepAlive = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
266272
return requestBuilder.build(

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ public Enumerator<Object> enumerator() {
101101
osIndex.getClient(),
102102
getFieldPath(),
103103
requestBuilder.getMaxResponseSize(),
104-
osIndex.buildRequest(requestBuilder));
104+
osIndex.buildRequest(requestBuilder),
105+
osIndex.createOpenSearchResourceMonitor());
105106
}
106107
};
107108
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.apache.calcite.linq4j.Enumerator;
1414
import org.opensearch.sql.data.model.ExprValue;
1515
import org.opensearch.sql.data.model.ExprValueUtils;
16+
import org.opensearch.sql.exception.NonFallbackCalciteException;
17+
import org.opensearch.sql.monitor.ResourceMonitor;
1618
import org.opensearch.sql.opensearch.client.OpenSearchClient;
1719
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
1820
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
@@ -37,6 +39,12 @@ public class OpenSearchIndexEnumerator implements Enumerator<Object> {
3739
/** Largest number of rows allowed in the response. */
3840
@EqualsAndHashCode.Include @ToString.Include private final int maxResponseSize;
3941

42+
/** How many moveNext() calls to perform resource check once. */
43+
private static final long NUMBER_OF_NEXT_CALL_TO_CHECK = 1000;
44+
45+
/** ResourceMonitor. */
46+
private final ResourceMonitor monitor;
47+
4048
/** Number of rows returned. */
4149
private Integer queryCount;
4250

@@ -49,13 +57,18 @@ public OpenSearchIndexEnumerator(
4957
OpenSearchClient client,
5058
List<String> fields,
5159
int maxResponseSize,
52-
OpenSearchRequest request) {
60+
OpenSearchRequest request,
61+
ResourceMonitor monitor) {
5362
this.client = client;
5463
this.fields = fields;
5564
this.request = request;
5665
this.maxResponseSize = maxResponseSize;
66+
this.monitor = monitor;
5767
this.queryCount = 0;
5868
this.current = null;
69+
if (!this.monitor.isHealthy()) {
70+
throw new NonFallbackCalciteException("insufficient resources to run the query, quit.");
71+
}
5972
}
6073

6174
private void fetchNextBatch() {
@@ -88,6 +101,11 @@ public boolean moveNext() {
88101
return false;
89102
}
90103

104+
boolean shouldCheck = (queryCount % NUMBER_OF_NEXT_CALL_TO_CHECK == 0);
105+
if (shouldCheck && !this.monitor.isHealthy()) {
106+
throw new NonFallbackCalciteException("insufficient resources to load next row, quit.");
107+
}
108+
91109
if (iterator == null || !iterator.hasNext()) {
92110
fetchNextBatch();
93111
}

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendcolTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ public void testAppendcolStats() {
145145
verifyLogical(root, expectedLogical);
146146
String expectedResult =
147147
""
148-
+ "count()=5; DEPTNO=20; avg(SAL)=2175.00\n"
149-
+ "count()=3; DEPTNO=10; avg(SAL)=2916.66\n"
150-
+ "count()=6; DEPTNO=30; avg(SAL)=1566.66\n";
148+
+ "count()=5; DEPTNO=20; avg(SAL)=2175.\n"
149+
+ "count()=3; DEPTNO=10; avg(SAL)=2916.666666\n"
150+
+ "count()=6; DEPTNO=30; avg(SAL)=1566.666666\n";
151151
verifyResult(root, expectedResult);
152152

153153
String expectedSparkSql =
@@ -185,9 +185,9 @@ public void testAppendcolStatsOverride() {
185185
verifyLogical(root, expectedLogical);
186186
String expectedResult =
187187
""
188-
+ "count()=5; DEPTNO=20; avg(SAL)=2175.00\n"
189-
+ "count()=3; DEPTNO=10; avg(SAL)=2916.66\n"
190-
+ "count()=6; DEPTNO=30; avg(SAL)=1566.66\n";
188+
+ "count()=5; DEPTNO=20; avg(SAL)=2175.\n"
189+
+ "count()=3; DEPTNO=10; avg(SAL)=2916.666666\n"
190+
+ "count()=6; DEPTNO=30; avg(SAL)=1566.666666\n";
191191
verifyResult(root, expectedResult);
192192

193193
String expectedSparkSql =

0 commit comments

Comments
 (0)