Skip to content

Commit 829ca34

Browse files
committed
Add AnalyticsExecutionEngine and query routing for Parquet-backed indices (opensearch-project#5247)
Implements the execution engine adapter and query routing infrastructure for Project Mustang's unified query pipeline (Option B). PPL queries targeting indices prefixed with parquet_ are routed through UnifiedQueryPlanner and AnalyticsExecutionEngine instead of the existing Lucene path.

Key changes:
- QueryPlanExecutor interface: contract for analytics engine execution
- AnalyticsExecutionEngine: converts QueryPlanExecutor results to QueryResponse
- RestUnifiedQueryAction: orchestrates schema building, planning, execution
- RestPPLQueryAction: routing branch for parquet_ indices
- StubQueryPlanExecutor: canned data for development/testing

Signed-off-by: Kai Huang <kaihuang@amazon.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent fe95703 commit 829ca34

9 files changed

Lines changed: 974 additions & 1 deletion

File tree

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor.analytics;
7+
8+
import java.util.ArrayList;
9+
import java.util.LinkedHashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import org.apache.calcite.plan.RelOptUtil;
13+
import org.apache.calcite.rel.RelNode;
14+
import org.apache.calcite.rel.type.RelDataType;
15+
import org.apache.calcite.rel.type.RelDataTypeField;
16+
import org.apache.calcite.sql.SqlExplainLevel;
17+
import org.opensearch.sql.ast.statement.ExplainMode;
18+
import org.opensearch.sql.calcite.CalcitePlanContext;
19+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
20+
import org.opensearch.sql.common.response.ResponseListener;
21+
import org.opensearch.sql.data.model.ExprTupleValue;
22+
import org.opensearch.sql.data.model.ExprValue;
23+
import org.opensearch.sql.data.model.ExprValueUtils;
24+
import org.opensearch.sql.data.type.ExprType;
25+
import org.opensearch.sql.executor.ExecutionContext;
26+
import org.opensearch.sql.executor.ExecutionEngine;
27+
import org.opensearch.sql.executor.pagination.Cursor;
28+
import org.opensearch.sql.planner.physical.PhysicalPlan;
29+
30+
/**
31+
* Execution engine adapter for the analytics engine (Project Mustang).
32+
*
33+
* <p>Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link
34+
* ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the
35+
* analytics engine, and converts the raw results into {@link QueryResponse}.
36+
*/
37+
public class AnalyticsExecutionEngine implements ExecutionEngine {
38+
39+
private final QueryPlanExecutor planExecutor;
40+
41+
public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) {
42+
this.planExecutor = planExecutor;
43+
}
44+
45+
/** Not supported. Analytics queries use the RelNode path exclusively. */
46+
@Override
47+
public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
48+
listener.onFailure(
49+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
50+
}
51+
52+
/** Not supported. Analytics queries use the RelNode path exclusively. */
53+
@Override
54+
public void execute(
55+
PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
56+
listener.onFailure(
57+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
58+
}
59+
60+
/** Not supported. Analytics queries use the RelNode path exclusively. */
61+
@Override
62+
public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
63+
listener.onFailure(
64+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
65+
}
66+
67+
@Override
68+
public void execute(
69+
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
70+
try {
71+
Integer querySizeLimit = context.sysLimit.querySizeLimit();
72+
Iterable<Object[]> rows = planExecutor.execute(plan, null);
73+
74+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
75+
List<ExprValue> results = convertRows(rows, fields, querySizeLimit);
76+
Schema schema = buildSchema(fields);
77+
78+
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
79+
} catch (Exception e) {
80+
listener.onFailure(e);
81+
}
82+
}
83+
84+
@Override
85+
public void explain(
86+
RelNode plan,
87+
ExplainMode mode,
88+
CalcitePlanContext context,
89+
ResponseListener<ExplainResponse> listener) {
90+
try {
91+
SqlExplainLevel level =
92+
mode == ExplainMode.SIMPLE
93+
? SqlExplainLevel.NO_ATTRIBUTES
94+
: mode == ExplainMode.COST
95+
? SqlExplainLevel.ALL_ATTRIBUTES
96+
: SqlExplainLevel.EXPPLAN_ATTRIBUTES;
97+
String logical = RelOptUtil.toString(plan, level);
98+
ExplainResponseNodeV2 nodeV2 = new ExplainResponseNodeV2(logical, null, null);
99+
listener.onResponse(new ExplainResponse(nodeV2));
100+
} catch (Exception e) {
101+
listener.onFailure(e);
102+
}
103+
}
104+
105+
private List<ExprValue> convertRows(
106+
Iterable<Object[]> rows, List<RelDataTypeField> fields, Integer querySizeLimit) {
107+
List<ExprValue> results = new ArrayList<>();
108+
for (Object[] row : rows) {
109+
if (querySizeLimit != null && results.size() >= querySizeLimit) {
110+
break;
111+
}
112+
Map<String, ExprValue> valueMap = new LinkedHashMap<>();
113+
for (int i = 0; i < fields.size(); i++) {
114+
String columnName = fields.get(i).getName();
115+
Object value = (i < row.length) ? row[i] : null;
116+
valueMap.put(columnName, ExprValueUtils.fromObjectValue(value));
117+
}
118+
results.add(ExprTupleValue.fromExprValueMap(valueMap));
119+
}
120+
return results;
121+
}
122+
123+
private Schema buildSchema(List<RelDataTypeField> fields) {
124+
List<Schema.Column> columns = new ArrayList<>();
125+
for (RelDataTypeField field : fields) {
126+
ExprType exprType = convertType(field.getType());
127+
columns.add(new Schema.Column(field.getName(), null, exprType));
128+
}
129+
return new Schema(columns);
130+
}
131+
132+
private ExprType convertType(RelDataType type) {
133+
try {
134+
return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
135+
} catch (IllegalArgumentException e) {
136+
return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
137+
}
138+
}
139+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor.analytics;
7+
8+
import org.apache.calcite.rel.RelNode;
9+
10+
/**
11+
* Executes a Calcite {@link RelNode} logical plan against the analytics engine.
12+
*
13+
* <p>This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the
14+
* analytics-framework library. It will be replaced by the upstream interface once the
15+
* analytics-framework JAR is published.
16+
*
17+
* @see <a
18+
* href="https://github.com/opensearch-project/OpenSearch/blob/9142d0e789c6a6c4708f1bc015745ed55202eefe/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/exec/QueryPlanExecutor.java">Upstream
19+
* QueryPlanExecutor</a>
20+
*/
21+
@FunctionalInterface
22+
public interface QueryPlanExecutor {
23+
24+
/**
25+
* Executes the given logical plan and returns result rows.
26+
*
27+
* @param plan the Calcite RelNode subtree to execute
28+
* @param context execution context (opaque to avoid server dependency)
29+
* @return rows produced by the engine
30+
*/
31+
Iterable<Object[]> execute(RelNode plan, Object context);
32+
}

0 commit comments

Comments
 (0)