Skip to content

Commit 804d4f1

Browse files
authored
[BugFix] Fix SHOW/DESCRIBE statement routing under cluster.pluggable.dataformat setting (#5528)
When cluster.pluggable.dataformat=composite, isAnalyticsIndex routed every query to the analytics engine, which cannot serve the system catalog (*_ODFE_SYS_TABLE, .DATASOURCES) that SHOW/DESCRIBE resolve. Detect system-catalog queries (including legacy-syntax SHOW/DESCRIBE that the V2 parser rejects) and keep them on the default pipeline while data queries continue to the analytics engine. Also log query routing to the analytics engine at both call sites. Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent 785e277 commit 804d4f1

4 files changed

Lines changed: 106 additions & 3 deletions

File tree

plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.opensearch.script.ScriptService;
6464
import org.opensearch.sql.ast.statement.ExplainMode;
6565
import org.opensearch.sql.common.response.ResponseListener;
66+
import org.opensearch.sql.common.utils.QueryContext;
6667
import org.opensearch.sql.datasource.DataSourceService;
6768
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
6869
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
@@ -241,6 +242,7 @@ private BiFunction<SQLQueryRequest, RestChannel, Boolean> createSqlAnalyticsRout
241242
|| !unifiedQueryHandler.isAnalyticsIndex(sqlRequest.getQuery(), QueryType.SQL)) {
242243
return false;
243244
}
245+
LOGGER.info("[{}] Routing SQL query to analytics engine", QueryContext.getRequestId());
244246
if (sqlRequest.isExplainRequest()) {
245247
unifiedQueryHandler.explain(
246248
sqlRequest.getQuery(),

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.Map;
1414
import java.util.Optional;
1515
import org.apache.calcite.rel.RelNode;
16+
import org.apache.commons.lang3.Strings;
1617
import org.apache.logging.log4j.LogManager;
1718
import org.apache.logging.log4j.Logger;
1819
import org.apache.logging.log4j.ThreadContext;
@@ -40,6 +41,7 @@
4041
import org.opensearch.sql.protocol.response.QueryResult;
4142
import org.opensearch.sql.protocol.response.format.ResponseFormatter;
4243
import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter;
44+
import org.opensearch.sql.utils.SystemIndexUtils;
4345
import org.opensearch.transport.client.node.NodeClient;
4446

4547
/**
@@ -95,7 +97,17 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
9597
.equals(
9698
IndicesService.CLUSTER_PLUGGABLE_DATAFORMAT_VALUE_SETTING.get(
9799
clusterService.getSettings()))) {
98-
return true;
100+
// Analytics engine can't serve system catalog; SHOW/DESCRIBE fall back to default pipeline
101+
try (UnifiedQueryContext context = buildParsingContext(queryType)) {
102+
boolean systemCatalog =
103+
extractIndexName(query, queryType, context)
104+
.map(RestUnifiedQueryAction::isSystemCatalog)
105+
.orElse(false);
106+
return !systemCatalog;
107+
} catch (Exception e) {
108+
// Check legacy-syntax SHOW/DESCRIBE; otherwise let AE handle and surface the error.
109+
return !isLegacySystemCatalogQuery(query);
110+
}
99111
}
100112
try (UnifiedQueryContext context = buildParsingContext(queryType)) {
101113
return extractIndexName(query, queryType, context)
@@ -107,6 +119,16 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
107119
}
108120
}
109121

122+
private static boolean isSystemCatalog(String name) {
123+
return SystemIndexUtils.isSystemIndex(name)
124+
|| SystemIndexUtils.DATASOURCES_TABLE_NAME.equals(name);
125+
}
126+
127+
private static boolean isLegacySystemCatalogQuery(String query) {
128+
String trimmed = query.trim();
129+
return Strings.CI.startsWith(trimmed, "SHOW ") || Strings.CI.startsWith(trimmed, "DESCRIBE ");
130+
}
131+
110132
private String stripSchemaPrefix(String indexName) {
111133
int lastDot = indexName.lastIndexOf('.');
112134
return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;

plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import java.util.Optional;
1515
import java.util.function.Supplier;
1616
import org.apache.calcite.rel.RelNode;
17+
import org.apache.logging.log4j.LogManager;
18+
import org.apache.logging.log4j.Logger;
1719
import org.opensearch.action.ActionRequest;
1820
import org.opensearch.action.support.ActionFilters;
1921
import org.opensearch.action.support.HandledTransportAction;
@@ -59,6 +61,8 @@
5961
public class TransportPPLQueryAction
6062
extends HandledTransportAction<ActionRequest, TransportPPLQueryResponse> {
6163

64+
private static final Logger LOG = LogManager.getLogger(TransportPPLQueryAction.class);
65+
6266
private final Injector injector;
6367

6468
private final Supplier<Boolean> pplEnabled;
@@ -171,6 +175,7 @@ protected void doExecute(
171175
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
172176
if (unifiedQueryHandler != null
173177
&& unifiedQueryHandler.isAnalyticsIndex(transformedRequest.getRequest(), QueryType.PPL)) {
178+
LOG.info("[{}] Routing PPL query to analytics engine", QueryContext.getRequestId());
174179
if (transformedRequest.isExplainRequest()) {
175180
unifiedQueryHandler.explain(
176181
transformedRequest.getRequest(),

plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.cluster.service.ClusterService;
2222
import org.opensearch.common.settings.Settings;
2323
import org.opensearch.index.IndexSettings;
24+
import org.opensearch.indices.IndicesService;
2425
import org.opensearch.sql.executor.QueryType;
2526
import org.opensearch.transport.client.node.NodeClient;
2627

@@ -142,7 +143,7 @@ public void nullAndEmptyQueriesRouteToLucene() {
142143
}
143144

144145
@Test
145-
public void showStatementRoutesToLucene() {
146+
public void showStatementNotRoutedToAnalyticsEngine() {
146147
registerIndex(
147148
"parquet_logs",
148149
Settings.builder()
@@ -154,7 +155,7 @@ public void showStatementRoutesToLucene() {
154155
}
155156

156157
@Test
157-
public void describeStatementRoutesToLucene() {
158+
public void describeStatementNotRoutedToAnalyticsEngine() {
158159
registerIndex(
159160
"parquet_logs",
160161
Settings.builder()
@@ -165,6 +166,79 @@ public void describeStatementRoutesToLucene() {
165166
assertFalse(action.isAnalyticsIndex("DESCRIBE TABLES LIKE 'parquet_logs'", QueryType.SQL));
166167
}
167168

169+
@Test
170+
public void showStatementNotRoutedToAnalyticsEngineUnderClusterComposite() {
171+
enableClusterComposite();
172+
assertFalse(action.isAnalyticsIndex("SHOW TABLES LIKE 'parquet_logs'", QueryType.SQL));
173+
}
174+
175+
@Test
176+
public void describeStatementNotRoutedToAnalyticsEngineUnderClusterComposite() {
177+
enableClusterComposite();
178+
assertFalse(action.isAnalyticsIndex("DESCRIBE TABLES LIKE 'parquet_logs'", QueryType.SQL));
179+
}
180+
181+
@Test
182+
public void dataQueryStillRoutesToAnalyticsUnderClusterComposite() {
183+
enableClusterComposite();
184+
assertTrue(action.isAnalyticsIndex("SELECT * FROM parquet_logs", QueryType.SQL));
185+
}
186+
187+
@Test
188+
public void unparseableQueryRoutesToAnalyticsUnderClusterComposite() {
189+
enableClusterComposite();
190+
// malformed -> AE re-parses & reports
191+
assertTrue(action.isAnalyticsIndex("SELECT FROM WHERE", QueryType.SQL));
192+
}
193+
194+
@Test
195+
public void legacyShowNotRoutedToAnalyticsEngineUnderClusterComposite() {
196+
enableClusterComposite();
197+
// unquoted LIKE is rejected by the V2 parser, but still belongs on the default pipeline
198+
assertFalse(action.isAnalyticsIndex("SHOW TABLES LIKE %", QueryType.SQL));
199+
}
200+
201+
@Test
202+
public void legacyDescribeNotRoutedToAnalyticsEngineUnderClusterComposite() {
203+
enableClusterComposite();
204+
// legacy DESCRIBE syntax is rejected by the V2 parser, but belongs on the default pipeline
205+
assertFalse(action.isAnalyticsIndex("DESCRIBE my_index", QueryType.SQL));
206+
}
207+
208+
@Test
209+
public void pplDescribeNotRoutedToAnalyticsEngineUnderClusterComposite() {
210+
enableClusterComposite();
211+
assertFalse(action.isAnalyticsIndex("describe parquet_logs", QueryType.PPL));
212+
}
213+
214+
@Test
215+
public void pplShowDatasourcesNotRoutedToAnalyticsEngineUnderClusterComposite() {
216+
enableClusterComposite();
217+
assertFalse(action.isAnalyticsIndex("show datasources", QueryType.PPL));
218+
}
219+
220+
@Test
221+
public void pplDataQueryStillRoutesToAnalyticsUnderClusterComposite() {
222+
enableClusterComposite();
223+
assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL));
224+
}
225+
226+
@Test
227+
public void pplUnparseableQueryRoutesToAnalyticsUnderClusterComposite() {
228+
enableClusterComposite();
229+
// malformed -> AE re-parses & reports
230+
assertTrue(action.isAnalyticsIndex("source = parquet_logs | | fields ts", QueryType.PPL));
231+
}
232+
233+
private void enableClusterComposite() {
234+
when(clusterService.getSettings())
235+
.thenReturn(
236+
Settings.builder()
237+
.put(
238+
IndicesService.CLUSTER_PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "composite")
239+
.build());
240+
}
241+
168242
private void registerIndex(String name, Settings settings) {
169243
IndexMetadata indexMetadata = mock(IndexMetadata.class);
170244
when(indexMetadata.getSettings()).thenReturn(settings);

0 commit comments

Comments
 (0)