Skip to content

Commit 7dca5cc

Browse files
authored
fix: Honor PPL fetch_size on the analytics-engine route (#5567)
* fix: Honor PPL fetch_size on the analytics-engine route PPL's fetch_size caps a response to N rows (no cursor) — the V2 path lowers it to a top-level `head N` in AstStatementBuilder.visitPplStatement. The analytics-engine route bypasses that builder: TransportPPLQueryAction forwards only the query string to RestUnifiedQueryAction.execute(), so fetch_size was dropped and the engine returned the full result set. Thread the request's fetchSize through execute() and apply an equivalent top-level limit on the planned RelNode (addFetchSizeLimit), using the same relBuilder.limit primitive that `head` lowers to. fetchSize <= 0 keeps the prior "system default" behavior; the SQL path (separate cursor-based fetch_size) is unchanged. Before/after (CalcitePPLFetchSizeIT-equivalent, analytics-engine route): before: 9/19 pass (10 fail — fetch_size ignored, full set returned) after: 19/19 pass Signed-off-by: Kai Huang <ahkcs@amazon.com> * chore: spotlessApply on RestUnifiedQueryAction (javadoc reflow + signature wrap) Signed-off-by: Kai Huang <ahkcs@amazon.com> --------- Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 08d8ba1 commit 7dca5cc

2 files changed

Lines changed: 24 additions & 2 deletions

File tree

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,24 +152,26 @@ public void execute(
152152
QueryType queryType,
153153
boolean profiling,
154154
ActionListener<TransportPPLQueryResponse> listener) {
155-
doExecute(query, queryType, profiling, null, listener);
155+
doExecute(query, queryType, profiling, 0, null, listener);
156156
}
157157

158158
/** Execute linked to {@code parentTask} so a front-end cancel propagates into the engine. */
159159
public void execute(
160160
String query,
161161
QueryType queryType,
162162
boolean profiling,
163+
int fetchSize,
163164
Task parentTask,
164165
ActionListener<TransportPPLQueryResponse> listener) {
165166
assert parentTask != null : "parentTask required for cancellation propagation";
166-
doExecute(query, queryType, profiling, parentTask, listener);
167+
doExecute(query, queryType, profiling, fetchSize, parentTask, listener);
167168
}
168169

169170
private void doExecute(
170171
String query,
171172
QueryType queryType,
172173
boolean profiling,
174+
int fetchSize,
173175
Task parentTask,
174176
ActionListener<TransportPPLQueryResponse> listener) {
175177
client
@@ -193,6 +195,10 @@ private void doExecute(
193195
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
194196
RelNode plan = planner.plan(query);
195197
CalcitePlanContext planContext = context.getPlanContext();
198+
// PPL fetch_size caps the response to N rows (no cursor) — the V2 path attaches
199+
// a `head N` in AstStatementBuilder; the unified path parses only the query
200+
// string, so apply the equivalent top-level limit here before the system cap.
201+
plan = addFetchSizeLimit(plan, planContext, fetchSize);
196202
plan = addQuerySizeLimit(plan, planContext);
197203
if (profiling) {
198204
analyticsEngine.executeWithProfile(
@@ -339,6 +345,21 @@ private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext contex
339345
context.relBuilder.literal(context.sysLimit.querySizeLimit()));
340346
}
341347

348+
/**
349+
* Cap the result to {@code fetchSize} rows when {@code fetchSize > 0}, mirroring PPL's {@code
350+
* fetch_size} (which the V2 path lowers to a top-level {@code head N} in AstStatementBuilder).
351+
* {@code fetchSize <= 0} means "use system default", so no limit is added. Uses the same {@code
352+
* relBuilder.limit} primitive that {@code head} lowers to, so the analytics backend sees an
353+
* ordinary fetch limit.
354+
*/
355+
private static RelNode addFetchSizeLimit(
356+
RelNode plan, CalcitePlanContext context, int fetchSize) {
357+
if (fetchSize <= 0) {
358+
return plan;
359+
}
360+
return context.relBuilder.push(plan).limit(0, fetchSize).build();
361+
}
362+
342363
private ResponseListener<QueryResponse> createQueryListener(
343364
QueryType queryType, ActionListener<TransportPPLQueryResponse> transportListener) {
344365
ResponseFormatter<QueryResult> formatter = new SimpleJsonResponseFormatter(PRETTY);

plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ protected void doExecute(
189189
transformedRequest.getRequest(),
190190
QueryType.PPL,
191191
transformedRequest.profile(),
192+
transformedRequest.getFetchSize(),
192193
task,
193194
clearingListener);
194195
}

0 commit comments

Comments
 (0)