Skip to content

Commit 96c8b3e

Browse files
addressing comments
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent 78984d1 commit 96c8b3e

11 files changed

Lines changed: 52 additions & 132 deletions

File tree

distribution/build.gradle

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,13 @@ subprojects {
621621
}
622622
}
623623

624-
['archives:darwin-tar'
624+
['archives:darwin-tar',
625+
'archives:integ-test-zip',
626+
'archives:linux-arm64-tar',
627+
'archives:linux-tar',
628+
'archives:windows-zip',
629+
'packages:arm64-rpm', 'packages:arm64-deb',
630+
'packages:rpm', 'packages:deb'
625631
].forEach { subName ->
626632
Project subproject = project("${project.path}:${subName}")
627633
Configuration configuration = configurations.create(subproject.name)

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,6 @@
241241
import org.opensearch.action.search.GetAllPitsAction;
242242
import org.opensearch.action.search.MultiSearchAction;
243243
import org.opensearch.action.search.NodesGetAllPitsAction;
244-
import org.opensearch.action.search.PitGetAllPitsAction;
245244
import org.opensearch.action.search.SearchAction;
246245
import org.opensearch.action.search.SearchScrollAction;
247246
import org.opensearch.action.search.TransportClearScrollAction;
@@ -250,7 +249,6 @@
250249
import org.opensearch.action.search.TransportGetAllPitsAction;
251250
import org.opensearch.action.search.TransportMultiSearchAction;
252251
import org.opensearch.action.search.TransportNodesGetAllPitsAction;
253-
import org.opensearch.action.search.TransportPitGetAllPitsAction;
254252
import org.opensearch.action.search.TransportSearchAction;
255253
import org.opensearch.action.search.TransportSearchScrollAction;
256254
import org.opensearch.action.support.ActionFilters;
@@ -683,7 +681,6 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
683681
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
684682
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
685683
actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class);
686-
actions.register(PitGetAllPitsAction.INSTANCE, TransportPitGetAllPitsAction.class);
687684

688685
// Remote Store
689686
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

server/src/main/java/org/opensearch/action/search/DeletePitAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
public class DeletePitAction extends ActionType<DeletePitResponse> {
1717

1818
public static final DeletePitAction INSTANCE = new DeletePitAction();
19-
public static final String NAME = "cluster:admin/pit/delete";
19+
public static final String NAME = "cluster:admin/point_in_time/delete";
2020

2121
private DeletePitAction() {
2222
super(NAME, DeletePitResponse::new);

server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
public class GetAllPitsAction extends ActionType<GetAllPitNodesResponse> {
1717
public static final GetAllPitsAction INSTANCE = new GetAllPitsAction();
18-
public static final String NAME = "cluster:admin/pit/read";
18+
public static final String NAME = "cluster:admin/point_in_time/read";
1919

2020
private GetAllPitsAction() {
2121
super(NAME, GetAllPitNodesResponse::new);

server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*/
1515
public class NodesGetAllPitsAction extends ActionType<GetAllPitNodesResponse> {
1616
public static final NodesGetAllPitsAction INSTANCE = new NodesGetAllPitsAction();
17-
public static final String NAME = "cluster:admin/pit/readall";
17+
public static final String NAME = "cluster:admin/point_in_time/read_from_nodes";
1818

1919
private NodesGetAllPitsAction() {
2020
super(NAME, GetAllPitNodesResponse::new);

server/src/main/java/org/opensearch/action/search/PitGetAllPitsAction.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java

Lines changed: 12 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -8,79 +8,28 @@
88

99
package org.opensearch.action.search;
1010

11-
import org.opensearch.action.FailedNodeException;
11+
import org.opensearch.action.ActionListener;
1212
import org.opensearch.action.support.ActionFilters;
13-
import org.opensearch.action.support.nodes.TransportNodesAction;
14-
import org.opensearch.cluster.service.ClusterService;
13+
import org.opensearch.action.support.HandledTransportAction;
1514
import org.opensearch.common.inject.Inject;
16-
import org.opensearch.common.io.stream.StreamInput;
17-
import org.opensearch.search.SearchService;
18-
import org.opensearch.threadpool.ThreadPool;
15+
import org.opensearch.tasks.Task;
1916
import org.opensearch.transport.TransportService;
2017

21-
import java.io.IOException;
22-
import java.util.List;
23-
2418
/**
25-
* Transport action to get all active PIT contexts across all nodes
19+
* Transport action to get all active PIT contexts in the cluster
2620
*/
27-
public class TransportGetAllPitsAction extends TransportNodesAction<
28-
GetAllPitNodesRequest,
29-
GetAllPitNodesResponse,
30-
GetAllPitNodeRequest,
31-
GetAllPitNodeResponse> {
32-
private final SearchService searchService;
33-
34-
@Inject
35-
public TransportGetAllPitsAction(
36-
ThreadPool threadPool,
37-
ClusterService clusterService,
38-
TransportService transportService,
39-
ActionFilters actionFilters,
40-
SearchService searchService
41-
) {
42-
super(
43-
GetAllPitsAction.NAME,
44-
threadPool,
45-
clusterService,
46-
transportService,
47-
actionFilters,
48-
GetAllPitNodesRequest::new,
49-
GetAllPitNodeRequest::new,
50-
ThreadPool.Names.SAME,
51-
GetAllPitNodeResponse.class
52-
);
53-
this.searchService = searchService;
54-
}
21+
public class TransportGetAllPitsAction extends HandledTransportAction<GetAllPitNodesRequest, GetAllPitNodesResponse> {
5522

56-
@Override
57-
protected GetAllPitNodesResponse newResponse(
58-
GetAllPitNodesRequest request,
59-
List<GetAllPitNodeResponse> getAllPitNodeRespons,
60-
List<FailedNodeException> failures
61-
) {
62-
return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures);
63-
}
23+
private final PitService pitService;
6424

65-
@Override
66-
protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) {
67-
return new GetAllPitNodeRequest();
68-
}
69-
70-
@Override
71-
protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException {
72-
return new GetAllPitNodeResponse(in);
25+
@Inject
26+
public TransportGetAllPitsAction(ActionFilters actionFilters, TransportService transportService, PitService pitService) {
27+
super(GetAllPitsAction.NAME, transportService, actionFilters, in -> new GetAllPitNodesRequest(in));
28+
this.pitService = pitService;
7329
}
7430

75-
/**
76-
* This retrieves all active PITs in the node
77-
*/
7831
@Override
79-
protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) {
80-
GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse(
81-
transportService.getLocalNode(),
82-
searchService.getAllPITReaderContexts()
83-
);
84-
return nodeResponse;
32+
protected void doExecute(Task task, GetAllPitNodesRequest request, ActionListener<GetAllPitNodesResponse> listener) {
33+
pitService.getAllPits(listener);
8534
}
8635
}

server/src/main/java/org/opensearch/action/search/TransportPitGetAllPitsAction.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

server/src/main/java/org/opensearch/client/support/AbstractClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,11 @@
337337
import org.opensearch.action.search.DeletePitResponse;
338338
import org.opensearch.action.search.GetAllPitNodesRequest;
339339
import org.opensearch.action.search.GetAllPitNodesResponse;
340+
import org.opensearch.action.search.GetAllPitsAction;
340341
import org.opensearch.action.search.MultiSearchAction;
341342
import org.opensearch.action.search.MultiSearchRequest;
342343
import org.opensearch.action.search.MultiSearchRequestBuilder;
343344
import org.opensearch.action.search.MultiSearchResponse;
344-
import org.opensearch.action.search.PitGetAllPitsAction;
345345
import org.opensearch.action.search.SearchAction;
346346
import org.opensearch.action.search.SearchRequest;
347347
import org.opensearch.action.search.SearchRequestBuilder;
@@ -1274,12 +1274,12 @@ public ActionFuture<AcknowledgedResponse> deleteDanglingIndex(DeleteDanglingInde
12741274

12751275
@Override
12761276
public void getAllPits(GetAllPitNodesRequest request, ActionListener<GetAllPitNodesResponse> listener) {
1277-
execute(PitGetAllPitsAction.INSTANCE, request, listener);
1277+
execute(GetAllPitsAction.INSTANCE, request, listener);
12781278
}
12791279

12801280
@Override
12811281
public ActionFuture<GetAllPitNodesResponse> getAllPits(GetAllPitNodesRequest request) {
1282-
return execute(PitGetAllPitsAction.INSTANCE, request);
1282+
return execute(GetAllPitsAction.INSTANCE, request);
12831283
}
12841284

12851285
@Override

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,7 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL
881881
shard.awaitShardSearchActive(ignored -> {
882882
Engine.SearcherSupplier searcherSupplier = null;
883883
ReaderContext readerContext = null;
884+
Releasable decreasePitContexts = openPitContexts::decrementAndGet;
884885
try {
885886
if (openPitContexts.incrementAndGet() > maxOpenPitContext) {
886887
throw new OpenSearchRejectedExecutionException(
@@ -902,15 +903,16 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL
902903
searchOperationListener.onNewPitContext(finalReaderContext);
903904

904905
readerContext.addOnClose(() -> {
905-
openPitContexts.decrementAndGet();
906906
searchOperationListener.onFreeReaderContext(finalReaderContext);
907907
searchOperationListener.onFreePitContext(finalReaderContext);
908908
});
909+
readerContext.addOnClose(decreasePitContexts);
909910
// add the newly created pit reader context to active readers
910911
putReaderContext(readerContext);
911912
readerContext = null;
912913
listener.onResponse(finalReaderContext.id());
913914
} catch (Exception exc) {
915+
Releasables.closeWhileHandlingException(decreasePitContexts);
914916
Releasables.closeWhileHandlingException(searcherSupplier, readerContext);
915917
listener.onFailure(exc);
916918
}

0 commit comments

Comments
 (0)