
Commit f1f39ef

ahkcs and bowenlan-amzn authored
Land analytics-engine PPL integration into main (opensearch-project#5430)
* Land analytics-engine PPL integration into main

Single squashed delivery of the long-running feature/mustang-ppl-integration branch into main, consolidating 22 feature-branch PRs plus the conflict-resolved merge of current main. Squashed because the feature branch's history includes commits with missing or mismatched Signed-off-by trailers that block DCO at this scope, the same issue documented for the catch-up squashes (opensearch-project#5397). The feature branch f006e29 is retained for individual-commit lineage.

### What this delivers

Analytics-engine PPL integration: a new execution path that routes Parquet-backed (non-Lucene) indices through an analytics engine while keeping Lucene-backed indices on the existing v2 / Calcite paths.

Headline pieces:

- Query routing (opensearch-project#5267): PPL queries against Parquet-backed indices hand off to the analytics-engine execution path; Lucene-backed indices continue through the legacy path
- Explain support (opensearch-project#5275): EXPLAIN covers the analytics-engine path
- Profiling + UnifiedQueryParser (opensearch-project#5285): migrates PPL parsing to the unified parser and wires profiling metrics through the analytics path
- extendedPlugins wiring (opensearch-project#5302): analytics-engine attaches as an OpenSearch extension via SPI
- SQL REST endpoint integration (opensearch-project#5317): same analytics-route fork applied to the SQL transport, plus delegateToV2Engine extraction in RestSqlAction
- Async QueryPlanExecutor (opensearch-project#5396): async execution for analytics-engine plans + version bump to OpenSearch 3.7
- Optional dependency (opensearch-project#5403): analytics-engine becomes an optional runtime dep so the SQL bundle is shippable without it
- Index-setting-based routing (opensearch-project#5429, opensearch-project#5432): replaces the earlier table-name-prefix heuristic with an authoritative index-setting check

Supporting infrastructure:

- Gradle wrapper bump to 9.4.1 (opensearch-project#5406)
- Jar-hell exclusions for arrow-flight-rpc / httpcore5-h2 / httpcore5-reactive / httpclient5 (opensearch-project#5400, opensearch-project#5409)
- IT plumbing: CalciteEvalCommandIT / CalciteFieldFormatCommandIT carried through the helper-managed index path (opensearch-project#5407, opensearch-project#5417); CalciteReplaceCommandIT column-order-agnostic (opensearch-project#5415); @Ignore'd Calcite ITs dropped from CalciteNoPushdownIT (opensearch-project#5416)
- plugins.calcite.enabled=true defaulted on the unified query path (opensearch-project#5413)
- PPL_REX_MAX_MATCH_LIMIT bridged into UnifiedQueryContext (opensearch-project#5418)
- Calcite tolerance fixes: array() default type (opensearch-project#5421), containsNestedAggregator flat-leaf schemas (opensearch-project#5423)
- Sandbox deps switched to analytics-api JDK 21 surface (opensearch-project#5426)

### Feature-branch commits squashed (22)

opensearch-project#5432, opensearch-project#5429, opensearch-project#5426, opensearch-project#5423, opensearch-project#5421, opensearch-project#5418, opensearch-project#5403, opensearch-project#5417, opensearch-project#5415, opensearch-project#5416, opensearch-project#5413, opensearch-project#5407, opensearch-project#5409, opensearch-project#5406, opensearch-project#5400, opensearch-project#5396, opensearch-project#5317, opensearch-project#5302, opensearch-project#5285, opensearch-project#5275, opensearch-project#5267, opensearch-project#5397, opensearch-project#5286

### Main commits absorbed via the merge (54)

Brings the branch up to current upstream/main (54 commits since the last catch-up at opensearch-project#5397, divergence point 513e1b2). Highlights: opensearch-project#5419, opensearch-project#5408, opensearch-project#5414, opensearch-project#5399, opensearch-project#5394, opensearch-project#5361, opensearch-project#5360, opensearch-project#5240, opensearch-project#5266, opensearch-project#5278, plus 44 others (bugfixes, doc updates, infra).

### Conflict resolutions (7)

Resolved during the merge of main into the feature branch. Resolution kept the feature branch's analytics-engine-path semantics where main's changes would have regressed them.

- api/.../UnifiedQueryContext.java: Blank-line-only conflict; took main's tighter formatting.
- core/.../executor/QueryService.java: Kept feature's CalciteClassLoaderHelper.withCalciteClassLoader(...) wrapping (required for analytics-engine classloader isolation) and the matching import.
- integ-test/build.gradle: Kept feature's detailed root-cause comment on the Gradle 9.4.1 TestEventReporterAsListener workaround; kept ASCII ordering of JSONRequestIT / JoinIT and SQLFunctionsIT / ShowIT / SourceFieldIT entries.
- integ-test/.../CalciteEvalCommandIT.java: Kept feature's if (!TestUtils.isIndexExist(...)) idempotency guards on test_eval and test_eval_agent setup (needed for the helper-managed index analytics-engine compatibility run).
- legacy/.../RestSqlAction.java: Kept feature's delegateToV2Engine(...) (extracted from the analytics-engine routing path). Both sides added handleException / getRestStatus / getRawErrorCode; removed the duplicate set git produced.
- plugin/.../SQLPlugin.java: Took the union of imports: ExecutionEngine + ExecutionEngine.ExplainResponse + QueryType.
- plugin/.../transport/TransportPPLQueryAction.java: Combined main's OpenSearchPluginModule(extensionsHolder.engines()) and feature's local pluginSettings / pluginSettingsRef wiring.

EngineExtensionsHolder.java is a new file from main (opensearch-project#5298) preserved as-is.

### Compatibility / opt-in

The analytics-engine path is gated by the extendedPlugins extension being installed (opensearch-project#5403 makes the dep optional). Clusters without analytics-engine installed see no behavior change. Clusters with analytics-engine installed route only Parquet-backed indices through the new path (opensearch-project#5429, by index setting).

### Verification

- ./gradlew :api:compileJava :core:compileJava :legacy:compileJava :opensearch-sql-plugin:compileJava :integ-test:compileTestJava passes locally

Signed-off-by: Kai Huang <ahkcs@amazon.com>
Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>

* Address @penghuo: revert stray blank line in doctest/build.gradle

After 'apply plugin: opensearch.testclusters', one blank line is enough; restoring the single-blank spacing to match upstream/main.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

---------

Signed-off-by: Kai Huang <ahkcs@amazon.com>
Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 0ff1eec commit f1f39ef

26 files changed

Lines changed: 1978 additions & 153 deletions


Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+name: Analytics Engine Compatibility
+
+on:
+  pull_request:
+  push:
+    branches-ignore:
+      - 'backport/**'
+      - 'dependabot/**'
+    paths:
+      - '**/*.java'
+      - '**gradle*'
+      - 'integ-test/**'
+      - '.github/workflows/analytics-engine-compat.yml'
+  merge_group:
+
+jobs:
+  Get-CI-Image-Tag:
+    uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main
+    with:
+      product: opensearch
+
+  analytics-engine-compat:
+    needs: Get-CI-Image-Tag
+    runs-on: ubuntu-latest
+    container:
+      image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
+      options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}
+
+    steps:
+      - name: Run start commands
+        run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
+
+      - uses: actions/checkout@v4
+
+      - name: Set up JDK 25
+        uses: actions/setup-java@v4
+        with:
+          distribution: 'temurin'
+          java-version: 25
+
+      - name: Run analytics-engine compatibility smoke test
+        run: |
+          chown -R 1000:1000 `pwd`
+          su `id -un 1000` -c "./gradlew :integ-test:analyticsEngineCompatIT"

api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java

Lines changed: 21 additions & 1 deletion
@@ -5,7 +5,9 @@
 
 package org.opensearch.sql.api;
 
+import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;
 import static org.opensearch.sql.common.setting.Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT;
+import static org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT;
 import static org.opensearch.sql.common.setting.Settings.Key.PPL_SUBSEARCH_MAXOUT;
 import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT;
 
@@ -119,13 +121,31 @@ public static class Builder {
     /**
      * Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
      * to avoid coupling with OpenSearchSettings.
+     *
+     * <p>{@link Settings.Key#CALCITE_ENGINE_ENABLED} defaults to {@code true} here because the
+     * unified query path is by definition Calcite-based — every query reaching this context flows
+     * through Calcite's planner, never the v2 engine. The PPL {@link
+     * org.opensearch.sql.api.parser.PPLQueryParser} reuses the v2 {@code AstBuilder}, which gates
+     * Calcite-only commands (e.g. {@code visitTableCommand}) on this setting; without the default,
+     * those commands fail at parse time even when the cluster setting is true.
+     *
+     * <p>{@link Settings.Key#PPL_REX_MAX_MATCH_LIMIT} defaults to {@code 10} here because {@code
+     * AstBuilder.visitRexCommand} reads it unconditionally and unboxes to {@code int} — a {@code
+     * null} return from {@code getSettingValue} NPEs the planner before any operator-level
+     * capability check runs. The value mirrors the cluster-side default of {@code 10} registered by
+     * {@code OpenSearchSettings.PPL_REX_MAX_MATCH_LIMIT_SETTING}. Cluster-side overrides reach this
+     * map via {@link #setting(String, Object)} — the REST handler reads the live value from {@code
+     * OpenSearchSettings} and routes it through that existing API, keeping {@link
+     * UnifiedQueryContext} decoupled from any specific {@link Settings} implementation.
      */
     private final Map<Settings.Key, Object> settings =
         new HashMap<Settings.Key, Object>(
             Map.of(
                 QUERY_SIZE_LIMIT, SysLimit.DEFAULT.querySizeLimit(),
                 PPL_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.subsearchLimit(),
-                PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit()));
+                PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit(),
+                CALCITE_ENGINE_ENABLED, true,
+                PPL_REX_MAX_MATCH_LIMIT, 10));
 
     /**
      * Sets the query language frontend to be used.
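
The Javadoc above explains that a missing PPL_REX_MAX_MATCH_LIMIT entry causes an NPE when the value is unboxed to int. The following is a minimal, self-contained sketch of that failure mode and of how a default in the map avoids it. The class name, the reduced Key enum, and the helper methods are hypothetical stand-ins for illustration, not the plugin's actual types.

```java
import java.util.HashMap;
import java.util.Map;

class RexLimitDefaultSketch {
  // Hypothetical stand-in for Settings.Key; only the names needed here.
  enum Key { QUERY_SIZE_LIMIT, PPL_REX_MAX_MATCH_LIMIT }

  // Generic read from the settings map; returns null when the key is absent.
  @SuppressWarnings("unchecked")
  static <T> T getSettingValue(Map<Key, Object> settings, Key key) {
    return (T) settings.get(key);
  }

  // Reads the limit and unboxes it to int, as the Javadoc describes for visitRexCommand.
  static int readRexLimit(Map<Key, Object> settings) {
    Integer boxed = getSettingValue(settings, Key.PPL_REX_MAX_MATCH_LIMIT);
    return boxed; // auto-unboxing: throws NullPointerException when boxed is null
  }

  // Mutable map seeded with the default of 10, so overrides can replace it later.
  static Map<Key, Object> withDefaults() {
    return new HashMap<>(Map.of(Key.PPL_REX_MAX_MATCH_LIMIT, 10));
  }

  public static void main(String[] args) {
    try {
      readRexLimit(new HashMap<>());
    } catch (NullPointerException e) {
      System.out.println("no default: NullPointerException on unboxing");
    }

    Map<Key, Object> settings = withDefaults();
    System.out.println("default: " + readRexLimit(settings));

    settings.put(Key.PPL_REX_MAX_MATCH_LIMIT, 5); // simulated cluster-side override
    System.out.println("override: " + readRexLimit(settings));
  }
}
```

Wrapping Map.of in a HashMap matters in this sketch for the same reason it appears in the diff: Map.of returns an immutable map, so later overrides need a mutable copy.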

api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java

Lines changed: 9 additions & 0 deletions
@@ -33,6 +33,10 @@ public void testContextCreationWithDefaults() {
         "Settings should have default system limits",
         SysLimit.DEFAULT,
         SysLimit.fromSettings(context.getSettings()));
+    assertEquals(
+        "PPL_REX_MAX_MATCH_LIMIT default should be 10",
+        Integer.valueOf(10),
+        context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT));
   }
 
   @Test
@@ -43,10 +47,15 @@ public void testContextCreationWithCustomConfig() {
             .catalog("opensearch", testSchema)
             .cacheMetadata(true)
             .setting("plugins.query.size_limit", 200)
+            .setting("plugins.ppl.rex.max_match.limit", 5)
             .build();
 
     Integer querySizeLimit = context.getSettings().getSettingValue(QUERY_SIZE_LIMIT);
     assertEquals("Custom setting should be applied", Integer.valueOf(200), querySizeLimit);
+    assertEquals(
+        "Cluster-side override for PPL_REX_MAX_MATCH_LIMIT should reach the unified path",
+        Integer.valueOf(5),
+        context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT));
   }
 
   @Test(expected = IllegalArgumentException.class)

core/build.gradle

Lines changed: 7 additions & 0 deletions
@@ -33,6 +33,7 @@ plugins {
 }
 
 repositories {
+    mavenLocal()
     mavenCentral()
 }
 
@@ -63,6 +64,12 @@ dependencies {
     }
     api 'org.apache.calcite:calcite-linq4j:1.41.0'
     api project(':common')
+    compileOnly 'org.opensearch.sandbox:analytics-api:3.7.0-SNAPSHOT'
+    // Needed because analytics-api's QueryPlanExecutor signature uses
+    // org.opensearch.core.action.ActionListener; AnalyticsExecutionEngine references that type.
+    compileOnly group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}"
+    testImplementation 'org.opensearch.sandbox:analytics-api:3.7.0-SNAPSHOT'
+    testImplementation group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}"
     implementation "com.github.seancfoley:ipaddress:5.4.2"
     implementation "com.jayway.jsonpath:json-path:2.9.0"
 

core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java

Lines changed: 10 additions & 0 deletions
@@ -8,6 +8,7 @@
 import java.util.Locale;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.calcite.sql.SqlExplainLevel;
 
 @RequiredArgsConstructor
 public enum ExplainMode {
@@ -26,4 +27,13 @@ public static ExplainMode of(String mode) {
       return ExplainMode.STANDARD;
     }
   }
+
+  /** Convert to Calcite SqlExplainLevel for RelOptUtil.toString(). */
+  public SqlExplainLevel toExplainLevel() {
+    return switch (this) {
+      case SIMPLE -> SqlExplainLevel.NO_ATTRIBUTES;
+      case COST -> SqlExplainLevel.ALL_ATTRIBUTES;
+      default -> SqlExplainLevel.EXPPLAN_ATTRIBUTES;
+    };
+  }
 }
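
To make the mapping in toExplainLevel() concrete, here is a small sketch that reproduces it without Calcite on the classpath. The Level enum is a local stand-in for Calcite's SqlExplainLevel, and Mode lists only the modes visible in this diff; both names are hypothetical, and the real ExplainMode enum may define additional modes that would also fall into the default branch.

```java
class ExplainLevelSketch {
  // Local stand-in for org.apache.calcite.sql.SqlExplainLevel (only the constants the diff uses).
  enum Level { NO_ATTRIBUTES, EXPPLAN_ATTRIBUTES, ALL_ATTRIBUTES }

  // Only the modes that appear in the diff; an assumption, not the full enum.
  enum Mode {
    SIMPLE, STANDARD, COST;

    // Same shape as the added method: two explicit cases, everything else uses the default.
    public Level toExplainLevel() {
      return switch (this) {
        case SIMPLE -> Level.NO_ATTRIBUTES;
        case COST -> Level.ALL_ATTRIBUTES;
        default -> Level.EXPPLAN_ATTRIBUTES;
      };
    }
  }

  public static void main(String[] args) {
    for (Mode mode : Mode.values()) {
      System.out.println(mode + " -> " + mode.toExplainLevel());
    }
  }
}
```

Using a default branch rather than enumerating every mode means any mode added later silently maps to EXPPLAN_ATTRIBUTES, which appears to be the intent of the change.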

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 17 additions & 2 deletions
@@ -1538,10 +1538,25 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
    * count(a.b)] returns true.
    */
   private boolean containsNestedAggregator(RelBuilder relBuilder, List<RexInputRef> aggCallRefs) {
+    // For each aggregator argument, take the part of its column name before the first dot
+    // (e.g. "city" from "city.location.latitude") and check whether that's a top-level
+    // ARRAY column — the marker for an OpenSearch `nested` field.
+    //
+    // The classic path always exposes a top-level column for object/nested parents. The
+    // analytics-engine path emits only the flat leaves ("city.name", "city.location.latitude")
+    // because parent placeholder types (MAP<VARCHAR, ANY>) can't round-trip through Substrait.
+    // RelDataType.getField returns null when the column doesn't exist — for analytics-engine,
+    // that null just means "not nested," which is the right answer.
+    RelDataType rowType = relBuilder.peek().getRowType();
     return aggCallRefs.stream()
-        .map(r -> relBuilder.peek().getRowType().getFieldNames().get(r.getIndex()))
+        .map(r -> rowType.getFieldNames().get(r.getIndex()))
         .map(name -> org.apache.commons.lang3.StringUtils.substringBefore(name, "."))
-        .anyMatch(root -> relBuilder.field(root).getType().getSqlTypeName() == SqlTypeName.ARRAY);
+        .anyMatch(
+            root -> {
+              RelDataTypeField field =
+                  rowType.getField(root, /* caseSensitive= */ true, /* elideRecord= */ false);
+              return field != null && field.getType().getSqlTypeName() == SqlTypeName.ARRAY;
+            });
   }
 
   /**
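
The null-tolerant root lookup described in the comment above can be illustrated without Calcite. In this sketch a plain Map from column name to type name stands in for RelDataType, and a missing key plays the role of getField returning null. The class name, method signature, and sample schemas are assumptions made for illustration only.

```java
import java.util.List;
import java.util.Map;

class NestedRootSketch {
  // rowType: column name -> SQL type name (hypothetical stand-in for RelDataType).
  static boolean containsNestedAggregator(Map<String, String> rowType, List<String> aggArgNames) {
    return aggArgNames.stream()
        // Keep the part before the first dot; names without a dot are kept whole.
        .map(name -> {
          int dot = name.indexOf('.');
          return dot < 0 ? name : name.substring(0, dot);
        })
        // A missing root yields null from the map, which simply means "not nested".
        .anyMatch(root -> "ARRAY".equals(rowType.get(root)));
  }

  public static void main(String[] args) {
    // Classic-path shape: the nested parent is exposed as a top-level ARRAY column.
    Map<String, String> classic = Map.of("city", "ARRAY", "city.name", "VARCHAR");
    // Analytics-engine shape: only flat leaves, no parent column.
    Map<String, String> flat = Map.of("city.name", "VARCHAR", "city.location.latitude", "DOUBLE");

    System.out.println(containsNestedAggregator(classic, List.of("city.name")));
    System.out.println(containsNestedAggregator(flat, List.of("city.location.latitude")));
  }
}
```

The pre-change code resolved the root through relBuilder.field(root), which the commit message suggests did not tolerate flat-leaf schemas; the lookup-then-null-check form treats an absent parent as a normal case.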
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Helper for setting the thread context classloader before Calcite operations. This is needed for
+ * patched Calcite (CALCITE-3745): when analytics-engine is the parent classloader, Janino uses the
+ * parent's classloader which can't see SQL plugin classes. The patched Calcite checks {@code
+ * Thread.currentThread().getContextClassLoader()} first. This helper sets it to the SQL plugin's
+ * classloader (child) which can see both parent and child classes.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/CALCITE-3745">CALCITE-3745</a>
+ * @see <a href="https://github.com/opensearch-project/sql/issues/5306">sql#5306</a>
+ */
+public final class CalciteClassLoaderHelper {
+
+  private CalciteClassLoaderHelper() {}
+
+  /**
+   * Run an action with the thread context classloader set to the caller's classloader.
+   *
+   * @param action the action to run
+   * @param callerClass the class whose classloader should be used (pass {@code MyClass.class})
+   * @param <T> the return type
+   * @return the result of the action
+   */
+  public static <T> T withCalciteClassLoader(Callable<T> action, Class<?> callerClass) {
+    Thread currentThread = Thread.currentThread();
+    ClassLoader originalCl = currentThread.getContextClassLoader();
+    currentThread.setContextClassLoader(callerClass.getClassLoader());
+    try {
+      return action.call();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      currentThread.setContextClassLoader(originalCl);
+    }
+  }
+
+  /**
+   * Run a void action with the thread context classloader set to the caller's classloader.
+   *
+   * @see #withCalciteClassLoader(Callable, Class)
+   */
+  public static void withCalciteClassLoader(Runnable action, Class<?> callerClass) {
+    withCalciteClassLoader(
+        () -> {
+          action.run();
+          return null;
+        },
+        callerClass);
+  }
+}
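
The swap-and-restore pattern used by CalciteClassLoaderHelper can be exercised in isolation. The sketch below is an illustrative variant, not the helper itself: it takes a ClassLoader directly instead of a Class, and the class name, method name, and the empty URLClassLoader used as the "child" loader are all assumptions for demonstration.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

class ContextClassLoaderSketch {
  // Runs the action with the given loader as the thread context classloader,
  // then restores the previous loader even if the action throws.
  static <T> T withContextClassLoader(ClassLoader loader, Callable<T> action) {
    Thread current = Thread.currentThread();
    ClassLoader original = current.getContextClassLoader();
    current.setContextClassLoader(loader);
    try {
      return action.call();
    } catch (RuntimeException e) {
      throw e; // keep unchecked exceptions unwrapped
    } catch (Exception e) {
      throw new RuntimeException(e); // wrap checked exceptions from Callable
    } finally {
      current.setContextClassLoader(original);
    }
  }

  public static void main(String[] args) {
    ClassLoader original = Thread.currentThread().getContextClassLoader();
    // Hypothetical child loader standing in for the SQL plugin's classloader.
    ClassLoader child = new URLClassLoader(new URL[0], original);

    ClassLoader seen =
        withContextClassLoader(child, () -> Thread.currentThread().getContextClassLoader());

    System.out.println("action saw child loader: " + (seen == child));
    System.out.println(
        "restored afterwards: " + (Thread.currentThread().getContextClassLoader() == original));
  }
}
```

Restoring in a finally block is the important part of the design: pooled threads are reused, so a loader left in place after a failure would leak into unrelated work on the same thread.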

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 40 additions & 31 deletions
@@ -35,6 +35,7 @@
 import org.opensearch.sql.calcite.SysLimit;
 import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
 import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
+import org.opensearch.sql.calcite.utils.CalciteClassLoaderHelper;
 import org.opensearch.sql.common.error.ErrorReport;
 import org.opensearch.sql.common.error.QueryProcessingStage;
 import org.opensearch.sql.common.error.StageErrorHandler;
@@ -142,33 +143,37 @@ public void executeWithCalcite(
             QueryProfiling.activate(QueryContext.isProfileEnabled());
             ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE);
             long analyzeStart = System.nanoTime();
-            CalcitePlanContext context =
-                CalcitePlanContext.create(
-                    buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
+            CalciteClassLoaderHelper.withCalciteClassLoader(
+                () -> {
+                  CalcitePlanContext context =
+                      CalcitePlanContext.create(
+                          buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
 
-            context.setHighlightConfig(highlightConfig);
+                  context.setHighlightConfig(highlightConfig);
 
-            // Wrap analyze with ANALYZING stage tracking
-            RelNode relNode =
-                StageErrorHandler.executeStage(
-                    QueryProcessingStage.ANALYZING,
-                    () -> analyze(plan, context),
-                    "while preparing and validating the query plan");
+                  // Wrap analyze with ANALYZING stage tracking
+                  RelNode relNode =
+                      StageErrorHandler.executeStage(
+                          QueryProcessingStage.ANALYZING,
+                          () -> analyze(plan, context),
+                          "while preparing and validating the query plan");
 
-            // Wrap plan conversion with PLAN_CONVERSION stage tracking
-            RelNode calcitePlan =
-                StageErrorHandler.executeStage(
-                    QueryProcessingStage.PLAN_CONVERSION,
-                    () -> convertToCalcitePlan(relNode, context),
-                    "while converting the query to an executable plan");
+                  // Wrap plan conversion with PLAN_CONVERSION stage tracking
+                  RelNode calcitePlan =
+                      StageErrorHandler.executeStage(
+                          QueryProcessingStage.PLAN_CONVERSION,
+                          () -> convertToCalcitePlan(relNode, context),
+                          "while converting the query to an executable plan");
 
-            analyzeMetric.set(System.nanoTime() - analyzeStart);
+                  analyzeMetric.set(System.nanoTime() - analyzeStart);
 
-            // Wrap execution with EXECUTING stage tracking
-            StageErrorHandler.executeStageVoid(
-                QueryProcessingStage.EXECUTING,
-                () -> executionEngine.execute(calcitePlan, context, listener),
-                "while running the query");
+                  // Wrap execution with EXECUTING stage tracking
+                  StageErrorHandler.executeStageVoid(
+                      QueryProcessingStage.EXECUTING,
+                      () -> executionEngine.execute(calcitePlan, context, listener),
+                      "while running the query");
+                },
+                QueryService.class);
           } catch (Throwable t) {
             if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
               log.warn("Fallback to V2 query engine since got exception", t);
@@ -191,17 +196,21 @@ public void explainWithCalcite(
         () -> {
           try {
             QueryProfiling.noop();
-            CalcitePlanContext context =
-                CalcitePlanContext.create(
-                    buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
-            context.setHighlightConfig(highlightConfig);
-            context.run(
+            CalciteClassLoaderHelper.withCalciteClassLoader(
                 () -> {
-                  RelNode relNode = analyze(plan, context);
-                  RelNode calcitePlan = convertToCalcitePlan(relNode, context);
-                  executionEngine.explain(calcitePlan, mode, context, listener);
+                  CalcitePlanContext context =
+                      CalcitePlanContext.create(
+                          buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
+                  context.setHighlightConfig(highlightConfig);
+                  context.run(
+                      () -> {
+                        RelNode relNode = analyze(plan, context);
+                        RelNode calcitePlan = convertToCalcitePlan(relNode, context);
+                        executionEngine.explain(calcitePlan, mode, context, listener);
+                      },
+                      settings);
                 },
-                settings);
+                QueryService.class);
           } catch (Throwable t) {
             if (isCalciteFallbackAllowed(t)) {
               log.warn("Fallback to V2 query engine since got exception", t);
