Skip to content

Commit f2bc431

Browse files
committed
Route analytics queries by index setting, not table-name prefix
Today `RestUnifiedQueryAction.isAnalyticsIndex` dispatches to the analytics engine when the source index name starts with `parquet_`. That's brittle — it conflates naming convention with storage type. An index created without the prefix but with pluggable dataformat enabled is silently sent to the Lucene path; an index named `parquet_foo` without the setting is mis-dispatched to analytics.

Use the authoritative signal instead: the `index.pluggable.dataformat.enabled` setting on cluster-state metadata. This is the same setting that integration tests (`CoordinatorReduceIT`, `CompositeCommitDeletionIT`, etc.) already use to create analytics-backed indices, and it's what `FieldStorageResolver` reads to decide field-level storage.

Behavior:
- `index.pluggable.dataformat.enabled=true` → analytics engine (DataFusion)
- flag absent / false / index missing → Calcite→OpenSearch DSL path

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 109375d commit f2bc431

2 files changed

Lines changed: 72 additions & 15 deletions

File tree

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,14 @@ public RestUnifiedQueryAction(
7272
this.pluginSettings = pluginSettings;
7373
}
7474

75+
static final String PLUGGABLE_DATAFORMAT_ENABLED_SETTING = "index.pluggable.dataformat.enabled";
76+
7577
/**
76-
* Check if the query targets an analytics engine index (e.g., Parquet-backed). Uses the context's
77-
* parser for index name extraction, supporting both PPL and SQL.
78+
* Check if the query targets an analytics-engine-backed index. Parses the query to extract the
79+
* source index name, then reads the cluster-state metadata for that index to see if {@code
80+
* index.pluggable.dataformat.enabled} is set. When the flag is on, the index is backed by a
81+
* pluggable columnar format (e.g. Parquet) and benefits from DataFusion execution; otherwise the
82+
* query falls through to the sql-plugin's Calcite→OpenSearch DSL path.
7883
*
7984
* <p>Note: This creates a separate UnifiedQueryContext for parsing. The context cannot be shared
8085
* with doExecute/doExplain because UnifiedQueryContext holds a Calcite JDBC connection that fails
@@ -87,18 +92,27 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
8792
}
8893
try (UnifiedQueryContext context = buildParsingContext(queryType)) {
8994
return extractIndexName(query, queryType, context)
90-
.map(
91-
indexName -> {
92-
int lastDot = indexName.lastIndexOf('.');
93-
return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
94-
})
95-
.map(tableName -> tableName.startsWith("parquet_"))
95+
.map(this::stripSchemaPrefix)
96+
.map(this::isPluggableDataformatIndex)
9697
.orElse(false);
9798
} catch (Exception e) {
9899
return false;
99100
}
100101
}
101102

103+
private String stripSchemaPrefix(String indexName) {
104+
int lastDot = indexName.lastIndexOf('.');
105+
return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
106+
}
107+
108+
private boolean isPluggableDataformatIndex(String indexName) {
109+
var indexMetadata = clusterService.state().metadata().index(indexName);
110+
if (indexMetadata == null) {
111+
return false;
112+
}
113+
return indexMetadata.getSettings().getAsBoolean(PLUGGABLE_DATAFORMAT_ENABLED_SETTING, false);
114+
}
115+
102116
/** Execute a query through the unified query pipeline on the sql-worker thread pool. */
103117
public void execute(
104118
String query,

plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,44 +8,87 @@
88
import static org.junit.Assert.assertFalse;
99
import static org.junit.Assert.assertTrue;
1010
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.when;
1112

1213
import org.apache.calcite.rel.RelNode;
1314
import org.junit.Before;
1415
import org.junit.Test;
1516
import org.opensearch.analytics.exec.QueryPlanExecutor;
17+
import org.opensearch.cluster.ClusterState;
18+
import org.opensearch.cluster.metadata.IndexMetadata;
19+
import org.opensearch.cluster.metadata.Metadata;
1620
import org.opensearch.cluster.service.ClusterService;
17-
import org.opensearch.sql.common.setting.Settings;
21+
import org.opensearch.common.settings.Settings;
1822
import org.opensearch.sql.executor.QueryType;
1923
import org.opensearch.transport.client.node.NodeClient;
2024

2125
/**
22-
* Tests for analytics index routing in RestUnifiedQueryAction. Uses context parser for AST-based
23-
* index name extraction.
26+
* Tests for analytics index routing in RestUnifiedQueryAction. Routing is driven by the {@code
27+
* index.pluggable.dataformat.enabled} index setting, read from cluster state.
2428
*/
2529
public class RestUnifiedQueryActionTest {
2630

31+
private ClusterService clusterService;
32+
private Metadata metadata;
2733
private RestUnifiedQueryAction action;
2834

2935
@Before
3036
public void setUp() {
37+
clusterService = mock(ClusterService.class);
38+
ClusterState clusterState = mock(ClusterState.class);
39+
metadata = mock(Metadata.class);
40+
when(clusterService.state()).thenReturn(clusterState);
41+
when(clusterState.metadata()).thenReturn(metadata);
42+
3143
@SuppressWarnings("unchecked")
3244
QueryPlanExecutor<RelNode, Iterable<Object[]>> executor = mock(QueryPlanExecutor.class);
3345
action =
3446
new RestUnifiedQueryAction(
35-
mock(NodeClient.class), mock(ClusterService.class), executor, mock(Settings.class));
47+
mock(NodeClient.class),
48+
clusterService,
49+
executor,
50+
mock(org.opensearch.sql.common.setting.Settings.class));
3651
}
3752

3853
@Test
39-
public void parquetIndexRoutesToAnalytics() {
54+
public void pluggableDataformatIndexRoutesToAnalytics() {
55+
registerIndex(
56+
"parquet_logs", Settings.builder().put("index.pluggable.dataformat.enabled", true).build());
57+
4058
assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL));
4159
assertTrue(
4260
action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL));
4361
}
4462

4563
@Test
46-
public void nonParquetIndexRoutesToLucene() {
47-
assertFalse(action.isAnalyticsIndex("source = my_logs | fields ts", QueryType.PPL));
64+
public void indexWithoutSettingRoutesToLucene() {
65+
registerIndex("plain_logs", Settings.EMPTY);
66+
67+
assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL));
68+
}
69+
70+
@Test
71+
public void indexWithSettingFalseRoutesToLucene() {
72+
registerIndex(
73+
"plain_logs", Settings.builder().put("index.pluggable.dataformat.enabled", false).build());
74+
75+
assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL));
76+
}
77+
78+
@Test
79+
public void missingIndexRoutesToLucene() {
80+
assertFalse(action.isAnalyticsIndex("source = does_not_exist | fields ts", QueryType.PPL));
81+
}
82+
83+
@Test
84+
public void nullAndEmptyQueriesRouteToLucene() {
4885
assertFalse(action.isAnalyticsIndex(null, QueryType.PPL));
4986
assertFalse(action.isAnalyticsIndex("", QueryType.PPL));
5087
}
88+
89+
private void registerIndex(String name, Settings settings) {
90+
IndexMetadata indexMetadata = mock(IndexMetadata.class);
91+
when(indexMetadata.getSettings()).thenReturn(settings);
92+
when(metadata.index(name)).thenReturn(indexMetadata);
93+
}
5194
}

0 commit comments

Comments
 (0)