Skip to content

Commit 9a809b7

Browse files
committed
Update CalciteExplainIT, and fixes
Signed-off-by: Kai Huang <ahkcs@amazon.com> # Conflicts: # integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java
1 parent 3d0dc26 commit 9a809b7

11 files changed

Lines changed: 330 additions & 109 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/plan/LogicalPaginationLimit.java

Lines changed: 0 additions & 81 deletions
This file was deleted.

core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ public enum SystemLimitType {
3535
JOIN_SUBSEARCH_MAXOUT,
3636
/** Max output to return from a subsearch. */
3737
SUBSEARCH_MAXOUT,
38+
/**
39+
* Pagination limit type for API pagination support.
40+
*
41+
* <p>This type is used to apply LIMIT/OFFSET for paginating query results. Unlike
42+
* QUERY_SIZE_LIMIT, pagination is applied as post-processing after the user's query executes.
43+
*/
44+
PAGINATION,
3845
}
3946

4047
@Getter private final SystemLimitType type;

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
package org.opensearch.sql.executor;
77

8-
import java.security.AccessController;
9-
import java.security.PrivilegedAction;
108
import java.util.List;
119
import java.util.Optional;
1210
import javax.annotation.Nullable;
@@ -39,7 +37,6 @@
3937
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
4038
import org.opensearch.sql.calcite.OpenSearchSchema;
4139
import org.opensearch.sql.calcite.SysLimit;
42-
import org.opensearch.sql.calcite.plan.LogicalPaginationLimit;
4340
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
4441
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
4542
import org.opensearch.sql.common.response.ResponseListener;
@@ -118,6 +115,36 @@ public void explain(
118115
}
119116
}
120117

