Skip to content

Commit 7e2e188

Browse files
javadoc changes and more
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent 01e320f commit 7e2e188

20 files changed

Lines changed: 158 additions & 136 deletions

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
1919
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
2020
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
21-
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
22-
- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats
21+
- [Admission Control] Add changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
22+
- [Admission Control] Add changes to integrate CPU AC and ResourceUsageCollector with Stats ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
2323

2424
### Dependencies
2525
- Bump `log4j-core` from 2.18.0 to 2.19.0

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ public NodeStats(StreamInput in) throws IOException {
230230
} else {
231231
repositoriesStats = null;
232232
}
233-
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
233+
// TODO: change to V_2_12_0 on main after backport to 2.x
234+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
234235
admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new);
235236
} else {
236237
admissionControlStats = null;
@@ -504,6 +505,10 @@ public void writeTo(StreamOutput out) throws IOException {
504505
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
505506
out.writeOptionalWriteable(repositoriesStats);
506507
}
508+
// TODO: change to V_2_12_0 on main after backport to 2.x
509+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
510+
out.writeOptionalWriteable(admissionControlStats);
511+
}
507512
}
508513

509514
@Override

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
171171
false,
172172
false,
173173
false,
174+
false,
174175
false
175176
);
176177
List<ShardStats> shardsStats = new ArrayList<>();

server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ protected TransportReplicationAction(
221221

222222
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
223223

224-
if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){
224+
// Register only TransportShardBulkAction for admission control ( primary indexing action )
225+
if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) {
225226
transportService.registerRequestHandler(
226227
transportPrimaryAction,
227228
executor,

server/src/main/java/org/opensearch/common/network/NetworkModule.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,16 +300,19 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
300300
return actualHandler;
301301
}
302302

303+
/**
304+
* Intercept the transport action and perform admission control if applicable
305+
*/
303306
@Override
304307
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
305308
String action,
306309
String executor,
307310
boolean forceExecution,
308311
TransportRequestHandler<T> actualHandler,
309-
AdmissionControlActionType transportActionType
312+
AdmissionControlActionType admissionControlActionType
310313
) {
311314
for (TransportInterceptor interceptor : this.transportInterceptors) {
312-
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType);
315+
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType);
313316
}
314317
return actualHandler;
315318
}

server/src/main/java/org/opensearch/node/NodeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public NodeStats stats(
269269
searchPipelineStats ? this.searchPipelineService.stats() : null,
270270
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
271271
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
272-
admissionControl ? this.admissionControlService.stats(): null
272+
admissionControl ? this.admissionControlService.stats() : null
273273
);
274274
}
275275

server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.cluster.service.ClusterService;
14-
import org.opensearch.common.settings.ClusterSettings;
1514
import org.opensearch.common.settings.Settings;
1615
import org.opensearch.node.ResourceUsageCollectorService;
1716
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
1817
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
1918
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
2019
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
21-
import org.opensearch.ratelimitting.admissioncontrol.stats.BaseAdmissionControllerStats;
22-
import org.opensearch.ratelimitting.admissioncontrol.stats.CPUBasedAdmissionControllerStats;
20+
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
2321
import org.opensearch.threadpool.ThreadPool;
2422

2523
import java.util.ArrayList;
@@ -47,8 +45,14 @@ public class AdmissionControlService {
4745
* @param settings Immutable settings instance
4846
* @param clusterService ClusterService Instance
4947
* @param threadPool ThreadPool Instance
48+
* @param resourceUsageCollectorService Instance used to get node resource usage stats
5049
*/
51-
public AdmissionControlService(Settings settings, ClusterService clusterService, ThreadPool threadPool, ResourceUsageCollectorService resourceUsageCollectorService) {
50+
public AdmissionControlService(
51+
Settings settings,
52+
ClusterService clusterService,
53+
ThreadPool threadPool,
54+
ResourceUsageCollectorService resourceUsageCollectorService
55+
) {
5256
this.threadPool = threadPool;
5357
this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings);
5458
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
@@ -72,7 +76,9 @@ private void initialise() {
7276
* @param admissionControlActionType type of the admissionControllerActionType
7377
*/
7478
public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) {
75-
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action, admissionControlActionType); });
79+
this.ADMISSION_CONTROLLERS.forEach(
80+
(name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }
81+
);
7682
}
7783

