Commit 24dd79c

Wire analytics-engine as extendedPlugins dependency (#5302)
* [Mustang] Wire analytics-engine as extendedPlugins dependency (#5247)

Step 1: Plugin wiring and dependency integration.

- Add analytics-engine as extendedPlugins in plugin/build.gradle
- Vendor analytics-framework JAR (interfaces) and analytics-engine ZIP (plugin) built from OpenSearch sandbox/3.6 branch
- Delete local QueryPlanExecutor interface, use upstream org.opensearch.analytics.exec.QueryPlanExecutor from JAR
- Replace StubSchemaProvider with OpenSearchSchemaBuilder which reads real index mappings from ClusterState
- Delete StubSchemaProvider (no longer needed)
- Exclude shared JARs (Calcite, Guava, commons-*, etc.) from SQL plugin bundle to avoid jar hell with analytics-engine classloader
- Load analytics-engine plugin in integTest and remoteCluster test clusters before opensearch-sql-plugin
- Create parquet_logs and parquet_metrics indices in ITs so OpenSearchSchemaBuilder can resolve the schema
- Update explain expected files for alphabetical field ordering

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Add analytics-engine plugin to all test clusters

Every test cluster that loads opensearch-sql-plugin needs the analytics-engine plugin because SQL declares it as extendedPlugins. Added to yamlRestTest, integTestWithSecurity, remoteIntegTestWithSecurity, and integJdbcTest clusters.

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Add analytics-engine plugin to doctest test cluster

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Add commons-text to analytics-engine ZIP and fix isAnalyticsIndex NPE

commons-text is needed by Calcite (parent classloader) for fuzzy matching but was only in the SQL plugin (child classloader). Also use lightweight parsing context in isAnalyticsIndex to avoid requiring cluster state.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Fix Janino classloader issue when analytics-engine is parent plugin

Calcite's EnumerableInterpretable.getBindable() hardcodes EnumerableInterpretable.class.getClassLoader() for Janino compilation. When analytics-engine is the parent classloader via extendedPlugins, this returns the parent classloader which cannot see SQL plugin classes, causing CompileException for any Enumerable code generation.

Override implement() in OpenSearchCalcitePreparingStmt to use our own compileWithPluginClassLoader() which does the same code generation but uses CalciteToolsHelper.class.getClassLoader() (SQL plugin's child classloader) so Janino can resolve both parent and child classes.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Reverse classloader: make SQL plugin parent of analytics-engine

The previous approach (analytics-engine as parent) caused Janino classloader issues in 4 Calcite code paths. Reversing the relationship makes SQL plugin the parent, so Janino uses the SQL plugin classloader which can see both Calcite and SQL plugin classes.

Changes:
- Remove extendedPlugins=['analytics-engine'] from SQL plugin
- Add ExtensiblePlugin interface to SQLPlugin
- Rebuild analytics-engine ZIP with extended.plugins=opensearch-sql and without Calcite JARs (inherited from parent)
- Move OpenSearchSchemaBuilder to analytics-framework JAR
- Change analytics-framework from compileOnly to api in core
- Reverse test cluster plugin load order (SQL first)
- Revert all Janino classloader fixes (no longer needed)
- Add classloader architecture options doc

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Remove classloader options doc from PR

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Remove unnecessary Janino classloader workarounds

With SQL plugin as parent (Option C), EnumerableInterpretable and RexExecutable use the SQL plugin classloader which sees all classes. The implementEnumerable(), compileWithPluginClassLoader(), and PluginClassLoaderRexExecutor workarounds are no longer needed.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Revert "Remove unnecessary Janino classloader workarounds"

This reverts commit 23e0d13.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Revert "Remove classloader options doc from PR"

This reverts commit 0ad18a8.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Revert "Reverse classloader: make SQL plugin parent of analytics-engine"

This reverts commit be27381.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Revert "Fix Janino classloader issue when analytics-engine is parent plugin"

This reverts commit 2c4c401.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Patch Calcite CALCITE-3745: use thread context classloader for Janino

Calcite hardcodes EnumerableInterpretable.class.getClassLoader() and RexExecutable.class.getClassLoader() for Janino compilation. When analytics-engine is the parent classloader, this returns the parent which cannot see SQL plugin classes, causing CompileException.

Patch calcite-core to prefer Thread.currentThread().getContextClassLoader() (CALCITE-3745), and set the context classloader to the SQL plugin's classloader before Calcite execution in two places:
- CalciteToolsHelper.OpenSearchRelRunners.run()
- CalciteScriptEngine.compile()

This keeps analytics-engine as the natural parent while fixing all Janino classloader issues with a minimal, well-understood patch.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Fix CalciteExplainIT boolean string literal explain plan difference

With the patched Calcite classloader, SAFE_CAST('TRUE') is no longer silently folded to a boolean literal in the logical plan. Split the expected YAML so boolean literal tests (male = true) and string literal tests (male = 'TRUE') have separate expected outputs. The physical plan (term query pushdown) is identical in both cases.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Add setContextClassLoader wrapper to QueryService Calcite paths

The analyze() and convertToCalcitePlan() steps in executeWithCalcite() and explainWithCalcite() trigger RexSimplify which uses Janino for constant folding. Without the context classloader set, UDF expressions like SAFE_CAST('TRUE') can't be folded, causing explain plan differences vs main branch.

Adding the wrapper here covers the entire Calcite lifecycle for both execute and explain paths. Revert the separate boolean string literal YAML since all three boolean tests now produce the same plan (SAFE_CAST folds correctly).

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Refactor context classloader wrapping into CalciteClassLoaderHelper

Extract the CALCITE-3745 context classloader fix into a dedicated CalciteClassLoaderHelper utility class. Remove redundant wrapper from OpenSearchRelRunners.run() (already covered by QueryService callers).

Three call sites remain:
- QueryService.executeWithCalcite(): covers planning + execution
- QueryService.explainWithCalcite(): covers planning + explain
- CalciteScriptEngine.compile(): covers pushdown scripts (shard thread)

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Revert CalciteToolsHelper comment to sync with main

Signed-off-by: Kai Huang <ahkcs@amazon.com>

---------

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
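The CALCITE-3745 patch described in the message above changes which classloader Janino compiles against. A minimal sketch of that selection rule might look like the following. The names `ClassLoaderChoice` and `pick` are hypothetical, and the fallback to the owning class's loader when no context classloader is set is an assumption; the commit message only says the patched Calcite "prefers" the context classloader.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical illustration of "prefer the thread context classloader". */
final class ClassLoaderChoice {
  private ClassLoaderChoice() {}

  /**
   * Returns the current thread's context classloader when one is set; otherwise falls back to
   * the loader that defined {@code fallbackOwner} (the assumed pre-patch behavior).
   */
  static ClassLoader pick(Class<?> fallbackOwner) {
    ClassLoader context = Thread.currentThread().getContextClassLoader();
    return context != null ? context : fallbackOwner.getClassLoader();
  }

  public static void main(String[] args) {
    Thread thread = Thread.currentThread();
    ClassLoader original = thread.getContextClassLoader();
    ClassLoader parent = ClassLoaderChoice.class.getClassLoader();
    // A child loader stands in for the SQL plugin's classloader.
    ClassLoader child = new URLClassLoader(new URL[0], parent);
    try {
      thread.setContextClassLoader(child);
      System.out.println("uses child: " + (pick(ClassLoaderChoice.class) == child));
      thread.setContextClassLoader(null);
      System.out.println("falls back: " + (pick(ClassLoaderChoice.class) == parent));
    } finally {
      thread.setContextClassLoader(original);
    }
  }
}
```

Because a child classloader delegates to its parent, compiling against the child lets generated code resolve classes from both plugins, which is the property the commit relies on.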
1 parent 151cffc commit 24dd79c

25 files changed

Lines changed: 253 additions & 142 deletions

core/build.gradle

Lines changed: 2 additions & 0 deletions
@@ -63,6 +63,8 @@ dependencies {
     }
     api 'org.apache.calcite:calcite-linq4j:1.41.0'
     api project(':common')
+    compileOnly files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar")
+    testImplementation files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar")
     implementation "com.github.seancfoley:ipaddress:5.4.2"
     implementation "com.jayway.jsonpath:json-path:2.9.0"

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Helper for setting the thread context classloader before Calcite operations. This is needed for
+ * patched Calcite (CALCITE-3745): when analytics-engine is the parent classloader, Janino uses the
+ * parent's classloader which can't see SQL plugin classes. The patched Calcite checks {@code
+ * Thread.currentThread().getContextClassLoader()} first. This helper sets it to the SQL plugin's
+ * classloader (child) which can see both parent and child classes.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/CALCITE-3745">CALCITE-3745</a>
+ * @see <a href="https://github.com/opensearch-project/sql/issues/5306">sql#5306</a>
+ */
+public final class CalciteClassLoaderHelper {
+
+  private CalciteClassLoaderHelper() {}
+
+  /**
+   * Run an action with the thread context classloader set to the caller's classloader.
+   *
+   * @param action the action to run
+   * @param callerClass the class whose classloader should be used (pass {@code MyClass.class})
+   * @param <T> the return type
+   * @return the result of the action
+   */
+  public static <T> T withCalciteClassLoader(Callable<T> action, Class<?> callerClass) {
+    Thread currentThread = Thread.currentThread();
+    ClassLoader originalCl = currentThread.getContextClassLoader();
+    currentThread.setContextClassLoader(callerClass.getClassLoader());
+    try {
+      return action.call();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      currentThread.setContextClassLoader(originalCl);
+    }
+  }
+
+  /**
+   * Run a void action with the thread context classloader set to the caller's classloader.
+   *
+   * @see #withCalciteClassLoader(Callable, Class)
+   */
+  public static void withCalciteClassLoader(Runnable action, Class<?> callerClass) {
+    withCalciteClassLoader(
+        () -> {
+          action.run();
+          return null;
+        },
+        callerClass);
+  }
+}
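To make the helper's contract concrete, the sketch below copies its set/run/restore pattern into a stand-alone class and checks three behaviors: the action sees the caller's classloader, the original context classloader is restored afterwards, and a checked exception surfaces as a `RuntimeException` wrapping the original. `ContextLoaderDemo` and its method names are hypothetical; only the body of `withLoader` mirrors the diff.

```java
import java.util.concurrent.Callable;

/** Stand-alone illustration; withLoader mirrors withCalciteClassLoader(Callable, Class). */
final class ContextLoaderDemo {
  private ContextLoaderDemo() {}

  static <T> T withLoader(Callable<T> action, Class<?> callerClass) {
    Thread thread = Thread.currentThread();
    ClassLoader original = thread.getContextClassLoader();
    thread.setContextClassLoader(callerClass.getClassLoader());
    try {
      return action.call();
    } catch (RuntimeException e) {
      throw e; // unchecked exceptions pass through unchanged
    } catch (Exception e) {
      throw new RuntimeException(e); // checked exceptions are wrapped
    } finally {
      thread.setContextClassLoader(original); // always restore, even on failure
    }
  }

  /** True if the action observed the caller class's loader as the context classloader. */
  static boolean sawCallerLoader() {
    return withLoader(
        () ->
            Thread.currentThread().getContextClassLoader()
                == ContextLoaderDemo.class.getClassLoader(),
        ContextLoaderDemo.class);
  }

  /** True if a checked exception comes back wrapped with the original as its cause. */
  static boolean wrapsChecked() {
    Exception checked = new Exception("boom");
    try {
      withLoader(
          () -> {
            throw checked;
          },
          ContextLoaderDemo.class);
      return false;
    } catch (RuntimeException e) {
      return e.getCause() == checked;
    }
  }

  public static void main(String[] args) {
    ClassLoader before = Thread.currentThread().getContextClassLoader();
    System.out.println("saw caller loader: " + sawCallerLoader());
    System.out.println("wraps checked: " + wrapsChecked());
    System.out.println(
        "restored: " + (Thread.currentThread().getContextClassLoader() == before));
  }
}
```

One consequence of the wrapping is that callers catching `Throwable` around the helper would see the wrapper rather than the original checked exception, so any type checks would need to inspect `getCause()` in that case.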

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 26 additions & 17 deletions
@@ -35,6 +35,7 @@
 import org.opensearch.sql.calcite.SysLimit;
 import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
 import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
+import org.opensearch.sql.calcite.utils.CalciteClassLoaderHelper;
 import org.opensearch.sql.common.response.ResponseListener;
 import org.opensearch.sql.common.setting.Settings;
 import org.opensearch.sql.common.utils.QueryContext;
@@ -139,14 +140,18 @@ public void executeWithCalcite(
                 QueryProfiling.activate(QueryContext.isProfileEnabled());
             ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE);
             long analyzeStart = System.nanoTime();
-            CalcitePlanContext context =
-                CalcitePlanContext.create(
-                    buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
-            context.setHighlightConfig(highlightConfig);
-            RelNode relNode = analyze(plan, context);
-            RelNode calcitePlan = convertToCalcitePlan(relNode, context);
-            analyzeMetric.set(System.nanoTime() - analyzeStart);
-            executionEngine.execute(calcitePlan, context, listener);
+            CalciteClassLoaderHelper.withCalciteClassLoader(
+                () -> {
+                  CalcitePlanContext context =
+                      CalcitePlanContext.create(
+                          buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
+                  context.setHighlightConfig(highlightConfig);
+                  RelNode relNode = analyze(plan, context);
+                  RelNode calcitePlan = convertToCalcitePlan(relNode, context);
+                  analyzeMetric.set(System.nanoTime() - analyzeStart);
+                  executionEngine.execute(calcitePlan, context, listener);
+                },
+                QueryService.class);
           } catch (Throwable t) {
             if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
               log.warn("Fallback to V2 query engine since got exception", t);
@@ -169,17 +174,21 @@ public void explainWithCalcite(
         () -> {
           try {
             QueryProfiling.noop();
-            CalcitePlanContext context =
-                CalcitePlanContext.create(
-                    buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
-            context.setHighlightConfig(highlightConfig);
-            context.run(
+            CalciteClassLoaderHelper.withCalciteClassLoader(
                 () -> {
-                  RelNode relNode = analyze(plan, context);
-                  RelNode calcitePlan = convertToCalcitePlan(relNode, context);
-                  executionEngine.explain(calcitePlan, mode, context, listener);
+                  CalcitePlanContext context =
+                      CalcitePlanContext.create(
+                          buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
+                  context.setHighlightConfig(highlightConfig);
+                  context.run(
+                      () -> {
+                        RelNode relNode = analyze(plan, context);
+                        RelNode calcitePlan = convertToCalcitePlan(relNode, context);
+                        executionEngine.explain(calcitePlan, mode, context, listener);
+                      },
+                      settings);
                 },
-                settings);
+                QueryService.class);
           } catch (Throwable t) {
             if (isCalciteFallbackAllowed(t)) {
               log.warn("Fallback to V2 query engine since got exception", t);

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 3 additions & 2 deletions
@@ -13,6 +13,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.opensearch.analytics.exec.QueryPlanExecutor;
 import org.opensearch.sql.ast.statement.ExplainMode;
 import org.opensearch.sql.calcite.CalcitePlanContext;
 import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
@@ -38,9 +39,9 @@
  */
 public class AnalyticsExecutionEngine implements ExecutionEngine {
 
-  private final QueryPlanExecutor planExecutor;
+  private final QueryPlanExecutor<RelNode, Iterable<Object[]>> planExecutor;
 
-  public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) {
+  public AnalyticsExecutionEngine(QueryPlanExecutor<RelNode, Iterable<Object[]>> planExecutor) {
     this.planExecutor = planExecutor;
   }
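The change above swaps a local, non-generic interface for a parameterized upstream one. The diff does not show the upstream interface's methods, so the sketch below uses a hypothetical stand-in, `PlanExecutor<P, R>` with a single hypothetical `execute` method, purely to illustrate why binding the result type to `Iterable<Object[]>` lets a consumer iterate rows without casts. It is not the real `org.opensearch.analytics.exec.QueryPlanExecutor` API, and a `String` stands in for the `RelNode` plan type.

```java
import java.util.List;

/** Illustrative only; PlanExecutor is a hypothetical stand-in, not the upstream interface. */
final class TypedExecutorSketch {
  private TypedExecutorSketch() {}

  /** Hypothetical generic executor: P is the plan type, R is the result type. */
  interface PlanExecutor<P, R> {
    R execute(P plan);
  }

  /** Counts result rows; no cast is needed because R is bound to Iterable<Object[]>. */
  static int countRows(PlanExecutor<String, Iterable<Object[]>> executor, String plan) {
    int count = 0;
    for (Object[] row : executor.execute(plan)) {
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    // A stub executor returning canned rows regardless of the plan it receives.
    PlanExecutor<String, Iterable<Object[]>> stub =
        plan -> List.of(new Object[] {1, "a"}, new Object[] {2, "b"});
    System.out.println(countRows(stub, "source = parquet_logs"));
  }
}
```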

core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java

Lines changed: 0 additions & 32 deletions
This file was deleted.

core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Lines changed: 6 additions & 2 deletions
@@ -24,6 +24,7 @@
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.opensearch.analytics.exec.QueryPlanExecutor;
 import org.opensearch.sql.calcite.CalcitePlanContext;
 import org.opensearch.sql.calcite.SysLimit;
 import org.opensearch.sql.common.response.ResponseListener;
@@ -35,12 +36,15 @@
 class AnalyticsExecutionEngineTest {
 
   private AnalyticsExecutionEngine engine;
-  private QueryPlanExecutor mockExecutor;
+
+  @SuppressWarnings("unchecked")
+  private QueryPlanExecutor<RelNode, Iterable<Object[]>> mockExecutor;
+
   private CalcitePlanContext mockContext;
 
   @BeforeEach
   void setUp() throws Exception {
-    mockExecutor = mock(QueryPlanExecutor.class);
+    mockExecutor = (QueryPlanExecutor<RelNode, Iterable<Object[]>>) mock(QueryPlanExecutor.class);
     engine = new AnalyticsExecutionEngine(mockExecutor);
     mockContext = mock(CalcitePlanContext.class);
     setSysLimit(mockContext, SysLimit.DEFAULT);

doctest/build.gradle

Lines changed: 1 addition & 0 deletions
@@ -195,6 +195,7 @@ testClusters {
         }))
         */
         plugin(getJobSchedulerPlugin(jsPlugin, bwcOpenSearchJSDownload))
+        plugin provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) }
         plugin ':opensearch-sql-plugin'
         testDistribution = 'archive'
     }

integ-test/build.gradle

Lines changed: 12 additions & 1 deletion
@@ -270,35 +270,44 @@ def getGeoSpatialPlugin() {
     }
 }
 
+def getAnalyticsEnginePlugin() {
+    provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) }
+}
+
 testClusters {
     integTest {
         testDistribution = 'archive'
         plugin(getJobSchedulerPlugin())
         plugin(getGeoSpatialPlugin())
+        plugin(getAnalyticsEnginePlugin())
         plugin ":opensearch-sql-plugin"
         setting "plugins.query.datasources.encryption.masterkey", "1234567812345678"
     }
     yamlRestTest {
         testDistribution = 'archive'
         plugin(getJobSchedulerPlugin())
         plugin(getGeoSpatialPlugin())
+        plugin(getAnalyticsEnginePlugin())
         plugin ":opensearch-sql-plugin"
         setting "plugins.query.datasources.encryption.masterkey", "1234567812345678"
     }
     remoteCluster {
         testDistribution = 'archive'
         plugin(getJobSchedulerPlugin())
         plugin(getGeoSpatialPlugin())
+        plugin(getAnalyticsEnginePlugin())
         plugin ":opensearch-sql-plugin"
     }
     integTestWithSecurity {
         testDistribution = 'archive'
         plugin(getJobSchedulerPlugin())
+        plugin(getAnalyticsEnginePlugin())
         plugin ":opensearch-sql-plugin"
     }
     remoteIntegTestWithSecurity {
         testDistribution = 'archive'
         plugin(getJobSchedulerPlugin())
+        plugin(getAnalyticsEnginePlugin())
         plugin ":opensearch-sql-plugin"
     }
 }
@@ -352,8 +361,10 @@ task stopPrometheus(type: KillProcessTask) {
 stopPrometheus.mustRunAfter startPrometheus
 
 task integJdbcTest(type: RestIntegTestTask) {
-    testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().
+    testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().with {
+        plugin(getAnalyticsEnginePlugin())
         plugin ":opensearch-sql-plugin"
+    }
 
     useJUnitPlatform()
     dependsOn ':opensearch-sql-plugin:bundlePlugin'

integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsExplainIT.java

Lines changed: 18 additions & 1 deletion
@@ -5,10 +5,12 @@
 
 package org.opensearch.sql.ppl;
 
+import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
 import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId;
 
 import java.io.IOException;
 import org.junit.Test;
+import org.opensearch.client.Request;
 
 /**
  * Explain integration tests for queries routed through the analytics engine path (Project Mustang).
@@ -25,7 +27,22 @@ public class AnalyticsExplainIT extends PPLIntegTestCase {
 
   @Override
   protected void init() throws Exception {
-    // No index loading needed -- stub schema and data are hardcoded
+    // Create parquet_logs index so OpenSearchSchemaBuilder can build the schema for explain tests.
+    if (!isIndexExist(client(), "parquet_logs")) {
+      Request request = new Request("PUT", "/parquet_logs");
+      request.setJsonEntity(
+          "{"
+              + "\"mappings\": {"
+              + " \"properties\": {"
+              + " \"ts\": {\"type\": \"date\"},"
+              + " \"status\": {\"type\": \"integer\"},"
+              + " \"message\": {\"type\": \"keyword\"},"
+              + " \"ip_addr\": {\"type\": \"keyword\"}"
+              + " }"
+              + "}"
+              + "}");
+      client().performRequest(request);
+    }
   }
 
   private String loadAnalyticsExpectedPlan(String fileName) {

integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java

Lines changed: 43 additions & 2 deletions
@@ -6,6 +6,7 @@
 package org.opensearch.sql.ppl;
 
 import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
+import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
 import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
 import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT;
 import static org.opensearch.sql.util.MatcherUtils.rows;
@@ -40,8 +41,48 @@ public class AnalyticsPPLIT extends PPLIntegTestCase {
 
   @Override
   protected void init() throws Exception {
-    // No index loading needed -- stub schema and data are hardcoded
-    // in RestUnifiedQueryAction and StubQueryPlanExecutor
+    // Create parquet indices with mappings so OpenSearchSchemaBuilder can build the schema.
+    // The stub executor still returns canned data regardless of the index contents.
+    createParquetLogsIndex();
+    createParquetMetricsIndex();
+  }
+
+  private void createParquetLogsIndex() throws IOException {
+    if (isIndexExist(client(), "parquet_logs")) {
+      return;
+    }
+    Request request = new Request("PUT", "/parquet_logs");
+    request.setJsonEntity(
+        "{"
+            + "\"mappings\": {"
+            + " \"properties\": {"
+            + " \"ts\": {\"type\": \"date\"},"
+            + " \"status\": {\"type\": \"integer\"},"
+            + " \"message\": {\"type\": \"keyword\"},"
+            + " \"ip_addr\": {\"type\": \"keyword\"}"
+            + " }"
+            + "}"
+            + "}");
+    client().performRequest(request);
+  }
+
+  private void createParquetMetricsIndex() throws IOException {
+    if (isIndexExist(client(), "parquet_metrics")) {
+      return;
+    }
+    Request request = new Request("PUT", "/parquet_metrics");
+    request.setJsonEntity(
+        "{"
+            + "\"mappings\": {"
+            + " \"properties\": {"
+            + " \"ts\": {\"type\": \"date\"},"
+            + " \"cpu\": {\"type\": \"double\"},"
+            + " \"memory\": {\"type\": \"double\"},"
+            + " \"host\": {\"type\": \"keyword\"}"
+            + " }"
+            + "}"
+            + "}");
+    client().performRequest(request);
   }
 
   // --- Full table scan tests with schema + data verification ---
// --- Full table scan tests with schema + data verification ---
