Skip to content

Commit 33c0242

Browse files
committed
Enforce Lite exclusive subscription eviction via server-side tombstones
Lite exclusive eviction relied on a fire-and-forget unsubscribe notification. When the client missed it, the old consumer kept pulling, causing double consumption.

Add a broker-local, in-memory tombstone keyed by (clientId, lmqName):
- Written synchronously on eviction, before client notification.
- Checked at the Lite pull entry; rejected pulls reuse "no message available", requiring no client change.
- Lifecycle-aligned with the subscription set: cleared on resubscription, full-sync reconciliation, and disconnect/timeout.
- Self-heals lost notifications by re-sending unsubscribe when a heartbeat still reports a tombstoned lmqName.

Tests: add ExclusiveEvictionTombstonesTest; extend registry and pull-path tests to cover eviction, self-clear, reconciliation, and cleanup.
1 parent 051ba27 commit 33c0242

7 files changed

Lines changed: 592 additions & 0 deletions

File tree

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.broker.lite;
19+
20+
import java.util.Set;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import org.apache.rocketmq.common.lite.LiteUtil;
23+
24+
/**
 * Manages tombstones for exclusive subscription eviction.
 * <p>
 * When a client is evicted from a liteTopic (exclusive mode), an entry is placed here
 * to prevent the evicted client from pulling messages until the tombstone is cleared
 * by a resubscription, the next full subscription sync, or client removal.
 * <p>
 * Layout: {@code clientId -> set of tombstoned lmqNames}. The two components are
 * deliberately NOT joined into a single string with {@link LiteUtil#SEPARATOR} (or any
 * other character): neither clientId nor lmqName is guaranteed to be free of a given
 * separator, so a concatenated key plus prefix matching can attribute one client's
 * tombstones to another client (e.g. clientId {@code "a"} vs clientId {@code "a$b"}).
 * Keeping the components separate also makes per-client cleanup independent of the
 * total number of tombstones on the broker.
 * <p>
 * Thread-safety: every mutation goes through the atomic per-key operations of
 * {@link ConcurrentHashMap}, so an empty per-client set can be dropped without racing
 * against a concurrent {@link #add(String, String)} for the same client.
 * <p>
 * {@code null} clientId / lmqName never throws: lookups report "no tombstone" and
 * mutations are ignored.
 */
public class ExclusiveEvictionTombstones {

    /** clientId -> lmqNames the client has been evicted from. Empty sets are never retained. */
    private final ConcurrentHashMap<String, Set<String>> tombstones = new ConcurrentHashMap<>();

    /**
     * Check whether a tombstone exists for the given client and lmqName.
     *
     * @param clientId the client to look up
     * @param lmqName the lmqName to look up
     * @return {@code true} if the exact (clientId, lmqName) pair is tombstoned
     */
    public boolean contains(String clientId, String lmqName) {
        if (clientId == null || lmqName == null) {
            return false;
        }
        Set<String> lmqNames = tombstones.get(clientId);
        return lmqNames != null && lmqNames.contains(lmqName);
    }

    /**
     * Add a tombstone for the given client and lmqName. Adding an existing pair is a no-op.
     *
     * @param clientId the evicted client
     * @param lmqName the lmqName the client was evicted from
     */
    public void add(String clientId, String lmqName) {
        if (clientId == null || lmqName == null) {
            return;
        }
        // compute() keeps "create set if missing" + "insert" atomic with respect to the
        // removal paths below, which may drop the mapping once the set becomes empty.
        tombstones.compute(clientId, (id, lmqNames) -> {
            Set<String> target = lmqNames;
            if (target == null) {
                target = ConcurrentHashMap.newKeySet();
            }
            target.add(lmqName);
            return target;
        });
    }

    /**
     * Remove the tombstone for the given client and lmqName, if present.
     *
     * @param clientId the client whose tombstone is cleared
     * @param lmqName the lmqName whose tombstone is cleared
     */
    public void remove(String clientId, String lmqName) {
        if (clientId == null || lmqName == null) {
            return;
        }
        tombstones.computeIfPresent(clientId, (id, lmqNames) -> {
            lmqNames.remove(lmqName);
            // Returning null removes the mapping so idle clients leave nothing behind.
            return lmqNames.isEmpty() ? null : lmqNames;
        });
    }

    /**
     * Remove all tombstones belonging to the specified client.
     *
     * @param clientId the client whose tombstones are dropped
     */
    public void removeAllOf(String clientId) {
        if (clientId == null) {
            return;
        }
        tombstones.remove(clientId);
    }

