Skip to content

Commit bf75f28

Browse files
fix
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent 20f5d1a commit bf75f28

12 files changed

Lines changed: 455 additions & 65 deletions

File tree

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.util;
10+
11+
/**
12+
* MovingAverage is used to calculate the moving average of last 'n' observations of double type.
13+
*
14+
* @opensearch.internal
15+
*/
16+
public class DoubleMovingAverage {
17+
private final int windowSize;
18+
private final double[] observations;
19+
20+
private volatile long count = 0;
21+
private volatile double sum = 0.0;
22+
private volatile double average = 0.0;
23+
24+
public DoubleMovingAverage(int windowSize) {
25+
checkWindowSize(windowSize);
26+
this.windowSize = windowSize;
27+
this.observations = new double[windowSize];
28+
}
29+
30+
/**
31+
* Used for changing the window size of {@code MovingAverage}.
32+
*
33+
* @param newWindowSize new window size.
34+
* @return copy of original object with updated size.
35+
*/
36+
public DoubleMovingAverage copyWithSize(int newWindowSize) {
37+
DoubleMovingAverage copy = new DoubleMovingAverage(newWindowSize);
38+
// Start is inclusive, but end is exclusive
39+
long start, end = count;
40+
if (isReady() == false) {
41+
start = 0;
42+
} else {
43+
start = end - windowSize;
44+
}
45+
// If the newWindow Size is smaller than the elements eligible to be copied over, then we adjust the start value
46+
if (end - start > newWindowSize) {
47+
start = end - newWindowSize;
48+
}
49+
for (int i = (int) start; i < end; i++) {
50+
copy.record(observations[i % observations.length]);
51+
}
52+
return copy;
53+
}
54+
55+
private void checkWindowSize(int size) {
56+
if (size <= 0) {
57+
throw new IllegalArgumentException("window size must be greater than zero");
58+
}
59+
}
60+
61+
/**
62+
* Records a new observation and evicts the n-th last observation.
63+
*/
64+
public synchronized double record(double value) {
65+
double delta = value - observations[(int) (count % observations.length)];
66+
observations[(int) (count % observations.length)] = value;
67+
68+
count++;
69+
sum += delta;
70+
average = sum / (double) Math.min(count, observations.length);
71+
return average;
72+
}
73+
74+
public double getAverage() {
75+
return average;
76+
}
77+
78+
public long getCount() {
79+
return count;
80+
}
81+
82+
public boolean isReady() {
83+
return count >= windowSize;
84+
}
85+
}

