diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java index 08569977e0f..7340e4beb55 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java @@ -35,18 +35,18 @@ public class QueueLevelConsumerOrderInfoLockManager { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); - private ConsumerOrderInfoManager consumerOrderInfoManager; private final BrokerController brokerController; private final Map timeoutMap = new ConcurrentHashMap<>(); private final Timer timer; - private static final int TIMER_TICK_MS = 100; public QueueLevelConsumerOrderInfoLockManager(BrokerController brokerController) { this.brokerController = brokerController; + long tickMs = brokerController.getBrokerConfig().getPopOrderLockTimerTickMs(); + int ticksPerWheel = brokerController.getBrokerConfig().getPopOrderLockTimerTicksPerWheel(); this.timer = new HashedWheelTimer( new ThreadFactoryImpl("ConsumerOrderInfoLockManager_"), - TIMER_TICK_MS, TimeUnit.MILLISECONDS); + tickMs, TimeUnit.MILLISECONDS, ticksPerWheel); } /** diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/LiteManagerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/LiteManagerProcessorTest.java index 5518a2fa100..24fe1b9f7b8 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/LiteManagerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/LiteManagerProcessorTest.java @@ -146,6 +146,7 @@ public void setUp() { when(brokerController.getLiteEventDispatcher()).thenReturn(liteEventDispatcher); when(brokerController.getPopLiteMessageProcessor()).thenReturn(popLiteMessageProcessor); when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); ConsumerOrderInfoManager consumerOrderInfoManager = new MemoryConsumerOrderInfoManager(brokerController); when(popLiteMessageProcessor.getConsumerOrderInfoManager()).thenReturn(consumerOrderInfoManager); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 08e27a20ee3..c97ff2fc297 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -572,6 +572,10 @@ public class BrokerConfig extends BrokerIdentity { private int liteLagLatencyTopK = 50; + // HashedWheelTimer config for pop order lock manager + private long popOrderLockTimerTickMs = 100; + private int popOrderLockTimerTicksPerWheel = 512; + public String getConfigBlackList() { return configBlackList; } @@ -2489,6 +2493,22 @@ public void setLiteLagLatencyTopK(int liteLagLatencyTopK) { this.liteLagLatencyTopK = liteLagLatencyTopK; } + public long getPopOrderLockTimerTickMs() { + return popOrderLockTimerTickMs; + } + + public void setPopOrderLockTimerTickMs(long popOrderLockTimerTickMs) { + this.popOrderLockTimerTickMs = popOrderLockTimerTickMs; + } + + public int getPopOrderLockTimerTicksPerWheel() { + return popOrderLockTimerTicksPerWheel; + } + + public void setPopOrderLockTimerTicksPerWheel(int popOrderLockTimerTicksPerWheel) { + this.popOrderLockTimerTicksPerWheel = popOrderLockTimerTicksPerWheel; + } + public boolean isUseMessageFilterForNotification() { return useMessageFilterForNotification; }