Skip to content

Commit 7f221e1

Browse files
committed
Route analytics queries by index setting, not table-name prefix
Today `RestUnifiedQueryAction.isAnalyticsIndex` dispatches to the analytics engine when the source index name starts with `parquet_`. That's brittle — it conflates naming convention with storage type. An index created without the prefix but with pluggable dataformat enabled is silently sent to the Lucene path; an index named `parquet_foo` without the setting is mis-dispatched to analytics. Use the authoritative signal instead: the `index.pluggable.dataformat.enabled` setting on cluster-state metadata. This is the same setting integration tests (`CoordinatorReduceIT`, `CompositeCommitDeletionIT`, etc.) already use to create analytics-backed indices, and it's what `FieldStorageResolver` reads to decide field-level storage. Behavior: - `index.pluggable.dataformat.enabled=true` → analytics engine (DataFusion) - flag absent / false / index missing → Calcite→OpenSearch DSL path Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 109375d commit 7f221e1

2 files changed

Lines changed: 63 additions & 15 deletions

File tree

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.cluster.service.ClusterService;
2828
import org.opensearch.common.unit.TimeValue;
2929
import org.opensearch.core.action.ActionListener;
30+
import org.opensearch.index.IndexSettings;
3031
import org.opensearch.sql.api.UnifiedQueryContext;
3132
import org.opensearch.sql.api.UnifiedQueryPlanner;
3233
import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -73,8 +74,9 @@ public RestUnifiedQueryAction(
7374
}
7475

7576
/**
76-
* Check if the query targets an analytics engine index (e.g., Parquet-backed). Uses the context's
77-
* parser for index name extraction, supporting both PPL and SQL.
77+
* Returns true iff the target index has {@link
78+
* IndexSettings#PLUGGABLE_DATAFORMAT_ENABLED_SETTING} set, routing it to DataFusion instead of
79+
* the Calcite→DSL path.
7880
*
7981
* <p>Note: This creates a separate UnifiedQueryContext for parsing. The context cannot be shared
8082
* with doExecute/doExplain because UnifiedQueryContext holds a Calcite JDBC connection that fails
@@ -87,18 +89,25 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
8789
}
8890
try (UnifiedQueryContext context = buildParsingContext(queryType)) {
8991
return extractIndexName(query, queryType, context)
90-
.map(
91-
indexName -> {
92-
int lastDot = indexName.lastIndexOf('.');
93-
return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
94-
})
95-
.map(tableName -> tableName.startsWith("parquet_"))
92+
.map(this::stripSchemaPrefix)
93+
.map(this::isPluggableDataformatIndex)
9694
.orElse(false);
9795
} catch (Exception e) {
9896
return false;
9997
}
10098
}
10199

100+
private String stripSchemaPrefix(String indexName) {
101+
int lastDot = indexName.lastIndexOf('.');
102+
return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
103+
}
104+
105+
private boolean isPluggableDataformatIndex(String indexName) {
106+
var indexMetadata = clusterService.state().metadata().index(indexName);
107+
return indexMetadata != null
108+
&& IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.get(indexMetadata.getSettings());
109+
}
110+
102111
/** Execute a query through the unified query pipeline on the sql-worker thread pool. */
103112
public void execute(
104113
String query,

plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,44 +8,83 @@
88
import static org.junit.Assert.assertFalse;
99
import static org.junit.Assert.assertTrue;
1010
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.when;
1112

1213
import org.apache.calcite.rel.RelNode;
1314
import org.junit.Before;
1415
import org.junit.Test;
1516
import org.opensearch.analytics.exec.QueryPlanExecutor;
17+
import org.opensearch.cluster.ClusterState;
18+
import org.opensearch.cluster.metadata.IndexMetadata;
19+
import org.opensearch.cluster.metadata.Metadata;
1620
import org.opensearch.cluster.service.ClusterService;
17-
import org.opensearch.sql.common.setting.Settings;
21+
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.index.IndexSettings;
1823
import org.opensearch.sql.executor.QueryType;
1924
import org.opensearch.transport.client.node.NodeClient;
2025

2126
/**
22-
* Tests for analytics index routing in RestUnifiedQueryAction. Uses context parser for AST-based
23-
* index name extraction.
27+
* Tests for analytics index routing in RestUnifiedQueryAction. Routing is driven by the {@code
28+
* index.pluggable.dataformat.enabled} index setting, read from cluster state.
2429
*/
2530
public class RestUnifiedQueryActionTest {
2631

32+
private ClusterService clusterService;
33+
private Metadata metadata;
2734
private RestUnifiedQueryAction action;
2835

2936
@Before
3037
public void setUp() {
38+
clusterService = mock(ClusterService.class);
39+
ClusterState clusterState = mock(ClusterState.class);
40+
metadata = mock(Metadata.class);
41+
when(clusterService.state()).thenReturn(clusterState);
42+
when(clusterState.metadata()).thenReturn(metadata);
43+
3144
@SuppressWarnings("unchecked")
3245
QueryPlanExecutor<RelNode, Iterable<Object[]>> executor = mock(QueryPlanExecutor.class);
3346
action =
3447
new RestUnifiedQueryAction(
35-
mock(NodeClient.class), mock(ClusterService.class), executor, mock(Settings.class));
48+
mock(NodeClient.class),
49+
clusterService,
50+
executor,
51+
mock(org.opensearch.sql.common.setting.Settings.class));
3652
}
3753

3854
@Test
39-
public void parquetIndexRoutesToAnalytics() {
55+
public void pluggableDataformatIndexRoutesToAnalytics() {
56+
registerIndex(
57+
"parquet_logs",
58+
Settings.builder()
59+
.put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true)
60+
.build());
61+
4062
assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL));
4163
assertTrue(
4264
action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL));
4365
}
4466

4567
@Test
46-
public void nonParquetIndexRoutesToLucene() {
47-
assertFalse(action.isAnalyticsIndex("source = my_logs | fields ts", QueryType.PPL));
68+
public void indexWithoutSettingRoutesToLucene() {
69+
registerIndex("plain_logs", Settings.EMPTY);
70+
71+
assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL));
72+
}
73+
74+
@Test
75+
public void missingIndexRoutesToLucene() {
76+
assertFalse(action.isAnalyticsIndex("source = does_not_exist | fields ts", QueryType.PPL));
77+
}
78+
79+
@Test
80+
public void nullAndEmptyQueriesRouteToLucene() {
4881
assertFalse(action.isAnalyticsIndex(null, QueryType.PPL));
4982
assertFalse(action.isAnalyticsIndex("", QueryType.PPL));
5083
}
84+
85+
private void registerIndex(String name, Settings settings) {
86+
IndexMetadata indexMetadata = mock(IndexMetadata.class);
87+
when(indexMetadata.getSettings()).thenReturn(settings);
88+
when(metadata.index(name)).thenReturn(indexMetadata);
89+
}
5190
}

0 commit comments

Comments
 (0)