7884
/**
@@ -90,7 +96,12 @@ public void registerAdmissionController(String admissionControllerName) {
9096
private AdmissionController controllerFactory(String admissionControllerName) {
9197
switch (admissionControllerName) {
9298
case CPU_BASED_ADMISSION_CONTROLLER:
93-
return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterService, this.resourceUsageCollectorService);
99+
return new CPUBasedAdmissionController(
100+
admissionControllerName,
101+
this.settings,
102+
this.clusterService,
103+
this.resourceUsageCollectorService
104+
);
94105
default:
95106
throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
96107
}
@@ -113,26 +124,15 @@ public AdmissionController getAdmissionController(String controllerName) {
113124
return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null);
114125
}
115126

116-
public AdmissionControlStats stats(){
117-
List<BaseAdmissionControllerStats> statsList = new ArrayList<>();
118-
if(this.ADMISSION_CONTROLLERS.size() > 0){
127+
public AdmissionControlStats stats() {
128+
List<AdmissionControllerStats> statsList = new ArrayList<>();
129+
if (this.ADMISSION_CONTROLLERS.size() > 0) {
119130
this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> {
120-
BaseAdmissionControllerStats admissionControllerStats = controllerStatsFactory(admissionController);
121-
if(admissionControllerStats != null) {
122-
statsList.add(admissionControllerStats);
123-
}
131+
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController, controllerName);
132+
statsList.add(admissionControllerStats);
124133
});
125134
return new AdmissionControlStats(statsList);
126135
}
127136
return null;
128137
}
129-
130-
private BaseAdmissionControllerStats controllerStatsFactory(AdmissionController admissionController) {
131-
switch (admissionController.getName()) {
132-
case CPU_BASED_ADMISSION_CONTROLLER:
133-
return new CPUBasedAdmissionControllerStats(admissionController);
134-
default:
135-
return null;
136-
}
137-
}
138138
}

server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.opensearch.cluster.service.ClusterService;
1212
import org.opensearch.common.util.concurrent.ConcurrentCollections;
1313
import org.opensearch.node.ResourceUsageCollectorService;
14-
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
1514
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
1615
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
1716

@@ -33,11 +32,16 @@ public abstract class AdmissionController {
3332
public final ClusterService clusterService;
3433

3534
/**
36-
* @param rejectionCount initialised rejectionCount value for AdmissionController
37-
* @param admissionControllerName name of the admissionController
35+
* @param admissionControllerName name of the admissionController
36+
* @param rejectionCount initialised rejectionCount value for AdmissionController
37+
* @param resourceUsageCollectorService instance used to get resource usage stats of the node
3838
* @param clusterService
3939
*/
40-
public AdmissionController(AtomicLong rejectionCount, String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, ClusterService clusterService) {
40+
public AdmissionController(
41+
String admissionControllerName, AtomicLong rejectionCount,
42+
ResourceUsageCollectorService resourceUsageCollectorService,
43+
ClusterService clusterService
44+
) {
4145
this.rejectionCount = rejectionCount;
4246
this.admissionControllerName = admissionControllerName;
4347
this.resourceUsageCollectorService = resourceUsageCollectorService;
@@ -62,8 +66,7 @@ public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionContr
6266
}
6367

6468
/**
65-
* Increment the tracking-objects and apply the admission control if threshold is breached.
66-
* Mostly applicable while applying admission controller
69+
* Apply admission control based on the resource usage for an action
6770
*/
6871
public abstract void apply(String action, AdmissionControlActionType admissionControlActionType);
6972

@@ -74,9 +77,12 @@ public String getName() {
7477
return this.admissionControllerName;
7578
}
7679

80+
/**
81+
* Add rejection count to the rejection count metric tracked by the admission-controller
82+
*/
7783
public void addRejectionCount(String admissionControlActionType, long count) {
7884
AtomicLong updatedCount = new AtomicLong(0);
79-
if(this.rejectionCountMap.containsKey(admissionControlActionType)){
85+
if (this.rejectionCountMap.containsKey(admissionControlActionType)) {
8086
updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get());
8187
}
8288
updatedCount.addAndGet(count);
@@ -91,6 +97,9 @@ public long getRejectionCount(String admissionControlActionType) {
9197
return rejectionCount.get();
9298
}
9399

100+
/**
101+
* Get rejection stats of the admission controller
102+
*/
94103
public Map<String, Long> getRejectionStats() {
95104
Map<String, Long> rejectionStats = new HashMap<>();
96105
rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get()));

server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@
1616
import org.opensearch.node.NodeResourceUsageStats;
1717
import org.opensearch.node.ResourceUsageCollectorService;
1818
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
19-
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
2019
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
2120

22-
import java.util.HashMap;
23-
import java.util.Map;
2421
import java.util.Optional;
2522
import java.util.concurrent.atomic.AtomicLong;
2623

@@ -35,54 +32,82 @@ public class CPUBasedAdmissionController extends AdmissionController {
3532
/**
3633
*
3734
* @param admissionControllerName State of the admission controller
35+
* @param settings Immutable settings instance
36+
* @param clusterService ClusterService Instance
37+
* @param resourceUsageCollectorService Instance used to get node resource usage stats
3838
*/
39-
public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterService clusterService, ResourceUsageCollectorService resourceUsageCollectorService) {
40-
super(new AtomicLong(0), admissionControllerName, resourceUsageCollectorService, clusterService);
39+
public CPUBasedAdmissionController(
40+
String admissionControllerName,
41+
Settings settings,
42+
ClusterService clusterService,
43+
ResourceUsageCollectorService resourceUsageCollectorService
44+
) {
45+
super(admissionControllerName, new AtomicLong(0), resourceUsageCollectorService, clusterService);
4146
this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
4247
}
4348

4449
/**
45-
* This function will take of applying admission controller based on CPU usage
50+
* Apply admission control based on process CPU usage
4651
* @param action is the transport action
4752
*/
4853
@Override
4954
public void apply(String action, AdmissionControlActionType admissionControlActionType) {
50-
// TODO Will extend this logic further currently just incrementing rejectionCount
5155
if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) {
5256
this.applyForTransportLayer(action, admissionControlActionType);
5357
}
5458
}
5559

60+
/**
61+
* Apply transport layer admission control if configured limit has been reached
62+
*/
5663
private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) {
57-
if (isLimitsBreached(admissionControlActionType)) {
64+
if (isLimitsBreached(actionName, admissionControlActionType)) {
5865
this.addRejectionCount(admissionControlActionType.getType(), 1);
5966
if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) {
60-
throw new OpenSearchRejectedExecutionException("Action ["+ actionName +"] was rejected due to CPU usage admission controller limit breached");
67+
throw new OpenSearchRejectedExecutionException(
68+
String.format("CPU usage admission controller limit reached for action [%s]", admissionControlActionType.name())
69+
);
6170
}
6271
}
6372
}
6473

