Skip to content

Commit 768eea4

Browse files
Addressing comments
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent e7b0c49 commit 768eea4

14 files changed

Lines changed: 156 additions & 103 deletions

File tree

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
import org.opensearch.monitor.os.OsStats;
5757
import org.opensearch.monitor.process.ProcessStats;
5858
import org.opensearch.node.AdaptiveSelectionStats;
59-
import org.opensearch.node.GlobalPerformanceStats;
59+
import org.opensearch.node.NodesPerformanceStats;
6060
import org.opensearch.script.ScriptCacheStats;
6161
import org.opensearch.script.ScriptStats;
6262
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
@@ -144,7 +144,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
144144
private SearchPipelineStats searchPipelineStats;
145145

146146
@Nullable
147-
private GlobalPerformanceStats globalPerformanceStats;
147+
private NodesPerformanceStats nodesPerformanceStats;
148148

149149
public NodeStats(StreamInput in) throws IOException {
150150
super(in);
@@ -202,10 +202,10 @@ public NodeStats(StreamInput in) throws IOException {
202202
} else {
203203
searchPipelineStats = null;
204204
}
205-
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.11 when we backport
206-
globalPerformanceStats = in.readOptionalWriteable(GlobalPerformanceStats::new);
205+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
206+
nodesPerformanceStats = in.readOptionalWriteable(NodesPerformanceStats::new);
207207
} else {
208-
globalPerformanceStats = null;
208+
nodesPerformanceStats = null;
209209
}
210210
}
211211

