Skip to content

Commit 98c9983

Browse files
poorbarcodelhotari
andauthored
[fix][broker] Consumer stuck when delete subscription __compaction failed (#23980)
Co-authored-by: Lari Hotari <lhotari@apache.org>
1 parent d9519d5 commit 98c9983

4 files changed

Lines changed: 276 additions & 12 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,9 @@ public static boolean isDedupCursorName(String name) {
242242
protected final MessageDeduplication messageDeduplication;
243243

244244
private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
245-
private volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(
245+
volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(
246246
COMPACTION_NEVER_RUN);
247+
final AtomicBoolean disablingCompaction = new AtomicBoolean(false);
247248
private TopicCompactionService topicCompactionService;
248249

249250
// TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
@@ -1340,18 +1341,42 @@ private void asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription s
13401341
return;
13411342
}
13421343

1343-
currentCompaction.handle((__, e) -> {
1344-
if (e != null) {
1345-
log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName);
1344+
// Avoid concurrently execute compaction and unsubscribing.
1345+
synchronized (this) {
1346+
if (!disablingCompaction.compareAndSet(false, true)) {
1347+
unsubscribeFuture.completeExceptionally(
1348+
new SubscriptionBusyException("the subscription is deleting by another task"));
1349+
return;
13461350
}
1347-
return ((PulsarCompactorSubscription) subscription).cleanCompactedLedger();
1348-
}).whenComplete((__, ex) -> {
1349-
if (ex != null) {
1350-
log.error("[{}][{}] Error cleaning compacted ledger", topic, subscriptionName, ex);
1351-
unsubscribeFuture.completeExceptionally(ex);
1351+
}
1352+
// Unsubscribe compaction cursor and delete compacted ledger.
1353+
currentCompaction.thenCompose(__ -> {
1354+
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
1355+
return unsubscribeFuture;
1356+
}).thenAccept(__ -> {
1357+
try {
1358+
((PulsarCompactorSubscription) subscription).cleanCompactedLedger();
1359+
} catch (Exception ex) {
1360+
Long compactedLedger = null;
1361+
Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
1362+
if (compactedTopicContext.isPresent() && compactedTopicContext.get().getLedger() != null) {
1363+
compactedLedger = compactedTopicContext.get().getLedger().getId();
1364+
}
1365+
log.error("[{}][{}][{}] Error cleaning compacted ledger", topic, subscriptionName, compactedLedger, ex);
1366+
} finally {
1367+
// Reset the variable: disablingCompaction,
1368+
disablingCompaction.compareAndSet(true, false);
1369+
}
1370+
}).exceptionally(ex -> {
1371+
if (currentCompaction.isCompletedExceptionally()) {
1372+
log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName);
13521373
} else {
1353-
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
1374+
log.warn("[{}][{}] Failed to delete cursor task failed", topic, subscriptionName);
13541375
}
1376+
// Reset the variable: disablingCompaction,
1377+
disablingCompaction.compareAndSet(true, false);
1378+
unsubscribeFuture.completeExceptionally(ex);
1379+
return null;
13551380
});
13561381
}
13571382

@@ -3941,6 +3966,10 @@ public synchronized void triggerCompaction()
39413966
log.info("[{}] Topic is closing or deleting, skip triggering compaction", topic);
39423967
return;
39433968
}
3969+
if (disablingCompaction.get()) {
3970+
log.info("[{}] Compaction is disabling, skip triggering compaction", topic);
3971+
return;
3972+
}
39443973

39453974
if (strategicCompactionMap.containsKey(topic)) {
39463975
currentCompaction = brokerService.pulsar().getStrategicCompactor()

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,15 @@ public CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, l
8383
compactionHorizon = p;
8484

8585
// delete the ledger from the old context once the new one is open
86-
return compactedTopicContext.thenCompose(
87-
__ -> previousContext != null ? previousContext : CompletableFuture.completedFuture(null));
86+
return compactedTopicContext.thenCompose(ctx -> {
87+
if (ctx != null && ctx.getLedger() != null && ctx.getLedger().getId() == compactedLedgerId) {
88+
// Print an error log here, which is not expected.
89+
log.error("[__compaction] Using the same compacted ledger to override the old one, which is not"
90+
+ " expected and it may cause a ledger lost error. {} -> {}", compactedLedgerId,
91+
ctx.getLedger().getId());
92+
}
93+
return previousContext != null ? previousContext : CompletableFuture.completedFuture(null);
94+
});
8895
}
8996
}
9097

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service.persistent;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertTrue;
23+
import static org.testng.Assert.fail;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
import org.apache.bookkeeper.mledger.Position;
29+
import org.apache.pulsar.broker.BrokerTestUtil;
30+
import org.apache.pulsar.client.admin.PulsarAdminException;
31+
import org.apache.pulsar.client.api.MessageId;
32+
import org.apache.pulsar.client.api.ProducerConsumerBase;
33+
import org.apache.pulsar.client.api.Schema;
34+
import org.apache.pulsar.common.naming.TopicName;
35+
import org.apache.pulsar.common.util.FutureUtil;
36+
import org.apache.pulsar.compaction.Compactor;
37+
import org.apache.zookeeper.MockZooKeeper;
38+
import org.awaitility.Awaitility;
39+
import org.awaitility.reflect.WhiteboxImpl;
40+
import org.testng.annotations.AfterClass;
41+
import org.testng.annotations.BeforeClass;
42+
import org.testng.annotations.Test;
43+
44+
@Test(groups = "broker")
45+
public class CompactionConcurrencyTest extends ProducerConsumerBase {
46+
// don't make this over 2000ms, otherwise the test will be flaky due to ZKSessionWatcher
47+
static final int DELETE_OPERATION_DELAY_MS = 1900;
48+
49+
@BeforeClass
50+
@Override
51+
protected void setup() throws Exception {
52+
super.internalSetup();
53+
super.producerBaseSetup();
54+
}
55+
56+
@AfterClass
57+
@Override
58+
protected void cleanup() throws Exception {
59+
super.internalCleanup();
60+
}
61+
62+
@Override
63+
protected void doInitConf() throws Exception {
64+
super.doInitConf();
65+
// Disable the scheduled task: compaction.
66+
conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE);
67+
// Disable the scheduled task: retention.
68+
conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE);
69+
}
70+
71+
private void triggerCompactionAndWait(String topicName) throws Exception {
72+
PersistentTopic persistentTopic =
73+
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
74+
persistentTopic.triggerCompaction();
75+
Awaitility.await().untilAsserted(() -> {
76+
Position lastConfirmPos = persistentTopic.getManagedLedger().getLastConfirmedEntry();
77+
Position markDeletePos = persistentTopic
78+
.getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition();
79+
assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId());
80+
assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId());
81+
});
82+
}
83+
84+
@Test
85+
public void testDisableCompactionConcurrently() throws Exception {
86+
String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
87+
admin.topics().createNonPartitionedTopic(topicName);
88+
admin.topicPolicies().setCompactionThreshold(topicName, 1);
89+
admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
90+
var producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
91+
producer.newMessage().key("k0").value("v0").send();
92+
triggerCompactionAndWait(topicName);
93+
admin.topics().deleteSubscription(topicName, "s1");
94+
PersistentTopic persistentTopic =
95+
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
96+
AtomicBoolean disablingCompaction = persistentTopic.disablingCompaction;
97+
98+
// Disable compaction.
99+
// Inject a delay when the first time of deleting cursor.
100+
AtomicInteger times = new AtomicInteger();
101+
String cursorPath = String.format("/managed-ledgers/%s/__compaction",
102+
TopicName.get(topicName).getPersistenceNamingEncoding());
103+
admin.topicPolicies().removeCompactionThreshold(topicName);
104+
mockZooKeeper.delay(DELETE_OPERATION_DELAY_MS, (op, path) -> {
105+
return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) && times.incrementAndGet() == 1;
106+
});
107+
mockZooKeeperGlobal.delay(DELETE_OPERATION_DELAY_MS, (op, path) -> {
108+
return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) && times.incrementAndGet() == 1;
109+
});
110+
AtomicReference<CompletableFuture<Void>> f1 = new AtomicReference<CompletableFuture<Void>>();
111+
AtomicReference<CompletableFuture<Void>> f2 = new AtomicReference<CompletableFuture<Void>>();
112+
new Thread(() -> {
113+
f1.set(admin.topics().deleteSubscriptionAsync(topicName, "__compaction"));
114+
}).start();
115+
new Thread(() -> {
116+
f2.set(admin.topics().deleteSubscriptionAsync(topicName, "__compaction"));
117+
}).start();
118+
119+
// Verify: the next compaction will be skipped.
120+
Awaitility.await().untilAsserted(() -> {
121+
assertTrue(disablingCompaction.get());
122+
});
123+
producer.newMessage().key("k1").value("v1").send();
124+
producer.newMessage().key("k2").value("v2").send();
125+
CompletableFuture<Long> currentCompaction1 = persistentTopic.currentCompaction;
126+
WhiteboxImpl.getInternalState(persistentTopic, "currentCompaction");
127+
persistentTopic.triggerCompaction();
128+
CompletableFuture<Long> currentCompaction2 = persistentTopic.currentCompaction;
129+
assertTrue(currentCompaction1 == currentCompaction2);
130+
131+
// Verify: one of the requests should fail.
132+
Awaitility.await().untilAsserted(() -> {
133+
assertTrue(f1.get() != null);
134+
assertTrue(f2.get() != null);
135+
assertTrue(f1.get().isDone());
136+
assertTrue(f2.get().isDone());
137+
assertTrue(f1.get().isCompletedExceptionally() || f2.get().isCompletedExceptionally());
138+
assertTrue(!f1.get().isCompletedExceptionally() || !f2.get().isCompletedExceptionally());
139+
});
140+
try {
141+
f1.get().join();
142+
f2.get().join();
143+
fail("Should fail");
144+
} catch (Exception ex) {
145+
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
146+
assertTrue(actEx instanceof PulsarAdminException.PreconditionFailedException);
147+
}
148+
149+
// cleanup.
150+
producer.close();
151+
admin.topics().delete(topicName, false);
152+
}
153+
}

pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,36 @@
2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertFalse;
2323
import static org.testng.Assert.assertNotEquals;
24+
import static org.testng.Assert.assertTrue;
25+
import static org.testng.Assert.fail;
2426
import java.util.ArrayList;
2527
import java.util.List;
2628
import java.util.Optional;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
2932
import org.apache.bookkeeper.mledger.Position;
3033
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
3134
import org.apache.pulsar.broker.BrokerTestUtil;
3235
import org.apache.pulsar.broker.service.Topic;
3336
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
37+
import org.apache.pulsar.client.admin.PulsarAdminException;
3438
import org.apache.pulsar.client.api.CompressionType;
3539
import org.apache.pulsar.client.api.Consumer;
3640
import org.apache.pulsar.client.api.Message;
3741
import org.apache.pulsar.client.api.MessageId;
3842
import org.apache.pulsar.client.api.Producer;
3943
import org.apache.pulsar.client.api.ProducerBuilder;
4044
import org.apache.pulsar.client.api.ProducerConsumerBase;
45+
import org.apache.pulsar.client.api.Reader;
4146
import org.apache.pulsar.client.api.Schema;
4247
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
4348
import org.apache.pulsar.client.impl.MessageIdImpl;
4449
import org.apache.pulsar.client.impl.ReaderImpl;
50+
import org.apache.pulsar.common.naming.TopicName;
4551
import org.apache.pulsar.common.util.FutureUtil;
52+
import org.apache.zookeeper.KeeperException;
53+
import org.apache.zookeeper.MockZooKeeper;
4654
import org.awaitility.Awaitility;
4755
import org.testng.annotations.AfterClass;
4856
import org.testng.annotations.BeforeClass;
@@ -308,6 +316,73 @@ public void testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBa
308316
admin.topics().delete(topicName, false);
309317
}
310318

