Skip to content

Commit 5bf322f

Browse files
Fix PIT context leak in Legacy SQL for non-paginated queries (opensearch-project#5009)
Co-authored-by: Aaron Alvarez <aaarone@amazon.com>
1 parent 1bddb92 commit 5bf322f

4 files changed

Lines changed: 387 additions & 50 deletions

File tree

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.legacy;
7+
8+
import static org.hamcrest.Matchers.equalTo;
9+
import static org.hamcrest.Matchers.greaterThan;
10+
11+
import java.io.IOException;
12+
import org.json.JSONArray;
13+
import org.json.JSONObject;
14+
import org.junit.Before;
15+
import org.junit.Test;
16+
import org.opensearch.client.Request;
17+
import org.opensearch.client.Response;
18+
import org.opensearch.client.ResponseException;
19+
import org.opensearch.sql.legacy.utils.StringUtils;
20+
21+
/**
22+
* Integration test verifying PIT contexts are created only when needed and properly cleaned up.
23+
*
24+
* @see <a href="https://github.com/opensearch-project/sql/issues/5002">Issue #5002</a>
25+
*/
26+
public class PointInTimeLeakIT extends SQLIntegTestCase {
27+
28+
private static final String TEST_INDEX = "test-logs-2025.01.01";
29+
private static final String PIT_STATS_ENDPOINT =
30+
"/_nodes/stats/indices/search?filter_path=nodes.*.indices.search.point_in_time_current";
31+
32+
@Before
33+
public void setUpTestIndex() throws IOException {
34+
try {
35+
executeRequest(new Request("DELETE", "/" + TEST_INDEX));
36+
} catch (ResponseException e) {
37+
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
38+
throw e;
39+
}
40+
}
41+
42+
Request createIndex = new Request("PUT", "/" + TEST_INDEX);
43+
createIndex.setJsonEntity(
44+
"{ \"mappings\": { \"properties\": { \"action\": {\"type\": \"text\", \"fields\":"
45+
+ " {\"keyword\": {\"type\": \"keyword\"}}}, \"timestamp\": {\"type\": \"date\"} "
46+
+ " } }}");
47+
executeRequest(createIndex);
48+
49+
Request bulkRequest = new Request("POST", "/" + TEST_INDEX + "/_bulk");
50+
bulkRequest.addParameter("refresh", "true");
51+
bulkRequest.setJsonEntity(
52+
"{\"index\":{}}\n"
53+
+ "{\"action\":\"login_success\",\"timestamp\":\"2025-01-01T10:00:00Z\"}\n"
54+
+ "{\"index\":{}}\n"
55+
+ "{\"action\":\"login_success\",\"timestamp\":\"2025-01-01T10:01:00Z\"}\n"
56+
+ "{\"index\":{}}\n"
57+
+ "{\"action\":\"login_failed\",\"timestamp\":\"2025-01-01T10:02:00Z\"}\n");
58+
executeRequest(bulkRequest);
59+
}
60+
61+
@Test
62+
public void testNoPitLeakWithoutFetchSize() throws IOException, InterruptedException {
63+
int baselinePitCount = getCurrentPitCount();
64+
65+
int numQueries = 10;
66+
67+
for (int i = 0; i < numQueries; i++) {
68+
String query =
69+
StringUtils.format(
70+
"SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX);
71+
72+
JSONObject response = executeQueryWithoutFetchSize(query);
73+
74+
assertTrue("Query should succeed", response.has("datarows"));
75+
JSONArray dataRows = response.getJSONArray("datarows");
76+
assertThat("Should return results", dataRows.length(), greaterThan(0));
77+
assertFalse("Should not have cursor for non-paginated query", response.has("cursor"));
78+
}
79+
80+
int currentPitCount = getCurrentPitCount();
81+
int leakedPits = currentPitCount - baselinePitCount;
82+
83+
assertThat("No PITs should leak after fix", leakedPits, equalTo(0));
84+
}
85+
86+
@Test
87+
public void testPitManagedProperlyWithFetchSize() throws IOException {
88+
int baselinePitCount = getCurrentPitCount();
89+
90+
String query =
91+
StringUtils.format(
92+
"SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX);
93+
94+
JSONObject response = executeQueryWithFetchSize(query, 2);
95+
96+
assertTrue("Should have cursor with fetch_size", response.has("cursor"));
97+
String cursor = response.getString("cursor");
98+
99+
JSONObject closeResponse = executeCursorCloseQuery(cursor);
100+
assertTrue("Cursor close should succeed", closeResponse.getBoolean("succeeded"));
101+
102+
int finalPitCount = getCurrentPitCount();
103+
104+
assertThat(
105+
"PIT should be cleaned up after cursor close", finalPitCount, equalTo(baselinePitCount));
106+
}
107+
108+
@Test
109+
public void testCompareV1AndV2EnginePitBehavior() throws IOException {
110+
int baselinePitCount = getCurrentPitCount();
111+
112+
String v1Query =
113+
StringUtils.format(
114+
"SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX);
115+
116+
JSONObject v1Response = executeQueryWithoutFetchSize(v1Query);
117+
int afterV1PitCount = getCurrentPitCount();
118+
int v1Leaked = afterV1PitCount - baselinePitCount;
119+
120+
String v2Query =
121+
StringUtils.format(
122+
"SELECT * FROM `%s` WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX);
123+
124+
JSONObject v2Response = executeQueryWithoutFetchSize(v2Query);
125+
int afterV2PitCount = getCurrentPitCount();
126+
int v2Leaked = afterV2PitCount - afterV1PitCount;
127+
128+
assertTrue("V1 should return results", v1Response.has("datarows"));
129+
assertTrue("V2 should return results", v2Response.has("datarows"));
130+
131+
assertThat("V1 Legacy SQL should not leak PITs", v1Leaked, equalTo(0));
132+
assertThat("V2 SQL should not leak PITs", v2Leaked, equalTo(0));
133+
}
134+
135+
private JSONObject executeQueryWithoutFetchSize(String query) throws IOException {
136+
Request sqlRequest = new Request("POST", "/_plugins/_sql?format=jdbc");
137+
sqlRequest.setJsonEntity(String.format("{\"query\": \"%s\"}", query));
138+
139+
Response response = client().performRequest(sqlRequest);
140+
return new JSONObject(TestUtils.getResponseBody(response));
141+
}
142+
143+
private JSONObject executeQueryWithFetchSize(String query, int fetchSize) throws IOException {
144+
Request sqlRequest = new Request("POST", "/_plugins/_sql?format=jdbc");
145+
sqlRequest.setJsonEntity(
146+
String.format("{\"query\": \"%s\", \"fetch_size\": %d}", query, fetchSize));
147+
148+
Response response = client().performRequest(sqlRequest);
149+
return new JSONObject(TestUtils.getResponseBody(response));
150+
}
151+
152+
private int getCurrentPitCount() throws IOException {
153+
Request statsRequest = new Request("GET", PIT_STATS_ENDPOINT);
154+
Response response = client().performRequest(statsRequest);
155+
JSONObject stats = new JSONObject(TestUtils.getResponseBody(response));
156+
157+
if (!stats.has("nodes")) {
158+
return 0;
159+
}
160+
161+
int totalPits = 0;
162+
JSONObject nodes = stats.getJSONObject("nodes");
163+
for (String nodeId : nodes.keySet()) {
164+
Object pitValue =
165+
stats.optQuery("/nodes/" + nodeId + "/indices/search/point_in_time_current");
166+
if (pitValue instanceof Number) {
167+
totalPits += ((Number) pitValue).intValue();
168+
}
169+
}
170+
171+
return totalPits;
172+
}
173+
}

legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java

Lines changed: 76 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.sql.legacy.executor.format;
77

8+
import java.util.Locale;
89
import java.util.Map;
910
import java.util.Objects;
1011
import org.apache.logging.log4j.LogManager;
@@ -21,11 +22,12 @@
2122
import org.opensearch.sql.legacy.exception.SqlParseException;
2223
import org.opensearch.sql.legacy.executor.QueryActionElasticExecutor;
2324
import org.opensearch.sql.legacy.executor.RestExecutor;
25+
import org.opensearch.sql.legacy.metrics.MetricName;
26+
import org.opensearch.sql.legacy.metrics.Metrics;
2427
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
2528
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
2629
import org.opensearch.sql.legacy.query.DefaultQueryAction;
2730
import org.opensearch.sql.legacy.query.QueryAction;
28-
import org.opensearch.sql.legacy.query.SqlOpenSearchRequestBuilder;
2931
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
3032
import org.opensearch.transport.client.Client;
3133

@@ -36,7 +38,7 @@ public class PrettyFormatRestExecutor implements RestExecutor {
3638
private final String format;
3739

3840
public PrettyFormatRestExecutor(String format) {
39-
this.format = format.toLowerCase();
41+
this.format = Objects.requireNonNull(format, "Format cannot be null").toLowerCase(Locale.ROOT);
4042
}
4143

4244
/** Execute the QueryAction and return the REST response using the channel. */
@@ -72,61 +74,98 @@ public String execute(Client client, Map<String, String> params, QueryAction que
7274
Object queryResult = QueryActionElasticExecutor.executeAnyAction(client, queryAction);
7375
protocol = new Protocol(client, queryAction, queryResult, format, Cursor.NULL_CURSOR);
7476
}
77+
} catch (SqlParseException e) {
78+
LOG.warn("SQL parsing error: {}", e.getMessage(), e);
79+
protocol = new Protocol(e);
80+
} catch (OpenSearchException e) {
81+
LOG.warn("An error occurred in OpenSearch engine: {}", e.getDetailedMessage(), e);
82+
protocol = new Protocol(e);
7583
} catch (Exception e) {
76-
if (e instanceof OpenSearchException) {
77-
LOG.warn(
78-
"An error occurred in OpenSearch engine: "
79-
+ ((OpenSearchException) e).getDetailedMessage(),
80-
e);
81-
} else {
82-
LOG.warn("Error happened in pretty formatter", e);
83-
}
84+
LOG.warn("Error happened in pretty formatter", e);
8485
protocol = new Protocol(e);
8586
}
8687

8788
return protocol.format();
8889
}
8990

9091
/**
91-
* QueryActionElasticExecutor.executeAnyAction() returns SearchHits inside SearchResponse. In
92-
* order to get scroll ID if any, we need to execute DefaultQueryAction ourselves for
93-
* SearchResponse.
92+
* Builds protocol for default query execution.
93+
*
94+
* <p>Routes to pagination or non-pagination execution based on fetch_size parameter.
9495
*/
9596
private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction queryAction)
9697
throws SqlParseException {
9798

98-
PointInTimeHandler pit = null;
99-
SearchResponse response;
100-
SqlOpenSearchRequestBuilder sqlOpenSearchRequestBuilder = queryAction.explain();
99+
queryAction.explain();
101100

102-
pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr());
101+
Integer fetchSize = queryAction.getSqlRequest().fetchSize();
102+
if (fetchSize != null && fetchSize > 0) {
103+
return buildProtocolWithPagination(client, queryAction, fetchSize);
104+
} else {
105+
return buildProtocolWithoutPagination(client, queryAction);
106+
}
107+
}
108+
109+
/** Executes query with pagination support using Point-in-Time (PIT). */
110+
private Protocol buildProtocolWithPagination(
111+
Client client, DefaultQueryAction queryAction, Integer fetchSize) {
112+
113+
PointInTimeHandler pit =
114+
new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr());
103115
pit.create();
116+
117+
try {
118+
SearchRequestBuilder searchRequest = queryAction.getRequestBuilder();
119+
searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
120+
SearchResponse response = searchRequest.get();
121+
122+
if (shouldCreateCursor(response, queryAction, fetchSize)) {
123+
DefaultCursor cursor = createCursorWithPit(pit, response, queryAction, fetchSize);
124+
return new Protocol(client, queryAction, response.getHits(), format, cursor);
125+
} else {
126+
pit.delete();
127+
return new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR);
128+
}
129+
} catch (RuntimeException e) {
130+
try {
131+
pit.delete();
132+
} catch (RuntimeException deleteException) {
133+
LOG.error("Failed to delete PIT", deleteException);
134+
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
135+
}
136+
throw e;
137+
}
138+
}
139+
140+
private Protocol buildProtocolWithoutPagination(Client client, DefaultQueryAction queryAction) {
104141
SearchRequestBuilder searchRequest = queryAction.getRequestBuilder();
105-
searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
106-
response = searchRequest.get();
142+
SearchResponse response = searchRequest.get();
143+
return new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR);
144+
}
107145

