From 6470b8e50ffd76fa7873c32dc22111efd9e16bd8 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 3 Feb 2026 16:07:09 -0800 Subject: [PATCH 1/7] Support fetch_size API for PPL Signed-off-by: Kai Huang # Conflicts: # integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java --- .../opensearch/sql/ast/statement/Query.java | 8 +- .../opensearch/sql/executor/QueryService.java | 26 ++- .../sql/executor/execution/QueryPlan.java | 4 +- docs/user/interfaces/endpoint.rst | 63 +++++- docs/user/ppl/limitations/limitations.md | 2 +- .../sql/calcite/remote/CalciteExplainIT.java | 69 ++++++ .../org/opensearch/sql/ppl/FetchSizeIT.java | 210 ++++++++++++++++++ .../calcite/explain_fetch_size_push.yaml | 8 + ...ain_fetch_size_smaller_than_head_push.yaml | 9 + .../explain_fetch_size_with_head_push.yaml | 9 + .../explain_fetch_size_push.yaml | 11 + ...ain_fetch_size_smaller_than_head_push.yaml | 13 ++ .../explain_fetch_size_with_head_push.yaml | 13 ++ .../org/opensearch/sql/ppl/PPLService.java | 1 + .../sql/ppl/domain/PPLQueryRequest.java | 22 ++ .../sql/ppl/parser/AstStatementBuilder.java | 15 +- .../opensearch/sql/ppl/PPLServiceTest.java | 6 +- .../sql/ppl/domain/PPLQueryRequestTest.java | 37 +++ .../ppl/parser/AstStatementBuilderTest.java | 74 ++++++ 19 files changed, 576 insertions(+), 24 deletions(-) create mode 100644 integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java create mode 100644 integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_push.yaml create mode 100644 integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_smaller_than_head_push.yaml create mode 100644 integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_with_head_push.yaml create mode 100644 integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_push.yaml create mode 100644 integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_smaller_than_head_push.yaml create mode 100644 integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_with_head_push.yaml diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java index 6681d6d1d2c..4d02773e38a 100644 --- a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java +++ b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java @@ -7,7 +7,6 @@ import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; import org.opensearch.sql.ast.AbstractNodeVisitor; @@ -19,13 +18,18 @@ @Setter @ToString @EqualsAndHashCode(callSuper = false) -@RequiredArgsConstructor public class Query extends Statement { protected final UnresolvedPlan plan; protected final int fetchSize; private final QueryType queryType; + public Query(UnresolvedPlan plan, int fetchSize, QueryType queryType) { + this.plan = plan; + this.fetchSize = fetchSize; + this.queryType = queryType; + } + @Override public R accept(AbstractNodeVisitor visitor, C context) { return visitor.visitQuery(this, context); diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index b2c219b0e4e..63491000d43 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -100,9 +100,9 @@ public void executeWithCalcite( QueryProfiling.activate(QueryContext.isProfileEnabled()); ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE); long analyzeStart = System.nanoTime(); + SysLimit sysLimit = SysLimit.fromSettings(settings); CalcitePlanContext context = - CalcitePlanContext.create( - buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); + CalcitePlanContext.create(buildFrameworkConfig(), sysLimit, queryType); RelNode relNode = analyze(plan, context); RelNode calcitePlan = convertToCalcitePlan(relNode, context); analyzeMetric.set(System.nanoTime() - analyzeStart); @@ -236,16 +236,18 @@ public void executePlan( .getSplit() .ifPresentOrElse( split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener), - () -> - executionEngine.execute( - plan(plan), - ExecutionContext.querySizeLimit( - // For pagination, querySizeLimit shouldn't take effect. - // See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize} - plan instanceof LogicalPaginate - ? null - : SysLimit.fromSettings(settings).querySizeLimit()), - listener)); + () -> { + Integer effectiveLimit; + if (plan instanceof LogicalPaginate) { + // For pagination, querySizeLimit shouldn't take effect. + // See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize} + effectiveLimit = null; + } else { + effectiveLimit = SysLimit.fromSettings(settings).querySizeLimit(); + } + executionEngine.execute( + plan(plan), ExecutionContext.querySizeLimit(effectiveLimit), listener); + }); } catch (Exception e) { listener.onFailure(e); } diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java index 5896871f5d0..ce6018a1d1a 100644 --- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java @@ -29,7 +29,7 @@ public class QueryPlan extends AbstractPlan { protected final Optional pageSize; - /** Constructor. */ + /** Constructor without page size. */ public QueryPlan( QueryId queryId, QueryType queryType, @@ -43,7 +43,7 @@ public QueryPlan( this.pageSize = Optional.empty(); } - /** Constructor with page size. */ + /** Constructor with page size (for pagination). */ public QueryPlan( QueryId queryId, QueryType queryType, diff --git a/docs/user/interfaces/endpoint.rst b/docs/user/interfaces/endpoint.rst index 9143ba62dac..4cf63cd2934 100644 --- a/docs/user/interfaces/endpoint.rst +++ b/docs/user/interfaces/endpoint.rst @@ -208,13 +208,13 @@ Explain:: } } -Cursor -====== +Cursor (SQL) +============ Description ----------- -To get paginated response for a query, user needs to provide `fetch_size` parameter as part of normal query. The value of `fetch_size` should be greater than `0`. In absence of `fetch_size` or a value of `0`, it will fallback to non-paginated response. This feature is only available over `jdbc` format for now. +To get paginated response for a SQL query, user needs to provide `fetch_size` parameter as part of normal query. The value of `fetch_size` should be greater than `0`. In absence of `fetch_size` or a value of `0`, it will fallback to non-paginated response. This feature is only available over `jdbc` format for now. Example ------- @@ -266,3 +266,60 @@ Result set:: "size": 5, "status": 200 } + +Fetch Size (PPL) +================ + +Description +----------- + +PPL also supports the ``fetch_size`` parameter, but with different semantics from SQL. In PPL, ``fetch_size`` limits the number of rows returned in a single, complete response. **PPL does not support cursor-based pagination** — no cursor is returned and there is no way to fetch additional pages. The value of ``fetch_size`` should be between ``1`` and ``10000``. In absence of ``fetch_size`` or a value of ``0``, it will use the system default behavior (no limit). + ++--------------------+-------------------------------------+------------------------------------+ +| Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` | ++====================+=====================================+====================================+ +| Purpose | Cursor-based pagination | Response size limiting | ++--------------------+-------------------------------------+------------------------------------+ +| Returns cursor? | Yes | No | ++--------------------+-------------------------------------+------------------------------------+ +| Can fetch more? | Yes (with cursor) | No (single response) | ++--------------------+-------------------------------------+------------------------------------+ +| Maximum value | No hard limit | 10,000 | ++--------------------+-------------------------------------+------------------------------------+ + +Example +------- + +PPL query:: + + >> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl -d '{ + "fetch_size" : 5, + "query" : "source = accounts | fields firstname, lastname | where age > 20" + }' + +Result set:: + + { + "schema": [ + { + "name": "firstname", + "type": "text" + }, + { + "name": "lastname", + "type": "text" + } + ], + "total": 5, + "datarows": [ + ["Cherry", "Carey"], + ["Lindsey", "Hawkins"], + ["Sargent", "Powers"], + ["Campos", "Olsen"], + ["Savannah", "Kirby"] + ], + "size": 5, + "status": 200 + } + +Note that unlike the SQL response above, there is no ``cursor`` field in the PPL response. The response is complete and final. diff --git a/docs/user/ppl/limitations/limitations.md b/docs/user/ppl/limitations/limitations.md index ac7494386dd..eea7b884d97 100644 --- a/docs/user/ppl/limitations/limitations.md +++ b/docs/user/ppl/limitations/limitations.md @@ -62,7 +62,7 @@ For the following functionalities, the query will be forwarded to the V2 query e * ML * Kmeans * `show datasources` and command -* Commands with `fetch_size` parameter +* SQL commands with `fetch_size` parameter (cursor-based pagination). Note: PPL's `fetch_size` (response size limiting, no cursor) is supported in Calcite Engine. ## Malformed Field Names in Object Fields diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 4846480fc1d..b233dd35cc6 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -5,6 +5,7 @@ package org.opensearch.sql.calcite.remote; +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; @@ -25,8 +26,12 @@ import java.io.IOException; import java.util.Locale; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; import org.opensearch.sql.ast.statement.ExplainMode; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.common.setting.Settings.Key; @@ -2497,4 +2502,68 @@ public void testExplainMvCombine() throws IOException { String expected = loadExpectedPlan("explain_mvcombine.yaml"); assertYamlEqualsIgnoreId(expected, actual); } + + // ==================== fetch_size explain tests ==================== + + @Test + public void testExplainFetchSizePushDown() throws IOException { + // fetch_size=5 injects Head(5, 0) on top of the plan + // Logical plan: LogicalSort(fetch=[5]) wraps the Project + String expected = loadExpectedPlan("explain_fetch_size_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryWithFetchSizeYaml( + String.format("source=%s | fields age", TEST_INDEX_ACCOUNT), 5)); + } + + @Test + public void testExplainFetchSizeWithSmallerHead() throws IOException { + // fetch_size=10 with user's | head 3 + // Two LogicalSort nodes: inner fetch=[3] from user head, outer fetch=[10] from fetch_size + // Effective limit = min(3, 10) = 3 + String expected = loadExpectedPlan("explain_fetch_size_with_head_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryWithFetchSizeYaml( + String.format("source=%s | head 3 | fields age", TEST_INDEX_ACCOUNT), 10)); + } + + @Test + public void testExplainFetchSizeSmallerThanHead() throws IOException { + // fetch_size=5 with user's | head 100 + // Two LogicalSort nodes: inner fetch=[100] from user head, outer fetch=[5] from fetch_size + // Effective limit = min(100, 5) = 5 + String expected = loadExpectedPlan("explain_fetch_size_smaller_than_head_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryWithFetchSizeYaml( + String.format("source=%s | head 100 | fields age", TEST_INDEX_ACCOUNT), 5)); + } + + /** + * Send an explain request with fetch_size in the JSON body and return YAML output. + * + * @param query the PPL query string + * @param fetchSize the fetch_size parameter value + * @return the explain output as YAML string + */ + private String explainQueryWithFetchSizeYaml(String query, int fetchSize) throws IOException { + Request request = + new Request( + "POST", + String.format( + "/_plugins/_ppl/_explain?format=%s&mode=%s", Format.YAML, ExplainMode.STANDARD)); + String jsonBody = + String.format( + Locale.ROOT, "{\n \"query\": \"%s\",\n \"fetch_size\": %d\n}", query, fetchSize); + request.setJsonEntity(jsonBody); + + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + return getResponseBody(response, true); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java new file mode 100644 index 00000000000..ee8006d2441 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java @@ -0,0 +1,210 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT; + +import java.io.IOException; +import java.util.Locale; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; + +/** Integration tests for PPL fetch_size parameter. */ +public class FetchSizeIT extends PPLIntegTestCase { + + @Override + public void init() throws Exception { + super.init(); + loadIndex(Index.ACCOUNT); + loadIndex(Index.BANK); + } + + @Test + public void testFetchSizeLimitsResults() throws IOException { + // accounts index has 1000 documents, request only 5 + JSONObject result = executeQueryWithFetchSize("source=" + TEST_INDEX_ACCOUNT, 5); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(5, dataRows.length()); + } + + @Test + public void testFetchSizeWithFields() throws IOException { + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | fields firstname, age", TEST_INDEX_ACCOUNT), 3); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(3, dataRows.length()); + } + + @Test + public void testFetchSizeWithFilter() throws IOException { + // Filter + fetch_size + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | where age > 30", TEST_INDEX_ACCOUNT), 10); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(10, dataRows.length()); + } + + @Test + public void testFetchSizeWithSort() throws IOException { + // Sort + fetch_size - should get the first N results after sorting + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | sort age | fields firstname, age", TEST_INDEX_ACCOUNT), 5); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(5, dataRows.length()); + } + + @Test + public void testFetchSizeWithEval() throws IOException { + // Eval command + fetch_size - ensures fetch_size works with post-processing commands + JSONObject result = + executeQueryWithFetchSize( + String.format( + "source=%s | eval age_plus_10 = age + 10 | fields firstname, age, age_plus_10", + TEST_INDEX_ACCOUNT), + 7); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeWithDedup() throws IOException { + // Dedup command + fetch_size - dedup may return fewer than fetch_size if not enough unique + // values + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | dedup gender | fields gender", TEST_INDEX_ACCOUNT), 100); + JSONArray dataRows = result.getJSONArray("datarows"); + // There are only 2 genders (M, F) in the dataset, so we should get at most 2 + assertTrue(dataRows.length() <= 2); + } + + @Test + public void testFetchSizeWithRename() throws IOException { + JSONObject result = + executeQueryWithFetchSize( + String.format( + "source=%s | rename firstname as first_name | fields first_name, age", + TEST_INDEX_ACCOUNT), + 4); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(4, dataRows.length()); + } + + @Test + public void testFetchSizeZeroReturnsAllResults() throws IOException { + // fetch_size=0 should be treated as "no limit" (use system default) + JSONObject result = executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), 0); + JSONArray dataRows = result.getJSONArray("datarows"); + // Bank index has 7 documents + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeLargerThanDataset() throws IOException { + // When fetch_size is larger than the dataset, return all available results + JSONObject result = + executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), 1000); + JSONArray dataRows = result.getJSONArray("datarows"); + // Bank index has 7 documents, so we should get 7, not 1000 + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeOne() throws IOException { + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | fields firstname", TEST_INDEX_ACCOUNT), 1); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(1, dataRows.length()); + } + + @Test + public void testFetchSizeWithStats() throws IOException { + // Stats aggregation - fetch_size should still apply to aggregation results + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | stats count() by gender", TEST_INDEX_ACCOUNT), 100); + JSONArray dataRows = result.getJSONArray("datarows"); + // Stats by gender should return 2 rows (M and F) + assertEquals(2, dataRows.length()); + } + + @Test + public void testFetchSizeWithHead() throws IOException { + // Both head command and fetch_size - the smaller limit should win + // head 3 limits to 3, fetch_size 10 would allow 10, so we get 3 + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | head 3 | fields firstname", TEST_INDEX_ACCOUNT), 10); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(3, dataRows.length()); + } + + @Test + public void testFetchSizeSmallerThanHead() throws IOException { + // fetch_size smaller than head - fetch_size should further limit + // head 100 would return 100, but fetch_size 5 limits to 5 + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | head 100 | fields firstname", TEST_INDEX_ACCOUNT), 5); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(5, dataRows.length()); + } + + @Test + public void testFetchSizeExceedsMaxReturnsError() throws IOException { + // fetch_size > 10000 should return an error + ResponseException exception = + assertThrows( + ResponseException.class, + () -> executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_ACCOUNT), 15000)); + String responseBody = getResponseBody(exception.getResponse(), true); + assertTrue(responseBody.contains("fetch_size")); + } + + @Test + public void testWithoutFetchSizeReturnsDefaultBehavior() throws IOException { + // Without fetch_size, should return results up to system default + JSONObject result = executeQuery(String.format("source=%s", TEST_INDEX_BANK)); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(7, dataRows.length()); + } + + /** + * Execute a PPL query with fetch_size parameter. + * + * @param query the PPL query string + * @param fetchSize the maximum number of results to return + * @return the JSON response + */ + protected JSONObject executeQueryWithFetchSize(String query, int fetchSize) throws IOException { + Request request = new Request("POST", QUERY_API_ENDPOINT); + String jsonBody = + String.format( + Locale.ROOT, "{\n \"query\": \"%s\",\n \"fetch_size\": %d\n}", query, fetchSize); + request.setJsonEntity(jsonBody); + + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + return jsonify(getResponseBody(response, true)); + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_push.yaml new file mode 100644 index 00000000000..d35c47cf5cb --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_push.yaml @@ -0,0 +1,8 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[5]) + LogicalProject(age=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_smaller_than_head_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_smaller_than_head_push.yaml new file mode 100644 index 00000000000..7c80ddf56df --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_smaller_than_head_push.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[5]) + LogicalProject(age=[$8]) + LogicalSort(fetch=[100]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->100, LIMIT->5, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_with_head_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_with_head_push.yaml new file mode 100644 index 00000000000..ba828e445b5 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_with_head_push.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[10]) + LogicalProject(age=[$8]) + LogicalSort(fetch=[3]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->3, LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":3,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=3, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_push.yaml new file mode 100644 index 00000000000..ca4931ec61a --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_push.yaml @@ -0,0 +1,11 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[5]) + LogicalProject(age=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..16=[{inputs}], age=[$t8]) + EnumerableLimit(fetch=[5]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_smaller_than_head_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_smaller_than_head_push.yaml new file mode 100644 index 00000000000..a3099df5ff1 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_smaller_than_head_push.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[5]) + LogicalProject(age=[$8]) + LogicalSort(fetch=[100]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..16=[{inputs}], age=[$t8]) + EnumerableLimit(fetch=[5]) + EnumerableLimit(fetch=[100]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_with_head_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_with_head_push.yaml new file mode 100644 index 00000000000..6fb5ef8a97c --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_with_head_push.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[10]) + LogicalProject(age=[$8]) + LogicalSort(fetch=[3]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableLimit(fetch=[10]) + EnumerableCalc(expr#0..16=[{inputs}], age=[$t8]) + EnumerableLimit(fetch=[3]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java index ecae97283ed..d6f025a4540 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java @@ -97,6 +97,7 @@ private AbstractPlan plan( new AstBuilder(request.getRequest(), settings), AstStatementBuilder.StatementBuilderContext.builder() .isExplain(request.isExplainRequest()) + .fetchSize(request.getFetchSize()) .format(request.getFormat()) .explainMode(request.getExplainMode()) .build())); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index ef21e0f2803..7280f4b7db0 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -18,6 +18,8 @@ public class PPLQueryRequest { private static final String DEFAULT_PPL_PATH = "/_plugins/_ppl"; + private static final String FETCH_SIZE_FIELD = "fetch_size"; + private static final int MAX_FETCH_SIZE = 10000; public static final PPLQueryRequest NULL = new PPLQueryRequest("", null, DEFAULT_PPL_PATH, ""); @@ -93,4 +95,24 @@ public Format format() { public ExplainMode mode() { return ExplainMode.of(explainMode); } + + /** + * Get the maximum number of results to return. Unlike SQL's fetch_size which enables cursor-based + * pagination, PPL's fetch_size simply limits the response to N rows without cursor support. + * + * @return fetch_size value from request, or 0 if not specified (meaning use system default) + * @throws IllegalArgumentException if fetch_size exceeds MAX_FETCH_SIZE (10000) + */ + public int getFetchSize() { + if (jsonContent == null) { + return 0; + } + int fetchSize = jsonContent.optInt(FETCH_SIZE_FIELD, 0); + if (fetchSize > MAX_FETCH_SIZE) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, "fetch_size must be less than or equal to %d", MAX_FETCH_SIZE)); + } + return fetchSize; + } } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java index 9d8338ecea7..cee084bc71b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java @@ -15,6 +15,7 @@ import org.opensearch.sql.ast.statement.Explain; import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser; @@ -30,7 +31,12 @@ public class AstStatementBuilder extends OpenSearchPPLParserBaseVisitor 0) { + rawPlan = new Head(context.getFetchSize(), 0).attach(rawPlan); + } + UnresolvedPlan plan = addSelectAll(rawPlan); + Query query = new Query(plan, 0, PPL); if (ctx.explainStatement() != null) { if (ctx.explainStatement().explainMode() == null) { return new Explain(query, PPL); @@ -51,7 +57,14 @@ protected Statement aggregateResult(Statement aggregate, Statement nextResult) { @Builder public static class StatementBuilderContext { private final boolean isExplain; + + /** + * Maximum number of results to return. 0 means use system default. Unlike SQL's fetch_size + * which enables cursor-based pagination, PPL's fetch_size limits the response to N rows without + * cursor support. + */ private final int fetchSize; + private final String format; private final String explainMode; } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java index 5db142a6506..4faadd4850c 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java @@ -102,7 +102,7 @@ public void onFailure(Exception e) { public void testExecuteShouldPass() { doAnswer( invocation -> { - ResponseListener listener = invocation.getArgument(1); + ResponseListener listener = invocation.getArgument(2); listener.onResponse(new QueryResponse(schema, Collections.emptyList(), Cursor.None)); return null; }) @@ -119,7 +119,7 @@ public void testExecuteShouldPass() { public void testExecuteCsvFormatShouldPass() { doAnswer( invocation -> { - ResponseListener listener = invocation.getArgument(1); + ResponseListener listener = invocation.getArgument(2); listener.onResponse(new QueryResponse(schema, Collections.emptyList(), Cursor.None)); return null; }) @@ -173,7 +173,7 @@ public void testExplainWithIllegalQueryShouldBeCaughtByHandler() { public void testPrometheusQuery() { doAnswer( invocation -> { - ResponseListener listener = invocation.getArgument(1); + ResponseListener listener = invocation.getArgument(2); listener.onResponse(new QueryResponse(schema, Collections.emptyList(), Cursor.None)); return null; }) diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java index f4e90395cb5..e357a857d0c 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java @@ -8,6 +8,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import org.json.JSONObject; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -55,4 +56,40 @@ public void testUnsupportedFormat() { exceptionRule.expectMessage("response in " + format + " format is not supported."); request.format(); } + + @Test + public void testGetFetchSizeReturnsValueFromJson() { + JSONObject json = new JSONObject("{\"query\": \"source=t\", \"fetch_size\": 100}"); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + assertEquals(100, request.getFetchSize()); + } + + @Test + public void testGetFetchSizeReturnsZeroWhenNotSpecified() { + JSONObject json = new JSONObject("{\"query\": \"source=t\"}"); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + assertEquals(0, request.getFetchSize()); + } + + @Test + public void testGetFetchSizeReturnsZeroWhenJsonContentIsNull() { + PPLQueryRequest request = new PPLQueryRequest("source=t", null, "/_plugins/_ppl"); + assertEquals(0, request.getFetchSize()); + } + + @Test + public void testGetFetchSizeWithLargeValue() { + JSONObject json = new JSONObject("{\"query\": \"source=t\", \"fetch_size\": 10000}"); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + assertEquals(10000, request.getFetchSize()); + } + + @Test + public void testGetFetchSizeThrowsWhenExceedsMax() { + JSONObject json = new JSONObject("{\"query\": \"source=t\", \"fetch_size\": 15000}"); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("fetch_size must be less than or equal to 10000"); + request.getFetchSize(); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java index 568a771732f..4229bbc8af3 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java @@ -58,6 +58,80 @@ public void buildExplainStatement() { new Explain(new Query(project(search(relation("t"), "a:1"), AllFields.of()), 0, PPL), PPL)); } + @Test + public void buildQueryStatementWithFetchSize() { + // When fetchSize > 0, a Head node is injected below Project (addSelectAll wraps the top) + assertEqualWithFetchSize( + "search source=t a=1", + 100, + new Query(project(head(search(relation("t"), "a:1"), 100, 0), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithFetchSizeZero() { + // fetchSize=0 means use system default - no Head node injected + assertEqualWithFetchSize( + "search source=t a=1", + 0, + new Query(project(search(relation("t"), "a:1"), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithLargeFetchSize() { + assertEqualWithFetchSize( + "search source=t a=1", + 10000, + new Query(project(head(search(relation("t"), "a:1"), 10000, 0), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithFetchSizeAndSmallerHead() { + // User query has head 3, fetchSize=10 + // Head(10) wraps Head(3), then Project(*) wraps on top + // The inner head 3 limits first, so only 3 rows are returned + assertEqualWithFetchSize( + "source=t | head 3", + 10, + new Query(project(head(head(relation("t"), 3, 0), 10, 0), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithFetchSizeSmallerThanHead() { + // User query has head 100, fetchSize=5 + // Head(5) wraps Head(100), then Project(*) wraps on top + // The outer head 5 limits, so only 5 rows are returned + assertEqualWithFetchSize( + "source=t | head 100", + 5, + new Query(project(head(head(relation("t"), 100, 0), 5, 0), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithFetchSizeAndHeadWithOffset() { + // User query has head 3 from 1 (with offset), fetchSize=10 + // The inner head offset is preserved, outer Head always has offset 0 + assertEqualWithFetchSize( + "source=t | head 3 from 1", + 10, + new Query(project(head(head(relation("t"), 3, 1), 10, 0), AllFields.of()), 0, PPL)); + } + + private void assertEqualWithFetchSize(String query, int fetchSize, Statement expectedStatement) { + Node actualPlan = planWithFetchSize(query, fetchSize); + assertEquals(expectedStatement, actualPlan); + } + + private Node planWithFetchSize(String query, int fetchSize) { + final AstStatementBuilder builder = + new AstStatementBuilder( + new AstBuilder(query, settings), + AstStatementBuilder.StatementBuilderContext.builder() + .isExplain(false) + .fetchSize(fetchSize) + .build()); + return builder.visit(parser.parse(query)); + } + private void assertEqual(String query, Statement expectedStatement) { Node actualPlan = plan(query, false); assertEquals(expectedStatement, actualPlan); From 08055cd73ee8696d333f220aa83fc541760842c3 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 4 Feb 2026 10:28:44 -0800 Subject: [PATCH 2/7] update Signed-off-by: Kai Huang --- .../opensearch/sql/ast/statement/Query.java | 8 ++---- .../opensearch/sql/executor/QueryService.java | 26 +++++++++---------- .../sql/executor/execution/QueryPlan.java | 4 +-- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java index 4d02773e38a..6681d6d1d2c 100644 --- a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java +++ b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java @@ -7,6 +7,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; import org.opensearch.sql.ast.AbstractNodeVisitor; @@ -18,18 +19,13 @@ @Setter @ToString @EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor public class Query extends Statement { protected final UnresolvedPlan plan; protected final int fetchSize; private final QueryType queryType; - public Query(UnresolvedPlan plan, int fetchSize, QueryType queryType) { - this.plan = plan; - this.fetchSize = fetchSize; - this.queryType = queryType; - } - @Override public R accept(AbstractNodeVisitor visitor, C context) { return visitor.visitQuery(this, context); diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index 63491000d43..b2c219b0e4e 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -100,9 +100,9 @@ public void executeWithCalcite( QueryProfiling.activate(QueryContext.isProfileEnabled()); ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE); long analyzeStart = System.nanoTime(); - SysLimit sysLimit = SysLimit.fromSettings(settings); CalcitePlanContext context = - CalcitePlanContext.create(buildFrameworkConfig(), sysLimit, queryType); + CalcitePlanContext.create( + buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); RelNode relNode = analyze(plan, context); RelNode calcitePlan = convertToCalcitePlan(relNode, context); analyzeMetric.set(System.nanoTime() - analyzeStart); @@ -236,18 +236,16 @@ public void executePlan( .getSplit() .ifPresentOrElse( split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener), - () -> { - Integer effectiveLimit; - if (plan instanceof LogicalPaginate) { - // For pagination, querySizeLimit shouldn't take effect. - // See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize} - effectiveLimit = null; - } else { - effectiveLimit = SysLimit.fromSettings(settings).querySizeLimit(); - } - executionEngine.execute( - plan(plan), ExecutionContext.querySizeLimit(effectiveLimit), listener); - }); + () -> + executionEngine.execute( + plan(plan), + ExecutionContext.querySizeLimit( + // For pagination, querySizeLimit shouldn't take effect. + // See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize} + plan instanceof LogicalPaginate + ? null + : SysLimit.fromSettings(settings).querySizeLimit()), + listener)); } catch (Exception e) { listener.onFailure(e); } diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java index ce6018a1d1a..5896871f5d0 100644 --- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java @@ -29,7 +29,7 @@ public class QueryPlan extends AbstractPlan { protected final Optional pageSize; - /** Constructor without page size. */ + /** Constructor. */ public QueryPlan( QueryId queryId, QueryType queryType, @@ -43,7 +43,7 @@ public QueryPlan( this.pageSize = Optional.empty(); } - /** Constructor with page size (for pagination). */ + /** Constructor with page size. */ public QueryPlan( QueryId queryId, QueryType queryType, From a0aaeeb25194ad41c8e033ab0b4d637979308b7f Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 4 Feb 2026 10:54:47 -0800 Subject: [PATCH 3/7] remove upper limit Signed-off-by: Kai Huang --- docs/user/interfaces/endpoint.rst | 4 +--- .../java/org/opensearch/sql/ppl/FetchSizeIT.java | 12 ------------ .../sql/ppl/domain/PPLQueryRequest.java | 15 ++++----------- .../sql/ppl/domain/PPLQueryRequestTest.java | 11 +---------- 4 files changed, 6 insertions(+), 36 deletions(-) diff --git a/docs/user/interfaces/endpoint.rst b/docs/user/interfaces/endpoint.rst index 4cf63cd2934..ca0cfd55cba 100644 --- a/docs/user/interfaces/endpoint.rst +++ b/docs/user/interfaces/endpoint.rst @@ -273,7 +273,7 @@ Fetch Size (PPL) Description ----------- -PPL also supports the ``fetch_size`` parameter, but with different semantics from SQL. In PPL, ``fetch_size`` limits the number of rows returned in a single, complete response. **PPL does not support cursor-based pagination** — no cursor is returned and there is no way to fetch additional pages. The value of ``fetch_size`` should be between ``1`` and ``10000``. In absence of ``fetch_size`` or a value of ``0``, it will use the system default behavior (no limit). +PPL also supports the ``fetch_size`` parameter, but with different semantics from SQL. In PPL, ``fetch_size`` limits the number of rows returned in a single, complete response. **PPL does not support cursor-based pagination** — no cursor is returned and there is no way to fetch additional pages. The value of ``fetch_size`` should be greater than ``0``. In absence of ``fetch_size`` or a value of ``0``, it will use the system default behavior (no limit). The effective upper bound is governed by the ``plugins.query.size_limit`` cluster setting (defaults to ``index.max_result_window``, which is 10000). +--------------------+-------------------------------------+------------------------------------+ | Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` | @@ -284,8 +284,6 @@ PPL also supports the ``fetch_size`` parameter, but with different semantics fro +--------------------+-------------------------------------+------------------------------------+ | Can fetch more? | Yes (with cursor) | No (single response) | +--------------------+-------------------------------------+------------------------------------+ -| Maximum value | No hard limit | 10,000 | -+--------------------+-------------------------------------+------------------------------------+ Example ------- diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java index ee8006d2441..cd7c13ea3f6 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java @@ -19,7 +19,6 @@ import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; import org.opensearch.client.Response; -import org.opensearch.client.ResponseException; /** Integration tests for PPL fetch_size parameter. */ public class FetchSizeIT extends PPLIntegTestCase { @@ -166,17 +165,6 @@ public void testFetchSizeSmallerThanHead() throws IOException { assertEquals(5, dataRows.length()); } - @Test - public void testFetchSizeExceedsMaxReturnsError() throws IOException { - // fetch_size > 10000 should return an error - ResponseException exception = - assertThrows( - ResponseException.class, - () -> executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_ACCOUNT), 15000)); - String responseBody = getResponseBody(exception.getResponse(), true); - assertTrue(responseBody.contains("fetch_size")); - } - @Test public void testWithoutFetchSizeReturnsDefaultBehavior() throws IOException { // Without fetch_size, should return results up to system default diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index 7280f4b7db0..3ae300e995b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -5,7 +5,6 @@ package org.opensearch.sql.ppl.domain; -import java.util.Locale; import java.util.Optional; import lombok.Getter; import lombok.Setter; @@ -19,7 +18,6 @@ public class PPLQueryRequest { private static final String DEFAULT_PPL_PATH = "/_plugins/_ppl"; private static final String FETCH_SIZE_FIELD = "fetch_size"; - private static final int MAX_FETCH_SIZE = 10000; public static final PPLQueryRequest NULL = new PPLQueryRequest("", null, DEFAULT_PPL_PATH, ""); @@ -98,21 +96,16 @@ public ExplainMode mode() { /** * Get the maximum number of results to return. Unlike SQL's fetch_size which enables cursor-based - * pagination, PPL's fetch_size simply limits the response to N rows without cursor support. + * pagination, PPL's fetch_size simply limits the response to N rows without cursor support. The + * effective upper bound is governed by the {@code plugins.query.size_limit} cluster setting + * (defaults to {@code index.max_result_window}, which is 10000). * * @return fetch_size value from request, or 0 if not specified (meaning use system default) - * @throws IllegalArgumentException if fetch_size exceeds MAX_FETCH_SIZE (10000) */ public int getFetchSize() { if (jsonContent == null) { return 0; } - int fetchSize = jsonContent.optInt(FETCH_SIZE_FIELD, 0); - if (fetchSize > MAX_FETCH_SIZE) { - throw new IllegalArgumentException( - String.format( - Locale.ROOT, "fetch_size must be less than or equal to %d", MAX_FETCH_SIZE)); - } - return fetchSize; + return jsonContent.optInt(FETCH_SIZE_FIELD, 0); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java index e357a857d0c..fcc6538767a 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java @@ -79,17 +79,8 @@ public void testGetFetchSizeReturnsZeroWhenJsonContentIsNull() { @Test public void testGetFetchSizeWithLargeValue() { - JSONObject json = new JSONObject("{\"query\": \"source=t\", \"fetch_size\": 10000}"); - PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); - assertEquals(10000, request.getFetchSize()); - } - - @Test - public void testGetFetchSizeThrowsWhenExceedsMax() { JSONObject json = new JSONObject("{\"query\": \"source=t\", \"fetch_size\": 15000}"); PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); - exceptionRule.expect(IllegalArgumentException.class); - exceptionRule.expectMessage("fetch_size must be less than or equal to 10000"); - request.getFetchSize(); + assertEquals(15000, request.getFetchSize()); } } From 8c3f5f0c7386ca9568bf2887be9459957fdef4b7 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 4 Feb 2026 11:03:44 -0800 Subject: [PATCH 4/7] Support fetch_size as a URL parameter Signed-off-by: Kai Huang --- .../org/opensearch/sql/ppl/FetchSizeIT.java | 36 +++++++++++++++++++ .../request/PPLQueryRequestFactory.java | 8 +++++ .../sql/plugin/rest/RestPPLQueryAction.java | 2 +- .../sql/ppl/domain/PPLQueryRequest.java | 1 + 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java index cd7c13ea3f6..243e383f0a0 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java @@ -165,6 +165,42 @@ public void testFetchSizeSmallerThanHead() throws IOException { assertEquals(5, dataRows.length()); } + @Test + public void testFetchSizeAsUrlParameter() throws IOException { + // fetch_size specified as URL parameter instead of JSON body + Request request = new Request("POST", QUERY_API_ENDPOINT + "?fetch_size=5"); + String jsonBody = + String.format( + Locale.ROOT, "{\n \"query\": \"source=%s | fields firstname\"\n}", TEST_INDEX_ACCOUNT); + request.setJsonEntity(jsonBody); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + JSONObject result = jsonify(getResponseBody(response, true)); + assertEquals(5, result.getJSONArray("datarows").length()); + } + + @Test + public void testFetchSizeJsonBodyTakesPrecedenceOverUrlParam() throws IOException { + // JSON body fetch_size=3 should take precedence over URL param fetch_size=10 + Request request = new Request("POST", QUERY_API_ENDPOINT + "?fetch_size=10"); + String jsonBody = + String.format( + Locale.ROOT, + "{\n \"query\": \"source=%s | fields firstname\",\n \"fetch_size\": 3\n}", + TEST_INDEX_ACCOUNT); + request.setJsonEntity(jsonBody); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + JSONObject result = jsonify(getResponseBody(response, true)); + assertEquals(3, result.getJSONArray("datarows").length()); + } + @Test public void testWithoutFetchSizeReturnsDefaultBehavior() throws IOException { // Without fetch_size, should return results up to system default diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java index 0dd8e1a9651..0b458b59305 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java @@ -30,6 +30,7 @@ public class PPLQueryRequestFactory { private static final String DEFAULT_EXPLAIN_MODE = "standard"; private static final String QUERY_PARAMS_PRETTY = "pretty"; private static final String QUERY_PARAMS_PROFILE = "profile"; + private static final String QUERY_PARAMS_FETCH_SIZE = "fetch_size"; /** * Build {@link PPLQueryRequest} from {@link RestRequest}. @@ -84,6 +85,13 @@ private static PPLQueryRequest parsePPLRequestFromPayload(RestRequest restReques String queryString = jsonContent.optString(PPL_FIELD_NAME, ""); boolean enableProfile = profileRequested && isProfileSupported(restRequest.path(), format, queryString); + // Support fetch_size as a URL parameter if not already in the JSON body + if (!jsonContent.has(QUERY_PARAMS_FETCH_SIZE) + && restRequest.params().containsKey(QUERY_PARAMS_FETCH_SIZE)) { + jsonContent.put( + QUERY_PARAMS_FETCH_SIZE, + Integer.parseInt(restRequest.params().get(QUERY_PARAMS_FETCH_SIZE))); + } PPLQueryRequest pplRequest = new PPLQueryRequest( jsonContent.getString(PPL_FIELD_NAME), diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index 54efb861cf5..ffdd90504f7 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -77,7 +77,7 @@ public String getName() { @Override protected Set responseParams() { Set responseParams = new HashSet<>(super.responseParams()); - responseParams.addAll(Arrays.asList("format", "mode", "sanitize")); + responseParams.addAll(Arrays.asList("format", "mode", "sanitize", "fetch_size")); return responseParams; } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index 3ae300e995b..caf666b3b4e 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.ppl.domain; +import java.util.Locale; import java.util.Optional; import lombok.Getter; import lombok.Setter; From 899ef016f713d0809fe696e9df84c1cc7c398d09 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 4 Feb 2026 11:18:11 -0800 Subject: [PATCH 5/7] add Test coverage Signed-off-by: Kai Huang --- docs/user/interfaces/endpoint.rst | 13 ++++++++- .../org/opensearch/sql/ppl/FetchSizeIT.java | 28 +++++++++++++++++++ .../request/PPLQueryRequestFactory.java | 11 ++++++-- .../sql/ppl/domain/PPLQueryRequestTest.java | 9 ++++++ 4 files changed, 57 insertions(+), 4 deletions(-) diff --git a/docs/user/interfaces/endpoint.rst b/docs/user/interfaces/endpoint.rst index ca0cfd55cba..81e6dbe7095 100644 --- a/docs/user/interfaces/endpoint.rst +++ b/docs/user/interfaces/endpoint.rst @@ -275,6 +275,8 @@ Description PPL also supports the ``fetch_size`` parameter, but with different semantics from SQL. In PPL, ``fetch_size`` limits the number of rows returned in a single, complete response. **PPL does not support cursor-based pagination** — no cursor is returned and there is no way to fetch additional pages. The value of ``fetch_size`` should be greater than ``0``. In absence of ``fetch_size`` or a value of ``0``, it will use the system default behavior (no limit). The effective upper bound is governed by the ``plugins.query.size_limit`` cluster setting (defaults to ``index.max_result_window``, which is 10000). +``fetch_size`` can be specified either as a URL parameter or in the JSON request body. If both are provided, the JSON body value takes precedence. + +--------------------+-------------------------------------+------------------------------------+ | Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` | +====================+=====================================+====================================+ @@ -285,7 +287,7 @@ PPL also supports the ``fetch_size`` parameter, but with different semantics fro | Can fetch more? | Yes (with cursor) | No (single response) | +--------------------+-------------------------------------+------------------------------------+ -Example +Example 1: JSON body ------- PPL query:: @@ -295,6 +297,15 @@ PPL query:: "query" : "source = accounts | fields firstname, lastname | where age > 20" }' +Example 2: URL parameter +------- + +PPL query:: + + >> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl?fetch_size=5 -d '{ + "query" : "source = accounts | fields firstname, lastname | where age > 20" + }' + Result set:: { diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java index 243e383f0a0..abd75536d55 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java @@ -123,6 +123,34 @@ public void testFetchSizeLargerThanDataset() throws IOException { assertEquals(7, dataRows.length()); } + @Test + public void testFetchSizeAtSystemLimit() throws IOException { + // fetch_size at the default system limit (10000) should work without error + JSONObject result = + executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), 10000); + JSONArray dataRows = result.getJSONArray("datarows"); + // Bank index has 7 documents, so we get all of them + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeExceedingSystemLimitIsCapped() throws IOException { + // fetch_size > system limit (10000) is accepted but capped by LogicalSystemLimit + JSONObject result = + executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), 10001); + JSONArray dataRows = result.getJSONArray("datarows"); + // Bank index has 7 documents, result is capped by system limit but dataset is smaller + assertEquals(7, dataRows.length()); + } + + @Test + public void testNegativeFetchSizeReturnsAllResults() throws IOException { + // Negative fetch_size is treated as "no limit" (same as 0) + JSONObject result = executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), -1); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(7, dataRows.length()); + } + @Test public void testFetchSizeOne() throws IOException { JSONObject result = diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java index 0b458b59305..0d07dab966a 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java @@ -88,9 +88,14 @@ private static PPLQueryRequest parsePPLRequestFromPayload(RestRequest restReques // Support fetch_size as a URL parameter if not already in the JSON body if (!jsonContent.has(QUERY_PARAMS_FETCH_SIZE) && restRequest.params().containsKey(QUERY_PARAMS_FETCH_SIZE)) { - jsonContent.put( - QUERY_PARAMS_FETCH_SIZE, - Integer.parseInt(restRequest.params().get(QUERY_PARAMS_FETCH_SIZE))); + try { + jsonContent.put( + QUERY_PARAMS_FETCH_SIZE, + Integer.parseInt(restRequest.params().get(QUERY_PARAMS_FETCH_SIZE))); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Invalid fetch_size parameter: must be a valid integer", e); + } } PPLQueryRequest pplRequest = new PPLQueryRequest( diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java index fcc6538767a..9fe3a4fef64 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java @@ -77,6 +77,15 @@ public void testGetFetchSizeReturnsZeroWhenJsonContentIsNull() { assertEquals(0, request.getFetchSize()); } + @Test + public void testGetFetchSizeHandlesExplicitNull() { + JSONObject json = new JSONObject(); + json.put("query", "source=t"); + json.put("fetch_size", JSONObject.NULL); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + assertEquals(0, request.getFetchSize()); + } + @Test public void testGetFetchSizeWithLargeValue() { JSONObject json = new JSONObject("{\"query\": \"source=t\", \"fetch_size\": 15000}"); From a4daaeae842daa904bfb853989ab18e22a1f6393 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 4 Feb 2026 13:42:44 -0800 Subject: [PATCH 6/7] update docs Signed-off-by: Kai Huang --- docs/user/interfaces/endpoint.rst | 32 +++++++++++++----------- docs/user/ppl/limitations/limitations.md | 2 +- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/docs/user/interfaces/endpoint.rst b/docs/user/interfaces/endpoint.rst index 81e6dbe7095..4023f7dd7a6 100644 --- a/docs/user/interfaces/endpoint.rst +++ b/docs/user/interfaces/endpoint.rst @@ -267,25 +267,31 @@ Result set:: "status": 200 } -Fetch Size (PPL) -================ +Fetch Size (PPL) [Experimental] +================================ Description ----------- -PPL also supports the ``fetch_size`` parameter, but with different semantics from SQL. In PPL, ``fetch_size`` limits the number of rows returned in a single, complete response. **PPL does not support cursor-based pagination** — no cursor is returned and there is no way to fetch additional pages. The value of ``fetch_size`` should be greater than ``0``. In absence of ``fetch_size`` or a value of ``0``, it will use the system default behavior (no limit). The effective upper bound is governed by the ``plugins.query.size_limit`` cluster setting (defaults to ``index.max_result_window``, which is 10000). +The ``fetch_size`` parameter limits the number of rows returned in a PPL query response. The value of ``fetch_size`` should be greater than ``0``. In absence of ``fetch_size`` or a value of ``0``, the result size is governed by the ``plugins.query.size_limit`` cluster setting. ``fetch_size`` can be specified either as a URL parameter or in the JSON request body. If both are provided, the JSON body value takes precedence. -+--------------------+-------------------------------------+------------------------------------+ -| Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` | -+====================+=====================================+====================================+ -| Purpose | Cursor-based pagination | Response size limiting | -+--------------------+-------------------------------------+------------------------------------+ -| Returns cursor? | Yes | No | -+--------------------+-------------------------------------+------------------------------------+ -| Can fetch more? | Yes (with cursor) | No (single response) | -+--------------------+-------------------------------------+------------------------------------+ +If ``fetch_size`` is larger than ``plugins.query.size_limit``, the result is capped at ``plugins.query.size_limit``. The effective number of rows returned is always ``min(fetch_size, plugins.query.size_limit)``. + +.. note:: + + Unlike SQL's ``fetch_size`` which enables cursor-based pagination, PPL's ``fetch_size`` does not return a cursor and does not support fetching additional pages. The response is always complete and final. + + +--------------------+-------------------------------------+------------------------------------+ + | Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` | + +====================+=====================================+====================================+ + | Purpose | Cursor-based pagination | Response size limiting | + +--------------------+-------------------------------------+------------------------------------+ + | Returns cursor? | Yes | No | + +--------------------+-------------------------------------+------------------------------------+ + | Can fetch more? | Yes (with cursor) | No (single response) | + +--------------------+-------------------------------------+------------------------------------+ Example 1: JSON body ------- @@ -330,5 +336,3 @@ Result set:: "size": 5, "status": 200 } - -Note that unlike the SQL response above, there is no ``cursor`` field in the PPL response. The response is complete and final. diff --git a/docs/user/ppl/limitations/limitations.md b/docs/user/ppl/limitations/limitations.md index eea7b884d97..e532f64a790 100644 --- a/docs/user/ppl/limitations/limitations.md +++ b/docs/user/ppl/limitations/limitations.md @@ -62,7 +62,7 @@ For the following functionalities, the query will be forwarded to the V2 query e * ML * Kmeans * `show datasources` and command -* SQL commands with `fetch_size` parameter (cursor-based pagination). Note: PPL's `fetch_size` (response size limiting, no cursor) is supported in Calcite Engine. +* SQL queries with `fetch_size` parameter (cursor-based pagination). Note: PPL's `fetch_size` (response size limiting, no cursor) is supported in Calcite Engine. ## Malformed Field Names in Object Fields From 672878e08146c9abdd5a8620a57d3aec440cfa61 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 4 Feb 2026 13:55:17 -0800 Subject: [PATCH 7/7] formatting Signed-off-by: Kai Huang --- docs/user/interfaces/endpoint.rst | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/docs/user/interfaces/endpoint.rst b/docs/user/interfaces/endpoint.rst index 4023f7dd7a6..26dca94d228 100644 --- a/docs/user/interfaces/endpoint.rst +++ b/docs/user/interfaces/endpoint.rst @@ -279,19 +279,20 @@ The ``fetch_size`` parameter limits the number of rows returned in a PPL query r If ``fetch_size`` is larger than ``plugins.query.size_limit``, the result is capped at ``plugins.query.size_limit``. The effective number of rows returned is always ``min(fetch_size, plugins.query.size_limit)``. -.. note:: - - Unlike SQL's ``fetch_size`` which enables cursor-based pagination, PPL's ``fetch_size`` does not return a cursor and does not support fetching additional pages. The response is always complete and final. - - +--------------------+-------------------------------------+------------------------------------+ - | Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` | - +====================+=====================================+====================================+ - | Purpose | Cursor-based pagination | Response size limiting | - +--------------------+-------------------------------------+------------------------------------+ - | Returns cursor? | Yes | No | - +--------------------+-------------------------------------+------------------------------------+ - | Can fetch more? | Yes (with cursor) | No (single response) | - +--------------------+-------------------------------------+------------------------------------+ +Note +---- + +Unlike SQL's ``fetch_size`` which enables cursor-based pagination, PPL's ``fetch_size`` does not return a cursor and does not support fetching additional pages. The response is always complete and final. + ++--------------------+-------------------------------------+------------------------------------+ +| Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` | ++====================+=====================================+====================================+ +| Purpose | Cursor-based pagination | Response size limiting | ++--------------------+-------------------------------------+------------------------------------+ +| Returns cursor? | Yes | No | ++--------------------+-------------------------------------+------------------------------------+ +| Can fetch more? | Yes (with cursor) | No (single response) | ++--------------------+-------------------------------------+------------------------------------+ Example 1: JSON body -------