server/src/main/java/org/opensearch/common/util/MovingAverage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class MovingAverage {
1818
private final long[] observations;
1919

2020
private volatile long count = 0;
21-
private volatile long sum = 0;
21+
private volatile double sum = 0;
2222
private volatile double average = 0;
2323

2424
public MovingAverage(int windowSize) {
@@ -67,7 +67,7 @@ public synchronized double record(long value) {
6767

6868
count++;
6969
sum += delta;
70-
average = (double) sum / Math.min(count, observations.length);
70+
average = sum / Math.min(count, observations.length);
7171
return average;
7272
}
7373

server/src/main/java/org/opensearch/monitor/fs/FsInfo.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
package org.opensearch.monitor.fs;
3434

35+
import org.apache.logging.log4j.LogManager;
36+
import org.apache.logging.log4j.Logger;
3537
import org.opensearch.Version;
3638
import org.opensearch.common.Nullable;
3739
import org.opensearch.core.common.io.stream.StreamInput;
@@ -41,6 +43,7 @@
4143
import org.opensearch.core.xcontent.ToXContentFragment;
4244
import org.opensearch.core.xcontent.ToXContentObject;
4345
import org.opensearch.core.xcontent.XContentBuilder;
46+
import org.opensearch.throttling.tracker.AverageCpuUsageTracker;
4447

4548
import java.io.IOException;
4649
import java.util.Arrays;
@@ -223,6 +226,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
223226
* @opensearch.internal]
224227
*/
225228
public static class DeviceStats implements Writeable, ToXContentFragment {
229+
private static final Logger logger = LogManager.getLogger(DeviceStats.class);
226230

227231
final int majorDeviceNumber;
228232
final int minorDeviceNumber;
@@ -389,11 +393,14 @@ public long operations() {
389393
public long readOperations() {
390394
if (previousReadsCompleted == -1) return -1;
391395

396+
//logger.info("Current reads : {} , Previous reads : {}", currentReadsCompleted, previousReadsCompleted);
397+
392398
return (currentReadsCompleted - previousReadsCompleted);
393399
}
394400

395401
public long writeOperations() {
396402
if (previousWritesCompleted == -1) return -1;
403+
//logger.info("Current writes : {} , Previous writes : {}", currentWritesCompleted, previousWritesCompleted);
397404

398405
return (currentWritesCompleted - previousWritesCompleted);
399406
}
@@ -412,6 +419,14 @@ public long readKilobytes() {
412419
return (currentSectorsRead - previousSectorsRead) / 2;
413420
}
414421

422+
public long getCurrentReadKilobytes() {
423+
return currentSectorsRead / 2;
424+
}
425+
426+
public long getCurrentWriteKilobytes() {
427+
return currentSectorsWritten / 2;
428+
}
429+
415430
public long writeKilobytes() {
416431
if (previousSectorsWritten == -1) return -1;
417432

server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.core.common.io.stream.StreamInput;
1212
import org.opensearch.core.common.io.stream.StreamOutput;
1313
import org.opensearch.core.common.io.stream.Writeable;
14+
import org.opensearch.throttling.tracker.AverageDiskStats;
1415

1516
import java.io.IOException;
1617
import java.util.Locale;
@@ -25,17 +26,22 @@ public class NodePerformanceStatistics implements Writeable {
2526
double cpuUtilizationPercent;
2627
double memoryUtilizationPercent;
2728

28-
public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
29+
AverageDiskStats averageDiskStats;
30+
31+
public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent,
32+
AverageDiskStats averageDiskStats, long timestamp) {
2933
this.nodeId = nodeId;
3034
this.cpuUtilizationPercent = cpuUtilizationPercent;
3135
this.memoryUtilizationPercent = memoryUtilizationPercent;
36+
this.averageDiskStats = averageDiskStats;
3237
this.timestamp = timestamp;
3338
}
3439

3540
public NodePerformanceStatistics(StreamInput in) throws IOException {
3641
this.nodeId = in.readString();
3742
this.cpuUtilizationPercent = in.readDouble();
3843
this.memoryUtilizationPercent = in.readDouble();
44+
this.averageDiskStats = new AverageDiskStats(in);
3945
this.timestamp = in.readLong();
4046
}
4147

@@ -44,6 +50,7 @@ public void writeTo(StreamOutput out) throws IOException {
4450
out.writeString(this.nodeId);
4551
out.writeDouble(this.cpuUtilizationPercent);
4652
out.writeDouble(this.memoryUtilizationPercent);
53+
this.averageDiskStats.writeTo(out);
4754
out.writeLong(this.timestamp);
4855
}
4956

@@ -63,6 +70,7 @@ public String toString() {
6370
nodePerformanceStatistics.nodeId,
6471
nodePerformanceStatistics.cpuUtilizationPercent,
6572
nodePerformanceStatistics.memoryUtilizationPercent,
73+
nodePerformanceStatistics.averageDiskStats,
6674
nodePerformanceStatistics.timestamp
6775
);
6876
}

server/src/main/java/org/opensearch/node/NodesPerformanceStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
6060
"elapsed_time",
6161
new TimeValue(System.currentTimeMillis() - perfStats.timestamp, TimeUnit.MILLISECONDS).toString()
6262
);
63+
perfStats.averageDiskStats.toXContent(builder, params);
6364
}
6465
builder.endObject();
6566
}

server/src/main/java/org/opensearch/node/PerformanceCollectorService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.cluster.node.DiscoveryNode;
1414
import org.opensearch.cluster.service.ClusterService;
1515
import org.opensearch.common.util.concurrent.ConcurrentCollections;
16+
import org.opensearch.throttling.tracker.AverageDiskStats;
1617

1718
import java.util.HashMap;
1819
import java.util.Map;
@@ -43,13 +44,17 @@ void removeNode(String nodeId) {
4344
nodeIdToPerfStats.remove(nodeId);
4445
}
4546

