diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java index b92af5b7c69f3..9dcba966b0d34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java @@ -32,6 +32,7 @@ import org.apache.pulsar.policies.data.loadbalancer.BrokerData; import org.apache.pulsar.policies.data.loadbalancer.BundleData; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData; import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData; /** @@ -70,9 +71,11 @@ public Multimap findBundlesForUnloading(final LoadData loadData, MutableDouble minMsgRate = new MutableDouble(Integer.MAX_VALUE); MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE); brokersData.forEach((broker, data) -> { - double msgRate = data.getLocalData().getMsgRateIn() + data.getLocalData().getMsgRateOut(); - double throughputRate = data.getLocalData().getMsgThroughputIn() - + data.getLocalData().getMsgThroughputOut(); + TimeAverageBrokerData timeAverageData = data.getTimeAverageData(); + double msgRate = timeAverageData.getShortTermMsgRateIn() + + timeAverageData.getShortTermMsgRateOut(); + double throughputRate = timeAverageData.getShortTermMsgThroughputIn() + + timeAverageData.getShortTermMsgThroughputOut(); if (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue()) { overloadedBroker.setValue(broker); maxMsgRate.setValue(msgRate);