diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 026f8e892df2e..a95403cb6915b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -655,6 +655,7 @@ public void apply(Settings value, Settings current, Settings previous) { // Settings related to admission control PerformanceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, PerformanceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, + PerformanceTrackerSettings.GLOBAL_IO_WINDOW_DURATION_SETTING, // Settings related to Searchable Snapshots Node.NODE_SEARCH_CACHE_SIZE_SETTING, diff --git a/server/src/main/java/org/opensearch/common/util/DoubleMovingAverage.java b/server/src/main/java/org/opensearch/common/util/DoubleMovingAverage.java new file mode 100644 index 0000000000000..1c1494f4ffdb8 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/DoubleMovingAverage.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util; + +/** + * DoubleMovingAverage is used to calculate the moving average of last 'n' observations of double type. + * + * @opensearch.internal + */ +public class DoubleMovingAverage { + private final int windowSize; + private final double[] observations; + + private volatile long count = 0; + private volatile double sum = 0.0; + private volatile double average = 0.0; + + public DoubleMovingAverage(int windowSize) { + checkWindowSize(windowSize); + this.windowSize = windowSize; + this.observations = new double[windowSize]; + } + + /** + * Used for changing the window size of {@code DoubleMovingAverage}. 
+ * + * @param newWindowSize new window size. + * @return copy of original object with updated size. + */ + public DoubleMovingAverage copyWithSize(int newWindowSize) { + DoubleMovingAverage copy = new DoubleMovingAverage(newWindowSize); + // Start is inclusive, but end is exclusive + long start, end = count; + if (isReady() == false) { + start = 0; + } else { + start = end - windowSize; + } + // If the new window size is smaller than the elements eligible to be copied over, then we adjust the start value + if (end - start > newWindowSize) { + start = end - newWindowSize; + } + for (long i = start; i < end; i++) { + copy.record(observations[(int) (i % observations.length)]); + } + return copy; + } + + private void checkWindowSize(int size) { + if (size <= 0) { + throw new IllegalArgumentException("window size must be greater than zero"); + } + } + + /** + * Records a new observation and evicts the n-th last observation. + */ + public synchronized double record(double value) { + double delta = value - observations[(int) (count % observations.length)]; + observations[(int) (count % observations.length)] = value; + + count++; + sum += delta; + average = sum / (double) Math.min(count, observations.length); + return average; + } + + public double getAverage() { + return average; + } + + public long getCount() { + return count; + } + + public boolean isReady() { + return count >= windowSize; + } +} diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 4b0a79783885f..cebe3a0221c32 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -228,6 +228,7 @@ private void monitorFSHealth() { } Files.delete(tempDataPath); final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime; + logger.debug("health check took {}", elapsedTime); if (elapsedTime > 
slowPathLoggingThreshold.millis()) { logger.warn( "health check of [{}] took [{}ms] which is above the warn threshold of [{}]", diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 114702ff0d351..e27c1d09bf892 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -32,6 +32,8 @@ package org.opensearch.monitor.fs; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; @@ -41,6 +43,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.throttling.tracker.AverageCpuUsageTracker; import java.io.IOException; import java.util.Arrays; @@ -220,9 +223,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws /** * The device status. 
+ * * @opensearch.internal */ public static class DeviceStats implements Writeable, ToXContentFragment { + private static final Logger logger = LogManager.getLogger(DeviceStats.class); final int majorDeviceNumber; final int minorDeviceNumber; @@ -235,6 +239,12 @@ public static class DeviceStats implements Writeable, ToXContentFragment { final long previousWritesCompleted; final long currentSectorsWritten; final long previousSectorsWritten; + final long currentIOTime; + final long previousIOTime; + final double currentReadTime; + final double previousReadTime; + final double currentWriteTime; + final double previousWriteTime; public DeviceStats( final int majorDeviceNumber, @@ -244,6 +254,9 @@ public DeviceStats( final long currentSectorsRead, final long currentWritesCompleted, final long currentSectorsWritten, + final long currentIOTime, + final double currentReadTime, + final double currentWriteTime, final DeviceStats previousDeviceStats ) { this( @@ -257,7 +270,13 @@ public DeviceStats( currentSectorsRead, previousDeviceStats != null ? previousDeviceStats.currentSectorsRead : -1, currentWritesCompleted, - previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1 + previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1, + currentIOTime, + previousDeviceStats != null ? previousDeviceStats.currentIOTime : -1, + currentReadTime, + previousDeviceStats != null ? previousDeviceStats.currentReadTime : -1.0, + currentWriteTime, + previousDeviceStats != null ? 
previousDeviceStats.currentWriteTime : -1.0 ); } @@ -272,7 +291,13 @@ private DeviceStats( final long currentSectorsRead, final long previousSectorsRead, final long currentWritesCompleted, - final long previousWritesCompleted + final long previousWritesCompleted, + final long currentIOTime, + final long previousIOTime, + final double currentReadTime, + final double previousReadTime, + final double currentWriteTime, + final double previousWriteTime ) { this.majorDeviceNumber = majorDeviceNumber; this.minorDeviceNumber = minorDeviceNumber; @@ -285,6 +310,12 @@ private DeviceStats( this.previousSectorsRead = previousSectorsRead; this.currentSectorsWritten = currentSectorsWritten; this.previousSectorsWritten = previousSectorsWritten; + this.currentIOTime = currentIOTime; + this.previousIOTime = previousIOTime; + this.currentReadTime = currentReadTime; + this.previousReadTime = previousReadTime; + this.currentWriteTime = currentWriteTime; + this.previousWriteTime = previousWriteTime; } public DeviceStats(StreamInput in) throws IOException { @@ -299,6 +330,12 @@ public DeviceStats(StreamInput in) throws IOException { previousSectorsRead = in.readLong(); currentSectorsWritten = in.readLong(); previousSectorsWritten = in.readLong(); + currentIOTime = in.readLong(); + previousIOTime = in.readLong(); + currentReadTime = in.readDouble(); + previousReadTime = in.readDouble(); + currentWriteTime = in.readDouble(); + previousWriteTime = in.readDouble(); } @Override @@ -314,6 +351,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(previousSectorsRead); out.writeLong(currentSectorsWritten); out.writeLong(previousSectorsWritten); + out.writeLong(currentIOTime); + out.writeLong(previousIOTime); + out.writeDouble(currentReadTime); + out.writeDouble(previousReadTime); + out.writeDouble(currentWriteTime); + out.writeDouble(previousWriteTime); } public long operations() { @@ -325,27 +367,90 @@ public long operations() { public long readOperations() { if 
(previousReadsCompleted == -1) return -1; 
+ //logger.info("Current reads : {} , Previous reads : {}", currentReadsCompleted, previousReadsCompleted); + return (currentReadsCompleted - previousReadsCompleted); } public long writeOperations() { if (previousWritesCompleted == -1) return -1; + //logger.info("Current writes : {} , Previous writes : {}", currentWritesCompleted, previousWritesCompleted); return (currentWritesCompleted - previousWritesCompleted); } + public long currentReadOperations() { + return currentReadsCompleted; + } + + public long currentWriteOpetations() { + return currentWritesCompleted; + } + public long readKilobytes() { if (previousSectorsRead == -1) return -1; return (currentSectorsRead - previousSectorsRead) / 2; } + public long getCurrentReadKilobytes() { + return currentSectorsRead / 2; + } + + public long getCurrentWriteKilobytes() { + return currentSectorsWritten / 2; + } + public long writeKilobytes() { if (previousSectorsWritten == -1) return -1; return (currentSectorsWritten - previousSectorsWritten) / 2; } + public long ioTimeInMillis() { + if (previousIOTime == -1) return -1; + + return (currentIOTime - previousIOTime); + } + + public double getNewWriteLatency() { + final long writes = writeOperations(); + if (writes < 0 || previousWriteTime == -1.0) return -1.0; /* no usable previous sample: keep the -1 sentinel instead of -1.0 / -1 == 1.0 */ + return writes == 0 ? 0.0 : getWriteTime() / writes; /* idle interval: avoid dividing by zero */ + } + + public double getNewReadLatency() { + final long reads = readOperations(); + if (reads < 0 || previousReadTime == -1.0) return -1.0; /* no usable previous sample: keep the -1 sentinel instead of -1.0 / -1 == 1.0 */ + return reads == 0 ? 
0.0 : getReadTime() / reads; /* idle interval: avoid dividing by zero */ + } + + public double getReadTime() { + if(previousReadTime == -1.0) return -1.0; + return currentReadTime - previousReadTime; + } + + public double getWriteTime() { + if(previousWriteTime == -1.0) return -1.0; + return currentWriteTime - previousWriteTime; + } + + public long getCurrentIOTime() { + return this.currentIOTime; + } + + public double getCurrentReadTime() { + return this.currentReadTime; + } + + public double getCurrentWriteTime() { + return this.currentWriteTime; + } + + public 
String getDeviceName() { + return this.deviceName; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("device_name", deviceName); @@ -354,6 +459,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(IoStats.WRITE_OPERATIONS, writeOperations()); builder.field(IoStats.READ_KILOBYTES, readKilobytes()); builder.field(IoStats.WRITE_KILOBYTES, writeKilobytes()); + builder.field(IoStats.IO_TIME_MS, ioTimeInMillis()); return builder; } @@ -371,6 +477,7 @@ public static class IoStats implements Writeable, ToXContentFragment { private static final String WRITE_OPERATIONS = "write_operations"; private static final String READ_KILOBYTES = "read_kilobytes"; private static final String WRITE_KILOBYTES = "write_kilobytes"; + private static final String IO_TIME_MS = "io_time_in_millis"; final DeviceStats[] devicesStats; final long totalOperations; @@ -378,6 +485,7 @@ public static class IoStats implements Writeable, ToXContentFragment { final long totalWriteOperations; final long totalReadKilobytes; final long totalWriteKilobytes; + final long totalIOTimeInMillis; public IoStats(final DeviceStats[] devicesStats) { this.devicesStats = devicesStats; @@ -386,18 +494,21 @@ public IoStats(final DeviceStats[] devicesStats) { long totalWriteOperations = 0; long totalReadKilobytes = 0; long totalWriteKilobytes = 0; + long totalIOTimeInMillis = 0; for (DeviceStats deviceStats : devicesStats) { totalOperations += deviceStats.operations() != -1 ? deviceStats.operations() : 0; totalReadOperations += deviceStats.readOperations() != -1 ? deviceStats.readOperations() : 0; totalWriteOperations += deviceStats.writeOperations() != -1 ? deviceStats.writeOperations() : 0; totalReadKilobytes += deviceStats.readKilobytes() != -1 ? deviceStats.readKilobytes() : 0; totalWriteKilobytes += deviceStats.writeKilobytes() != -1 ? 
deviceStats.writeKilobytes() : 0; + totalIOTimeInMillis += deviceStats.ioTimeInMillis() != -1 ? deviceStats.ioTimeInMillis() : 0; } this.totalOperations = totalOperations; this.totalReadOperations = totalReadOperations; this.totalWriteOperations = totalWriteOperations; this.totalReadKilobytes = totalReadKilobytes; this.totalWriteKilobytes = totalWriteKilobytes; + this.totalIOTimeInMillis = totalIOTimeInMillis; } public IoStats(StreamInput in) throws IOException { @@ -412,6 +523,7 @@ public IoStats(StreamInput in) throws IOException { this.totalWriteOperations = in.readLong(); this.totalReadKilobytes = in.readLong(); this.totalWriteKilobytes = in.readLong(); + this.totalIOTimeInMillis = in.readLong(); } @Override @@ -425,6 +537,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalWriteOperations); out.writeLong(totalReadKilobytes); out.writeLong(totalWriteKilobytes); + out.writeLong(totalIOTimeInMillis); } public DeviceStats[] getDevicesStats() { @@ -451,6 +564,10 @@ public long getTotalWriteKilobytes() { return totalWriteKilobytes; } + public long getTotalIOTimeMillis() { + return totalIOTimeInMillis; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (devicesStats.length > 0) { @@ -468,6 +585,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(WRITE_OPERATIONS, totalWriteOperations); builder.field(READ_KILOBYTES, totalReadKilobytes); builder.field(WRITE_KILOBYTES, totalWriteKilobytes); + builder.field(IO_TIME_MS, totalIOTimeInMillis); builder.endObject(); } return builder; diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index e20d84cd9763e..ba5c222274bd2 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -123,6 +123,11 @@ final FsInfo.IoStats 
ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers, final long sectorsRead = Long.parseLong(fields[5]); final long writesCompleted = Long.parseLong(fields[7]); final long sectorsWritten = Long.parseLong(fields[9]); + // Read/write time totals; per-interval latency is derived from deltas in FsInfo.DeviceStats, + // so no lifetime ratio (which would also divide by zero on idle devices) is computed here. + final double readTime = Double.parseDouble(fields[6]); + final double writeTime = Double.parseDouble(fields[10]); + final long ioTime = Long.parseLong(fields[12]); final FsInfo.DeviceStats deviceStats = new FsInfo.DeviceStats( majorDeviceNumber, minorDeviceNumber, @@ -131,6 +136,9 @@ final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers, sectorsRead, writesCompleted, sectorsWritten, + ioTime, + readTime, + writeTime, deviceMap.get(Tuple.tuple(majorDeviceNumber, minorDeviceNumber)) ); devicesStats.add(deviceStats); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 79009267a93ee..b38ecec444393 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -795,7 +795,8 @@ protected Node( performanceCollectorService, threadPool, settings, - clusterService.getClusterSettings() + clusterService.getClusterSettings(), + monitorService.fsService() ); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java b/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java index 45a2ca242de68..2119b032bc38b 100644 --- a/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java +++ b/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java @@ -11,6 +11,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import 
org.opensearch.core.common.io.stream.Writeable; +import 
org.opensearch.throttling.tracker.AverageDiskStats; import java.io.IOException; import java.util.Locale; @@ -25,10 +26,14 @@ public class NodePerformanceStatistics implements Writeable { double cpuUtilizationPercent; double memoryUtilizationPercent; - public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) { + AverageDiskStats averageDiskStats; + + public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, + AverageDiskStats averageDiskStats, long timestamp) { this.nodeId = nodeId; this.cpuUtilizationPercent = cpuUtilizationPercent; this.memoryUtilizationPercent = memoryUtilizationPercent; + this.averageDiskStats = averageDiskStats; this.timestamp = timestamp; } @@ -36,6 +41,7 @@ public NodePerformanceStatistics(StreamInput in) throws IOException { this.nodeId = in.readString(); this.cpuUtilizationPercent = in.readDouble(); this.memoryUtilizationPercent = in.readDouble(); + this.averageDiskStats = new AverageDiskStats(in); this.timestamp = in.readLong(); } @@ -44,6 +50,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(this.nodeId); out.writeDouble(this.cpuUtilizationPercent); out.writeDouble(this.memoryUtilizationPercent); + this.averageDiskStats.writeTo(out); out.writeLong(this.timestamp); } @@ -63,6 +70,7 @@ public String toString() { nodePerformanceStatistics.nodeId, nodePerformanceStatistics.cpuUtilizationPercent, nodePerformanceStatistics.memoryUtilizationPercent, + nodePerformanceStatistics.averageDiskStats, nodePerformanceStatistics.timestamp ); } diff --git a/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java b/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java index 7b1f45514f919..84fdc1acbbcb0 100644 --- a/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java +++ b/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java @@ -60,6 +60,7 @@ public 
XContentBuilder toXContent(XContentBuilder builder, Params params) throws "elapsed_time", new TimeValue(System.currentTimeMillis() - perfStats.timestamp, TimeUnit.MILLISECONDS).toString() ); + perfStats.averageDiskStats.toXContent(builder, params); } builder.endObject(); } diff --git a/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java index ebd448a2ad1a5..bf8fd3fd974d0 100644 --- a/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java +++ b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.throttling.tracker.AverageDiskStats; import java.util.HashMap; import java.util.Map; @@ -43,13 +44,17 @@ void removeNode(String nodeId) { nodeIdToPerfStats.remove(nodeId); } - public void addNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) { + public void addNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, + AverageDiskStats averageDiskStats, + long timestamp) { nodeIdToPerfStats.compute(nodeId, (id, nodePerfStats) -> { if (nodePerfStats == null) { - return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, timestamp); + return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, + averageDiskStats, timestamp); } else { nodePerfStats.cpuUtilizationPercent = cpuUtilizationPercent; nodePerfStats.memoryUtilizationPercent = memoryUtilizationPercent; + nodePerfStats.averageDiskStats = averageDiskStats; nodePerfStats.timestamp = timestamp; return nodePerfStats; } diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AverageDiskStats.java 
b/server/src/main/java/org/opensearch/throttling/tracker/AverageDiskStats.java new file mode 100644 index 0000000000000..544dd862a859c --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/AverageDiskStats.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +/** + * Immutable holder of averaged disk I/O metrics (IOPS, KB transferred, latency, utilization percent) that is + * written to and read from the transport stream and rendered as an {@code io_stats} object. + * + * @opensearch.internal + */ +public class AverageDiskStats implements Writeable { + private final double readIopsAverage; + private final double writeIopsAverage; + private final double readKbAverage; + private final double writeKbAverage; + private final double readLatencyAverage; + private final double writeLatencyAverage; + private final double ioUtilizationPercent; + + public AverageDiskStats(double readIopsAverage, double writeIopsAverage, double readKbAverage, double writeKbAverage, + double readLatencyAverage, double writeLatencyAverage, double ioUtilizationPercent) { + this.readIopsAverage = readIopsAverage; + this.writeIopsAverage = writeIopsAverage; + this.readKbAverage = readKbAverage; + this.writeKbAverage = writeKbAverage; + this.readLatencyAverage = readLatencyAverage; + this.writeLatencyAverage = writeLatencyAverage; + 
this.ioUtilizationPercent = ioUtilizationPercent; + } + + public AverageDiskStats(StreamInput in) throws IOException { + this.readIopsAverage = in.readDouble(); + this.readKbAverage = in.readDouble(); + this.readLatencyAverage = in.readDouble(); + 
this.writeIopsAverage = in.readDouble(); + this.writeKbAverage = in.readDouble(); + this.writeLatencyAverage = in.readDouble(); + this.ioUtilizationPercent = in.readDouble(); + } + + public double getIoUtilizationPercent() { + return ioUtilizationPercent; + } + + public double getReadIopsAverage() { + return readIopsAverage; + } + + public double getReadKbAverage() { + return readKbAverage; + } + + public double getReadLatencyAverage() { + return readLatencyAverage; + } + + public double getWriteIopsAverage() { + return writeIopsAverage; + } + + public double getWriteKbAverage() { + return writeKbAverage; + } + + public double getWriteLatencyAverage() { + return writeLatencyAverage; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // Field order must mirror AverageDiskStats(StreamInput), which reads exactly these seven doubles. + out.writeDouble(readIopsAverage); + out.writeDouble(readKbAverage); + out.writeDouble(readLatencyAverage); + out.writeDouble(writeIopsAverage); + out.writeDouble(writeKbAverage); + out.writeDouble(writeLatencyAverage); + out.writeDouble(ioUtilizationPercent); + } + + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject("io_stats"); + builder.field("read_iops_average", String.format(Locale.ROOT, "%.1f", readIopsAverage )); + builder.field("write_iops_average", String.format(Locale.ROOT, "%.1f", writeIopsAverage)); + builder.field("read_throughput_average", String.format(Locale.ROOT, "%.1f", readKbAverage)); + builder.field("write_throughput_average", String.format(Locale.ROOT, "%.1f", writeKbAverage)); + builder.field("read_latency_average", String.format(Locale.ROOT, "%.8f", readLatencyAverage)); + builder.field("write_latency_average", String.format(Locale.ROOT, "%.8f", writeLatencyAverage)); + builder.field("io_utilization_percent", String.format(Locale.ROOT, "%.3f", ioUtilizationPercent)); + builder.endObject(); + return 
builder; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AverageIOUsageTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/AverageIOUsageTracker.java new file mode 100644 index 0000000000000..e7c0f19be60c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/AverageIOUsageTracker.java @@ -0,0 +1,166 @@ 
+/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.DoubleMovingAverage; +import org.opensearch.common.util.MovingAverage; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; + +import java.util.concurrent.atomic.AtomicReference; + +public class AverageIOUsageTracker extends AbstractLifecycleComponent { + + private static final Logger logger = LogManager.getLogger(AverageIOUsageTracker.class); + + private FsService fsService; + + private final ThreadPool threadPool; + + private IoUsageFetcher ioUsageFetcher; + private final TimeValue windowDuration; + private final TimeValue pollingInterval; + private volatile Scheduler.Cancellable scheduledFuture; + private final AtomicReference<MovingAverage> ioTimeObservations = new AtomicReference<>(); + private final AtomicReference<MovingAverage> readIopsObservations = new AtomicReference<>(); + private final AtomicReference<MovingAverage> writeIopsObservations = new AtomicReference<>(); + private final AtomicReference<MovingAverage> readKbObservations = new AtomicReference<>(); + private final AtomicReference<MovingAverage> writeKbObservations = new AtomicReference<>(); + private final AtomicReference<DoubleMovingAverage> readLatencyObservations = new AtomicReference<>(); + private final AtomicReference<DoubleMovingAverage> writeLatencyObservations = new AtomicReference<>(); + + private long runs = 1; + + public 
AverageIOUsageTracker( + ThreadPool threadPool, + TimeValue pollingInterval, + TimeValue windowDuration, + ClusterSettings 
clusterSettings, + FsService fsService + ) { + //super(threadPool, pollingInterval, windowDuration); + setFsService(fsService); + this.threadPool = threadPool; + this.pollingInterval = pollingInterval; + this.windowDuration = windowDuration; + this.setWindowDuration(windowDuration); + this.ioUsageFetcher = new IoUsageFetcher(fsService); + clusterSettings.addSettingsUpdateConsumer( + PerformanceTrackerSettings.GLOBAL_IO_WINDOW_DURATION_SETTING, + this::setWindowDuration + ); + } + + + + public FsService getFsService() { + return fsService; + } + + public void setFsService(FsService fsService) { + this.fsService = fsService; + } + + public double getIoPercentAverage() { + return ioTimeObservations.get().getAverage(); + } + + public double getReadIopsAverage() { + return readIopsObservations.get().getAverage(); + } + + public double getWriteIopsAverage() { + return writeIopsObservations.get().getAverage(); + } + + public double getReadKbAverage() { + return readKbObservations.get().getAverage(); + } + + public double getWriteKbAverage() { + return writeKbObservations.get().getAverage(); + } + + public double getReadLatencyAverage() { + return readLatencyObservations.get().getAverage(); + } + + public double getWriteLatencyAverage() { + return writeLatencyObservations.get().getAverage(); + } + + public AverageDiskStats getAverageDiskStats() { + return new AverageDiskStats(getReadIopsAverage(), getWriteIopsAverage(), getReadKbAverage(), getWriteKbAverage(), + getReadLatencyAverage(), getWriteLatencyAverage(), getIoPercentAverage()); + } + + public void setWindowDuration(TimeValue windowDuration) { + int windowSize = (int) (windowDuration.nanos() / pollingInterval.nanos()); + logger.debug("updated window size: {}", windowSize); + ioTimeObservations.set(new MovingAverage(windowSize)); + readIopsObservations.set(new MovingAverage(windowSize)); + writeIopsObservations.set(new MovingAverage(windowSize)); + readKbObservations.set(new MovingAverage(windowSize)); + 
writeKbObservations.set(new MovingAverage(windowSize)); + readLatencyObservations.set(new DoubleMovingAverage(windowSize)); + writeLatencyObservations.set(new DoubleMovingAverage(windowSize)); + } + + private void recordUsage(IoUsageFetcher.DiskStats usage) { + if(usage.getIoTime() == 0.0) { + runs++; + return; + } else { + runs = 1; + } + ioTimeObservations.get().record(usage.getIoTime()); + readIopsObservations.get().record((long)usage.getReadOps()); + readKbObservations.get().record(usage.getReadkb()); + double readOps = usage.getReadOps() < 1 ? 1.0 : usage.getReadOps() * 1.0; + double writeOps = usage.getWriteOps() < 1 ? 1.0 : usage.getWriteOps() * 1.0; + double readTime = usage.getReadTime() < 1 ? 0.0 : usage.getReadTime(); + double writeTime = usage.getWriteTime() < 1 ? 0.0 : usage.getWriteTime(); + double readLatency = (readTime / readOps);// * runs; + double writeLatency = (writeTime/ writeOps);// * runs; + writeLatencyObservations.get().record(writeLatency); + readLatencyObservations.get().record(readLatency); + writeKbObservations.get().record(usage.getWritekb()); + writeIopsObservations.get().record((long) usage.getWriteOps()); + } + + @Override + protected void doStart() { + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + IoUsageFetcher.DiskStats usage = getUsage(); + if(usage == null) return; + recordUsage(usage); + }, pollingInterval, ThreadPool.Names.GENERIC); + } + + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + } + + @Override + protected void doClose() throws IOException {} + + public IoUsageFetcher.DiskStats getUsage() { + return ioUsageFetcher.getDiskUtilizationStats(); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/IoUsageFetcher.java b/server/src/main/java/org/opensearch/throttling/tracker/IoUsageFetcher.java new file mode 100644 index 0000000000000..3b7a9713bbe44 --- /dev/null +++ 
b/server/src/main/java/org/opensearch/throttling/tracker/IoUsageFetcher.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsService; + +import java.util.HashMap; +import java.util.Map; + +public class IoUsageFetcher { + private static final Logger logger = LogManager.getLogger(IoUsageFetcher.class); + private Map<String, DiskStats> previousIOTimeMap; + private FsService fsService; + public IoUsageFetcher(FsService fsService){ + this.fsService = fsService; + } + + static class DiskStats { + public long ioTime; + public double readTime; + public double writeTime; + public double readOps; + public double writeOps; + public long readkb; + public long writekb; + public DiskStats(long ioTime, double readTime, double writeTime, double readOps, double writeOps, long readkb, long writekb) { + this.ioTime = ioTime; + this.readTime = readTime; + this.writeTime = writeTime; + this.readOps = readOps; + this.writeOps = writeOps; + this.readkb = readkb; + this.writekb = writekb; + } + + public long getIoTime() { + return ioTime; + } + + public double getReadOps() { + return readOps; + } + + public double getReadTime() { + return readTime; + } + + public long getReadkb() { + return readkb; + } + + public double getWriteOps() { + return writeOps; + } + + public double getWriteTime() { + return writeTime; + } + + public long getWritekb() { + return writekb; + } + } + public DiskStats getDiskUtilizationStats() { + Map<String, DiskStats> currentIOTimeMap = new HashMap<>(); + long ioUsePercent = 0; + long readkb = 0; + long writekb = 0; + double readTime = 0; + double writeTime = 0; + double 
readLatency = 0.0; + double writeLatency = 
0.0; + double readOps = 0.0; + double writeOps = 0.0; + if(this.fsService.stats().getIoStats() == null) { + return null; + } + for (FsInfo.DeviceStats devicesStat : this.fsService.stats().getIoStats().getDevicesStats()) { + if (previousIOTimeMap != null && previousIOTimeMap.containsKey(devicesStat.getDeviceName())){ + //logger.info(this.fsService.stats().getTimestamp()); + long ioSpentTime = devicesStat.getCurrentIOTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).ioTime; + ioUsePercent = (ioSpentTime * 100) / (1000); + readOps += devicesStat.currentReadOperations() - previousIOTimeMap.get(devicesStat.getDeviceName()).readOps; + writeOps += devicesStat.currentWriteOpetations() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeOps; + readkb += devicesStat.getCurrentReadKilobytes() - previousIOTimeMap.get(devicesStat.getDeviceName()).readkb; + writekb += devicesStat.getCurrentWriteKilobytes() - previousIOTimeMap.get(devicesStat.getDeviceName()).writekb; + readTime += devicesStat.getCurrentReadTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).readTime; + writeTime += devicesStat.getCurrentWriteTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeTime; + if(readTime < 1) readTime = 1; + if(readOps < 1) readOps = 1; + if(writeOps < 1) writeOps = 1; + if(writeTime < 1) writeTime = 1; + readLatency += (readTime / readOps); + writeLatency += (writeTime / writeOps); + } + DiskStats ps = new DiskStats(devicesStat.getCurrentIOTime(), devicesStat.getCurrentReadTime(), + devicesStat.getCurrentWriteTime(), devicesStat.currentReadOperations(), devicesStat.currentWriteOpetations(), + devicesStat.getCurrentReadKilobytes(), devicesStat.getCurrentWriteKilobytes()); + currentIOTimeMap.put(devicesStat.getDeviceName(), ps); + } + // logger.info("Read in MB : {} , Write in MB : {}", readkb/1000, writekb/1000); +// readLatency += (readOps / readTime) * 100; +// writeLatency += (writeOps / writeTime) * 100; +// logger.info("read ops : {} , writeops : 
{} , readtime: {} , writetime: {}", readOps, writeOps, readTime, writeTime); +// logger.info("Read latency : {} write latency : {}" , readLatency, writeLatency); + logger.info("IO use percent : {}", ioUsePercent); + previousIOTimeMap = currentIOTimeMap; + + return new DiskStats(ioUsePercent, readTime, writeTime, readOps, writeOps, readkb, writekb); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java index bd71e3a6f2573..641f18ca6774a 100644 --- a/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java +++ b/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java @@ -14,6 +14,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.monitor.fs.FsService; import org.opensearch.node.PerformanceCollectorService; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -26,29 +27,37 @@ public class NodePerformanceTracker extends AbstractLifecycleComponent { private double cpuUtilizationPercent; private double memoryUtilizationPercent; + + private AverageDiskStats averageDiskStats; private ThreadPool threadPool; private volatile Scheduler.Cancellable scheduledFuture; private final ClusterSettings clusterSettings; private AverageCpuUsageTracker cpuUsageTracker; private AverageMemoryUsageTracker memoryUsageTracker; + + private AverageIOUsageTracker ioUsageTracker; private PerformanceCollectorService performanceCollectorService; private PerformanceTrackerSettings performanceTrackerSettings; private static final Logger logger = LogManager.getLogger(NodePerformanceTracker.class); private final TimeValue interval; + private final FsService fsService; + public static final String LOCAL_NODE = "LOCAL"; public NodePerformanceTracker( 
PerformanceCollectorService performanceCollectorService,
         ThreadPool threadPool,
         Settings settings,
-        ClusterSettings clusterSettings
+        ClusterSettings clusterSettings,
+        FsService fsService
     ) {
         this.performanceCollectorService = performanceCollectorService;
         this.threadPool = threadPool;
         this.clusterSettings = clusterSettings;
         this.performanceTrackerSettings = new PerformanceTrackerSettings(settings, clusterSettings);
+        this.fsService = fsService;
         interval = new TimeValue(performanceTrackerSettings.getRefreshInterval());
         initialize();
     }
@@ -61,6 +70,21 @@ private double getAverageMemoryUsed() {
         return memoryUsageTracker.getAverage();
     }
 
+    /** Reads the current averaged disk stats from the IO usage tracker. */
+    private AverageDiskStats getAverageIOUsed() {
+        return ioUsageTracker.getAverageDiskStats();
+    }
+
+    /** Stores the disk stats snapshot that {@link #doRun()} goes on to publish. */
+    private void setAverageDiskStats(AverageDiskStats averageDiskStats) {
+        this.averageDiskStats = averageDiskStats;
+    }
+
+    /** Returns the disk stats snapshot last stored by {@link #setAverageDiskStats(AverageDiskStats)}. */
+    private AverageDiskStats getAverageDiskStats() {
+        return averageDiskStats;
+    }
+
     private void setCpuUtilizationPercent(double cpuUtilizationPercent) {
         this.cpuUtilizationPercent = cpuUtilizationPercent;
     }
@@ -80,10 +104,12 @@ public double getMemoryUtilizationPercent() {
     void doRun() {
         setCpuUtilizationPercent(getAverageCpuUsed());
         setMemoryUtilizationPercent(getAverageMemoryUsed());
+        setAverageDiskStats(getAverageIOUsed());
         performanceCollectorService.addNodePerfStatistics(
             LOCAL_NODE,
             getCpuUtilizationPercent(),
             getMemoryUtilizationPercent(),
+            getAverageDiskStats(),
             System.currentTimeMillis()
         );
     }
@@ -102,6 +128,14 @@ void initialize() {
             performanceTrackerSettings.getMemoryWindowDuration(),
             clusterSettings
         );
+
+        ioUsageTracker = new AverageIOUsageTracker(
+            threadPool,
+            performanceTrackerSettings.getIoPollingInterval(),
+            performanceTrackerSettings.getIoWindowDuration(),
+            clusterSettings,
+            fsService
+        );
     }
 
     @Override
@@ -115,6 +149,7 @@ protected void doStart() {
         }, interval, ThreadPool.Names.GENERIC);
         cpuUsageTracker.doStart();
         memoryUsageTracker.doStart();
+        ioUsageTracker.doStart();
     }
 
     @Override
@@ -124,11 +159,13 @@ protected void doStop() {
         }
         cpuUsageTracker.doStop();
         memoryUsageTracker.doStop();
+        ioUsageTracker.doStop();
     }
 
     @Override
     protected void doClose() throws IOException {
         cpuUsageTracker.doClose();
         memoryUsageTracker.doClose();
+        ioUsageTracker.doClose();
     }
 }
diff --git a/server/src/main/java/org/opensearch/throttling/tracker/PerformanceTrackerSettings.java b/server/src/main/java/org/opensearch/throttling/tracker/PerformanceTrackerSettings.java
index 8e3aadf2e3604..cce796fd86b96 100644
--- a/server/src/main/java/org/opensearch/throttling/tracker/PerformanceTrackerSettings.java
+++ b/server/src/main/java/org/opensearch/throttling/tracker/PerformanceTrackerSettings.java
@@ -22,6 +22,9 @@ private static class Defaults {
         private static final long POLLING_INTERVAL = 500;
         private static final long WINDOW_DURATION = 30;
         private static final long REFRESH_INTERVAL = 1000;
+
+        private static final long IO_POLLING_INTERVAL = 1000;
+        private static final long IO_WINDOW_DURATION = 60;
     }
 
     public static final Setting REFRESH_INTERVAL_MILLIS = Setting.longSetting(
@@ -55,21 +58,42 @@ private static class Defaults {
         Setting.Property.NodeScope
     );
 
+    /** Node-scoped interval at which IO usage is polled; defaults to {@code Defaults.IO_POLLING_INTERVAL} milliseconds. */
+    public static final Setting<TimeValue> GLOBAL_IO_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting(
+        "node.perf_tracker.global_io_usage.polling_interval",
+        TimeValue.timeValueMillis(Defaults.IO_POLLING_INTERVAL),
+        Setting.Property.NodeScope
+    );
+
+    /** Dynamic, node-scoped IO window duration; defaults to {@code Defaults.IO_WINDOW_DURATION} seconds. */
+    public static final Setting<TimeValue> GLOBAL_IO_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting(
+        "node.perf_tracker.global_io_usage.window_duration",
+        TimeValue.timeValueSeconds(Defaults.IO_WINDOW_DURATION),
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
     private volatile long refreshInterval;
     private volatile TimeValue cpuWindowDuration;
     private volatile TimeValue cpuPollingInterval;
     private volatile TimeValue memoryWindowDuration;
     private volatile TimeValue memoryPollingInterval;
 
+    private volatile TimeValue ioWindowDuration;
+    private volatile TimeValue ioPollingInterval;
+
     public PerformanceTrackerSettings(Settings settings, ClusterSettings clusterSettings) {
         this.cpuPollingInterval = GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings);
         this.cpuWindowDuration = GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.get(settings);
         this.memoryPollingInterval = GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings);
         this.memoryWindowDuration = GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.get(settings);
+        this.ioWindowDuration = GLOBAL_IO_WINDOW_DURATION_SETTING.get(settings);
+        this.ioPollingInterval = GLOBAL_IO_AC_POLLING_INTERVAL_SETTING.get(settings);
         this.refreshInterval = REFRESH_INTERVAL_MILLIS.get(settings);
 
         clusterSettings.addSettingsUpdateConsumer(GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, this::setCpuWindowDuration);
         clusterSettings.addSettingsUpdateConsumer(GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, this::setMemoryWindowDuration);
+        clusterSettings.addSettingsUpdateConsumer(GLOBAL_IO_WINDOW_DURATION_SETTING, this::setIOWindowDuration);
     }
 
     public TimeValue getCpuWindowDuration() {
@@ -99,4 +123,24 @@ public void setCpuWindowDuration(TimeValue cpuWindowDuration) {
     public void setMemoryWindowDuration(TimeValue memoryWindowDuration) {
         this.memoryWindowDuration = memoryWindowDuration;
     }
+
+    /** Update consumer registered for {@link #GLOBAL_IO_WINDOW_DURATION_SETTING}. */
+    public void setIOWindowDuration(TimeValue ioWindowDuration) {
+        this.ioWindowDuration = ioWindowDuration;
+    }
+
+    /** Returns the IO window duration currently held by this object. */
+    public TimeValue getIoWindowDuration() {
+        return ioWindowDuration;
+    }
+
+    /** Replaces the IO polling interval held by this object. */
+    public void setIoPollingInterval(TimeValue ioPollingInterval) {
+        this.ioPollingInterval = ioPollingInterval;
+    }
+
+    /** Returns the IO polling interval currently held by this object. */
+    public TimeValue getIoPollingInterval() {
+        return ioPollingInterval;
+    }
 }
diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
index 06a83e6ffd984..41fd46bfb5c8b 100644
--- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
+++
b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.cluster.node.stats; +import org.mockito.Mockito; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.cluster.coordination.PendingClusterStateStats; @@ -66,6 +67,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPoolStats; +import org.opensearch.throttling.tracker.AverageDiskStats; import org.opensearch.transport.TransportStats; import java.io.IOException; @@ -786,6 +788,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { nodeId, randomDoubleBetween(1.0, 100.0, true), randomDoubleBetween(1.0, 100.0, true), + Mockito.mock(AverageDiskStats.class), System.currentTimeMillis() ); nodePerfStats.put(nodeId, stats); diff --git a/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java b/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java index 9fdd491c12c64..41e03c482d4e4 100644 --- a/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java +++ b/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java @@ -11,6 +11,8 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.monitor.fs.FsService; import org.opensearch.node.NodePerformanceStatistics; import org.opensearch.node.PerformanceCollectorService; import org.opensearch.test.OpenSearchTestCase; @@ -52,7 +54,8 @@ public void testStats() throws InterruptedException { performanceCollectorService, threadPool, Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, 
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + mock(FsService.class) ); tracker.start(); Thread.sleep(2000);