From f86b01cc565d922fd1fb694992680b3daa15146b Mon Sep 17 00:00:00 2001 From: Aaron Alvarez Date: Wed, 31 Dec 2025 15:16:37 -0800 Subject: [PATCH 1/7] Fix PIT context leak in Legacy SQL for non-paginated queries The Legacy SQL engine created PIT contexts for all queries but only cleaned them up for paginated queries (fetch_size > 0). This caused PIT accumulation until the 300 limit was hit. Solution: - Only create PIT when fetch_size > 0 - Add try-catch-finally for proper PIT cleanup - Add null safety check before cursor creation Testing: 9 unit tests + 3 integration tests added Resolves #5002 Signed-off-by: Aaron Alvarez --- .../sql/legacy/PointInTimeLeakIT.java | 225 ++++++++++++++++++ .../format/PrettyFormatRestExecutor.java | 77 ++++-- .../PrettyFormatRestExecutorPitTest.java | 208 ++++++++++++++++ 3 files changed, 489 insertions(+), 21 deletions(-) create mode 100644 integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java create mode 100644 legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java new file mode 100644 index 00000000000..4864a8f6d6c --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java @@ -0,0 +1,225 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +import java.io.IOException; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.legacy.utils.StringUtils; + +/** + * Integration test to verify Point-in-Time (PIT) context leak fix in Legacy SQL engine. + * + *

Fix: When queries are executed without fetch_size (non-paginated), the Legacy SQL engine now + * conditionally creates PIT contexts only when pagination is requested (fetch_size > 0) and + * properly cleans them up when not used for cursor-based pagination. + * + *

This test verifies that: + * + *

+ * + * @see Issue #5002 + */ +public class PointInTimeLeakIT extends SQLIntegTestCase { + + private static final String TEST_INDEX = "test-logs-2025.01.01"; + private static final String PIT_STATS_ENDPOINT = + "/_nodes/stats/indices/search?filter_path=nodes.*.indices.search.point_in_time_current"; + + @Before + public void setUpTestIndex() throws IOException { + try { + executeRequest(new Request("DELETE", "/" + TEST_INDEX)); + } catch (ResponseException e) { + // Ignore 404 - index doesn't exist, which is expected in clean state + if (e.getResponse().getStatusLine().getStatusCode() != 404) { + throw e; + } + } + + Request createIndex = new Request("PUT", "/" + TEST_INDEX); + createIndex.setJsonEntity( + "{ \"mappings\": { \"properties\": { \"action\": {\"type\": \"text\", \"fields\":" + + " {\"keyword\": {\"type\": \"keyword\"}}}, \"timestamp\": {\"type\": \"date\"} " + + " } }}"); + executeRequest(createIndex); + + Request bulkRequest = new Request("POST", "/" + TEST_INDEX + "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity( + "{\"index\":{}}\n" + + "{\"action\":\"login_success\",\"timestamp\":\"2025-01-01T10:00:00Z\"}\n" + + "{\"index\":{}}\n" + + "{\"action\":\"login_success\",\"timestamp\":\"2025-01-01T10:01:00Z\"}\n" + + "{\"index\":{}}\n" + + "{\"action\":\"login_failed\",\"timestamp\":\"2025-01-01T10:02:00Z\"}\n"); + executeRequest(bulkRequest); + } + + /** + * Test verifying that PIT leak is fixed when executing queries without fetch_size. + * + *

This test: + * + *

