Skip to content

Commit d9d07eb

Browse files
javadoc changes and more
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent 01e320f commit d9d07eb

21 files changed

Lines changed: 182 additions & 152 deletions

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
1919
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
2020
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
21-
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
22-
- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats
21+
- [Admission Control] Add changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
22+
- [Admission Control] Add changes to integrate CPU AC and ResourceUsageCollector with Stats ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
2323

2424
### Dependencies
2525
- Bump `log4j-core` from 2.18.0 to 2.19.0

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ public NodeStats(StreamInput in) throws IOException {
230230
} else {
231231
repositoriesStats = null;
232232
}
233-
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
233+
// TODO: change to V_2_12_0 on main after backport to 2.x
234+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
234235
admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new);
235236
} else {
236237
admissionControlStats = null;
@@ -504,6 +505,10 @@ public void writeTo(StreamOutput out) throws IOException {
504505
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
505506
out.writeOptionalWriteable(repositoriesStats);
506507
}
508+
// TODO: change to V_2_12_0 on main after backport to 2.x
509+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
510+
out.writeOptionalWriteable(admissionControlStats);
511+
}
507512
}
508513

509514
@Override

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
171171
false,
172172
false,
173173
false,
174+
false,
174175
false
175176
);
176177
List<ShardStats> shardsStats = new ArrayList<>();

server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ protected TransportReplicationAction(
221221

222222
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
223223

224-
if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){
224+
// Register only TransportShardBulkAction for admission control ( primary indexing action )
225+
if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) {
225226
transportService.registerRequestHandler(
226227
transportPrimaryAction,
227228
executor,

server/src/main/java/org/opensearch/common/network/NetworkModule.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,16 +300,19 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
300300
return actualHandler;
301301
}
302302

303+
/**
304+
* Intercept the transport action and perform admission control if applicable
305+
*/
303306
@Override
304307
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
305308
String action,
306309
String executor,
307310
boolean forceExecution,
308311
TransportRequestHandler<T> actualHandler,
309-
AdmissionControlActionType transportActionType
312+
AdmissionControlActionType admissionControlActionType
310313
) {
311314
for (TransportInterceptor interceptor : this.transportInterceptors) {
312-
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType);
315+
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType);
313316
}
314317
return actualHandler;
315318
}

server/src/main/java/org/opensearch/node/NodeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public NodeStats stats(
269269
searchPipelineStats ? this.searchPipelineService.stats() : null,
270270
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
271271
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
272-
admissionControl ? this.admissionControlService.stats(): null
272+
admissionControl ? this.admissionControlService.stats() : null
273273
);
274274
}
275275

server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.cluster.service.ClusterService;
14-
import org.opensearch.common.settings.ClusterSettings;
1514
import org.opensearch.common.settings.Settings;
1615
import org.opensearch.node.ResourceUsageCollectorService;
1716
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
1817
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
1918
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
2019
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
21-
import org.opensearch.ratelimitting.admissioncontrol.stats.BaseAdmissionControllerStats;
22-
import org.opensearch.ratelimitting.admissioncontrol.stats.CPUBasedAdmissionControllerStats;
20+
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
2321
import org.opensearch.threadpool.ThreadPool;
2422

2523
import java.util.ArrayList;
@@ -47,8 +45,14 @@ public class AdmissionControlService {
4745
* @param settings Immutable settings instance
4846
* @param clusterService ClusterService Instance
4947
* @param threadPool ThreadPool Instance
48+
* @param resourceUsageCollectorService Instance used to get node resource usage stats
5049
*/
51-
public AdmissionControlService(Settings settings, ClusterService clusterService, ThreadPool threadPool, ResourceUsageCollectorService resourceUsageCollectorService) {
50+
public AdmissionControlService(
51+
Settings settings,
52+
ClusterService clusterService,
53+
ThreadPool threadPool,
54+
ResourceUsageCollectorService resourceUsageCollectorService
55+
) {
5256
this.threadPool = threadPool;
5357
this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings);
5458
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
@@ -68,11 +72,13 @@ private void initialise() {
6872

6973
/**
7074
*
71-
* @param action transport action that is being executed. we are using it for logging while request is rejected
72-
* @param admissionControlActionType type of the admissionControllerActionType
75+
* @param action Transport action name
76+
* @param admissionControlActionType admissionControllerActionType value
7377
*/
7478
public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) {
75-
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action, admissionControlActionType); });
79+
this.ADMISSION_CONTROLLERS.forEach(
80+
(name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }
81+
);
7682
}
7783

