|
39 | 39 | import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; |
40 | 40 | import org.opensearch.sql.lang.LangSpec; |
41 | 41 | import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; |
42 | | -import org.opensearch.sql.ppl.domain.PPLQueryRequest; |
43 | 42 | import org.opensearch.sql.protocol.response.QueryResult; |
44 | 43 | import org.opensearch.sql.protocol.response.format.ResponseFormatter; |
45 | 44 | import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; |
@@ -99,70 +98,56 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) { |
99 | 98 | public void execute( |
100 | 99 | String query, |
101 | 100 | QueryType queryType, |
102 | | - PPLQueryRequest pplRequest, |
| 101 | + boolean profiling, |
103 | 102 | ActionListener<TransportPPLQueryResponse> listener) { |
104 | 103 | client |
105 | 104 | .threadPool() |
106 | 105 | .schedule( |
107 | | - withCurrentContext(() -> doExecute(query, queryType, pplRequest, listener)), |
| 106 | + withCurrentContext( |
| 107 | + () -> { |
| 108 | + try (UnifiedQueryContext context = buildContext(queryType, profiling)) { |
| 109 | + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); |
| 110 | + RelNode plan = planner.plan(query); |
| 111 | + CalcitePlanContext planContext = context.getPlanContext(); |
| 112 | + plan = addQuerySizeLimit(plan, planContext); |
| 113 | + analyticsEngine.execute( |
| 114 | + plan, planContext, createQueryListener(queryType, listener)); |
| 115 | + } catch (Exception e) { |
| 116 | + listener.onFailure(e); |
| 117 | + } |
| 118 | + }), |
108 | 119 | new TimeValue(0), |
109 | 120 | SQL_WORKER_THREAD_POOL_NAME); |
110 | 121 | } |
111 | 122 |
|
112 | 123 | /** |
113 | 124 | * Explain a query through the unified query pipeline on the sql-worker thread pool. Returns |
114 | | - * ExplainResponse via ResponseListener so the caller (TransportPPLQueryAction) can format it |
115 | | - * using its own createExplainResponseListener. |
| 125 | + * ExplainResponse via ResponseListener so the caller can format it. |
116 | 126 | */ |
117 | 127 | public void explain( |
118 | 128 | String query, |
119 | 129 | QueryType queryType, |
120 | | - PPLQueryRequest pplRequest, |
| 130 | + ExplainMode mode, |
121 | 131 | ResponseListener<ExplainResponse> listener) { |
122 | 132 | client |
123 | 133 | .threadPool() |
124 | 134 | .schedule( |
125 | | - withCurrentContext(() -> doExplain(query, queryType, pplRequest, listener)), |
| 135 | + withCurrentContext( |
| 136 | + () -> { |
| 137 | + try (UnifiedQueryContext context = buildContext(queryType, false)) { |
| 138 | + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); |
| 139 | + RelNode plan = planner.plan(query); |
| 140 | + CalcitePlanContext planContext = context.getPlanContext(); |
| 141 | + plan = addQuerySizeLimit(plan, planContext); |
| 142 | + analyticsEngine.explain(plan, mode, planContext, listener); |
| 143 | + } catch (Exception e) { |
| 144 | + listener.onFailure(e); |
| 145 | + } |
| 146 | + }), |
126 | 147 | new TimeValue(0), |
127 | 148 | SQL_WORKER_THREAD_POOL_NAME); |
128 | 149 | } |
129 | 150 |
|
130 | | - private void doExecute( |
131 | | - String query, |
132 | | - QueryType queryType, |
133 | | - PPLQueryRequest pplRequest, |
134 | | - ActionListener<TransportPPLQueryResponse> listener) { |
135 | | - try (UnifiedQueryContext context = buildContext(queryType, pplRequest.profile())) { |
136 | | - UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); |
137 | | - RelNode plan = planner.plan(query); |
138 | | - |
139 | | - CalcitePlanContext planContext = context.getPlanContext(); |
140 | | - plan = addQuerySizeLimit(plan, planContext); |
141 | | - |
142 | | - analyticsEngine.execute(plan, planContext, createQueryListener(queryType, listener)); |
143 | | - } catch (Exception e) { |
144 | | - listener.onFailure(e); |
145 | | - } |
146 | | - } |
147 | | - |
148 | | - private void doExplain( |
149 | | - String query, |
150 | | - QueryType queryType, |
151 | | - PPLQueryRequest pplRequest, |
152 | | - ResponseListener<ExplainResponse> listener) { |
153 | | - try (UnifiedQueryContext context = buildContext(queryType, pplRequest.profile())) { |
154 | | - UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); |
155 | | - RelNode plan = planner.plan(query); |
156 | | - |
157 | | - CalcitePlanContext planContext = context.getPlanContext(); |
158 | | - plan = addQuerySizeLimit(plan, planContext); |
159 | | - |
160 | | - analyticsEngine.explain(plan, pplRequest.mode(), planContext, listener); |
161 | | - } catch (Exception e) { |
162 | | - listener.onFailure(e); |
163 | | - } |
164 | | - } |
165 | | - |
166 | 151 | /** |
167 | 152 | * Build a lightweight context for parsing only (index name extraction). Does not require cluster |
168 | 153 | * state or catalog schema. |
@@ -194,55 +179,6 @@ private static Optional<String> extractIndexName( |
194 | 179 | return Optional.ofNullable(extractTableNameFromSqlNode(sqlNode)); |
195 | 180 | } |
196 | 181 |
|
197 | | - /** |
198 | | - * Execute a SQL query through the unified query pipeline. Uses {@link |
199 | | - * org.opensearch.sql.plugin.transport.TransportPPLQueryResponse} as the transport response type |
200 | | - * since both PPL and SQL share the same JSON response format. |
201 | | - */ |
202 | | - public void executeSql( |
203 | | - String query, QueryType queryType, ActionListener<TransportPPLQueryResponse> listener) { |
204 | | - client |
205 | | - .threadPool() |
206 | | - .schedule( |
207 | | - withCurrentContext( |
208 | | - () -> { |
209 | | - try (UnifiedQueryContext context = buildContext(queryType, false)) { |
210 | | - UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); |
211 | | - RelNode plan = planner.plan(query); |
212 | | - CalcitePlanContext planContext = context.getPlanContext(); |
213 | | - plan = addQuerySizeLimit(plan, planContext); |
214 | | - analyticsEngine.execute( |
215 | | - plan, planContext, createQueryListener(queryType, listener)); |
216 | | - } catch (Exception e) { |
217 | | - listener.onFailure(e); |
218 | | - } |
219 | | - }), |
220 | | - new TimeValue(0), |
221 | | - SQL_WORKER_THREAD_POOL_NAME); |
222 | | - } |
223 | | - |
224 | | - /** Explain a SQL query through the unified query pipeline. */ |
225 | | - public void explainSql( |
226 | | - String query, QueryType queryType, ResponseListener<ExplainResponse> listener) { |
227 | | - client |
228 | | - .threadPool() |
229 | | - .schedule( |
230 | | - withCurrentContext( |
231 | | - () -> { |
232 | | - try (UnifiedQueryContext context = buildContext(queryType, false)) { |
233 | | - UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); |
234 | | - RelNode plan = planner.plan(query); |
235 | | - CalcitePlanContext planContext = context.getPlanContext(); |
236 | | - plan = addQuerySizeLimit(plan, planContext); |
237 | | - analyticsEngine.explain(plan, ExplainMode.STANDARD, planContext, listener); |
238 | | - } catch (Exception e) { |
239 | | - listener.onFailure(e); |
240 | | - } |
241 | | - }), |
242 | | - new TimeValue(0), |
243 | | - SQL_WORKER_THREAD_POOL_NAME); |
244 | | - } |
245 | | - |
246 | 182 | /** Extracts the table name from a Calcite SqlNode parse tree. */ |
247 | 183 | private static String extractTableNameFromSqlNode(SqlNode sqlNode) { |
248 | 184 | if (sqlNode instanceof SqlSelect select) { |
|
0 commit comments