Skip to content

Commit 2f6c5a6

Browse files
committed
Fix PIT (Point in Time) resource leaks in v2 query engine
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 8eaa276 commit 2f6c5a6

7 files changed

Lines changed: 73 additions & 11 deletions

File tree

core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import lombok.RequiredArgsConstructor;
1010
import org.opensearch.sql.data.model.ExprValue;
1111
import org.opensearch.sql.executor.ExecutionEngine;
12+
import org.opensearch.sql.storage.TableScanOperator;
1213

1314
/**
1415
* A plan node which blocks issuing a request in {@link #open} and getting results in {@link
@@ -51,4 +52,22 @@ public ExecutionEngine.Schema schema() {
5152
public void open() {
5253
// no-op, no search should be invoked.
5354
}
55+
56+
/**
57+
* Force cleanup of server-side resources. When a cursor is explicitly closed, any underlying
58+
* table scan must release its resources (e.g. PIT) unconditionally, even if pagination is not
59+
* complete.
60+
*/
61+
@Override
62+
public void close() {
63+
forceCloseChildren(input);
64+
}
65+
66+
private void forceCloseChildren(PhysicalPlan node) {
67+
if (node instanceof TableScanOperator scan) {
68+
scan.forceClose();
69+
} else {
70+
node.getChild().forEach(this::forceCloseChildren);
71+
}
72+
}
5473
}

core/src/main/java/org/opensearch/sql/storage/TableScanOperator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@ public List<PhysicalPlan> getChild() {
2626
return Collections.emptyList();
2727
}
2828

29+
/**
30+
* Force cleanup of server-side resources (e.g. PIT, scroll) regardless of pagination state. Used
31+
* when the client explicitly closes a cursor mid-pagination. Default implementation delegates to
32+
* {@link #close()}.
33+
*/
34+
public void forceClose() {
35+
close();
36+
}
37+
2938
/**
3039
* Explain the execution plan.
3140
*

core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.junit.jupiter.api.DisplayNameGeneration;
1616
import org.junit.jupiter.api.DisplayNameGenerator;
1717
import org.junit.jupiter.api.Test;
18+
import org.opensearch.sql.storage.TableScanOperator;
1819

1920
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
2021
public class CursorCloseOperatorTest {
@@ -36,11 +37,23 @@ public void open_is_not_propagated() {
3637
}
3738

3839
@Test
39-
public void close_is_propagated() {
40-
var child = mock(PhysicalPlan.class);
40+
public void close_calls_forceClose_on_table_scan() {
41+
var child = mock(TableScanOperator.class);
4142
var plan = new CursorCloseOperator(child);
4243
plan.close();
43-
verify(child).close();
44+
verify(child).forceClose();
45+
verify(child, never()).close();
46+
}
47+
48+
@Test
49+
public void close_traverses_tree_to_find_table_scan() {
50+
var scan = mock(TableScanOperator.class);
51+
// Wrap the scan in a regular PhysicalPlan node
52+
var middle = mock(PhysicalPlan.class);
53+
org.mockito.Mockito.when(middle.getChild()).thenReturn(java.util.List.of(scan));
54+
var plan = new CursorCloseOperator(middle);
55+
plan.close();
56+
verify(scan).forceClose();
4457
}
4558

4659
@Test

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ public ExplainResponseNode visitTableScan(
145145
listener.onResponse(openSearchExplain.apply(plan));
146146
} catch (Exception e) {
147147
listener.onFailure(e);
148+
} finally {
149+
plan.close();
148150
}
149151
});
150152
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,26 @@ private void fetchNextBatch() {
108108
public void close() {
109109
super.close();
110110

111-
client.cleanup(request);
111+
if (request.hasAnotherBatch()) {
112+
// PIT is still needed for the next page — only clean up the in-memory state
113+
// without deleting the PIT from OpenSearch.
114+
client.cleanup(request);
115+
} else {
116+
// No more pages expected (last page consumed, non-paginated query, or error path).
117+
// Force delete the PIT to prevent leaking.
118+
client.forceCleanup(request);
119+
}
120+
}
121+
122+
/**
123+
* Force cleanup of server-side resources (PIT) regardless of pagination state. Used by {@link
124+
* org.opensearch.sql.planner.physical.CursorCloseOperator} when the client explicitly closes a
125+
* cursor mid-pagination.
126+
*/
127+
@Override
128+
public void forceClose() {
129+
super.close();
130+
client.forceCleanup(request);
112131
}
113132

114133
@Override

opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void query_empty_result() {
7676
indexScan.open();
7777
assertFalse(indexScan.hasNext());
7878
}
79-
verify(client).cleanup(any());
79+
verify(client).forceCleanup(any());
8080
}
8181

8282
@Test

opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ void query_empty_result() {
174174
indexScan.open();
175175
assertFalse(indexScan.hasNext());
176176
}
177-
verify(client).cleanup(any());
177+
verify(client).forceCleanup(any());
178178
}
179179

180180
@Test
@@ -200,7 +200,7 @@ void query_all_results_with_query() {
200200
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
201201
() -> assertFalse(indexScan.hasNext()));
202202
}
203-
verify(client).cleanup(any());
203+
verify(client).forceCleanup(any());
204204
}
205205

206206
static final OpenSearchRequest.IndexName EMPLOYEES_INDEX =
@@ -228,7 +228,7 @@ void query_all_results_with_scroll() {
228228
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
229229
() -> assertFalse(indexScan.hasNext()));
230230
}
231-
verify(client).cleanup(any());
231+
verify(client).forceCleanup(any());
232232
}
233233

234234
@Test
@@ -255,7 +255,7 @@ void query_some_results_with_query() {
255255
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
256256
() -> assertFalse(indexScan.hasNext()));
257257
}
258-
verify(client).cleanup(any());
258+
verify(client).forceCleanup(any());
259259
}
260260

261261
@Test
@@ -278,7 +278,7 @@ void query_some_results_with_scroll() {
278278
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
279279
() -> assertFalse(indexScan.hasNext()));
280280
}
281-
verify(client).cleanup(any());
281+
verify(client).forceCleanup(any());
282282
}
283283

284284
static void mockTwoPageResponse(OpenSearchClient client) {
@@ -311,7 +311,7 @@ void query_results_limited_by_query_size() {
311311
() -> assertEquals(employee(2, "Smith", "HR"), indexScan.next()),
312312
() -> assertFalse(indexScan.hasNext()));
313313
}
314-
verify(client).cleanup(any());
314+
verify(client).forceCleanup(any());
315315
}
316316

317317
@Test

0 commit comments

Comments
 (0)