Skip to content

Commit 7030613

Browse files
authored
Wires EngineContextProvider (analytics-engine API) into the SQL plugin for schema resolution (#5486)
* Wires `EngineContextProvider` (analytics-engine API) into the SQL plugin so PPL/SQL queries routed to the analytics engine bind to a single per-query `QueryRequestContext` (ClusterState + Calcite schema) captured once at REST entry. The same snapshot drives both Calcite planning and analytics-engine execution, eliminating drift between schema resolution at parse time and state lookups at execution time. - `RestUnifiedQueryAction` calls `contextProvider.getContext()` per query; the result feeds `UnifiedQueryContext.catalog(...)` and `AnalyticsExecutionEngine.execute(..., QueryRequestContext, listener)`. - `EngineContextProviderHolder` — small static bridge from Guice (where the binding lives) to the REST handler (constructed before cross-plugin Guice bindings are satisfied). Mirrors the existing `AnalyticsExecutorHolder` pattern. - `TransportPPLQueryAction` picks up the provider via `@Inject(optional = true) setEngineContextProvider` and populates the holder. `SQLPlugin`'s router waits for both executor + provider to be available. - `isAnalyticsIndex` short-circuits on `cluster.pluggable.dataformat="composite"` so aliases, wildcards, comma-lists, and data streams (which `Metadata#index()` can't resolve) route to the analytics engine without a per-index lookup. Pairs with the OpenSearch-side PR that introduces `EngineContextProvider` / `QueryRequestContext` and threads the snapshot through `DefaultPlanExecutor`. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * spotless Signed-off-by: Marc Handalian <marc.handalian@gmail.com> --------- Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 25529e7 commit 7030613

6 files changed

