Skip to content

Commit 543a75d

Browse files
authored
[fix][test] Fix flaky ResendRequestTest.testFailoverSingleAckedNormalTopic (#25364)
1 parent 2144968 commit 543a75d

1 file changed

Lines changed: 34 additions & 14 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
import java.util.Random;
2828
import java.util.concurrent.BlockingQueue;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicReference;
3031
import lombok.Cleanup;
3132
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
3233
import org.apache.pulsar.client.api.Consumer;
3334
import org.apache.pulsar.client.api.ConsumerBuilder;
35+
import org.apache.pulsar.client.api.ConsumerEventListener;
3436
import org.apache.pulsar.client.api.Message;
3537
import org.apache.pulsar.client.api.MessageId;
3638
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -242,22 +244,40 @@ public void testFailoverSingleAckedNormalTopic() throws Exception {
242244
assertNotNull(topicRef);
243245
assertEquals(topicRef.getProducers().size(), 1);
244246

245-
// 2. Create consumer
247+
// 2. Create consumers
248+
// Use ConsumerEventListener to identify which consumer the dispatcher picks as active.
249+
// becameActive may be called once or twice (re-evaluation when consumer-b subscribes).
250+
// We track the latest active consumer and wait until both consumers have been registered.
251+
AtomicReference<Consumer<?>> activeConsumerRef = new AtomicReference<>();
252+
ConsumerEventListener listener = new ConsumerEventListener() {
253+
@Override
254+
public void becameActive(Consumer<?> consumer, int partitionId) {
255+
activeConsumerRef.set(consumer);
256+
}
257+
258+
@Override
259+
public void becameInactive(Consumer<?> consumer, int partitionId) {
260+
}
261+
};
246262
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
247263
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover)
248-
.acknowledgmentGroupTime(0, TimeUnit.SECONDS);
249-
Consumer<byte[]> consumer1 = consumerBuilder.clone().consumerName("consumer-1").subscribe();
250-
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("consumer-2").subscribe();
251-
252-
// Wait for failover consumer assignment to settle so consumer-1 is the active consumer
253-
Awaitility.await().untilAsserted(() -> {
254-
Subscription sub = topicRef.getSubscription(subscriptionName);
255-
assertNotNull(sub);
256-
AbstractDispatcherSingleActiveConsumer dispatcher =
257-
(AbstractDispatcherSingleActiveConsumer) sub.getDispatcher();
258-
assertEquals(dispatcher.getConsumers().size(), 2);
259-
assertEquals(dispatcher.getActiveConsumer().consumerName(), "consumer-1");
260-
});
264+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).consumerEventListener(listener);
265+
Consumer<byte[]> consumerA = consumerBuilder.clone().consumerName("consumer-a").subscribe();
266+
Consumer<byte[]> consumerB = consumerBuilder.clone().consumerName("consumer-b").subscribe();
267+
268+
// Wait until the active consumer is assigned (becameActive has been called at least once)
269+
Awaitility.await().untilAsserted(() -> assertNotNull(activeConsumerRef.get()));
270+
271+
Consumer<?> activeConsumer = activeConsumerRef.get();
272+
Consumer<byte[]> consumer1; // active
273+
Consumer<byte[]> consumer2; // standby
274+
if (activeConsumer == consumerA) {
275+
consumer1 = consumerA;
276+
consumer2 = consumerB;
277+
} else {
278+
consumer1 = consumerB;
279+
consumer2 = consumerA;
280+
}
261281

262282
// 3. Producer publishes messages
263283
for (int i = 0; i < totalMessages; i++) {

0 commit comments

Comments
 (0)