7884
/**
@@ -90,7 +96,12 @@ public void registerAdmissionController(String admissionControllerName) {
9096
private AdmissionController controllerFactory(String admissionControllerName) {
9197
switch (admissionControllerName) {
9298
case CPU_BASED_ADMISSION_CONTROLLER:
93-
return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterService, this.resourceUsageCollectorService);
99+
return new CPUBasedAdmissionController(
100+
admissionControllerName,
101+
this.resourceUsageCollectorService,
102+
this.clusterService,
103+
this.settings
104+
);
94105
default:
95106
throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
96107
}
@@ -113,26 +124,18 @@ public AdmissionController getAdmissionController(String controllerName) {
113124
return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null);
114125
}
115126

116-
public AdmissionControlStats stats(){
117-
List<BaseAdmissionControllerStats> statsList = new ArrayList<>();
118-
if(this.ADMISSION_CONTROLLERS.size() > 0){
127+
/**
128+
* Return admission control stats
129+
*/
130+
public AdmissionControlStats stats() {
131+
List<AdmissionControllerStats> statsList = new ArrayList<>();
132+
if (this.ADMISSION_CONTROLLERS.size() > 0) {
119133
this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> {
120-
BaseAdmissionControllerStats admissionControllerStats = controllerStatsFactory(admissionController);
121-
if(admissionControllerStats != null) {
122-
statsList.add(admissionControllerStats);
123-
}
134+
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController, controllerName);
135+
statsList.add(admissionControllerStats);
124136
});
125137
return new AdmissionControlStats(statsList);
126138
}
127139
return null;
128140
}
129-
130-
private BaseAdmissionControllerStats controllerStatsFactory(AdmissionController admissionController) {
131-
switch (admissionController.getName()) {
132-
case CPU_BASED_ADMISSION_CONTROLLER:
133-
return new CPUBasedAdmissionControllerStats(admissionController);
134-
default:
135-
return null;
136-
}
137-
}
138141
}

server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.opensearch.cluster.service.ClusterService;
1212
import org.opensearch.common.util.concurrent.ConcurrentCollections;
1313
import org.opensearch.node.ResourceUsageCollectorService;
14-
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
1514
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
1615
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
1716

