Skip to content

Commit b662ade

Browse files
committed
[improve][broker] PIP-391: Enable batch index ACK by default
TODO: Fix MLPendingAckStoreTest#testMainProcess
1 parent 7635f3c commit b662ade

23 files changed

Lines changed: 7 additions & 63 deletions

File tree

conf/broker.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ delayedDeliveryFixedDelayDetectionLookahead=50000
664664
delayedDeliveryMaxDelayInMillis=0
665665

666666
# Whether to enable acknowledge of batch local index.
667-
acknowledgmentAtBatchIndexLevelEnabled=false
667+
acknowledgmentAtBatchIndexLevelEnabled=true
668668

669669
# Enable tracking of replicated subscriptions state across clusters.
670670
enableReplicatedSubscriptions=true

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
423423
private long delayedDeliveryMaxDelayInMillis = 0;
424424

425425
@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
426-
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
426+
private boolean acknowledgmentAtBatchIndexLevelEnabled = true;
427427

428428
@FieldContext(
429429
category = CATEGORY_WEBSOCKET,

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,6 @@ public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {
859859
@Cleanup
860860
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
861861
.subscriptionName(subscriptionName)
862-
.enableBatchIndexAcknowledgment(true)
863862
.subscriptionType(SubscriptionType.Shared)
864863
.isAckReceiptEnabled(true)
865864
.topic(topic)

pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ protected void startBroker() throws Exception {
7575
conf.setConfigurationMetadataStoreUrl("zk:localhost:3181");
7676
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
7777
conf.setBookkeeperClientExposeStatsToPrometheus(true);
78-
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
7978

8079
conf.setBrokerShutdownTimeoutMs(0L);
8180
conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1033,7 +1033,6 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp
10331033
.isAckReceiptEnabled(true)
10341034
.subscriptionName(subscriptionName)
10351035
.subscriptionType(subType)
1036-
.enableBatchIndexAcknowledgment(true)
10371036
.subscribe();
10381037

10391038
@Cleanup

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
6969
@BeforeClass
7070
@Override
7171
protected void setup() throws Exception {
72-
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
7372
super.baseSetup();
7473
}
7574

@@ -87,7 +86,6 @@ public void testBatchMessageAck() {
8786
.subscriptionName(subscriptionName)
8887
.receiverQueueSize(50)
8988
.subscriptionType(SubscriptionType.Shared)
90-
.enableBatchIndexAcknowledgment(true)
9189
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
9290
.subscribe();
9391

@@ -212,7 +210,6 @@ public void testBatchMessageMultiNegtiveAck() throws Exception{
212210
.subscriptionName(subscriptionName)
213211
.subscriptionType(SubscriptionType.Shared)
214212
.receiverQueueSize(10)
215-
.enableBatchIndexAcknowledgment(true)
216213
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
217214
.subscribe();
218215

@@ -254,7 +251,6 @@ public void testBatchMessageMultiNegtiveAck() throws Exception{
254251
.subscriptionName(subscriptionName2)
255252
.subscriptionType(SubscriptionType.Shared)
256253
.receiverQueueSize(10)
257-
.enableBatchIndexAcknowledgment(true)
258254
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
259255
.subscribe();
260256
@Cleanup
@@ -310,7 +306,6 @@ public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Excepti
310306
.isAckReceiptEnabled(true)
311307
.subscriptionName(subName)
312308
.subscriptionType(SubscriptionType.Shared)
313-
.enableBatchIndexAcknowledgment(true)
314309
.subscribe();
315310

316311
@Cleanup
@@ -322,7 +317,6 @@ public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Excepti
322317
.isAckReceiptEnabled(true)
323318
.subscriptionName(subName)
324319
.subscriptionType(SubscriptionType.Shared)
325-
.enableBatchIndexAcknowledgment(true)
326320
.subscribe();
327321

328322
for (int i = 0; i < 5; i++) {
@@ -385,7 +379,6 @@ public void testNegativeAckAndLongAckDelayWillNotLeadRepeatConsume() throws Exce
385379
.subscriptionName(subscriptionName)
386380
.subscriptionType(SubscriptionType.Shared)
387381
.negativeAckRedeliveryDelay(redeliveryDelaySeconds, TimeUnit.SECONDS)
388-
.enableBatchIndexAcknowledgment(true)
389382
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
390383
.acknowledgmentGroupTime(1, TimeUnit.HOURS)
391384
.subscribe();
@@ -461,7 +454,6 @@ public void testMixIndexAndNonIndexUnAckMessageCount() throws Exception {
461454
.subscriptionName("sub")
462455
.subscriptionType(SubscriptionType.Shared)
463456
.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
464-
.enableBatchIndexAcknowledgment(true)
465457
.isAckReceiptEnabled(true)
466458
.subscribe();
467459

@@ -492,7 +484,6 @@ public void testUnAckMessagesWhenConcurrentDeliveryAndAck() throws Exception {
492484
.topic(topicName)
493485
.receiverQueueSize(receiverQueueSize)
494486
.subscriptionName(subName)
495-
.enableBatchIndexAcknowledgment(true)
496487
.subscriptionType(SubscriptionType.Shared)
497488
.isAckReceiptEnabled(true);
498489

@@ -666,7 +657,6 @@ public void testPermitsIfHalfAckBatchMessage() throws Exception {
666657
.topic(topicName)
667658
.receiverQueueSize(receiverQueueSize)
668659
.subscriptionName(subName)
669-
.enableBatchIndexAcknowledgment(true)
670660
.subscriptionType(SubscriptionType.Shared)
671661
.isAckReceiptEnabled(true);
672662

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ protected void setup() throws Exception {
8787
conf.setDefaultRetentionSizeInMB(100);
8888
conf.setDefaultRetentionTimeInMinutes(100);
8989
super.baseSetup();
90-
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
9190
}
9291

9392
@AfterClass(alwaysRun = true)

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ public void ackCommitTest() throws Exception {
283283
.topic(ACK_COMMIT_TOPIC)
284284
.subscriptionName(subscriptionName)
285285
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
286-
.enableBatchIndexAcknowledgment(true)
287286
.subscriptionType(SubscriptionType.Shared)
288287
.subscribe();
289288

@@ -348,7 +347,6 @@ public void ackAbortTest() throws Exception {
348347
.subscriptionName(subscriptionName)
349348
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
350349
.enableBatchIndexAcknowledgment(true)
351-
.subscriptionType(SubscriptionType.Shared)
352350
.subscribe();
353351
Awaitility.await().until(consumer::isConnected);
354352

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,6 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
397397
.topic(topicName)
398398
.subscriptionName(subName)
399399
.subscriptionType(SubscriptionType.Shared)
400-
.enableBatchIndexAcknowledgment(true)
401400
.subscribe();
402401
}
403402

@@ -1451,9 +1450,6 @@ public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
14511450
public void testPendingAckBatchMessageCommit() throws Exception {
14521451
String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";
14531452

1454-
// enable batch index ack
1455-
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
1456-
14571453
@Cleanup
14581454
Producer<byte[]> producer = pulsarClient
14591455
.newProducer(Schema.BYTES)

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ public void testTransactionBufferLowWaterMark() throws Exception {
117117
.topic(TOPIC)
118118
.subscriptionName("test")
119119
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
120-
.enableBatchIndexAcknowledgment(true)
121120
.subscriptionType(SubscriptionType.Failover)
122121
.subscribe();
123122
final String TEST1 = "test1";
@@ -196,7 +195,6 @@ public void testPendingAckLowWaterMark() throws Exception {
196195
.topic(TOPIC)
197196
.subscriptionName(subName)
198197
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
199-
.enableBatchIndexAcknowledgment(true)
200198
.subscriptionType(SubscriptionType.Failover)
201199
.subscribe();
202200
final String TEST1 = "test1";

0 commit comments

Comments
 (0)