Skip to content

Commit cbc0ca7

Browse files
committed
Route analytics queries by index setting, not table-name prefix
Today `RestUnifiedQueryAction.isAnalyticsIndex` dispatches to the analytics engine when the source index name starts with `parquet_`. That's brittle — it conflates naming convention with storage type. An index created without the prefix but with pluggable dataformat enabled is silently sent to the Lucene path; an index named `parquet_foo` without the setting is mis-dispatched to analytics. Use the authoritative signal instead: the `index.pluggable.dataformat.enabled` setting on cluster-state metadata. This is the same setting integration tests (`CoordinatorReduceIT`, `CompositeCommitDeletionIT`, etc.) already use to create analytics-backed indices, and it's what `FieldStorageResolver` reads to decide field-level storage. Behavior: - `index.pluggable.dataformat.enabled=true` → analytics engine (DataFusion) - flag absent / false / index missing → Calcite→OpenSearch DSL path Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent e113b7f commit cbc0ca7

2 files changed

Lines changed: 69 additions & 15 deletions

File tree

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,14 @@ public RestUnifiedQueryAction(
6969
this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor);
7070
}
7171

72+
static final String PLUGGABLE_DATAFORMAT_ENABLED_SETTING = "index.pluggable.dataformat.enabled";
73+
7274
/**
73-
* Check if the query targets an analytics engine index (e.g., Parquet-backed). Uses the context's
74-
* parser for index name extraction, supporting both PPL and SQL.
75+
* Check if the query targets an analytics-engine-backed index. Parses the query to extract the
76+
* source index name, then reads the cluster-state metadata for that index to see if {@code
77+
* index.pluggable.dataformat.enabled} is set. When the flag is on, the index is backed by a
78+
* pluggable columnar format (e.g. Parquet) and benefits from DataFusion execution; otherwise the
79+
* query falls through to the sql-plugin's Calcite→OpenSearch DSL path.
7580
*
7681
* <p>Note: This creates a separate UnifiedQueryContext for parsing. The context cannot be shared
7782
* with doExecute/doExplain because UnifiedQueryContext holds a Calcite JDBC connection that fails
@@ -84,18 +89,27 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
8489
}
8590
try (UnifiedQueryContext context = buildParsingContext(queryType)) {
8691
return extractIndexName(query, queryType, context)
87-
.map(
88-
indexName -> {
89-
int lastDot = indexName.lastIndexOf('.');
90-
return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
91-
})
92-
.map(tableName -> tableName.startsWith("parquet_"))
92+
.map(this::stripSchemaPrefix)
93+
.map(this::isPluggableDataformatIndex)
9394
.orElse(false);
9495
} catch (Exception e) {
9596
return false;
9697
}
9798
}
9899

100+
private String stripSchemaPrefix(String indexName) {
101+
int lastDot = indexName.lastIndexOf('.');
102+
return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
103+
}
104+
105+
private boolean isPluggableDataformatIndex(String indexName) {
106+
var indexMetadata = clusterService.state().metadata().index(indexName);
107+
if (indexMetadata == null) {
108+
return false;
109+
}
110+
return indexMetadata.getSettings().getAsBoolean(PLUGGABLE_DATAFORMAT_ENABLED_SETTING, false);
111+
}
112+
99113
/** Execute a query through the unified query pipeline on the sql-worker thread pool. */
100114
public void execute(
101115
String query,

plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,82 @@
88
import static org.junit.Assert.assertFalse;
99
import static org.junit.Assert.assertTrue;
1010
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.when;
1112

1213
import org.apache.calcite.rel.RelNode;
1314
import org.junit.Before;
1415
import org.junit.Test;
1516
import org.opensearch.analytics.exec.QueryPlanExecutor;
17+
import org.opensearch.cluster.ClusterState;
18+
import org.opensearch.cluster.metadata.IndexMetadata;
19+
import org.opensearch.cluster.metadata.Metadata;
1620
import org.opensearch.cluster.service.ClusterService;
21+
import org.opensearch.common.settings.Settings;
1722
import org.opensearch.sql.executor.QueryType;
1823
import org.opensearch.transport.client.node.NodeClient;
1924

2025
/**
21-
* Tests for analytics index routing in RestUnifiedQueryAction. Uses context parser for AST-based
22-
* index name extraction.
26+
* Tests for analytics index routing in RestUnifiedQueryAction. Routing is driven by the {@code
27+
* index.pluggable.dataformat.enabled} index setting, read from cluster state.
2328
*/
2429
public class RestUnifiedQueryActionTest {
2530

31+
private ClusterService clusterService;
32+
private Metadata metadata;
2633
private RestUnifiedQueryAction action;
2734

2835
@Before
2936
public void setUp() {
37+
clusterService = mock(ClusterService.class);
38+
ClusterState clusterState = mock(ClusterState.class);
39+
metadata = mock(Metadata.class);
40+
when(clusterService.state()).thenReturn(clusterState);
41+
when(clusterState.metadata()).thenReturn(metadata);
42+
3043
@SuppressWarnings("unchecked")
3144
QueryPlanExecutor<RelNode, Iterable<Object[]>> executor = mock(QueryPlanExecutor.class);
32-
action =
33-
new RestUnifiedQueryAction(mock(NodeClient.class), mock(ClusterService.class), executor);
45+
action = new RestUnifiedQueryAction(mock(NodeClient.class), clusterService, executor);
3446
}
3547

3648
@Test
37-
public void parquetIndexRoutesToAnalytics() {
49+
public void pluggableDataformatIndexRoutesToAnalytics() {
50+
registerIndex(
51+
"parquet_logs", Settings.builder().put("index.pluggable.dataformat.enabled", true).build());
52+
3853
assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL));
3954
assertTrue(
4055
action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL));
4156
}
4257

4358
@Test
44-
public void nonParquetIndexRoutesToLucene() {
45-
assertFalse(action.isAnalyticsIndex("source = my_logs | fields ts", QueryType.PPL));
59+
public void indexWithoutSettingRoutesToLucene() {
60+
registerIndex("plain_logs", Settings.EMPTY);
61+
62+
assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL));
63+
}
64+
65+
@Test
66+
public void indexWithSettingFalseRoutesToLucene() {
67+
registerIndex(
68+
"plain_logs", Settings.builder().put("index.pluggable.dataformat.enabled", false).build());
69+
70+
assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL));
71+
}
72+
73+
@Test
74+
public void missingIndexRoutesToLucene() {
75+
assertFalse(action.isAnalyticsIndex("source = does_not_exist | fields ts", QueryType.PPL));
76+
}
77+
78+
@Test
79+
public void nullAndEmptyQueriesRouteToLucene() {
4680
assertFalse(action.isAnalyticsIndex(null, QueryType.PPL));
4781
assertFalse(action.isAnalyticsIndex("", QueryType.PPL));
4882
}
83+
84+
private void registerIndex(String name, Settings settings) {
85+
IndexMetadata indexMetadata = mock(IndexMetadata.class);
86+
when(indexMetadata.getSettings()).thenReturn(settings);
87+
when(metadata.index(name)).thenReturn(indexMetadata);
88+
}
4989
}

0 commit comments

Comments
 (0)