Skip to content

Commit d7f01ee

Browse files
opensearch-trigger-bot[bot]github-actions[bot]LantaoJin
authored
[Backport 2.19-dev] Fix PIT (Point in Time) resource leaks in v2 query engine (#5229)
* Fix PIT (Point in Time) resource leaks in v2 query engine (#5221) * Fix PIT (Point in Time) resource leaks in v2 query engine Signed-off-by: Lantao Jin <ltjin@amazon.com> * Fix PIT context leaks when a query without cursor/pagination fails during execution Signed-off-by: Lantao Jin <ltjin@amazon.com> * Avoid to expose sensitive document Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com> (cherry picked from commit e11e11e) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> * Resolve compile error Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Lantao Jin <ltjin@amazon.com>
1 parent 8f858fd commit d7f01ee

11 files changed

Lines changed: 155 additions & 23 deletions

File tree

async-query-core/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ plugins {
99
id 'jacoco'
1010
id 'antlr'
1111
id 'com.diffplug.spotless' version '6.22.0'
12-
id 'com.github.johnrengelman.shadow'
12+
id 'com.gradleup.shadow'
1313
}
1414

1515
repositories {

build.gradle

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,13 @@ allprojects {
139139
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:5.4.4"
140140
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5:5.3.4"
141141
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5-h2:5.3.4"
142-
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:2.18.2"
143-
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:2.18.2"
144-
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:2.18.2"
145-
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.2"
146-
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.2"
147-
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.2"
142+
resolutionStrategy.force "com.fasterxml.jackson:jackson-bom:2.18.6"
143+
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:2.18.6"
144+
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:2.18.6"
145+
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:2.18.6"
146+
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.6"
147+
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.6"
148+
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.6"
148149
resolutionStrategy.force 'com.google.protobuf:protobuf-java:3.25.5'
149150
resolutionStrategy.force 'org.locationtech.jts:jts-core:1.19.0'
150151
resolutionStrategy.force 'com.google.errorprone:error_prone_annotations:2.28.0'

core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import lombok.RequiredArgsConstructor;
1010
import org.opensearch.sql.data.model.ExprValue;
1111
import org.opensearch.sql.executor.ExecutionEngine;
12+
import org.opensearch.sql.storage.TableScanOperator;
1213

1314
/**
1415
* A plan node which blocks issuing a request in {@link #open} and getting results in {@link
@@ -51,4 +52,22 @@ public ExecutionEngine.Schema schema() {
5152
public void open() {
5253
// no-op, no search should be invoked.
5354
}
55+
56+
/**
57+
* Force cleanup of server-side resources. When a cursor is explicitly closed, any underlying
58+
* table scan must release its resources (e.g. PIT) unconditionally, even if pagination is not
59+
* complete.
60+
*/
61+
@Override
62+
public void close() {
63+
forceCloseChildren(input);
64+
}
65+
66+
private void forceCloseChildren(PhysicalPlan node) {
67+
if (node instanceof TableScanOperator) {
68+
((TableScanOperator) node).forceClose();
69+
} else {
70+
node.getChild().forEach(this::forceCloseChildren);
71+
}
72+
}
5473
}

core/src/main/java/org/opensearch/sql/storage/TableScanOperator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@ public List<PhysicalPlan> getChild() {
2626
return Collections.emptyList();
2727
}
2828

29+
/**
30+
* Force cleanup of server-side resources (e.g. PIT, scroll) regardless of pagination state. Used
31+
* when the client explicitly closes a cursor mid-pagination. Default implementation delegates to
32+
* {@link #close()}.
33+
*/
34+
public void forceClose() {
35+
close();
36+
}
37+
2938
/**
3039
* Explain the execution plan.
3140
*

core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.junit.jupiter.api.DisplayNameGeneration;
1616
import org.junit.jupiter.api.DisplayNameGenerator;
1717
import org.junit.jupiter.api.Test;
18+
import org.opensearch.sql.storage.TableScanOperator;
1819

1920
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
2021
public class CursorCloseOperatorTest {
@@ -36,11 +37,23 @@ public void open_is_not_propagated() {
3637
}
3738

3839
@Test
39-
public void close_is_propagated() {
40-
var child = mock(PhysicalPlan.class);
40+
public void close_calls_forceClose_on_table_scan() {
41+
var child = mock(TableScanOperator.class);
4142
var plan = new CursorCloseOperator(child);
4243
plan.close();
43-
verify(child).close();
44+
verify(child).forceClose();
45+
verify(child, never()).close();
46+
}
47+
48+
@Test
49+
public void close_traverses_tree_to_find_table_scan() {
50+
var scan = mock(TableScanOperator.class);
51+
// Wrap the scan in a regular PhysicalPlan node
52+
var middle = mock(PhysicalPlan.class);
53+
org.mockito.Mockito.when(middle.getChild()).thenReturn(java.util.List.of(scan));
54+
var plan = new CursorCloseOperator(middle);
55+
plan.close();
56+
verify(scan).forceClose();
4457
}
4558

4659
@Test

opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/OpenSearchJsonContent.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import java.util.Map;
1414
import lombok.RequiredArgsConstructor;
1515
import org.apache.commons.lang3.tuple.Pair;
16+
import org.apache.logging.log4j.LogManager;
17+
import org.apache.logging.log4j.Logger;
1618
import org.opensearch.OpenSearchParseException;
1719
import org.opensearch.common.Numbers;
1820
import org.opensearch.common.geo.GeoPoint;
@@ -25,7 +27,7 @@
2527
/** The Implementation of Content to represent {@link JsonNode}. */
2628
@RequiredArgsConstructor
2729
public class OpenSearchJsonContent implements Content {
28-
30+
private static final Logger LOG = LogManager.getLogger();
2931
private final JsonNode value;
3032

3133
@Override
@@ -156,6 +158,9 @@ public Pair<Double, Double> geoValue() {
156158
GeoUtils.parseGeoPoint(parser, point, true);
157159
return Pair.of(point.getLat(), point.getLon());
158160
} catch (IOException ex) {
161+
if (LOG.isDebugEnabled()) {
162+
LOG.debug("Error parsing geo point '{}'", value);
163+
}
159164
throw new OpenSearchParseException("error parsing geo point", ex);
160165
}
161166
}
@@ -175,7 +180,11 @@ private long parseLongValue(JsonNode node) {
175180
}
176181
return Numbers.toLong(node.textValue(), true);
177182
} else {
178-
throw new OpenSearchParseException("node must be a number");
183+
if (LOG.isDebugEnabled()) {
184+
LOG.debug("node '{}' must be a number", node);
185+
}
186+
throw new OpenSearchParseException(
187+
String.format("node must be a number, found %s", node.getNodeType()));
179188
}
180189
}
181190

@@ -189,7 +198,11 @@ private double parseDoubleValue(JsonNode node) {
189198
}
190199
return Double.parseDouble(node.textValue());
191200
} else {
192-
throw new OpenSearchParseException("node must be a number");
201+
if (LOG.isDebugEnabled()) {
202+
LOG.debug("node '{}' must be a number", node);
203+
}
204+
throw new OpenSearchParseException(
205+
String.format("node must be a number, found %s", node.getNodeType()));
193206
}
194207
}
195208