+ */ + @Test + public void testNoPitLeakWithoutFetchSize() throws IOException, InterruptedException { + int baselinePitCount = getCurrentPitCount(); + System.out.println("Baseline PIT count: " + baselinePitCount); + + int numQueries = 10; + + for (int i = 0; i < numQueries; i++) { + String query = + StringUtils.format( + "SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); + + JSONObject response = executeQueryWithoutFetchSize(query); + + assertTrue("Query should succeed", response.has("datarows")); + JSONArray dataRows = response.getJSONArray("datarows"); + assertThat("Should return results", dataRows.length(), greaterThan(0)); + assertFalse("Should not have cursor for non-paginated query", response.has("cursor")); + + System.out.println( + String.format( + "[%d/%d] Query executed, returned %d rows", i + 1, numQueries, dataRows.length())); + } + + int currentPitCount = getCurrentPitCount(); + int leakedPits = currentPitCount - baselinePitCount; + + System.out.println( + String.format( + "After %d queries: Current PIT count = %d, Leaked PITs = %d", + numQueries, currentPitCount, leakedPits)); + + assertThat("No PITs should leak after fix", leakedPits, equalTo(0)); + + System.out.println("✓ FIX VERIFIED: No PIT contexts leaked!"); + } + + /** + * Test showing expected behavior: queries with fetch_size properly manage PITs. + * + *

When fetch_size is specified, PITs are properly managed through cursor lifecycle. + */ + @Test + public void testPitManagedProperlyWithFetchSize() throws IOException { + int baselinePitCount = getCurrentPitCount(); + + String query = + StringUtils.format( + "SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); + + JSONObject response = executeQueryWithFetchSize(query, 2); + + assertTrue("Should have cursor with fetch_size", response.has("cursor")); + String cursor = response.getString("cursor"); + + JSONObject closeResponse = executeCursorCloseQuery(cursor); + assertTrue("Cursor close should succeed", closeResponse.getBoolean("succeeded")); + + int finalPitCount = getCurrentPitCount(); + + System.out.println( + String.format( + "With proper cursor management: Baseline=%d, Final=%d", + baselinePitCount, finalPitCount)); + } + + /** + * Test comparing Legacy SQL vs V2 SQL engine behavior. V2 engine may create PITs internally but + * cleans them up properly. + */ + @Test + public void testCompareV1AndV2EnginePitBehavior() throws IOException { + int baselinePitCount = getCurrentPitCount(); + + String v1Query = + StringUtils.format( + "SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); + + JSONObject v1Response = executeQueryWithoutFetchSize(v1Query); + int afterV1PitCount = getCurrentPitCount(); + int v1Leaked = afterV1PitCount - baselinePitCount; + + System.out.println(String.format("V1 Legacy SQL: PITs leaked = %d", v1Leaked)); + + String v2Query = + StringUtils.format( + "SELECT * FROM `%s` WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); + + JSONObject v2Response = executeQueryWithoutFetchSize(v2Query); + int afterV2PitCount = getCurrentPitCount(); + + System.out.println( + String.format("After V1 and V2 queries: Total PIT count = %d", afterV2PitCount)); + + assertTrue("V1 should return results", v1Response.has("datarows")); + assertTrue("V2 should return results", v2Response.has("datarows")); + } + + private JSONObject executeQueryWithoutFetchSize(String query) throws IOException { + Request sqlRequest = new Request("POST", "/_plugins/_sql?format=jdbc"); + sqlRequest.setJsonEntity(String.format("{\"query\": \"%s\"}", query)); + + Response response = client().performRequest(sqlRequest); + return new JSONObject(TestUtils.getResponseBody(response)); + } + + private JSONObject executeQueryWithFetchSize(String query, int fetchSize) throws IOException { + Request sqlRequest = new Request("POST", "/_plugins/_sql?format=jdbc"); + sqlRequest.setJsonEntity( + String.format("{\"query\": \"%s\", \"fetch_size\": %d}", query, fetchSize)); + + Response response = client().performRequest(sqlRequest); + return new JSONObject(TestUtils.getResponseBody(response)); + } + + private int getCurrentPitCount() throws IOException { + Request statsRequest = new Request("GET", PIT_STATS_ENDPOINT); + Response response = client().performRequest(statsRequest); + JSONObject stats = new JSONObject(TestUtils.getResponseBody(response)); + + int totalPits = 0; + if (stats.has("nodes")) { + JSONObject nodes = stats.getJSONObject("nodes"); + for (String nodeId : nodes.keySet()) { + JSONObject node = nodes.getJSONObject(nodeId); + if (node.has("indices")) { + JSONObject indices = node.getJSONObject("indices"); + if (indices.has("search")) { + JSONObject search = indices.getJSONObject("search"); + if (search.has("point_in_time_current")) { + totalPits += search.getInt("point_in_time_current"); + } + } + } + } + } + + return totalPits; + } +} diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java index 9f613f68c33..05bd8bd1221 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java @@ -21,11 +21,12 @@ import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.QueryActionElasticExecutor; import org.opensearch.sql.legacy.executor.RestExecutor; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.pit.PointInTimeHandler; import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.DefaultQueryAction; import org.opensearch.sql.legacy.query.QueryAction; -import org.opensearch.sql.legacy.query.SqlOpenSearchRequestBuilder; import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy; import org.opensearch.transport.client.Client; @@ -91,41 +92,75 @@ public String execute(Client client, Map params, QueryAction que * QueryActionElasticExecutor.executeAnyAction() returns SearchHits inside SearchResponse. In * order to get scroll ID if any, we need to execute DefaultQueryAction ourselves for * SearchResponse. + * + *

This method conditionally creates PIT only when pagination is requested (fetch_size > 0) and + * ensures proper cleanup of PIT resources when they are not used for cursor-based pagination. */ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction queryAction) throws SqlParseException { PointInTimeHandler pit = null; SearchResponse response; - SqlOpenSearchRequestBuilder sqlOpenSearchRequestBuilder = queryAction.explain(); - - pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); - pit.create(); - SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); - searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); - response = searchRequest.get(); - Protocol protocol; - if (isDefaultCursor(response, queryAction)) { - DefaultCursor defaultCursor = new DefaultCursor(); - defaultCursor.setLimit(queryAction.getSelect().getRowCount()); - defaultCursor.setFetchSize(queryAction.getSqlRequest().fetchSize()); + boolean cursorCreated = false; - defaultCursor.setPitId(pit.getPitId()); - defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); - defaultCursor.setSortFields( - response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); + queryAction.explain(); - protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor); - } else { - protocol = new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); + int fetchSize = queryAction.getSqlRequest().fetchSize(); + if (fetchSize > 0) { + pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); + pit.create(); + } + + try { + SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); + if (pit != null) { + searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); + } + response = searchRequest.get(); + + if (pit != null && isDefaultCursor(response, queryAction)) { + DefaultCursor defaultCursor = new DefaultCursor(); + defaultCursor.setLimit(queryAction.getSelect().getRowCount()); + defaultCursor.setFetchSize(fetchSize); + defaultCursor.setPitId(pit.getPitId()); + defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); + defaultCursor.setSortFields( + response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); + + protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor); + cursorCreated = true; + } else { + protocol = + new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); + } + } catch (Exception e) { + if (pit != null) { + try { + pit.delete(); + } catch (RuntimeException cleanupException) { + LOG.error("Failed to delete PIT during exception handling", cleanupException); + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + } + } + throw e; + } finally { + // Cursor owns PIT lifecycle when created; otherwise clean up here + if (pit != null && !cursorCreated) { + try { + pit.delete(); + } catch (RuntimeException e) { + LOG.error("Failed to delete PIT in finally block", e); + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + } + } } return protocol; } protected boolean isDefaultCursor(SearchResponse searchResponse, DefaultQueryAction queryAction) { - return queryAction.getSqlRequest().fetchSize() != 0 + return queryAction.getSqlRequest().fetchSize() > 0 && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() >= queryAction.getSqlRequest().fetchSize(); } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java new file mode 100644 index 00000000000..46fa85081d3 --- /dev/null +++ b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java @@ -0,0 +1,208 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy.executor.format; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import org.apache.lucene.search.TotalHits; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.action.search.SearchRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.query.DefaultQueryAction; +import org.opensearch.sql.legacy.query.SqlOpenSearchRequestBuilder; +import org.opensearch.sql.legacy.request.SqlRequest; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.transport.client.Client; + +/** + * Unit tests for PIT lifecycle management in PrettyFormatRestExecutor. + * + *

These tests verify that: + * + *

+ * + *

Note: Due to the private nature of buildProtocolForDefaultQuery(), these tests verify behavior + * through the public execute() method. Integration tests in PointInTimeLeakIT provide end-to-end + * validation of the PIT leak fix. + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class PrettyFormatRestExecutorPitTest { + + @Mock private Client client; + @Mock private DefaultQueryAction queryAction; + @Mock private SqlRequest sqlRequest; + @Mock private SqlOpenSearchRequestBuilder requestBuilder; + @Mock private SearchRequestBuilder searchRequestBuilder; + @Mock private SearchResponse searchResponse; + @Mock private SearchHit searchHit; + + private PrettyFormatRestExecutor executor; + private Map params; + + @Before + public void setUp() throws Exception { + OpenSearchSettings settings = mock(OpenSearchSettings.class); + LocalClusterState.state().setPluginSettings(settings); + + when(queryAction.getSqlRequest()).thenReturn(sqlRequest); + when(queryAction.explain()).thenReturn(requestBuilder); + when(queryAction.getRequestBuilder()).thenReturn(searchRequestBuilder); + when(searchRequestBuilder.get()).thenReturn(searchResponse); + + SearchHits hits = + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F); + when(searchResponse.getHits()).thenReturn(hits); + + executor = new PrettyFormatRestExecutor("jdbc"); + params = new HashMap<>(); + } + + /** + * Test that verifies PIT is NOT created when fetch_size is 0. + * + *

Expected: Query executes successfully without creating PIT context. + */ + @Test + public void testNoPitCreatedWhenFetchSizeIsZero() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(0); + + executor.execute(client, params, queryAction); + + verify(searchRequestBuilder, never()).setPointInTime(any(PointInTimeBuilder.class)); + verify(searchRequestBuilder, times(1)).get(); + } + + @Test + public void testNoPitCreatedWhenFetchSizeNotSpecified() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(0); + + executor.execute(client, params, queryAction); + + verify(searchRequestBuilder, never()).setPointInTime(any(PointInTimeBuilder.class)); + } + + /** + * Test that verifies behavior when fetch_size > 0 but results fit in one page. + * + *

In this scenario: + * + *

+ * + *

Note: Full PIT creation/deletion verification requires integration testing due to private + * method access. + */ + @Test + public void testFetchSizeSpecifiedButResultsFitInOnePage() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(100); + + SearchHits hits = + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F); + when(searchResponse.getHits()).thenReturn(hits); + + executor.execute(client, params, queryAction); + } + + /** + * Test that verifies behavior when fetch_size > 0 and cursor is needed. + * + *

In this scenario: + * + *

+ * + *

Note: Full PIT lifecycle verification requires integration testing. + */ + @Test + public void testCursorCreatedWhenResultsExceedFetchSize() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(5); + + SearchHits hits = + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(10, TotalHits.Relation.EQUAL_TO), 1.0F); + when(searchResponse.getHits()).thenReturn(hits); + + executor.execute(client, params, queryAction); + } + + /** + * Test isDefaultCursor logic with various fetch_size values. + * + *

This is the key decision point for determining if cursor (and PIT management) is needed. + */ + @Test + public void testIsDefaultCursorLogic() { + when(sqlRequest.fetchSize()).thenReturn(0); + org.junit.Assert.assertFalse( + "No cursor when fetch_size=0", executor.isDefaultCursor(searchResponse, queryAction)); + + when(sqlRequest.fetchSize()).thenReturn(100); + SearchHits fewHits = + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(5, TotalHits.Relation.EQUAL_TO), 1.0F); + when(searchResponse.getHits()).thenReturn(fewHits); + org.junit.Assert.assertFalse( + "No cursor when results < fetch_size", + executor.isDefaultCursor(searchResponse, queryAction)); + + when(sqlRequest.fetchSize()).thenReturn(5); + SearchHits manyHits = + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(10, TotalHits.Relation.EQUAL_TO), 1.0F); + when(searchResponse.getHits()).thenReturn(manyHits); + org.junit.Assert.assertTrue( + "Cursor created when results >= fetch_size", + executor.isDefaultCursor(searchResponse, queryAction)); + } + + /** + * Test that verifies query execution completes successfully in all scenarios. + * + *

This ensures our PIT management changes don't break normal query execution. + */ + @Test + public void testQueryExecutionSucceedsWithVariousFetchSizes() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(0); + String result1 = executor.execute(client, params, queryAction); + org.junit.Assert.assertNotNull("Result should not be null", result1); + + when(sqlRequest.fetchSize()).thenReturn(100); + String result2 = executor.execute(client, params, queryAction); + org.junit.Assert.assertNotNull("Result should not be null", result2); + + when(sqlRequest.fetchSize()).thenReturn(1); + String result3 = executor.execute(client, params, queryAction); + org.junit.Assert.assertNotNull("Result should not be null", result3); + } +} From b95393cb9ee6fc71d9e80ac81d5b53fe95407d39 Mon Sep 17 00:00:00 2001 From: Aaron Alvarez Date: Fri, 2 Jan 2026 11:15:16 -0800 Subject: [PATCH 2/7] Addressing Potential NullPointerException when fetchSize() returns null Signed-off-by: Aaron Alvarez --- .../org/opensearch/sql/legacy/PointInTimeLeakIT.java | 6 ++++++ .../executor/format/PrettyFormatRestExecutor.java | 12 +++++++----- .../format/PrettyFormatRestExecutorPitTest.java | 10 +++++++--- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java index 4864a8f6d6c..0e8a6179aad 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java @@ -147,6 +147,9 @@ public void testPitManagedProperlyWithFetchSize() throws IOException { String.format( "With proper cursor management: Baseline=%d, Final=%d", baselinePitCount, finalPitCount)); + + assertThat( + "PIT should be cleaned up after cursor close", finalPitCount, equalTo(baselinePitCount)); } /** @@ -179,6 +182,9 @@ public void testCompareV1AndV2EnginePitBehavior() throws IOException { assertTrue("V1 should return results", v1Response.has("datarows")); assertTrue("V2 should return results", v2Response.has("datarows")); + + // Both engines should not leak PITs for non-paginated queries + assertThat("V1 Legacy SQL should not leak PITs", v1Leaked, equalTo(0)); } private JSONObject executeQueryWithoutFetchSize(String query) throws IOException { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java index 05bd8bd1221..9122d8a8ec5 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java @@ -106,8 +106,8 @@ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction queryAction.explain(); - int fetchSize = queryAction.getSqlRequest().fetchSize(); - if (fetchSize > 0) { + Integer fetchSize = queryAction.getSqlRequest().fetchSize(); + if (fetchSize != null && fetchSize > 0) { pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); pit.create(); } @@ -138,6 +138,7 @@ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction if (pit != null) { try { pit.delete(); + pit = null; // Prevent double deletion in finally } catch (RuntimeException cleanupException) { LOG.error("Failed to delete PIT during exception handling", cleanupException); Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); @@ -160,8 +161,9 @@ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction } protected boolean isDefaultCursor(SearchResponse searchResponse, DefaultQueryAction queryAction) { - return queryAction.getSqlRequest().fetchSize() > 0 - && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() - >= queryAction.getSqlRequest().fetchSize(); + Integer fetchSize = queryAction.getSqlRequest().fetchSize(); + return fetchSize != null + && fetchSize > 0 + && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() >= fetchSize; } } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java index 46fa85081d3..86907762afa 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java @@ -98,7 +98,7 @@ public void testNoPitCreatedWhenFetchSizeIsZero() throws Exception { @Test public void testNoPitCreatedWhenFetchSizeNotSpecified() throws Exception { - when(sqlRequest.fetchSize()).thenReturn(0); + when(sqlRequest.fetchSize()).thenReturn(null); // Simulates fetch_size not in request executor.execute(client, params, queryAction); @@ -128,7 +128,9 @@ public void testFetchSizeSpecifiedButResultsFitInOnePage() throws Exception { new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F); when(searchResponse.getHits()).thenReturn(hits); - executor.execute(client, params, queryAction); + String result = executor.execute(client, params, queryAction); + + org.junit.Assert.assertNotNull("Query should execute successfully", result); } /** @@ -153,7 +155,9 @@ public void testCursorCreatedWhenResultsExceedFetchSize() throws Exception { new SearchHit[] {searchHit}, new TotalHits(10, TotalHits.Relation.EQUAL_TO), 1.0F); when(searchResponse.getHits()).thenReturn(hits); - executor.execute(client, params, queryAction); + String result = executor.execute(client, params, queryAction); + + org.junit.Assert.assertNotNull("Query should execute successfully", result); } /** From 5656e28c3a9bbd16c553a028ca2484ab5f40f831 Mon Sep 17 00:00:00 2001 From: Aaron Alvarez Date: Mon, 5 Jan 2026 18:43:04 -0800 Subject: [PATCH 3/7] Refactoring the code to separate concerns and make the code more readable Signed-off-by: Aaron Alvarez --- .../sql/legacy/PointInTimeLeakIT.java | 61 +--------- .../format/PrettyFormatRestExecutor.java | 103 +++++++++-------- .../PrettyFormatRestExecutorPitTest.java | 105 ++---------------- .../format/PrettyFormatRestExecutorTest.java | 24 ++-- 4 files changed, 74 insertions(+), 219 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java index 0e8a6179aad..9d4fae0129e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java @@ -19,19 +19,7 @@ import org.opensearch.sql.legacy.utils.StringUtils; /** - * Integration test to verify Point-in-Time (PIT) context leak fix in Legacy SQL engine. - * - *

Fix: When queries are executed without fetch_size (non-paginated), the Legacy SQL engine now - * conditionally creates PIT contexts only when pagination is requested (fetch_size > 0) and - * properly cleans them up when not used for cursor-based pagination. - * - *

This test verifies that: - * - *

+ * Integration test verifying PIT contexts are created only when needed and properly cleaned up. * * @see Issue #5002 */ @@ -46,7 +34,6 @@ public void setUpTestIndex() throws IOException { try { executeRequest(new Request("DELETE", "/" + TEST_INDEX)); } catch (ResponseException e) { - // Ignore 404 - index doesn't exist, which is expected in clean state if (e.getResponse().getStatusLine().getStatusCode() != 404) { throw e; } @@ -71,22 +58,9 @@ public void setUpTestIndex() throws IOException { executeRequest(bulkRequest); } - /** - * Test verifying that PIT leak is fixed when executing queries without fetch_size. - * - *

This test: - * - *

- */ @Test public void testNoPitLeakWithoutFetchSize() throws IOException, InterruptedException { int baselinePitCount = getCurrentPitCount(); - System.out.println("Baseline PIT count: " + baselinePitCount); int numQueries = 10; @@ -101,30 +75,14 @@ public void testNoPitLeakWithoutFetchSize() throws IOException, InterruptedExcep JSONArray dataRows = response.getJSONArray("datarows"); assertThat("Should return results", dataRows.length(), greaterThan(0)); assertFalse("Should not have cursor for non-paginated query", response.has("cursor")); - - System.out.println( - String.format( - "[%d/%d] Query executed, returned %d rows", i + 1, numQueries, dataRows.length())); } int currentPitCount = getCurrentPitCount(); int leakedPits = currentPitCount - baselinePitCount; - System.out.println( - String.format( - "After %d queries: Current PIT count = %d, Leaked PITs = %d", - numQueries, currentPitCount, leakedPits)); - assertThat("No PITs should leak after fix", leakedPits, equalTo(0)); - - System.out.println("✓ FIX VERIFIED: No PIT contexts leaked!"); } - /** - * Test showing expected behavior: queries with fetch_size properly manage PITs. - * - *

When fetch_size is specified, PITs are properly managed through cursor lifecycle. - */ @Test public void testPitManagedProperlyWithFetchSize() throws IOException { int baselinePitCount = getCurrentPitCount(); @@ -143,19 +101,10 @@ public void testPitManagedProperlyWithFetchSize() throws IOException { int finalPitCount = getCurrentPitCount(); - System.out.println( - String.format( - "With proper cursor management: Baseline=%d, Final=%d", - baselinePitCount, finalPitCount)); - assertThat( "PIT should be cleaned up after cursor close", finalPitCount, equalTo(baselinePitCount)); } - /** - * Test comparing Legacy SQL vs V2 SQL engine behavior. V2 engine may create PITs internally but - * cleans them up properly. - */ @Test public void testCompareV1AndV2EnginePitBehavior() throws IOException { int baselinePitCount = getCurrentPitCount(); @@ -168,23 +117,19 @@ public void testCompareV1AndV2EnginePitBehavior() throws IOException { int afterV1PitCount = getCurrentPitCount(); int v1Leaked = afterV1PitCount - baselinePitCount; - System.out.println(String.format("V1 Legacy SQL: PITs leaked = %d", v1Leaked)); - String v2Query = StringUtils.format( "SELECT * FROM `%s` WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); JSONObject v2Response = executeQueryWithoutFetchSize(v2Query); int afterV2PitCount = getCurrentPitCount(); - - System.out.println( - String.format("After V1 and V2 queries: Total PIT count = %d", afterV2PitCount)); + int v2Leaked = afterV2PitCount - afterV1PitCount; assertTrue("V1 should return results", v1Response.has("datarows")); assertTrue("V2 should return results", v2Response.has("datarows")); - // Both engines should not leak PITs for non-paginated queries assertThat("V1 Legacy SQL should not leak PITs", v1Leaked, equalTo(0)); + assertThat("V2 SQL should not leak PITs", v2Leaked, equalTo(0)); } private JSONObject executeQueryWithoutFetchSize(String query) throws IOException { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java index 9122d8a8ec5..fd78b82f007 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java @@ -40,7 +40,6 @@ public PrettyFormatRestExecutor(String format) { this.format = format.toLowerCase(); } - /** Execute the QueryAction and return the REST response using the channel. */ @Override public void execute( Client client, Map params, QueryAction queryAction, RestChannel channel) { @@ -89,79 +88,77 @@ public String execute(Client client, Map params, QueryAction que } /** - * QueryActionElasticExecutor.executeAnyAction() returns SearchHits inside SearchResponse. In - * order to get scroll ID if any, we need to execute DefaultQueryAction ourselves for - * SearchResponse. + * Builds protocol for default query execution. * - *

This method conditionally creates PIT only when pagination is requested (fetch_size > 0) and - * ensures proper cleanup of PIT resources when they are not used for cursor-based pagination. + *

Routes to pagination or non-pagination execution based on fetch_size parameter. */ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction queryAction) throws SqlParseException { - PointInTimeHandler pit = null; - SearchResponse response; - Protocol protocol; - boolean cursorCreated = false; - queryAction.explain(); Integer fetchSize = queryAction.getSqlRequest().fetchSize(); if (fetchSize != null && fetchSize > 0) { - pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); - pit.create(); + return buildProtocolWithPagination(client, queryAction, fetchSize); + } else { + return buildProtocolWithoutPagination(client, queryAction); } + } + + /** Executes query with pagination support using Point-in-Time (PIT). */ + private Protocol buildProtocolWithPagination( + Client client, DefaultQueryAction queryAction, Integer fetchSize) { + + PointInTimeHandler pit = + new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); + pit.create(); try { SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); - if (pit != null) { - searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); - } - response = searchRequest.get(); - - if (pit != null && isDefaultCursor(response, queryAction)) { - DefaultCursor defaultCursor = new DefaultCursor(); - defaultCursor.setLimit(queryAction.getSelect().getRowCount()); - defaultCursor.setFetchSize(fetchSize); - defaultCursor.setPitId(pit.getPitId()); - defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); - defaultCursor.setSortFields( - response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); - - protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor); - cursorCreated = true; + searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); + SearchResponse response = searchRequest.get(); + + if (shouldCreateCursor(response, queryAction, fetchSize)) { + DefaultCursor cursor = createCursorWithPit(pit, response, queryAction, fetchSize); + return new Protocol(client, queryAction, response.getHits(), format, cursor); } else { - protocol = - new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); + pit.delete(); + return new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); } - } catch (Exception e) { - if (pit != null) { - try { - pit.delete(); - pit = null; // Prevent double deletion in finally - } catch (RuntimeException cleanupException) { - LOG.error("Failed to delete PIT during exception handling", cleanupException); - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - } + } catch (RuntimeException e) { + try { + pit.delete(); + } catch (RuntimeException deleteException) { + LOG.error("Failed to delete PIT", deleteException); + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); } throw e; - } finally { - // Cursor owns PIT lifecycle when created; otherwise clean up here - if (pit != null && !cursorCreated) { - try { - pit.delete(); - } catch (RuntimeException e) { - LOG.error("Failed to delete PIT in finally block", e); - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - } - } } + } - return protocol; + private Protocol buildProtocolWithoutPagination(Client client, DefaultQueryAction queryAction) { + SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); + SearchResponse response = searchRequest.get(); + return new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); } - protected boolean isDefaultCursor(SearchResponse searchResponse, DefaultQueryAction queryAction) { - Integer fetchSize = queryAction.getSqlRequest().fetchSize(); + private DefaultCursor createCursorWithPit( + PointInTimeHandler pit, + SearchResponse response, + DefaultQueryAction queryAction, + Integer fetchSize) { + DefaultCursor cursor = new DefaultCursor(); + cursor.setLimit(queryAction.getSelect().getRowCount()); + cursor.setFetchSize(fetchSize); + cursor.setPitId(pit.getPitId()); + cursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); + cursor.setSortFields( + response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); + return cursor; + } + + protected boolean shouldCreateCursor( + SearchResponse searchResponse, DefaultQueryAction queryAction, Integer fetchSize) { return fetchSize != null && fetchSize > 0 && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() >= fetchSize; diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java index 86907762afa..ac9178ec5d5 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.legacy.executor.format; +import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -32,22 +33,7 @@ import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.transport.client.Client; -/** - * Unit tests for PIT lifecycle management in PrettyFormatRestExecutor. - * - *

These tests verify that: - * - *

- * - *

Note: Due to the private nature of buildProtocolForDefaultQuery(), these tests verify behavior - * through the public execute() method. Integration tests in PointInTimeLeakIT provide end-to-end - * validation of the PIT leak fix. - */ +/** Unit tests for PIT lifecycle management in PrettyFormatRestExecutor. */ @RunWith(MockitoJUnitRunner.Silent.class) public class PrettyFormatRestExecutorPitTest { @@ -81,11 +67,6 @@ public void setUp() throws Exception { params = new HashMap<>(); } - /** - * Test that verifies PIT is NOT created when fetch_size is 0. - * - *

Expected: Query executes successfully without creating PIT context. - */ @Test public void testNoPitCreatedWhenFetchSizeIsZero() throws Exception { when(sqlRequest.fetchSize()).thenReturn(0); @@ -105,20 +86,6 @@ public void testNoPitCreatedWhenFetchSizeNotSpecified() throws Exception { verify(searchRequestBuilder, never()).setPointInTime(any(PointInTimeBuilder.class)); } - /** - * Test that verifies behavior when fetch_size > 0 but results fit in one page. - * - *

In this scenario: - * - *

- * - *

Note: Full PIT creation/deletion verification requires integration testing due to private - * method access. - */ @Test public void testFetchSizeSpecifiedButResultsFitInOnePage() throws Exception { when(sqlRequest.fetchSize()).thenReturn(100); @@ -130,22 +97,9 @@ public void testFetchSizeSpecifiedButResultsFitInOnePage() throws Exception { String result = executor.execute(client, params, queryAction); - org.junit.Assert.assertNotNull("Query should execute successfully", result); + assertNotNull("Query should execute successfully", result); } - /** - * Test that verifies behavior when fetch_size > 0 and cursor is needed. - * - *

In this scenario: - * - *

- * - *

Note: Full PIT lifecycle verification requires integration testing. - */ @Test public void testCursorCreatedWhenResultsExceedFetchSize() throws Exception { when(sqlRequest.fetchSize()).thenReturn(5); @@ -157,56 +111,17 @@ public void testCursorCreatedWhenResultsExceedFetchSize() throws Exception { String result = executor.execute(client, params, queryAction); - org.junit.Assert.assertNotNull("Query should execute successfully", result); + assertNotNull("Query should execute successfully", result); } - /** - * Test isDefaultCursor logic with various fetch_size values. - * - *

This is the key decision point for determining if cursor (and PIT management) is needed. - */ - @Test - public void testIsDefaultCursorLogic() { - when(sqlRequest.fetchSize()).thenReturn(0); - org.junit.Assert.assertFalse( - "No cursor when fetch_size=0", executor.isDefaultCursor(searchResponse, queryAction)); - - when(sqlRequest.fetchSize()).thenReturn(100); - SearchHits fewHits = - new SearchHits( - new SearchHit[] {searchHit}, new TotalHits(5, TotalHits.Relation.EQUAL_TO), 1.0F); - when(searchResponse.getHits()).thenReturn(fewHits); - org.junit.Assert.assertFalse( - "No cursor when results < fetch_size", - executor.isDefaultCursor(searchResponse, queryAction)); - - when(sqlRequest.fetchSize()).thenReturn(5); - SearchHits manyHits = - new SearchHits( - new SearchHit[] {searchHit}, new TotalHits(10, TotalHits.Relation.EQUAL_TO), 1.0F); - when(searchResponse.getHits()).thenReturn(manyHits); - org.junit.Assert.assertTrue( - "Cursor created when results >= fetch_size", - executor.isDefaultCursor(searchResponse, queryAction)); - } - - /** - * Test that verifies query execution completes successfully in all scenarios. - * - *

This ensures our PIT management changes don't break normal query execution. - */ @Test public void testQueryExecutionSucceedsWithVariousFetchSizes() throws Exception { - when(sqlRequest.fetchSize()).thenReturn(0); - String result1 = executor.execute(client, params, queryAction); - org.junit.Assert.assertNotNull("Result should not be null", result1); - - when(sqlRequest.fetchSize()).thenReturn(100); - String result2 = executor.execute(client, params, queryAction); - org.junit.Assert.assertNotNull("Result should not be null", result2); + int[] fetchSizes = {0, 100, 1}; - when(sqlRequest.fetchSize()).thenReturn(1); - String result3 = executor.execute(client, params, queryAction); - org.junit.Assert.assertNotNull("Result should not be null", result3); + for (int fetchSize : fetchSizes) { + when(sqlRequest.fetchSize()).thenReturn(fetchSize); + String result = executor.execute(client, params, queryAction); + assertNotNull("Result should not be null for fetchSize=" + fetchSize, result); + } } } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java index de51bd448e6..d51aa220c9c 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java @@ -21,52 +21,50 @@ import org.opensearch.search.SearchHits; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.query.DefaultQueryAction; -import org.opensearch.sql.legacy.request.SqlRequest; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +/** + * Unit tests for shouldCreateCursor() method. + * + *

For PIT lifecycle tests, see {@link PrettyFormatRestExecutorPitTest}. + */ @RunWith(MockitoJUnitRunner.class) public class PrettyFormatRestExecutorTest { @Mock private SearchResponse searchResponse; @Mock private SearchHit searchHit; @Mock private DefaultQueryAction queryAction; - @Mock private SqlRequest sqlRequest; private PrettyFormatRestExecutor executor; @Before public void setUp() { OpenSearchSettings settings = mock(OpenSearchSettings.class); LocalClusterState.state().setPluginSettings(settings); - when(queryAction.getSqlRequest()).thenReturn(sqlRequest); executor = new PrettyFormatRestExecutor("jdbc"); } @Test - public void testIsDefaultCursor_fetchSizeZero() { - when(sqlRequest.fetchSize()).thenReturn(0); - - assertFalse(executor.isDefaultCursor(searchResponse, queryAction)); + public void testShouldCreateCursor_fetchSizeZero() { + assertFalse(executor.shouldCreateCursor(searchResponse, queryAction, 0)); } @Test - public void testIsDefaultCursor_totalHitsLessThanFetchSize() { - when(sqlRequest.fetchSize()).thenReturn(10); + public void testShouldCreateCursor_totalHitsLessThanFetchSize() { when(searchResponse.getHits()) .thenReturn( new SearchHits( new SearchHit[] {searchHit}, new TotalHits(5, TotalHits.Relation.EQUAL_TO), 1.0F)); - assertFalse(executor.isDefaultCursor(searchResponse, queryAction)); + assertFalse(executor.shouldCreateCursor(searchResponse, queryAction, 10)); } @Test - public void testIsDefaultCursor_totalHitsGreaterThanOrEqualToFetchSize() { - when(sqlRequest.fetchSize()).thenReturn(5); + public void testShouldCreateCursor_totalHitsGreaterThanOrEqualToFetchSize() { when(searchResponse.getHits()) .thenReturn( new SearchHits( new SearchHit[] {searchHit}, new TotalHits(5, TotalHits.Relation.EQUAL_TO), 1.0F)); - assertTrue(executor.isDefaultCursor(searchResponse, queryAction)); + assertTrue(executor.shouldCreateCursor(searchResponse, queryAction, 5)); } } From a101480553a83bb5e435f32a2e18b8ab597dca86 Mon Sep 17 00:00:00 2001 From: Aaron Alvarez Date: Tue, 6 Jan 2026 12:46:52 -0800 Subject: [PATCH 4/7] Improving error and exception handling Signed-off-by: Aaron Alvarez --- .../sql/legacy/PointInTimeLeakIT.java | 23 +++++++-------- .../format/PrettyFormatRestExecutor.java | 29 +++++++++++-------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java index 9d4fae0129e..ab0f196ce33 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java @@ -154,20 +154,17 @@ private int getCurrentPitCount() throws IOException { Response response = client().performRequest(statsRequest); JSONObject stats = new JSONObject(TestUtils.getResponseBody(response)); + if (!stats.has("nodes")) { + return 0; + } + int totalPits = 0; - if (stats.has("nodes")) { - JSONObject nodes = stats.getJSONObject("nodes"); - for (String nodeId : nodes.keySet()) { - JSONObject node = nodes.getJSONObject(nodeId); - if (node.has("indices")) { - JSONObject indices = node.getJSONObject("indices"); - if (indices.has("search")) { - JSONObject search = indices.getJSONObject("search"); - if (search.has("point_in_time_current")) { - totalPits += search.getInt("point_in_time_current"); - } - } - } + JSONObject nodes = stats.getJSONObject("nodes"); + for (String nodeId : nodes.keySet()) { + Object pitValue = + stats.optQuery("/nodes/" + nodeId + "/indices/search/point_in_time_current"); + if (pitValue instanceof Number) { + totalPits += ((Number) pitValue).intValue(); } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java index fd78b82f007..8ef4b1396a0 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java @@ -5,6 +5,7 @@ package org.opensearch.sql.legacy.executor.format; +import java.util.Locale; import java.util.Map; import java.util.Objects; import org.apache.logging.log4j.LogManager; @@ -37,9 +38,10 @@ public class PrettyFormatRestExecutor implements RestExecutor { private final String format; public PrettyFormatRestExecutor(String format) { - this.format = format.toLowerCase(); + this.format = Objects.requireNonNull(format, "Format cannot be null").toLowerCase(Locale.ROOT); } + /** Execute the QueryAction and return the REST response using the channel. */ @Override public void execute( Client client, Map params, QueryAction queryAction, RestChannel channel) { @@ -72,15 +74,14 @@ public String execute(Client client, Map params, QueryAction que Object queryResult = QueryActionElasticExecutor.executeAnyAction(client, queryAction); protocol = new Protocol(client, queryAction, queryResult, format, Cursor.NULL_CURSOR); } + } catch (SqlParseException e) { + LOG.warn("SQL parsing error: {}", e.getMessage(), e); + protocol = new Protocol(e); + } catch (OpenSearchException e) { + LOG.warn("An error occurred in OpenSearch engine: {}", e.getDetailedMessage(), e); + protocol = new Protocol(e); } catch (Exception e) { - if (e instanceof OpenSearchException) { - LOG.warn( - "An error occurred in OpenSearch engine: " - + ((OpenSearchException) e).getDetailedMessage(), - e); - } else { - LOG.warn("Error happened in pretty formatter", e); - } + LOG.warn("Error happened in pretty formatter", e); protocol = new Protocol(e); } @@ -152,15 +153,19 @@ private DefaultCursor createCursorWithPit( cursor.setFetchSize(fetchSize); cursor.setPitId(pit.getPitId()); cursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); - cursor.setSortFields( - response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); + + if (response.getHits().getHits().length > 0) { + cursor.setSortFields( + response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); + } + return cursor; } protected boolean shouldCreateCursor( SearchResponse searchResponse, DefaultQueryAction queryAction, Integer fetchSize) { return fetchSize != null - && fetchSize > 0 + && searchResponse.getHits() != null && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() >= fetchSize; } } From b3ea403b460eb53551711cabe1a8c30a5da36bee Mon Sep 17 00:00:00 2001 From: Aaron Alvarez Date: Tue, 6 Jan 2026 13:11:58 -0800 Subject: [PATCH 5/7] trigger CI Signed-off-by: Aaron Alvarez From 84a24335170fbed7f71bd6b95308167f9956e76b Mon Sep 17 00:00:00 2001 From: Aaron Alvarez Date: Tue, 6 Jan 2026 14:15:20 -0800 Subject: [PATCH 6/7] trigger CI Signed-off-by: Aaron Alvarez From 1d5296ceff45cb62513c6d6493f66bb86d9c68f8 Mon Sep 17 00:00:00 2001 From: Aaron Alvarez Date: Tue, 6 Jan 2026 17:27:51 -0800 Subject: [PATCH 7/7] trigger CI Signed-off-by: Aaron Alvarez