Skip to content

Commit b41513f

Browse files
committed
Enforce Lite exclusive subscription eviction via server-side tombstones
Lite exclusive eviction relied on a fire-and-forget unsubscribe notification. When the client missed it, the old consumer kept pulling, causing double consumption. Add a broker-local, in-memory tombstone keyed by (clientId, lmqName): - Written synchronously on eviction, before client notification. - Checked at the Lite pull entry; rejected pulls reuse "no message available", requiring no client change. - Lifecycle-aligned with the subscription set: cleared on resubscription, full-sync reconciliation, and disconnect/timeout. - Self-heals lost notifications by re-sending unsubscribe when a heartbeat still reports a tombstoned lmqName. Tests: add ExclusiveEvictionTombstonesTest; extend registry and pull-path tests to cover eviction, self-clear, reconciliation, and cleanup.
1 parent 051ba27 commit b41513f

6 files changed

Lines changed: 480 additions & 0 deletions

File tree

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.broker.lite;
19+
20+
import java.util.Set;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
23+
/**
24+
* Manages tombstones for exclusive subscription eviction.
25+
* <p>
26+
* When a client is evicted from a liteTopic (exclusive mode), an entry is placed here
27+
* to prevent the evicted client from pulling messages until the tombstone is cleared
28+
* during the next full subscription sync or client removal.
29+
* <p>
30+
* Key format: clientId@lmqName
31+
*/
32+
public class ExclusiveEvictionTombstones {
33+
34+
private final Set<String> tombstones = ConcurrentHashMap.newKeySet();
35+
36+
/**
37+
* Check whether a tombstone exists for the given client and lmqName.
38+
*/
39+
public boolean contains(String clientId, String lmqName) {
40+
return tombstones.contains(buildKey(clientId, lmqName));
41+
}
42+
43+
/**
44+
* Add a tombstone for the given client and lmqName.
45+
*/
46+
public void add(String clientId, String lmqName) {
47+
tombstones.add(buildKey(clientId, lmqName));
48+
}
49+
50+
/**
51+
* Remove the tombstone for the given client and lmqName, if present.
52+
*/
53+
public void remove(String clientId, String lmqName) {
54+
tombstones.remove(buildKey(clientId, lmqName));
55+
}
56+
57+
/**
58+
* Remove all tombstones belonging to the specified client.
59+
*/
60+
public void removeAllOf(String clientId) {
61+
String prefix = clientId + "@";
62+
tombstones.removeIf(key -> key.startsWith(prefix));
63+
}
64+
65+
/**
66+
* For a given client, remove tombstones whose lmqName is NOT in the provided active set.
67+
* This is used during full subscription sync to clear stale tombstones.
68+
*/
69+
public void retainOnly(String clientId, Set<String> activeLmqNames) {
70+
String prefix = clientId + "@";
71+
tombstones.removeIf(key -> {
72+
if (!key.startsWith(prefix)) {
73+
return false;
74+
}
75+
String lmqName = key.substring(prefix.length());
76+
return !activeLmqNames.contains(lmqName);
77+
});
78+
}
79+
80+
/**
81+
* Return the current number of tombstones (for monitoring/testing).
82+
*/
83+
public int size() {
84+
return tombstones.size();
85+
}
86+
87+
private static String buildKey(String clientId, String lmqName) {
88+
return clientId + "@" + lmqName;
89+
}
90+
}

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public interface LiteSubscriptionRegistry {
5050

5151
void cleanSubscription(String lmqName, boolean notifyClient);
5252

53+
boolean hasExclusiveEvictionTombstone(String clientId, String lmqName);
54+
5355
void start();
5456

5557
void shutdown();

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class LiteSubscriptionRegistryImpl extends ServiceThread implements LiteS
6161
private final BrokerController brokerController;
6262
private final AbstractLiteLifecycleManager liteLifecycleManager;
6363

64+
private final ExclusiveEvictionTombstones exclusiveEvictionTombstones = new ExclusiveEvictionTombstones();
65+
6466
public LiteSubscriptionRegistryImpl(BrokerController brokerController,
6567
AbstractLiteLifecycleManager liteLifecycleManager) {
6668
this.brokerController = brokerController;
@@ -99,6 +101,10 @@ public void addPartialSubscription(String clientId, String group, String topic,
99101
// First remove the old subscription
100102
if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
101103
excludeClientByLmqName(clientId, group, lmqName);
104+
// Boundary case: this client may have a stale tombstone from a previous eviction.
105+
// Since it is now actively re-claiming the lmqName, clear its own tombstone so
106+
// subsequent popLiteTopic is not blocked by the stale mark.
107+
exclusiveEvictionTombstones.remove(clientId, lmqName);
102108
}
103109
resetOffset(lmqName, group, clientId, offsetOption);
104110
addTopicGroup(clientGroup, lmqName);
@@ -144,12 +150,31 @@ public void addCompleteSubscription(String clientId, String group, String topic,
144150
thisSub.addLiteTopic(lmqName);
145151
addTopicGroup(clientGroup, lmqName);
146152
});
153+
// Tombstone operations only apply to exclusive groups.
154+
if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
155+
// Boundary case: if any lmqName in the client's reported full subscription still has
156+
// a tombstone, the previous notifyUnsubscribeLite was likely lost. Re-send the
157+
// unsubscribe notification to drive the client's local state to converge.
158+
lmqNameNew.stream()
159+
.filter(lmqName -> exclusiveEvictionTombstones.contains(clientId, lmqName))
160+
.forEach(lmqName -> {
161+
LOGGER.info("re-notify unsubscribe for tombstoned lmqName, clientId:{}, group:{}, lmqName:{}",
162+
clientId, group, lmqName);
163+
notifyUnsubscribeLite(clientId, group, lmqName);
164+
});
165+
// Clean exclusive-eviction tombstones for liteTopics no longer in the client's full subscription set
166+
exclusiveEvictionTombstones.retainOnly(clientId, lmqNameNew);
167+
}
147168
}
148169

