Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.script.ScriptService;
import org.opensearch.sql.ast.statement.ExplainMode;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
Expand Down Expand Up @@ -241,6 +242,7 @@ private BiFunction<SQLQueryRequest, RestChannel, Boolean> createSqlAnalyticsRout
|| !unifiedQueryHandler.isAnalyticsIndex(sqlRequest.getQuery(), QueryType.SQL)) {
return false;
}
LOGGER.info("[{}] Routing SQL query to analytics engine", QueryContext.getRequestId());
if (sqlRequest.isExplainRequest()) {
unifiedQueryHandler.explain(
sqlRequest.getQuery(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.Optional;
import org.apache.calcite.rel.RelNode;
import org.apache.commons.lang3.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
Expand Down Expand Up @@ -40,6 +41,7 @@
import org.opensearch.sql.protocol.response.QueryResult;
import org.opensearch.sql.protocol.response.format.ResponseFormatter;
import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter;
import org.opensearch.sql.utils.SystemIndexUtils;
import org.opensearch.transport.client.node.NodeClient;

/**
Expand Down Expand Up @@ -95,7 +97,17 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
.equals(
IndicesService.CLUSTER_PLUGGABLE_DATAFORMAT_VALUE_SETTING.get(
clusterService.getSettings()))) {
return true;
// Analytics engine can't serve system catalog; SHOW/DESCRIBE fall back to default pipeline
try (UnifiedQueryContext context = buildParsingContext(queryType)) {
boolean systemCatalog =
extractIndexName(query, queryType, context)
.map(RestUnifiedQueryAction::isSystemCatalog)
.orElse(false);
return !systemCatalog;
} catch (Exception e) {
// Check legacy-syntax SHOW/DESCRIBE; otherwise let AE handle and surface the error.
return !isLegacySystemCatalogQuery(query);
}
}
try (UnifiedQueryContext context = buildParsingContext(queryType)) {
return extractIndexName(query, queryType, context)
Expand All @@ -107,6 +119,16 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
}
}

private static boolean isSystemCatalog(String name) {
return SystemIndexUtils.isSystemIndex(name)
|| SystemIndexUtils.DATASOURCES_TABLE_NAME.equals(name);
}

private static boolean isLegacySystemCatalogQuery(String query) {
String trimmed = query.trim();
return Strings.CI.startsWith(trimmed, "SHOW ") || Strings.CI.startsWith(trimmed, "DESCRIBE ");
}

private String stripSchemaPrefix(String indexName) {
int lastDot = indexName.lastIndexOf('.');
return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.calcite.rel.RelNode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
Expand Down Expand Up @@ -59,6 +61,8 @@
public class TransportPPLQueryAction
extends HandledTransportAction<ActionRequest, TransportPPLQueryResponse> {

private static final Logger LOG = LogManager.getLogger(TransportPPLQueryAction.class);

private final Injector injector;

private final Supplier<Boolean> pplEnabled;
Expand Down Expand Up @@ -171,6 +175,7 @@ protected void doExecute(
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
if (unifiedQueryHandler != null
&& unifiedQueryHandler.isAnalyticsIndex(transformedRequest.getRequest(), QueryType.PPL)) {
LOG.info("[{}] Routing PPL query to analytics engine", QueryContext.getRequestId());
if (transformedRequest.isExplainRequest()) {
unifiedQueryHandler.explain(
transformedRequest.getRequest(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.transport.client.node.NodeClient;

Expand Down Expand Up @@ -142,7 +143,7 @@ public void nullAndEmptyQueriesRouteToLucene() {
}

@Test
public void showStatementRoutesToLucene() {
public void showStatementNotRoutedToAnalyticsEngine() {
registerIndex(
"parquet_logs",
Settings.builder()
Expand All @@ -154,7 +155,7 @@ public void showStatementRoutesToLucene() {
}

@Test
public void describeStatementRoutesToLucene() {
public void describeStatementNotRoutedToAnalyticsEngine() {
registerIndex(
"parquet_logs",
Settings.builder()
Expand All @@ -165,6 +166,79 @@ public void describeStatementRoutesToLucene() {
assertFalse(action.isAnalyticsIndex("DESCRIBE TABLES LIKE 'parquet_logs'", QueryType.SQL));
}

@Test
public void showStatementNotRoutedToAnalyticsEngineUnderClusterComposite() {
enableClusterComposite();
assertFalse(action.isAnalyticsIndex("SHOW TABLES LIKE 'parquet_logs'", QueryType.SQL));
}

@Test
public void describeStatementNotRoutedToAnalyticsEngineUnderClusterComposite() {
enableClusterComposite();
assertFalse(action.isAnalyticsIndex("DESCRIBE TABLES LIKE 'parquet_logs'", QueryType.SQL));
}

@Test
public void dataQueryStillRoutesToAnalyticsUnderClusterComposite() {
enableClusterComposite();
assertTrue(action.isAnalyticsIndex("SELECT * FROM parquet_logs", QueryType.SQL));
}

@Test
public void unparseableQueryRoutesToAnalyticsUnderClusterComposite() {
enableClusterComposite();
// malformed -> AE re-parses & reports
assertTrue(action.isAnalyticsIndex("SELECT FROM WHERE", QueryType.SQL));
}

@Test
public void legacyShowNotRoutedToAnalyticsEngineUnderClusterComposite() {
enableClusterComposite();
// unquoted LIKE is rejected by the V2 parser, but still belongs on the default pipeline
assertFalse(action.isAnalyticsIndex("SHOW TABLES LIKE %", QueryType.SQL));
}

@Test
public void legacyDescribeNotRoutedToAnalyticsEngineUnderClusterComposite() {
enableClusterComposite();
// legacy DESCRIBE syntax is rejected by the V2 parser, but belongs on the default pipeline
assertFalse(action.isAnalyticsIndex("DESCRIBE my_index", QueryType.SQL));
}

@Test
public void pplDescribeNotRoutedToAnalyticsEngineUnderClusterComposite() {
enableClusterComposite();
assertFalse(action.isAnalyticsIndex("describe parquet_logs", QueryType.PPL));
}

@Test
public void pplShowDatasourcesNotRoutedToAnalyticsEngineUnderClusterComposite() {
enableClusterComposite();
assertFalse(action.isAnalyticsIndex("show datasources", QueryType.PPL));
}

@Test
public void pplDataQueryStillRoutesToAnalyticsUnderClusterComposite() {
enableClusterComposite();
assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL));
}

@Test
public void pplUnparseableQueryRoutesToAnalyticsUnderClusterComposite() {
enableClusterComposite();
// malformed -> AE re-parses & reports
assertTrue(action.isAnalyticsIndex("source = parquet_logs | | fields ts", QueryType.PPL));
}

private void enableClusterComposite() {
when(clusterService.getSettings())
.thenReturn(
Settings.builder()
.put(
IndicesService.CLUSTER_PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "composite")
.build());
}

private void registerIndex(String name, Settings settings) {
IndexMetadata indexMetadata = mock(IndexMetadata.class);
when(indexMetadata.getSettings()).thenReturn(settings);
Expand Down
Loading