Skip to content

Commit d7eda2f

Browse files
committed
[ISSUE #10387] Support runtime hot-reload of maxClientEventCount in LiteEventDispatcher.ClientEventSet
1 parent 051ba27 commit d7eda2f

3 files changed

Lines changed: 43 additions & 7 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.commons.lang3.tuple.Triple;
2525
import org.apache.rocketmq.broker.BrokerController;
2626
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
27+
import org.apache.rocketmq.common.BrokerConfig;
2728
import org.apache.rocketmq.common.ServiceThread;
2829
import org.apache.rocketmq.common.constant.LoggerName;
2930
import org.apache.rocketmq.common.entity.ClientGroup;
@@ -445,15 +446,36 @@ protected class ClientEventSet {
445446
private final String group;
446447
private volatile long lastAccessTime = System.currentTimeMillis();
447448
private volatile long lastConsumeTime = System.currentTimeMillis();
449+
/**
450+
* Cache resolved max capacity to avoid per-offer SubscriptionGroupConfig lookup + attribute
451+
* parsing on the hot dispatch path. Soft-cap semantics tolerate a short staleness window,
452+
* so refresh lazily by TTL {@link BrokerConfig#getLiteEventCapacityCacheTtlMs()}.
453+
*/
454+
private volatile int maxCapacityCache;
455+
private volatile long capacityRefreshTime = System.currentTimeMillis();
448456

449457
public ClientEventSet(String group) {
450458
this.group = group;
451-
events = new LinkedBlockingQueue<>(LiteMetadataUtil.getMaxClientEventCount(group, brokerController));
459+
// Use a large bounded queue as a hard ceiling; the effective capacity is enforced
460+
// dynamically via soft-cap in offer() so that maxClientEventCount can be changed
461+
// at runtime without restart.
462+
this.events = new LinkedBlockingQueue<>(100_000);
463+
this.maxCapacityCache = LiteMetadataUtil.getMaxClientEventCount(group, brokerController);
464+
}
465+
466+
private int getMaxCapacity() {
467+
long now = System.currentTimeMillis();
468+
long ttl = brokerController.getBrokerConfig().getLiteEventCapacityCacheTtlMs();
469+
if (now - capacityRefreshTime > ttl) {
470+
maxCapacityCache = LiteMetadataUtil.getMaxClientEventCount(group, brokerController);
471+
capacityRefreshTime = now;
472+
}
473+
return maxCapacityCache;
452474
}
453475

454476
// return false if and only if the queue is full, has race condition with poll(), but no side effect.
455477
public boolean offer(String event) {
456-
if (events.remainingCapacity() == 0) {
478+
if (events.size() >= getMaxCapacity()) {
457479
return false;
458480
}
459481
boolean rst;
@@ -486,7 +508,8 @@ public boolean maybeBlock() {
486508

487509
public boolean isLowWaterMark() {
488510
int used = events.size();
489-
return (double) used / (used + events.remainingCapacity()) < LOW_WATER_MARK;
511+
int maxCapacity = getMaxCapacity();
512+
return maxCapacity <= 0 || (double) used / maxCapacity < LOW_WATER_MARK;
490513
}
491514

492515
public boolean isActiveConsuming() {
@@ -516,7 +539,7 @@ public void onUnregister(String clientId, String group, String lmqName) {
516539
}
517540

518541
/**
519-
* Mostly triggered when client channel closed, ensure that lite subscriptions is cleared before.
542+
* Mostly triggered when client channel closed, ensure that lite subscriptions is cleared before.
520543
*/
521544
@Override
522545
public void onRemoveAll(String clientId, String group) {
@@ -553,10 +576,12 @@ public String next() {
553576
static class LiteSubscriptionIterator implements Iterator<String> {
554577
private final Iterator<String> iterator;
555578
private final String parentTopic;
579+
556580
public LiteSubscriptionIterator(String parentTopic, Iterator<String> iterator) {
557581
this.parentTopic = parentTopic;
558582
this.iterator = iterator;
559583
}
584+
560585
@Override
561586
public boolean hasNext() {
562587
return iterator.hasNext();
@@ -572,6 +597,7 @@ protected static class FullDispatchRequest {
572597
private final String clientId;
573598
private final String group;
574599
private final long timestamp;
600+
575601
public FullDispatchRequest(String clientId, String group, long delayMillis) {
576602
this.clientId = clientId;
577603
this.group = group;

broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -580,9 +580,9 @@ public void testDoFullDispatchForClientNormalCase() {
580580
when(consumerOffsetManager.queryOffset(group, lmqName, 0)).thenReturn(50L);
581581

582582
LiteEventDispatcher.ClientEventSet eventSet = spy(liteEventDispatcher.new ClientEventSet(group));
583-
when(eventSet.maybeBlock()).thenReturn(false);
584-
when(eventSet.isLowWaterMark()).thenReturn(true);
585-
when(eventSet.offer(lmqName)).thenReturn(true);
583+
doReturn(false).when(eventSet).maybeBlock();
584+
doReturn(true).when(eventSet).isLowWaterMark();
585+
doReturn(true).when(eventSet).offer(lmqName);
586586

587587
liteEventDispatcher.clientEventMap.put(clientId, eventSet);
588588

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,8 @@ public class BrokerConfig extends BrokerIdentity {
558558

559559
private int maxClientEventCount = 100;
560560

561+
private long liteEventCapacityCacheTtlMs = 5000;
562+
561563
private long liteEventFullDispatchDelayTime = 10 * 1000;
562564

563565
private long liteEventFullDispatchDelayTimeForWildcardGroup = 10 * 1000;
@@ -2441,6 +2443,14 @@ public void setMaxClientEventCount(int maxClientEventCount) {
24412443
this.maxClientEventCount = maxClientEventCount;
24422444
}
24432445

2446+
public long getLiteEventCapacityCacheTtlMs() {
2447+
return liteEventCapacityCacheTtlMs;
2448+
}
2449+
2450+
public void setLiteEventCapacityCacheTtlMs(long liteEventCapacityCacheTtlMs) {
2451+
this.liteEventCapacityCacheTtlMs = liteEventCapacityCacheTtlMs;
2452+
}
2453+
24442454
public long getLiteEventFullDispatchDelayTime() {
24452455
return liteEventFullDispatchDelayTime;
24462456
}

0 commit comments

Comments
 (0)