Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import lombok.Setter;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.LedgerOffloader;
Expand Down Expand Up @@ -328,6 +329,11 @@ public class BrokerService implements Closeable {

private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher();
private volatile boolean unloaded = false;
private final Set<ManagedLedger> orphanManagedLedgers = new HashSet<>();
private ScheduledFuture<?> clearOrphanManagedLedgerTask = null;
@VisibleForTesting
@Setter
private int clearOrphanManagedLedgerDelayMs = 1000 * 60; // 1 minute

public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
this.pulsar = pulsar;
Expand Down Expand Up @@ -1376,7 +1382,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
return Optional.empty();
});
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic;
Expand Down Expand Up @@ -1663,9 +1669,12 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);

topicFuture.exceptionally(t -> {
topicFuture.exceptionallyAsync(t -> {
pulsarStats.recordTopicLoadFailed();
return null;
if (topics.remove(topic, topicFuture)) {
log.info("Removed topic {} due to failure {}", topic, t.getMessage());
}
return Optional.empty();
});

checkTopicNsOwnership(topic)
Expand Down Expand Up @@ -1871,6 +1880,9 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ "which is not expected. Closing the current one", topic);
}
executor().submit(() -> {
// The managed ledger might be held by a new PersistentTopic
scheduleCloseOrphanManagedLedgers(ledger);
persistentTopic.setCloseManagedLedger(false);
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
Expand All @@ -1880,6 +1892,9 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
});
});
} else {
synchronized (orphanManagedLedgers) {
orphanManagedLedgers.remove(persistentTopic.getManagedLedger());
}
addTopicToStatsMaps(topicName, persistentTopic);
}
})
Expand Down Expand Up @@ -3813,6 +3828,43 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory
this.pulsarChannelInitFactory = factory;
}

/**
 * Records a managed ledger as a potential orphan and makes sure a one-shot cleanup task is pending.
 *
 * <p>The ledger is added to {@code orphanManagedLedgers}. If no cleanup task is currently scheduled, one is
 * scheduled on {@code pulsar.getExecutor()} to run after {@code clearOrphanManagedLedgerDelayMs} milliseconds.
 * When it runs, the task asynchronously closes every ledger still present in the set, clears the set and resets
 * {@code clearOrphanManagedLedgerTask} so that a later call can schedule a new task.
 *
 * <p>All accesses to the set and to the task reference happen while holding the {@code orphanManagedLedgers}
 * monitor.
 *
 * @param managedLedger the managed ledger to close later unless it is removed from the orphan set first
 */
private void scheduleCloseOrphanManagedLedgers(ManagedLedger managedLedger) {
    synchronized (orphanManagedLedgers) {
        orphanManagedLedgers.add(managedLedger);
        if (clearOrphanManagedLedgerTask != null) {
            // A cleanup task is already pending; it will pick up the ledger that was just added.
            return;
        }
        // Honor the configurable delay instead of a hard-coded one minute, so that
        // setClearOrphanManagedLedgerDelayMs actually takes effect. The value is read under the same
        // monitor the setter uses.
        clearOrphanManagedLedgerTask = pulsar.getExecutor().schedule(() -> {
            synchronized (orphanManagedLedgers) {
                if (!orphanManagedLedgers.isEmpty()) {
                    log.info("Closing {} orphan managed ledgers", orphanManagedLedgers.size());
                    orphanManagedLedgers.forEach(ml -> {
                        ml.asyncClose(new AsyncCallbacks.CloseCallback() {
                            @Override
                            public void closeComplete(Object ctx) {
                                log.info("Closed orphan managed ledger {}", ml.getName());
                            }

                            @Override
                            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                                // A failed close is abnormal, so report it above INFO level.
                                log.warn("Failed to close orphan managed ledger {}", ml.getName(), exception);
                            }
                        }, null);
                    });
                    orphanManagedLedgers.clear();
                }
                // Allow the next orphan to schedule a fresh cleanup task.
                clearOrphanManagedLedgerTask = null;
            }
        }, clearOrphanManagedLedgerDelayMs, TimeUnit.MILLISECONDS);
    }
}

/**
 * Updates the {@code clearOrphanManagedLedgerDelayMs} field.
 *
 * <p>The assignment is performed while holding the {@code orphanManagedLedgers} monitor.
 * NOTE(review): presumably this is the delay before orphan managed ledgers are closed; confirm that the
 * scheduling code actually reads this field.
 *
 * @param clearOrphanManagedLedgerDelayMs the new delay value, in milliseconds
 */