@@ -200,7 +213,11 @@ private boolean parseBooleanValue(JsonNode node) {
200213
} else if (node.isTextual()) {
201214
return Boolean.parseBoolean(node.textValue());
202215
} else {
203-
throw new OpenSearchParseException("node must be a boolean");
216+
if (LOG.isDebugEnabled()) {
217+
LOG.debug("node '{}' must be a boolean", node);
218+
}
219+
throw new OpenSearchParseException(
220+
String.format("node must be a boolean, found %s", node.getNodeType()));
204221
}
205222
}
206223
}

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ public ExplainResponseNode visitTableScan(
156156
listener.onResponse(openSearchExplain.apply(plan));
157157
} catch (Exception e) {
158158
listener.onFailure(e);
159+
} finally {
160+
plan.close();
159161
}
160162
});
161163
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public class OpenSearchIndexScan extends TableScanOperator implements Serializab
4646
/** Number of rows returned. */
4747
private Integer queryCount;
4848

49+
/**
50+
* Whether the cursor (including PIT) has been serialized for a subsequent page request. When
51+
* true, {@link #close()} must preserve the PIT because a future request will resume from it.
52+
*/
53+
private boolean cursorSerialized = false;
54+
4955
/** Search response for current batch. */
5056
private Iterator<ExprValue> iterator;
5157

@@ -110,7 +116,26 @@ private void fetchNextBatch() {
110116
public void close() {
111117
super.close();
112118

113-
client.cleanup(request);
119+
if (request.hasAnotherBatch() && cursorSerialized) {
120+
// PIT has been serialized into a cursor for the next page request.
121+
// Only clean up in-memory state; the PIT must survive for the next request.
122+
client.cleanup(request);
123+
} else {
124+
// No more pages, or query failed/aborted before cursor was serialized.
125+
// Force delete the PIT to prevent leaking.
126+
client.forceCleanup(request);
127+
}
128+
}
129+
130+
/**
131+
* Force cleanup of server-side resources (PIT) regardless of pagination state. Used by {@link
132+
* org.opensearch.sql.planner.physical.CursorCloseOperator} when the client explicitly closes a
133+
* cursor mid-pagination.
134+
*/
135+
@Override
136+
public void forceClose() {
137+
super.close();
138+
client.forceCleanup(request);
114139
}
115140

116141
@Override
@@ -176,5 +201,8 @@ public void writeExternal(ObjectOutput out) throws IOException {
176201
out.write(reqAsBytes, 0, reqOut.size());
177202

178203
out.writeInt(maxResponseSize);
204+
205+
// Mark that the PIT has been serialized into a cursor, so close() preserves it.
206+
cursorSerialized = true;
179207
}
180208
}

opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ void query_empty_result() {
7979
indexScan.open();
8080
assertFalse(indexScan.hasNext());
8181
}
82-
verify(client).cleanup(any());
82+
verify(client).forceCleanup(any());
8383
}
8484

8585
@Test

opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ void query_empty_result() {
223223
indexScan.open();
224224
assertFalse(indexScan.hasNext());
225225
}
226-
verify(client).cleanup(any());
226+
verify(client).forceCleanup(any());
227227
}
228228

229229
@Test
@@ -249,7 +249,7 @@ void query_all_results_with_query() {
249249
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
250250
() -> assertFalse(indexScan.hasNext()));
251251
}
252-
verify(client).cleanup(any());
252+
verify(client).forceCleanup(any());
253253
}
254254

255255
static final OpenSearchRequest.IndexName EMPLOYEES_INDEX =
@@ -277,7 +277,7 @@ void query_all_results_with_scroll() {
277277
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
278278
() -> assertFalse(indexScan.hasNext()));
279279
}
280-
verify(client).cleanup(any());
280+
verify(client).forceCleanup(any());
281281
}
282282

283283
@Test
@@ -304,7 +304,7 @@ void query_some_results_with_query() {
304304
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
305305
() -> assertFalse(indexScan.hasNext()));
306306
}
307-
verify(client).cleanup(any());
307+
verify(client).forceCleanup(any());
308308
}
309309

310310
@Test
@@ -327,7 +327,50 @@ void query_some_results_with_scroll() {
327327
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
328328
() -> assertFalse(indexScan.hasNext()));
329329
}
330-
verify(client).cleanup(any());
330+
verify(client).forceCleanup(any());
331+
}
332+
333+
/**
334+
* When close() is called mid-pagination without cursor serialization (e.g., query failed or
335+
* aborted), the PIT should be force-deleted to prevent leaking.
336+
*/
337+
@Test
338+
void close_mid_pagination_without_cursor_serialized_should_force_cleanup() {
339+
var request = mock(OpenSearchRequest.class);
340+
when(request.hasAnotherBatch()).thenReturn(true);
341+
var indexScan = new OpenSearchIndexScan(client, request);
342+
indexScan.close();
343+
verify(client).forceCleanup(request);
344+
verify(client, never()).cleanup(any());
345+
}
346+
347+
/**
348+
* When close() is called after cursor has been serialized (normal pagination), the PIT should be
349+
* preserved via cleanup() (in-memory only, no PIT deletion).
350+
*/
351+
@Test
352+
@SneakyThrows
353+
void close_mid_pagination_with_cursor_serialized_should_cleanup() {
354+
var request = mock(OpenSearchRequest.class);
355+
when(request.hasAnotherBatch()).thenReturn(true);
356+
var indexScan = new OpenSearchIndexScan(client, request);
357+
358+
// Simulate successful cursor serialization by calling writeExternal
359+
var out = mock(ObjectOutput.class);
360+
indexScan.writeExternal(out);
361+
362+
indexScan.close();
363+
verify(client).cleanup(request);
364+
verify(client, never()).forceCleanup(any());
365+
}
366+
367+
/** forceClose() should always force-delete the PIT regardless of pagination state. */
368+
@Test
369+
void forceClose_should_always_force_cleanup() {
370+
var request = mock(OpenSearchRequest.class);
371+
var indexScan = new OpenSearchIndexScan(client, request);
372+
indexScan.forceClose();
373+
verify(client).forceCleanup(request);
331374
}
332375

333376
static void mockTwoPageResponse(OpenSearchClient client) {
@@ -360,7 +403,7 @@ void query_results_limited_by_query_size() {
360403
() -> assertEquals(employee(2, "Smith", "HR"), indexScan.next()),
361404
() -> assertFalse(indexScan.hasNext()));
362405
}
363-
verify(client).cleanup(any());
406+
verify(client).forceCleanup(any());
364407
}
365408

366409
@Test

0 commit comments

Comments
 (0)