Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.entity.ClientGroup;
Expand Down Expand Up @@ -445,15 +446,36 @@ protected class ClientEventSet {
private final String group;
private volatile long lastAccessTime = System.currentTimeMillis();
private volatile long lastConsumeTime = System.currentTimeMillis();
/**
* Cache resolved max capacity to avoid per-offer SubscriptionGroupConfig lookup + attribute
* parsing on the hot dispatch path. Soft-cap semantics tolerate a short staleness window,
* so refresh lazily by TTL {@link BrokerConfig#getLiteEventCapacityCacheTtlMs()}.
*/
private volatile int maxCapacityCache;
private volatile long capacityRefreshTime = System.currentTimeMillis();

public ClientEventSet(String group) {
this.group = group;
events = new LinkedBlockingQueue<>(LiteMetadataUtil.getMaxClientEventCount(group, brokerController));
// Use a large bounded queue as a hard ceiling; the effective capacity is enforced
// dynamically via soft-cap in offer() so that maxClientEventCount can be changed
// at runtime without restart.
this.events = new LinkedBlockingQueue<>(100_000);
this.maxCapacityCache = LiteMetadataUtil.getMaxClientEventCount(group, brokerController);
}

private int getMaxCapacity() {
long now = System.currentTimeMillis();
long ttl = brokerController.getBrokerConfig().getLiteEventCapacityCacheTtlMs();
if (now - capacityRefreshTime > ttl) {
maxCapacityCache = LiteMetadataUtil.getMaxClientEventCount(group, brokerController);
capacityRefreshTime = now;
}
return maxCapacityCache;
}

// return false if and only if the queue is full, has race condition with poll(), but no side effect.
public boolean offer(String event) {
if (events.remainingCapacity() == 0) {
if (events.size() >= getMaxCapacity()) {
return false;
}
boolean rst;
Expand Down Expand Up @@ -486,7 +508,8 @@ public boolean maybeBlock() {

public boolean isLowWaterMark() {
int used = events.size();
return (double) used / (used + events.remainingCapacity()) < LOW_WATER_MARK;
int maxCapacity = getMaxCapacity();
return maxCapacity <= 0 || (double) used / maxCapacity < LOW_WATER_MARK;
}

public boolean isActiveConsuming() {
Expand Down Expand Up @@ -516,7 +539,7 @@ public void onUnregister(String clientId, String group, String lmqName) {
}

/**
* Mostly triggered when client channel closed, ensure that lite subscriptions is cleared before.
* Mostly triggered when client channel closed, ensure that lite subscriptions is cleared before.
*/
@Override
public void onRemoveAll(String clientId, String group) {
Expand Down Expand Up @@ -553,10 +576,12 @@ public String next() {
static class LiteSubscriptionIterator implements Iterator<String> {
private final Iterator<String> iterator;
private final String parentTopic;

public LiteSubscriptionIterator(String parentTopic, Iterator<String> iterator) {
this.parentTopic = parentTopic;
this.iterator = iterator;
}

@Override
public boolean hasNext() {
return iterator.hasNext();
Expand All @@ -572,6 +597,7 @@ protected static class FullDispatchRequest {
private final String clientId;
private final String group;
private final long timestamp;

public FullDispatchRequest(String clientId, String group, long delayMillis) {
this.clientId = clientId;
this.group = group;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,9 @@ public void testDoFullDispatchForClientNormalCase() {
when(consumerOffsetManager.queryOffset(group, lmqName, 0)).thenReturn(50L);

LiteEventDispatcher.ClientEventSet eventSet = spy(liteEventDispatcher.new ClientEventSet(group));
when(eventSet.maybeBlock()).thenReturn(false);
when(eventSet.isLowWaterMark()).thenReturn(true);
when(eventSet.offer(lmqName)).thenReturn(true);
doReturn(false).when(eventSet).maybeBlock();
doReturn(true).when(eventSet).isLowWaterMark();
doReturn(true).when(eventSet).offer(lmqName);

liteEventDispatcher.clientEventMap.put(clientId, eventSet);

Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,8 @@ public class BrokerConfig extends BrokerIdentity {

private int maxClientEventCount = 100;

private long liteEventCapacityCacheTtlMs = 5000;

private long liteEventFullDispatchDelayTime = 10 * 1000;

private long liteEventFullDispatchDelayTimeForWildcardGroup = 10 * 1000;
Expand Down Expand Up @@ -2441,6 +2443,14 @@ public void setMaxClientEventCount(int maxClientEventCount) {
this.maxClientEventCount = maxClientEventCount;
}

public long getLiteEventCapacityCacheTtlMs() {
return liteEventCapacityCacheTtlMs;
}

public void setLiteEventCapacityCacheTtlMs(long liteEventCapacityCacheTtlMs) {
this.liteEventCapacityCacheTtlMs = liteEventCapacityCacheTtlMs;
}

public long getLiteEventFullDispatchDelayTime() {
return liteEventFullDispatchDelayTime;
}
Expand Down
Loading