Skip to content

Commit e0c0d5e

Browse files
authored
[feature][txn] Fix individual ack batch message with transaction abort redevlier duplicate messages (#14327)
### Motivation If individual ack batch message with transaction and abort this transaction, we will redeliver this message. but this batch message some bit sit are acked by another transaction and re consume this bit sit will produce `TransactionConflictException`, we don't need to redeliver this bit sit witch is acked by another transaction. if batch have batch size 5 1. txn1 ack 0, 1 the ackSet is 00111 2. txn2 ack 2 3 4 the ack Set is 11000 3. abort txn2 redeliver this position is 00111 4. but now we don't filter txn1 ackSet so redeliver this position bitSet is 111111 ### Modifications When filter the message we should filter the bit sit witch is real ack or in pendingAck state ### Verifying this change add the test
1 parent 66fda61 commit e0c0d5e

7 files changed

Lines changed: 118 additions & 6 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,18 @@ public static void andAckSet(PositionImpl currentPosition, PositionImpl otherPos
4545
if (currentPosition == null || otherPosition == null) {
4646
return;
4747
}
48-
BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(currentPosition.getAckSet());
49-
BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(otherPosition.getAckSet());
48+
currentPosition.setAckSet(andAckSet(currentPosition.getAckSet(), otherPosition.getAckSet()));
49+
}
50+
51+
//This method is do `and` operation for ack set
52+
public static long[] andAckSet(long[] firstAckSet, long[] secondAckSet) {
53+
BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(firstAckSet);
54+
BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(secondAckSet);
5055
thisAckSet.and(otherAckSet);
51-
currentPosition.setAckSet(thisAckSet.toLongArray());
56+
long[] ackSet = thisAckSet.toLongArray();
5257
thisAckSet.recycle();
5358
otherAckSet.recycle();
59+
return ackSet;
5460
}
5561

5662
//This method is compare two position which position is bigger than another one.

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
2122
import io.netty.buffer.ByteBuf;
2223
import io.prometheus.client.Gauge;
2324
import java.util.ArrayList;
@@ -37,8 +38,10 @@
3738
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
3839
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
3940
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
41+
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
4042
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
4143
import org.apache.pulsar.broker.service.plugin.EntryFilter;
44+
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
4245
import org.apache.pulsar.client.api.transaction.TxnID;
4346
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
4447
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -217,8 +220,28 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
217220
batchSizes.setBatchSize(i, batchSize);
218221
long[] ackSet = null;
219222
if (indexesAcks != null && cursor != null) {
223+
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
220224
ackSet = cursor
221-
.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
225+
.getDeletedBatchIndexesAsLongArray(position);
226+
// some batch messages ack bit sit will be in pendingAck state, so don't send all bit sit to consumer
227+
if (subscription instanceof PersistentSubscription
228+
&& ((PersistentSubscription) subscription)
229+
.getPendingAckHandle() instanceof PendingAckHandleImpl) {
230+
PositionImpl positionInPendingAck =
231+
((PersistentSubscription) subscription).getPositionInPendingAck(position);
232+
// if this position not in pendingAck state, don't need to do any op
233+
if (positionInPendingAck != null) {
234+
if (positionInPendingAck.hasAckSet()) {
235+
// need to or ackSet in pendingAck state and cursor ackSet which bit sit has been acked
236+
if (ackSet != null) {
237+
ackSet = andAckSet(ackSet, positionInPendingAck.getAckSet());
238+
} else {
239+
// if actSet is null, use pendingAck ackSet
240+
ackSet = positionInPendingAck.getAckSet();
241+
}
242+
}
243+
}
244+
}
222245
if (ackSet != null) {
223246
indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
224247
} else {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1227,6 +1227,9 @@ public Map<String, String> getSubscriptionProperties() {
12271227
return subscriptionProperties;
12281228
}
12291229

1230+
public PositionImpl getPositionInPendingAck(PositionImpl position) {
1231+
return pendingAckHandle.getPositionInPendingAck(position);
1232+
}
12301233
@Override
12311234
public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
12321235
Map<String, String> newSubscriptionProperties;
@@ -1240,7 +1243,6 @@ public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String>
12401243
this.subscriptionProperties = newSubscriptionProperties;
12411244
});
12421245
}
1243-
12441246
/**
12451247
* Return a merged map that contains the cursor properties specified by used
12461248
* (eg. when using compaction subscription) and the subscription properties.

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,17 @@ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePa
159159
*/
160160
boolean checkIfPendingAckStoreInit();
161161