65-
private boolean isLimitsBreached(AdmissionControlActionType transportActionType) {
66-
long maxCpuLimit = this.getCpuRejectionThreshold(transportActionType);
67-
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(this.clusterService.state().nodes().getLocalNodeId());
68-
if(nodePerformanceStatistics.isPresent()) {
69-
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
70-
if (cpuUsage >= maxCpuLimit){
71-
LOGGER.warn("CpuBasedAdmissionController rejected the request as the current CPU usage [" +
72-
cpuUsage + "%] exceeds the allowed limit [" + maxCpuLimit + "%]");
73-
return true;
74+
/**
75+
* Check if resource usage limits are breached for an admission control
76+
*/
77+
private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) {
78+
// check if cluster state is ready
79+
if (clusterService.state() != null && clusterService.state().nodes() != null) {
80+
long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType);
81+
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(
82+
this.clusterService.state().nodes().getLocalNodeId()
83+
);
84+
if (nodePerformanceStatistics.isPresent()) {
85+
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
86+
if (cpuUsage >= maxCpuLimit) {
87+
LOGGER.warn(
88+
"CpuBasedAdmissionController rejected the request as the current CPU " +
89+
"usage [{}] exceeds the allowed limit [{}] for transport action [{}]", cpuUsage, maxCpuLimit, actionName
90+
);
91+
return true;
92+
}
7493
}
7594
}
7695
return false;
7796
}
78-
private long getCpuRejectionThreshold(AdmissionControlActionType transportActionType) {
79-
switch (transportActionType) {
97+
98+
/**
99+
* Get CPU rejection threshold for action type
100+
*/
101+
private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) {
102+
switch (admissionControlActionType) {
80103
case SEARCH:
81104
return this.settings.getSearchCPULimit();
82105
case INDEXING:
83106
return this.settings.getIndexingCPULimit();
84107
default:
85-
throw new IllegalArgumentException("Not Supported TransportAction Type: " + transportActionType.getType());
108+
throw new IllegalArgumentException(
109+
"Admission control not Supported for AdmissionControlActionType: " + admissionControlActionType.getType()
110+
);
86111
}
87112
}
88113
}

0 commit comments

Comments
 (0)