    /**
     * For a given client, remove tombstones whose lmqName is NOT in the provided active set.
     * This is used during full subscription sync to clear stale tombstones.
     *
     * @param clientId the client being reconciled
     * @param activeLmqNames lmqNames whose tombstones are retained
     */
    public void removeStale(String clientId, Set<String> activeLmqNames) {
        if (clientId == null) {
            return;
        }
        tombstones.computeIfPresent(clientId, (id, lmqNames) -> {
            lmqNames.removeIf(lmqName -> !activeLmqNames.contains(lmqName));
            return lmqNames.isEmpty() ? null : lmqNames;
        });
    }

    /**
     * Return the current number of tombstones (for monitoring/testing).
     * The value is a point-in-time sum and may be stale under concurrent updates.
     *
     * @return total number of (clientId, lmqName) tombstones
     */
    public int size() {
        int total = 0;
        for (Set<String> lmqNames : tombstones.values()) {
            total += lmqNames.size();
        }
        return total;
    }
}

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public interface LiteSubscriptionRegistry {
5050

5151
void cleanSubscription(String lmqName, boolean notifyClient);
5252

53+
/**
 * Reports whether an exclusive-eviction tombstone is currently recorded for the given
 * client and lmqName.
 *
 * @param clientId the client to check
 * @param lmqName the lmqName to check
 * @return {@code true} if a tombstone exists for the (clientId, lmqName) pair
 */
boolean hasExclusiveEvictionTombstone(String clientId, String lmqName);
54+
5355
void start();
5456

5557
void shutdown();

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class LiteSubscriptionRegistryImpl extends ServiceThread implements LiteS
6161
private final BrokerController brokerController;
6262
private final AbstractLiteLifecycleManager liteLifecycleManager;
6363

64+
private final ExclusiveEvictionTombstones exclusiveEvictionTombstones = new ExclusiveEvictionTombstones();
65+
6466
public LiteSubscriptionRegistryImpl(BrokerController brokerController,
6567
AbstractLiteLifecycleManager liteLifecycleManager) {
6668
this.brokerController = brokerController;
@@ -99,6 +101,10 @@ public void addPartialSubscription(String clientId, String group, String topic,
99101
// First remove the old subscription
100102
if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
101103
excludeClientByLmqName(clientId, group, lmqName);
104+
// Boundary case: this client may have a stale tombstone from a previous eviction.
105+
// Since it is now actively re-claiming the lmqName, clear its own tombstone so
106+
// subsequent popLiteTopic is not blocked by the stale mark.
107+
exclusiveEvictionTombstones.remove(clientId, lmqName);
102108
}
103109
resetOffset(lmqName, group, clientId, offsetOption);
104110
addTopicGroup(clientGroup, lmqName);
@@ -144,12 +150,31 @@ public void addCompleteSubscription(String clientId, String group, String topic,
144150
thisSub.addLiteTopic(lmqName);
145151
addTopicGroup(clientGroup, lmqName);
146152
});
153+
// Tombstone operations only apply to exclusive groups.
154+
if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
155+
// Boundary case: if any lmqName in the client's reported full subscription still has
156+
// a tombstone, the previous notifyUnsubscribeLite was likely lost. Re-send the
157+
// unsubscribe notification to drive the client's local state to converge.
158+
lmqNameNew.stream()
159+
.filter(lmqName -> exclusiveEvictionTombstones.contains(clientId, lmqName))
160+
.forEach(lmqName -> {
161+
LOGGER.info("re-notify unsubscribe for tombstoned lmqName, clientId:{}, group:{}, lmqName:{}",
162+
clientId, group, lmqName);
163+
notifyUnsubscribeLite(clientId, group, lmqName);
164+
});
165+
// Clean exclusive-eviction tombstones for liteTopics no longer in the client's full subscription set
166+
exclusiveEvictionTombstones.removeStale(clientId, lmqNameNew);
167+
}
147168
}
148169