162+
/**
163+
* If it returns null, it means this Position is not in pendingAck.
164+
* <p>
165+
* If it does not return null, it means this Position is in pendingAck and if it is batch Position,
166+
* it will return the corresponding ackSet in pendingAck
167+
*
168+
* @param position {@link Position} witch need to get in pendingAck
169+
* @return {@link Position} return the position in pendingAck
170+
*/
171+
PositionImpl getPositionInPendingAck(PositionImpl position);
172+
162173
/**
163174
* Get the stats of this message position is in pending ack.
164175
* @param position message position.

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ public boolean checkIfPendingAckStoreInit() {
103103
}
104104

105105
@Override
106+
public PositionImpl getPositionInPendingAck(PositionImpl position) {
107+
return null;
108+
}
106109
public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
107110
return null;
108111
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,17 @@ public boolean checkIfPendingAckStoreInit() {
10611061
return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
10621062
}
10631063

1064+
@Override
1065+
public PositionImpl getPositionInPendingAck(PositionImpl position) {
1066+
if (individualAckPositions != null) {
1067+
MutablePair<PositionImpl, Integer> positionPair = this.individualAckPositions.get(position);
1068+
if (positionPair != null) {
1069+
return positionPair.getLeft();
1070+
}
1071+
}
1072+
return null;
1073+
}
1074+
10641075
protected void handleCacheRequest() {
10651076
while (true) {
10661077
Runnable runnable = acceptQueue.poll();

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,62 @@ public Object[][] enableBatch() {
120120
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
121121
}
122122

123+
@Test
124+
private void testIndividualAckAbortFilterAckSetInPendingAckState() throws Exception {
125+
final String topicName = NAMESPACE1 + "/testIndividualAckAbortFilterAckSetInPendingAckState";
126+
final int count = 9;
127+
Producer<Integer> producer = pulsarClient
128+
.newProducer(Schema.INT32)
129+
.topic(topicName)
130+
.enableBatching(true)
131+
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
132+
.batchingMaxMessages(count).create();
133+
134+
@Cleanup
135+
Consumer<Integer> consumer = pulsarClient
136+
.newConsumer(Schema.INT32)
137+
.topic(topicName)
138+
.isAckReceiptEnabled(true)
139+
.subscriptionName("test")
140+
.subscriptionType(SubscriptionType.Shared)
141+
.enableBatchIndexAcknowledgment(true)
142+
.subscribe();
143+
144+
for (int i = 0; i < count; i++) {
145+
producer.sendAsync(i);
146+
}
147+
148+
Transaction firstTransaction = getTxn();
149+
150+
Transaction secondTransaction = getTxn();
151+
152+
// firstTransaction ack the first three messages and don't end the firstTransaction
153+
for (int i = 0; i < count / 3; i++) {
154+
consumer.acknowledgeAsync(consumer.receive().getMessageId(), firstTransaction).get();
155+
}
156+
157+
// if secondTransaction abort we only can receive the middle three messages
158+
for (int i = 0; i < count / 3; i++) {
159+
consumer.acknowledgeAsync(consumer.receive().getMessageId(), secondTransaction).get();
160+
}
161+
162+
// consumer normal ack the last three messages
163+
for (int i = 0; i < count / 3; i++) {
164+
consumer.acknowledgeAsync(consumer.receive()).get();
165+
}
166+
167+
// if secondTransaction abort we only can receive the middle three messages
168+
secondTransaction.abort().get();
169+
170+
// can receive 3 4 5 bit sit message
171+
for (int i = 0; i < count / 3; i++) {
172+
assertEquals(consumer.receive().getValue().intValue(), i + 3);
173+
}
174+
175+
// can't receive message anymore
176+
assertNull(consumer.receive(2, TimeUnit.SECONDS));
177+
}
178+
123179
@Test(dataProvider="enableBatch")
124180
private void produceCommitTest(boolean enableBatch) throws Exception {
125181
@Cleanup
@@ -674,7 +730,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri
674730
admin.topics().delete(normalTopic, true);
675731
}
676732

677-
private Transaction getTxn() throws Exception {
733+
public Transaction getTxn() throws Exception {
678734
return pulsarClient
679735
.newTransaction()
680736
.withTransactionTimeout(10, TimeUnit.SECONDS)

0 commit comments

Comments
 (0)