118+
/**
119+
* Explain with pagination support.
120+
*
121+
* @param plan the unresolved plan
122+
* @param queryType the query type
123+
* @param listener the response listener
124+
* @param format the explain format
125+
* @param pageSize page size for pagination
126+
* @param offset pagination offset (0-based)
127+
*/
128+
public void explain(
129+
UnresolvedPlan plan,
130+
QueryType queryType,
131+
ResponseListener<ExecutionEngine.ExplainResponse> listener,
132+
Explain.ExplainFormat format,
133+
int pageSize,
134+
int offset) {
135+
if (shouldUseCalcite(queryType)) {
136+
explainWithCalcite(plan, queryType, listener, format, pageSize, offset);
137+
} else {
138+
// For legacy path (SQL), wrap with Paginate when pagination is enabled
139+
if (pageSize > 0) {
140+
explainWithLegacy(
141+
new Paginate(pageSize, plan), queryType, listener, format, Optional.empty());
142+
} else {
143+
explainWithLegacy(plan, queryType, listener, format, Optional.empty());
144+
}
145+
}
146+
}
147+
121148
public void executeWithCalcite(
122149
UnresolvedPlan plan,
123150
QueryType queryType,
@@ -143,26 +170,21 @@ public void executeWithCalcite(
143170
CalcitePlanContext.run(
144171
() -> {
145172
try {
146-
AccessController.doPrivileged(
147-
(PrivilegedAction<Void>)
148-
() -> {
149-
CalcitePlanContext context =
150-
CalcitePlanContext.create(
151-
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
173+
CalcitePlanContext context =
174+
CalcitePlanContext.create(
175+
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
152176

153-
// Set pagination parameters if enabled
154-
if (pageSize > 0) {
155-
context.setPaginationSize(pageSize);
156-
context.setPaginationOffset(offset);
157-
}
177+
// Set pagination parameters if enabled
178+
if (pageSize > 0) {
179+
context.setPaginationSize(pageSize);
180+
context.setPaginationOffset(offset);
181+
}
158182

159-
RelNode relNode = analyze(plan, context);
160-
relNode = mergeAdjacentFilters(relNode);
161-
RelNode optimized = optimize(relNode, context);
162-
RelNode calcitePlan = convertToCalcitePlan(optimized);
163-
executionEngine.execute(calcitePlan, context, listener);
164-
return null;
165-
});
183+
RelNode relNode = analyze(plan, context);
184+
relNode = mergeAdjacentFilters(relNode);
185+
RelNode optimized = optimize(relNode, context);
186+
RelNode calcitePlan = convertToCalcitePlan(optimized);
187+
executionEngine.execute(calcitePlan, context, listener);
166188
} catch (Throwable t) {
167189
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
168190
log.warn("Fallback to V2 query engine since got exception", t);
@@ -191,12 +213,39 @@ public void explainWithCalcite(
191213
QueryType queryType,
192214
ResponseListener<ExecutionEngine.ExplainResponse> listener,
193215
Explain.ExplainFormat format) {
216+
explainWithCalcite(plan, queryType, listener, format, 0, 0);
217+
}
218+
219+
/**
220+
* Explain with Calcite engine and pagination support.
221+
*
222+
* @param plan the unresolved plan
223+
* @param queryType the query type
224+
* @param listener the response listener
225+
* @param format the explain format
226+
* @param pageSize page size (0 = no pagination)
227+
* @param offset pagination offset (0-based)
228+
*/
229+
public void explainWithCalcite(
230+
UnresolvedPlan plan,
231+
QueryType queryType,
232+
ResponseListener<ExecutionEngine.ExplainResponse> listener,
233+
Explain.ExplainFormat format,
234+
int pageSize,
235+
int offset) {
194236
CalcitePlanContext.run(
195237
() -> {
196238
try {
197239
CalcitePlanContext context =
198240
CalcitePlanContext.create(
199241
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
242+
243+
// Set pagination parameters if enabled
244+
if (pageSize > 0) {
245+
context.setPaginationSize(pageSize);
246+
context.setPaginationOffset(offset);
247+
}
248+
200249
context.run(
201250
() -> {
202251
RelNode relNode = analyze(plan, context);
@@ -344,9 +393,8 @@ public PhysicalPlan plan(LogicalPlan plan) {
344393
public RelNode optimize(RelNode plan, CalcitePlanContext context) {
345394
if (context.isPaginationEnabled()) {
346395
// Apply pagination LIMIT/OFFSET on top of user's complete query result
347-
// Using LogicalPaginationLimit to preserve input's collation and ensure
348-
// pagination is applied as post-processing, not merged with user's head/sort
349-
return LogicalPaginationLimit.create(
396+
return LogicalSystemLimit.create(
397+
SystemLimitType.PAGINATION,
350398
plan,
351399
context.relBuilder.literal(context.getPaginationOffset()),
352400
context.relBuilder.literal(context.getPaginationSize()));

core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package org.opensearch.sql.executor.execution;
77

88
import java.util.Optional;
9-
import org.apache.commons.lang3.NotImplementedException;
109
import org.opensearch.sql.ast.statement.Explain;
1110
import org.opensearch.sql.ast.tree.UnresolvedPlan;
1211
import org.opensearch.sql.common.response.ResponseListener;
@@ -88,9 +87,8 @@ public void execute() {
8887
public void explain(
8988
ResponseListener<ExecutionEngine.ExplainResponse> listener, Explain.ExplainFormat format) {
9089
if (pageSize.isPresent()) {
91-
listener.onFailure(
92-
new NotImplementedException(
93-
"`explain` feature for paginated requests is not implemented yet."));
90+
queryService.explain(
91+
plan, getQueryType(), listener, format, pageSize.get(), paginationOffset);
9492
} else {
9593
queryService.explain(plan, getQueryType(), listener, format);
9694
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2130,4 +2130,73 @@ public void testRexStandardizationForScript() throws IOException {
21302130
TEST_INDEX_BANK),
21312131
true));
21322132
}
2133+
2134+
// Pagination explain tests
2135+
2136+
/**
2137+
* Test explain for head X from Y syntax. This verifies the explain plan shows the correct
2138+
* LIMIT/OFFSET structure.
2139+
*/
2140+
@Test
2141+
public void testExplainHeadFromOffset() throws IOException {
2142+
enabledOnlyWhenPushdownIsEnabled();
2143+
String query =
2144+
String.format(
2145+
"source=%s | fields firstname, age | sort age | head 3 from 2", TEST_INDEX_BANK);
2146+
var result = explainQueryYaml(query);
2147+
String expected = loadExpectedPlan("explain_head_from_offset.yaml");
2148+
assertYamlEqualsIgnoreId(expected, result);
2149+
}
2150+
2151+
/**
2152+
* Test explain for head X from Y with filter. This verifies the explain plan shows the correct
2153+
* LIMIT/OFFSET structure with filter applied first.
2154+
*/
2155+
@Test
2156+
public void testExplainHeadFromOffsetWithFilter() throws IOException {
2157+
enabledOnlyWhenPushdownIsEnabled();
2158+
String query =
2159+
String.format(
2160+
"source=%s | where age > 30 | fields firstname, age | sort age | head 2 from 3",
2161+
TEST_INDEX_BANK);
2162+
var result = explainQueryYaml(query);
2163+
String expected = loadExpectedPlan("explain_head_from_offset_with_filter.yaml");
2164+
assertYamlEqualsIgnoreId(expected, result);
2165+
}
2166+
2167+
/**
2168+
* Test explain for pagination API. This verifies that using pageSize and offset parameters
2169+
* produces a plan with LogicalSystemLimit(type=PAGINATION) applied on top.
2170+
*/
2171+
@Test
2172+
public void testExplainPaginationApi() throws IOException {
2173+
enabledOnlyWhenPushdownIsEnabled();
2174+
String query = String.format("source=%s | fields firstname, age | sort age", TEST_INDEX_BANK);
2175+
var result = explainQueryYamlWithPagination(query, 3, 2);
2176+
String expected = loadExpectedPlan("explain_pagination_api.yaml");
2177+
assertYamlEqualsIgnoreId(expected, result);
2178+
}
2179+
2180+
/**
2181+
* Test using both pagination API and head X from Y together. The query uses head to limit results
2182+
* first, then pagination API applies additional LIMIT/OFFSET on top of that result.
2183+
*
2184+
* <p>Example: Query "head 5 from 1" returns rows 2-6 (5 rows starting from offset 1). Then
2185+
* pagination with pageSize=2, offset=1 returns rows 2-3 of that result (i.e., original rows 3-4).
2186+
*/
2187+
@Test
2188+
public void testExplainPaginationApiWithHeadFrom() throws IOException {
2189+
enabledOnlyWhenPushdownIsEnabled();
2190+
// Query with head 5 from 1, then apply pagination pageSize=2, offset=1
2191+
String query =
2192+
String.format(
2193+
"source=%s | fields firstname, age | sort age | head 5 from 1", TEST_INDEX_BANK);
2194+
var result = explainQueryYamlWithPagination(query, 2, 1);
2195+
2196+
// The plan should show both:
2197+
// 1. LogicalSystemLimit with type=[PAGINATION] (from pagination API)
2198+
// 2. LogicalSort with offset and fetch (from head 5 from 1)
2199+
String expected = loadExpectedPlan("explain_pagination_api_with_head_from.yaml");
2200+
assertYamlEqualsIgnoreId(expected, result);
2201+
}
21332202
}

0 commit comments

Comments
 (0)