Skip to content

Commit 6e3d5d8

Browse files
[fix][broker][branch-4.0] Fix failed testFinishTakeSnapshotWhenTopicLoading due to topic future cache conflicts (#24947)
1 parent 74931c9 commit 6e3d5d8

1 file changed

Lines changed: 25 additions & 26 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,31 +1117,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11171117
// The topic level policies are not needed now, but the meaning of calling
11181118
// "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization.
11191119
getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY)
1120-
.thenCompose(optionalTopicPolicies -> {
1121-
if (topicName.isPartitioned()) {
1122-
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
1123-
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
1124-
.thenCompose((metadata) -> {
1125-
// Allow creating non-partitioned persistent topic that name includes
1126-
// `partition`
1127-
if (metadata.partitions == 0
1128-
|| topicName.getPartitionIndex() < metadata.partitions) {
1129-
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
1130-
loadOrCreatePersistentTopic(context));
1131-
} else {
1132-
final String errorMsg =
1133-
String.format("Illegal topic partition name %s with max allowed "
1134-
+ "%d partitions", topicName, metadata.partitions);
1135-
log.warn(errorMsg);
1136-
return FutureUtil.failedFuture(
1137-
new BrokerServiceException.NotAllowedException(errorMsg));
1138-
}
1139-
});
1140-
} else {
1141-
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
1142-
loadOrCreatePersistentTopic(context));
1143-
}
1144-
}).thenRun(() -> {
1120+
.thenRun(() -> {
11451121
final var inserted = new MutableBoolean(false);
11461122
final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> {
11471123
inserted.setTrue();
@@ -1678,9 +1654,32 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> c
16781654
* loading and puts them into queue once in-process topics are created.
16791655
*/
16801656
protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(TopicLoadingContext context) {
1657+
final var topicName = context.getTopicName();
16811658
final var topic = context.getTopicName().toString();
1659+
final CompletableFuture<Void> ownedFuture;
1660+
if (topicName.isPartitioned()) {
1661+
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
1662+
ownedFuture = fetchPartitionedTopicMetadataAsync(topicNameEntity)
1663+
.thenCompose((metadata) -> {
1664+
// Allow creating non-partitioned persistent topic that name includes
1665+
// `partition`
1666+
if (metadata.partitions == 0
1667+
|| topicName.getPartitionIndex() < metadata.partitions) {
1668+
return checkTopicNsOwnership(topic);
1669+
} else {
1670+
final String errorMsg =
1671+
String.format("Illegal topic partition name %s with max allowed "
1672+
+ "%d partitions", topicName, metadata.partitions);
1673+
log.warn(errorMsg);
1674+
return FutureUtil.failedFuture(
1675+
new BrokerServiceException.NotAllowedException(errorMsg));
1676+
}
1677+
});
1678+
} else {
1679+
ownedFuture = checkTopicNsOwnership(topic);
1680+
}
16821681
final var topicFuture = context.getTopicFuture();
1683-
checkTopicNsOwnership(topic)
1682+
ownedFuture
16841683
.thenRun(() -> {
16851684
final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
16861685

0 commit comments

Comments
 (0)