Skip to content

Commit 5c5ffa2

Browse files
authored
Version bump to OpenSearch 3.7 with async QueryPlanExecutor (#5396)
* Version bump to OpenSearch 3.7 (Jackson 2 → 3 parser API + _source excludes serialization) (#5361) Signed-off-by: Kai Huang <ahkcs@amazon.com> * Update vendored analytics JARs from 3.6 to 3.7 Replace libs/ JARs with 3.7.0-SNAPSHOT versions built from the OpenSearch sandbox. Update build files to reference 3.7 file names. Keep files() references instead of Maven coordinates so CI can resolve dependencies without publishToMavenLocal. Signed-off-by: Ahmed Khatib <ahkcs@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Async QueryPlanExecutor + drop stub and stub-only ITs The analytics-framework 3.7 QueryPlanExecutor interface is async (void execute(plan, ctx, ActionListener<Stream>)). Update AnalyticsExecutionEngine to dispatch via the listener instead of the old synchronous return-value form, and rework the unit test mocks likewise. Drop StubQueryPlanExecutor and its callers (TransportPPLQueryAction, SQLPlugin, RestUnifiedQueryActionTest); the SQL plugin now expects the real analytics-engine executor to be supplied via Guice cross-plugin injection (a small AnalyticsExecutorHolder bridges the binding to the SQLPlugin REST handler that runs before Node injection). Drop AnalyticsPPLIT and AnalyticsExplainIT (and their expectedOutput/analytics fixtures): these tested stub canned data which no longer exists. Signed-off-by: Kai Huang <ahkcs@amazon.com> --------- Signed-off-by: Kai Huang <ahkcs@amazon.com> Signed-off-by: Ahmed Khatib <ahkcs@amazon.com>
1 parent 5fa54c3 commit 5c5ffa2

346 files changed

Lines changed: 656 additions & 1256 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

async-query/src/test/java/org/opensearch/sql/spark/transport/format/CreateAsyncQueryRequestConverterTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public void fromXContentWithDuplicateFields() throws IOException {
5454
Assertions.assertTrue(
5555
illegalArgumentException
5656
.getMessage()
57-
.contains("Error while parsing the request body: Duplicate field 'datasource'"));
57+
.contains(
58+
"Error while parsing the request body: Duplicate Object property \"datasource\""));
5859
}
5960

6061
@Test

build.gradle

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
buildscript {
88
ext {
9-
opensearch_version = System.getProperty("opensearch.version", "3.6.0-SNAPSHOT")
9+
opensearch_version = System.getProperty("opensearch.version", "3.7.0-SNAPSHOT")
1010
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
1111
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
1212
version_tokens = opensearch_version.tokenize('-')
@@ -127,6 +127,11 @@ allprojects {
127127
version += "-SNAPSHOT"
128128
}
129129

130+
// Path to the analytics-engine plugin ZIP. Override with
131+
// `-PanalyticsEngineZip=/path/to/zip` if needed.
132+
ext.analyticsEngineZip = project.findProperty('analyticsEngineZip') ?:
133+
"${rootDir}/libs/analytics-engine-3.7.0-SNAPSHOT.zip"
134+
130135
plugins.withId('java') {
131136
java {
132137
sourceCompatibility = JavaVersion.VERSION_21

core/build.gradle

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ plugins {
3333
}
3434

3535
repositories {
36+
mavenLocal()
3637
mavenCentral()
3738
}
3839

@@ -63,8 +64,12 @@ dependencies {
6364
}
6465
api 'org.apache.calcite:calcite-linq4j:1.41.0'
6566
api project(':common')
66-
compileOnly files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar")
67-
testImplementation files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar")
67+
compileOnly files("${rootDir}/libs/analytics-framework-3.7.0-SNAPSHOT.jar")
68+
// Needed because the analytics-framework's QueryPlanExecutor signature uses
69+
// org.opensearch.core.action.ActionListener; AnalyticsExecutionEngine references that type.
70+
compileOnly group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}"
71+
testImplementation files("${rootDir}/libs/analytics-framework-3.7.0-SNAPSHOT.jar")
72+
testImplementation group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}"
6873
implementation "com.github.seancfoley:ipaddress:5.4.2"
6974
implementation "com.jayway.jsonpath:json-path:2.9.0"
7075

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.calcite.rel.type.RelDataType;
1515
import org.apache.calcite.rel.type.RelDataTypeField;
1616
import org.opensearch.analytics.exec.QueryPlanExecutor;
17+
import org.opensearch.core.action.ActionListener;
1718
import org.opensearch.sql.ast.statement.ExplainMode;
1819
import org.opensearch.sql.calcite.CalcitePlanContext;
1920
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
@@ -70,25 +71,35 @@ public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listene
7071
@Override
7172
public void execute(
7273
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
73-
try {
74-
// Record EXECUTE metric before calling listener, because the listener's onResponse
75-
// triggers SimpleJsonResponseFormatter which calls QueryProfiling.finish() to snapshot
76-
// all metrics. The metric must be written before that snapshot.
77-
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
78-
long execStart = System.nanoTime();
79-
80-
Iterable<Object[]> rows = planExecutor.execute(plan, null);
81-
82-
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
83-
List<ExprValue> results = convertRows(rows, fields);
84-
Schema schema = buildSchema(fields);
85-
86-
execMetric.set(System.nanoTime() - execStart);
87-
88-
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
89-
} catch (Exception e) {
90-
listener.onFailure(e);
91-
}
74+
// QueryPlanExecutor became asynchronous in analytics-framework 3.7 — execution is dispatched
75+
// to a worker pool and results arrive on the listener. Record the execute metric in the
76+
// listener callback, before delegating to the user-supplied listener, so the metric snapshot
77+
// taken by SimpleJsonResponseFormatter sees the correct value.
78+
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
79+
long execStart = System.nanoTime();
80+
81+
planExecutor.execute(
82+
plan,
83+
null,
84+
new ActionListener<>() {
85+
@Override
86+
public void onResponse(Iterable<Object[]> rows) {
87+
try {
88+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
89+
List<ExprValue> results = convertRows(rows, fields);
90+
Schema schema = buildSchema(fields);
91+
execMetric.set(System.nanoTime() - execStart);
92+
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
93+
} catch (Exception e) {
94+
listener.onFailure(e);
95+
}
96+
}
97+
98+
@Override
99+
public void onFailure(Exception e) {
100+
listener.onFailure(e);
101+
}
102+
});
92103
}
93104

94105
@Override

core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import static org.junit.jupiter.api.Assertions.assertEquals;
99
import static org.junit.jupiter.api.Assertions.assertNotNull;
1010
import static org.junit.jupiter.api.Assertions.assertTrue;
11+
import static org.mockito.ArgumentMatchers.any;
12+
import static org.mockito.ArgumentMatchers.eq;
13+
import static org.mockito.Mockito.doAnswer;
1114
import static org.mockito.Mockito.mock;
1215
import static org.mockito.Mockito.when;
1316

@@ -25,6 +28,7 @@
2528
import org.junit.jupiter.api.BeforeEach;
2629
import org.junit.jupiter.api.Test;
2730
import org.opensearch.analytics.exec.QueryPlanExecutor;
31+
import org.opensearch.core.action.ActionListener;
2832
import org.opensearch.sql.calcite.CalcitePlanContext;
2933
import org.opensearch.sql.calcite.SysLimit;
3034
import org.opensearch.sql.common.response.ResponseListener;
@@ -57,11 +61,34 @@ private static void setSysLimit(CalcitePlanContext context, SysLimit sysLimit) t
5761
field.set(context, sysLimit);
5862
}
5963

64+
/** QueryPlanExecutor became async in analytics-framework 3.7 — stub the listener callback. */
65+
@SuppressWarnings("unchecked")
66+
private void stubExecutorWith(RelNode relNode, Iterable<Object[]> rows) {
67+
doAnswer(
68+
inv -> {
69+
((ActionListener<Iterable<Object[]>>) inv.getArgument(2)).onResponse(rows);
70+
return null;
71+
})
72+
.when(mockExecutor)
73+
.execute(eq(relNode), any(), any(ActionListener.class));
74+
}
75+
76+
@SuppressWarnings("unchecked")
77+
private void stubExecutorWithError(RelNode relNode, Exception error) {
78+
doAnswer(
79+
inv -> {
80+
((ActionListener<Iterable<Object[]>>) inv.getArgument(2)).onFailure(error);
81+
return null;
82+
})
83+
.when(mockExecutor)
84+
.execute(eq(relNode), any(), any(ActionListener.class));
85+
}
86+
6087
@Test
6188
void executeRelNode_basicTypesAndRows() {
6289
RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
6390
Iterable<Object[]> rows = Arrays.asList(new Object[] {"Alice", 30}, new Object[] {"Bob", 25});
64-
when(mockExecutor.execute(relNode, null)).thenReturn(rows);
91+
stubExecutorWith(relNode, rows);
6592

6693
QueryResponse response = executeAndCapture(relNode);
6794
String dump = dumpResponse(response);
@@ -101,7 +128,7 @@ void executeRelNode_numericTypes() {
101128
"d", SqlTypeName.DOUBLE);
102129
Iterable<Object[]> rows =
103130
Collections.singletonList(new Object[] {(byte) 1, (short) 2, 3, 4L, 5.0f, 6.0});
104-
when(mockExecutor.execute(relNode, null)).thenReturn(rows);
131+
stubExecutorWith(relNode, rows);
105132

106133
QueryResponse response = executeAndCapture(relNode);
107134
String dump = dumpResponse(response);
@@ -138,7 +165,7 @@ void executeRelNode_temporalTypes() {
138165
RelNode relNode =
139166
mockRelNode("dt", SqlTypeName.DATE, "tm", SqlTypeName.TIME, "ts", SqlTypeName.TIMESTAMP);
140167
Iterable<Object[]> emptyRows = Collections.emptyList();
141-
when(mockExecutor.execute(relNode, null)).thenReturn(emptyRows);
168+
stubExecutorWith(relNode, emptyRows);
142169

143170
QueryResponse response = executeAndCapture(relNode);
144171
String dump = dumpResponse(response);
@@ -157,7 +184,7 @@ void executeRelNode_temporalTypes() {
157184
void executeRelNode_emptyResults() {
158185
RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR);
159186
Iterable<Object[]> emptyRows = Collections.emptyList();
160-
when(mockExecutor.execute(relNode, null)).thenReturn(emptyRows);
187+
stubExecutorWith(relNode, emptyRows);
161188

162189
QueryResponse response = executeAndCapture(relNode);
163190
String dump = dumpResponse(response);
@@ -170,7 +197,7 @@ void executeRelNode_emptyResults() {
170197
void executeRelNode_nullValues() {
171198
RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
172199
Iterable<Object[]> rows = Collections.singletonList(new Object[] {null, null});
173-
when(mockExecutor.execute(relNode, null)).thenReturn(rows);
200+
stubExecutorWith(relNode, rows);
174201

175202
QueryResponse response = executeAndCapture(relNode);
176203
String dump = dumpResponse(response);
@@ -187,7 +214,7 @@ void executeRelNode_nullValues() {
187214
@Test
188215
void executeRelNode_errorPropagation() {
189216
RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER);
190-
when(mockExecutor.execute(relNode, null)).thenThrow(new RuntimeException("Engine failure"));
217+
stubExecutorWithError(relNode, new RuntimeException("Engine failure"));
191218

192219
Exception error = executeAndCaptureError(relNode);
193220
System.out.println(dumpError("executeRelNode_errorPropagation", error));

direct-query/src/test/java/org/opensearch/sql/directquery/rest/RestDirectQueryManagementActionTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,16 @@ public void testWhenDataSourcesAreEnabled() {
103103
"{\"query\":\"up\",\"language\":\"promql\",\"options\":{\"queryType\":\"instant\",\"time\":\"1609459200\"}}";
104104
when(request.contentParser())
105105
.thenReturn(
106-
new org.opensearch.common.xcontent.json.JsonXContentParser(
106+
org.opensearch.common.xcontent.json.JsonXContent.jsonXContent.createParser(
107107
org.opensearch.core.xcontent.NamedXContentRegistry.EMPTY,
108108
org.opensearch.core.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
109-
new com.fasterxml.jackson.core.JsonFactory().createParser(requestContent)));
109+
requestContent));
110110
when(request.contentParser())
111111
.thenReturn(
112-
new org.opensearch.common.xcontent.json.JsonXContentParser(
112+
org.opensearch.common.xcontent.json.JsonXContent.jsonXContent.createParser(
113113
org.opensearch.core.xcontent.NamedXContentRegistry.EMPTY,
114114
org.opensearch.core.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
115-
new com.fasterxml.jackson.core.JsonFactory().createParser(requestContent)));
115+
requestContent));
116116

117117
unit.handleRequest(request, channel, nodeClient);
118118
verify(threadPool, Mockito.times(1))
@@ -389,10 +389,10 @@ private ActionListener makeRequest(String requestContent) {
389389
Mockito.when(request.param("dataSources")).thenReturn("testDataSource");
390390
Mockito.when(request.contentParser())
391391
.thenReturn(
392-
new org.opensearch.common.xcontent.json.JsonXContentParser(
392+
org.opensearch.common.xcontent.json.JsonXContent.jsonXContent.createParser(
393393
org.opensearch.core.xcontent.NamedXContentRegistry.EMPTY,
394394
org.opensearch.core.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
395-
new com.fasterxml.jackson.core.JsonFactory().createParser(requestContent)));
395+
requestContent));
396396
Mockito.when(request.consumedParams()).thenReturn(java.util.Collections.emptyList());
397397
Mockito.when(request.params()).thenReturn(java.util.Collections.emptyMap());
398398

docs/dev/opensearch-nested-field-subquery.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ GET /employee_nested/_search
5353
"_source": {
5454
"includes": [
5555
"name"
56-
],
57-
"excludes": []
56+
]
5857
}
5958
}
6059
@@ -110,8 +109,7 @@ WHERE EXISTS(SELECT *
110109
"_source": {
111110
"includes": [
112111
"name"
113-
],
114-
"excludes": []
112+
]
115113
}
116114
}
117115

docs/dev/sql-nested-function-select-clause.md

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ A basic nested function in the SELECT clause and output DSL pushed to OpenSearch
106106
"_source": {
107107
"includes": [
108108
"message.info"
109-
],
110-
"excludes": []
109+
]
111110
}
112111
}
113112
}
@@ -147,8 +146,7 @@ Example with multiple SELECT clause function calls sharing same path. These two
147146
"includes": [
148147
"message.info",
149148
"message.author"
150-
],
151-
"excludes": []
149+
]
152150
}
153151
}
154152
}
@@ -187,8 +185,7 @@ An example with multiple nested function calls in the SELECT clause having diffe
187185
"_source": {
188186
"includes": [
189187
"comment.data"
190-
],
191-
"excludes": []
188+
]
192189
}
193190
}
194191
}
@@ -207,8 +204,7 @@ An example with multiple nested function calls in the SELECT clause having diffe
207204
"_source": {
208205
"includes": [
209206
"message.info"
210-
],
211-
"excludes": []
207+
]
212208
}
213209
}
214210
}