@@ -33,11 +32,17 @@ public abstract class AdmissionController {
3332
public final ClusterService clusterService;
3433

3534
/**
36-
* @param rejectionCount initialised rejectionCount value for AdmissionController
37-
* @param admissionControllerName name of the admissionController
35+
* @param admissionControllerName name of the admissionController
36+
* @param resourceUsageCollectorService instance used to get resource usage stats of the node
37+
* @param rejectionCount initialised rejectionCount value for AdmissionController
3838
* @param clusterService
3939
*/
40-
public AdmissionController(AtomicLong rejectionCount, String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, ClusterService clusterService) {
40+
public AdmissionController(
41+
String admissionControllerName,
42+
ResourceUsageCollectorService resourceUsageCollectorService,
43+
AtomicLong rejectionCount,
44+
ClusterService clusterService
45+
) {
4146
this.rejectionCount = rejectionCount;
4247
this.admissionControllerName = admissionControllerName;
4348
this.resourceUsageCollectorService = resourceUsageCollectorService;
@@ -62,8 +67,7 @@ public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionContr
6267
}
6368

6469
/**
65-
* Increment the tracking-objects and apply the admission control if threshold is breached.
66-
* Mostly applicable while applying admission controller
70+
* Apply admission control based on the resource usage for an action
6771
*/
6872
public abstract void apply(String action, AdmissionControlActionType admissionControlActionType);
6973

@@ -74,9 +78,12 @@ public String getName() {
7478
return this.admissionControllerName;
7579
}
7680

81+
/**
82+
* Add rejection count to the rejection count metric tracked by the admission controller
83+
*/
7784
public void addRejectionCount(String admissionControlActionType, long count) {
7885
AtomicLong updatedCount = new AtomicLong(0);
79-
if(this.rejectionCountMap.containsKey(admissionControlActionType)){
86+
if (this.rejectionCountMap.containsKey(admissionControlActionType)) {
8087
updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get());
8188
}
8289
updatedCount.addAndGet(count);
@@ -91,6 +98,9 @@ public long getRejectionCount(String admissionControlActionType) {
9198
return rejectionCount.get();
9299
}
93100

101+
/**
102+
* Get rejection stats of the admission controller
103+
*/
94104
public Map<String, Long> getRejectionStats() {
95105
Map<String, Long> rejectionStats = new HashMap<>();
96106
rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get()));

server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@
1616
import org.opensearch.node.NodeResourceUsageStats;
1717
import org.opensearch.node.ResourceUsageCollectorService;
1818
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
19-
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
2019
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
2120

22-
import java.util.HashMap;
23-
import java.util.Map;
2421
import java.util.Optional;
2522
import java.util.concurrent.atomic.AtomicLong;
2623

@@ -33,56 +30,89 @@ public class CPUBasedAdmissionController extends AdmissionController {
3330
public CPUBasedAdmissionControllerSettings settings;
3431

3532
/**
36-
*
37-
* @param admissionControllerName State of the admission controller
33+
* @param admissionControllerName Name of the admission controller
34+
* @param resourceUsageCollectorService Instance used to get node resource usage stats
35+
* @param clusterService ClusterService Instance
36+
* @param settings Immutable settings instance
3837
*/
39-
public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterService clusterService, ResourceUsageCollectorService resourceUsageCollectorService) {
40-
super(new AtomicLong(0), admissionControllerName, resourceUsageCollectorService, clusterService);
38+
public CPUBasedAdmissionController(
39+
String admissionControllerName,
40+
ResourceUsageCollectorService resourceUsageCollectorService,
41+
ClusterService clusterService,
42+
Settings settings
43+
) {
44+
super(admissionControllerName, resourceUsageCollectorService, new AtomicLong(0), clusterService);
4145
this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
4246
}
4347

4448
/**
45-
* This function will take of applying admission controller based on CPU usage
49+
* Apply admission control based on process CPU usage
4650
* @param action is the transport action
4751
*/
4852
@Override
4953
public void apply(String action, AdmissionControlActionType admissionControlActionType) {
50-
// TODO Will extend this logic further currently just incrementing rejectionCount
5154
if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) {
5255
this.applyForTransportLayer(action, admissionControlActionType);
5356
}
5457
}
5558

59+
/**
60+
* Apply transport layer admission control if configured limit has been reached
61+
*/
5662
private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) {
57-
if (isLimitsBreached(admissionControlActionType)) {
63+
if (isLimitsBreached(actionName, admissionControlActionType)) {
5864
this.addRejectionCount(admissionControlActionType.getType(), 1);
5965
if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) {
60-
throw new OpenSearchRejectedExecutionException("Action ["+ actionName +"] was rejected due to CPU usage admission controller limit breached");
66+
throw new OpenSearchRejectedExecutionException(
67+
String.format("CPU usage admission controller limit reached for action [%s]", admissionControlActionType.name())
68+
);
6169
}
6270
}
6371
}
6472

65-
private boolean isLimitsBreached(AdmissionControlActionType transportActionType) {
66-
long maxCpuLimit = this.getCpuRejectionThreshold(transportActionType);
67-
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(this.clusterService.state().nodes().getLocalNodeId());
68-
if(nodePerformanceStatistics.isPresent()) {
69-
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
70-
if (cpuUsage >= maxCpuLimit){
71-
LOGGER.warn("CpuBasedAdmissionController rejected the request as the current CPU usage [" +
72-
cpuUsage + "%] exceeds the allowed limit [" + maxCpuLimit + "%]");
73-
return true;
73+
/**
74+
* Check if the configured resource usage limits are breached for the action
75+
*/
76+
private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) {
77+
// check if cluster state is ready
78+
if (clusterService.state() != null && clusterService.state().nodes() != null) {
79+
long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType);
80+
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(
81+
this.clusterService.state().nodes().getLocalNodeId()
82+
);
83+
if (nodePerformanceStatistics.isPresent()) {
84+
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
85+
if (cpuUsage >= maxCpuLimit) {
86+
LOGGER.warn(
87+
"CpuBasedAdmissionController rejected the request as the current CPU "
88+
+ "usage [{}] exceeds the allowed limit [{}] for transport action [{}]",
89+
cpuUsage,
90+
maxCpuLimit,
91+
actionName
92+
);
93+
return true;
94+
}
7495
}
7596
}
7697
return false;
7798
}
78-
private long getCpuRejectionThreshold(AdmissionControlActionType transportActionType) {
79-
switch (transportActionType) {
99+
100+
/**
101+
* Get CPU rejection threshold based on action type
102+
*/
103+
private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) {
104+
switch (admissionControlActionType) {
80105
case SEARCH:
81106
return this.settings.getSearchCPULimit();
82107
case INDEXING:
83108
return this.settings.getIndexingCPULimit();
84109
default:
85-
throw new IllegalArgumentException("Not Supported TransportAction Type: " + transportActionType.getType());
110+
throw new IllegalArgumentException(
111+
String.format(
112+
"Admission control not Supported for AdmissionControlActionType: %s",
113+
admissionControlActionType.getType()
114+
)
115+
);
86116
}
87117
}
88118
}

0 commit comments

Comments
 (0)