319+
@DataProvider
320+
public Object[][] isInjectedCursorDeleteError() {
321+
return new Object[][] {
322+
{false},
323+
{true}
324+
};
325+
}
326+
327+
@Test(dataProvider = "isInjectedCursorDeleteError")
328+
public void testReadMsgsAfterDisableCompaction(boolean isInjectedCursorDeleteError) throws Exception {
329+
String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
330+
admin.topics().createNonPartitionedTopic(topicName);
331+
admin.topicPolicies().setCompactionThreshold(topicName, 1);
332+
admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
333+
var producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
334+
producer.newMessage().key("k0").value("v0").send();
335+
producer.newMessage().key("k1").value("v1").send();
336+
producer.newMessage().key("k2").value("v2").send();
337+
triggerCompactionAndWait(topicName);
338+
admin.topics().deleteSubscription(topicName, "s1");
339+
340+
// Disable compaction.
341+
// Inject a failure that the first time to delete cursor will fail.
342+
if (isInjectedCursorDeleteError) {
343+
AtomicInteger times = new AtomicInteger();
344+
String cursorPath = String.format("/managed-ledgers/%s/__compaction",
345+
TopicName.get(topicName).getPersistenceNamingEncoding());
346+
admin.topicPolicies().removeCompactionThreshold(topicName);
347+
mockZooKeeper.failConditional(KeeperException.Code.SESSIONEXPIRED, (op, path) -> {
348+
return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) && times.incrementAndGet() == 1;
349+
});
350+
mockZooKeeperGlobal.failConditional(KeeperException.Code.SESSIONEXPIRED, (op, path) -> {
351+
return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) && times.incrementAndGet() == 1;
352+
});
353+
try {
354+
admin.topics().deleteSubscription(topicName, "__compaction");
355+
fail("Should fail");
356+
} catch (Exception ex) {
357+
assertTrue(ex instanceof PulsarAdminException.ServerSideErrorException);
358+
}
359+
}
360+
361+
// Create a reader with start at earliest.
362+
// Verify: the reader will receive 3 messages.
363+
admin.topics().unload(topicName);
364+
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).readCompacted(true)
365+
.startMessageId(MessageId.earliest).create();
366+
producer.newMessage().key("k3").value("v3").send();
367+
assertTrue(reader.hasMessageAvailable());
368+
Message<String> m0 = reader.readNext(10, TimeUnit.SECONDS);
369+
assertEquals(m0.getValue(), "v0");
370+
assertTrue(reader.hasMessageAvailable());
371+
Message<String> m1 = reader.readNext(10, TimeUnit.SECONDS);
372+
assertEquals(m1.getValue(), "v1");
373+
assertTrue(reader.hasMessageAvailable());
374+
Message<String> m2 = reader.readNext(10, TimeUnit.SECONDS);
375+
assertEquals(m2.getValue(), "v2");
376+
assertTrue(reader.hasMessageAvailable());
377+
Message<String> m3 = reader.readNext(10, TimeUnit.SECONDS);
378+
assertEquals(m3.getValue(), "v3");
379+
380+
// cleanup.
381+
producer.close();
382+
reader.close();
383+
admin.topics().delete(topicName, false);
384+
}
385+
311386
@Test(dataProvider = "enabledBatch")
312387
public void testGetLastMessageIdAfterCompactionEndWithNullMsg(boolean enabledBatch) throws Exception {
313388
String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");

0 commit comments

Comments
 (0)