149170
@Override
150171
public void removeCompleteSubscription(String clientId) {
151172
clientChannels.remove(clientId);
152173
LiteSubscription thisSub = client2Subscription.remove(clientId);
174+
// Only clean tombstones for exclusive groups.
175+
if (thisSub == null || LiteMetadataUtil.isSubLiteExclusive(thisSub.getGroup(), brokerController)) {
176+
exclusiveEvictionTombstones.removeAllOf(clientId);
177+
}
153178
if (thisSub == null) {
154179
return;
155180
}
@@ -299,6 +324,7 @@ protected void excludeClientByLmqName(String newClientId, String group, String l
299324
client2Subscription.remove(clientGroup.clientId);
300325
}
301326
}
327+
exclusiveEvictionTombstones.add(clientGroup.clientId, lmqName);
302328
notifyUnsubscribeLite(clientGroup.clientId, clientGroup.group, lmqName);
303329
boolean resetOffset = LiteMetadataUtil.isResetOffsetInExclusiveMode(group, brokerController);
304330
LOGGER.info("excludeClientByLmqName group:{}, lmqName:{}, resetOffset:{}, clientId:{} -> {}",
@@ -464,6 +490,16 @@ protected void cleanupExpiredSubscriptions(long checkTimeout) {
464490
LOGGER.info("Remove expired LiteSubscription, topic: {}, group: {}, clientId: {}, timeout: {}ms, expired: {}ms",
465491
topic, group, clientId, checkTimeout, System.currentTimeMillis() - liteSubscription.getUpdateTime());
466492
});
493+
494+
int tombstoneSize = exclusiveEvictionTombstones.size();
495+
if (tombstoneSize > 0) {
496+
LOGGER.info("ExclusiveEvictionTombstones size: {}", tombstoneSize);
497+
}
498+
}
499+
500+
@Override
501+
public boolean hasExclusiveEvictionTombstone(String clientId, String lmqName) {
502+
return exclusiveEvictionTombstones.contains(clientId, lmqName);
467503
}
468504

469505
}

broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.opentelemetry.api.common.Attributes;
2323
import org.apache.rocketmq.broker.BrokerController;
2424
import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
25+
import org.apache.rocketmq.broker.lite.LiteMetadataUtil;
2526
import org.apache.rocketmq.broker.longpolling.PollingResult;
2627
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
2728
import org.apache.rocketmq.broker.metrics.LiteConsumerLagCalculator;
@@ -253,6 +254,7 @@ public Pair<StringBuilder, GetMessageResult> popByClientId(String clientHost, St
253254
StringBuilder orderCountInfoAll = new StringBuilder();
254255
AtomicLong total = new AtomicLong(0);
255256

257+
boolean isExclusiveGroup = LiteMetadataUtil.isSubLiteExclusive(group, brokerController);
256258
Set<String> processed = new HashSet<>(); // deduplication in one request
257259
Iterator<String> iterator = liteEventDispatcher.getEventIterator(clientId);
258260
while (total.get() < maxNum && iterator.hasNext()) {
@@ -263,6 +265,11 @@ public Pair<StringBuilder, GetMessageResult> popByClientId(String clientHost, St
263265
if (!processed.add(lmqName)) {
264266
continue; // wait for next pop request or re-fetch in current process, here prefer the former approach
265267
}
268+
// Tombstone check: reject pull if this client was evicted from the liteTopic (exclusive mode)
269+
if (isExclusiveGroup && brokerController.getLiteSubscriptionRegistry().hasExclusiveEvictionTombstone(clientId, lmqName)) {
270+
LOGGER.info("popLiteTopic rejected by tombstone: clientId={}, group={}, lmqName={}", clientId, group, lmqName);
271+
continue;
272+
}
266273
Pair<StringBuilder, GetMessageResult> pair = popLiteTopic(parentTopic, clientHost, group, lmqName,
267274
maxNum - total.get(), popTime, invisibleTime, attemptId);
268275
if (null == pair || pair.getObject2().getMessageCount() <= 0) {

0 commit comments

Comments
 (0)