46-
public void addNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
47+
public void addNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent,
48+
AverageDiskStats averageDiskStats,
49+
long timestamp) {
4750
nodeIdToPerfStats.compute(nodeId, (id, nodePerfStats) -> {
4851
if (nodePerfStats == null) {
49-
return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, timestamp);
52+
return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent,
53+
averageDiskStats, timestamp);
5054
} else {
5155
nodePerfStats.cpuUtilizationPercent = cpuUtilizationPercent;
5256
nodePerfStats.memoryUtilizationPercent = memoryUtilizationPercent;
57+
nodePerfStats.averageDiskStats = averageDiskStats;
5358
nodePerfStats.timestamp = timestamp;
5459
return nodePerfStats;
5560
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.throttling.tracker;
10+
11+
import org.opensearch.common.unit.TimeValue;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.core.common.io.stream.StreamOutput;
14+
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.core.xcontent.ToXContent;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
18+
import java.io.IOException;
19+
import java.util.Locale;
20+
import java.util.concurrent.TimeUnit;
21+
22+
public class AverageDiskStats implements Writeable {
23+
private final double readIopsAverage;
24+
private final double writeIopsAverage;
25+
private final double readKbAverage;
26+
private final double writeKbAverage;
27+
private final double readLatencyAverage;
28+
private final double writeLatencyAverage;
29+
private final double ioUtilizationPercent;
30+
31+
public AverageDiskStats(double readIopsAverage, double writeIopsAverage, double readKbAverage, double writeKbAverage,
32+
double readLatencyAverage, double writeLatencyAverage, double ioUtilizationPercent) {
33+
this.readIopsAverage = readIopsAverage;
34+
this.writeIopsAverage = writeIopsAverage;
35+
this.readKbAverage = readKbAverage;
36+
this.writeKbAverage = writeKbAverage;
37+
this.readLatencyAverage = readLatencyAverage;
38+
this.writeLatencyAverage = writeLatencyAverage;
39+
this.ioUtilizationPercent = ioUtilizationPercent;
40+
}
41+
42+
public AverageDiskStats(StreamInput in) throws IOException {
43+
this.readIopsAverage = in.readDouble();
44+
this.readKbAverage = in.readDouble();
45+
this.readLatencyAverage = in.readDouble();
46+
this.writeIopsAverage = in.readDouble();
47+
this.writeKbAverage = in.readDouble();
48+
this.writeLatencyAverage = in.readDouble();
49+
this.ioUtilizationPercent = in.readDouble();
50+
}
51+
52+
public double getIoUtilizationPercent() {
53+
return ioUtilizationPercent;
54+
}
55+
56+
public double getReadIopsAverage() {
57+
return readIopsAverage;
58+
}
59+
60+
public double getReadKbAverage() {
61+
return readKbAverage;
62+
}
63+
64+
public double getReadLatencyAverage() {
65+
return readLatencyAverage;
66+
}
67+
68+
public double getWriteIopsAverage() {
69+
return writeIopsAverage;
70+
}
71+
72+
public double getWriteKbAverage() {
73+
return writeKbAverage;
74+
}
75+
76+
public double getWriteLatencyAverage() {
77+
return writeLatencyAverage;
78+
}
79+
80+
@Override
81+
public void writeTo(StreamOutput out) throws IOException {
82+
83+
}
84+
85+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
86+
builder.startObject("io_stats");
87+
builder.field("read_iops_average", String.format(Locale.ROOT, "%.1f", readIopsAverage ));
88+
builder.field("write_iops_average", String.format(Locale.ROOT, "%.1f", writeIopsAverage));
89+
builder.field("read_throughput_average", String.format(Locale.ROOT, "%.1f", readKbAverage));
90+
builder.field("write_throughput_average", String.format(Locale.ROOT, "%.1f", writeKbAverage));
91+
builder.field("read_latency_average", String.format(Locale.ROOT, "%.8f", readLatencyAverage));
92+
builder.field("write_latency_average", String.format(Locale.ROOT, "%.8f", writeLatencyAverage));
93+
builder.field("io_utilization_percent", String.format(Locale.ROOT, "%.3f", ioUtilizationPercent));
94+
builder.endObject();
95+
return builder;
96+
}
97+
}

0 commit comments

Comments
 (0)