108-
Protocol protocol;
109-
if (isDefaultCursor(response, queryAction)) {
110-
DefaultCursor defaultCursor = new DefaultCursor();
111-
defaultCursor.setLimit(queryAction.getSelect().getRowCount());
112-
defaultCursor.setFetchSize(queryAction.getSqlRequest().fetchSize());
113-
114-
defaultCursor.setPitId(pit.getPitId());
115-
defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source());
116-
defaultCursor.setSortFields(
146+
private DefaultCursor createCursorWithPit(
147+
PointInTimeHandler pit,
148+
SearchResponse response,
149+
DefaultQueryAction queryAction,
150+
Integer fetchSize) {
151+
DefaultCursor cursor = new DefaultCursor();
152+
cursor.setLimit(queryAction.getSelect().getRowCount());
153+
cursor.setFetchSize(fetchSize);
154+
cursor.setPitId(pit.getPitId());
155+
cursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source());
156+
157+
if (response.getHits().getHits().length > 0) {
158+
cursor.setSortFields(
117159
response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues());
118-
119-
protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor);
120-
} else {
121-
protocol = new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR);
122160
}
123161

124-
return protocol;
162+
return cursor;
125163
}
126164

127-
protected boolean isDefaultCursor(SearchResponse searchResponse, DefaultQueryAction queryAction) {
128-
return queryAction.getSqlRequest().fetchSize() != 0
129-
&& Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value()
130-
>= queryAction.getSqlRequest().fetchSize();
165+
protected boolean shouldCreateCursor(
166+
SearchResponse searchResponse, DefaultQueryAction queryAction, Integer fetchSize) {
167+
return fetchSize != null
168+
&& searchResponse.getHits() != null
169+
&& Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() >= fetchSize;
131170
}
132171
}

0 commit comments

Comments
 (0)