@@ -225,7 +225,7 @@ public NodeStats(
225225
@Nullable DiscoveryStats discoveryStats,
226226
@Nullable IngestStats ingestStats,
227227
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
228-
@Nullable GlobalPerformanceStats globalPerformanceStats,
228+
@Nullable NodesPerformanceStats nodesPerformanceStats,
229229
@Nullable ScriptCacheStats scriptCacheStats,
230230
@Nullable IndexingPressureStats indexingPressureStats,
231231
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@@ -251,7 +251,7 @@ public NodeStats(
251251
this.discoveryStats = discoveryStats;
252252
this.ingestStats = ingestStats;
253253
this.adaptiveSelectionStats = adaptiveSelectionStats;
254-
this.globalPerformanceStats = globalPerformanceStats;
254+
this.nodesPerformanceStats = nodesPerformanceStats;
255255
this.scriptCacheStats = scriptCacheStats;
256256
this.indexingPressureStats = indexingPressureStats;
257257
this.shardIndexingPressureStats = shardIndexingPressureStats;
@@ -356,8 +356,8 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() {
356356
}
357357

358358
@Nullable
359-
public GlobalPerformanceStats getNodesPerformanceStats() {
360-
return globalPerformanceStats;
359+
public NodesPerformanceStats getNodesPerformanceStats() {
360+
return nodesPerformanceStats;
361361
}
362362

363363
@Nullable
@@ -446,8 +446,8 @@ public void writeTo(StreamOutput out) throws IOException {
446446
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
447447
out.writeOptionalWriteable(searchPipelineStats);
448448
}
449-
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO : make it 2.11 when we backport
450-
out.writeOptionalWriteable(globalPerformanceStats);
449+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
450+
out.writeOptionalWriteable(nodesPerformanceStats);
451451
}
452452
}
453453

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public enum Metric {
214214
FILE_CACHE_STATS("file_cache"),
215215
TASK_CANCELLATION("task_cancellation"),
216216
SEARCH_PIPELINE("search_pipeline"),
217-
GLOBAL_PERFORMANCE_STATS("performance_stats");
217+
PERFORMANCE_STATS("performance_stats");
218218

219219
private String metricName;
220220

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
125125
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
126126
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
127127
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
128-
NodesStatsRequest.Metric.GLOBAL_PERFORMANCE_STATS.containedIn(metrics)
128+
NodesStatsRequest.Metric.PERFORMANCE_STATS.containedIn(metrics)
129129
);
130130
}
131131

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,7 +1075,7 @@ protected Node(
10751075
settings,
10761076
clusterService.getClusterSettings()
10771077
);
1078-
final PerfStatsCollectorService perfStatsCollectorService = new PerfStatsCollectorService(
1078+
final PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(
10791079
nodePerformanceTracker,
10801080
clusterService,
10811081
threadPool
@@ -1102,7 +1102,7 @@ protected Node(
11021102
searchPipelineService,
11031103
fileCache,
11041104
taskCancellationMonitoringService,
1105-
perfStatsCollectorService
1105+
performanceCollectorService
11061106
);
11071107

11081108
final SearchService searchService = newSearchService(
@@ -1223,8 +1223,8 @@ protected Node(
12231223
b.bind(RerouteService.class).toInstance(rerouteService);
12241224
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
12251225
b.bind(FsHealthService.class).toInstance(fsHealthService);
1226-
b.bind(PerfStatsCollectorService.class).toInstance(perfStatsCollectorService);
12271226
b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker);
1227+
b.bind(PerformanceCollectorService.class).toInstance(performanceCollectorService);
12281228
b.bind(SystemIndices.class).toInstance(systemIndices);
12291229
b.bind(IdentityService.class).toInstance(identityService);
12301230
b.bind(Tracer.class).toInstance(tracer);
@@ -1342,7 +1342,7 @@ public Node start() throws NodeValidationException {
13421342
injector.getInstance(SearchService.class).start();
13431343
injector.getInstance(FsHealthService.class).start();
13441344
injector.getInstance(NodePerformanceTracker.class).start();
1345-
injector.getInstance(PerfStatsCollectorService.class).start();
1345+
injector.getInstance(PerformanceCollectorService.class).start();
13461346
nodeService.getMonitorService().start();
13471347
nodeService.getSearchBackpressureService().start();
13481348
nodeService.getTaskCancellationMonitoringService().start();
@@ -1506,7 +1506,7 @@ private Node stop() {
15061506
injector.getInstance(NodeConnectionsService.class).stop();
15071507
injector.getInstance(FsHealthService.class).stop();
15081508
injector.getInstance(NodePerformanceTracker.class).stop();
1509-
injector.getInstance(PerfStatsCollectorService.class).stop();
1509+
injector.getInstance(PerformanceCollectorService.class).stop();
15101510
nodeService.getMonitorService().stop();
15111511
nodeService.getSearchBackpressureService().stop();
15121512
injector.getInstance(GatewayService.class).stop();
@@ -1572,8 +1572,8 @@ public synchronized void close() throws IOException {
15721572
toClose.add(injector.getInstance(FsHealthService.class));
15731573
toClose.add(() -> stopWatch.stop().start("node_performance_tracker"));
15741574
toClose.add(injector.getInstance(NodePerformanceTracker.class));
1575-
toClose.add(() -> stopWatch.stop().start("perf_stats_collector"));
1576-
toClose.add(injector.getInstance(PerfStatsCollectorService.class));
1575+
toClose.add(() -> stopWatch.stop().start("performance_collector"));
1576+
toClose.add(injector.getInstance(PerformanceCollectorService.class));
15771577
toClose.add(() -> stopWatch.stop().start("gateway"));
15781578
toClose.add(injector.getInstance(GatewayService.class));
15791579
toClose.add(() -> stopWatch.stop().start("search"));

server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java renamed to server/src/main/java/org/opensearch/node/NodePerformanceStats.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,20 @@
1919
* This represents the performance stats of a node along with the timestamp at which the stats object was created
2020
* in the respective node
2121
*/
22-
public class NodePerformanceStatistics implements Writeable {
22+
public class NodePerformanceStats implements Writeable {
2323
final String nodeId;
2424
long timestamp;
2525
double cpuUtilizationPercent;
2626
double memoryUtilizationPercent;
2727

28-
public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
28+
public NodePerformanceStats(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
2929
this.nodeId = nodeId;
3030
this.cpuUtilizationPercent = cpuUtilizationPercent;
3131
this.memoryUtilizationPercent = memoryUtilizationPercent;
3232
this.timestamp = timestamp;
3333
}
3434

35-
public NodePerformanceStatistics(StreamInput in) throws IOException {
35+
public NodePerformanceStats(StreamInput in) throws IOException {
3636
this.nodeId = in.readString();
3737
this.timestamp = in.readLong();
3838
this.cpuUtilizationPercent = in.readDouble();
@@ -58,12 +58,12 @@ public String toString() {
5858
return sb.toString();
5959
}
6060

61-
NodePerformanceStatistics(NodePerformanceStatistics nodePerformanceStatistics) {
61+
NodePerformanceStats(NodePerformanceStats nodePerformanceStats) {
6262
this(
63-
nodePerformanceStatistics.nodeId,
64-
nodePerformanceStatistics.cpuUtilizationPercent,
65-
nodePerformanceStatistics.memoryUtilizationPercent,
66-
nodePerformanceStatistics.timestamp
63+
nodePerformanceStats.nodeId,
64+
nodePerformanceStats.timestamp,
65+
nodePerformanceStats.memoryUtilizationPercent,
66+
nodePerformanceStats.cpuUtilizationPercent
6767
);
6868
}
6969

server/src/main/java/org/opensearch/node/NodeService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public class NodeService implements Closeable {
8383
private final ScriptService scriptService;
8484
private final HttpServerTransport httpServerTransport;
8585
private final ResponseCollectorService responseCollectorService;
86-
private final PerfStatsCollectorService perfStatsCollectorService;
86+
private final PerformanceCollectorService performanceCollectorService;
8787
private final SearchTransportService searchTransportService;
8888
private final IndexingPressureService indexingPressureService;
8989
private final AggregationUsageService aggregationUsageService;
@@ -116,7 +116,7 @@ public class NodeService implements Closeable {
116116
SearchPipelineService searchPipelineService,
117117
FileCache fileCache,
118118
TaskCancellationMonitoringService taskCancellationMonitoringService,
119-
PerfStatsCollectorService perfStatsCollectorService
119+
PerformanceCollectorService performanceCollectorService
120120
) {
121121
this.settings = settings;
122122
this.threadPool = threadPool;
@@ -139,7 +139,7 @@ public class NodeService implements Closeable {
139139
this.clusterService = clusterService;
140140
this.fileCache = fileCache;
141141
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
142-
this.perfStatsCollectorService = perfStatsCollectorService;
142+
this.performanceCollectorService = performanceCollectorService;
143143
clusterService.addStateApplier(ingestService);
144144
clusterService.addStateApplier(searchPipelineService);
145145
}
@@ -241,7 +241,7 @@ public NodeStats stats(
241241
discoveryStats ? discovery.stats() : null,
242242
ingest ? ingestService.stats() : null,
243243
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
244-
nodesPerfStats ? perfStatsCollectorService.stats() : null,
244+
nodesPerfStats ? performanceCollectorService.stats() : null,
245245
scriptCache ? scriptService.cacheStats() : null,
246246
indexingPressure ? this.indexingPressureService.nodeStats() : null,
247247
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,

server/src/main/java/org/opensearch/node/GlobalPerformanceStats.java renamed to server/src/main/java/org/opensearch/node/NodesPerformanceStats.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,17 @@
2424
* This class represents performance stats such as CPU, Memory and IO resource usage of each node along with the time
2525
* elapsed from when the stats were recorded.
2626
*/
27-
public class GlobalPerformanceStats implements Writeable, ToXContentFragment {
27+
public class NodesPerformanceStats implements Writeable, ToXContentFragment {
2828

2929
// Map of node id to perf stats of the corresponding node.
30-
private final Map<String, NodePerformanceStatistics> nodeIdToPerfStatsMap;
30+
private final Map<String, NodePerformanceStats> nodeIdToPerfStatsMap;
3131

32-
public GlobalPerformanceStats(Map<String, NodePerformanceStatistics> nodeIdToPerfStatsMap) {
32+
public NodesPerformanceStats(Map<String, NodePerformanceStats> nodeIdToPerfStatsMap) {
3333
this.nodeIdToPerfStatsMap = nodeIdToPerfStatsMap;
3434
}
3535

36-
public GlobalPerformanceStats(StreamInput in) throws IOException {
37-
this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStatistics::new);
36+
public NodesPerformanceStats(StreamInput in) throws IOException {
37+
this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStats::new);
3838
}
3939

4040
@Override
@@ -45,7 +45,7 @@ public void writeTo(StreamOutput out) throws IOException {
4545
/**
4646
* Returns map of node id to perf stats of the corresponding node.
4747
*/
48-
public Map<String, NodePerformanceStatistics> getNodeIdToNodePerfStatsMap() {
48+
public Map<String, NodePerformanceStats> getNodeIdToNodePerfStatsMap() {
4949
return nodeIdToPerfStatsMap;
5050
}
5151

@@ -54,7 +54,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
5454
builder.startObject("performance_stats");
5555
for (String nodeId : nodeIdToPerfStatsMap.keySet()) {
5656
builder.startObject(nodeId);
57-
NodePerformanceStatistics perfStats = nodeIdToPerfStatsMap.get(nodeId);
57+
NodePerformanceStats perfStats = nodeIdToPerfStatsMap.get(nodeId);
5858
if (perfStats != null) {
5959
builder.field(
6060
"elapsed_time",

server/src/main/java/org/opensearch/node/PerfStatsCollectorService.java renamed to server/src/main/java/org/opensearch/node/PerformanceCollectorService.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.opensearch.threadpool.Scheduler;
2222
import org.opensearch.threadpool.ThreadPool;
2323

24-
import java.io.IOException;
2524
import java.util.HashMap;
2625
import java.util.Map;
2726
import java.util.Optional;
@@ -31,24 +30,28 @@
3130
* This collects node level performance statistics such as cpu, memory, IO of each node and makes it available for
3231
* coordinator node to aid in throttling, ranking etc
3332
*/
34-
public class PerfStatsCollectorService extends AbstractLifecycleComponent implements ClusterStateListener {
33+
public class PerformanceCollectorService extends AbstractLifecycleComponent implements ClusterStateListener {
3534

3635
/**
37-
* This refresh interval denotes the polling interval of PerfStatsCollectorService to refresh the performance stats
36+
* This refresh interval denotes the polling interval of PerformanceCollectorService to refresh the performance stats
3837
* from local node
3938
*/
4039
private static long REFRESH_INTERVAL_IN_MILLIS = 1000;
4140

42-
private static final Logger logger = LogManager.getLogger(PerfStatsCollectorService.class);
43-
private final ConcurrentMap<String, NodePerformanceStatistics> nodeIdToPerfStats = ConcurrentCollections.newConcurrentMap();
41+
private static final Logger logger = LogManager.getLogger(PerformanceCollectorService.class);
42+
private final ConcurrentMap<String, NodePerformanceStats> nodeIdToPerfStats = ConcurrentCollections.newConcurrentMap();
4443

4544
private ThreadPool threadPool;
4645
private volatile Scheduler.Cancellable scheduledFuture;
4746

4847
private NodePerformanceTracker nodePerformanceTracker;
4948
private ClusterService clusterService;
5049

51-
public PerfStatsCollectorService(NodePerformanceTracker nodePerformanceTracker, ClusterService clusterService, ThreadPool threadPool) {
50+
public PerformanceCollectorService(
51+
NodePerformanceTracker nodePerformanceTracker,
52+
ClusterService clusterService,
53+
ThreadPool threadPool
54+
) {
5255
this.threadPool = threadPool;
5356
this.nodePerformanceTracker = nodePerformanceTracker;
5457
this.clusterService = clusterService;
@@ -71,10 +74,10 @@ void removeNodePerfStatistics(String nodeId) {
7174
/**
7275
* Collect node performance statistics along with the timestamp
7376
*/
74-
public void collectNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
77+
public void collectNodePerfStatistics(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
7578
nodeIdToPerfStats.compute(nodeId, (id, nodePerfStats) -> {
7679
if (nodePerfStats == null) {
77-
return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, timestamp);
80+
return new NodePerformanceStats(nodeId, timestamp, memoryUtilizationPercent, cpuUtilizationPercent);
7881
} else {
7982
nodePerfStats.cpuUtilizationPercent = cpuUtilizationPercent;
8083
nodePerfStats.memoryUtilizationPercent = memoryUtilizationPercent;
@@ -87,9 +90,9 @@ public void collectNodePerfStatistics(String nodeId, double cpuUtilizationPercen
8790
/**
8891
* Get all node statistics which will be used for node stats
8992
*/
90-
public Map<String, NodePerformanceStatistics> getAllNodeStatistics() {
91-
Map<String, NodePerformanceStatistics> nodeStats = new HashMap<>(nodeIdToPerfStats.size());
92-
nodeIdToPerfStats.forEach((nodeId, nodePerfStats) -> { nodeStats.put(nodeId, new NodePerformanceStatistics(nodePerfStats)); });
93+
public Map<String, NodePerformanceStats> getAllNodeStatistics() {
94+
Map<String, NodePerformanceStats> nodeStats = new HashMap<>(nodeIdToPerfStats.size());
95+
nodeIdToPerfStats.forEach((nodeId, nodePerfStats) -> { nodeStats.put(nodeId, new NodePerformanceStats(nodePerfStats)); });
9396
return nodeStats;
9497
}
9598

@@ -98,27 +101,27 @@ public Map<String, NodePerformanceStatistics> getAllNodeStatistics() {
98101
* performance stats information exists for the given node. Returns an empty
99102
* {@code Optional} if the node was not found.
100103
*/
101-
public Optional<NodePerformanceStatistics> getNodeStatistics(final String nodeId) {
102-
return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStatistics(perfStats));
104+
public Optional<NodePerformanceStats> getNodeStatistics(final String nodeId) {
105+
return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStats(perfStats));
103106
}
104107

105108
/**
106109
* Returns collected performance statistics of all nodes
107110
*/
108-
public GlobalPerformanceStats stats() {
109-
return new GlobalPerformanceStats(getAllNodeStatistics());
111+
public NodesPerformanceStats stats() {
112+
return new NodesPerformanceStats(getAllNodeStatistics());
110113
}
111114

112115
/**
113116
* Fetch local node performance statistics and add it to store along with the current timestamp
114117
*/
115-
private void getLocalNodePerformanceStats() {
118+
private void collectLocalNodePerformanceStats() {
116119
if (nodePerformanceTracker.isReady() && clusterService.state() != null) {
117120
collectNodePerfStatistics(
118121
clusterService.state().nodes().getLocalNodeId(),
119-
nodePerformanceTracker.getCpuUtilizationPercent(),
122+
System.currentTimeMillis(),
120123
nodePerformanceTracker.getMemoryUtilizationPercent(),
121-
System.currentTimeMillis()
124+
nodePerformanceTracker.getCpuUtilizationPercent()
122125
);
123126
}
124127
}
@@ -130,9 +133,9 @@ protected void doStart() {
130133
*/
131134
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
132135
try {
133-
getLocalNodePerformanceStats();
136+
collectLocalNodePerformanceStats();
134137
} catch (Exception e) {
135-
logger.warn("failure in PerfStatsCollectorService", e);
138+
logger.warn("failure in PerformanceCollectorService", e);
136139
}
137140
}, new TimeValue(REFRESH_INTERVAL_IN_MILLIS), ThreadPool.Names.GENERIC);
138141
}
@@ -145,5 +148,5 @@ protected void doStop() {
145148
}
146149

147150
@Override
148-
protected void doClose() throws IOException {}
151+
protected void doClose() {}
149152
}

0 commit comments

Comments
 (0)