Skip to content

Commit c2ea6a9

Browse files
coderzclhotari
authored andcommitted
[fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939)
(cherry picked from commit 344905f)
1 parent 645f233 commit c2ea6a9

2 files changed

Lines changed: 53 additions & 5 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
589589
// Read policies in background
590590
.thenAccept(__ -> readMorePoliciesAsync(reader));
591591
});
592-
initFuture.exceptionally(ex -> {
592+
initFuture.exceptionallyAsync(ex -> {
593593
try {
594594
if (closed.get()) {
595595
return null;
@@ -603,7 +603,7 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
603603
namespace, cleanupEx);
604604
}
605605
return null;
606-
});
606+
}, pulsarService.getExecutor());
607607
// let caller know we've got an exception.
608608
return initFuture;
609609
}).thenApply(__ -> true);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.mockito.Mockito.spy;
2223
import static org.testng.AssertJUnit.assertEquals;
2324
import static org.testng.AssertJUnit.assertNotNull;
2425
import static org.testng.AssertJUnit.assertNull;
26+
import java.time.Duration;
2527
import java.util.HashSet;
2628
import java.util.List;
2729
import java.util.Map;
30+
import java.util.Optional;
2831
import java.util.Set;
2932
import java.util.UUID;
3033
import java.util.concurrent.CompletableFuture;
@@ -45,7 +48,6 @@
4548
import org.apache.pulsar.common.policies.data.ClusterData;
4649
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
4750
import org.apache.pulsar.common.policies.data.TopicPolicies;
48-
import org.assertj.core.api.Assertions;
4951
import org.awaitility.Awaitility;
5052
import org.mockito.Mockito;
5153
import org.testng.Assert;
@@ -319,12 +321,12 @@ public void testGetTopicPoliciesWithCleanCache() throws Exception {
319321
spy(new ConcurrentHashMap<TopicName, TopicPolicies>());
320322
FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache", spyPoliciesCache, true);
321323

322-
Awaitility.await().untilAsserted(() -> Assertions.assertThat(
324+
Awaitility.await().untilAsserted(() -> assertThat(
323325
TopicPolicyTestUtils.getTopicPolicies(topicPoliciesService, TopicName.get(topic))).isNull());
324326

325327
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
326328
Awaitility.await().untilAsserted(() -> {
327-
Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(),
329+
assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(),
328330
TopicName.get(topic))).isNotNull();
329331
});
330332

@@ -421,4 +423,50 @@ public void testPrepareInitPoliciesCacheAsyncWhenNamespaceBeingDeleted() throws
421423
service.prepareInitPoliciesCacheAsync(namespaceName).get();
422424
admin.namespaces().deleteNamespace(NAMESPACE5);
423425
}
426+
427+
@Test
428+
public void testCreateNamespaceEventsSystemTopicFactoryException() throws Exception {
429+
final String namespace = "system-topic/namespace-6";
430+
431+
admin.namespaces().createNamespace(namespace);
432+
433+
TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), "topic-1");
434+
435+
SystemTopicBasedTopicPoliciesService service =
436+
Mockito.spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());
437+
438+
// inject exception when create NamespaceEventsSystemTopicFactory
439+
Mockito.doThrow(new RuntimeException("test exception"))
440+
.doCallRealMethod()
441+
.when(service)
442+
.getNamespaceEventsSystemTopicFactory();
443+
444+
CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
445+
Optional<TopicPolicies> topicPoliciesOptional;
446+
try {
447+
topicPoliciesFuture =
448+
service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
449+
topicPoliciesOptional = topicPoliciesFuture.join();
450+
Assert.fail();
451+
} catch (Exception e) {
452+
Assert.assertTrue(e.getCause().getMessage().contains("test exception"));
453+
}
454+
455+
Awaitility.await().untilAsserted(() -> {
456+
assertThat(service.updateTopicPoliciesAsync(topicName, false, false, topicPolicies ->
457+
topicPolicies.setMaxConsumerPerTopic(10)))
458+
.succeedsWithin(Duration.ofSeconds(2));
459+
});
460+
461+
topicPoliciesFuture =
462+
service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
463+
topicPoliciesOptional = topicPoliciesFuture.join();
464+
465+
Assert.assertNotNull(topicPoliciesOptional);
466+
Assert.assertTrue(topicPoliciesOptional.isPresent());
467+
468+
TopicPolicies topicPolicies = topicPoliciesOptional.get();
469+
Assert.assertNotNull(topicPolicies);
470+
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
471+
}
424472
}

0 commit comments

Comments
 (0)