docs/user/beyond/fulltext.rst

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ Explain::
8080
"includes" : [
8181
"account_number",
8282
"address"
83-
],
84-
"excludes" : [ ]
83+
]
8584
}
8685
}
8786

@@ -150,8 +149,7 @@ Explain::
150149
"includes" : [
151150
"account_number",
152151
"address"
153-
],
154-
"excludes" : [ ]
152+
]
155153
}
156154
}
157155

@@ -230,8 +228,7 @@ Explain::
230228
"includes" : [
231229
"firstname",
232230
"lastname"
233-
],
234-
"excludes" : [ ]
231+
]
235232
}
236233
}
237234

@@ -311,8 +308,7 @@ Explain::
311308
"includes" : [
312309
"account_number",
313310
"address"
314-
],
315-
"excludes" : [ ]
311+
]
316312
}
317313
}
318314

@@ -386,8 +382,7 @@ Explain::
386382
"includes" : [
387383
"account_number",
388384
"address"
389-
],
390-
"excludes" : [ ]
385+
]
391386
}
392387
}
393388

@@ -492,8 +487,7 @@ Explain::
492487
"account_number",
493488
"address",
494489
"_score"
495-
],
496-
"excludes" : [ ]
490+
]
497491
},
498492
"sort" : [
499493
{

docs/user/beyond/partiql.rst

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,7 @@ Explain::
286286
"_source" : {
287287
"includes" : [
288288
"projects.name"
289-
],
290-
"excludes" : [ ]
289+
]
291290
}
292291
}
293292
}
@@ -305,8 +304,7 @@ Explain::
305304
"_source" : {
306305
"includes" : [
307306
"name"
308-
],
309-
"excludes" : [ ]
307+
]
310308
}
311309
}
312310

@@ -423,8 +421,7 @@ Explain::
423421
"_source" : {
424422
"includes" : [
425423
"name"
426-
],
427-
"excludes" : [ ]
424+
]
428425
}
429426
}
430427

0 commit comments

Comments
 (0)