Lines changed: 113 additions & 12 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,20 @@ public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listene
7777
@Override
7878
public void execute(
7979
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
80+
execute(plan, context, null, listener);
81+
}
82+
83+
/**
84+
* Overload that threads a {@link org.opensearch.analytics.QueryRequestContext} snapshot down to
85+
* the analytics-engine's plan executor via {@link
86+
* org.opensearch.analytics.exec.QueryPlanExecutor}'s opaque context slot. The snapshot is
87+
* captured once at query entry so planner and executor see the same cluster state.
88+
*/
89+
public void execute(
90+
RelNode plan,
91+
CalcitePlanContext context,
92+
org.opensearch.analytics.QueryRequestContext queryCtx,
93+
ResponseListener<QueryResponse> listener) {
8094
// QueryPlanExecutor became asynchronous in analytics-framework 3.7 — execution is dispatched
8195
// to a worker pool and results arrive on the listener. Record the execute metric in the
8296
// listener callback, before delegating to the user-supplied listener, so the metric snapshot
@@ -86,7 +100,7 @@ public void execute(
86100

87101
planExecutor.execute(
88102
plan,
89-
null,
103+
queryCtx,
90104
new ActionListener<>() {
91105
@Override
92106
public void onResponse(Iterable<Object[]> rows) {

plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,13 @@ private BiFunction<SQLQueryRequest, RestChannel, Boolean> createSqlAnalyticsRout
225225
() -> {
226226
if (cached[0] == null) {
227227
var executor = AnalyticsExecutorHolder.get();
228-
if (executor == null) {
228+
var contextProvider = org.opensearch.sql.plugin.rest.EngineContextProviderHolder.get();
229+
if (executor == null || contextProvider == null) {
229230
return null;
230231
}
231232
cached[0] =
232-
new RestUnifiedQueryAction(client, clusterService, executor, pluginSettings);
233+
new RestUnifiedQueryAction(
234+
client, clusterService, executor, contextProvider, pluginSettings);
233235
}
234236
return cached[0];
235237
};
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.plugin.rest;
7+
8+
import org.opensearch.analytics.EngineContextProvider;
9+
10+
/**
11+
* Bridge for sharing the analytics-engine {@link EngineContextProvider} singleton between the PPL
12+
* transport action (where Guice resolves the binding via {@code @Inject}) and the REST-only SQL
13+
* router (where Guice cannot, because {@code SQLPlugin#getRestHandlers} runs before the Node-level
14+
* injector satisfies {@code @Inject} parameters).
15+
*
16+
* <p>Mirrors {@link AnalyticsExecutorHolder} — same cross-plugin Guice gap, same solution. The
17+
* holder is populated once by {@link org.opensearch.sql.plugin.transport.TransportPPLQueryAction}'s
18+
* ctor and then read by the SQL router. The context itself is intended to be a stable singleton
19+
* produced by AnalyticsPlugin#createComponents.
20+
*/
21+
public final class EngineContextProviderHolder {
22+
23+
private static volatile EngineContextProvider context;
24+
25+
private EngineContextProviderHolder() {}
26+
27+
public static void set(EngineContextProvider instance) {
28+
context = instance;
29+
}
30+
31+
public static EngineContextProvider get() {
32+
return context;
33+
}
34+
}

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
import org.apache.logging.log4j.Logger;
1818
import org.apache.logging.log4j.ThreadContext;
1919
import org.opensearch.analytics.exec.QueryPlanExecutor;
20-
import org.opensearch.analytics.schema.OpenSearchSchemaBuilder;
2120
import org.opensearch.cluster.service.ClusterService;
2221
import org.opensearch.common.unit.TimeValue;
2322
import org.opensearch.core.action.ActionListener;
2423
import org.opensearch.index.IndexSettings;
24+
import org.opensearch.indices.IndicesService;
2525
import org.opensearch.sql.api.UnifiedQueryContext;
2626
import org.opensearch.sql.api.UnifiedQueryPlanner;
2727
import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -54,16 +54,19 @@ public class RestUnifiedQueryAction {
5454
private final AnalyticsExecutionEngine analyticsEngine;
5555
private final NodeClient client;
5656
private final ClusterService clusterService;
57+
private final org.opensearch.analytics.EngineContextProvider contextProvider;
5758
private final org.opensearch.sql.common.setting.Settings pluginSettings;
5859

5960
public RestUnifiedQueryAction(
6061
NodeClient client,
6162
ClusterService clusterService,
6263
QueryPlanExecutor<RelNode, Iterable<Object[]>> planExecutor,
64+
org.opensearch.analytics.EngineContextProvider contextProvider,
6365
org.opensearch.sql.common.setting.Settings pluginSettings) {
6466
this.client = client;
6567
this.clusterService = clusterService;
6668
this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor);
69+
this.contextProvider = contextProvider;
6770
this.pluginSettings = pluginSettings;
6871
}
6972

@@ -82,6 +85,17 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
8285
if (query == null || query.isEmpty()) {
8386
return false;
8487
}
88+
// Cluster-level opt-in: when `cluster.pluggable.dataformat="composite"`, new indices
89+
// inherit `index.pluggable.dataformat="composite"` at creation (see
90+
// MetadataCreateIndexService), so every queryable target is analytics-eligible. Skip
91+
// the per-index lookup — it doesn't work for aliases, wildcards, comma-lists, or data
92+
// streams (Metadata#index() only resolves concrete names).
93+
if ("composite"
94+
.equals(
95+
IndicesService.CLUSTER_PLUGGABLE_DATAFORMAT_VALUE_SETTING.get(
96+
clusterService.getSettings()))) {
97+
return true;
98+
}
8599
try (UnifiedQueryContext context = buildParsingContext(queryType)) {
86100
return extractIndexName(query, queryType, context)
87101
.map(this::stripSchemaPrefix)
@@ -118,7 +132,12 @@ public void execute(
118132
.schedule(
119133
withCurrentContext(
120134
() -> {
121-
UnifiedQueryContext context = buildContext(queryType, profiling);
135+
// Ask the engine for a per-query context — it binds the snapshot
136+
// (cluster state + schema built from it) and returns the pair, so the
137+
// schema we plan against and the state the executor uses are the same view.
138+
org.opensearch.analytics.QueryRequestContext queryCtx =
139+
contextProvider.getContext();
140+
UnifiedQueryContext context = buildContext(queryType, profiling, queryCtx);
122141
ActionListener<TransportPPLQueryResponse> closingListener =
123142
wrapWithContextClose(context, listener);
124143
try {
@@ -127,7 +146,10 @@ public void execute(
127146
CalcitePlanContext planContext = context.getPlanContext();
128147
plan = addQuerySizeLimit(plan, planContext);
129148
analyticsEngine.execute(
130-
plan, planContext, createQueryListener(queryType, closingListener));
149+
plan,
150+
planContext,
151+
queryCtx,
152+
createQueryListener(queryType, closingListener));
131153
} catch (Exception e) {
132154
closingListener.onFailure(e);
133155
}
@@ -150,7 +172,9 @@ public void explain(
150172
.schedule(
151173
withCurrentContext(
152174
() -> {
153-
try (UnifiedQueryContext context = buildContext(queryType, false)) {
175+
org.opensearch.analytics.QueryRequestContext queryCtx =
176+
contextProvider.getContext();
177+
try (UnifiedQueryContext context = buildContext(queryType, false, queryCtx)) {
154178
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
155179
RelNode plan = planner.plan(query);
156180
CalcitePlanContext planContext = context.getPlanContext();
@@ -172,11 +196,15 @@ private UnifiedQueryContext buildParsingContext(QueryType queryType) {
172196
return applyClusterOverrides(UnifiedQueryContext.builder().language(queryType)).build();
173197
}
174198

175-
private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling) {
199+
private UnifiedQueryContext buildContext(
200+
QueryType queryType,
201+
boolean profiling,
202+
org.opensearch.analytics.QueryRequestContext queryCtx) {
176203
return applyClusterOverrides(
177204
UnifiedQueryContext.builder()
178205
.language(queryType)
179-
.catalog(SCHEMA_NAME, OpenSearchSchemaBuilder.buildSchema(clusterService.state()))
206+
// Schema captured by queryCtx — same cluster state the executor will use.
207+
.catalog(SCHEMA_NAME, queryCtx.schema())
180208
.defaultNamespace(SCHEMA_NAME)
181209
.profiling(profiling))
182210
.build();

plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,27 @@ public TransportPPLQueryAction(
109109
public void setQueryPlanExecutor(
110110
QueryPlanExecutor<RelNode, Iterable<Object[]>> queryPlanExecutor) {
111111
AnalyticsExecutorHolder.set(queryPlanExecutor);
112-
this.unifiedQueryHandler =
113-
new RestUnifiedQueryAction(
114-
clientRef, clusterServiceRef, queryPlanExecutor, pluginSettingsRef);
112+
// Build the SQL router once both bridges are populated (engine context might arrive
113+
// first or last depending on Guice ordering). buildUnifiedQueryHandler is idempotent.
114+
buildUnifiedQueryHandlerIfReady();
115+
}
116+
117+
/** Invoked by Guice iff analytics-engine bound {@code EngineContextProvider}. */
118+
@Inject(optional = true)
119+
public void setEngineContext(org.opensearch.analytics.EngineContextProvider contextProvider) {
120+
org.opensearch.sql.plugin.rest.EngineContextProviderHolder.set(contextProvider);
121+
buildUnifiedQueryHandlerIfReady();
122+
}
123+
124+
private void buildUnifiedQueryHandlerIfReady() {
125+
QueryPlanExecutor<RelNode, Iterable<Object[]>> executor = AnalyticsExecutorHolder.get();
126+
org.opensearch.analytics.EngineContextProvider contextProvider =
127+
org.opensearch.sql.plugin.rest.EngineContextProviderHolder.get();
128+
if (executor != null && contextProvider != null) {
129+
this.unifiedQueryHandler =
130+
new RestUnifiedQueryAction(
131+
clientRef, clusterServiceRef, executor, contextProvider, pluginSettingsRef);
132+
}
115133
}
116134

117135
/**

plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.calcite.rel.RelNode;
1414
import org.junit.Before;
1515
import org.junit.Test;
16+
import org.opensearch.analytics.EngineContextProvider;
1617
import org.opensearch.analytics.exec.QueryPlanExecutor;
1718
import org.opensearch.cluster.ClusterState;
1819
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -40,6 +41,9 @@ public void setUp() {
4041
metadata = mock(Metadata.class);
4142
when(clusterService.state()).thenReturn(clusterState);
4243
when(clusterState.metadata()).thenReturn(metadata);
44+
// isAnalyticsIndex short-circuits on the cluster.pluggable.dataformat setting; the per-index
45+
// path is only exercised when this returns something other than "composite".
46+
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
4347

4448
@SuppressWarnings("unchecked")
4549
QueryPlanExecutor<RelNode, Iterable<Object[]>> executor = mock(QueryPlanExecutor.class);
@@ -48,6 +52,7 @@ public void setUp() {
4852
mock(NodeClient.class),
4953
clusterService,
5054
executor,
55+
mock(EngineContextProvider.class),
5156
mock(org.opensearch.sql.common.setting.Settings.class));
5257
}
5358

0 commit comments

Comments
 (0)