149170
@Override
150171
public void removeCompleteSubscription(String clientId) {
151172
clientChannels.remove(clientId);
152173
LiteSubscription thisSub = client2Subscription.remove(clientId);
174+
// Only clean tombstones for exclusive groups.
175+
if (thisSub == null || LiteMetadataUtil.isSubLiteExclusive(thisSub.getGroup(), brokerController)) {
176+
exclusiveEvictionTombstones.removeAllOf(clientId);
177+
}
153178
if (thisSub == null) {
154179
return;
155180
}
@@ -299,6 +324,7 @@ protected void excludeClientByLmqName(String newClientId, String group, String l
299324
client2Subscription.remove(clientGroup.clientId);
300325
}
301326
}
327+
exclusiveEvictionTombstones.add(clientGroup.clientId, lmqName);
302328
notifyUnsubscribeLite(clientGroup.clientId, clientGroup.group, lmqName);
303329
boolean resetOffset = LiteMetadataUtil.isResetOffsetInExclusiveMode(group, brokerController);
304330
LOGGER.info("excludeClientByLmqName group:{}, lmqName:{}, resetOffset:{}, clientId:{} -> {}",
@@ -464,6 +490,16 @@ protected void cleanupExpiredSubscriptions(long checkTimeout) {
464490
LOGGER.info("Remove expired LiteSubscription, topic: {}, group: {}, clientId: {}, timeout: {}ms, expired: {}ms",
465491
topic, group, clientId, checkTimeout, System.currentTimeMillis() - liteSubscription.getUpdateTime());
466492
});
493+
494+
int tombstoneSize = exclusiveEvictionTombstones.size();
495+
if (tombstoneSize > 0) {
496+
LOGGER.info("ExclusiveEvictionTombstones size: {}", tombstoneSize);
497+
}
498+
}
499+
500+
/**
 * Reports whether the given client has an exclusive-eviction tombstone for lmqName.
 * Pure lookup: delegates to the registry's tombstone store and modifies nothing.
 */
@Override
501+
public boolean hasExclusiveEvictionTombstone(String clientId, String lmqName) {
502+
return exclusiveEvictionTombstones.contains(clientId, lmqName);
467503
}
468504

469505
}

broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.opentelemetry.api.common.Attributes;
2323
import org.apache.rocketmq.broker.BrokerController;
2424
import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
25+
import org.apache.rocketmq.broker.lite.LiteMetadataUtil;
2526
import org.apache.rocketmq.broker.longpolling.PollingResult;
2627
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
2728
import org.apache.rocketmq.broker.metrics.LiteConsumerLagCalculator;
@@ -253,6 +254,7 @@ public Pair<StringBuilder, GetMessageResult> popByClientId(String clientHost, St
253254
StringBuilder orderCountInfoAll = new StringBuilder();
254255
AtomicLong total = new AtomicLong(0);
255256

257+
boolean isExclusiveGroup = LiteMetadataUtil.isSubLiteExclusive(group, brokerController);
256258
Set<String> processed = new HashSet<>(); // deduplication in one request
257259
Iterator<String> iterator = liteEventDispatcher.getEventIterator(clientId);
258260
while (total.get() < maxNum && iterator.hasNext()) {
@@ -263,6 +265,11 @@ public Pair<StringBuilder, GetMessageResult> popByClientId(String clientHost, St
263265
if (!processed.add(lmqName)) {
264266
continue; // wait for next pop request or re-fetch in current process, here prefer the former approach
265267
}
268+
// Tombstone check: reject pull if this client was evicted from the liteTopic (exclusive mode)
269+
if (isExclusiveGroup && brokerController.getLiteSubscriptionRegistry().hasExclusiveEvictionTombstone(clientId, lmqName)) {
270+
LOGGER.info("popLiteTopic rejected by tombstone: clientId={}, group={}, lmqName={}", clientId, group, lmqName);
271+
continue;
272+
}
266273
Pair<StringBuilder, GetMessageResult> pair = popLiteTopic(parentTopic, clientHost, group, lmqName,
267274
maxNum - total.get(), popTime, invisibleTime, attemptId);
268275
if (null == pair || pair.getObject2().getMessageCount() <= 0) {
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.broker.lite;
19+
20+
import java.util.HashSet;
21+
import java.util.Set;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
25+
import static org.junit.Assert.assertEquals;
26+
import static org.junit.Assert.assertFalse;
27+
import static org.junit.Assert.assertTrue;
28+
29+
public class ExclusiveEvictionTombstonesTest {
30+
31+
private ExclusiveEvictionTombstones tombstones;
32+
33+
@Before
34+
public void setUp() {
35+
tombstones = new ExclusiveEvictionTombstones();
36+
}
37+
38+
@Test
39+
public void testAddAndContains() {
40+
// empty store
41+
assertFalse(tombstones.contains("client1", "lmq1"));
42+
assertEquals(0, tombstones.size());
43+
44+
// basic add
45+
tombstones.add("client1", "lmq1");
46+
assertTrue(tombstones.contains("client1", "lmq1"));
47+
assertFalse(tombstones.contains("client1", "lmq2")); // same client, different lmq
48+
assertFalse(tombstones.contains("client2", "lmq1")); // different client, same lmq
49+
assertEquals(1, tombstones.size());
50+
51+
// duplicate add is idempotent
52+
tombstones.add("client1", "lmq1");
53+
assertEquals(1, tombstones.size());
54+
}
55+
56+
@Test
57+
public void testRemoveAllOf() {
58+
tombstones.add("client1", "lmq1");
59+
tombstones.add("client1", "lmq2");
60+
tombstones.add("client2", "lmq1");
61+
62+
// non-existent client is no-op
63+
tombstones.removeAllOf("client3");
64+
assertEquals(3, tombstones.size());
65+
66+
// removes only target client
67+
tombstones.removeAllOf("client1");
68+
assertFalse(tombstones.contains("client1", "lmq1"));
69+
assertFalse(tombstones.contains("client1", "lmq2"));
70+
assertTrue(tombstones.contains("client2", "lmq1"));
71+
assertEquals(1, tombstones.size());
72+
}
73+
74+
@Test
75+
public void testRemoveStale() {
76+
tombstones.add("client1", "lmq1");
77+
tombstones.add("client1", "lmq2");
78+
tombstones.add("client1", "lmq3");
79+
tombstones.add("client2", "lmq1");
80+
81+
// removes stale, keeps active, does not affect other clients
82+
Set<String> activeSet = new HashSet<>();
83+
activeSet.add("lmq2");
84+
activeSet.add("lmq3");
85+
tombstones.removeStale("client1", activeSet);
86+
87+
assertFalse(tombstones.contains("client1", "lmq1")); // removed (not in activeSet)
88+
assertTrue(tombstones.contains("client1", "lmq2")); // retained
89+
assertTrue(tombstones.contains("client1", "lmq3")); // retained
90+
assertTrue(tombstones.contains("client2", "lmq1")); // unaffected
91+
assertEquals(3, tombstones.size());
92+
93+
// empty activeSet clears all for that client
94+
tombstones.removeStale("client1", new HashSet<>());
95+
assertFalse(tombstones.contains("client1", "lmq2"));
96+
assertFalse(tombstones.contains("client1", "lmq3"));
97+
assertTrue(tombstones.contains("client2", "lmq1")); // still unaffected
98+
}
99+
100+
@Test
101+
public void testSize() {
102+
assertEquals(0, tombstones.size());
103+
104+
tombstones.add("c1", "l1");
105+
assertEquals(1, tombstones.size());
106+
107+
tombstones.add("c1", "l2");
108+
assertEquals(2, tombstones.size());
109+
110+
tombstones.add("c2", "l1");
111+
assertEquals(3, tombstones.size());
112+
113+
tombstones.removeAllOf("c1");
114+
assertEquals(1, tombstones.size());
115+
}
116+
117+
/**
118+
* Verifies that real RocketMQ clientId formats (containing '@') and lmqName formats
119+
* (e.g. topic@group for wildcard) do not cause cross-client collisions.
120+
* This is the core safety guarantee of using '\0' separator instead of '@'.
121+
*/
122+
@Test
123+
public void testRealClientIdFormat_NoCrossClientCollision() {
124+
// RocketMQ clientId: IP@instanceName vs IP@instanceName@unitName
125+
// clientA is a strict prefix of clientB if '@' were the separator
126+
String clientA = "10.0.0.1@DEFAULT";
127+
String clientB = "10.0.0.1@DEFAULT@unit1";
128+
String lmqPlain = "lmq1";
129+
String lmqWildcard = "parentTopic@wildcardGroup"; // lmqName also contains '@'
130+
131+
tombstones.add(clientA, lmqPlain);
132+
tombstones.add(clientA, lmqWildcard);
133+
tombstones.add(clientB, lmqPlain);
134+
135+
// basic isolation
136+
assertTrue(tombstones.contains(clientA, lmqPlain));
137+
assertTrue(tombstones.contains(clientA, lmqWildcard));
138+
assertTrue(tombstones.contains(clientB, lmqPlain));
139+
assertFalse(tombstones.contains(clientA, "parentTopic")); // partial lmqName no match
140+
assertFalse(tombstones.contains("10.0.0.1@DEFAULT", lmqPlain + "@extra")); // no false match
141+
assertEquals(3, tombstones.size());
142+
143+
// removeStale for clientA does NOT affect clientB
144+
Set<String> activeSet = new HashSet<>();
145+
activeSet.add(lmqWildcard);
146+
tombstones.removeStale(clientA, activeSet);
147+
148+
assertFalse(tombstones.contains(clientA, lmqPlain)); // removed
149+
assertTrue(tombstones.contains(clientA, lmqWildcard)); // retained
150+
assertTrue(tombstones.contains(clientB, lmqPlain)); // unaffected
151+
assertEquals(2, tombstones.size());
152+
153+
// removeAllOf clientA does NOT affect clientB
154+
tombstones.removeAllOf(clientA);
155+
assertFalse(tombstones.contains(clientA, lmqWildcard));
156+
assertTrue(tombstones.contains(clientB, lmqPlain));
157+
assertEquals(1, tombstones.size());
158+
}
159+
}

0 commit comments

Comments
 (0)