3434import java .util .stream .Collectors ;
3535import lombok .Cleanup ;
3636import lombok .extern .slf4j .Slf4j ;
37+ import org .apache .pulsar .broker .service .SharedPulsarBaseTest ;
3738import org .apache .pulsar .client .impl .BatchMessageIdImpl ;
3839import org .apache .pulsar .client .impl .MessageIdImpl ;
3940import org .apache .pulsar .common .util .FutureUtil ;
4041import org .awaitility .Awaitility ;
41- import org .testng .annotations .AfterClass ;
42- import org .testng .annotations .BeforeClass ;
4342import org .testng .annotations .DataProvider ;
4443import org .testng .annotations .Test ;
4544
4645@ Slf4j
4746@ Test (groups = "broker-api" )
48- public class ClientDeduplicationTest extends ProducerConsumerBase {
47+ public class ClientDeduplicationTest extends SharedPulsarBaseTest {
4948
5049 @ DataProvider
5150 public static Object [][] batchingTypes () {
@@ -55,22 +54,9 @@ public static Object[][] batchingTypes() {
5554 };
5655 }
5756
58- @ BeforeClass
59- @ Override
60- protected void setup () throws Exception {
61- super .internalSetup ();
62- super .producerBaseSetup ();
63- }
64-
65- @ AfterClass (alwaysRun = true )
66- @ Override
67- protected void cleanup () throws Exception {
68- super .internalCleanup ();
69- }
70-
7157 @ Test (priority = -1 )
7258 public void testNamespaceDeduplicationApi () throws Exception {
73- final String namespace = "my-property/my-ns" ;
59+ final String namespace = getNamespace () ;
7460 assertNull (admin .namespaces ().getDeduplicationStatus (namespace ));
7561 admin .namespaces ().setDeduplicationStatus (namespace , true );
7662 Awaitility .await ().untilAsserted (() -> assertTrue (admin .namespaces ().getDeduplicationStatus (namespace )));
@@ -82,8 +68,8 @@ public void testNamespaceDeduplicationApi() throws Exception {
8268
8369 @ Test
8470 public void testProducerSequenceAfterReconnect () throws Exception {
85- final String topic = "persistent://my-property/my-ns/testProducerSequenceAfterReconnect" ;
86- admin .namespaces ().setDeduplicationStatus ("my-property/my-ns" , true );
71+ final String topic = newTopicName () ;
72+ admin .namespaces ().setDeduplicationStatus (getNamespace () , true );
8773
8874 ProducerBuilder <byte []> producerBuilder = pulsarClient .newProducer ().topic (topic )
8975 .producerName ("my-producer-name" );
@@ -113,8 +99,8 @@ public void testProducerSequenceAfterReconnect() throws Exception {
11399
114100 @ Test
115101 public void testProducerSequenceAfterRestart () throws Exception {
116- String topic = "persistent://my-property/my-ns/testProducerSequenceAfterRestart" ;
117- admin .namespaces ().setDeduplicationStatus ("my-property/my-ns" , true );
102+ String topic = newTopicName () ;
103+ admin .namespaces ().setDeduplicationStatus (getNamespace () , true );
118104
119105 ProducerBuilder <byte []> producerBuilder = pulsarClient .newProducer ().topic (topic )
120106 .producerName ("my-producer-name" );
@@ -130,8 +116,8 @@ public void testProducerSequenceAfterRestart() throws Exception {
130116
131117 producer .close ();
132118
133- // Kill and restart broker
134- restartBroker ( );
119+ // Unload topic to force reload of dedup state
120+ admin . topics (). unload ( topic );
135121
136122 producer = producerBuilder .create ();
137123 assertEquals (producer .getLastSequenceId (), 9L );
@@ -147,8 +133,8 @@ public void testProducerSequenceAfterRestart() throws Exception {
147133
148134 @ Test (timeOut = 30000 )
149135 public void testProducerDeduplication () throws Exception {
150- String topic = "persistent://my-property/my-ns/testProducerDeduplication" ;
151- admin .namespaces ().setDeduplicationStatus ("my-property/my-ns" , true );
136+ String topic = newTopicName () ;
137+ admin .namespaces ().setDeduplicationStatus (getNamespace () , true );
152138
153139 // Set infinite timeout
154140 ProducerBuilder <byte []> producerBuilder = pulsarClient .newProducer ().topic (topic )
@@ -180,8 +166,8 @@ public void testProducerDeduplication() throws Exception {
180166 Message <byte []> msg = consumer .receive (1 , TimeUnit .SECONDS );
181167 assertNull (msg );
182168
183- // Kill and restart broker
184- restartBroker ( );
169+ // Unload topic to force reload of dedup state
170+ admin . topics (). unload ( topic );
185171
186172 producer = producerBuilder .create ();
187173 assertEquals (producer .getLastSequenceId (), 2L );
@@ -198,9 +184,8 @@ public void testProducerDeduplication() throws Exception {
198184
199185 @ Test (timeOut = 30000 , dataProvider = "batchingTypes" )
200186 public void testProducerDeduplicationWithDiscontinuousSequenceId (BatcherBuilder batcherBuilder ) throws Exception {
201- String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-"
202- + System .currentTimeMillis ();
203- admin .namespaces ().setDeduplicationStatus ("my-property/my-ns" , true );
187+ String topic = newTopicName ();
188+ admin .namespaces ().setDeduplicationStatus (getNamespace (), true );
204189
205190 // Set infinite timeout
206191 ProducerBuilder <byte []> producerBuilder =
@@ -244,8 +229,8 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder
244229 assertNull (msg );
245230
246231 producer .close ();
247- // Kill and restart broker
248- restartBroker ( );
232+ // Unload topic to force reload of dedup state
233+ admin . topics (). unload ( topic );
249234
250235 producer = producerBuilder .create ();
251236 assertEquals (producer .getLastSequenceId (), 6L );
@@ -263,8 +248,8 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder
263248
264249 @ Test (timeOut = 30000 )
265250 public void testProducerDeduplicationNonBatchAsync () throws Exception {
266- String topic = "persistent://my-property/my-ns/testProducerDeduplicationNonBatchAsync" ;
267- admin .namespaces ().setDeduplicationStatus ("my-property/my-ns" , true );
251+ String topic = newTopicName () ;
252+ admin .namespaces ().setDeduplicationStatus (getNamespace () , true );
268253
269254 // Set infinite timeout
270255 ProducerBuilder <byte []> producerBuilder = pulsarClient .newProducer ().topic (topic )
@@ -295,8 +280,8 @@ public void testProducerDeduplicationNonBatchAsync() throws Exception {
295280 Message <byte []> msg = consumer .receive (1 , TimeUnit .SECONDS );
296281 assertNull (msg );
297282
298- // Kill and restart broker
299- restartBroker ( );
283+ // Unload topic to force reload of dedup state
284+ admin . topics (). unload ( topic );
300285
301286 producer = producerBuilder .create ();
302287 assertEquals (producer .getLastSequenceId (), 5L );
@@ -313,8 +298,8 @@ public void testProducerDeduplicationNonBatchAsync() throws Exception {
313298
314299 @ Test (timeOut = 30000 )
315300 public void testKeyBasedBatchingOrder () throws Exception {
316- final String topic = "persistent://my-property/my-ns/test-key-based-batching-order" ;
317- admin .namespaces ().setDeduplicationStatus ("my-property/my-ns" , true );
301+ final String topic = newTopicName () ;
302+ admin .namespaces ().setDeduplicationStatus (getNamespace () , true );
318303
319304 final Consumer <String > consumer = pulsarClient .newConsumer (Schema .STRING )
320305 .topic (topic )
@@ -379,13 +364,13 @@ public void testKeyBasedBatchingOrder() throws Exception {
379364
380365 @ Test
381366 public void testUpdateSequenceIdInSyncCodeSegment () throws Exception {
382- final String topic = "persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment" ;
367+ final String topic = newTopicName () ;
383368 int totalMessage = 200 ;
384369 int threadSize = 5 ;
385- String topicName = "subscription" ;
370+ String subscriptionName = "subscription" ;
386371 @ Cleanup ("shutdownNow" )
387372 ExecutorService executorService = Executors .newFixedThreadPool (threadSize );
388- conf . setBrokerDeduplicationEnabled ( true );
373+ admin . namespaces (). setDeduplicationStatus ( getNamespace (), true );
389374
390375 //build producer/consumer
391376 Producer <byte []> producer = pulsarClient .newProducer ()
@@ -397,7 +382,7 @@ public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
397382 Consumer <byte []> consumer = pulsarClient .newConsumer ()
398383 .topic (topic )
399384 .subscriptionType (SubscriptionType .Exclusive )
400- .subscriptionName (topicName )
385+ .subscriptionName (subscriptionName )
401386 .subscribe ();
402387
403388 CountDownLatch countDownLatch = new CountDownLatch (threadSize );
0 commit comments