Skip to content

Commit 79cdc1c

Browse files
committed
fix compile error
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent b0fd263 commit 79cdc1c

7 files changed

Lines changed: 75 additions & 2 deletions

File tree

core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public void close() {
6464
}
6565

6666
private void forceCloseChildren(PhysicalPlan node) {
67-
if (node instanceof TableScanOperator scan) {
68-
scan.forceClose();
67+
if (node instanceof TableScanOperator) {
68+
((TableScanOperator) node).forceClose();
6969
} else {
7070
node.getChild().forEach(this::forceCloseChildren);
7171
}

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ public interface OpenSearchClient {
7676
*/
7777
Map<String, String> meta();
7878

79+
/**
80+
* Force to clean up resources related to the search request.
81+
*
82+
* @param request search request
83+
*/
84+
void forceCleanup(OpenSearchRequest request);
85+
7986
/**
8087
* Clean up resources related to the search request, for example scroll context.
8188
*

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,27 @@ public Map<String, String> meta() {
166166
client.settings().get("plugins.sql.pagination.api", "true"));
167167
}
168168

169+
@Override
170+
public void forceCleanup(OpenSearchRequest request) {
171+
if (request instanceof OpenSearchScrollRequest) {
172+
request.forceClean(
173+
scrollId -> {
174+
try {
175+
client.prepareClearScroll().addScrollId(scrollId).get();
176+
} catch (Exception e) {
177+
throw new IllegalStateException(
178+
"Failed to clean up resources for search request " + request, e);
179+
}
180+
});
181+
} else {
182+
request.forceClean(
183+
pitId -> {
184+
DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
185+
deletePit(deletePitRequest);
186+
});
187+
}
188+
}
189+
169190
@Override
170191
public void cleanup(OpenSearchRequest request) {
171192
if (request instanceof OpenSearchScrollRequest) {

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,29 @@ public Map<String, String> meta() {
175175
}
176176
}
177177

178+
@Override
179+
public void forceCleanup(OpenSearchRequest request) {
180+
if (request instanceof OpenSearchScrollRequest) {
181+
request.forceClean(
182+
scrollId -> {
183+
try {
184+
ClearScrollRequest clearRequest = new ClearScrollRequest();
185+
clearRequest.addScrollId(scrollId);
186+
client.clearScroll(clearRequest, RequestOptions.DEFAULT);
187+
} catch (IOException e) {
188+
throw new IllegalStateException(
189+
"Failed to clean up resources for search request " + request, e);
190+
}
191+
});
192+
} else {
193+
request.forceClean(
194+
pitId -> {
195+
DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
196+
deletePit(deletePitRequest);
197+
});
198+
}
199+
}
200+
178201
@Override
179202
public void cleanup(OpenSearchRequest request) {
180203
if (request instanceof OpenSearchScrollRequest) {

opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,19 @@ public void clean(Consumer<String> cleanAction) {
221221
}
222222
}
223223

224+
@Override
225+
public void forceClean(Consumer<String> cleanAction) {
226+
try {
227+
if (this.pitId != null) {
228+
cleanAction.accept(this.pitId);
229+
searchDone = true;
230+
}
231+
} finally {
232+
this.pitId = null;
233+
this.searchAfter = null;
234+
}
235+
}
236+
224237
@Override
225238
public boolean hasAnotherBatch() {
226239
if (this.pitId != null) {

opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ OpenSearchResponse search(
4343
*/
4444
void clean(Consumer<String> cleanAction);
4545

46+
/** Force clean the request. */
47+
void forceClean(Consumer<String> cleanAction);
48+
4649
/**
4750
* Get the OpenSearchExprValueFactory.
4851
*

opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ public void clean(Consumer<String> cleanAction) {
123123
}
124124
}
125125

126+
@Override
127+
public void forceClean(Consumer<String> cleanAction) {
128+
throw new UnsupportedOperationException(
129+
"Force clean is unsupported in OpenSearchScrollRequest");
130+
}
131+
126132
/**
127133
* Is scroll started which means pages after first is being requested.
128134
*

0 commit comments

Comments
 (0)