public void setClearOrphanManagedLedgerDelayMs(int clearOrphanManagedLedgerDelayMs) {
    final int newDelayMs = clearOrphanManagedLedgerDelayMs;
    synchronized (orphanManagedLedgers) {
        this.clearOrphanManagedLedgerDelayMs = newDelayMs;
    }
}

@AllArgsConstructor
@Getter
private static class TopicLoadingContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Setter;
import lombok.Value;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsOnMetadataServerException;
Expand Down Expand Up @@ -320,6 +321,8 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult {

// The last position that can be dispatched to consumers
private volatile Position lastDispatchablePosition;
@Setter
private volatile boolean closeManagedLedger = true;

/***
* We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return
Expand Down Expand Up @@ -1792,7 +1795,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}
}, null));

disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> {
disconnectClientsInCurrentCall.thenRun(() -> {
if (closeManagedLedger) {
closeLedgerAfterCloseClients.run();
}
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,7 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception {
// Make topic load timeout 5 times.
AtomicInteger timeoutCounter = new AtomicInteger();
for (int i = 0; i < 5; i++) {
mockZooKeeper.delay(topicLoadTimeoutSeconds * 2 * 1000, (op, path) -> {
if (mlPath.equals(path)) {
log.info("Topic load timeout: " + timeoutCounter.incrementAndGet());
return true;
}
return false;
});
delayMetadataOperation(i, tpName);
}

// Load topic.
Expand Down Expand Up @@ -130,15 +124,7 @@ public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");

// Mock message deduplication recovery so that it is slow enough to exceed topicLoadTimeoutSeconds
String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> {
if (mlPath.equals(path)) {
log.info("Topic load timeout: " + path);
return true;
}
return false;
});
delayMetadataOperation(0, tpName);

// First load topic will trigger timeout
// The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close.
Expand Down Expand Up @@ -169,7 +155,7 @@ public CompletableFuture<Void> closeAsync() {
// Once the first topic load times out, immediately load the topic again.
Producer<byte[]> producer = pulsarClient.newProducer().topic(tpName).create();
for (int i = 0; i < 10; i++) {
MessageId send = producer.send("msg".getBytes());
MessageId send = producer.sendAsync("msg".getBytes()).get(3, TimeUnit.SECONDS);
Thread.sleep(100);
assertNotNull(send);
}
Expand Down Expand Up @@ -236,7 +222,7 @@ public Object[][] whetherTimeoutOrNot() {
@Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot")
public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception {
if (injectTimeout) {
pulsar.getConfig().setTopicLoadTimeoutSeconds(5);
pulsar.getConfig().setTopicLoadTimeoutSeconds(2);
}
String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", "");
String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
Expand All @@ -251,7 +237,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception {
TopicName paramTp = (TopicName) invocation.getArguments()[0];
if (paramTp.toString().equalsIgnoreCase(tpName) && failedTimes.incrementAndGet() <= 2) {
if (injectTimeout) {
Thread.sleep(10 * 1000);
Thread.sleep(conf.getTopicLoadTimeoutSeconds() * 1000 + 500);
}
log.info("Failed {} times", failedTimes.get());
return CompletableFuture.failedFuture(new RuntimeException("mocked error"));
Expand Down Expand Up @@ -313,4 +299,16 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex
consumer.close();
admin.topics().delete(tpName);
}

/**
 * Installs a delay on the mock ZooKeeper for operations that target the deduplication cursor path of the
 * given topic's managed ledger.
 *
 * <p>The delay is the configured topic load timeout (converted to milliseconds) plus 500 ms. Each time an
 * operation matches the path, a log line containing {@code i} and the path is emitted.
 *
 * @param i     a number included in the log message (callers pass a loop index or 0)
 * @param topic the full topic name whose deduplication cursor path should be delayed
 */
private void delayMetadataOperation(int i, String topic) {
    final var encodedTopic = TopicName.get(topic).getPersistenceNamingEncoding();
    final var dedupCursorPath =
            BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + encodedTopic + "/" + DEDUPLICATION_CURSOR_NAME;
    final var delayMs = conf.getTopicLoadTimeoutSeconds() * 1000 + 500;
    mockZooKeeper.delay(delayMs, (__, path) -> {
        final boolean isTargetPath = dedupCursorPath.equals(path);
        if (isTargetPath) {
            log.info("Topic {} load timeout: {}", i, path);
        }
        return isTargetPath;
    });
}
}
Loading